Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: insert error improve #3725

Merged
merged 8 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ USE db1;
-- SUCCEED: Database changed
```

创建两张表:
创建一张自定义索引的表:

```sql
CREATE TABLE t1 (col0 STRING, col1 int, std_time TIMESTAMP, INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=30d));
Expand All @@ -64,6 +64,37 @@ desc t1;

```

有离线数据的表:

```sql
--- ------- ----------- ------ ---------
# Field Type Null Default
--- ------- ----------- ------ ---------
1 c1 Varchar YES
2 c2 Int YES
3 c3 BigInt YES
4 c4 Float YES
5 c5 Double YES
6 c6 Timestamp YES
7 c7 Date YES
--- ------- ----------- ------ ---------
--- -------------------- ------ ---- ------ ---------------
# name keys ts ttl ttl_type
--- -------------------- ------ ---- ------ ---------------
1 INDEX_0_1705743486 c1 - 0min kAbsoluteTime
--- -------------------- ------ ---- ------ ---------------
---------------------------------------------------------- ------------------------------------------ --------- ---------
Data path Symbolic paths Format Options
---------------------------------------------------------- ------------------------------------------ --------- ---------
file:///tmp/openmldb_offline_storage/demo_db/demo_table1 file:///work/taxi-trip/data/data.parquet parquet
---------------------------------------------------------- ------------------------------------------ --------- ---------

