Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Maxwell读取MySQL数据
  • 重编译MaxWell自定义Producer
    • 实现功能
    • GitHub地址
    • 开始编译
      • IDEA配置JDK11
    • 开发自定义的Producer
      • 定义需要传入的参数
      • 将需要添加的参数添加进MaxWell源码中
      • 修改com.zendesk.maxwell.MaxwellConfig类
      • 修改com.zendesk.maxwell.MaxwellContext类
      • 创建自定义Producer类
      • 打包
    • 开发接收消息接口
      • 接口代码
      • 发送消息测试
      • 部署运行
      • java接口程序部署
      • 读取MySQL发送到指定接口
      • 解压MaxWell
      • 脚本中JDK指定
      • 启动
  • 《Maxwell教程》笔记
Jast-zsh
2022-10-14
目录

重编译MaxWell自定义Producer

[toc]

# 实现功能

新增Maxwell Producer功能,实现将同步数据发送到指定Http接口

后续想发送到其他Producer可以随意扩展

# GitHub地址

https://github.com/zendesk/maxwell

源码版本采用当前最新版本:1.38.0

# 开始编译

# IDEA配置JDK11

首先要使用JDK 11,不然会提示报错

错误如下:

在这里插入图片描述

插件也使用的11

在这里插入图片描述

进入 File -> Project Structure

在这里插入图片描述

添加JDK

在这里插入图片描述

选择JDK11

在这里插入图片描述

在这里插入图片描述

# 开发自定义的Producer

Maxwell代码逻辑自己可以阅读一下,这里只介绍新增Producer

# 定义需要传入的参数

参数 说明
http_url 请求接口的链接
header 设置请求头

注意:设置的参数不要重复,Maxwell 参数加载这com.zendesk.maxwell.MaxwellConfig类下

# 将需要添加的参数添加进MaxWell源码中

# 修改com.zendesk.maxwell.MaxwellConfig类

添加自定义参数变量

	public String httpUrl;
	public String header;
1
2

在buildOptionParser方法中添加我们需要的参数名、说明、类型

parser.section("restful");
parser.accepts( "http_url", "This url is used to send data" ).withRequiredArg();
parser.accepts( "header", "This header is used to set request headers " ).withRequiredArg();
1
2
3

在setup方法中添加我们需要设置的参数

		this.httpUrl = fetchStringOption("http_url", options, properties, null);
		this.header = fetchStringOption("header", options, properties, null);
1
2

# 修改com.zendesk.maxwell.MaxwellContext类

在getProducer方法中switch代码块添加我们自定义的producer

case "restful":
				this.producer = new MaxwellRestFulProducer(this);
				break;
1
2
3

# 创建自定义Producer类

创建一个Java类,该类要继承AbstractProducer,实现StoppableTask

public class MaxwellRestFulProducerForJast extends AbstractProducer implements StoppableTask {
}
1
2

自定义类代码如下

package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.util.StoppableTask;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;

/**
 * @date 2022/10/12 下午2:55
 * @description 发送到RestFul接口
 * @author Jast
*/
public class MaxwellRestFulProducer extends AbstractProducer implements StoppableTask {

	private static final Logger logger = LoggerFactory.getLogger(MaxwellRestFulProducer.class);

	/**
	 * 接收数据接口地址
	 */
	private final String httpUrl;

	/**
	 * 接口需要的请求头
	 */
	private final String header;

	private List<BasicHeader> headerList;

	public MaxwellRestFulProducerForAvris(MaxwellContext context) {
		super(context);

		logger.info("init RestFul Producer");

		this.httpUrl = context.getConfig().httpUrl;
		this.header = context.getConfig().header;

		headerList = new ArrayList<>();

		if (StringUtils.isNotBlank(header)) {
			byte[] decode = Base64.getDecoder().decode(header);
			String decryptHeader = new String(decode, StandardCharsets.UTF_8);
			JSONObject jsonObject = new JSONObject(decryptHeader);
			Iterator<String> keys = jsonObject.keys();
			while (keys.hasNext()) {
				String key = keys.next();
				String value = jsonObject.getString(key);
				BasicHeader basicHeader = new BasicHeader(key, value);
				this.headerList.add(basicHeader);
			}
		}

		logger.info("RestFul Api Headers:{}",headerList);

	}


	private void sendToRestFul(RowMap msg) throws Exception {

		String messageStr = msg.toJSON(outputConfig);

		this.httpExecute(messageStr);

		if (logger.isDebugEnabled()) {
			logger.debug("->  send data to url:{}", messageStr);
		}
	}

