Sending Flink Log4j Logs to Kafka
[toc]
# Background
Flink version: 1.14.3
# Custom KafkaAppender
You can define this class directly in your own project, or package it into a JAR and reference it as a dependency.
```java
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
@SuppressWarnings("unused")
@Plugin(name = "KafkaAppender", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {
private KafkaProducer<String, String> producer = null;
private String topic;
/** Kafka broker address */
private String kafkaBroker;
private boolean append = true;
/** Minimum log level to send */
private Level level;
private Layout<? extends Serializable> layout;
/** Include rules (exact logger names and prefix matches) */
private Set<String> includeSet = new HashSet<>();
private Set<String> includeMatchSet = new HashSet<>();
/** Exclude rules (exact logger names and prefix matches) */
private Set<String> excludeSet = new HashSet<>();
private Set<String> excludeMatchSet = new HashSet<>();
public KafkaAppender(
String name,
String topic,
String kafkaBroker,
Filter filter,
Layout<? extends Serializable> layout,
boolean append,
String level,
String includes,
String excludes) {
super(name, filter, layout);
System.out.println(
"Initializing KafkaAppender, Kafka Broker: "
+ kafkaBroker
+ " , Log Topic: "
+ topic
+ " , Log Level: "
+ level);
if (includes != null) {
for (String include : includes.split(",")) {
if (include.length() > 0) {
if (include.endsWith(".*")) {
includeMatchSet.add(include.replace(".*", ""));
} else {
includeSet.add(include);
}
}
}
}
if (excludes != null) {
for (String exclude : excludes.split(",")) {
if (exclude.length() > 0) {
if (exclude.endsWith(".*")) {
excludeMatchSet.add(exclude.replace(".*", ""));
} else {
excludeSet.add(exclude);
}
}
}
}
this.topic = topic;
this.kafkaBroker = kafkaBroker;
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
this.append = append;
if (level == null) {
level = "INFO";
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
this.layout = layout;
this.level = Level.toLevel(level);
}
@PluginFactory
public static KafkaAppender createAppender(
/** Topic the logs are sent to */
@PluginAttribute("topic") String topic,
/** Kafka broker address */
@PluginAttribute("kafkaBroker") String kafkaBroker,
/** Layout used to format the log events */
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginAttribute("name") String name,
@PluginAttribute("append") boolean append,
/** Log level */
@PluginAttribute("level") String level,
/** Package names to include, prefix match, comma-separated */
@PluginAttribute("includes") String includes,
/** Package names to exclude, prefix match, takes precedence over includes, comma-separated */
@PluginAttribute("excludes") String excludes) {
return new KafkaAppender(
name, topic, kafkaBroker, null, layout, append, level, includes, excludes);
}
@Override
public final void stop() {
super.stop();
if (producer != null) {
producer.close();
}
}
@Override
public void append(LogEvent event) {
if (event.getLevel().isMoreSpecificThan(this.level)) {
if (filterPackageName(event)) {
return;
}
try {
if (producer != null) {
Future<RecordMetadata> result =
producer.send(
new ProducerRecord<String, String>(
topic, getLayout().toSerializable(event).toString()));
// result.get();
}
} catch (Exception e) {
LOGGER.error("Unable to write to kafka for appender [{}].", this.getName(), e);
throw new AppenderLoggingException(
"Unable to write to kafka in appender: " + e.getMessage(), e);
} finally {
}
}
}
/**
* Filters by logger (package) name; if this returns true the event is NOT sent to Kafka.
*
* @param event the log event
* @return true if the event should be filtered out
* @author Jast
*/
private boolean filterPackageName(LogEvent event) {
boolean flag = true;
if (includeSet.size() == 0
&& includeMatchSet.size() == 0
&& excludeSet.size() == 0
&& excludeMatchSet.size() == 0) {
return false;
}
if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
flag = false;
}
/** Logger (class) name of the event */
String loggerName = event.getLoggerName();
for (String include : includeSet) {
if (loggerName.equals(include)) {
flag = false;
}
}
for (String include : includeMatchSet) {
if (loggerName.startsWith(include)) {
flag = false;
}
}
for (String exclude : excludeMatchSet) {
if (loggerName.startsWith(exclude)) {
flag = true;
}
}
for (String exclude : excludeSet) {
if (loggerName.equals(exclude)) {
flag = true;
}
}
return flag;
}
}
```
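The include/exclude handling above can be a bit hard to read, so here is a small standalone sketch of the same matching semantics (the class name `FilterRuleDemo` and the sample rules are made up for illustration): rules ending in `.*` are prefix matches, all other rules are exact logger names, and exclude rules are checked last, so they win over include rules.

```java
import java.util.Arrays;
import java.util.List;

// Standalone illustration only; not part of the appender itself.
public class FilterRuleDemo {

    // Same semantics as filterPackageName above, expressed as "should this event be sent".
    static boolean shouldSend(String loggerName, List<String> includes, List<String> excludes) {
        boolean send = includes.isEmpty(); // with no include rules, everything is sent by default
        for (String rule : includes) {
            if (rule.endsWith(".*") ? loggerName.startsWith(rule.replace(".*", "")) : loggerName.equals(rule)) {
                send = true;
            }
        }
        for (String rule : excludes) {
            if (rule.endsWith(".*") ? loggerName.startsWith(rule.replace(".*", "")) : loggerName.equals(rule)) {
                send = false; // excludes take precedence
            }
        }
        return send;
    }

    public static void main(String[] args) {
        List<String> includes = Arrays.asList("com.*", "org.apache.hadoop.yarn.client.*");
        List<String> excludes = Arrays.asList("org.apache.kafka.*");
        System.out.println(shouldSend("com.example.MyJob", includes, excludes));                          // true
        System.out.println(shouldSend("org.apache.flink.runtime.taskmanager.Task", includes, excludes));  // false: no include rule matches
        System.out.println(shouldSend("org.apache.kafka.clients.NetworkClient", includes, excludes));     // false: excluded
    }
}
```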
# Modifying log4j.properties
```properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF
# Everything above this line comes from the default Flink configuration file; the lines below are our custom Kafka appender configuration
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum log level sent to Kafka
appender.kafka.level=info
# Only send logs whose logger name matches these package prefixes (include rules)
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
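# The appender also supports an excludes attribute with the same prefix-match syntax
# (comma-separated; excludes take precedence over includes). The value below is only an
# illustrative example (e.g. to keep the Kafka client's own logs out), not part of the original setup:
#appender.kafka.excludes=org.apache.kafka.*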
## Log pattern for the Kafka output; use either JSONLayout or PatternLayout, not both
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} ${sys:log.file} ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit the logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value can be used to tell which machine/container the log came from
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# Flink job name; this has to be passed manually in the launch command, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_per_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
```
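To check that log records are actually arriving, you can tail the topic with the standard Kafka console consumer (the broker address and topic below match the example configuration; adjust the paths to your own installation):

```shell
# Tail the log topic from the beginning; Ctrl+C to stop
bin/kafka-console-consumer.sh \
  --bootstrap-server 172.16.24.194:9092 \
  --topic flink_job_logs \
  --from-beginning
```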
# Specifying the configuration file in the launch command
The log configuration file is specified with `-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties"`:
```shell
/opt/flink/bin/flink run \
-m yarn-cluster \
-p $P \
-ys $YS \
-yjm $YJM \
-ytm $YTM \
-yt $JAR_PATH/lib \
-ynm $YNM \
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
-yD env.java.opts="-Dfile.encoding=UTF-8 -Dflink_per_job_name=profile-platform" \
-c $START_CLASS \
$JAR_PATH/$JAR_NAME.jar $1
```
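The launch script assumes several shell variables ($P, $YS, $YJM, $YTM, $YNM, $JAR_PATH, $JAR_NAME, $START_CLASS) defined elsewhere; the values below are only illustrative placeholders, not the original deployment settings:

```shell
# Illustrative values only -- adjust to your own job and cluster
P=5                                         # parallelism (-p)
YS=2                                        # slots per TaskManager (-ys)
YJM=1024                                    # JobManager memory in MB (-yjm)
YTM=4096                                    # TaskManager memory in MB (-ytm)
YNM=profile-platform                        # YARN application name (-ynm)
JAR_PATH=/data/flink-job/profile-platform-1.0.0
JAR_NAME=profile-platform-1.0.0
START_CLASS=com.example.ProfilePlatformJob  # hypothetical main class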
# Format of the messages consumed from Kafka
```json
{
"thread":"Sink Label Group Data Send Topic (4/5)#0",
"level":"INFO",
"loggerName":"org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler",
"message":"Committing the state for checkpoint 23",
"endOfBatch":false,
"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger",
"instant":{
"epochSecond":1677741263,
"nanoOfSecond":777000000
},
"threadId":104,
"threadPriority":5,
"logdir":"/hadoop/yarn/log/application_1675237371712_0142/container_e25_1675237371712_0142_01_000003/taskmanager.log",
"flink_per_job_name":"avris-profile-platform-0301"
}
```
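As a sketch of how these records could be processed downstream, here is a minimal Kafka consumer that reads the topic and extracts a few fields with Jackson (the consumer group id, class name, and error handling are illustrative assumptions, not part of the original setup):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class FlinkLogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.24.194:9092");
        props.put("group.id", "flink-log-viewer"); // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink_job_logs"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode log = mapper.readTree(record.value());
                    // Pick out the fields described in the table below
                    System.out.printf("[%s] %s %s - %s%n",
                            log.path("flink_per_job_name").asText(),
                            log.path("level").asText(),
                            log.path("loggerName").asText(),
                            log.path("message").asText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```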
# Field descriptions
| Field | Description |
| --- | --- |
| epochSecond | Log timestamp, in seconds |
| logdir | Log file path and name |
| loggerName | Name of the class that emitted the log |
| level | Log level |
| thread | Thread name |
| message | Log message printed by the program |
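The instant object splits the timestamp into epochSecond and nanoOfSecond; if you need a readable time, the two can be recombined, for example:

```java
import java.time.Instant;
import java.time.ZoneId;

public class LogTimestampDemo {
    public static void main(String[] args) {
        // Values taken from the sample record above
        Instant ts = Instant.ofEpochSecond(1677741263L, 777000000L);
        System.out.println(ts.atZone(ZoneId.systemDefault())); // local date-time of the log event
    }
}
```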
# One-step usage
To make this easier to adopt, I packaged the implementation and published it to Maven Central, so it can be used directly as a Maven dependency. The steps are as follows:
- Add the Maven dependency:
```xml
<dependency>
<groupId>com.gitee.jastee</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>1.0.5</version>
</dependency>
```
- Print logs in your code with the usual logger; a minimal example follows.
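For instance, with the standard SLF4J API that Flink jobs typically use (the package, class, and message are made-up examples; the package matches the `com.*` include rule in the configuration below):

```java
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyFlinkJob {
    private static final Logger LOG = LoggerFactory.getLogger(MyFlinkJob.class);

    public static void main(String[] args) {
        // Anything logged here is also routed to Kafka by the appender configured below
        LOG.info("Job started, processing input ...");
    }
}
```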
- Configuration file: `log4j.properties`
```properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO
# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=${sys:log.file}
appender.main.filePattern=${sys:log.file}.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name=org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF
# Everything above this line comes from the default Flink configuration file; the lines below are our custom Kafka appender configuration
# kafka appender config
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# Topic the logs are sent to
appender.kafka.topic=flink_job_logs
# Kafka Broker
appender.kafka.kafkaBroker=172.16.24.194:9092
# Minimum log level sent to Kafka
appender.kafka.level=info
# Only send logs whose logger name matches these package prefixes (include rules)
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## Log pattern for the Kafka output; use either JSONLayout or PatternLayout, not both
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} ${sys:log.file} ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## Emit the logs as JSON
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# Log file path and name; this value can be used to tell which machine/container the log came from
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
# Flink job name; this has to be passed manually in the launch command, e.g. -yD env.java.opts="-Dflink_per_job_name=profile-platform"
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_per_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
```
- Specify the log configuration file in the launch command:
```shell
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
```
- Launch the job:
```shell
/opt/flink/bin/flink run \
-m yarn-cluster \
-p $P \
-ys $YS \
-yjm $YJM \
-ytm $YTM \
-yt $JAR_PATH/lib \
-ynm $YNM \
-yD \$internal.yarn.log-config-file="/data/flink-job/profile-platform-1.0.0/log4j.properties" \
-yD env.java.opts="-Dfile.encoding=UTF-8 -Dflink_per_job_name=profile-platform" \
-c $START_CLASS \
$JAR_PATH/$JAR_NAME.jar $1
```
# References
- https://blog.csdn.net/weixin_44500374/article/details/117931457
- https://blog.csdn.net/shi_xiansheng/article/details/119778656
- https://blog.csdn.net/u010772882/article/details/125493323
- https://blog.csdn.net/Andrew_2018/article/details/115033630?ops_request_misc=&request_id=&biz_id=102&utm_term=appender.kafka.filter&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-115033630.142^v73^insert_down3,201^v4^add_ask,239^v2^insert_chatgpt&spm=1018.2226.3001.4187
- https://www.codenong.com/31757361/