Jast-zsh
2023-03-10

DataX Plugin Development - KafkaWriter

[toc]

# Download the source

Source code: https://github.com/alibaba/DataX/releases/tag/datax_v202210

The version used here is datax_v202210.

DataX user guide: https://github.com/alibaba/DataX/blob/master/introduction.md

DataX plugin development guide: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
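
If you prefer pulling the tagged source from the command line, a minimal sketch (assuming wget and tar are available and using GitHub's standard tag-archive URL) is:

```bash
# Fetch the datax_v202210 tag as a tarball and unpack it; the source lands in DataX-datax_v202210/
wget https://github.com/alibaba/DataX/archive/refs/tags/datax_v202210.tar.gz
tar -zxvf datax_v202210.tar.gz
cd DataX-datax_v202210
```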

# Plugin development

# Create the kafkawriter module

Create a new Maven module named kafkawriter in the root of the DataX project (and add it to the <modules> section of the root pom.xml so it is built along with the other plugins).

# pom.xml

The module mainly adds the kafka-clients dependency on top of DataX's common dependencies; the full file is below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafkawriter</artifactId>

    <properties>
        <kafka.version>1.1.1</kafka.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Notes:

  • maven-assembly-plugin
First, it helps to understand how DataX packages its plugins: with maven-assembly-plugin.
1. Purpose: it is Maven's standard plugin for packaging tasks, used to build your program together with the jars it depends on into a single package.
2. Other capabilities:
  1) Collect project dependencies, modules, site documentation and other files into a single archive.
  2) Produce distribution packages in a chosen format (zip, tar.gz, jar, war, etc.), with fine-grained control over which files are included.
  3) Include or exclude specific directories and files via custom rules.
  Overall, using maven-assembly-plugin takes two steps:
  Step 1: configure maven-assembly-plugin in pom.xml and point it at a descriptor file.
  Step 2: configure the concrete packaging rules in that descriptor file.

The following needs to be added to the kafkawriter pom.xml:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptors> <!-- path to the descriptor file -->
            <descriptor>src/main/assembly/package.xml</descriptor>
        </descriptors>
        <finalName>datax</finalName>
    </configuration>
    <executions>
        <execution>
            <id>dwzip</id>
            <phase>package</phase> <!-- bound to the package lifecycle phase -->
            <goals>
                <goal>single</goal> <!-- run the single assembly goal -->
            </goals>
        </execution>
    </executions>
</plugin>

# plugin.json

Location: src/main/resources/plugin.json

{
    "name": "kafkawriter",
    "class": "com.alibaba.datax.plugin.writer.KafkaWriter",
    "description": "Kafka Writer",
    "developer": "Jast"
}

# package.xml

Location: src/main/assembly/package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
            </includes>
            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

# Classes

# com.alibaba.datax.plugin.writer.KafkaWriter
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


/**
 * Job and Task must never share variables: when running distributed there is no guarantee that
 * shared state is initialized correctly, so the two may only depend on each other through the
 * configuration. prepare and post exist on both Job and Task; the plugin decides where each
 * piece of work actually belongs.
 * @author mac
 */
public class KafkaWriter extends Writer {


    public static class Job extends Writer.Job {

        private static final Logger log = LoggerFactory.getLogger(Job.class);

        private Configuration conf = null;

        /**
         * init: initialization of the Job object. super.getPluginJobConf() returns the part of the job
         * configuration that belongs to this plugin: the reader section for reader plugins and the
         * writer section for writer plugins.
         */
        @Override
        public void init() {
            this.conf = super.getPluginJobConf(); // the writer's "parameter" block from the job config
            log.info("kafka writer params:{}", conf.toJSON());
            // validate the configured parameters
            this.validateParameter();
        }


        private void validateParameter() {
            // the topic parameter is required
            this.conf
                    .getNecessaryValue(
                            Key.TOPIC,
                            KafkaWriterErrorCode.REQUIRED_VALUE);


            this.conf
                    .getNecessaryValue(
                            Key.BOOTSTRAP_SERVERS,
                            KafkaWriterErrorCode.REQUIRED_VALUE);

        }

        /**
         * prepare: global preparation work, e.g. odpswriter truncating the target table.
         */
        @Override
        public void prepare() {

        }

        /**
         * split: splits the job into Tasks. The framework passes the suggested split count (normally the
         * configured concurrency) and expects a list of Task configurations in return.
         * @param mandatoryNumber
         *            to keep the Reader and Writer task counts equal, the Writer plugin must split into
         *            exactly as many tasks as the source side; otherwise the framework reports an error.
         * @return
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // build the same number of writer configurations as there are reader splits
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(conf);
            }
            return configurations;
        }


        /**
         * post: global post-processing, e.g. mysqlwriter renaming the shadow table after the sync.
         */
        @Override
        public void post() {
            log.info("job post");
        }

        /**
         * destroy: clean-up work of the Job object itself.
         */
        @Override
        public void destroy() {

        }

    }


    public static class Task extends Writer.Task {
        private static final Logger log = LoggerFactory.getLogger(Task.class);

        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;

        private String fieldDelimiter;

        private Configuration conf;

        /**
         * init: initialization of the Task object. super.getPluginJobConf() returns the configuration for
         * this Task, i.e. one of the entries of the list returned by the Job's split method.
         */
        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            // initialize the Kafka producer
            Properties props = new Properties();
            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
            // acks=all: the leader waits until all replicas have written the record, so data is not lost
            // as long as at least one replica survives. This is the strongest guarantee.
            props.put("acks", "all");
            props.put("retries", 0);
            // batch.size controls how many bytes the sender accumulates before publishing to Kafka.
            // A batch is sent once either batch.size or linger.ms is reached. Default: 16384 (16 KB).
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);

        }

        /**
         * prepare: Task-local preparation work.
         */
        @Override
        public void prepare() {
            super.prepare();
        }

        /**
         * startWrite: reads Records from the RecordReceiver and writes them to the target data source.
         * The data in the RecordReceiver comes from the channel between Reader and Writer.
         * @param lineReceiver
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {

            log.info("start to writer kafka");
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
                //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                        recordToString(record), recordToString(record)));

            }
        }

        /**
         * destroy: clean-up work of the Task object itself.
         */
        @Override
        public void destroy() {
            log.info("Waiting for message to be successfully sent");
            producer.flush();
            log.info("Message sent successfully");
            if (producer != null) {
                producer.close();
            }
        }


        private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }

            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);

            return sb.toString();
        }

    }


}

# com.alibaba.datax.plugin.writer.KafkaWriterErrorCode
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.spi.ErrorCode;

public enum KafkaWriterErrorCode implements ErrorCode {
    REQUIRED_VALUE("KafkaWriter-00", "Required parameter is not filled .")
    ;

    private final String code;
    private final String description;

    KafkaWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code,
                this.description);
    }
}

# com.alibaba.datax.plugin.writer.Key
package com.alibaba.datax.plugin.writer;

public class Key {
	public static final String FIELD_DELIMITER = "fieldDelimiter";
	public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
	public static final String TOPIC = "topic";
}

# Modify package.xml in the DataX project root

Add the following to its fileSets section:

<fileSet>
    <directory>kafkawriter/target/datax/</directory>
    <includes>
        <include>**/*.*</include>
    </includes>
    <outputDirectory>datax</outputDirectory>
</fileSet>
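
A quick sanity check that the new module is wired into the root build (the <modules> entry in the root pom.xml and the fileSet above) might be:

```bash
# Both files should mention the new module; run from the DataX project root.
grep -n "kafkawriter" pom.xml package.xml
```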

# Build

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

Or build directly from IDEA.

The built plugin ends up under DataX-datax_v202210/kafkawriter/target/datax/plugin.
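
A full DataX build compiles every plugin, which takes a while. As an alternative sketch, the standard Maven -pl/-am flags can limit the build to this module (run from the DataX source root):

```bash
# Build only the kafkawriter module plus the DataX modules it depends on.
mvn -pl kafkawriter -am clean package -Dmaven.test.skip=true

# The assembly output should then contain the plugin descriptor, the jar and its dependency libs:
ls kafkawriter/target/datax/plugin/writer/kafkawriter
# plugin.json  kafkawriter-0.0.1-SNAPSHOT.jar  libs/
```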

# Install DataX

# Download DataX

wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz

# Extract

tar -zxvf datax.tar.gz 

# Upload the custom KafkaWriter

Upload the plugin directory you built to the DataX installation directory (extract it there if you packaged it as an archive):

[hadoop@10 ~/datax]$ ll plugin/writer/
total 148
.....
drwxr-xr-x 3 hadoop hadoop 4096 Dec  9 15:22 kafkawriter
.....
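
If the plugin was built on the same machine, copying the assembly output into place can be as simple as the following sketch (the source-tree and install paths are illustrative, adjust them to your layout):

```bash
# Hypothetical paths: source tree in ~/DataX-datax_v202210, DataX installed in ~/datax
cp -r ~/DataX-datax_v202210/kafkawriter/target/datax/plugin/writer/kafkawriter ~/datax/plugin/writer/
```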

# Create the job configuration file text2kafka.json

What it does: reads a text file and sends each row to Kafka.

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/home/hadoop/data.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "kafkawriter",
                    "parameter": {
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}
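
Before running the job, create the input file referenced in the reader's path. A made-up sample (any delimiter-separated lines will do):

```bash
# Each line becomes one Kafka message; fields are joined with the configured fieldDelimiter.
cat > /home/hadoop/data.txt <<'EOF'
1,zhangsan,20
2,lisi,21
3,wangwu,22
EOF
```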

# Run

python bin/datax.py job/text2kafka.json 

Possible error:

2022-12-09 15:18:30.412 [main] WARN ConfigParser - 插件[txtfilereader,kafkawriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/home/hadoop/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.

Cause: when packaging on a Mac, a .DS_Store file is silently included; it has to be deleted, otherwise DataX fails while scanning the plugin directory.
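
A quick way to clean these Finder artifacts out of the plugin directory (assuming DataX lives in ~/datax as in the listing above):

```bash
# Remove any stray .DS_Store files under the plugin directory (macOS Finder artifacts).
find ~/datax/plugin -name '.DS_Store' -exec rm -f {} +
```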

# Execution result

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2022-12-09 15:23:56.947 [main] INFO  MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2022-12-09 15:23:56.949 [main] INFO  MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2022-12-09 15:23:56.959 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-12-09 15:23:56.963 [main] INFO  Engine - the machine info  => 

        osInfo: Tencent 1.8 25.282-b1
        jvmInfo:        Linux amd64 5.4.119-19-0007
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2022-12-09 15:23:56.976 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"txtfilereader",
                                "parameter":{
                                        "column":[],
                                        "encoding":"UTF-8",
                                        "fieldDelimiter":",",
                                        "path":[
                                                "/home/hadoop/data.txt"
                                        ]
                                }
                        },
                        "writer":{
                                "name":"kafkawriter",
                                "parameter":{
                                        "bootstrapServers":"10.16.0.2:9092",
                                        "fieldDelimiter":",",
                                        "topic":"behavior_test"
                                }
                        }
                }
        ],
        "setting":{
                "speed":{
                        "channel":2
                }
        }
}

2022-12-09 15:23:56.988 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-12-09 15:23:56.989 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-12-09 15:23:56.989 [main] INFO  JobContainer - DataX jobContainer starts job.
2022-12-09 15:23:56.991 [main] INFO  JobContainer - Set jobId = 0
2022-12-09 15:23:57.003 [job-0] INFO  KafkaWriter$Job - kafka writer params:{"bootstrapServers":"10.16.0.2:9092","fieldDelimiter":",","topic":"behavior_test"}
2022-12-09 15:23:57.004 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-12-09 15:23:57.004 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] do prepare work .
2022-12-09 15:23:57.005 [job-0] INFO  TxtFileReader$Job - add file [/home/hadoop/data.txt] as a candidate to be read.
2022-12-09 15:23:57.005 [job-0] INFO  TxtFileReader$Job - 您即将读取的文件数为: [1]
2022-12-09 15:23:57.005 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] do prepare work .
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - Job set Channel-Number to 2 channels.
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] splits to [1] tasks.
2022-12-09 15:23:57.006 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] splits to [1] tasks.
2022-12-09 15:23:57.021 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-12-09 15:23:57.024 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-12-09 15:23:57.025 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-12-09 15:23:57.030 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-12-09 15:23:57.033 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-12-09 15:23:57.034 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-12-09 15:23:57.042 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-12-09 15:23:57.042 [0-0-0-reader] INFO  TxtFileReader$Task - reading file : [/home/hadoop/data.txt]
2022-12-09 15:23:57.058 [0-0-0-writer] INFO  ProducerConfig - ProducerConfig values: 
        acks = all
        batch.size = 16384
        bootstrap.servers = [10.16.0.2:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 1
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-12-09 15:23:57.105 [0-0-0-reader] INFO  UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":",","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-12-09 15:23:57.158 [0-0-0-writer] INFO  AppInfoParser - Kafka version : 1.1.1
2022-12-09 15:23:57.158 [0-0-0-writer] INFO  AppInfoParser - Kafka commitId : 98b6346a977495f6
2022-12-09 15:23:57.159 [0-0-0-writer] INFO  KafkaWriter$Task - start to writer kafka
2022-12-09 15:23:57.227 [kafka-producer-network-thread | producer-1] INFO  Metadata - Cluster ID: 4SU0GyNWQpOfGrHCJfZwQw
2022-12-09 15:23:57.236 [0-0-0-writer] INFO  KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2022-12-09 15:23:57.243 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[202]ms
2022-12-09 15:23:57.243 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-12-09 15:24:07.040 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.040 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX Writer.Job [kafkawriter] do post work.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX Reader.Job [txtfilereader] do post work.
2022-12-09 15:24:07.040 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-12-09 15:24:07.041 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /home/hadoop/datax/hook
2022-12-09 15:24:07.042 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2022-12-09 15:24:07.042 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-12-09 15:24:07.043 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.043 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2022-12-09 15:23:56
任务结束时刻                    : 2022-12-09 15:24:07
任务总计耗时                    :                 10s
任务平均流量                    :                3B/s
记录写入速度                    :              1rec/s
读出记录总数                    :                  11
读写失败总数                    :                   0

# Check the messages in Kafka

No screenshot here, but the messages did arrive in the topic.
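
To verify from the command line, the stock Kafka console consumer can be pointed at the topic (adjust the path to wherever the Kafka CLI tools are installed):

```bash
# Read the topic from the beginning and print every message the job produced.
bin/kafka-console-consumer.sh \
  --bootstrap-server 10.16.0.2:9092 \
  --topic behavior_test \
  --from-beginning
```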

# Script examples

# Read txt and send to Kafka

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/home/hadoop/data.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "kafkawriter",
                    "parameter": {
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

# Read Hive and send to Kafka

Run command:

python bin/datax.py -p "-Dday=2020-03-10" job/hive2kafka.json

The hive2kafka.json file:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/usr/hive/warehouse/ods.db/ods_tt_clue_intent/dt=${day}",
                        "defaultFS": "hdfs://10.16.0.12:4010",
                        "column": [
                               {
                                "index": 0,
                                "type": "string"
                               },
                               {
                                 "index": 1,
                                 "type": "string"
                               },
                               {
                                 "index": 2,
                                 "type": "string"
                               },
                               {
                                 "index": 3,
                                 "type": "string"
                               },
                               {
                                 "index": 4,
                                 "type": "string"
                               },
                               {
                                 "index": 5,
                                 "type": "string"
                               },
                               {
                                 "index": 6,
                                 "type": "string"
                               },
                               {
                                 "index": 7,
                                 "type": "string"
                               },
                               {
                                 "index": 8,
                                 "type": "string"
                               },
                               {
                                 "index": 9,
                                 "type": "string"
                               },
                               {
                                 "index": 10,
                                 "type": "string"
                               },
                               {
                                 "index": 11,
                                 "type": "string"
                               },
                               {
                                 "index": 12,
                                 "type": "string"
                               },
                               {
                                 "index": 13,
                                 "type": "string"
                               },
                               {
                                 "index": 14,
                                 "type": "string"
                               },
                               {
                                 "index": 15,
                                 "type": "string"
                               },
                               {
                                 "index": 16,
                                 "type": "string"
                               },
                               {
                                 "index": 17,
                                 "type": "string"
                               },
                               {
                                 "index": 18,
                                 "type": "string"
                               },
                               {
                                 "index": 19,
                                 "type": "string"
                               },
                               {
                                 "index": 20,
                                 "type": "string"
                               },
                               {
                                 "index": 21,
                                 "type": "string"
                               },
                               {
                                 "index": 22,
                                 "type": "string"
                               },
                               {
                                 "index": 23,
                                 "type": "string"
                               },
                               {
                                 "index": 24,
                                 "type": "string"
                               },
                               {
                                 "index": 25,
                                 "type": "string"
                               },
                               {
                                 "index": 26,
                                 "type": "string"
                               },
                               {
                                 "index": 27,
                                 "type": "string"
                               },
                               {
                                 "type": "string",
                                 "value": "${day}"
                               }
                        ],
                        "fieldDelimiter":",",
                        "fileType": "orc",
                        "nullFormat":"\\N"
                    }
                },
                "writer": {
                    "name": "kafkawriter",
                    "parameter": {
                        "topic": "behavior_test",
                        "bootstrapServers": "10.16.0.2:9092",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}


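The ${day} placeholder in the HDFS path is filled in from the -p "-Dday=..." option on the command line. When the job is scheduled, that value is usually computed by a small wrapper, for example (GNU date assumed):

```bash
# Hypothetical wrapper: sync yesterday's Hive partition to Kafka.
day=$(date -d '1 day ago' +%F)
python bin/datax.py -p "-Dday=${day}" job/hive2kafka.json
```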

Reference: https://www.imooc.com/article/259830