--------------- --------------
compress_type storage_mode
--------------- --------------
NoCompress Memory
--------------- --------------
```


## 相关语句
Expand Down
89 changes: 68 additions & 21 deletions docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# LOAD DATA INFILE
`LOAD DATA INFILE`语句能高效地将文件中的数据读取到数据库中的表中。`LOAD DATA INFILE` 与 `SELECT INTO OUTFILE`互补。要将数据从 table导出到文件,请使用[SELECT INTO OUTFILE](../dql/SELECT_INTO_STATEMENT.md)。要将文件数据导入到 table 中,请使用`LOAD DATA INFILE`。
`LOAD DATA INFILE`语句能高效地将文件中的数据读取到数据库中的表中。`LOAD DATA INFILE` 与 `SELECT INTO OUTFILE`互补。要将数据从 table导出到文件,请使用[SELECT INTO OUTFILE](../dql/SELECT_INTO_STATEMENT.md)。要将文件数据导入到 table 中,请使用`LOAD DATA INFILE`。注意,导入的文件schema顺序应与表的schema顺序一致。

```{note}
无论何种load_mode,INFILE 的 filePath既可以是单个文件名,也可以是目录,也可以使用`*`通配符。
- load_mode=cluster的具体格式等价于`DataFrameReader.read.load(String)`,可以使用spark shell来read你想要的文件路径,确认能否读入成功。如果目录中存在多格式的文件,只会选择 LoadDataInfileOptionsList 中指定的FORMAT格式文件。
- load_mode=local则使用glob选择出符合的所有文件,不会检查单个文件的格式,所以,请保证满足条件的是csv格式,建议使用`*.csv`限制文件格式。
- load_mode=local则使用glob选择出符合的所有文件,不会检查单个文件的格式,所以,请保证满足条件的文件都是csv格式,建议使用`*.csv`限制文件格式。
```

## Syntax
Expand Down Expand Up @@ -55,7 +55,7 @@ FilePathPattern
| quote | String | " | 输入数据的包围字符串。字符串长度<=1。<br />load_mode=`cluster`默认为双引号`"`。配置包围字符后,被包围字符包围的内容将作为一个整体解析。例如,当配置包围字符串为"#"时, `1, 1.0, #This is a string field, even there is a comma#, normal_string`将为解析为三个filed,第一个是整数1,第二个是浮点1.0,第三个是一个字符串,第四个虽然没有quote,但也是一个字符串。<br /> **local_mode=`local`默认为`\0`,也可使用空字符串赋值,不处理包围字符。** |
| mode | String | "error_if_exists" | 导入模式:<br />`error_if_exists`: 仅离线模式可用,若离线表已有数据则报错。<br />`overwrite`: 仅离线模式可用,数据将覆盖离线表数据。<br />`append`:离线在线均可用,若文件已存在,数据将追加到原文件后面。<br /> **local_mode=`local`默认为`append`** |
| deep_copy | Boolean | true | `deep_copy=false`仅支持离线load, 可以指定`INFILE` Path为该表的离线存储地址,从而不需要硬拷贝。 |
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 |
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 <br />local模式的使用限制见[local导入模式说明](#local导入模式说明) |
| thread | Integer | 1 | 仅在本地文件导入时生效,即`load_mode='local'`或者单机版,表示本地插入数据的线程数。 最大值为`50`。 |
| writer_type | String | single | 集群版在线导入中插入数据的writer类型。可选值为`single`和`batch`,默认为`single`。`single`表示数据即读即写,节省内存。`batch`则是将整个rdd分区读完,确认数据类型有效性后,再写入集群,需要更多内存。在部分情况下,`batch`模式有利于筛选未写入的数据,方便重试这部分数据。 |
| put_if_absent | Boolean | false | 在源数据无重复行也不与表中已有数据重复时,可以使用此选项避免插入重复数据,特别是job失败后可以重试。等价于使用`INSERT OR IGNORE`。更多详情见下文。 |
Expand Down Expand Up @@ -121,7 +121,36 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1;

## 离线导入规则

表的离线信息可通过`desc <table>`查看。我们将数据地址分为两类,离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址是软链接导入的地址列表。
表的离线信息可通过`desc <table>`查看。我们将数据地址分为两类,Data path与Symbolic path,离线地址Data path是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址Symbolic path,则是软链接导入的地址列表,可以是多个。
```
--- ------- ----------- ------ ---------
# Field Type Null Default
--- ------- ----------- ------ ---------
1 c1 Varchar YES
2 c2 Int YES
3 c3 BigInt YES
4 c4 Float YES
5 c5 Double YES
6 c6 Timestamp YES
7 c7 Date YES
--- ------- ----------- ------ ---------
--- -------------------- ------ ---- ------ ---------------
# name keys ts ttl ttl_type
--- -------------------- ------ ---- ------ ---------------
1 INDEX_0_1705743486 c1 - 0min kAbsoluteTime
--- -------------------- ------ ---- ------ ---------------
---------------------------------------------------------- ------------------------------------------ --------- ---------
Data path Symbolic paths Format Options
---------------------------------------------------------- ------------------------------------------ --------- ---------
file:///tmp/openmldb_offline_storage/demo_db/demo_table1 file:///work/taxi-trip/data/data.parquet parquet
---------------------------------------------------------- ------------------------------------------ --------- ---------

--------------- --------------
compress_type storage_mode
--------------- --------------
NoCompress Memory
--------------- --------------
```
根据模式的不同,对离线信息的修改也不同。
- overwrite模式,将会覆盖原有的所有字段,包括离线地址、软链接地址、格式、读取选项,仅保留当前overwrite进入的信息。
- overwrite 硬拷贝,离线地址如果存在数据将被覆盖,软链接全部清空,格式更改为内部默认格式parquet,读取选项全部清空。
Expand All @@ -144,28 +173,46 @@ curl http://<ns_endpoint>/NameServer/UpdateOfflineTableInfo -d '{"db":"<db_name>
## CSV源数据格式说明

导入支持csv和parquet两种数据格式,csv的格式需要特别注意,下面举例说明。

```
c1, c2
,
"",""
ab,cd
"ef","gh"
null,null
```
这个csv源数据中,第一行两个空值(blank value)。
- cluster模式空值会被当作`null`(无论null_value是什么)。
- local模式空值会被当作空字符串,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。

第二行两列都是两个双引号。
- cluster模式默认quote为`"`,所以这一行是两个空字符串。
- local模式默认quote为`\0`,所以这一行两列都是两个双引号。local模式quote可以配置为`"`,但escape规则是`""`为单个`"`,和Spark不一致,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。
1. csv的列分隔符默认为`,`,不允许出现空格,否则,"a, b"将被解析为两列,第一列为`a`,第二列为` b`(有一个空格)。
1. local模式会trim掉列分隔符两边的空格,所以`a, b`会被解析为两列,第一列为`a`,第二列为`b`。但从规范上来说,csv的列分隔符左右不应该有空格,请不要依赖这个特性。
2. cluster和local模式对于空值的处理不同,具体为:
```
c1, c2
,
"",""
ab,cd
"ef","gh"
null,null
```
这个csv源数据中,第一行两个空值(blank value)。
- cluster模式空值会被当作`null`(无论null_value是什么)。
- local模式空值会被当作空字符串,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。

第二行两列都是两个双引号。
- cluster模式默认quote为`"`,所以这一行是两个空字符串。
- local模式默认quote为`\0`,所以这一行两列都是两个双引号。local模式quote可以配置为`"`,但escape规则是`""`为单个`"`,和Spark不一致,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。

3. cluster的csv格式支持两种格式的timestamp,但同一次load只会选择一种格式,不会混合使用。如果csv中存在两种格式的timestamp,会导致解析失败。选择哪种格式由第一行数据决定,如果第一行数据是`2020-01-01 00:00:00`,则后续所有timestamp都会按照`yyyy-MM-dd HH:mm:ss`格式解析;如果第一行数据是整型`1577808000000`,则后续所有timestamp都会按照整型格式解析。
1. timestamp可以为字符串格式,比如`"2020-01-01 00:00:00"`。
2. date可以是年月日(`yyyy-MM-dd`)或者年月日时分秒(`yyyy-MM-dd HH:mm:ss`)。
4. local的csv格式只支持整型timestamp,date类型为年月日,例如`2022-2-2`。
1. timestamp和date均不可以为字符串格式,比如`"2020-01-01"`将解析失败。
2. date不可以是年月日时分秒,例如`2022-2-2 00:00:00`将解析失败。
5. local的字符串不支持quote转义,所以如果你的字符串中存在quote字符,请使用cluster模式。
6. cluster如果读取csv时解析失败,将会把失败的列值设为NULL,继续导入流程,但local模式会直接报错,不会继续导入。

## PutIfAbsent说明

PutIfAbsent是一个特殊的选项,它可以避免插入重复数据,仅需一个配置,操作简单,特别适合load datajob失败后重试,等价于使用`INSERT OR IGNORE`。如果你想要导入的数据中存在重复,那么通过PutIfAbsent导入,会导致部分数据丢失。如果你需要保留重复数据,不应使用此选项,建议通过其他方式去重后再导入。
PutIfAbsent是一个特殊的选项,它可以避免插入重复数据,仅需一个配置,操作简单,特别适合load datajob失败后重试,等价于使用`INSERT OR IGNORE`。如果你想要导入的数据中存在重复,那么通过PutIfAbsent导入,会导致部分数据丢失。如果你需要保留重复数据,不应使用此选项,建议通过其他方式去重后再导入。local模式暂不支持此选项。

PutIfAbsent需要去重这一额外开销,所以,它的性能与去重的复杂度有关:

- 表中只存在ts索引,且同一key+ts的数据量少于10k时(为了精确去重,在同一个key+ts下会逐行对比整行数据),PutIfAbsent的性能表现不会很差,通常导入时间在普通导入时间的2倍以内。
- 表中如果存在time索引(ts列为空),或者ts索引同一key+ts的数据量大于100k时,PutIfAbsent的性能会很差,导入时间可能超过普通导入时间的10倍,无法正常使用。这样的数据条件下,更建议进行去重后再导入。

## local导入模式说明

load_mode可使用local模式,但与cluster模式有一些不同,如果你部署了TaskManager,我们建议使用cluster模式。不同之处如下:

- local模式仅支持在线,不支持离线。也只支持csv格式,不支持parquet格式。
- csv的读取支持有限,(SplitLineWithDelimiterForStrings)
16 changes: 8 additions & 8 deletions docs/zh/quickstart/sdk/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
```

- 目前仅支持插入一条数据。
- 数据需严格按照 schema 排列。
- 数据需严格按照表 schema 排列。

请求数据样例:

Expand Down Expand Up @@ -152,13 +152,13 @@ curl http://127.0.0.1:8080/dbs/demo_db/deployments/demo_data_service -X POST -d'

请求参数:

| **参数** | **类型** | **必需** | **说明** |
| -------- | -------- | -------- | ------------------------------------------------------------ |
| mode | String | 是 | 可配 `offsync` , `offasync`, `online` |
| sql | String | 是 | |
| input | Object | 否 | |
| schema | Array | 否 | 可支持数据类型(大小写不敏感):`Bool`, `Int16`, `Int32`, `Int64`, `Float`, `Double`, `String`, `Date` and `Timestamp`. |
| data | Array | 否 | |
| **参数** | **类型** | **必需** | **说明** |
| -------- | -------- | -------- | ----------------------------------------------------------------------------------------------------------------------- |
| mode | String | 是 | 可配 `offsync` , `offasync`, `online` |
| sql | String | 是 | |
| input | Object | 否 | |
| schema | Array | 否 | 可支持数据类型(大小写不敏感):`Bool`, `Int16`, `Int32`, `Int64`, `Float`, `Double`, `String`, `Date` and `Timestamp`. |
| data | Array | 否 | schema和data字段必须同时存在 |

**请求数据样例**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class RequestEngine implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(SqlEngine.class);
private static final Logger logger = LoggerFactory.getLogger(RequestEngine.class);

private SimpleCatalog catalog;
private EngineOptions options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com._4paradigm.openmldb.proto.Type.DataType;
import com._4paradigm.openmldb.proto.Common.ColumnDesc;
import org.joda.time.DateTime;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ public void write(InternalRow record) throws IOException {
ResultSetMetaData metaData = preparedStatement.getMetaData();
Preconditions.checkState(record.numFields() == metaData.getColumnCount());
OpenmldbDataWriter.addRow(record, preparedStatement);
// check return for put result

// you can cache failed rows and throw exception when commit/close,
// but it still may interrupt other writers(pending or slow writers)

// check return for put result
if(!preparedStatement.execute()) {
throw new IOException("execute failed");
}
} catch (Exception e) {
throw new IOException("write row to openmldb failed on " + record, e);
throw new IOException("write row to openmldb failed on " + OpenmldbDataWriter.readable(record, preparedStatement), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com._4paradigm.openmldb.sdk.SdkOption;
import com._4paradigm.openmldb.sdk.SqlException;
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
import com._4paradigm.openmldb.spark.OpenmldbTable;
import com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void write(InternalRow record) throws IOException {
addRow(record, preparedStatement);
preparedStatement.addBatch();
} catch (Exception e) {
throw new IOException("convert to openmldb row failed on " + record, e);
throw new IOException("convert to openmldb row failed on " + readable(record, preparedStatement), e);
}
}

Expand Down Expand Up @@ -126,6 +127,19 @@ static void addRow(InternalRow record, PreparedStatement preparedStatement) thro
}
}

static String readable(InternalRow record, PreparedStatement preparedStatement) {
try {
ResultSetMetaData metaData = preparedStatement.getMetaData();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < record.numFields(); i++) {
sb.append(record.get(i, OpenmldbTable.sdkTypeToSparkType(metaData.getColumnType(i + 1)))).append(",");
}
return sb.toString();
} catch (SQLException e) {
return "readable error: " + e.getMessage();
}
}

@Override
public WriterCommitMessage commit() throws IOException {
try {
Expand Down
Loading
Loading