diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/BenchmarkConfig.java b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/BenchmarkConfig.java index 4f9861cbda2..d8e13ef51db 100644 --- a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/BenchmarkConfig.java +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/BenchmarkConfig.java @@ -32,6 +32,8 @@ public class BenchmarkConfig { public static int JOIN_NUM = 2; public static int PK_BASE = 1000000; public static long TS_BASE = System.currentTimeMillis(); + public static boolean PARSE_RESULT = false; + public static int BATCH_SIZE = 0; public static String DEPLOY_NAME; public static String CSV_PATH; public static int PUT_BACH_SIZE = 1; @@ -55,6 +57,8 @@ public class BenchmarkConfig { JOIN_NUM = Integer.valueOf(prop.getProperty("JOIN_NUM")); PK_NUM = Integer.valueOf(prop.getProperty("PK_NUM", "100000")); PK_MAX = Integer.valueOf(prop.getProperty("PK_MAX", "0")); + PARSE_RESULT = Boolean.valueOf(prop.getProperty("PARSE_RESULT", "false")); + BATCH_SIZE = Integer.valueOf(prop.getProperty("BATCH_SIZE", "0")); CSV_PATH = prop.getProperty("CSV_PATH"); // if(!CSV_PATH.startsWith("/")){ // CSV_PATH=Util.getRootPath()+CSV_PATH; diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/OpenMLDBPerfBenchmark.java b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/OpenMLDBPerfBenchmark.java index 5fc8e4c346f..8b9bcef063d 100644 --- a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/OpenMLDBPerfBenchmark.java +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/OpenMLDBPerfBenchmark.java @@ -182,11 +182,18 @@ public void executeDeployment() { numberKey += pkList.get(pos); } try { - PreparedStatement stat = Util.getPreparedStatement(deployName, numberKey, tableSchema.get("mt"), executor); + PreparedStatement stat = null; + if (BenchmarkConfig.BATCH_SIZE > 0) { + stat = Util.getBatchPreparedStatement(deployName, numberKey, BenchmarkConfig.BATCH_SIZE, tableSchema.get("mt"), executor); + } else { + stat = Util.getPreparedStatement(deployName, numberKey, tableSchema.get("mt"), executor); + } ResultSet resultSet = stat.executeQuery(); - /*resultSet.next(); - Map val = Util.extractResultSet(resultSet); - int a = 0;*/ + long total = 0; + while (BenchmarkConfig.PARSE_RESULT && resultSet.next()) { + Map val = Util.extractResultSet(resultSet); + total += (long)val.get("sum_w0_col_i1"); + } } catch (Exception e) { e.printStackTrace(); } @@ -195,8 +202,11 @@ public void executeDeployment() { public static void main(String[] args) { /*OpenMLDBPerfBenchmark benchmark = new OpenMLDBPerfBenchmark(); benchmark.initEnv(); - benchmark.executeDeployment(); - benchmark.cleanEnv();*/ + while (true) { + benchmark.executeDeployment(); + }*/ + + //benchmark.cleanEnv(); try { Options opt = new OptionsBuilder() diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/Util.java b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/Util.java index 287bde22cf0..4c7717509a5 100644 --- a/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/Util.java +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/benchmark/Util.java @@ -279,25 +279,78 @@ public static PreparedStatement getPreparedStatement(String deployName, int numb return requestPs; } - public static Map extractResultSet(ResultSet resultSet) { - Map val = new HashMap<>(); + public static PreparedStatement getBatchPreparedStatement(String deployName, int numberKey, int batchSize, + TableSchema tableSchema, + SqlExecutor executor) throws SQLException { + + String dbName = tableSchema.getDataBase(); + List schema = tableSchema.getSchema(); + Set index = tableSchema.getIndex(); + Set tsIndex = tableSchema.getTsIndex(); + PreparedStatement requestPs = executor.getCallablePreparedStmtBatch(dbName, deployName); + ResultSetMetaData metaData = requestPs.getMetaData(); + if (schema.size() != metaData.getColumnCount()) { + return null; + } + for (int idx = 0; idx < batchSize; idx++) { + for (int i = 0; i < metaData.getColumnCount(); i++) { + int columnType = metaData.getColumnType(i + 1); + if (columnType == Types.VARCHAR) { + if (index.contains(i)) { + requestPs.setString(i + 1, "k" + String.valueOf(10 + i) + String.valueOf(numberKey)); + } else { + requestPs.setString(i + 1, "val" + String.valueOf(numberKey)); + } + } else if (columnType == Types.DOUBLE) { + requestPs.setDouble(i + 1, 1.4d); + } else if (columnType == Types.FLOAT) { + requestPs.setFloat(i + 1, 1.3f); + } else if (columnType == Types.INTEGER) { + if (index.contains(i)) { + requestPs.setInt(i + 1, numberKey); + } else { + requestPs.setInt(i + 1, i); + } + } else if (columnType == Types.BIGINT) { + if (index.contains(i)) { + requestPs.setLong(i + 1, numberKey); + } else if (tsIndex.contains(i)) { + requestPs.setLong(i + 1, System.currentTimeMillis()); + } else { + requestPs.setLong(i + 1, i); + } + } else if (columnType == Types.TIMESTAMP) { + requestPs.setTimestamp(i + 1, new Timestamp(System.currentTimeMillis())); + } else if (columnType == Types.DATE) { + requestPs.setDate(i + 1, new Date(System.currentTimeMillis())); + } else if (columnType == Types.BOOLEAN) { + requestPs.setBoolean(i + 1, true); + } + } + requestPs.addBatch(); + } + return requestPs; + } + + public static Map extractResultSet(ResultSet resultSet) { + Map val = new HashMap<>(); try { ResultSetMetaData metaData = resultSet.getMetaData(); for (int i = 0; i < metaData.getColumnCount(); i++) { String columnName = metaData.getColumnName(i + 1); int columnType = metaData.getColumnType(i + 1); if (columnType == Types.VARCHAR) { - val.put(columnName, String.valueOf(resultSet.getString(i + 1))); + val.put(columnName, resultSet.getString(i + 1)); } else if (columnType == Types.DOUBLE) { - val.put(columnName, String.valueOf(resultSet.getDouble(i + 1))); + val.put(columnName, resultSet.getDouble(i + 1)); } else if (columnType == Types.FLOAT) { - val.put(columnName, String.valueOf(resultSet.getFloat(i + 1))); + val.put(columnName, resultSet.getFloat(i + 1)); } else if (columnType == Types.INTEGER) { - val.put(columnName, String.valueOf(resultSet.getInt(i + 1))); + val.put(columnName, resultSet.getInt(i + 1)); } else if (columnType == Types.BIGINT) { - val.put(columnName, String.valueOf(resultSet.getLong(i + 1))); + val.put(columnName, resultSet.getLong(i + 1)); } else if (columnType == Types.TIMESTAMP) { - val.put(columnName, String.valueOf(resultSet.getTimestamp(i + 1))); + val.put(columnName, resultSet.getTimestamp(i + 1)); } } } catch (Exception e) { diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/stability/Config.java b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/Config.java new file mode 100644 index 00000000000..92cf340f49b --- /dev/null +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/Config.java @@ -0,0 +1,75 @@ +package com._4paradigm.openmldb.stability; + + +import com._4paradigm.openmldb.sdk.SdkOption; +import com._4paradigm.openmldb.sdk.SqlExecutor; +import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor; + +import java.util.Properties; + +public class Config { + public static String ZK_CLUSTER; + public static String ZK_PATH; + public static int PUT_THREAD_NUM; + public static int QUERY_THREAD_NUM; + public static boolean NEED_CREATE; + public static boolean NEED_DEPLOY; + public static int REPLICA_NUM; + public static int PARTITION_NUM; + public static String CASE_PATH; + public static boolean ENABLE_PUT; + public static boolean ENABLE_QUERY; + public static int PK_NUM; + public static float INSERT_RATIO; + public static String DB_NAME; + public static String CASE_NAME; + public static String STORAGE_MODE; + public static double ERROR_RATIO; + private static SqlExecutor executor = null; + private static SdkOption option = null; + + static { + try { + Properties prop = new Properties(); + prop.load(Config.class.getClassLoader().getResourceAsStream("stability.properties")); + ZK_CLUSTER = prop.getProperty("ZK_CLUSTER"); + ZK_PATH = prop.getProperty("ZK_PATH"); + PUT_THREAD_NUM = Integer.parseInt(prop.getProperty("PUT_THREAD_NUM")); + QUERY_THREAD_NUM = Integer.parseInt(prop.getProperty("QUERY_THREAD_NUM")); + NEED_CREATE = Boolean.valueOf(prop.getProperty("CREATE_TABLE")); + NEED_DEPLOY = Boolean.valueOf(prop.getProperty("DEPLOY_SQL")); + REPLICA_NUM = Integer.parseInt(prop.getProperty("REPLICA_NUM")); + PARTITION_NUM = Integer.parseInt(prop.getProperty("PARTITION_NUM")); + CASE_PATH = prop.getProperty("CASE_PATH"); + ENABLE_PUT = Boolean.valueOf(prop.getProperty("ENABLE_PUT")); + ENABLE_QUERY = Boolean.valueOf(prop.getProperty("ENABLE_QUERY")); + PK_NUM = Integer.parseInt(prop.getProperty("PK_NUM")); + INSERT_RATIO = Float.parseFloat(prop.getProperty("INSERT_RATIO")); + DB_NAME = prop.getProperty("DB_NAME"); + CASE_NAME = prop.getProperty("CASE_NAME"); + STORAGE_MODE = prop.getProperty("STORAGE_MODE"); + ERROR_RATIO = Double.parseDouble(prop.getProperty("ERROR_RATIO")); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static SqlExecutor GetSqlExecutor(boolean enableDebug) { + if (executor != null) { + return executor; + } + SdkOption sdkOption = new SdkOption(); + sdkOption.setSessionTimeout(30000); + sdkOption.setZkCluster(Config.ZK_CLUSTER); + sdkOption.setZkPath(Config.ZK_PATH); + sdkOption.setEnableDebug(enableDebug); + sdkOption.setRequestTimeout(10000000); + option = sdkOption; + try { + executor = new SqlClusterExecutor(option); + } catch (Exception e) { + e.printStackTrace(); + } + return executor; + } +} diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/stability/FileUtil.java b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/FileUtil.java new file mode 100644 index 00000000000..1b091c31934 --- /dev/null +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/FileUtil.java @@ -0,0 +1,32 @@ +package com._4paradigm.openmldb.stability; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; + +public class FileUtil { + public static String ReadFile(String path) { + File file = new File(path); + BufferedReader reader = null; + StringBuilder builder = new StringBuilder(); + try { + reader = new BufferedReader(new FileReader(file)); + String tempStr = null; + while((tempStr = reader.readLine()) != null) { + builder.append(tempStr); + builder.append("\n"); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + return builder.toString(); + } +} diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/stability/OpenMLDBStability.java b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/OpenMLDBStability.java new file mode 100644 index 00000000000..c15862df8ca --- /dev/null +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/OpenMLDBStability.java @@ -0,0 +1,524 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com._4paradigm.openmldb.stability; + +import com._4paradigm.openmldb.proto.Common; +import com._4paradigm.openmldb.proto.NS; +import com._4paradigm.openmldb.proto.Type; +import com._4paradigm.openmldb.sdk.QueryFuture; +import com._4paradigm.openmldb.jdbc.CallablePreparedStatement; +import com._4paradigm.openmldb.sdk.SqlExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.sql.Date; +import java.util.*; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class OpenMLDBStability { + private static Logger logger = LoggerFactory.getLogger(OpenMLDBStability.class); + private SqlExecutor executor; + private String db; + private String pName; + private String script; + private String ddl; + private String mainTable = "flattenRequest"; + private Map tableMap = new HashMap<>(); + private Map tableInsertSqlMap = new HashMap<>(); + + private ExecutorService putExecuteService; + private ExecutorService queryExecuteService; + private Random random = new Random(); + private AtomicBoolean running = new AtomicBoolean(true); + private AtomicLong insertTotalCnt = new AtomicLong(0); + private AtomicLong insertErrorCnt = new AtomicLong(0); + private AtomicLong queryTotalCnt = new AtomicLong(0); + private AtomicLong queryErrorCnt = new AtomicLong(0); + + public OpenMLDBStability() { + executor = Config.GetSqlExecutor(false); + this.db = Config.DB_NAME; + this.pName = Config.CASE_NAME; + init(); + } + + public void init() { + String baseDir = this.getClass().getClassLoader().getResource("").getPath() + "/" + Config.CASE_PATH + "/" + Config.CASE_NAME; + String rawScript = FileUtil.ReadFile(baseDir + "/sql.txt"); + script = rawScript.trim().replace("\n", " "); + ddl = FileUtil.ReadFile(baseDir + "/ddl.txt"); + } + + public boolean create() { + Statement statement = executor.getStatement(); + try { + statement.execute("CREATE DATABASE IF NOT EXISTS " + db + ";"); + logger.info("create db " + db); + statement.execute("USE " + db + ";"); + String[] arr = ddl.split(";"); + for (String item : arr) { + if (item.trim().isEmpty()) { + continue; + } + String storageMode = Config.STORAGE_MODE.equals("memory") ? "" : ", STORAGE_MODE='HDD'"; + String curSQL = item + " OPTIONS ( PARTITIONNUM=" + Config.PARTITION_NUM + ", REPLICANUM=" + Config.REPLICA_NUM + storageMode + ");"; + statement.execute(curSQL); + logger.info(curSQL + " execute ok!"); + } + statement.execute("set @@SESSION.execute_mode='online';"); + String deploySQL = "DEPLOY " + pName + " " + script; + logger.info(deploySQL); + statement.execute(deploySQL); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } + + public void getTableInfo() { + List tables = executor.getTableNames(db); + try { + for (String name : tables) { + NS.TableInfo tableInfo = executor.getTableInfo(db, name); + tableMap.put(name, new TableInfo(tableInfo)); + tableInsertSqlMap.put(name, buildStatementSQL(name, tableInfo.getColumnDescList().size())); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private String buildStatementSQL(String name, int colSize) { + StringBuilder builder = new StringBuilder(); + builder.append("INSERT INTO ").append(name).append(" VALUES ("); + for (int i = 0; i < colSize; i++) { + if (i > 0) { + builder.append(", "); + } + builder.append("?"); + } + builder.append(");"); + return builder.toString(); + } + + public long getInsertTotalCnt() { + return insertTotalCnt.get(); + } + + public long getInsertErrorCnt() { + return insertErrorCnt.get(); + } + + public long getQueryTotalCnt() { + return queryTotalCnt.get(); + } + + public long getQueryErrorCnt() { + return queryErrorCnt.get(); + } + + public void resetCnt() { + insertErrorCnt.set(0); + insertTotalCnt.set(0); + queryTotalCnt.set(0); + queryErrorCnt.set(0); + } + + public void clear() { + Statement statement = executor.getStatement(); + try { + statement.execute("use " + db + ";"); + statement.execute("drop deployment " + pName + ";"); + for (String name : tableMap.keySet()) { + statement.execute("drop table " + name + ";"); + logger.info("drop table " + name); + } + statement.execute("drop database " + db + ";"); + logger.info("drop db " + db); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // putTableData with random pkNum + private void putTableData(TableInfo table) { + putTableData(table, null); + } + + //putTableData with given pkNum if it's not null + private void putTableData(TableInfo table, Integer pkNum) { + + long ts = System.currentTimeMillis(); + int curNum = null == pkNum ? random.nextInt(Config.PK_NUM) : pkNum; + StringBuilder builder = new StringBuilder(); + builder.append("insert into "); + builder.append(table.getName()); + builder.append(" values("); + List schema = table.getSchema(); + for (int pos = 0; pos < schema.size(); pos++) { + if (pos > 0) { + builder.append(", "); + } + Type.DataType type = schema.get(pos).getDataType(); + if (type == Type.DataType.kString || type == Type.DataType.kVarchar) { + builder.append("'"); + builder.append("key"); + builder.append("-"); + builder.append(curNum); + builder.append("'"); + } else if (type == Type.DataType.kFloat) { + builder.append(random.nextFloat()); + } else if (type == Type.DataType.kDouble) { + builder.append(random.nextDouble()); + } else if (type == Type.DataType.kBigInt) { + if (table.isTsCol(schema.get(pos).getName())) { + builder.append(ts); + } else { + builder.append(curNum); + } + } else if (type == Type.DataType.kInt || type == Type.DataType.kSmallInt) { + builder.append(curNum); + } else if (type == Type.DataType.kTimestamp) { + builder.append(ts); + } else if (type == Type.DataType.kBool) { + builder.append(true); + } else if (type == Type.DataType.kDate) { + builder.append("'2020-11-27'"); + } else { + logger.warn("invalid type"); + } + } + builder.append(");"); + String exeSql = builder.toString(); + boolean ret = executor.executeInsert(db, exeSql); + insertTotalCnt.getAndIncrement(); + if (!ret) { + insertErrorCnt.getAndIncrement(); + } + } + + private boolean setInsertData(PreparedStatement requestPs, long ts, String tableName) { + return setRequestData(requestPs, ts, tableName); + } + + private boolean setRequestData(PreparedStatement requestPs, long ts, String tableName) { + try { + ResultSetMetaData metaData = requestPs.getMetaData(); + TableInfo table = null; + if (tableName == null) { + table = tableMap.get(mainTable); + } else { + table = tableMap.get(tableName); + } + if (table.getSchema().size() != metaData.getColumnCount()) { + return false; + } + int curNum = random.nextInt(Config.PK_NUM); + for (int i = 0; i < metaData.getColumnCount(); i++) { + int columnType = metaData.getColumnType(i + 1); + if (columnType == Types.VARCHAR) { + requestPs.setString(i + 1, "key" + "-" + String.valueOf(curNum)); + } else if (columnType == Types.DOUBLE) { + requestPs.setDouble(i + 1, random.nextDouble()); + } else if (columnType == Types.FLOAT) { + requestPs.setFloat(i + 1, random.nextFloat()); + } else if (columnType == Types.INTEGER) { + requestPs.setInt(i + 1, curNum); + } else if (columnType == Types.BIGINT) { + if (table.isTsCol(metaData.getColumnName(i + 1))) { + requestPs.setLong(i + 1, ts); + } else { + requestPs.setLong(i + 1, curNum); + } + } else if (columnType == Types.TIMESTAMP) { + requestPs.setTimestamp(i + 1, new Timestamp(ts)); + } else if (columnType == Types.DATE) { + requestPs.setDate(i + 1, new Date(ts)); + } + } + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return true; + } + + private PreparedStatement getPreparedStatement(boolean isProcedure) throws SQLException { + PreparedStatement requestPs = null; + long ts = System.currentTimeMillis(); + /*if (mode == BenchmarkConfig.Mode.BATCH_REQUEST) { + if (isProcedure) { + requestPs = executor.getCallablePreparedStmtBatch(db, pName); + } else { + requestPs = executor.getBatchRequestPreparedStmt(db, script, commonColumnIndices); + } + for (int i = 0; i < BenchmarkConfig.BATCH_SIZE; i++) { + if (setRequestData(requestPs, ts, null)) { + requestPs.addBatch(); + } + } + }*/ + if (isProcedure) { + requestPs = executor.getCallablePreparedStmt(db, pName); + } else { + requestPs = executor.getRequestPreparedStmt(db, script); + } + setRequestData(requestPs, ts, null); + return requestPs; + } + + private void getData(ResultSet resultSet) { + if (resultSet == null) { + return; + } + try { + ResultSetMetaData metaData = resultSet.getMetaData(); + while (resultSet.next()) { + for (int i = 0; i < metaData.getColumnCount(); i++) { + // String columnName = metaData.getColumnName(i + 1); + int columnType = metaData.getColumnType(i + 1); + if (columnType == Types.VARCHAR) { + String val = resultSet.getString(i + 1); + } else if (columnType == Types.DOUBLE) { + double val = resultSet.getDouble(i + 1); + } else if (columnType == Types.INTEGER) { + int val = resultSet.getInt(i + 1); + } else if (columnType == Types.BIGINT) { + long val = resultSet.getLong(i + 1); + } else if (columnType == Types.TIMESTAMP) { + Timestamp ts = resultSet.getTimestamp(i + 1); + } else if (columnType == Types.DATE) { + Date date = resultSet.getDate(i + 1); + } + } + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void putData() { + //while (running.get()) { + for (TableInfo table : tableMap.values()) { + putTableData(table); + } + //} + } + + private void insert() { + while (running.get()) { + if (random.nextFloat() < Config.INSERT_RATIO) { + for (TableInfo table : tableMap.values()) { + insertTableData(table); + } + } else { + for (TableInfo table : tableMap.values()) { + putTableData(table); + } + } + } + } + + // putTableData with random pkNum + private void insertTableData(TableInfo table) { + insertTableData(table, null); + } + + //putTableData with given pkNum if it's not null + private void insertTableData(TableInfo table, Integer pkNum) { + PreparedStatement ps = null; + try { + ps = getInsertPstmt(table, pkNum, tableInsertSqlMap.get(table.getName())); + ps.execute(); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + if (ps != null) { + try { + ps.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + } + + private PreparedStatement getInsertPstmt(TableInfo table, Integer pkNum, String insertSql) throws SQLException{ + long ts = System.currentTimeMillis(); + PreparedStatement ps = executor.getInsertPreparedStmt(db, insertSql); + setInsertData(ps, ts, table.getName()); + return ps; + } + + public void query() { + Random curRandom = new Random(); + int cnt = 0; + while (running.get()) { + queryTotalCnt.getAndIncrement(); + boolean isProcedure = true; + //if (curRandom.nextFloat() > BenchmarkConfig.PROCEDURE_RATIO) { + // isProcedure = true; + //} + PreparedStatement ps = null; + ResultSet resultSet = null; + boolean isSync = curRandom.nextBoolean(); + if (!isSync) { + try { + ps = null; + resultSet = null; + ps = getPreparedStatement(isProcedure); + if (ps instanceof CallablePreparedStatement) { + QueryFuture future = ((CallablePreparedStatement) ps).executeQueryAsync(100, TimeUnit.MILLISECONDS); + resultSet = future.get(); + getData(resultSet); + } + } catch (Exception e) { + queryErrorCnt.getAndIncrement(); + e.printStackTrace(); + } finally { + close(ps, resultSet); + } + } else { + try { + ps = getPreparedStatement(isProcedure); + resultSet = ps.executeQuery(); + getData(resultSet); + } catch (Exception e) { + queryErrorCnt.getAndIncrement(); + e.printStackTrace(); + } finally { + close(ps, resultSet); + } + + } + /*try { + String strLimit = String.valueOf(curRandom.nextInt(10000) + 1); + String limitSql = "select * from " + mainTable + " limit " + strLimit + ";"; + resultSet = executor.executeSQL(db, limitSql); + getData(resultSet); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + resultSet.close(); + } catch (Exception e) { + e.printStackTrace(); + } + }*/ + } + } + + private void close(PreparedStatement ps, ResultSet rs) { + try { + if (ps != null) { + ps.close(); + } + } catch (Exception e) { + e.printStackTrace(); + } + try { + if (rs != null) { + rs.close(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + public void runPut(int threadNum) { + putExecuteService = Executors.newFixedThreadPool(threadNum); + logger.info("put thread: " + threadNum); + for (int i = 0; i < threadNum; i++) { + putExecuteService.submit(new Runnable() { + @Override + public void run() { + insert(); + } + }); + } + } + + public void runQuery(int threadNum) { + queryExecuteService = Executors.newFixedThreadPool(threadNum); + logger.info("query thread: " + threadNum); + for (int i = 0; i < threadNum; i++) { + queryExecuteService.submit(new Runnable() { + @Override + public void run() { + query(); + } + }); + } + } + + public void close() { + logger.info("stop run"); + running.set(false); + queryExecuteService.shutdownNow(); + putExecuteService.shutdownNow(); + executor.close(); + } + + public static void main(String[] args) { + OpenMLDBStability stability = new OpenMLDBStability(); + if (Config.NEED_CREATE) { + if (!stability.create()) { + return; + } + } + stability.getTableInfo(); + //stability.query(); + //stability.clear(); + if (Config.ENABLE_PUT) { + stability.runPut(Config.PUT_THREAD_NUM); + } + if (Config.ENABLE_QUERY) { + stability.runQuery(Config.QUERY_THREAD_NUM); + } + //stability.putData(); + //perf.query(); + while (true) { + try { + Thread.sleep(1000 * 60); + long insertCnt = stability.getInsertTotalCnt(); + long insertErr = stability.getInsertErrorCnt(); + long queryCnt = stability.getQueryTotalCnt(); + long queryErr = stability.getQueryErrorCnt(); + stability.resetCnt(); + double insert_ratio = insertErr / (double) insertCnt; + double query_ratio = queryErr / (double) queryCnt; + if (insert_ratio > Config.ERROR_RATIO) { + logger.error("insert error ratio {} total {} error {}", insert_ratio, insertCnt, insertErr); + } + if (query_ratio > Config.ERROR_RATIO) { + logger.error("query error ratio {} total {} error {}", query_ratio, queryCnt, queryErr); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/benchmark/src/main/java/com/_4paradigm/openmldb/stability/TableInfo.java b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/TableInfo.java new file mode 100644 index 00000000000..97246126572 --- /dev/null +++ b/benchmark/src/main/java/com/_4paradigm/openmldb/stability/TableInfo.java @@ -0,0 +1,37 @@ +package com._4paradigm.openmldb.stability; + +import java.util.*; +import com._4paradigm.openmldb.proto.NS; +import com._4paradigm.openmldb.proto.Common; + +public class TableInfo { + private String db; + private String name; + private NS.TableInfo tableInfo; + private List columns; + private Set tsCol; + + public TableInfo(NS.TableInfo tableInfo) { + this.db = tableInfo.getDb(); + this.name = tableInfo.getName(); + this.tableInfo = tableInfo; + tsCol = new HashSet<>(); + for (Common.ColumnKey columnKey : tableInfo.getColumnKeyList()) { + if (columnKey.hasTsName()) { + tsCol.add(columnKey.getTsName()); + } + } + } + + public boolean isTsCol (String colName) { + return tsCol.contains(colName); + } + + public String getName() { + return name; + } + + public List getSchema() { + return tableInfo.getColumnDescList(); + } +} diff --git a/benchmark/src/main/resources/cases/myhug/ddl.txt b/benchmark/src/main/resources/cases/myhug/ddl.txt new file mode 100644 index 00000000000..616eadf1796 --- /dev/null +++ b/benchmark/src/main/resources/cases/myhug/ddl.txt @@ -0,0 +1,104 @@ +create table `action`( +`reqId` string, +`eventTime` timestamp, +`ingestionTime` timestamp, +`actionValue` double, +index(key=(`reqId`), ttl=120m, ttl_type=absolute) +) ; +create table `bo_hislabel`( +`ingestionTime` timestamp, +`zUserId` string, +`uUserId` string, +`nRequestTime` timestamp, +`fWatchedTimeLen` double, +index(key=(`zUserId`,`uUserId`), ts=`ingestionTime`, ttl=2881m, ttl_type=absolute) +) ; +create table `flattenRequest`( +`reqId` string, +`eventTime` timestamp, +`index1` string, +`uUserId` string, +`zUserId` string, +`fRequestId` string, +`fDisplayRank` double, +`fSessionId` string, +`nRoomUserNum` double, +`nRoomInLm` double, +`nRoomInGame` double, +`nRequestTime` timestamp, +`zSex` string, +`zPhoneType` string, +`zLongitude` double, +`zLatitude` double, +`zPosition` string, +`zHome` string, +`zChannel` string, +`zAge` double, +`zHasCreatedGroup` string, +`zRegTime` timestamp, +`zFaceScore` double, +`zFansNum` double, +`zFollowNum` double, +`zGainNum` double, +`zSGiftNum` double, +`zSWihsperNum` double, +`zSChatMsgNum` double, +`zLiveAvgLength` double, +`zLiveFrequency` double, +`zLiveDawn` double, +`zLiveMorning` double, +`zLiveAfternoon` double, +`zLiveEvening` double, +`zMaxRGiftNumOneUser` double, +`zRGiftUserNum` double, +`zLiveMsgNum` double, +`zLiveDisharmony` double, +`zLiveShareNum` double, +`zSmallGiftNum` double, +`zBigGiftNum` double, +`uSex` string, +`uPhoneType` string, +`uLongitude` double, +`uLatitude` double, +`uPosition` string, +`uHome` string, +`uChannel` string, +`uAge` double, +`uHasJoinedGroup` string, +`uRegTime` timestamp, +`uFirstChargeNum` double, +`uLatestChargeTime` timestamp, +`uRemainDiamondNum` double, +`uFansNum` double, +`uFollowNum` double, +`uGainNum` double, +`uSGiftNum` double, +`uSWihsperNum` double, +`uSChatMsgNum` double, +`uLiveSMsgNum` double, +`uHasBeenBanned` double, +`uSMsgFiltered` double, +`uWatchDawn` double, +`uWatchMorning` double, +`uWatchAfternoon` double, +`uWatchEvening` double, +`uWatchAvgLength` double, +`uEnterRoomFrequency` double, +`uTopThreeNum` double, +`uWatchSameCity` double, +`uPlayGame` string, +`uLive` double, +`uLmNum` double, +`uSBigGiftNum` double, +`uSSmallGiftNum` double, +`uRGiftUserNum` double, +`uWatchTopList` int, +`split_id` int, +index(key=(`uUserId`), ts=`eventTime`, ttl=(2881m, 10), ttl_type=absandlat), +index(key=(`zChannel`), ts=`eventTime`, ttl=2881m, ttl_type=absolute), +index(key=(`uSex`), ts=`eventTime`, ttl=(2881m, 10), ttl_type=absandlat), +index(key=(`zUserId`), ts=`eventTime`, ttl=2881m, ttl_type=absolute), +index(key=(`uPlayGame`), ts=`eventTime`, ttl=601m, ttl_type=absolute), +index(key=(`uHasJoinedGroup`), ts=`eventTime`, ttl=2881m, ttl_type=absolute), +index(key=(`zUserId`,`uUserId`), ts=`eventTime`, ttl=2881m, ttl_type=absolute) +) ; \ No newline at end of file diff --git a/benchmark/src/main/resources/cases/myhug/sql.txt b/benchmark/src/main/resources/cases/myhug/sql.txt new file mode 100644 index 00000000000..be885b4304b --- /dev/null +++ b/benchmark/src/main/resources/cases/myhug/sql.txt @@ -0,0 +1,137 @@ +select * from +( +select + `reqId` as reqId_1, + `reqId` as flattenRequest_reqId_original_0, + `eventTime` as flattenRequest_eventTime_original_1, + `uUserId` as flattenRequest_uUserId_original_2, + `zUserId` as flattenRequest_zUserId_original_3, + `nRoomUserNum` as flattenRequest_nRoomUserNum_original_4, + `nRoomInLm` as flattenRequest_nRoomInLm_original_5, + `nRoomInGame` as flattenRequest_nRoomInGame_original_6, + `nRequestTime` as flattenRequest_nRequestTime_original_7, + `zSex` as flattenRequest_zSex_original_8, + `zPhoneType` as flattenRequest_zPhoneType_original_9, + `zLongitude` as flattenRequest_zLongitude_original_10, + `zLatitude` as flattenRequest_zLatitude_original_11, + `zPosition` as flattenRequest_zPosition_original_12, + `zHome` as flattenRequest_zHome_original_13, + `zChannel` as flattenRequest_zChannel_original_14, + `zAge` as flattenRequest_zAge_original_15, + `zHasCreatedGroup` as flattenRequest_zHasCreatedGroup_original_16, + `zRegTime` as flattenRequest_zRegTime_original_17, + `zFaceScore` as flattenRequest_zFaceScore_original_18, + `zFansNum` as flattenRequest_zFansNum_original_19, + `zFollowNum` as flattenRequest_zFollowNum_original_20, + `zGainNum` as flattenRequest_zGainNum_original_21, + `zSGiftNum` as flattenRequest_zSGiftNum_original_22, + `zSWihsperNum` as flattenRequest_zSWihsperNum_original_23, + `zSChatMsgNum` as flattenRequest_zSChatMsgNum_original_24, + `zLiveAvgLength` as flattenRequest_zLiveAvgLength_original_25, + `zLiveFrequency` as flattenRequest_zLiveFrequency_original_26, + `zLiveDawn` as flattenRequest_zLiveDawn_original_27, + `zLiveMorning` as flattenRequest_zLiveMorning_original_28, + `zLiveAfternoon` as flattenRequest_zLiveAfternoon_original_29, + `zLiveEvening` as flattenRequest_zLiveEvening_original_30, + `zMaxRGiftNumOneUser` as flattenRequest_zMaxRGiftNumOneUser_original_31, + `zRGiftUserNum` as flattenRequest_zRGiftUserNum_original_32, + `zLiveMsgNum` as flattenRequest_zLiveMsgNum_original_33, + `zLiveDisharmony` as flattenRequest_zLiveDisharmony_original_34, + `zLiveShareNum` as flattenRequest_zLiveShareNum_original_35, + `zSmallGiftNum` as flattenRequest_zSmallGiftNum_original_36, + `zBigGiftNum` as flattenRequest_zBigGiftNum_original_37, + `uSex` as flattenRequest_uSex_original_38, + `uPhoneType` as flattenRequest_uPhoneType_original_39, + `uLongitude` as flattenRequest_uLongitude_original_40, + `uLatitude` as flattenRequest_uLatitude_original_41, + `uPosition` as flattenRequest_uPosition_original_42, + `uHome` as flattenRequest_uHome_original_43, + `uChannel` as flattenRequest_uChannel_original_44, + `uAge` as flattenRequest_uAge_original_45, + `uHasJoinedGroup` as flattenRequest_uHasJoinedGroup_original_46, + `uRegTime` as flattenRequest_uRegTime_original_47, + `uFirstChargeNum` as flattenRequest_uFirstChargeNum_original_48, + `uRemainDiamondNum` as flattenRequest_uRemainDiamondNum_original_49, + `uFansNum` as flattenRequest_uFansNum_original_50, + `uFollowNum` as flattenRequest_uFollowNum_original_51, + `uGainNum` as flattenRequest_uGainNum_original_52, + `uSGiftNum` as flattenRequest_uSGiftNum_original_53, + `uSWihsperNum` as flattenRequest_uSWihsperNum_original_54, + `uSChatMsgNum` as flattenRequest_uSChatMsgNum_original_55, + `uLiveSMsgNum` as flattenRequest_uLiveSMsgNum_original_56, + `uHasBeenBanned` as flattenRequest_uHasBeenBanned_original_57, + `uSMsgFiltered` as flattenRequest_uSMsgFiltered_original_58, + `uWatchDawn` as flattenRequest_uWatchDawn_original_59, + `uWatchMorning` as flattenRequest_uWatchMorning_original_60, + `uWatchAfternoon` as flattenRequest_uWatchAfternoon_original_61, + `uWatchEvening` as flattenRequest_uWatchEvening_original_62, + `uWatchAvgLength` as flattenRequest_uWatchAvgLength_original_63, + `uEnterRoomFrequency` as flattenRequest_uEnterRoomFrequency_original_64, + `uTopThreeNum` as flattenRequest_uTopThreeNum_original_65, + `uWatchSameCity` as flattenRequest_uWatchSameCity_original_66, + `uPlayGame` as flattenRequest_uPlayGame_original_67, + `uLive` as flattenRequest_uLive_original_68, + `uLmNum` as flattenRequest_uLmNum_original_69, + `uSBigGiftNum` as flattenRequest_uSBigGiftNum_original_70, + `uSSmallGiftNum` as flattenRequest_uSSmallGiftNum_original_71, + `uRGiftUserNum` as flattenRequest_uRGiftUserNum_original_72, + sum(`uWatchAvgLength`) over flattenRequest_uUserId_eventTime_0_10 as flattenRequest_uWatchAvgLength_window_sum_76, + `uUserId` as flattenRequest_uUserId_combine_77, + `uSex` as flattenRequest_uSex_combine_77, + `uPhoneType` as flattenRequest_uPhoneType_combine_77, + avg(`uWatchMorning`) over flattenRequest_uUserId_eventTime_0_10 as flattenRequest_uWatchMorning_window_avg_78, + avg(`uWatchEvening`) over flattenRequest_uUserId_eventTime_0_10 as flattenRequest_uWatchEvening_window_avg_79, + sum(`zSWihsperNum`) over flattenRequest_zChannel_eventTime_0s_172801s as flattenRequest_zSWihsperNum_window_sum_80, + avg(`uWatchAvgLength`) over flattenRequest_uUserId_eventTime_0_10 as flattenRequest_uWatchAvgLength_window_avg_81, + case when !isnull(at(`zUserId`, 0)) over flattenRequest_uUserId_eventTime_0s_36001s then count(`zUserId`) over flattenRequest_uUserId_eventTime_0s_36001s else null end as flattenRequest_zUserId_window_count_82, + case when !isnull(at(`zUserId`, 0)) over flattenRequest_uUserId_eventTime_0s_172801s then count(`zUserId`) over flattenRequest_uUserId_eventTime_0s_172801s else null end as flattenRequest_zUserId_window_count_83, + case when !isnull(at(`zUserId`, 0)) over flattenRequest_uSex_eventTime_0_10 then count(`zUserId`) over flattenRequest_uSex_eventTime_0_10 else null end as flattenRequest_zUserId_window_count_84, + case when !isnull(at(`zUserId`, 0)) over flattenRequest_uUserId_eventTime_0_10 then count(`zUserId`) over flattenRequest_uUserId_eventTime_0_10 else null end as flattenRequest_zUserId_window_count_85, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_zUserId_eventTime_0s_36001s then count(`uUserId`) over flattenRequest_zUserId_eventTime_0s_36001s else null end as flattenRequest_uUserId_window_count_86, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_zUserId_eventTime_0s_172801s then count(`uUserId`) over flattenRequest_zUserId_eventTime_0s_172801s else null end as flattenRequest_uUserId_window_count_87, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_uPlayGame_eventTime_0s_36001s then count(`uUserId`) over flattenRequest_uPlayGame_eventTime_0s_36001s else null end as flattenRequest_uUserId_window_count_88, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_uHasJoinedGroup_eventTime_0s_36001s then count(`uUserId`) over flattenRequest_uHasJoinedGroup_eventTime_0s_36001s else null end as flattenRequest_uUserId_window_count_89, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_uHasJoinedGroup_eventTime_0s_172801s then count(`uUserId`) over flattenRequest_uHasJoinedGroup_eventTime_0s_172801s else null end as flattenRequest_uUserId_window_count_90, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_uSex_eventTime_0s_172801s then count(`uUserId`) over flattenRequest_uSex_eventTime_0s_172801s else null end as flattenRequest_uUserId_window_count_91, + case when !isnull(at(`uUserId`, 0)) over flattenRequest_uSex_eventTime_0s_36001s then count(`uUserId`) over flattenRequest_uSex_eventTime_0s_36001s else null end as flattenRequest_uUserId_window_count_92, + `uUserId` as flattenRequest_uUserId_combine_93, + `zUserId` as flattenRequest_zUserId_combine_93, + `uHome` as flattenRequest_uHome_combine_93 +from + `flattenRequest` + window flattenRequest_uUserId_eventTime_0_10 as (partition by `uUserId` order by `eventTime` rows between 10 preceding and 0 preceding), + flattenRequest_zChannel_eventTime_0s_172801s as (partition by `zChannel` order by `eventTime` rows_range between 172801s preceding and 0s preceding), + flattenRequest_uUserId_eventTime_0s_36001s as (partition by `uUserId` order by `eventTime` rows_range between 36001s preceding and 0s preceding), + flattenRequest_uUserId_eventTime_0s_172801s as (partition by `uUserId` order by `eventTime` rows_range between 172801s preceding and 0s preceding), + flattenRequest_uSex_eventTime_0_10 as (partition by `uSex` order by `eventTime` rows between 10 preceding and 0 preceding), + flattenRequest_zUserId_eventTime_0s_36001s as (partition by `zUserId` order by `eventTime` rows_range between 36001s preceding and 0s preceding), + flattenRequest_zUserId_eventTime_0s_172801s as (partition by `zUserId` order by `eventTime` rows_range between 172801s preceding and 0s preceding), + flattenRequest_uPlayGame_eventTime_0s_36001s as (partition by `uPlayGame` order by `eventTime` rows_range between 36001s preceding and 0s preceding), + flattenRequest_uHasJoinedGroup_eventTime_0s_36001s as (partition by `uHasJoinedGroup` order by `eventTime` rows_range between 36001s preceding and 0s preceding), + flattenRequest_uHasJoinedGroup_eventTime_0s_172801s as (partition by `uHasJoinedGroup` order by `eventTime` rows_range between 172801s preceding and 0s preceding), + flattenRequest_uSex_eventTime_0s_172801s as (partition by `uSex` order by `eventTime` rows_range between 172801s preceding and 0s preceding), + flattenRequest_uSex_eventTime_0s_36001s as (partition by `uSex` order by `eventTime` rows_range between 36001s preceding and 0s preceding)) +as out0 +last join +( +select + flattenRequest.reqId as reqId_74, + `action_reqId`.`actionValue` as action_actionValue_multi_direct_73 +from + `flattenRequest` + last join `action` as `action_reqId` on `flattenRequest`.`reqId` = `action_reqId`.`reqId`) +as out1 +on out0.reqId_1 = out1.reqId_74 +last join +( +select + reqId as reqId_75, + max(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s as bo_hislabel_fWatchedTimeLen_multi_max_74, + avg(`fWatchedTimeLen`) over bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s as bo_hislabel_fWatchedTimeLen_multi_avg_75 +from + (select `eventTime` as `ingestionTime`, `zUserId` as `zUserId`, `uUserId` as `uUserId`, timestamp('2019-07-18 09:20:20') as `nRequestTime`, double(0) as `fWatchedTimeLen`, reqId from `flattenRequest`) + window bo_hislabel_zUserId_uUserId_ingestionTime_1s_172801s as ( +UNION (select `ingestionTime`, `zUserId`, `uUserId`, `nRequestTime`, `fWatchedTimeLen`, '' as reqId from `bo_hislabel`) partition by `zUserId`,`uUserId` order by `ingestionTime` rows_range between 172801s preceding and 1s preceding INSTANCE_NOT_IN_WINDOW)) +as out2 +on out0.reqId_1 = out2.reqId_75 +; \ No newline at end of file diff --git a/benchmark/src/main/resources/conf.properties b/benchmark/src/main/resources/conf.properties index bcde106ed08..1bd6e3636db 100644 --- a/benchmark/src/main/resources/conf.properties +++ b/benchmark/src/main/resources/conf.properties @@ -1,9 +1,9 @@ ZK_CLUSTER=172.24.4.55:32200 ZK_PATH=/openmldb_test -WINDOW_NUM=2 -WINDOW_SIZE=1000 -JOIN_NUM=2 +WINDOW_NUM=1 +WINDOW_SIZE=1 +JOIN_NUM=0 PK_NUM=10 PK_MAX=0 @@ -11,6 +11,8 @@ PK_BASE=1000000 DATABASE=bank_perf DEPLOY_NAME=deploy_bank +PARSE_RESULT=true +BATCH_SIZE=0 CSV_PATH=data/bank_flattenRequest.csv -PUT_BACH_SIZE=100 \ No newline at end of file +PUT_BACH_SIZE=100 diff --git a/benchmark/src/main/resources/stability.properties b/benchmark/src/main/resources/stability.properties new file mode 100644 index 00000000000..272df35be11 --- /dev/null +++ b/benchmark/src/main/resources/stability.properties @@ -0,0 +1,20 @@ +ZK_CLUSTER=172.24.4.55:32200 +ZK_PATH=/openmldb +CREATE_TABLE=true +DEPLOY_SQL=true +ENABLE_PUT=true +ENABLE_QUERY=true +PK_NUM=1000 + +PUT_THREAD_NUM=1 +QUERY_THREAD_NUM=1 +REPLICA_NUM=1 +PARTITION_NUM=8 + +INSERT_RATIO=0.5 +DB_NAME=myhug +CASE_NAME=myhug +CASE_PATH=cases +STORAGE_MODE=hdd + +ERROR_RATIO=0.005 \ No newline at end of file