Hive sink connector
Write data to Hive.
To use this connector, you must ensure that your Spark/Flink cluster is already integrated with Hive. The tested Hive version is 2.3.9.
Tip: the Hive sink connector does not support the array, map, and struct data types yet.
By default, we use a two-phase commit (2PC) to ensure exactly-once semantics.
- schema projection
- file format
  - text
  - parquet
  - orc
| name                  | type    | required                                       | default value                                             |
|-----------------------|---------|------------------------------------------------|-----------------------------------------------------------|
| table_name            | string  | yes                                            | -                                                         |
| metastore_uri         | string  | yes                                            | -                                                         |
| partition_by          | array   | required if the hive sink table has partitions | -                                                         |
| sink_columns          | array   | no                                             | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no                                             | true                                                      |
| save_mode             | string  | no                                             | "append"                                                  |
| common-options        |         | no                                             | -                                                         |
table_name [string]

The target Hive table name, e.g. db1.table1.

metastore_uri [string]

The Hive metastore uri.

partition_by [array]

Partition the data based on the selected fields.

sink_columns [array]

Which columns need to be written to Hive; when this parameter is empty, all of the columns obtained from the Transform or Source are written. The order of the fields determines the order in which the file is actually written.

is_enable_transaction [boolean]

If is_enable_transaction is true, we will ensure that data is not lost or duplicated when it is written to the target directory. Only true is supported now.

save_mode [string]

Storage mode. We need to support overwrite and append; only append is supported now. Streaming jobs do not support overwrite.

common-options

Sink plugin common parameters, please refer to Sink Common Options for details.
```
Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}
```
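The example above sets only the required options. A sketch that also uses the optional parameters described earlier might look like the following; the table name, metastore URI, column names, and partition field are placeholders rather than values from a real cluster:

```
Hive {
  # target table and metastore (placeholder values)
  table_name = "db1.table1"
  metastore_uri = "thrift://metastore-host:9083"

  # columns to write, in the order they should appear in the written file;
  # the partition column is included so its value can be taken from each row
  sink_columns = ["name", "age", "dt"]

  # required because the target table is partitioned by dt
  partition_by = ["dt"]

  # keep the default two-phase-commit behaviour (only true is supported)
  is_enable_transaction = true

  # only "append" is supported at the moment
  save_mode = "append"
}
```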
We have a source table like this:
```sql
create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY<INT>,
  test_map MAP<STRING, FLOAT>,
  test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
```
We need to read data from the source table and write it to another table:
```sql
create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
```
The job config file can look like this:
```
env {
  # You can set flink configuration here
  execution.parallelism = 3
  job.name = "test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

transform {
}

sink {
  # write the rows read from the source table to the Hive sink table
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    partition_by = ["test_par1", "test_par2"]
    sink_columns = ["test_tinyint", "test_smallint", "test_int", "test_bigint", "test_boolean", "test_float", "test_double", "test_string", "test_binary", "test_timestamp", "test_decimal", "test_char", "test_varchar", "test_date", "test_par1", "test_par2"]
  }
}
```
Changelog

- Add Hive Sink Connector
- [Improve] Hive Sink supports automatic partition repair (3133)