	private void httpExecute(String messageStr) {

		CloseableHttpClient httpClient = HttpClientBuilder.create().build();

		HttpPost httpPost = new HttpPost(httpUrl);

		headerList.stream().forEach(httpPost::addHeader);

		StringEntity entity = new StringEntity(messageStr, "utf-8");
		entity.setContentEncoding("UTF-8");
		entity.setContentType("application/json");
		httpPost.setEntity(entity);
		CloseableHttpResponse response = null;

		try {

			response = httpClient.execute(httpPost);
			HttpEntity responseEntity = response.getEntity();

			if (responseEntity != null) {
				String responseStr = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
				logger.info("HTTP响应内容为:" + responseStr);
			}

		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				if (httpClient != null) {
					httpClient.close();
				}
				if (response != null) {
					response.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	@Override
	public void push(RowMap r) throws Exception {
		if (!r.shouldOutput(outputConfig)) {
			context.setPosition(r.getNextPosition());
			return;
		}

		boolean sentToRestFul = false;

		for (int cxErrors = 0; cxErrors < 2; cxErrors++) {
			try {
				//发送到自定义接口
				this.sendToRestFul(r);
				sentToRestFul = true;
				break;
			} catch (Exception e) {

				logger.error("Exception during put", e);

				if (!context.getConfig().ignoreProducerError) {
					throw new RuntimeException(e);
				}

			}
		}

		if (sentToRestFul) {
			this.succeededMessageCount.inc();
			this.succeededMessageMeter.mark();
		} else {
			this.failedMessageCount.inc();
			this.failedMessageMeter.mark();
		}

		if (r.isTXCommit()) {
			context.setPosition(r.getNextPosition());
		}
	}

	@Override
	public void requestStop() {

	}

	@Override
	public void awaitStop(Long timeout) {
	}

	@Override
	public StoppableTask getStoppableTask() {
		return this;
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180

# 打包

打包成功后在target目录中会有maxwell-1.38.0.tar.gz文件,这个文件就是我们编译后的,可以直接部署到服务器

项目下载完之后先package进行打包,直接编译有些类会提示找不到

# 开发接收消息接口

# 接口代码

这里只做演示,具体怎么处理根据业务逻辑自行修改


import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author Jast
 * @description
 * @date 2022-10-12 14:05
 */
@RestController
public class MessageController {
    @RequestMapping(value ="/data/send",method = RequestMethod.POST)
    public String queryUserByUid(@RequestBody String json) {
        System.out.println("data:"+json);
        return "OK";
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 发送消息测试

在这里插入图片描述

# 部署运行

# java接口程序部署

这里不做描述

# 读取MySQL发送到指定接口

# 解压MaxWell
tar -zxvf maxwell-1.38.0.tar.gz 
1
# 脚本中JDK指定

如果你服务器JDK是11请跳过此步骤

在启动脚本中指定JDK 11版本

> vim maxwell

# 在上面加入环境变量引用
#!/bin/bash

export JAVA_HOME=/home/hadoop/maxwell/jdk-11.0.16.1
export PATH=$JAVA_HOME/bin:$PATH
1
2
3
4
5
6
7
# 启动
bin/maxwell --user='maxwell' --password='123456' --host='172.16.24.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send 
1

控制台打印,可以成功打印我们代码中打印的日志init RestFul Producer

[hadoop@10 ~/maxwell/maxwell-1.38.0]$ bin/maxwell --user='maxwell' --password='123456' --host='172.16.24.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send 
2022-10-12 17:04:28 INFO  Maxwell - Starting Maxwell. maxMemory: 8350859264 bufferMemoryUsage: 0.25
2022-10-12 17:04:29 INFO  MaxwellRestFulProducerForAvris - init RestFul Producer
2022-10-12 17:04:29 INFO  MaxwellRestFulProducerForAvris - RestFul Api Headers:[] 
1
2
3
4

修改MySQL数据进行测试

修改zuser_text表数据,将张三年龄修改为99

在这里插入图片描述

在这里插入图片描述

在Java接口端可以看到日志输出

data:{"database":"avris-cdp-cdc","table":"zuser_text","type":"update","ts":1665565676,"xid":44487309,"commit":true,"data":{"id":2,"name":"张三","age":99,"gender":"N","birthday":"2001-06-01","salary":3000.0,"registration_date":"2022-10-11 02:38:43"},"old":{"age":22}}
1

在Maxwell日志输出调用接口成功

2022-10-12 17:08:47 INFO  MaxwellRestFulProducerForAvris - HTTP响应内容为:OK
1
上次更新: 2023/03/10, 16:49:38
Maxwell读取MySQL数据

← Maxwell读取MySQL数据

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式