Flink代码读写Iceberg
# 创建基础信息
-- Register an Iceberg catalog named `hadoop_catalog` in the Flink SQL session.
-- A Hadoop catalog keeps table metadata directly on the filesystem (HDFS here),
-- with no Hive Metastore involved.
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',          -- connector type: the Iceberg Flink connector
'catalog-type'='hadoop',   -- metadata stored on the filesystem, not in a metastore
'warehouse'='hdfs:///user/hive/warehouse/iceberg_hadoop_catalog',  -- root HDFS path for all databases/tables of this catalog
'property-version'='1'     -- version of the property schema, for forward compatibility
);
创建数据库
-- Create the target database inside the Hadoop catalog.
-- Keywords uppercased for consistency with the other statements on this page.
CREATE DATABASE hadoop_catalog.iceberg_db;
创建表
-- Create the source table the later read examples select from.
CREATE TABLE `hadoop_catalog`.`iceberg_db`.`flink_read_1` (
id BIGINT COMMENT 'unique id',
data STRING  -- payload column; no Iceberg column comment was set in the original
);
写入数据
-- Seed the source table with test rows.
-- Explicit column lists make the statements robust to future schema changes.
-- NOTE: the rows are deliberately inserted one statement at a time — in Iceberg
-- each INSERT commits a separate snapshot, and the incremental/streaming read
-- example further down depends on having multiple snapshots to consume.
INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` (id, data) VALUES (1, 'a');
INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` (id, data) VALUES (2, 'a');
INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` (id, data) VALUES (3, 'a');
INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` (id, data) VALUES (4, 'a');
# Flink追加写入数据
创建写入表
-- Create the write-side table with the same schema as the source table.
-- Keywords uppercased for consistency with the other statements on this page.
CREATE TABLE `hadoop_catalog`.`iceberg_db`.`flink_write_1` LIKE `hadoop_catalog`.`iceberg_db`.`flink_read_1`;
# Flink覆盖写入数据
# 异常
# 异常一
Caused by: java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (265524090384035565, 3858118682089226033]
at org.apache.iceberg.IncrementalDataTableScan.snapshotsWithin(IncrementalDataTableScan.java:121)
at org.apache.iceberg.IncrementalDataTableScan.planFiles(IncrementalDataTableScan.java:73)
at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:204)
at org.apache.iceberg.DataTableScan.planTasks(DataTableScan.java:30)
at org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:109)
at org.apache.iceberg.flink.source.FlinkSplitPlanner.planInputSplits(FlinkSplitPlanner.java:41)
at org.apache.iceberg.flink.source.StreamingMonitorFunction.monitorAndForwardSplits(StreamingMonitorFunction.java:143)
at org.apache.iceberg.flink.source.StreamingMonitorFunction.run(StreamingMonitorFunction.java:121)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
报错原因
Flink还不能以 Streaming 的方式读取 Iceberg 的增量update/delete数据。
上次更新: 2023/03/10, 20:58:04