CVE-2025-27817 Apache Kafka Client 任意文件读取与SSRF 漏洞分析复现

CVE-2025-27817 Apache Kafka Client 任意文件读取与SSRF 漏洞分析复现

本文转自 hahaha123 并作补充

一、漏洞成因

该漏洞源于Apache Kafka Client在配置SASL/OAUTHBEARER连接时,对sasl.oauthbearer.token.endpoint.url和sasl.oauthbearer.jwks.endpoint.url参数的安全控制存在缺陷。攻击者可通过构造恶意URL参数,利用该缺陷实现任意文件读取或发起SSRF请求(访问非预期目标地址)。

二、影响版本

3.1.0 <= Apache Kafka <= 3.9.0

三、漏洞复现

image

image

四、漏洞分析

根据漏洞触发点所对应的路由 connectors,可定位至以下相关代码:

image

147-152行代码对传入的信息处理,以及创建示例等,漏洞入口是:

1
herder.putConnectorConfig(name, configs, createRequest.initialTargetState(), false, cb);

继续跟进putConnectorConfig方法,
这里会跟到两个类:StandaloneHerderDistributedHerder,之前说的修复点也确实是这里,基本可以确定这是入口点,我们看DistributedHerder

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void putConnectorConfig(final String connName, final Map config, final TargetState targetState,
final boolean allowReplace, final Callback&gt; callback) {
log.trace("Submitting connector config write request {}", connName);
addRequest(
() -&gt; {
doPutConnectorConfig(connName, config, targetState, allowReplace, callback);
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}

继续跟进doPutConnectorConfig方法,这里的config,就是先前connector路由传的json数据

image

然后跟进validateConnectorConfig方法

image

image

进入validateConnectorConfig方法,到现在,逻辑检验都是普通的,所以就不多赘述了,继续跟进

image

调用了:

1
2
3
protected Connector getConnector(String connType) {
return tempConnectors.computeIfAbsent(connType, k -&gt; plugins().newConnector(k));
}

利用 plugins().newConnector() 动态加载类,例如:MirrorSourceConnector

image

这里可以看到根据json数据的config[‘‘]=*.MirrorSourceConnector调用MirrorSourceConnector这个Connector用于后续的执行

后续跟进的话可以看到通过反射调用Connector:

1
Class&lt;?&gt; klass = loader.loadClass(classOrAlias, false);

回到AbstractHerder类,继续分析接下来的逻辑

image

接下来就是确定connectorType为sink or source,然后对数据进行处理等,可以自己看

image

java config = connector.validate(connectorProps);
跟进:

1
2
3
4
5
6
7
8
@Override
public org.apache.kafka.common.config.Config validate(Map props) {
List configValues = super.validate(props).configValues();
validateExactlyOnceConfigs(props, configValues);
validateEmitOffsetSyncConfigs(props, configValues);

return new org.apache.kafka.common.config.Config(configValues);
}

接下来进入到validate验证阶段,这里就很绕了,我们知道是MirrorSourceConnector这个Connector创建了Tasks,所以他肯定要进到这个方法

image

可以看到过了validate初始验证,进入到start方法,继续跟进,由于漏洞触发点是认证相关,我们断点在认证的各个方法,一个一个看

image

这里进入到认证阶段forwardingAdmin

image

实例化ForwardingAdmin实现类

image

通过json数据中的值:

1
2
3
"****": "SASL_PLAINTEXT",
"****": "OAUTHBEARER",
"****": "****.OAuthBearerLoginCallbackHandler",

来调用相关认证方法,然后调用create方法(漏洞触发点)

image

通过

1
URL tokenEndpointUrl = cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);

获取json中的sasl.oauthbearer.token.endpoint.url,继续跟进

image

这里可以看到java accessTokenRetriever.retrieve();返回了文件信息,我们向上追踪

image

发现需要传参,全局搜new FileTokenRetriever(

image

发现就在Create方法内,尴尬……

好,利用链如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
[1] 用户提交 Connector 配置请求(HTTP API)
|
|--&gt; REST API: Connect REST `/connectors` 接口处理 connector 配置
|
[2] WorkerConfig / ConnectorConfig 解析配置(Map)
|
[3] validateConnectorConfig(...) 进行 connector 配置验证
|
[4] connector.config() -&gt; 返回 ConfigDef
|
|--&gt; connector.validate(...)(触发 MirrorSourceConnector.validate())
|
|--&gt; validateExactlyOnceConfigs(...)
|--&gt; validateEmitOffsetSyncConfigs(...)
|
[5] connector.start(props)
|
|--&gt; new MirrorSourceConfig(props)
|
|--&gt; super(props) --&gt; AbstractConfig 初始化
|
|--&gt; createAdmin(...)(构造 ForwardingAdmin)
|
|--&gt; forwardingAdmin(config)
|
|--&gt; get(FORWARDING_ADMIN_CLASS)
|--&gt; Utils.newParameterizedInstance(...)
|
|--&gt; KafkaMirrorMakerClientBasedAdmin.create(...)
|
|--&gt; OAuthBearerLoginModule / SaslClientAuthenticator 初始化
|
|--&gt; AccessTokenRetriever.create(...)
|
|--&gt; cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL)
|
|--&gt; protocol == "file" ?
--&gt; new FileTokenRetriever(Path)
|
|--&gt; init()
|
|--&gt; Utils.readFileAsString(path)
|
|--&gt; Files.readAllBytes(...)

查看文件结果看这个路由:

image

获取statusBackingStore中对应Connector的tasks,结果就在tasks[‘trace’]里,可以自己看看这个的逻辑,这里就不多说了

环境搭建

下载3.9.0源码包
gradle构建一下,然后运行命令:

1
2
3
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
./bin/connect-distributed.sh config/connect-distributed.properties

这里的connect-distributed.sh如果需要用idea调试的话,最好在里面加上debug,用idea的jvm连接

五、修复方式

Standalone模式:修改connect-standalone.properties中的listeners或rest.host.name字段Distributed模式:修改connect-distributed.properties中的listeners或rest.host.name字段使用流量防护设备(如WAF、防火墙)拦截/connectors接口请求中携带敏感文件路径的恶意流量

END

DIFF一下,一眼就能发现3.9.1对uri进行了校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
// AccessTokenRetrieverFactory.java
public static AccessTokenRetriever create(Map<String, ?> configs, Map<String, Object> metadata) {
ConfigurationUtils cu = new ConfigurationUtils(configs);
cu.throwIfURLIsNotAllowed(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL); // 新增校验
// ...原有逻辑
}

// VerificationKeyResolverFactory.java
public static VerificationKeyResolver create(Map<String, ?> configs) {
ConfigurationUtils cu = new ConfigurationUtils(configs);
cu.throwIfURLIsNotAllowed(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL); // 新增校验
// ...原有逻辑
}

漏洞点就是这个。好啦,结束。高中生,菜勿喷。