feat: put if absent & insert or ignore (#3692)
- insert or ignore in c++/java
- put if absent for load data
- abort immediately in spark data writer
- appendentries fix: delete or put
- doc
- refactor spark connector config setup
vagetablechicken authored Jan 20, 2024
1 parent dfd860e commit ff7e8ac
Showing 60 changed files with 659 additions and 456 deletions.
4 changes: 3 additions & 1 deletion demo/usability_testing/data_mocker.py
@@ -6,6 +6,7 @@
 from typing import Optional
 import numpy as np
 import pandas as pd
+import dateutil


 # to support save csv, and faster parquet, we don't use faker-cli directly
@@ -146,8 +147,9 @@ def type_converter(sql_type):
     if sql_type in ['varchar', 'string']:
         # TODO(hw): set max length
         return 'pystr', {}
+    # timestamp should be > 0 because tablet insert will check it, use utc
     if sql_type in ['date', 'timestamp']:
-        return 'iso8601', {}
+        return 'iso8601', {"tzinfo": dateutil.tz.UTC}
     if sql_type in ['float', 'double']:
         return 'pyfloat', ranges[sql_type]
     return 'py' + sql_type, {}
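The tzinfo change above matters because tablet inserts reject non-positive timestamps, and a UTC-aware ISO-8601 string parses to an unambiguous epoch value. A minimal sketch of that check, using only the standard library (this mirrors the idea, not the actual faker provider or tablet code):

```python
from datetime import datetime, timezone

def to_millis(iso: str) -> int:
    """Parse an ISO-8601 string and return epoch milliseconds."""
    dt = datetime.fromisoformat(iso)
    if dt.tzinfo is None:
        # naive strings are assumed UTC, mirroring the tzinfo=UTC mock setting
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def valid_for_tablet(iso: str) -> bool:
    # tablet insert checks that the timestamp is > 0
    return to_millis(iso) > 0
```

With a tz-aware mock, every generated timestamp resolves to the same epoch value on every machine, so the `> 0` check behaves deterministically.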
4 changes: 2 additions & 2 deletions docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md
@@ -233,8 +233,8 @@ IndexOption ::=
| ----------- | ------------------------------------------------------------ | ---------------------------------------------------- | ------------------------------------------------------------ |
| `ABSOLUTE` | The TTL value is the expiration time, configured as a period such as `100m, 12h, 1d, 365d`. The maximum configurable expiration time is `15768000m` (30 years) | Records are evicted once they expire. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=100m)`<br />OpenMLDB deletes data older than 100 minutes. |
| `LATEST` | The TTL value is the maximum number of records kept, i.e. the maximum number of records allowed under the same index, at most 1000 | Records beyond the maximum count are evicted. | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=LATEST, TTL=10)`. OpenMLDB keeps only the latest 10 records and deletes older ones. |
-| `ABSORLAT` | Configures both an expiration time and a maximum record count. The value is a 2-tuple such as `(100m, 10), (1d, 1)`, at most `(15768000m, 1000)` | A record is evicted if and only if it has expired **or** the maximum count is exceeded. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absorlat)`. Records are evicted when there are more than 100 of them **or** when they expire. |
-| `ABSANDLAT` | Configures both an expiration time and a maximum record count. The value is a 2-tuple such as `(100m, 10), (1d, 1)`, at most `(15768000m, 1000)` | A record is evicted when it has expired **and** the maximum count is exceeded. | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absandlat)`. Records are evicted when there are more than 100 of them **and** they have expired. |
+| `ABSORLAT` | Configures both an expiration time and a maximum record count. The value is a 2-tuple such as `(100m, 10), (1d, 1)`, at most `(15768000m, 1000)` | A record is evicted if and only if it has expired **or** the maximum count is exceeded. | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absorlat)`. Records are evicted when there are more than 100 of them **or** when they expire. |
+| `ABSANDLAT` | Configures both an expiration time and a maximum record count. The value is a 2-tuple such as `(100m, 10), (1d, 1)`, at most `(15768000m, 1000)` | A record is evicted when it has expired **and** the maximum count is exceeded. | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absandlat)`. Records are evicted when there are more than 100 of them **and** they have expired. |
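The difference between the two combined TTL types comes down to two eviction predicates. A toy sketch under a `(100m, 10)` configuration (an illustration of the semantics, not OpenMLDB's implementation; `rank` is a record's 1-based position under its key, newest first):

```python
def evict_absorlat(age_minutes, rank, ttl_min=100, ttl_cnt=10):
    """ABSORLAT: evict when the record is expired OR beyond the max count."""
    return age_minutes > ttl_min or rank > ttl_cnt

def evict_absandlat(age_minutes, rank, ttl_min=100, ttl_cnt=10):
    """ABSANDLAT: evict only when the record is expired AND beyond the max count."""
    return age_minutes > ttl_min and rank > ttl_cnt
```

So `ABSANDLAT` keeps strictly more data than `ABSORLAT` under the same tuple: an expired record survives as long as it is still within the count limit, and vice versa.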

```{note}
The limits on the maximum expiration time and the maximum record count exist for performance reasons. If you really need a larger TTL value, use UpdateTTL to raise it (the max limits do not apply there), or adjust the nameserver configurations `absolute_ttl_max` and `latest_ttl_max` and restart for them to take effect.
3 changes: 2 additions & 1 deletion docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md
@@ -5,7 +5,7 @@ OpenMLDB supports inserting one row or multiple rows at a time.
## syntax

```
-INSERT INFO tbl_name (column_list) VALUES (value_list) [, value_list ...]
+INSERT [[OR] IGNORE] INTO tbl_name (column_list) VALUES (value_list) [, value_list ...]
column_list:
col_name [, col_name] ...
@@ -16,6 +16,7 @@ value_list:

**Notes**
- `INSERT` can only be used in online mode
- By default `INSERT` does not deduplicate; `INSERT OR IGNORE` ignores data that already exists in the table, so the statement can safely be retried.
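SQLite happens to support an identically named `INSERT OR IGNORE` clause with similar duplicate-skipping semantics, which makes for a quick, runnable illustration (this is SQLite, not OpenMLDB; here a primary key stands in for the rows OpenMLDB would consider already present):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)")
conn.execute("INSERT INTO t1 VALUES (1, 'a')")

# a plain INSERT of a duplicate key would fail; OR IGNORE silently skips it,
# so the whole statement can simply be re-run after a partial failure
conn.execute("INSERT OR IGNORE INTO t1 VALUES (1, 'dup'), (2, 'b')")

rows = conn.execute("SELECT c1, c2 FROM t1 ORDER BY c1").fetchall()
# rows == [(1, 'a'), (2, 'b')]: the duplicate was skipped, the new row landed
```

This is what makes the statement idempotent for retries: re-running it changes nothing once every row is present.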

## Examples

11 changes: 11 additions & 0 deletions docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
@@ -58,6 +58,7 @@ FilePathPattern
| load_mode | String | cluster | `load_mode='local'` only supports importing local csv files into online storage; it inserts data synchronously through the local client.<br /> `load_mode='cluster'` only supports the cluster version; it inserts data through Spark and supports synchronous or asynchronous mode |
| thread | Integer | 1 | Only effective for local file import, i.e. `load_mode='local'` or the standalone version; the number of threads for local inserts. The maximum value is `50` |
| writer_type | String | single | The writer type for inserting data in cluster online import. Options are `single` and `batch`, default `single`. `single` writes each row as it is read, saving memory. `batch` reads the whole rdd partition and validates the data types before writing to the cluster, which needs more memory. In some cases `batch` mode helps identify the rows that were not written, making it easier to retry just that data. |
| put_if_absent | Boolean | false | When the source data has no duplicate rows and does not overlap with data already in the table, this option avoids inserting duplicates, which is especially useful for retrying after a failed job. Equivalent to `INSERT OR IGNORE`. See below for details. |
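The `single` vs `batch` trade-off above amounts to write-as-you-read versus materialize-then-write. A rough sketch (the `write_row` callback is hypothetical, not the connector's API):

```python
def write_single(rows, write_row):
    """single: write each row as it is read; nothing is buffered."""
    n = 0
    for row in rows:
        write_row(row)
        n += 1
    return n

def write_batch(rows, write_row):
    """batch: materialize the whole partition first (more memory), then write.
    On failure, the unwritten remainder is a known set and can be retried."""
    buffered = list(rows)  # entire partition held in memory
    for row in buffered:
        write_row(row)
    return len(buffered)
```

Both end up writing the same rows; `batch` trades memory for an explicit view of what has and has not been written.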

```{note}
In the cluster version, the `LOAD DATA INFILE` statement imports data into online or offline storage according to the current execution mode (execute_mode). The standalone version has no such storage distinction and only imports into online storage; it also does not support the `deep_copy` option.
@@ -73,6 +74,7 @@ FilePathPattern
Therefore, use absolute paths whenever possible. In standalone testing, local files start with `file://`; in production, a file system such as hdfs is recommended.
```

## SQL statement template

```sql
@@ -158,3 +160,12 @@ null,null
Both columns of the second row are two double quotes.
- In cluster mode the default quote is `"`, so this row is two empty strings.
- In local mode the default quote is `\0`, so both columns of this row are two double quotes. In local mode quote can be configured as `"`, but the escape rule treats `""` as a single `"`, which is inconsistent with Spark; see [issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015) for details.

## PutIfAbsent notes

PutIfAbsent is a special option that avoids inserting duplicate data. It needs only a single configuration and is simple to operate, making it especially suitable for retrying a failed load data job; it is equivalent to `INSERT OR IGNORE`. If the data you want to import itself contains duplicates, importing with PutIfAbsent will lose some of it. If you need to keep duplicate data, do not use this option; deduplicate by other means before importing instead.

PutIfAbsent carries the extra cost of deduplication, so its performance depends on how complex the deduplication is:

- When the table has only ts indexes and fewer than 10k rows share the same key+ts (for exact deduplication, whole rows are compared one by one under the same key+ts), PutIfAbsent performs reasonably well; the import time is usually within 2x a normal import.
- If the table has a time index (the ts column is empty), or more than 100k rows share the same key+ts of a ts index, PutIfAbsent performance is poor and the import may take more than 10x a normal import, which is unusable. With data like this, deduplicating before importing is the better option.
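The key+ts comparison described above can be sketched as follows (an illustration of the cost model, not the tablet's code): before a put is accepted, every existing row under the same key+ts must be compared whole-row, so cost grows with how many rows share a key+ts.

```python
def put_if_absent(table, key, ts, row):
    """table: dict mapping (key, ts) -> list of full rows.
    Returns True if the row was inserted, False if an identical row existed."""
    bucket = table.setdefault((key, ts), [])
    # exact dedup: compare the whole row against every row under this key+ts;
    # a huge bucket (e.g. a time index, where ts collapses) makes this slow
    if any(existing == row for existing in bucket):
        return False
    bucket.append(row)
    return True
```

With a proper ts index the buckets stay small and the scan is cheap; with a time index most rows fall into few buckets and every put rescans them.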
10 changes: 10 additions & 0 deletions docs/zh/quickstart/beginner_must_read.md
@@ -69,6 +69,16 @@ OpenMLDB separates online and offline storage and computation, so you need to be clear about where the data you import

For how to design your data inflow and outflow, see [Common architecture integrations of OpenMLDB in real-time decision systems](../tutorial/app_arch.md)

### Online tables

Online table data lives in memory, and disk is also used for backup and recovery. You can check an online table's row count with `select count(*) from t1`, or inspect the table with `show table status` (which may lag a little; wait a moment and query again).

An online table can have multiple indexes, which `desc <table>` shows. When a row is written, one copy goes into every index; what differs is how each index partitions and sorts it. But since indexes also have TTL eviction, the data volumes of the indexes may diverge. The results of `select count(*) from t1` and `show table status` reflect the first index only and do not represent the other indexes. Which index a SQL query uses is the optimal one chosen by the SQL engine, and can be checked in the SQL physical plan.

When creating a table you may specify indexes or leave them out; if none are specified, a default index is created. The default index has no ts column (it sorts by the current time, which we call a time index) and never evicts data, so it can serve as the baseline for checking whether the data volume is accurate. However, such an index consumes a lot of memory, and the first index currently cannot be deleted (support is planned); you can modify its TTL through the NS Client to evict data and reduce its memory footprint.

A time index (an index without ts) also affects PutIfAbsent imports. If your data import may fail midway, you have no other way to delete or deduplicate, and you want to retry the import with PutIfAbsent, refer to the [PutIfAbsent notes](../openmldb_sql/dml/LOAD_DATA_STATEMENT.md#putifabsent说明) to evaluate your data and avoid PutIfAbsent being too inefficient.
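The point that `select count(*)` reflects only the first index can be sketched with a toy model (assuming per-index `latest`-style TTLs; not OpenMLDB's storage code):

```python
def index_counts(rows, indexes):
    """rows: list of records; indexes: list of dicts with
    'key' (a function extracting the index key from a record) and
    'latest' (max rows kept per key). Returns each index's row count."""
    counts = []
    for idx in indexes:
        per_key = {}
        for row in rows:
            per_key.setdefault(idx["key"](row), []).append(row)
        # TTL eviction: each key keeps at most 'latest' rows
        kept = sum(min(len(v), idx["latest"]) for v in per_key.values())
        counts.append(kept)
    return counts

rows = [("a", i) for i in range(5)] + [("b", i) for i in range(3)]
counts = index_counts(rows, [
    {"key": lambda r: r[0], "latest": 2},    # first index: tight TTL
    {"key": lambda r: r[0], "latest": 100},  # second index: effectively unlimited
])
# counts[0] is what count(*) reports; counts[1] shows the second index holds more
```

Every row was written to both indexes, yet the per-index TTLs leave them with different volumes, which is why the first index's count is not a statement about the others.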

## Source data

### LOAD DATA
2 changes: 1 addition & 1 deletion hybridse/include/node/node_manager.h
@@ -166,7 +166,7 @@ class NodeManager {
     SqlNode *MakeInsertTableNode(const std::string &db_name,
                                  const std::string &table_name,
                                  const ExprListNode *column_names,
-                                 const ExprListNode *values);
+                                 const ExprListNode *values, InsertStmt::InsertMode insert_mode);
     CreateStmt *MakeCreateTableNode(bool op_if_not_exist,
                                     const std::string &db_name,
                                     const std::string &table_name,
25 changes: 21 additions & 4 deletions hybridse/include/node/sql_node.h
@@ -1865,26 +1865,43 @@ class ColumnDefNode : public SqlNode {

 class InsertStmt : public SqlNode {
  public:
+    // ref zetasql ASTInsertStatement
+    enum InsertMode {
+        DEFAULT_MODE,  // plain INSERT
+        REPLACE,       // INSERT OR REPLACE
+        UPDATE,        // INSERT OR UPDATE
+        IGNORE         // INSERT OR IGNORE
+    };
+
     InsertStmt(const std::string &db_name,
                const std::string &table_name,
                const std::vector<std::string> &columns,
-               const std::vector<ExprNode *> &values)
+               const std::vector<ExprNode *> &values,
+               InsertMode insert_mode)
         : SqlNode(kInsertStmt, 0, 0),
           db_name_(db_name),
           table_name_(table_name),
           columns_(columns),
           values_(values),
-          is_all_(columns.empty()) {}
+          is_all_(columns.empty()),
+          insert_mode_(insert_mode) {}

-    InsertStmt(const std::string &db_name, const std::string &table_name, const std::vector<ExprNode *> &values)
-        : SqlNode(kInsertStmt, 0, 0), db_name_(db_name), table_name_(table_name), values_(values), is_all_(true) {}
+    InsertStmt(const std::string &db_name, const std::string &table_name, const std::vector<ExprNode *> &values,
+               InsertMode insert_mode)
+        : SqlNode(kInsertStmt, 0, 0),
+          db_name_(db_name),
+          table_name_(table_name),
+          values_(values),
+          is_all_(true),
+          insert_mode_(insert_mode) {}
     void Print(std::ostream &output, const std::string &org_tab) const;

     const std::string db_name_;
     const std::string table_name_;
     const std::vector<std::string> columns_;
     const std::vector<ExprNode *> values_;
     const bool is_all_;
+    const InsertMode insert_mode_;
 };

class StorageModeNode : public SqlNode {
7 changes: 4 additions & 3 deletions hybridse/src/node/node_manager.cc
@@ -792,9 +792,10 @@ AllNode *NodeManager::MakeAllNode(const std::string &relation_name, const std::s
}

 SqlNode *NodeManager::MakeInsertTableNode(const std::string &db_name, const std::string &table_name,
-                                          const ExprListNode *columns_expr, const ExprListNode *values) {
+                                          const ExprListNode *columns_expr, const ExprListNode *values,
+                                          InsertStmt::InsertMode insert_mode) {
     if (nullptr == columns_expr) {
-        InsertStmt *node_ptr = new InsertStmt(db_name, table_name, values->children_);
+        InsertStmt *node_ptr = new InsertStmt(db_name, table_name, values->children_, insert_mode);
         return RegisterNode(node_ptr);
     } else {
         std::vector<std::string> column_names;
@@ -811,7 +812,7 @@ SqlNode *NodeManager::MakeInsertTableNode(const std::string &db_name, const std:
}
}
}
-        InsertStmt *node_ptr = new InsertStmt(db_name, table_name, column_names, values->children_);
+        InsertStmt *node_ptr = new InsertStmt(db_name, table_name, column_names, values->children_, insert_mode);
return RegisterNode(node_ptr);
}
}
3 changes: 2 additions & 1 deletion hybridse/src/node/sql_node_test.cc
@@ -308,7 +308,8 @@ TEST_F(SqlNodeTest, MakeInsertNodeTest) {
value_expr_list->PushBack(value4);
ExprListNode *insert_values = node_manager_->MakeExprList();
insert_values->PushBack(value_expr_list);
-    SqlNode *node_ptr = node_manager_->MakeInsertTableNode("", "t1", column_expr_list, insert_values);
+    SqlNode *node_ptr = node_manager_->MakeInsertTableNode("", "t1", column_expr_list, insert_values,
+                                                           InsertStmt::InsertMode::DEFAULT_MODE);

ASSERT_EQ(kInsertStmt, node_ptr->GetType());
InsertStmt *insert_stmt = dynamic_cast<InsertStmt *>(node_ptr);
9 changes: 5 additions & 4 deletions hybridse/src/planv2/ast_node_converter.cc
@@ -1962,8 +1962,9 @@ base::Status ConvertInsertStatement(const zetasql::ASTInsertStatement* root, nod
}
CHECK_TRUE(nullptr == root->query(), common::kSqlAstError, "Un-support insert statement with query");

-    CHECK_TRUE(zetasql::ASTInsertStatement::InsertMode::DEFAULT_MODE == root->insert_mode(), common::kSqlAstError,
-               "Un-support insert mode ", root->GetSQLForInsertMode());
+    CHECK_TRUE(zetasql::ASTInsertStatement::InsertMode::DEFAULT_MODE == root->insert_mode() ||
+                   zetasql::ASTInsertStatement::InsertMode::IGNORE == root->insert_mode(),
+               common::kSqlAstError, "Un-support insert mode ", root->GetSQLForInsertMode());
CHECK_TRUE(nullptr == root->returning(), common::kSqlAstError,
"Un-support insert statement with return clause currently", root->GetSQLForInsertMode());
CHECK_TRUE(nullptr == root->assert_rows_modified(), common::kSqlAstError,
@@ -2000,8 +2001,8 @@
if (names.size() == 2) {
db_name = names[0];
}
-    *output =
-        dynamic_cast<node::InsertStmt*>(node_manager->MakeInsertTableNode(db_name, table_name, column_list, rows));
+    *output = dynamic_cast<node::InsertStmt*>(node_manager->MakeInsertTableNode(
+        db_name, table_name, column_list, rows, static_cast<node::InsertStmt::InsertMode>(root->insert_mode())));
return base::Status::OK();
}
base::Status ConvertDropStatement(const zetasql::ASTDropStatement* root, node::NodeManager* node_manager,
@@ -55,16 +55,19 @@ object LoadDataPlan {
loadDataSql)

// write
-    logger.info("write data to storage {}, writer[mode {}], is deep? {}", storage, mode, deepCopy.toString)
+    logger.info("write data to storage {}, writer mode {}, is deep {}", storage, mode, deepCopy.toString)
if (storage == "online") { // Import online data
require(deepCopy && mode == "append", "import to online storage, can't do soft copy, and mode must be append")
       val writeType = extra.get("writer_type").get
+      val putIfAbsent = extra.get("put_if_absent").get.toBoolean
+      logger.info(s"online write type ${writeType}, put if absent ${putIfAbsent}")
       val writeOptions = Map(
         "db" -> db,
         "table" -> table,
         "zkCluster" -> ctx.getConf.openmldbZkCluster,
         "zkPath" -> ctx.getConf.openmldbZkRootPath,
-        "writerType" -> writeType
+        "writerType" -> writeType,
+        "putIfAbsent" -> putIfAbsent.toString
       )
df.write.options(writeOptions).format("openmldb").mode(mode).save()
} else { // Import offline data
@@ -247,16 +247,17 @@ object HybridseUtil {
}

// extra options for some special case
// only for PhysicalLoadDataNode
var extraOptions: mutable.Map[String, String] = mutable.Map()
// only for PhysicalLoadDataNode
extraOptions += ("deep_copy" -> parseOption(getOptionFromNode(node, "deep_copy"), "true", getBoolOrDefault))

// only for select into, "" means N/A
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))
extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single",
getStringOrDefault)
extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("put_if_absent" -> parseOption(getOptionFromNode(node, "put_if_absent"), "false",
getBoolOrDefault))

// only for select into, "" means N/A
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))
extraOptions += ("create_if_not_exists" -> parseOption(getOptionFromNode(node, "create_if_not_exists"),
"true", getBoolOrDefault))

@@ -82,7 +82,8 @@ public java.sql.Statement createStatement() throws SQLException {
@Override
public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
         String lower = sql.toLowerCase();
-        if (lower.startsWith("insert into")) {
+        // insert, insert or xxx
+        if (lower.startsWith("insert ")) {
return client.getInsertPreparedStmt(this.defaultDatabase, sql);
} else if (lower.startsWith("select")) {
return client.getPreparedStatement(this.defaultDatabase, sql);
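The widened prefix check above is what lets `INSERT OR IGNORE ...` reach the insert prepared-statement path. The routing can be sketched as (a Python sketch of the logic, not the Java SDK itself):

```python
def route(sql: str) -> str:
    """Mimic the connection's statement routing by SQL prefix."""
    lower = sql.strip().lower()
    if lower.startswith("insert "):  # matches INSERT, INSERT OR IGNORE, ...
        return "insert_stmt"
    if lower.startswith("select"):
        return "query_stmt"
    return "unsupported"
```

The old check `startswith("insert into")` would have misrouted `INSERT OR IGNORE INTO ...`; matching on `"insert "` covers all insert variants.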
@@ -17,13 +17,14 @@
package com._4paradigm.openmldb.sdk;

 import lombok.Data;
+import java.io.Serializable;

import com._4paradigm.openmldb.BasicRouterOptions;
import com._4paradigm.openmldb.SQLRouterOptions;
import com._4paradigm.openmldb.StandaloneOptions;

@Data
-public class SdkOption {
+public class SdkOption implements Serializable {
// TODO(hw): set isClusterMode automatically
private boolean isClusterMode = true;
// options for cluster mode
@@ -319,7 +319,7 @@ public boolean execute() throws SQLException {
// actually only one row
boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(),
cache.getTid(), cache.getPartitionNum(),
-                dimensions.array(), dimensions.capacity(), value.array(), value.capacity(), status);
+                dimensions.array(), dimensions.capacity(), value.array(), value.capacity(), cache.isPutIfAbsent(), status);
// cleanup rows even if insert failed
// we can't execute() again without set new row, so we must clean up here
clearParameters();
@@ -381,7 +381,7 @@ public int[] executeBatch() throws SQLException {
boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(),
cache.getTid(), cache.getPartitionNum(),
pair.getKey().array(), pair.getKey().capacity(),
-                pair.getValue().array(), pair.getValue().capacity(), status);
+                pair.getValue().array(), pair.getValue().capacity(), cache.isPutIfAbsent(), status);
if (!ok) {
// TODO(hw): may lost log, e.g. openmldb-batch online import in yarn mode?
logger.warn(status.ToString());
Expand Down