forked from dolphindb/Tutorials_CN
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path01.createStreamTB.txt
51 lines (48 loc) · 3.19 KB
/
01.createStreamTB.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
createStreamTB.txt
Script to create stream tables for publishing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/
// Best-effort teardown of the streaming objects named in this script.
// parallel: highest numeric suffix (starting from 1) of the per-worker subscriptions
//           and stream engines to remove.
// Every step runs inside its own try/catch, so a failure is printed and the
// remaining steps still run. The function ends with `undef all`.
def cleanEnvironment(parallel){
    enginePrefixes = ["processBuyOrder", "processSellOrder", "processCapitalFlow"]
    for(workerId in 1..parallel){
        suffix = string(workerId)
        // remove the subscription first, then the engines (buy, sell, capital flow)
        try { unsubscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder" + suffix) } catch(ex) { print(ex) }
        for(prefix in enginePrefixes){
            try { dropStreamEngine(prefix + suffix) } catch(ex) { print(ex) }
        }
    }
    // 60-minute stage: one subscription on capitalFlowStream and one engine
    hourlyName = "processCapitalFlow60min"
    try { unsubscribeTable(tableName="capitalFlowStream", actionName=hourlyName) } catch(ex) { print(ex) }
    try { dropStreamEngine(hourlyName) } catch(ex) { print(ex) }
    // finally drop the stream tables themselves
    for(tbName in ["tradeOriginalStream", "capitalFlowStream", "capitalFlowStream60min"]){
        try { dropStreamTable(tbName) } catch(ex) { print(ex) }
    }
    undef all
}
// Degree of parallelism for the calculation. cleanEnvironment uses it as the highest
// numeric suffix of the subscriptions and engines it removes. Adjust it to match the
// deployment environment.
parallel = 3
cleanEnvironment(parallel)
// Create the three stream tables used by this script and register each one as a
// shared, persisted stream table via enableTableShareAndPersistence:
//   tradeOriginalStream     - 8 columns, capacity and cacheSize 20,000,000
//   capitalFlowStream       - 15 columns, capacity and cacheSize 20,000,000
//   capitalFlowStream60min  - 15 columns, capacity and cacheSize 1,000,000
// Takes no arguments and returns nothing. A failure while sharing a table is
// printed rather than raised, so the remaining tables are still attempted.
// Each temporary local table handle is undefined once its table has been shared.
def createStreamTableFunc(){
    //create stream table: tradeOriginalStream
    colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
    colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
    tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
    try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
    undef("tradeOriginalStreamTemp")

    //create stream table: capitalFlowStream
    colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
    colType = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
    capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
    try{ enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
    undef("capitalFlowStreamTemp")

    //create stream table: capitalFlowStream60min
    // Same columns as capitalFlowStream, but TradeTime comes before SecurityID.
    colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
    colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
    capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
    try{ enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
    // Release the 60min temporary (the original released capitalFlowStreamTemp a second time here).
    undef("capitalFlowStream60minTemp")
}
// Create and share the three persisted stream tables.
createStreamTableFunc()
// NOTE(review): `go` presumably makes the statements above execute before the line
// below is parsed, so that the shared table tradeOriginalStream already exists when
// it is referenced by name - confirm against the DolphinDB `go` documentation.
go
// Register SecurityID as the filter column of tradeOriginalStream.
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)