From d7be10d1575438a8ba03e601098e582e82c4601f Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 20 Sep 2023 10:51:45 +0800 Subject: [PATCH 1/9] feat: add interface to sql_router --- src/sdk/sql_insert_row.h | 67 ++++++++++++++++++++++++++++++++++++++++ src/sdk/sql_router_sdk.i | 2 ++ 2 files changed, 69 insertions(+) diff --git a/src/sdk/sql_insert_row.h b/src/sdk/sql_insert_row.h index bee50291b3c..317d492c91b 100644 --- a/src/sdk/sql_insert_row.h +++ b/src/sdk/sql_insert_row.h @@ -29,12 +29,75 @@ #include "codec/fe_row_codec.h" #include "node/sql_node.h" #include "proto/name_server.pb.h" +#include "schema/schema_adapter.h" #include "sdk/base.h" namespace openmldb::sdk { typedef std::shared_ptr>> DefaultValueMap; +// used in java to build InsertPreparedStatementCache +class DefaultValueContainer { + public: + DefaultValueContainer(const DefaultValueMap& default_map) : default_map_(default_map) {} + + bool IsValid(int idx) { + return idx >= 0 && idx < Size(); + } + + int Size() { + return default_map_->size(); + } + + bool IsNull(int idx) { + return default_map_->at(idx)->IsNull(); + } + + openmldb::type::DataType GetType(int idx) { + openmldb::type::DataType type; + schema::SchemaAdapter::ConvertType(default_map_->at(idx)->GetDataType(), &type); + return type; + } + + bool GetBool(int idx) { + return default_map_->at(idx)->GetBool(); + } + + int16_t GetSmallInt(int idx) { + return default_map_->at(idx)->GetSmallInt(); + } + + int32_t GetInt(int idx) { + return default_map_->at(idx)->GetInt(); + } + + int64_t GetBigInt(int idx) { + return default_map_->at(idx)->GetLong(); + } + + float GetFloat(int idx) { + return default_map_->at(idx)->GetFloat(); + } + + double GetDouble(int idx) { + return default_map_->at(idx)->GetDouble(); + } + + int32_t GetDate(int idx) { + return default_map_->at(idx)->GetInt(); + } + + int64_t GetTimeStamp(int idx) { + return default_map_->at(idx)->GetLong(); + } + + std::string GetString(int idx) { + return default_map_->at(idx)->GetStr(); + } + private: + DefaultValueMap default_map_; +}; + class SQLInsertRow { public: SQLInsertRow(std::shared_ptr<::openmldb::nameserver::TableInfo> table_info, @@ -81,6 +144,10 @@ class SQLInsertRow { const std::vector& stmt_column_idx_in_table, const std::shared_ptr<::hybridse::sdk::Schema>& schema); + std::shared_ptr GetDefaultValue() { + return std::make_shared(default_map_); + } + private: bool MakeDefault(); void PackDimension(const std::string& val); diff --git a/src/sdk/sql_router_sdk.i b/src/sdk/sql_router_sdk.i index 1146aeba42e..22ee63b3e6d 100644 --- a/src/sdk/sql_router_sdk.i +++ b/src/sdk/sql_router_sdk.i @@ -65,6 +65,7 @@ %shared_ptr(openmldb::sdk::QueryFuture); %shared_ptr(openmldb::sdk::TableReader); %shared_ptr(hybridse::node::CreateTableLikeClause); +%shared_ptr(openmldb::sdk::DefaultValueContainer); %template(VectorUint32) std::vector; %template(VectorString) std::vector; @@ -93,6 +94,7 @@ using openmldb::sdk::ExplainInfo; using hybridse::sdk::ProcedureInfo; using openmldb::sdk::QueryFuture; using openmldb::sdk::TableReader; +using openmldb::sdk::DefaultValueContainer; %} %include "sdk/sql_router.h" From 29a68bb5c2765bf19da545ba28b8a69107d65424 Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 22 Sep 2023 11:18:31 +0800 Subject: [PATCH 2/9] feat: optimize insert --- .../openmldb/jdbc/SQLInsertMetaData.java | 39 +- .../_4paradigm/openmldb/sdk/QueryFuture.java | 2 + .../_4paradigm/openmldb/sdk/SqlExecutor.java | 3 + .../impl/InsertPreparedStatementCache.java | 215 +++++ .../sdk/impl/InsertPreparedStatementImpl.java | 889 ++++-------------- .../openmldb/sdk/impl/SqlClusterExecutor.java | 20 +- src/sdk/sql_insert_row.h | 18 +- 7 files changed, 446 insertions(+), 740 deletions(-) create mode 100644 java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java index e4ccd903146..150b39ef8a5 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java @@ -18,10 +18,7 @@ import static com._4paradigm.openmldb.sdk.impl.Util.sqlTypeToString; -import com._4paradigm.openmldb.DataType; -import com._4paradigm.openmldb.Schema; -import com._4paradigm.openmldb.common.Pair; -import com._4paradigm.openmldb.sdk.Common; +import com._4paradigm.openmldb.sdk.Schema; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -29,42 +26,30 @@ public class SQLInsertMetaData implements ResultSetMetaData { - private final List schema; - private final Schema realSchema; - private final List> idx; + private final Schema schema; + private final List holeIdx; - public SQLInsertMetaData(List schema, - Schema realSchema, - List> idx) { + public SQLInsertMetaData(Schema schema, List holeIdx) { this.schema = schema; - this.realSchema = realSchema; - this.idx = idx; - } - - private void checkSchemaNull() throws SQLException { - if (schema == null) { - throw new SQLException("schema is null"); - } + this.holeIdx = holeIdx; } private void checkIdx(int i) throws SQLException { if (i <= 0) { throw new SQLException("index underflow"); } - if (i > schema.size()) { + if (i > holeIdx.size()) { throw new SQLException("index overflow"); } } public void check(int i) throws SQLException { checkIdx(i); - checkSchemaNull(); } @Override public int getColumnCount() throws SQLException { - checkSchemaNull(); - return schema.size(); + return holeIdx.size(); } @Override @@ -94,8 +79,8 @@ public boolean isCurrency(int i) throws SQLException { @Override public int isNullable(int i) throws SQLException { check(i); - Long index = idx.get(i - 1).getKey(); - if (realSchema.IsColumnNotNull(index)) { + boolean nullable = schema.isNullable(holeIdx.get(i)); + if (!nullable) { return columnNoNulls; } else { return columnNullable; @@ -123,8 +108,7 @@ public String getColumnLabel(int i) throws SQLException { @Override public String getColumnName(int i) throws SQLException { check(i); - Long index = idx.get(i - 1).getKey(); - return realSchema.GetColumnName(index); + return schema.getColumnName(holeIdx.get(i)); } @Override @@ -160,8 +144,7 @@ public String getCatalogName(int i) throws SQLException { @Override public int getColumnType(int i) throws SQLException { check(i); - Long index = idx.get(i - 1).getKey(); - return Common.type2SqlType(realSchema.GetColumnType(index)); + return schema.getColumnType(holeIdx.get(i)); } @Override diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java index 12bbd1ab8d9..94a75df69d4 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java @@ -74,6 +74,8 @@ public java.sql.ResultSet get() throws InterruptedException, ExecutionException if (resultSet != null) { resultSet.delete(); } + queryFuture.delete(); + queryFuture = null; logger.error("call procedure failed: {}", msg); throw new ExecutionException(new SqlException("call procedure failed: " + msg)); } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java index c89e53379bd..b55da67a430 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java @@ -48,10 +48,13 @@ public interface SqlExecutor { @Deprecated java.sql.ResultSet executeSQL(String db, String sql); + @Deprecated SQLInsertRow getInsertRow(String db, String sql); + @Deprecated SQLInsertRows getInsertRows(String db, String sql); + @Deprecated ResultSet executeSQLRequest(String db, String sql, SQLRequestRow row); Statement getStatement(); diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java new file mode 100644 index 00000000000..9516308e95e --- /dev/null +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java @@ -0,0 +1,215 @@ +package com._4paradigm.openmldb.sdk.impl; + +import com._4paradigm.openmldb.SQLInsertRow; +import com._4paradigm.openmldb.DefaultValueContainer; +import com._4paradigm.openmldb.VectorUint32; +import com._4paradigm.openmldb.common.codec.CodecMetaData; +import com._4paradigm.openmldb.common.codec.CodecUtil; +import com._4paradigm.openmldb.proto.NS; +import com._4paradigm.openmldb.sdk.Common; +import com._4paradigm.openmldb.sdk.Schema; + +import java.sql.Timestamp; +import java.sql.Types; +import java.util.*; + +public class InsertPreparedStatementCache { + + public static String NONETOKEN = "!N@U#L$L%"; + public static String EMPTY_STRING = "!@#$%"; + + private String sql; + private String db; + private String name; + private int tid; + private int partitionNum; + private Schema schema; + private CodecMetaData codecMetaData; + private Map defaultValue = new HashMap<>(); + private List holeIdx = new ArrayList<>(); + private Set indexPos = new HashSet<>(); + private Map> indexMap = new HashMap<>(); + private Map defaultIndexValue = new HashMap<>(); + + public InsertPreparedStatementCache(String sql, SQLInsertRow insertRow) { + this.sql = sql; + NS.TableInfo tableInfo = insertRow.GetTableInfo(); + try { + schema = Common.convertSchema(tableInfo.getColumnDescList()); + codecMetaData = new CodecMetaData(tableInfo.getColumnDescList()); + } catch (Exception e) { + e.printStackTrace(); + } + db = tableInfo.getDb(); + name = tableInfo.getName(); + tid = tableInfo.getTid(); + partitionNum = tableInfo.getTablePartitionCount(); + buildIndex(tableInfo); + DefaultValueContainer value = insertRow.GetDefaultValue(); + buildDefaultValue(value); + value.delete(); + VectorUint32 idxArray = insertRow.GetHoleIdx(); + buildHoleIdx(idxArray); + idxArray.delete(); + } + + private void buildIndex(NS.TableInfo tableInfo) { + Map nameIdxMap = new HashMap<>(); + for (int i = 0; i < schema.size(); i++) { + nameIdxMap.put(schema.getColumnName(i), i); + } + for (int i = 0; i < tableInfo.getColumnKeyList().size(); i++) { + com._4paradigm.openmldb.proto.Common.ColumnKey columnKey = tableInfo.getColumnKeyList().get(i); + List colList = new ArrayList<>(columnKey.getColNameCount()); + for (String name : columnKey.getColNameList()) { + colList.add(nameIdxMap.get(name)); + indexPos.add(nameIdxMap.get(name)); + } + indexMap.put(i, colList); + } + } + + private void buildHoleIdx(VectorUint32 idxArray) { + int size = idxArray.size(); + for (int i = 0; i < size; i++) { + holeIdx.add(idxArray.get(i).intValue()); + } + } + + private void buildDefaultValue(DefaultValueContainer valueContainer) { + VectorUint32 defaultPos = valueContainer.GetAllPosition(); + int size = defaultPos.size(); + for (int i = 0; i < size; i++) { + int schemaIdx = defaultPos.get(i).intValue(); + boolean isIndexVal = indexPos.contains(schemaIdx); + if (valueContainer.IsNull(schemaIdx)) { + defaultValue.put(schemaIdx, null); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, NONETOKEN); + } + } else { + switch (schema.getColumnType(schemaIdx)) { + case Types.BOOLEAN: { + boolean val = valueContainer.GetBool(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.SMALLINT: { + short val = valueContainer.GetSmallInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.INTEGER: { + int val = valueContainer.GetInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.BIGINT: { + long val = valueContainer.GetBigInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.FLOAT: + defaultValue.put(schemaIdx, valueContainer.GetFloat(schemaIdx)); + break; + case Types.DOUBLE: + defaultValue.put(schemaIdx, valueContainer.GetDouble(schemaIdx)); + break; + case Types.DATE: { + int val = valueContainer.GetDate(schemaIdx); + defaultValue.put(schemaIdx, CodecUtil.dateIntToDate(val)); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.TIMESTAMP: { + long val = valueContainer.GetTimeStamp(schemaIdx); + defaultValue.put(schemaIdx, new Timestamp(val)); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.VARCHAR: { + String val = valueContainer.GetString(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + if (val.isEmpty()) { + defaultIndexValue.put(schemaIdx, EMPTY_STRING); + } else { + defaultIndexValue.put(schemaIdx, val); + } + } + break; + } + } + } + } + defaultPos.delete(); + } + + public Schema getSchema() { + return schema; + } + + public String getDatabase() { + return db; + } + + public String getName() { + return name; + } + + public int getTid() { + return tid; + } + + public int getPartitionNum() { + return partitionNum; + } + + public CodecMetaData getCodecMeta() { + return codecMetaData; + } + + public Map getDefaultValue() { + return defaultValue; + } + + public String getSql() { + return sql; + } + + public int getSchemaIdx(int idx) { + return holeIdx.get(idx); + } + + List getHoleIdx() { + return holeIdx; + } + + Set getIndexPos() { + return indexPos; + } + + Map> getIndexMap() { + return indexMap; + } + + Map getDefaultIndexValue() { + return defaultIndexValue; + } +} diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java index 1eeb10865b5..9a0eb3d105a 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java @@ -18,99 +18,46 @@ import com._4paradigm.openmldb.*; -import com._4paradigm.openmldb.common.Pair; +import com._4paradigm.openmldb.common.codec.CodecUtil; +import com._4paradigm.openmldb.common.codec.FlexibleRowBuilder; +import com._4paradigm.openmldb.jdbc.PreparedStatement; import com._4paradigm.openmldb.jdbc.SQLInsertMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; -import java.io.Reader; -import java.math.BigDecimal; -import java.net.URL; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.sql.*; import java.sql.Date; import java.sql.ResultSet; import java.util.*; -import java.util.stream.Collectors; -public class InsertPreparedStatementImpl implements PreparedStatement { - public static final Charset CHARSET = StandardCharsets.UTF_8; +public class InsertPreparedStatementImpl extends PreparedStatement { private static final Logger logger = LoggerFactory.getLogger(InsertPreparedStatementImpl.class); - private final String db; - private final String sql; - private final SQLRouter router; - - // need manual deletion - private final List currentRows = new ArrayList<>(); - private Schema currentSchema; - - private final List currentDatas; - private final List currentDatasType; - private final List hasSet; - // stmt insert idx -> real table schema idx - private final List> schemaIdxes; - // used by building row - private final List> sortedIdxes; - - private boolean closed = false; - private boolean closeOnComplete = false; - private Integer stringsLen = 0; - - public InsertPreparedStatementImpl(String db, String sql, SQLRouter router) throws SQLException { - this.db = db; - this.sql = sql; - this.router = router; + private SQLRouter router; + private FlexibleRowBuilder rowBuilder; + private InsertPreparedStatementCache cache; - SQLInsertRow tempRow = getSQLInsertRow(); - this.currentSchema = tempRow.GetSchema(); - VectorUint32 idxes = tempRow.GetHoleIdx(); - - // In stmt order, if no columns in stmt, in schema order - // We'll sort it to schema order later, so needs the map - schemaIdxes = new ArrayList<>(idxes.size()); - // CurrentData and Type order is consistent with insert stmt. We'll do appending in schema order when build - // row. - currentDatas = new ArrayList<>(idxes.size()); - currentDatasType = new ArrayList<>(idxes.size()); - hasSet = new ArrayList<>(idxes.size()); - - for (int i = 0; i < idxes.size(); i++) { - Long realIdx = idxes.get(i); - schemaIdxes.add(new Pair<>(realIdx, i)); - DataType type = currentSchema.GetColumnType(realIdx); - currentDatasType.add(type); - currentDatas.add(null); - hasSet.add(false); - logger.debug("add col {}, {}", currentSchema.GetColumnName(realIdx), type); - } - // SQLInsertRow::AppendXXX order is the schema order(skip the no-hole columns) - sortedIdxes = schemaIdxes.stream().sorted(Comparator.comparing(Pair::getKey)) - .collect(Collectors.toList()); - } + private Set indexCol; + private Map> indexMap; + private Map indexValue; + private Map defaultIndexValue; + private List> batchValues; - private SQLInsertRow getSQLInsertRow() throws SQLException { - Status status = new Status(); - SQLInsertRow row = router.GetInsertRow(db, sql, status); - if (status.getCode() != 0) { - String msg = status.ToString(); - status.delete(); - if (row != null) { - row.delete(); - } - throw new SQLException("getSQLInsertRow failed, " + msg); - } - status.delete(); - return row; + public InsertPreparedStatementImpl(InsertPreparedStatementCache cache, SQLRouter router) throws SQLException { + this.router = router; + rowBuilder = new FlexibleRowBuilder(cache.getCodecMeta()); + this.cache = cache; + indexCol = cache.getIndexPos(); + indexMap = cache.getIndexMap(); + indexValue = new HashMap<>(); + defaultIndexValue = cache.getDefaultIndexValue(); + batchValues = new ArrayList<>(); } - private void clearSQLInsertRowList() { - for (SQLInsertRow row : currentRows) { - row.delete(); - } - currentRows.clear(); + private int getSchemaIdx(int idx) { + return cache.getSchemaIdx(idx); } @Override @@ -125,246 +72,211 @@ public int executeUpdate() throws SQLException { throw new SQLException("current do not support this method"); } - private void checkIdx(int i) throws SQLException { - if (closed) { - throw new SQLException("prepared statement closed"); - } - if (i <= 0) { - throw new SQLException("error sqe number"); - } - if (i > schemaIdxes.size()) { - throw new SQLException("out of data range"); - } - } - - private void checkType(int i, DataType type) throws SQLException { - if (currentDatasType.get(i - 1) != type) { - throw new SQLException("data type not match, expect " + currentDatasType.get(i - 1) + ", actual " + type); - } - } - private void setNull(int i) throws SQLException { - checkIdx(i); - boolean notAllowNull = checkNotAllowNull(i); - if (notAllowNull) { + if (!cache.getSchema().isNullable(i)) { throw new SQLException("this column not allow null"); } - hasSet.set(i - 1, true); - currentDatas.set(i - 1, null); + rowBuilder.setNULL(i); } @Override public void setNull(int i, int i1) throws SQLException { - setNull(i); + int realIdx = getSchemaIdx(i); + if (indexCol.contains(realIdx)) { + indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + } + setNull(realIdx); } @Override public void setBoolean(int i, boolean b) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeBool); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, b); - } - - @Override - @Deprecated - public void setByte(int i, byte b) throws SQLException { - throw new SQLException("current do not support this method"); + int realIdx = getSchemaIdx(i); + rowBuilder.setBool(realIdx, b); + if (indexCol.contains(realIdx)) { + indexValue.put(realIdx, String.valueOf(b)); + } } @Override public void setShort(int i, short i1) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeInt16); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, i1); + int realIdx = getSchemaIdx(i); + rowBuilder.setSmallInt(realIdx, i1); + if (indexCol.contains(realIdx)) { + indexValue.put(realIdx, String.valueOf(i1)); + } } @Override public void setInt(int i, int i1) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeInt32); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, i1); - + int realIdx = getSchemaIdx(i); + rowBuilder.setInt(realIdx, i1); + if (indexCol.contains(realIdx)) { + indexValue.put(realIdx, String.valueOf(i1)); + } } @Override public void setLong(int i, long l) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeInt64); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, l); + int realIdx = getSchemaIdx(i); + rowBuilder.setBigInt(realIdx, l); + if (indexCol.contains(realIdx)) { + indexValue.put(realIdx, String.valueOf(l)); + } } @Override public void setFloat(int i, float v) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeFloat); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, v); + rowBuilder.setFloat(getSchemaIdx(i), v); } @Override public void setDouble(int i, double v) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeDouble); - hasSet.set(i - 1, true); - currentDatas.set(i - 1, v); - } - - @Override - @Deprecated - public void setBigDecimal(int i, BigDecimal bigDecimal) throws SQLException { - throw new SQLException("current do not support this type"); - } - - private boolean checkNotAllowNull(int i) { - Long idx = this.schemaIdxes.get(i - 1).getKey(); - return this.currentSchema.IsColumnNotNull(idx); + rowBuilder.setDouble(getSchemaIdx(i), v); } @Override public void setString(int i, String s) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeString); + int realIdx = getSchemaIdx(i); + if (indexCol.contains(realIdx)) { + if (s == null) { + indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + } else if (s.isEmpty()) { + indexValue.put(realIdx, InsertPreparedStatementCache.EMPTY_STRING); + } else { + indexValue.put(realIdx, s); + } + } if (s == null) { - setNull(i); + setNull(realIdx); return; } - byte[] bytes = s.getBytes(CHARSET); - // if this index already set, should first reduce length of bytes last time - if (hasSet.get(i - 1)) { - stringsLen -= ((byte[]) currentDatas.get(i - 1)).length; - } - stringsLen += bytes.length; - hasSet.set(i - 1, true); - currentDatas.set(i - 1, bytes); - } - - @Override - @Deprecated - public void setBytes(int i, byte[] bytes) throws SQLException { - throw new SQLException("current do not support this type"); + rowBuilder.setString(getSchemaIdx(i), s); } @Override public void setDate(int i, Date date) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeDate); + int realIdx = getSchemaIdx(i); + if (indexCol.contains(realIdx)) { + if (date != null) { + indexValue.put(realIdx, String.valueOf(CodecUtil.dateToDateInt(date))); + } else { + indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + } + } if (date == null) { - setNull(i); + setNull(realIdx); return; } - hasSet.set(i - 1, true); - currentDatas.set(i - 1, date); + rowBuilder.setDate(realIdx, date); } - @Override - @Deprecated - public void setTime(int i, Time time) throws SQLException { - throw new SQLException("current do not support this type"); - } @Override public void setTimestamp(int i, Timestamp timestamp) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeTimestamp); + int realIdx = getSchemaIdx(i); + if (indexCol.contains(realIdx)) { + if (timestamp != null) { + indexValue.put(realIdx, String.valueOf(timestamp.getTime())); + } else { + indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + } + } if (timestamp == null) { - setNull(i); + setNull(realIdx); return; } - hasSet.set(i - 1, true); - long ts = timestamp.getTime(); - currentDatas.set(i - 1, ts); - } - - @Override - @Deprecated - public void setAsciiStream(int i, InputStream inputStream, int i1) throws SQLException { - throw new SQLException("current do not support this type"); - } - - @Override - @Deprecated - public void setUnicodeStream(int i, InputStream inputStream, int i1) throws SQLException { - throw new SQLException("current do not support this type"); - } - - @Override - @Deprecated - public void setBinaryStream(int i, InputStream inputStream, int i1) throws SQLException { - throw new SQLException("current do not support this type"); + rowBuilder.setTimestamp(realIdx, timestamp); } @Override public void clearParameters() throws SQLException { - for (int i = 0; i < hasSet.size(); i++) { - hasSet.set(i, false); - currentDatas.set(i, null); - } - stringsLen = 0; - } - - @Override - @Deprecated - public void setObject(int i, Object o, int i1) throws SQLException { - throw new SQLException("current do not support this method"); - } - - private void buildRow() throws SQLException { - SQLInsertRow currentRow = getSQLInsertRow(); - boolean ok = currentRow.Init(stringsLen); - if (!ok) { - throw new SQLException("init row failed"); + rowBuilder.clear(); + indexValue.clear(); + } + + private ByteBuffer buildDimension() throws SQLException { + int totalLen = 0; + Map lenMap = new HashMap<>(); + for (Map.Entry> entry : indexMap.entrySet()) { + totalLen += 4; // encode the size of idx(int) + totalLen += 4; // encode the value size + int curLen = entry.getValue().size() - 1; + for (Integer pos : entry.getValue()) { + if (indexValue.containsKey(pos)) { + curLen += indexValue.get(pos).getBytes(CodecUtil.CHARSET).length; + } else if (defaultIndexValue.containsKey(pos)) { + curLen += defaultIndexValue.get(pos).getBytes(CodecUtil.CHARSET).length; + } else { + throw new SQLException("cannot get index value. pos is " + pos); + } + } + totalLen += curLen; + lenMap.put(entry.getKey(), curLen); } - - for (Pair sortedIdx : sortedIdxes) { - Integer currentDataIdx = sortedIdx.getValue(); - Object data = currentDatas.get(currentDataIdx); - if (data == null) { - ok = currentRow.AppendNULL(); - } else { - DataType curType = currentDatasType.get(currentDataIdx); - if (DataType.kTypeBool.equals(curType)) { - ok = currentRow.AppendBool((boolean) data); - } else if (DataType.kTypeDate.equals(curType)) { - Date date = (Date) data; - ok = currentRow.AppendDate(date.getYear() + 1900, date.getMonth() + 1, date.getDate()); - } else if (DataType.kTypeDouble.equals(curType)) { - ok = currentRow.AppendDouble((double) data); - } else if (DataType.kTypeFloat.equals(curType)) { - ok = currentRow.AppendFloat((float) data); - } else if (DataType.kTypeInt16.equals(curType)) { - ok = currentRow.AppendInt16((short) data); - } else if (DataType.kTypeInt32.equals(curType)) { - ok = currentRow.AppendInt32((int) data); - } else if (DataType.kTypeInt64.equals(curType)) { - ok = currentRow.AppendInt64((long) data); - } else if (DataType.kTypeString.equals(curType)) { - byte[] bdata = (byte[]) data; - ok = currentRow.AppendString(bdata, bdata.length); - } else if (DataType.kTypeTimestamp.equals(curType)) { - ok = currentRow.AppendTimestamp((long) data); + ByteBuffer dimensionValue = ByteBuffer.allocate(totalLen).order(ByteOrder.LITTLE_ENDIAN); + for (Map.Entry> entry : indexMap.entrySet()) { + Integer indexPos = entry.getKey(); + dimensionValue.putInt(indexPos); + dimensionValue.putInt(lenMap.get(indexPos)); + for (Integer pos : entry.getValue()) { + if (pos > 0) { + dimensionValue.put((byte)'|'); + } + if (indexValue.containsKey(pos)) { + dimensionValue.put(indexValue.get(pos).getBytes(CodecUtil.CHARSET)); } else { - throw new SQLException("unknown data type"); + dimensionValue.put(defaultIndexValue.get(pos).getBytes(CodecUtil.CHARSET)); } } - if (!ok) { - throw new SQLException("append failed on currentDataIdx: " + currentDataIdx + ", curType: " + currentDatasType.get(currentDataIdx) + ", current data: " + data); + } + return dimensionValue; + } + + private ByteBuffer buildRow() throws SQLException { + Map defaultValue = cache.getDefaultValue(); + if (!defaultValue.isEmpty()) { + for (Map.Entry entry : defaultValue.entrySet()) { + int idx = entry.getKey(); + Object val = entry.getValue(); + if (val == null) { + rowBuilder.setNULL(idx); + continue; + } + switch (cache.getSchema().getColumnType(idx)) { + case Types.BOOLEAN: + rowBuilder.setBool(idx, (boolean)val); + break; + case Types.SMALLINT: + rowBuilder.setSmallInt(idx, (short)val); + break; + case Types.INTEGER: + rowBuilder.setInt(idx, (int)val); + break; + case Types.BIGINT: + rowBuilder.setBigInt(idx, (long)val); + break; + case Types.FLOAT: + rowBuilder.setFloat(idx, (float)val); + break; + case Types.DOUBLE: + rowBuilder.setDouble(idx, (double)val); + break; + case Types.DATE: + rowBuilder.setDate(idx, (Date)val); + break; + case Types.TIMESTAMP: + rowBuilder.setTimestamp(idx, (Timestamp)val); + break; + case Types.VARCHAR: + rowBuilder.setString(idx, (String)val); + break; + } } } - if (!currentRow.Build()) { - throw new SQLException("build insert row failed(str size init != actual)"); + if (!rowBuilder.build()) { + throw new SQLException("encode row failed"); } - currentRows.add(currentRow); - clearParameters(); - } - - @Override - @Deprecated - public void setObject(int i, Object o) throws SQLException { - throw new SQLException("current do not support this method"); + return rowBuilder.getValue(); } @Override @@ -372,17 +284,15 @@ public boolean execute() throws SQLException { if (closed) { throw new SQLException("InsertPreparedStatement closed"); } - // buildRow will add a new row to currentRows - if (!currentRows.isEmpty()) { - throw new SQLException("please use executeBatch"); - } - buildRow(); + ByteBuffer dimensions = buildDimension(); + ByteBuffer value = buildRow(); Status status = new Status(); // actually only one row - boolean ok = router.ExecuteInsert(db, sql, currentRows.get(0), status); + boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), cache.getTid(), + dimensions.array(), dimensions.capacity(), value.array(), value.capacity(), status); // cleanup rows even if insert failed // we can't execute() again without set new row, so we must clean up here - clearSQLInsertRowList(); + clearParameters(); if (!ok) { logger.error("execute insert failed: {}", status.ToString()); status.delete(); @@ -401,220 +311,24 @@ public void addBatch() throws SQLException { if (closed) { throw new SQLException("InsertPreparedStatement closed"); } - // build the current row and cleanup the cache of current row - // so that the cache is ready for new row - buildRow(); - } - - @Override - @Deprecated - public void setCharacterStream(int i, Reader reader, int i1) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setRef(int i, Ref ref) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setBlob(int i, Blob blob) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setClob(int i, Clob clob) throws SQLException { - throw new SQLException("current do not support this method"); + batchValues.add(new AbstractMap.SimpleImmutableEntry<>(buildDimension(), buildRow())); + clearParameters(); } - @Override - @Deprecated - public void setArray(int i, Array array) throws SQLException { - throw new SQLException("current do not support this method"); - } @Override public ResultSetMetaData getMetaData() throws SQLException { - return new SQLInsertMetaData(this.currentDatasType, this.currentSchema, this.schemaIdxes); + return new SQLInsertMetaData(cache.getSchema(), cache.getHoleIdx()); } @Override public void setDate(int i, Date date, Calendar calendar) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeDate); - if (date == null) { - setNull(i); - return; - } - hasSet.set(i - 1, true); - currentDatas.set(i - 1, date); - } - - @Override - @Deprecated - public void setTime(int i, Time time, Calendar calendar) throws SQLException { - throw new SQLException("current do not support this method"); + setDate(i, date); } @Override public void setTimestamp(int i, Timestamp timestamp, Calendar calendar) throws SQLException { - checkIdx(i); - checkType(i, DataType.kTypeTimestamp); - if (timestamp == null) { - setNull(i); - return; - } - hasSet.set(i - 1, true); - long ts = timestamp.getTime(); - currentDatas.set(i - 1, ts); - } - - @Override - @Deprecated - public void setNull(int i, int i1, String s) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setURL(int i, URL url) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public ParameterMetaData getParameterMetaData() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setRowId(int i, RowId rowId) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNString(int i, String s) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNCharacterStream(int i, Reader reader, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNClob(int i, NClob nClob) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setClob(int i, Reader reader, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setBlob(int i, InputStream inputStream, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNClob(int i, Reader reader, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setSQLXML(int i, SQLXML sqlxml) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setObject(int i, Object o, int i1, int i2) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setAsciiStream(int i, InputStream inputStream, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setBinaryStream(int i, InputStream inputStream, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setCharacterStream(int i, Reader reader, long l) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setAsciiStream(int i, InputStream inputStream) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setBinaryStream(int i, InputStream inputStream) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setCharacterStream(int i, Reader reader) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNCharacterStream(int i, Reader reader) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setClob(int i, Reader reader) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setBlob(int i, InputStream inputStream) throws SQLException { - - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setNClob(int i, Reader reader) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public ResultSet executeQuery(String s) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int executeUpdate(String s) throws SQLException { - throw new SQLException("current do not support this method"); + setTimestamp(i, timestamp); } @Override @@ -622,158 +336,21 @@ public void close() throws SQLException { if (closed) { return; } - clearSQLInsertRowList(); - if (currentSchema != null) { - currentSchema.delete(); - currentSchema = null; - } closed = true; } - @Override - @Deprecated - public int getMaxFieldSize() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setMaxFieldSize(int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getMaxRows() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setMaxRows(int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setEscapeProcessing(boolean b) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getQueryTimeout() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setQueryTimeout(int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void cancel() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public SQLWarning getWarnings() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void clearWarnings() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setCursorName(String s) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean execute(String s) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public ResultSet getResultSet() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getUpdateCount() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean getMoreResults() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public void setFetchDirection(int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Deprecated - @Override - public int getFetchDirection() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - public void setFetchSize(int i) throws SQLException { - } - - @Override - @Deprecated - public int getFetchSize() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getResultSetConcurrency() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getResultSetType() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - public void addBatch(String s) throws SQLException { - throw new SQLException("cannot take arguments in PreparedStatement"); - } - - @Override - @Deprecated - public void clearBatch() throws SQLException { - throw new SQLException("current do not support this method"); - } - @Override public int[] executeBatch() throws SQLException { if (closed) { throw new SQLException("InsertPreparedStatement closed"); } - int[] result = new int[currentRows.size()]; + int[] result = new int[batchValues.size()]; Status status = new Status(); - for (int i = 0; i < currentRows.size(); i++) { - boolean ok = router.ExecuteInsert(db, sql, currentRows.get(i), status); + for (int i = 0; i < batchValues.size(); i++) { + AbstractMap.SimpleImmutableEntry pair = batchValues.get(i); + boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), cache.getTid(), + pair.getKey().array(), pair.getKey().capacity(), + pair.getValue().array(), pair.getValue().capacity(), status); if (!ok) { // TODO(hw): may lost log, e.g. openmldb-batch online import in yarn mode? logger.warn(status.ToString()); @@ -781,106 +358,8 @@ public int[] executeBatch() throws SQLException { result[i] = ok ? 0 : -1; } status.delete(); - clearSQLInsertRowList(); + clearParameters(); + batchValues.clear(); return result; } - - @Override - @Deprecated - public Connection getConnection() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean getMoreResults(int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public ResultSet getGeneratedKeys() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int executeUpdate(String s, int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int executeUpdate(String s, int[] ints) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int executeUpdate(String s, String[] strings) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean execute(String s, int i) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean execute(String s, int[] ints) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean execute(String s, String[] strings) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public int getResultSetHoldability() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - public boolean isClosed() throws SQLException { - return closed; - } - - @Override - @Deprecated - public void setPoolable(boolean b) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean isPoolable() throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - public void closeOnCompletion() throws SQLException { - this.closeOnComplete = true; - } - - @Override - public boolean isCloseOnCompletion() throws SQLException { - return this.closeOnComplete; - } - - @Override - @Deprecated - public T unwrap(Class aClass) throws SQLException { - throw new SQLException("current do not support this method"); - } - - @Override - @Deprecated - public boolean isWrapperFor(Class aClass) throws SQLException { - throw new SQLException("current do not support this method"); - } } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index 7d32ac092af..c3587e95258 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -52,6 +52,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -62,6 +63,7 @@ public class SqlClusterExecutor implements SqlExecutor { private SQLRouter sqlRouter; private DeploymentManager deploymentManager; private ZKClient zkClient; + private Map cacheMap = new ConcurrentHashMap<>(); public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlException { initJavaSdkLibrary(libraryPath); @@ -183,7 +185,23 @@ public Statement getStatement() { @Override public PreparedStatement getInsertPreparedStmt(String db, String sql) throws SQLException { - return new InsertPreparedStatementImpl(db, sql, this.sqlRouter); + InsertPreparedStatementCache cache = cacheMap.get(sql); + if (cache == null) { + Status status = new Status(); + SQLInsertRow row = sqlRouter.GetInsertRow(db, sql, status); + if (!status.IsOK()) { + String msg = status.ToString(); + status.delete(); + if (row != null) { + row.delete(); + } + throw new SQLException("getSQLInsertRow failed, " + msg); + } + cache = new InsertPreparedStatementCache(sql, row); + row.delete(); + cacheMap.putIfAbsent(sql, cache); + } + return new InsertPreparedStatementImpl(cache, this.sqlRouter); } @Override diff --git a/src/sdk/sql_insert_row.h b/src/sdk/sql_insert_row.h index 317d492c91b..fbaa9e078b4 100644 --- a/src/sdk/sql_insert_row.h +++ b/src/sdk/sql_insert_row.h @@ -41,6 +41,14 @@ class DefaultValueContainer { public: DefaultValueContainer(const DefaultValueMap& default_map) : default_map_(default_map) {} + std::vector GetAllPosition() { + std::vector vec; + for (const auto& kv : *default_map_) { + vec.push_back(kv.first); + } + return vec; + } + bool IsValid(int idx) { return idx >= 0 && idx < Size(); } @@ -53,12 +61,6 @@ class DefaultValueContainer { return default_map_->at(idx)->IsNull(); } - openmldb::type::DataType GetType(int idx) { - openmldb::type::DataType type; - schema::SchemaAdapter::ConvertType(default_map_->at(idx)->GetDataType(), &type); - return type; - } - bool GetBool(int idx) { return default_map_->at(idx)->GetBool(); } @@ -148,6 +150,10 @@ class SQLInsertRow { return std::make_shared(default_map_); } + ::openmldb::nameserver::TableInfo GetTableInfo() { + return *table_info_; + } + private: bool MakeDefault(); void PackDimension(const std::string& val); From f1129424bde411885abf853ed71cec93259473f4 Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 22 Sep 2023 14:31:42 +0800 Subject: [PATCH 3/9] feat: add interface to sql router --- src/base/hash.h | 8 +++-- src/client/tablet_client.cc | 19 +++++++---- src/client/tablet_client.h | 3 ++ src/sdk/sql_cluster_router.cc | 61 +++++++++++++++++++++++++++++++++++ src/sdk/sql_cluster_router.h | 4 +++ src/sdk/sql_insert_row.h | 3 +- src/sdk/sql_router.h | 4 +++ 7 files changed, 93 insertions(+), 9 deletions(-) diff --git a/src/base/hash.h b/src/base/hash.h index 6e98be06d7f..df6962d3c5a 100644 --- a/src/base/hash.h +++ b/src/base/hash.h @@ -104,8 +104,8 @@ static uint64_t MurmurHash64A(const void* key, int len, unsigned int seed) { return h; } -static inline int64_t hash64(const std::string& key) { - uint64_t raw_value = MurmurHash64A(key.c_str(), key.length(), 0xe17a1465); +static inline int64_t hash64(const void* ptr, int len) { + uint64_t raw_value = MurmurHash64A(ptr, len, 0xe17a1465); int64_t cur_value = (int64_t)raw_value; // convert to signed integer as same as java client if (cur_value < 0) { @@ -114,6 +114,10 @@ static inline int64_t hash64(const std::string& key) { return cur_value; } +static inline int64_t hash64(const std::string& key) { + return hash64(key.c_str(), key.length()); +} + } // namespace base } // namespace openmldb diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 9357b23e29a..938a1b747d7 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -189,16 +189,23 @@ bool TabletClient::UpdateTableMetaForAddField(uint32_t tid, const std::vector>& dimensions) { - ::openmldb::api::PutRequest request; - request.set_time(time); - request.set_value(value); - request.set_tid(tid); - request.set_pid(pid); + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions; for (size_t i = 0; i < dimensions.size(); i++) { - ::openmldb::api::Dimension* d = request.add_dimensions(); + ::openmldb::api::Dimension* d = pb_dimensions.Add(); d->set_key(dimensions[i].first); d->set_idx(dimensions[i].second); } + return Put(tid, pid, time, base::Slice(value), &pb_dimensions); +} + +bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions) { + ::openmldb::api::PutRequest request; + request.set_time(time); + request.set_value(value.data(), value.size()); + request.set_tid(tid); + request.set_pid(pid); + request.mutable_dimensions()->Swap(dimensions); ::openmldb::api::PutResponse response; bool ok = client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index f955040a157..f9dfd897361 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -76,6 +76,9 @@ class TabletClient : public Client { bool Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value, const std::vector>& dimensions); + bool Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value, + ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions); + bool Get(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, std::string& value, // NOLINT uint64_t& ts, // NOLINT std::string& msg); ; // NOLINT diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 296cd3d5755..d41ed9eec83 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1432,6 +1432,67 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& s } } +bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& name, int tid, int partition_num, + hybridse::sdk::ByteArrayPtr dimension, int dimension_len, + hybridse::sdk::ByteArrayPtr value, int len, hybridse::sdk::Status* status) { + RET_FALSE_IF_NULL_AND_WARN(status, "output status is nullptr"); + if (dimension == nullptr || dimension_len <= 0 || value == nullptr || len <= 0 || partition_num <= 0) { + *status = {StatusCode::kCmdError, "invalid parameter"}; + return false; + } + std::vector> tablets; + bool ret = cluster_sdk_->GetTablet(db, name, &tablets); + if (!ret || tablets.empty()) { + status->msg = "fail to get table " + name + " tablet"; + return false; + } + std::map> dimensions_map; + int pos = 0; + while (pos < dimension_len) { + int idx = *(reinterpret_cast(dimension + pos)); + pos += sizeof(int); + int len = *(reinterpret_cast(dimension + pos)); + pos += sizeof(int); + base::Slice key(dimension + pos, len); + uint32_t pid = static_cast(::openmldb::base::hash64(key.data(), key.size()) % partition_num); + auto it = dimensions_map.find(pid); + if (it == dimensions_map.end()) { + it = dimensions_map.emplace(pid, ::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>()).first; + } + auto dim = it->second.Add(); + dim->set_idx(idx); + dim->set_key(key.data(), key.size()); + } + base::Slice row_value(value, len); + uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000; + for (auto& kv : dimensions_map) { + uint32_t pid = kv.first; + if (pid < tablets.size()) { + auto tablet = tablets[pid]; + if (tablet) { + auto client = tablet->GetClient(); + if (client) { + DLOG(INFO) << "put data to endpoint " << client->GetEndpoint() << " with dimensions size " + << kv.second.size(); + bool ret = client->Put(tid, pid, cur_ts, row_value, &kv.second); + if (!ret) { + SET_STATUS_AND_WARN(status, StatusCode::kCmdError, + "INSERT failed, tid " + std::to_string(tid) + + ". Note that data might have been partially inserted. " + "You are encouraged to perform DELETE to remove any partially " + "inserted data before trying INSERT again."); + return false; + } + continue; + } + } + } + SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "fail to get tablet client. pid " + std::to_string(pid)); + return false; + } + return true; +} + bool SQLClusterRouter::GetSQLPlan(const std::string& sql, ::hybridse::node::NodeManager* nm, ::hybridse::node::PlanNodeList* plan) { if (nm == NULL || plan == NULL) return false; diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 033bda8d090..d2e6b52b790 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -84,6 +84,10 @@ class SQLClusterRouter : public SQLRouter { bool ExecuteInsert(const std::string& db, const std::string& sql, std::shared_ptr rows, hybridse::sdk::Status* status) override; + bool ExecuteInsert(const std::string& db, const std::string& name, int tid, int partition_num, + hybridse::sdk::ByteArrayPtr dimension, int dimension_len, + hybridse::sdk::ByteArrayPtr value, int len, hybridse::sdk::Status* status) override; + bool ExecuteDelete(std::shared_ptr row, hybridse::sdk::Status* status) override; std::shared_ptr GetTableReader() override; diff --git a/src/sdk/sql_insert_row.h b/src/sdk/sql_insert_row.h index fbaa9e078b4..ded1c824e19 100644 --- a/src/sdk/sql_insert_row.h +++ b/src/sdk/sql_insert_row.h @@ -39,7 +39,7 @@ typedef std::shared_ptr GetAllPosition() { std::vector vec; @@ -96,6 +96,7 @@ class DefaultValueContainer { std::string GetString(int idx) { return default_map_->at(idx)->GetStr(); } + private: DefaultValueMap default_map_; }; diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index aa12b6dff56..f88cc0b00f9 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -110,6 +110,10 @@ class SQLRouter { virtual bool ExecuteInsert(const std::string& db, const std::string& sql, std::shared_ptr row, hybridse::sdk::Status* status) = 0; + virtual bool ExecuteInsert(const std::string& db, const std::string& name, int tid, int partition_num, + hybridse::sdk::ByteArrayPtr dimension, int dimension_len, + hybridse::sdk::ByteArrayPtr value, int len, hybridse::sdk::Status* status) = 0; + virtual bool ExecuteDelete(std::shared_ptr row, hybridse::sdk::Status* status) = 0; virtual std::shared_ptr GetTableReader() = 0; From 5d0b6b93567161d933e29c95e3e3c3e814fa8075 Mon Sep 17 00:00:00 2001 From: dl239 Date: Fri, 22 Sep 2023 17:30:11 +0800 Subject: [PATCH 4/9] fix: fix test --- .../common/codec/FlexibleRowBuilder.java | 3 + .../openmldb/jdbc/SQLInsertMetaData.java | 25 +++--- .../impl/InsertPreparedStatementCache.java | 8 +- .../sdk/impl/InsertPreparedStatementImpl.java | 85 +++++++++++++------ .../openmldb/jdbc/SQLRouterSmokeTest.java | 22 +++-- src/sdk/sql_cluster_router.cc | 5 +- 6 files changed, 96 insertions(+), 52 deletions(-) diff --git a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java index 5497237ce20..e9029fb7663 100644 --- a/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java +++ b/java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java @@ -213,6 +213,9 @@ public boolean setNULL(int idx) { } Type.DataType type = metaData.getSchema().get(idx).getDataType(); if (type == Type.DataType.kVarchar || type == Type.DataType.kString) { + if (settedValue.at(idx)) { + return false; + } if (idx != metaData.getStrIdxList().get(curStrIdx)) { if (stringValueCache == null) { stringValueCache = new TreeMap<>(); diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java index 150b39ef8a5..144c889c5b4 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java @@ -34,19 +34,15 @@ public SQLInsertMetaData(Schema schema, List holeIdx) { this.holeIdx = holeIdx; } - private void checkIdx(int i) throws SQLException { - if (i <= 0) { + private void check(int i) throws SQLException { + if (i < 0) { throw new SQLException("index underflow"); } - if (i > holeIdx.size()) { + if (i >= holeIdx.size()) { throw new SQLException("index overflow"); } } - public void check(int i) throws SQLException { - checkIdx(i); - } - @Override public int getColumnCount() throws SQLException { return holeIdx.size(); @@ -78,8 +74,9 @@ public boolean isCurrency(int i) throws SQLException { @Override public int isNullable(int i) throws SQLException { - check(i); - boolean nullable = schema.isNullable(holeIdx.get(i)); + int realIdx = i - 1; + check(realIdx); + boolean nullable = schema.isNullable(holeIdx.get(realIdx)); if (!nullable) { return columnNoNulls; } else { @@ -107,8 +104,9 @@ public String getColumnLabel(int i) throws SQLException { @Override public String getColumnName(int i) throws SQLException { - check(i); - return schema.getColumnName(holeIdx.get(i)); + int realIdx = i - 1; + check(realIdx); + return schema.getColumnName(holeIdx.get(realIdx)); } @Override @@ -143,8 +141,9 @@ public String getCatalogName(int i) throws SQLException { @Override public int getColumnType(int i) throws SQLException { - check(i); - return schema.getColumnType(holeIdx.get(i)); + int realIdx = i - 1; + check(realIdx); + return schema.getColumnType(holeIdx.get(realIdx)); } @Override diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java index 9516308e95e..cc292e6e981 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java @@ -9,6 +9,7 @@ import com._4paradigm.openmldb.sdk.Common; import com._4paradigm.openmldb.sdk.Schema; +import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.*; @@ -36,7 +37,7 @@ public InsertPreparedStatementCache(String sql, SQLInsertRow insertRow) { NS.TableInfo tableInfo = insertRow.GetTableInfo(); try { schema = Common.convertSchema(tableInfo.getColumnDescList()); - codecMetaData = new CodecMetaData(tableInfo.getColumnDescList()); + codecMetaData = new CodecMetaData(tableInfo.getColumnDescList(), false); } catch (Exception e) { e.printStackTrace(); } @@ -193,7 +194,10 @@ public String getSql() { return sql; } - public int getSchemaIdx(int idx) { + public int getSchemaIdx(int idx) throws SQLException { + if (idx >= holeIdx.size()) { + throw new SQLException("out of data range"); + } return holeIdx.get(idx); } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java index 9a0eb3d105a..270eddedaed 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java @@ -56,8 +56,8 @@ public InsertPreparedStatementImpl(InsertPreparedStatementCache cache, SQLRouter batchValues = new ArrayList<>(); } - private int getSchemaIdx(int idx) { - return cache.getSchemaIdx(idx); + private int getSchemaIdx(int idx) throws SQLException { + return cache.getSchemaIdx(idx - 1); } @Override @@ -72,26 +72,30 @@ public int executeUpdate() throws SQLException { throw new SQLException("current do not support this method"); } - private void setNull(int i) throws SQLException { + private boolean setNull(int i) throws SQLException { if (!cache.getSchema().isNullable(i)) { throw new SQLException("this column not allow null"); } - rowBuilder.setNULL(i); + return rowBuilder.setNULL(i); } @Override public void setNull(int i, int i1) throws SQLException { int realIdx = getSchemaIdx(i); + if (!setNull(realIdx)) { + throw new SQLException("set null failed. pos is " + i); + } if (indexCol.contains(realIdx)) { indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); } - setNull(realIdx); } @Override public void setBoolean(int i, boolean b) throws SQLException { int realIdx = getSchemaIdx(i); - rowBuilder.setBool(realIdx, b); + if (!rowBuilder.setBool(realIdx, b)) { + throw new SQLException("set bool failed. pos is " + i); + } if (indexCol.contains(realIdx)) { indexValue.put(realIdx, String.valueOf(b)); } @@ -100,7 +104,9 @@ public void setBoolean(int i, boolean b) throws SQLException { @Override public void setShort(int i, short i1) throws SQLException { int realIdx = getSchemaIdx(i); - rowBuilder.setSmallInt(realIdx, i1); + if (!rowBuilder.setSmallInt(realIdx, i1)) { + throw new SQLException("set short failed. pos is " + i); + } if (indexCol.contains(realIdx)) { indexValue.put(realIdx, String.valueOf(i1)); } @@ -109,7 +115,9 @@ public void setShort(int i, short i1) throws SQLException { @Override public void setInt(int i, int i1) throws SQLException { int realIdx = getSchemaIdx(i); - rowBuilder.setInt(realIdx, i1); + if (!rowBuilder.setInt(realIdx, i1)) { + throw new SQLException("set int failed. pos is " + i); + } if (indexCol.contains(realIdx)) { indexValue.put(realIdx, String.valueOf(i1)); } @@ -118,7 +126,9 @@ public void setInt(int i, int i1) throws SQLException { @Override public void setLong(int i, long l) throws SQLException { int realIdx = getSchemaIdx(i); - rowBuilder.setBigInt(realIdx, l); + if (!rowBuilder.setBigInt(realIdx, l)) { + throw new SQLException("set long failed. pos is " + i); + } if (indexCol.contains(realIdx)) { indexValue.put(realIdx, String.valueOf(l)); } @@ -126,31 +136,38 @@ public void setLong(int i, long l) throws SQLException { @Override public void setFloat(int i, float v) throws SQLException { - rowBuilder.setFloat(getSchemaIdx(i), v); + if (!rowBuilder.setFloat(getSchemaIdx(i), v)) { + throw new SQLException("set float failed. pos is " + i); + } } @Override public void setDouble(int i, double v) throws SQLException { - rowBuilder.setDouble(getSchemaIdx(i), v); + if (!rowBuilder.setDouble(getSchemaIdx(i), v)) { + throw new SQLException("set double failed. pos is " + i); + } } @Override public void setString(int i, String s) throws SQLException { int realIdx = getSchemaIdx(i); - if (indexCol.contains(realIdx)) { - if (s == null) { + if (s == null) { + setNull(realIdx); + if (indexCol.contains(realIdx)) { indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); - } else if (s.isEmpty()) { + } + return; + } + if (!rowBuilder.setString(getSchemaIdx(i), s)) { + throw new SQLException("set string failed. pos is " + i); + } + if (indexCol.contains(realIdx)) { + if (s.isEmpty()) { indexValue.put(realIdx, InsertPreparedStatementCache.EMPTY_STRING); } else { indexValue.put(realIdx, s); } } - if (s == null) { - setNull(realIdx); - return; - } - rowBuilder.setString(getSchemaIdx(i), s); } @Override @@ -164,10 +181,14 @@ public void setDate(int i, Date date) throws SQLException { } } if (date == null) { - setNull(realIdx); + if (!setNull(realIdx)) { + throw new SQLException("set date failed. pos is " + i); + } return; } - rowBuilder.setDate(realIdx, date); + if (!rowBuilder.setDate(realIdx, date)) { + throw new SQLException("set date failed. pos is " + i); + } } @@ -182,10 +203,14 @@ public void setTimestamp(int i, Timestamp timestamp) throws SQLException { } } if (timestamp == null) { - setNull(realIdx); + if (!setNull(realIdx)) { + throw new SQLException("set timestamp failed. pos is " + i); + } return; } - rowBuilder.setTimestamp(realIdx, timestamp); + if (!rowBuilder.setTimestamp(realIdx, timestamp)) { + throw new SQLException("set timestamp failed. pos is " + i); + } } @Override @@ -218,8 +243,9 @@ private ByteBuffer buildDimension() throws SQLException { Integer indexPos = entry.getKey(); dimensionValue.putInt(indexPos); dimensionValue.putInt(lenMap.get(indexPos)); - for (Integer pos : entry.getValue()) { - if (pos > 0) { + for (int i = 0; i < entry.getValue().size(); i++) { + int pos = entry.getValue().get(i); + if (i > 0) { dimensionValue.put((byte)'|'); } if (indexValue.containsKey(pos)) { @@ -284,11 +310,15 @@ public boolean execute() throws SQLException { if (closed) { throw new SQLException("InsertPreparedStatement closed"); } + if (!batchValues.isEmpty()) { + throw new SQLException("please use executeBatch"); + } ByteBuffer dimensions = buildDimension(); ByteBuffer value = buildRow(); Status status = new Status(); // actually only one row - boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), cache.getTid(), + boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), + cache.getTid(), cache.getPartitionNum(), dimensions.array(), dimensions.capacity(), value.array(), value.capacity(), status); // cleanup rows even if insert failed // we can't execute() again without set new row, so we must clean up here @@ -348,7 +378,8 @@ public int[] executeBatch() throws SQLException { Status status = new Status(); for (int i = 0; i < batchValues.size(); i++) { AbstractMap.SimpleImmutableEntry pair = batchValues.get(i); - boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), cache.getTid(), + boolean ok = router.ExecuteInsert(cache.getDatabase(), cache.getName(), + cache.getTid(), cache.getPartitionNum(), pair.getKey().array(), pair.getKey().capacity(), pair.getValue().array(), pair.getValue().capacity(), status); if (!ok) { diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java index b8f54bfa5ca..68dc237d1cf 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/SQLRouterSmokeTest.java @@ -380,7 +380,7 @@ public void testInsertPreparedState(SqlExecutor router) { try { impl2.setString(2, "c"); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("data type not match")); + Assert.assertTrue(e.getMessage().contains("set string failed")); } impl2.setString(1, "sandong"); impl2.setDate(2, d3); @@ -390,11 +390,16 @@ public void testInsertPreparedState(SqlExecutor router) { insert = "insert into tsql1010 values(?, ?, ?, ?, ?);"; PreparedStatement impl3 = router.getInsertPreparedStmt(dbname, insert); impl3.setLong(1, 1003); - impl3.setString(3, "zhejiangxx"); impl3.setString(3, "zhejiang"); - impl3.setString(4, "xxhangzhou"); + try { + impl3.setString(3, "zhejiangxx"); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(true); + } impl3.setString(4, "hangzhou"); impl3.setDate(2, d4); + impl3.setInt(5, 3); impl3.setInt(5, 4); impl3.closeOnCompletion(); Assert.assertTrue(impl3.isCloseOnCompletion()); @@ -500,7 +505,7 @@ public void testInsertPreparedStateBatch(SqlExecutor router) { try { impl.setInt(2, 1002); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("data type not match")); + Assert.assertTrue(e.getMessage().contains("set int failed")); } try { // set failed, so the row is uncompleted, appending row will be failed @@ -510,7 +515,7 @@ public void testInsertPreparedStateBatch(SqlExecutor router) { // j > 0, addBatch has been called Assert.assertEquals(e.getMessage(), "please use executeBatch"); } else { - Assert.assertTrue(e.getMessage().contains("append failed")); + Assert.assertTrue(e.getMessage().contains("cannot get index value")); } } impl.setLong(1, (Long) datas1[j][0]); @@ -536,7 +541,7 @@ public void testInsertPreparedStateBatch(SqlExecutor router) { try { impl2.setInt(2, 1002); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("data type not match")); + Assert.assertTrue(e.getMessage().contains("set int failed")); } try { impl2.execute(); @@ -544,7 +549,7 @@ public void testInsertPreparedStateBatch(SqlExecutor router) { if (j > 0) { Assert.assertEquals(e.getMessage(), "please use executeBatch"); } else { - Assert.assertTrue(e.getMessage().contains("append failed")); + Assert.assertTrue(e.getMessage().contains("cannot get index value")); } } impl2.setLong(1, (Long) datas1[j][0]); @@ -562,8 +567,9 @@ public void testInsertPreparedStateBatch(SqlExecutor router) { Object[] datas2 = batchData[i]; try { impl2.addBatch((String) datas2[0]); + Assert.fail(); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "cannot take arguments in PreparedStatement"); + Assert.assertTrue(true); } int[] result = impl.executeBatch(); diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index d41ed9eec83..1e90398445b 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1451,9 +1451,9 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& n while (pos < dimension_len) { int idx = *(reinterpret_cast(dimension + pos)); pos += sizeof(int); - int len = *(reinterpret_cast(dimension + pos)); + int key_len = *(reinterpret_cast(dimension + pos)); pos += sizeof(int); - base::Slice key(dimension + pos, len); + base::Slice key(dimension + pos, key_len); uint32_t pid = static_cast(::openmldb::base::hash64(key.data(), key.size()) % partition_num); auto it = dimensions_map.find(pid); if (it == dimensions_map.end()) { @@ -1462,6 +1462,7 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& n auto dim = it->second.Add(); dim->set_idx(idx); dim->set_key(key.data(), key.size()); + pos += key_len; } base::Slice row_value(value, len); uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000; From 08bde329ad55049c7892eaeb82ffd21a18c2e5c9 Mon Sep 17 00:00:00 2001 From: dl239 Date: Mon, 25 Sep 2023 18:05:49 +0800 Subject: [PATCH 5/9] test: fix case --- .../test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java index 5c62bca51dc..6d449928b44 100644 --- a/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java +++ b/java/openmldb-jdbc/src/test/java/com/_4paradigm/openmldb/jdbc/JDBCDriverTest.java @@ -212,7 +212,6 @@ public void testForKafkaConnector() throws SQLException { // don't work, but do not throw exception pstmt.setFetchSize(100); - pstmt.addBatch(); insertSql = "INSERT INTO " + tableName + "(`c3`,`c2`) VALUES(?,?)"; From ad083402334f06b3cb6da0f9f0109f7f794d4552 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 26 Sep 2023 17:32:24 +0800 Subject: [PATCH 6/9] feat: add cache --- java/openmldb-jdbc/pom.xml | 5 + .../impl/InsertPreparedStatementCache.java | 258 ++++-------------- .../sdk/impl/InsertPreparedStatementImpl.java | 14 +- .../sdk/impl/InsertPreparedStatementMeta.java | 218 +++++++++++++++ .../openmldb/sdk/impl/SqlClusterExecutor.java | 16 +- 5 files changed, 298 insertions(+), 213 deletions(-) create mode 100644 java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementMeta.java diff --git a/java/openmldb-jdbc/pom.xml b/java/openmldb-jdbc/pom.xml index 3978579b373..82a41344e86 100644 --- a/java/openmldb-jdbc/pom.xml +++ b/java/openmldb-jdbc/pom.xml @@ -61,6 +61,11 @@ snappy-java 1.1.7.2 + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java index cc292e6e981..1f440a5e3c9 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java @@ -1,219 +1,77 @@ package com._4paradigm.openmldb.sdk.impl; -import com._4paradigm.openmldb.SQLInsertRow; -import com._4paradigm.openmldb.DefaultValueContainer; -import com._4paradigm.openmldb.VectorUint32; -import com._4paradigm.openmldb.common.codec.CodecMetaData; -import com._4paradigm.openmldb.common.codec.CodecUtil; +import com._4paradigm.openmldb.common.zk.ZKClient; import com._4paradigm.openmldb.proto.NS; -import com._4paradigm.openmldb.sdk.Common; -import com._4paradigm.openmldb.sdk.Schema; +import com._4paradigm.openmldb.sdk.SqlException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.sql.Types; import java.util.*; +import java.util.concurrent.TimeUnit; public class InsertPreparedStatementCache { - - public static String NONETOKEN = "!N@U#L$L%"; - public static String EMPTY_STRING = "!@#$%"; - - private String sql; - private String db; - private String name; - private int tid; - private int partitionNum; - private Schema schema; - private CodecMetaData codecMetaData; - private Map defaultValue = new HashMap<>(); - private List holeIdx = new ArrayList<>(); - private Set indexPos = new HashSet<>(); - private Map> indexMap = new HashMap<>(); - private Map defaultIndexValue = new HashMap<>(); - - public InsertPreparedStatementCache(String sql, SQLInsertRow insertRow) { - this.sql = sql; - NS.TableInfo tableInfo = insertRow.GetTableInfo(); - try { - schema = Common.convertSchema(tableInfo.getColumnDescList()); - codecMetaData = new CodecMetaData(tableInfo.getColumnDescList(), false); - } catch (Exception e) { - e.printStackTrace(); - } - db = tableInfo.getDb(); - name = tableInfo.getName(); - tid = tableInfo.getTid(); - partitionNum = tableInfo.getTablePartitionCount(); - buildIndex(tableInfo); - DefaultValueContainer value = insertRow.GetDefaultValue(); - buildDefaultValue(value); - value.delete(); - VectorUint32 idxArray = insertRow.GetHoleIdx(); - buildHoleIdx(idxArray); - idxArray.delete(); - } - - private void buildIndex(NS.TableInfo tableInfo) { - Map nameIdxMap = new HashMap<>(); - for (int i = 0; i < schema.size(); i++) { - nameIdxMap.put(schema.getColumnName(i), i); - } - for (int i = 0; i < tableInfo.getColumnKeyList().size(); i++) { - com._4paradigm.openmldb.proto.Common.ColumnKey columnKey = tableInfo.getColumnKeyList().get(i); - List colList = new ArrayList<>(columnKey.getColNameCount()); - for (String name : columnKey.getColNameList()) { - colList.add(nameIdxMap.get(name)); - indexPos.add(nameIdxMap.get(name)); - } - indexMap.put(i, colList); - } - } - - private void buildHoleIdx(VectorUint32 idxArray) { - int size = idxArray.size(); - for (int i = 0; i < size; i++) { - holeIdx.add(idxArray.get(i).intValue()); - } - } - - private void buildDefaultValue(DefaultValueContainer valueContainer) { - VectorUint32 defaultPos = valueContainer.GetAllPosition(); - int size = defaultPos.size(); - for (int i = 0; i < size; i++) { - int schemaIdx = defaultPos.get(i).intValue(); - boolean isIndexVal = indexPos.contains(schemaIdx); - if (valueContainer.IsNull(schemaIdx)) { - defaultValue.put(schemaIdx, null); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, NONETOKEN); - } - } else { - switch (schema.getColumnType(schemaIdx)) { - case Types.BOOLEAN: { - boolean val = valueContainer.GetBool(schemaIdx); - defaultValue.put(schemaIdx, val); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; - } - case Types.SMALLINT: { - short val = valueContainer.GetSmallInt(schemaIdx); - defaultValue.put(schemaIdx, val); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; - } - case Types.INTEGER: { - int val = valueContainer.GetInt(schemaIdx); - defaultValue.put(schemaIdx, val); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; - } - case Types.BIGINT: { - long val = valueContainer.GetBigInt(schemaIdx); - defaultValue.put(schemaIdx, val); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; + static final int CACHE_SIZE = 100000; + static final int EXPIRE_TIME = 1000 * 60; // 1 minute + + private Cache, InsertPreparedStatementMeta> cache; + + private ZKClient zkClient; + private NodeCache nodeCache; + private String tablePath; + + public InsertPreparedStatementCache(ZKClient zkClient) throws SqlException { + cache = Caffeine.newBuilder().maximumSize(CACHE_SIZE).expireAfterAccess(EXPIRE_TIME, TimeUnit.MILLISECONDS).build(); + this.zkClient = zkClient; + if (zkClient != null) { + tablePath = zkClient.getConfig().getNamespace() + "/table/db_table_data"; + nodeCache = new NodeCache(zkClient.getClient(), zkClient.getConfig().getNamespace() + "/table/notify"); + try { + nodeCache.start(); + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + checkAndInvalid(); } - case Types.FLOAT: - defaultValue.put(schemaIdx, valueContainer.GetFloat(schemaIdx)); - break; - case Types.DOUBLE: - defaultValue.put(schemaIdx, valueContainer.GetDouble(schemaIdx)); - break; - case Types.DATE: { - int val = valueContainer.GetDate(schemaIdx); - defaultValue.put(schemaIdx, CodecUtil.dateIntToDate(val)); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; - } - case Types.TIMESTAMP: { - long val = valueContainer.GetTimeStamp(schemaIdx); - defaultValue.put(schemaIdx, new Timestamp(val)); - if (isIndexVal) { - defaultIndexValue.put(schemaIdx, String.valueOf(val)); - } - break; - } - case Types.VARCHAR: { - String val = valueContainer.GetString(schemaIdx); - defaultValue.put(schemaIdx, val); - if (isIndexVal) { - if (val.isEmpty()) { - defaultIndexValue.put(schemaIdx, EMPTY_STRING); - } else { - defaultIndexValue.put(schemaIdx, val); - } - } - break; - } - } + }); + } catch (Exception e) { + throw new SqlException("NodeCache exception: " + e.getMessage()); } } - defaultPos.delete(); - } - - public Schema getSchema() { - return schema; } - public String getDatabase() { - return db; + public InsertPreparedStatementMeta get(String db, String sql) { + return cache.getIfPresent(new AbstractMap.SimpleImmutableEntry<>(db, sql)); } - public String getName() { - return name; + public void put(String db, String sql, InsertPreparedStatementMeta meta) { + cache.put(new AbstractMap.SimpleImmutableEntry<>(db, sql), meta); } - public int getTid() { - return tid; - } - - public int getPartitionNum() { - return partitionNum; - } - - public CodecMetaData getCodecMeta() { - return codecMetaData; - } - - public Map getDefaultValue() { - return defaultValue; - } - - public String getSql() { - return sql; - } - - public int getSchemaIdx(int idx) throws SQLException { - if (idx >= holeIdx.size()) { - throw new SQLException("out of data range"); + public void checkAndInvalid() throws Exception { + if (!zkClient.checkExists(tablePath)) { + return; + } + List children = zkClient.getChildren(tablePath); + Map, InsertPreparedStatementMeta> view = cache.asMap(); + Map, Integer> tableMap = new HashMap<>(); + for (String path : children) { + byte[] bytes = zkClient.getClient().getData().forPath(tablePath + "/" + path); + NS.TableInfo tableInfo = NS.TableInfo.parseFrom(bytes); + tableMap.put(new AbstractMap.SimpleImmutableEntry<>(tableInfo.getDb(), tableInfo.getName()), tableInfo.getTid()); + } + Iterator, InsertPreparedStatementMeta>> iterator + = view.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry, InsertPreparedStatementMeta> entry = iterator.next(); + String db = entry.getKey().getKey(); + InsertPreparedStatementMeta meta = entry.getValue(); + String name = meta.getName(); + Integer tid = tableMap.get(new AbstractMap.SimpleImmutableEntry<>(db, name)); + if (tid != null && tid != meta.getTid()) { + cache.invalidate(entry.getKey()); + } } - return holeIdx.get(idx); - } - - List getHoleIdx() { - return holeIdx; - } - - Set getIndexPos() { - return indexPos; - } - - Map> getIndexMap() { - return indexMap; - } - - Map getDefaultIndexValue() { - return defaultIndexValue; } } diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java index 270eddedaed..6acefe8acff 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementImpl.java @@ -37,7 +37,7 @@ public class InsertPreparedStatementImpl extends PreparedStatement { private SQLRouter router; private FlexibleRowBuilder rowBuilder; - private InsertPreparedStatementCache cache; + private InsertPreparedStatementMeta cache; private Set indexCol; private Map> indexMap; @@ -45,7 +45,7 @@ public class InsertPreparedStatementImpl extends PreparedStatement { private Map defaultIndexValue; private List> batchValues; - public InsertPreparedStatementImpl(InsertPreparedStatementCache cache, SQLRouter router) throws SQLException { + public InsertPreparedStatementImpl(InsertPreparedStatementMeta cache, SQLRouter router) throws SQLException { this.router = router; rowBuilder = new FlexibleRowBuilder(cache.getCodecMeta()); this.cache = cache; @@ -86,7 +86,7 @@ public void setNull(int i, int i1) throws SQLException { throw new SQLException("set null failed. pos is " + i); } if (indexCol.contains(realIdx)) { - indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + indexValue.put(realIdx, InsertPreparedStatementMeta.NONETOKEN); } } @@ -154,7 +154,7 @@ public void setString(int i, String s) throws SQLException { if (s == null) { setNull(realIdx); if (indexCol.contains(realIdx)) { - indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + indexValue.put(realIdx, InsertPreparedStatementMeta.NONETOKEN); } return; } @@ -163,7 +163,7 @@ public void setString(int i, String s) throws SQLException { } if (indexCol.contains(realIdx)) { if (s.isEmpty()) { - indexValue.put(realIdx, InsertPreparedStatementCache.EMPTY_STRING); + indexValue.put(realIdx, InsertPreparedStatementMeta.EMPTY_STRING); } else { indexValue.put(realIdx, s); } @@ -177,7 +177,7 @@ public void setDate(int i, Date date) throws SQLException { if (date != null) { indexValue.put(realIdx, String.valueOf(CodecUtil.dateToDateInt(date))); } else { - indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + indexValue.put(realIdx, InsertPreparedStatementMeta.NONETOKEN); } } if (date == null) { @@ -199,7 +199,7 @@ public void setTimestamp(int i, Timestamp timestamp) throws SQLException { if (timestamp != null) { indexValue.put(realIdx, String.valueOf(timestamp.getTime())); } else { - indexValue.put(realIdx, InsertPreparedStatementCache.NONETOKEN); + indexValue.put(realIdx, InsertPreparedStatementMeta.NONETOKEN); } } if (timestamp == null) { diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementMeta.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementMeta.java new file mode 100644 index 00000000000..448438e9d31 --- /dev/null +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementMeta.java @@ -0,0 +1,218 @@ +package com._4paradigm.openmldb.sdk.impl; + +import com._4paradigm.openmldb.SQLInsertRow; +import com._4paradigm.openmldb.DefaultValueContainer; +import com._4paradigm.openmldb.VectorUint32; +import com._4paradigm.openmldb.common.codec.CodecMetaData; +import com._4paradigm.openmldb.common.codec.CodecUtil; +import com._4paradigm.openmldb.proto.NS; +import com._4paradigm.openmldb.sdk.Common; +import com._4paradigm.openmldb.sdk.Schema; + +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.*; + +public class InsertPreparedStatementMeta { + + public static String NONETOKEN = "!N@U#L$L%"; + public static String EMPTY_STRING = "!@#$%"; + + private String sql; + private String db; + private String name; + private int tid; + private int partitionNum; + private Schema schema; + private CodecMetaData codecMetaData; + private Map defaultValue = new HashMap<>(); + private List holeIdx = new ArrayList<>(); + private Set indexPos = new HashSet<>(); + private Map> indexMap = new HashMap<>(); + private Map defaultIndexValue = new HashMap<>(); + + public InsertPreparedStatementMeta(String sql, NS.TableInfo tableInfo, SQLInsertRow insertRow) { + this.sql = sql; + try { + schema = Common.convertSchema(tableInfo.getColumnDescList()); + codecMetaData = new CodecMetaData(tableInfo.getColumnDescList(), false); + } catch (Exception e) { + e.printStackTrace(); + } + db = tableInfo.getDb(); + name = tableInfo.getName(); + tid = tableInfo.getTid(); + partitionNum = tableInfo.getTablePartitionCount(); + buildIndex(tableInfo); + DefaultValueContainer value = insertRow.GetDefaultValue(); + buildDefaultValue(value); + value.delete(); + VectorUint32 idxArray = insertRow.GetHoleIdx(); + buildHoleIdx(idxArray); + idxArray.delete(); + } + + private void buildIndex(NS.TableInfo tableInfo) { + Map nameIdxMap = new HashMap<>(); + for (int i = 0; i < schema.size(); i++) { + nameIdxMap.put(schema.getColumnName(i), i); + } + for (int i = 0; i < tableInfo.getColumnKeyList().size(); i++) { + com._4paradigm.openmldb.proto.Common.ColumnKey columnKey = tableInfo.getColumnKeyList().get(i); + List colList = new ArrayList<>(columnKey.getColNameCount()); + for (String name : columnKey.getColNameList()) { + colList.add(nameIdxMap.get(name)); + indexPos.add(nameIdxMap.get(name)); + } + indexMap.put(i, colList); + } + } + + private void buildHoleIdx(VectorUint32 idxArray) { + int size = idxArray.size(); + for (int i = 0; i < size; i++) { + holeIdx.add(idxArray.get(i).intValue()); + } + } + + private void buildDefaultValue(DefaultValueContainer valueContainer) { + VectorUint32 defaultPos = valueContainer.GetAllPosition(); + int size = defaultPos.size(); + for (int i = 0; i < size; i++) { + int schemaIdx = defaultPos.get(i).intValue(); + boolean isIndexVal = indexPos.contains(schemaIdx); + if (valueContainer.IsNull(schemaIdx)) { + defaultValue.put(schemaIdx, null); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, NONETOKEN); + } + } else { + switch (schema.getColumnType(schemaIdx)) { + case Types.BOOLEAN: { + boolean val = valueContainer.GetBool(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.SMALLINT: { + short val = valueContainer.GetSmallInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.INTEGER: { + int val = valueContainer.GetInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.BIGINT: { + long val = valueContainer.GetBigInt(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.FLOAT: + defaultValue.put(schemaIdx, valueContainer.GetFloat(schemaIdx)); + break; + case Types.DOUBLE: + defaultValue.put(schemaIdx, valueContainer.GetDouble(schemaIdx)); + break; + case Types.DATE: { + int val = valueContainer.GetDate(schemaIdx); + defaultValue.put(schemaIdx, CodecUtil.dateIntToDate(val)); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.TIMESTAMP: { + long val = valueContainer.GetTimeStamp(schemaIdx); + defaultValue.put(schemaIdx, new Timestamp(val)); + if (isIndexVal) { + defaultIndexValue.put(schemaIdx, String.valueOf(val)); + } + break; + } + case Types.VARCHAR: { + String val = valueContainer.GetString(schemaIdx); + defaultValue.put(schemaIdx, val); + if (isIndexVal) { + if (val.isEmpty()) { + defaultIndexValue.put(schemaIdx, EMPTY_STRING); + } else { + defaultIndexValue.put(schemaIdx, val); + } + } + break; + } + } + } + } + defaultPos.delete(); + } + + public Schema getSchema() { + return schema; + } + + public String getDatabase() { + return db; + } + + public String getName() { + return name; + } + + public int getTid() { + return tid; + } + + public int getPartitionNum() { + return partitionNum; + } + + public CodecMetaData getCodecMeta() { + return codecMetaData; + } + + public Map getDefaultValue() { + return defaultValue; + } + + public String getSql() { + return sql; + } + + public int getSchemaIdx(int idx) throws SQLException { + if (idx >= holeIdx.size()) { + throw new SQLException("out of data range"); + } + return holeIdx.get(idx); + } + + List getHoleIdx() { + return holeIdx; + } + + Set getIndexPos() { + return indexPos; + } + + Map> getIndexMap() { + return indexMap; + } + + Map getDefaultIndexValue() { + return defaultIndexValue; + } +} diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index c3587e95258..5dd3b7911c3 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -63,7 +63,7 @@ public class SqlClusterExecutor implements SqlExecutor { private SQLRouter sqlRouter; private DeploymentManager deploymentManager; private ZKClient zkClient; - private Map cacheMap = new ConcurrentHashMap<>(); + private InsertPreparedStatementCache insertCache; public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlException { initJavaSdkLibrary(libraryPath); @@ -93,6 +93,7 @@ public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlExcept throw new SqlException("fail to create sql executor"); } deploymentManager = new DeploymentManager(zkClient); + insertCache = new InsertPreparedStatementCache(zkClient); } public SqlClusterExecutor(SdkOption option) throws SqlException { @@ -185,8 +186,8 @@ public Statement getStatement() { @Override public PreparedStatement getInsertPreparedStmt(String db, String sql) throws SQLException { - InsertPreparedStatementCache cache = cacheMap.get(sql); - if (cache == null) { + InsertPreparedStatementMeta meta = insertCache.get(db, sql); + if (meta == null) { Status status = new Status(); SQLInsertRow row = sqlRouter.GetInsertRow(db, sql, status); if (!status.IsOK()) { @@ -197,11 +198,14 @@ public PreparedStatement getInsertPreparedStmt(String db, String sql) throws SQL } throw new SQLException("getSQLInsertRow failed, " + msg); } - cache = new InsertPreparedStatementCache(sql, row); + status.delete(); + String name = row.GetTableInfo().getName(); + NS.TableInfo tableInfo = getTableInfo(db, name); + meta = new InsertPreparedStatementMeta(sql, tableInfo, row); row.delete(); - cacheMap.putIfAbsent(sql, cache); + insertCache.put(db, sql, meta); } - return new InsertPreparedStatementImpl(cache, this.sqlRouter); + return new InsertPreparedStatementImpl(meta, this.sqlRouter); } @Override From 321669be01e1df5a52cdfe32f30df10538c95dfe Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 26 Sep 2023 17:41:51 +0800 Subject: [PATCH 7/9] feat: rm expire time --- .../openmldb/sdk/impl/InsertPreparedStatementCache.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java index 1f440a5e3c9..0b7f522513c 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java @@ -13,7 +13,6 @@ public class InsertPreparedStatementCache { static final int CACHE_SIZE = 100000; - static final int EXPIRE_TIME = 1000 * 60; // 1 minute private Cache, InsertPreparedStatementMeta> cache; @@ -22,7 +21,7 @@ public class InsertPreparedStatementCache { private String tablePath; public InsertPreparedStatementCache(ZKClient zkClient) throws SqlException { - cache = Caffeine.newBuilder().maximumSize(CACHE_SIZE).expireAfterAccess(EXPIRE_TIME, TimeUnit.MILLISECONDS).build(); + cache = Caffeine.newBuilder().maximumSize(CACHE_SIZE).build(); this.zkClient = zkClient; if (zkClient != null) { tablePath = zkClient.getConfig().getNamespace() + "/table/db_table_data"; From c753ef83174781286e489ad2c49dc5bf7e986343 Mon Sep 17 00:00:00 2001 From: dl239 Date: Wed, 27 Sep 2023 09:45:24 +0800 Subject: [PATCH 8/9] feat: update cache size --- .../openmldb/sdk/impl/InsertPreparedStatementCache.java | 5 ++--- .../com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java index 0b7f522513c..9139217cc45 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/InsertPreparedStatementCache.java @@ -12,7 +12,6 @@ import java.util.concurrent.TimeUnit; public class InsertPreparedStatementCache { - static final int CACHE_SIZE = 100000; private Cache, InsertPreparedStatementMeta> cache; @@ -20,8 +19,8 @@ public class InsertPreparedStatementCache { private NodeCache nodeCache; private String tablePath; - public InsertPreparedStatementCache(ZKClient zkClient) throws SqlException { - cache = Caffeine.newBuilder().maximumSize(CACHE_SIZE).build(); + public InsertPreparedStatementCache(int cacheSize, ZKClient zkClient) throws SqlException { + cache = Caffeine.newBuilder().maximumSize(cacheSize).build(); this.zkClient = zkClient; if (zkClient != null) { tablePath = zkClient.getConfig().getNamespace() + "/table/db_table_data"; diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index 5dd3b7911c3..9505cd6aba9 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -93,7 +93,7 @@ public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlExcept throw new SqlException("fail to create sql executor"); } deploymentManager = new DeploymentManager(zkClient); - insertCache = new InsertPreparedStatementCache(zkClient); + insertCache = new InsertPreparedStatementCache(option.getMaxSqlCacheSize(), zkClient); } public SqlClusterExecutor(SdkOption option) throws SqlException { From 659ce139bc0d4b3164c46608cf61750d9665f595 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 10 Oct 2023 14:46:00 +0800 Subject: [PATCH 9/9] fix: delete schema --- .../src/main/java/com/_4paradigm/openmldb/sdk/Common.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java index 0c57cf26a5a..81f85482750 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java @@ -171,8 +171,12 @@ public static ProcedureInfo convertProcedureInfo(com._4paradigm.openmldb.Procedu spInfo.setDbName(procedureInfo.GetDbName()); spInfo.setProName(procedureInfo.GetSpName()); spInfo.setSql(procedureInfo.GetSql()); - spInfo.setInputSchema(convertSchema(procedureInfo.GetInputSchema())); - spInfo.setOutputSchema(convertSchema(procedureInfo.GetOutputSchema())); + com._4paradigm.openmldb.Schema inputSchema = procedureInfo.GetInputSchema(); + spInfo.setInputSchema(convertSchema(inputSchema)); + inputSchema.delete(); + com._4paradigm.openmldb.Schema outputSchema = procedureInfo.GetOutputSchema(); + spInfo.setOutputSchema(convertSchema(outputSchema)); + outputSchema.delete(); spInfo.setMainTable(procedureInfo.GetMainTable()); spInfo.setInputTables(procedureInfo.GetTables()); spInfo.setInputDbs(procedureInfo.GetDbs());