重编译MaxWell自定义Producer
[toc]
# 实现功能
新增Maxwell Producer功能,实现将同步数据发送到指定Http接口
后续想发送到其他Producer可以随意扩展
# GitHub地址
https://github.com/zendesk/maxwell
源码版本采用当前最新版本:1.38.0
# 开始编译
# IDEA配置JDK11
首先要使用JDK 11,不然会提示报错
错误如下:
插件也使用的11
进入 File -> Project Structure
添加JDK
选择JDK11
# 开发自定义的Producer
Maxwell代码逻辑自己可以阅读一下,这里只介绍新增Producer
# 定义需要传入的参数
参数 | 说明 |
---|---|
http_url | 请求接口的链接 |
header | 设置请求头 |
注意:设置的参数不要重复,Maxwell 参数加载这
com.zendesk.maxwell.MaxwellConfig
类下
# 将需要添加的参数添加进MaxWell源码中
# 修改com.zendesk.maxwell.MaxwellConfig
类
添加自定义参数变量
public String httpUrl;
public String header;
1
2
2
在buildOptionParser
方法中添加我们需要的参数名、说明、类型
parser.section("restful");
parser.accepts( "http_url", "This url is used to send data" ).withRequiredArg();
parser.accepts( "header", "This header is used to set request headers " ).withRequiredArg();
1
2
3
2
3
在setup
方法中添加我们需要设置的参数
this.httpUrl = fetchStringOption("http_url", options, properties, null);
this.header = fetchStringOption("header", options, properties, null);
1
2
2
# 修改com.zendesk.maxwell.MaxwellContext
类
在getProducer
方法中switch
代码块添加我们自定义的producer
case "restful":
this.producer = new MaxwellRestFulProducer(this);
break;
1
2
3
2
3
# 创建自定义Producer类
创建一个Java类,该类要继承AbstractProducer
,实现StoppableTask
public class MaxwellRestFulProducerForJast extends AbstractProducer implements StoppableTask {
}
1
2
2
自定义类代码如下
package com.zendesk.maxwell.producer;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.util.StoppableTask;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
/**
* @date 2022/10/12 下午2:55
* @description 发送到RestFul接口
* @author Jast
*/
public class MaxwellRestFulProducer extends AbstractProducer implements StoppableTask {
private static final Logger logger = LoggerFactory.getLogger(MaxwellRestFulProducer.class);
/**
* 接收数据接口地址
*/
private final String httpUrl;
/**
* 接口需要的请求头
*/
private final String header;
private List<BasicHeader> headerList;
public MaxwellRestFulProducerForAvris(MaxwellContext context) {
super(context);
logger.info("init RestFul Producer");
this.httpUrl = context.getConfig().httpUrl;
this.header = context.getConfig().header;
headerList = new ArrayList<>();
if (StringUtils.isNotBlank(header)) {
byte[] decode = Base64.getDecoder().decode(header);
String decryptHeader = new String(decode, StandardCharsets.UTF_8);
JSONObject jsonObject = new JSONObject(decryptHeader);
Iterator<String> keys = jsonObject.keys();
while (keys.hasNext()) {
String key = keys.next();
String value = jsonObject.getString(key);
BasicHeader basicHeader = new BasicHeader(key, value);
this.headerList.add(basicHeader);
}
}
logger.info("RestFul Api Headers:{}",headerList);
}
private void sendToRestFul(RowMap msg) throws Exception {
String messageStr = msg.toJSON(outputConfig);
this.httpExecute(messageStr);
if (logger.isDebugEnabled()) {
logger.debug("-> send data to url:{}", messageStr);
}
}
private void httpExecute(String messageStr) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
HttpPost httpPost = new HttpPost(httpUrl);
headerList.stream().forEach(httpPost::addHeader);
StringEntity entity = new StringEntity(messageStr, "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/json");
httpPost.setEntity(entity);
CloseableHttpResponse response = null;
try {
response = httpClient.execute(httpPost);
HttpEntity responseEntity = response.getEntity();
if (responseEntity != null) {
String responseStr = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
logger.info("HTTP响应内容为:" + responseStr);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (httpClient != null) {
httpClient.close();
}
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void push(RowMap r) throws Exception {
if (!r.shouldOutput(outputConfig)) {
context.setPosition(r.getNextPosition());
return;
}
boolean sentToRestFul = false;
for (int cxErrors = 0; cxErrors < 2; cxErrors++) {
try {
//发送到自定义接口
this.sendToRestFul(r);
sentToRestFul = true;
break;
} catch (Exception e) {
logger.error("Exception during put", e);
if (!context.getConfig().ignoreProducerError) {
throw new RuntimeException(e);
}
}
}
if (sentToRestFul) {
this.succeededMessageCount.inc();
this.succeededMessageMeter.mark();
} else {
this.failedMessageCount.inc();
this.failedMessageMeter.mark();
}
if (r.isTXCommit()) {
context.setPosition(r.getNextPosition());
}
}
@Override
public void requestStop() {
}
@Override
public void awaitStop(Long timeout) {
}
@Override
public StoppableTask getStoppableTask() {
return this;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# 打包
打包成功后在target
目录中会有maxwell-1.38.0.tar.gz
文件,这个文件就是我们编译后的,可以直接部署到服务器
项目下载完之后先
package
进行打包,直接编译有些类会提示找不到
# 开发接收消息接口
# 接口代码
这里只做演示,具体怎么处理根据业务逻辑自行修改
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Jast
* @description
* @date 2022-10-12 14:05
*/
@RestController
public class MessageController {
@RequestMapping(value ="/data/send",method = RequestMethod.POST)
public String queryUserByUid(@RequestBody String json) {
System.out.println("data:"+json);
return "OK";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 发送消息测试
# 部署运行
# java接口程序部署
这里不做描述
# 读取MySQL发送到指定接口
# 解压MaxWell
tar -zxvf maxwell-1.38.0.tar.gz
1
# 脚本中JDK指定
如果你服务器JDK是11请跳过此步骤
在启动脚本中指定JDK 11版本
> vim maxwell
# 在上面加入环境变量引用
#!/bin/bash
export JAVA_HOME=/home/hadoop/maxwell/jdk-11.0.16.1
export PATH=$JAVA_HOME/bin:$PATH
1
2
3
4
5
6
7
2
3
4
5
6
7
# 启动
bin/maxwell --user='maxwell' --password='123456' --host='172.16.24.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send
1
控制台打印,可以成功打印我们代码中打印的日志init RestFul Producer
[hadoop@10 ~/maxwell/maxwell-1.38.0]$ bin/maxwell --user='maxwell' --password='123456' --host='172.16.24.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send
2022-10-12 17:04:28 INFO Maxwell - Starting Maxwell. maxMemory: 8350859264 bufferMemoryUsage: 0.25
2022-10-12 17:04:29 INFO MaxwellRestFulProducerForAvris - init RestFul Producer
2022-10-12 17:04:29 INFO MaxwellRestFulProducerForAvris - RestFul Api Headers:[]
1
2
3
4
2
3
4
修改MySQL数据进行测试
修改zuser_text
表数据,将张三年龄修改为99
在Java接口端可以看到日志输出
data:{"database":"avris-cdp-cdc","table":"zuser_text","type":"update","ts":1665565676,"xid":44487309,"commit":true,"data":{"id":2,"name":"张三","age":99,"gender":"N","birthday":"2001-06-01","salary":3000.0,"registration_date":"2022-10-11 02:38:43"},"old":{"age":22}}
1
在Maxwell日志输出调用接口成功
2022-10-12 17:08:47 INFO MaxwellRestFulProducerForAvris - HTTP响应内容为:OK
1
上次更新: 2023/03/10, 16:49:38