[Fix](test) Fix Show Data Case (apache#47224)
Yukang-Lian authored Jan 21, 2025
1 parent 6e49c4e · commit bad5b60
Showing 23 changed files with 313 additions and 210 deletions.
6 changes: 3 additions & 3 deletions cloud/src/common/config.h
@@ -62,7 +62,7 @@ CONF_String(custom_conf_path, "./conf/doris_cloud.conf");
CONF_mInt64(recycle_interval_seconds, "3600");
CONF_mInt64(retention_seconds, "259200"); // 72h, global retention time
CONF_Int32(recycle_concurrency, "16");
-CONF_Int32(recycle_job_lease_expired_ms, "60000");
+CONF_mInt32(recycle_job_lease_expired_ms, "60000");
CONF_mInt64(compacted_rowset_retention_seconds, "1800"); // 0.5h
CONF_mInt64(dropped_index_retention_seconds, "10800"); // 3h
CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 3h
@@ -110,7 +110,7 @@ CONF_String(test_hdfs_fs_name, "");
// CONF_Bool(b, "true");

// txn config
-CONF_Int32(label_keep_max_second, "259200"); //3 * 24 * 3600 seconds
+CONF_mInt32(label_keep_max_second, "259200"); //3 * 24 * 3600 seconds
CONF_Int32(expired_txn_scan_key_nums, "1000");

// Maximum number of versions of a tablet. If the version num of a tablet exceeds the limit,
@@ -133,7 +133,7 @@ CONF_String(specific_max_qps_limit, "get_cluster:5000000;begin_txn:5000000");
CONF_Bool(enable_rate_limit, "true");
CONF_Int64(bvar_qps_update_second, "5");

-CONF_Int32(copy_job_max_retention_second, "259200"); //3 * 24 * 3600 seconds
+CONF_mInt32(copy_job_max_retention_second, "259200"); //3 * 24 * 3600 seconds
CONF_String(arn_id, "");
CONF_String(arn_ak, "");
CONF_String(arn_sk, "");
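These three renames (CONF_Int32 to CONF_mInt32) are what the new test setup relies on: in doris_cloud the m-prefixed CONF macros declare a config mutable at runtime, so the recycler and txn knobs above can be changed on a live MetaService through its update_config HTTP endpoint instead of a restart. A minimal Groovy sketch of that call, with a placeholder address (the endpoint path matches the update_ms_config helper added later in this commit; the 200-on-success behavior is an assumption):

def ms = "127.0.0.1:5000"  // placeholder MetaService address, not from this commit
["recycle_job_lease_expired_ms": "0", "label_keep_max_second": "0"].each { k, v ->
    def conn = new URL("http://${ms}/MetaService/http/v1/update_config?${k}=${v}").openConnection()
    conn.requestMethod = "POST"        // the endpoint expects a POST
    assert conn.responseCode == 200    // assumes 200 on success
}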
@@ -947,7 +947,8 @@ public class Config extends ConfigBase {

// update interval of tablet stat
// All frontends will get tablet stat from all backends at each interval
-    @ConfField public static int tablet_stat_update_interval_second = 60; // 1 min
+    @ConfField(mutable = true)
+    public static int tablet_stat_update_interval_second = 60; // 1 min

/**
* Max bytes a broker scanner can process in one broker load job.
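The same pattern on the FE side: adding mutable = true to @ConfField lets tablet_stat_update_interval_second be changed on a running frontend, which is exactly what the new plugin helpers below do with ADMIN SET FRONTEND CONFIG. A sketch of the toggle as a suite issues it (sql is the regression-framework helper, so this only runs inside a Suite):

// Shrink the FE tablet-stat report interval for the test, then restore it.
sql """admin set frontend config ("tablet_stat_update_interval_second" = "1")"""
// ... run the show-data checks ...
sql """admin set frontend config ("tablet_stat_update_interval_second" = "10")"""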
7 changes: 7 additions & 0 deletions regression-test/plugins/aliyun_oss_sdk.groovy
@@ -97,13 +97,20 @@ Suite.metaClass.calculateFolderLength = { OSS client, String bucketName, String
    ObjectListing objectListing = null;
    do {
        // The default value for MaxKey is 100, and the maximum value is 1000
+        logger.info("debug:" + folder)
        ListObjectsRequest request = new ListObjectsRequest(bucketName).withPrefix(folder).withMaxKeys(1000);
        if (objectListing != null) {
            request.setMarker(objectListing.getNextMarker());
        }
        objectListing = client.listObjects(request);
        List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
        for (OSSObjectSummary s : sums) {
+            logger.info("Object Key: ${s.getKey()}")
+            logger.info("Size: ${s.getSize()} bytes")
+            logger.info("Last Modified: ${s.getLastModified()}")
+            logger.info("Storage Class: ${s.getStorageClass()}")
+            logger.info("Owner: ${s.getOwner()?.getId()}")
+            logger.info("-------------------")
            size += s.getSize();
        }
    } while (objectListing.isTruncated());
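calculateFolderLength pages through the bucket with OSS marker-based pagination (at most 1,000 keys per request) and sums the object sizes under a prefix; the added logger lines dump each object's key, size, and metadata, which is what debugging the size mismatches below needs. A hedged usage sketch with placeholder credentials (the suites read the real values from context.config.otherConfigs):

def client = initOssClient("<ak>", "<sk>", "oss-cn-beijing.aliyuncs.com")    // placeholders
long bytes = calculateFolderLength(client, "my-bucket", "prefix/data/12345") // sums sizes under the prefix
logger.info("folder size: ${bytes} bytes")
shutDownOssClient(client)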
94 changes: 83 additions & 11 deletions regression-test/plugins/cloud_show_data_plugin.groovy
@@ -69,7 +69,8 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
}

Suite.metaClass.get_tablets_from_table = { String table ->
-    def res = sql_return_maparray """show tablets from ${table}"""
+    def res = sql_return_maparray """show tablets from ${table}"""
+    logger.info("get tablets from ${table}:" + res)
return res
}

@@ -120,10 +121,10 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
if (tabletStatusAfterCompaction.rowsets.size() < tabletStatusBeforeCompaction.rowsets.size()){
compactionStatus = 'FINISHED'
}
-            Thread.sleep(60 * 1000)
-        } while (timeoutTimestamp > System.currentTimeMillis() && (status != 'FINISHED'))
+            Thread.sleep(10 * 1000)
+        } while (timeoutTimestamp > System.currentTimeMillis() && (compactionStatus != 'FINISHED'))

-        if (compactionStatus != "FINISHED") {
+        if (compactionStatus != "FINISHED") {
logger.info("compaction not Finish or failed")
return false
}
@@ … @@

Suite.metaClass.trigger_compaction = { List<List<Object>> tablets ->
for(def tablet: tablets) {
trigger_tablet_compaction(tablet, "cumulative")
trigger_tablet_compaction(tablet, "base")
trigger_tablet_compaction(tablet, "full")
}
}
@@ … @@

def client = initOssClient(ak, sk, endpoint)
for(String tabletId: tabletIds) {
-            storageSize += calculateFolderLength(client, bucketName, storagePrefix + "/data/" + tabletId)
+            storageSize += calculateFolderLength(client, bucketName, storagePrefix + "data/" + tabletId)
}
shutDownOssClient(client)
}
@@ … @@
def fsUser = context.config.otherConfigs.get("cbsFsUser")
def storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
}

-    return storageSize
+    def round_size = new BigDecimal(storageSize/1024/1024).setScale(0, BigDecimal.ROUND_FLOOR);
+    return round_size
}

Suite.metaClass.translate_different_unit_to_MB = { String size, String unitField ->
@@ … @@
def unitField = fields[1]
mysqlShowDataSize = translate_different_unit_to_MB(sizeField, unitField)
}
-    return mysqlShowDataSize
+    def round_size = new BigDecimal(mysqlShowDataSize).setScale(0, BigDecimal.ROUND_FLOOR);
+    return round_size
}

Suite.metaClass.caculate_table_data_size_through_api = { List<List<Object>> tablets ->
@@ … @@
}
}
}
-    return apiCaculateSize
+    def round_size = new BigDecimal(apiCaculateSize).setScale(0, BigDecimal.ROUND_FLOOR);
+    return round_size
}

+Suite.metaClass.update_ms_config = { String ms_endpoint, String key, String value /*param */ ->
+    return curl("POST", String.format("http://%s/MetaService/http/v1/update_config?%s=%s", ms_endpoint, key, value))
+}

+Suite.metaClass.set_config_before_show_data_test = { ->
+    sql """admin set frontend config ("tablet_stat_update_interval_second" = "1")"""
+    sql """admin set frontend config ("catalog_trash_expire_second" = "1")"""
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def get_be_param = { paramName ->
+        // assuming paramName has the same value on all BEs, so query any one of them
+        def backendId = backendId_to_backendIP.keySet().first()
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backendId), backendId_to_backendHttpPort.get(backendId))
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == paramName) {
+                return ((List<String>) ele)[2]
+            }
+        }
+    }
+
+    def ms_endpoint = get_be_param("meta_service_endpoint");
+
+    update_ms_config.call(ms_endpoint, "recycle_interval_seconds", "5")
+    update_ms_config.call(ms_endpoint, "retention_seconds", "0")
+    update_ms_config.call(ms_endpoint, "compacted_rowset_retention_seconds", "0")
+    update_ms_config.call(ms_endpoint, "recycle_job_lease_expired_ms", "0")
+    update_ms_config.call(ms_endpoint, "dropped_partition_retention_seconds", "0")
+    update_ms_config.call(ms_endpoint, "label_keep_max_second", "0")
+    update_ms_config.call(ms_endpoint, "copy_job_max_retention_second", "0")
+}

+Suite.metaClass.set_config_after_show_data_test = { ->
+    sql """admin set frontend config ("tablet_stat_update_interval_second" = "10")"""
+    sql """admin set frontend config ("catalog_trash_expire_second" = "600")"""
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def get_be_param = { paramName ->
+        // assuming paramName has the same value on all BEs, so query any one of them
+        def backendId = backendId_to_backendIP.keySet().first()
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backendId), backendId_to_backendHttpPort.get(backendId))
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == paramName) {
+                return ((List<String>) ele)[2]
+            }
+        }
+    }
+
+    def ms_endpoint = get_be_param("meta_service_endpoint");
+
+    update_ms_config.call(ms_endpoint, "recycle_interval_seconds", "600")
+    update_ms_config.call(ms_endpoint, "retention_seconds", "259200")
+    update_ms_config.call(ms_endpoint, "compacted_rowset_retention_seconds", "1800")
+    update_ms_config.call(ms_endpoint, "recycle_job_lease_expired_ms", "60000")
+    update_ms_config.call(ms_endpoint, "dropped_partition_retention_seconds", "10800")
+    update_ms_config.call(ms_endpoint, "label_keep_max_second", "300")
+    update_ms_config.call(ms_endpoint, "copy_job_max_retention_second", "259200")
+}
+//http://qa-build.oss-cn-beijing.aliyuncs.com/regression/show_data/fullData.1.part1.gz
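Taken together, the new helpers bracket each show-data suite: set_config_before_show_data_test() shrinks the FE report interval and the MetaService retention/recycle windows so recycling actually happens within the test's sleeps, and set_config_after_show_data_test() restores the defaults. Note also that all three size paths (BE tablet API, SHOW DATA over MySQL, and the OSS bucket walk) are floored to whole megabytes with BigDecimal.ROUND_FLOOR, so the assertEquals comparisons are not broken by sub-MB reporting noise. A sketch of the intended call pattern (the MetaService address is a placeholder; the suites resolve it from the BE config meta_service_endpoint):

update_ms_config.call("127.0.0.1:5000", "recycle_interval_seconds", "5") // standalone use, placeholder address

// The bracket the updated suites below actually use:
set_config_before_show_data_test()
sleep(10 * 1000)   // let the 1s FE report interval publish fresh tablet stats
main()             // suite body: load -> compact -> measure via API, MySQL, OSS
set_config_after_show_data_test()
sleep(10 * 1000)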
@@ -21,7 +21,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

// load the same data 10 times; expect the data size not to rise
suite("test_cloud_follower_show_data","p2") {
suite("test_cloud_follower_show_data","p2, nonConcurrent") {
//cloud-mode
if (!isCloudMode()) {
logger.info("not cloud mode, not run")
@@ -72,20 +72,21 @@ suite("test_cloud_follower_show_data","p2") {
trigger_compaction(tablets)

        // then sleep 1 min and wait for the FE to finish reporting
-        sleep(60 * 1000)
+        sleep(10 * 1000)
        sql "select count(*) from ${tableName}"
+        sleep(10 * 1000)

sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
-        sleep(60 * 1000)
+        logger.info("after ${i} times stream load, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")
}

// expect mysqlSize == apiSize == storageSize
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["apiSize"][0])
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["cbsSize"][0])
    // expect size after 1 load == size after 10 loads
+    logger.info("after 1 time stream load, size is ${sizeRecords["mysqlSize"][0]}, after 10 times stream load, size is ${sizeRecords["mysqlSize"][1]}")
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["mysqlSize"][1])
assertEquals(sizeRecords["apiSize"][0], sizeRecords["apiSize"][1])
assertEquals(sizeRecords["cbsSize"][0], sizeRecords["cbsSize"][1])
@@ -121,5 +122,9 @@ suite("test_cloud_follower_show_data","p2") {
check(tableName)
}

+    set_config_before_show_data_test()
+    sleep(10 * 1000)
    main()
+    set_config_after_show_data_test()
+    sleep(10 * 1000)
}
@@ -21,7 +21,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

// load the same data 10 times; expect the data size not to rise
suite("test_cloud_mtmv_show_data","p2") {
suite("test_cloud_mtmv_show_data","p2, nonConcurrent") {
//cloud-mode
if (!isCloudMode()) {
logger.info("not cloud mode, not run")
@@ -98,13 +98,13 @@ suite("test_cloud_mtmv_show_data","p2") {
trigger_compaction(tablets)

        // then sleep 1 min and wait for the FE to finish reporting
-        sleep(60 * 1000)
+        sleep(10 * 1000)
        sql "select count(*) from ${tableName}"
+        sleep(10 * 1000)

sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
-        sleep(60 * 1000)
+        logger.info("after ${i} times stream load, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")
}

@@ -124,75 +124,37 @@ suite("test_cloud_mtmv_show_data","p2") {
trigger_compaction(tablets)

            // then sleep 1 min and wait for the FE to finish reporting
-            sleep(60 * 1000)
-
-            sql "select count(*) from ${tableName}"
-
-            sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
-            sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
-            sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
-
-            // expect mysqlSize == apiSize == storageSize
-            assertEquals(sizeRecords["mysqlSize"][2], sizeRecords["apiSize"][2])
-            assertEquals(sizeRecords["mysqlSize"][2], sizeRecords["cbsSize"][2])
-
-            // trigger the compaction mechanism again
-            trigger_compaction(tablets)
-
-            // then sleep 1 min and wait for the FE to finish reporting
-            sleep(60 * 1000)
-
+            sleep(10 * 1000)
            sql "select count(*) from ${tableName}"
+            sleep(10 * 1000)

            sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
            sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
            sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))

            // expect mysqlSize == apiSize == storageSize
            assertEquals(sizeRecords["mysqlSize"][3], sizeRecords["apiSize"][3])
            assertEquals(sizeRecords["mysqlSize"][3], sizeRecords["cbsSize"][3])
+            logger.info("after create mv, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")
}

if (op == 2){
create_mtmv(tableName)
-            tableName = ${tableName} + "_mtmv"
+            tableName = "${tableName}" + "_mtmv"
tablets = get_tablets_from_table(tableName)

            // trigger the compaction mechanism
trigger_compaction(tablets)

            // then sleep 1 min and wait for the FE to finish reporting
-            sleep(60 * 1000)
+            sleep(10 * 1000)
            sql "select count(*) from ${tableName}"
+            sleep(10 * 1000)

sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))

logger.info("after create mtmv, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")

// expect mysqlSize == apiSize == storageSize
assertEquals(sizeRecords["mysqlSize"][2], sizeRecords["apiSize"][2])
assertEquals(sizeRecords["mysqlSize"][2], sizeRecords["cbsSize"][2])

-            // trigger the compaction mechanism again
-            trigger_compaction(tablets)
-
-            // then sleep 1 min and wait for the FE to finish reporting
-            sleep(60 * 1000)
-
-            sql "select count(*) from ${tableName}"
-
-            sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
-            sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
-            sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
-
-            // expect mysqlSize == apiSize == storageSize
-            assertEquals(sizeRecords["mysqlSize"][3], sizeRecords["apiSize"][3])
-            assertEquals(sizeRecords["mysqlSize"][3], sizeRecords["cbsSize"][3])
}
}

@@ … @@
check(tableName, 2)
}

+    set_config_before_show_data_test()
+    sleep(10 * 1000)
    main()
+    set_config_after_show_data_test()
+    sleep(10 * 1000)
}
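One fix above is easy to miss: the old tableName = ${tableName} + "_mtmv" is not valid Groovy, since ${...} interpolation only exists inside a GString; quoting it turns the line into ordinary string concatenation. For example:

def tableName = "lineitem"
tableName = "${tableName}" + "_mtmv"    // fixed form: interpolate, then concatenate
assert tableName == "lineitem_mtmv"
// tableName = ${tableName} + "_mtmv"   // old form: ${} outside a GString does not compile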