PHP实现量化交易,量化交易干货丨如何使用DolphinDB计算K线
DolphinDB提供了功能強(qiáng)大的內(nèi)存計(jì)算引擎,內(nèi)置時(shí)間序列函數(shù),分布式計(jì)算以及流數(shù)據(jù)處理引擎,在眾多場(chǎng)景下均可高效的計(jì)算K線。本教程將介紹DolphinDB如何通過批量處理和流式處理計(jì)算K線。歷史數(shù)據(jù)批量計(jì)算K線
其中可以指定K線窗口的起始時(shí)間;一天中可以存在多個(gè)交易時(shí)段,包括隔夜時(shí)段;K線窗口可重疊;使用交易量作為劃分K線窗口的維度。需要讀取的數(shù)據(jù)量特別大并且需要將結(jié)果寫入數(shù)據(jù)庫時(shí),可使用DolphinDB內(nèi)置的Map-Reduce函數(shù)并行計(jì)算。流式計(jì)算K線
使用API實(shí)時(shí)接收市場(chǎng)數(shù)據(jù),并使用DolphinDB內(nèi)置的流數(shù)據(jù)時(shí)序計(jì)算引擎(TimeSeriesAggregator)進(jìn)行實(shí)時(shí)計(jì)算得到K線數(shù)據(jù)。
1. 歷史數(shù)據(jù)K線計(jì)算
使用歷史數(shù)據(jù)計(jì)算K線,可使用DolphinDB的內(nèi)置函數(shù)bar,dailyAlignedBar,或wj。
1.1 不指定K線窗口的起始時(shí)刻,根據(jù)數(shù)據(jù)自動(dòng)生成K線結(jié)果
bar(X,Y)返回X減去X除以Y的余數(shù),一般用于將數(shù)據(jù)分組。date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m;
bar(date, 5);
返回以下結(jié)果:[09:30m,09:30m,09:45m,09:45m,09:55m,09:55m]
例子1:使用以下數(shù)據(jù)模擬美國股票市場(chǎng):n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!()
timestamp = concatDateTime(date, time)
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`AAPL`FB`AMZN`MSFT, n)
trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)
計(jì)算5分鐘K線:barMinutes = 5
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart
請(qǐng)注意,以上數(shù)據(jù)中,time列的精度為毫秒。若time列精度不是毫秒,則應(yīng)當(dāng)將 barMinutes601000 中的數(shù)字做相應(yīng)調(diào)整。
1.2 需要指定K線窗口的起始時(shí)刻
需要指定K線窗口的起始時(shí)刻,可使用dailyAlignedBar函數(shù)。該函數(shù)可處理每日多個(gè)交易時(shí)段,亦可處理隔夜時(shí)段。
請(qǐng)注意,使用dailyAlignedBar函數(shù)時(shí),時(shí)間列必須含有日期信息,包括 DATETIME, TIMESTAMP 或 NANOTIMESTAMP 這三種類型的數(shù)據(jù)。指定每個(gè)交易時(shí)段窗口起始時(shí)刻的參數(shù) timeOffset 必須使用相應(yīng)的去除日期信息之后的 SECOND,TIME 或 NANOTIME 類型的數(shù)據(jù)。
例子2(每日一個(gè)交易時(shí)段):計(jì)算美國股票市場(chǎng)7分鐘K線。數(shù)據(jù)沿用例子1中的trade表。barMinutes = 7
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000) as barStart
例子3(每日兩個(gè)交易時(shí)段):中國股票市場(chǎng)每日有兩個(gè)交易時(shí)段,上午時(shí)段為9:30至11:30,下午時(shí)段為13:00至15:00。
使用以下數(shù)據(jù)模擬:n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!()
timestamp = concatDateTime(date, time)
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`600519`000001`600000`601766, n)
trade = table(symbol, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)
計(jì)算7分鐘K線:barMinutes = 7
sessionsStart=09:30:00.000 13:00:00.000
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
例子4(每日兩個(gè)交易時(shí)段,包含隔夜時(shí)段):某些期貨每日有多個(gè)交易時(shí)段,且包括隔夜時(shí)段。本例中,第一個(gè)交易時(shí)段為8:45到下午13:45,另一個(gè)時(shí)段為隔夜時(shí)段,從下午15:00到第二天05:00。
使用以下數(shù)據(jù)模擬:daySession = 08:45:00.000 : 13:45:00.000
nightSession = 15:00:00.000 : 05:00:00.000
n = 1000000
timestamp = rand(concatDateTime(2019.11.06, daySession[0]) .. concatDateTime(2019.11.08, nightSession[1]), n).sort!()
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`A120001`A120002`A120003`A120004, n)
trade = select * from table(symbol, timestamp, price, volume) where timestamp.time() between daySession or timestamp.time()>=nightSession[0] or timestamp.time()
undef(`timestamp`price`volume`symbol)
計(jì)算7分鐘K線:barMinutes = 7
sessionsStart = [daySession[0], nightSession[0]]
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
1.3 重疊K線窗口:使用wj函數(shù)
以上例子中,K線窗口均不重疊。若要計(jì)算重疊K線窗口,可以使用wj函數(shù)。使用wj函數(shù),可對(duì)左表中的時(shí)間列,指定相對(duì)時(shí)間范圍,在右表中進(jìn)行計(jì)算。
例子5 (每日兩個(gè)交易時(shí)段,重疊的K線窗口):模擬中國股票市場(chǎng)數(shù)據(jù),每5分鐘計(jì)算30分鐘K線。n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date,
(09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time,
rand(symbols, n) as symbol,
100+cumsum(rand(0.02, n)-0.01) as price,
rand(1000, n) as volume)
首先根據(jù)時(shí)間來生成窗口,并且用cross join來生成股票和交易窗口的組合。barWindows = table(symbols as symbol).cj(table((09:30:00.000 + 0..23 * 300000).join(13:00:00.000 + 0..23 * 300000) as time))
然后使用wj函數(shù)計(jì)算重疊窗口的K線數(shù)據(jù):OHLC = wj(barWindows, trade, 0:(30*60*1000),
, `symbol`time)
1.4 使用交易量劃分K線窗口
上面的例子我們均使用時(shí)間作為劃分K線窗口的維度。在實(shí)踐中,也可以使用其他維度作為劃分K線窗口的依據(jù)。譬如用累計(jì)的交易量來計(jì)算K線。
例子6 (每日兩個(gè)交易時(shí)段,使用累計(jì)的交易量計(jì)算K線):模擬中國股票市場(chǎng)數(shù)據(jù),交易量每增加10000計(jì)算K線。n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date,
(09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time,
rand(symbols, n) as symbol,
100+cumsum(rand(0.02, n)-0.01) as price,
rand(1000, n) as volume)
volThreshold = 10000
select first(time) as barStart, first(price) as open, max(price) as high, min(price) as low, last(price) as close
from (select symbol, price, cumsum(volume) as cumvol from trade context by symbol)
group by symbol, bar(cumvol, volThreshold) as volBar
代碼采用了嵌套查詢的方法。子查詢?yōu)槊總€(gè)股票生成累計(jì)的交易量cumvol,然后在主查詢中根據(jù)累計(jì)的交易量用bar函數(shù)生成窗口。
1.5 使用MapReduce函數(shù)加速
若需從數(shù)據(jù)庫中提取較大量級(jí)的歷史數(shù)據(jù),計(jì)算K線,然后存入數(shù)據(jù)庫,可使用DolphinDB內(nèi)置的Map-Reduce函數(shù)mr進(jìn)行數(shù)據(jù)的并行讀取與計(jì)算。這種方法可以顯著提高速度。
本例使用美國股票市場(chǎng)精確到納秒的交易數(shù)據(jù)。原始數(shù)據(jù)存于"dfs://TAQ"數(shù)據(jù)庫的"trades"表中。"dfs://TAQ"數(shù)據(jù)庫采用復(fù)合分區(qū):基于交易日期Date的值分區(qū)與基于股票代碼Symbol的范圍分區(qū)。
(1) 將存于磁盤的原始數(shù)據(jù)表的元數(shù)據(jù)載入內(nèi)存:login(`admin, `123456)
db = database("dfs://TAQ")
trades = db.loadTable("trades")
(2) 在磁盤上創(chuàng)建一個(gè)空的數(shù)據(jù)表,以存放計(jì)算結(jié)果。以下代碼建立一個(gè)模板表(model),并根據(jù)此模板表的schema在數(shù)據(jù)庫"dfs://TAQ"中創(chuàng)建一個(gè)空的 OHLC 表以存放K線計(jì)算結(jié)果:model=select top 1 Symbol, Date, Time.second() as bar, PRICE as open, PRICE as high, PRICE as low, PRICE as close, SIZE as volume from trades where Date=2007.08.01, Symbol=`EBAY
if(existsTable("dfs://TAQ", "OHLC"))
db.dropTable("OHLC")
db.createPartitionedTable(model, `OHLC, `Date`Symbol)
(3) 使用mr函數(shù)計(jì)算K線數(shù)據(jù),并將結(jié)果寫入 OHLC 表中:def calcOHLC(inputTable){
tmp=select first(PRICE) as open, max(PRICE) as high, min(PRICE) as low, last(PRICE) as close, sum(SIZE) as volume from inputTable where Time.second() between 09:30:00 : 15:59:59 group by Symbol, Date, 09:30:00+bar(Time.second()-09:30:00, 5*60) as bar
loadTable("dfs://TAQ", `OHLC).append!(tmp)
return tmp.size()
}
ds = sqlDS()
mr(ds, calcOHLC, +)
在以上代碼中,ds是函數(shù)sqlDS生成的一系列數(shù)據(jù)源,每個(gè)數(shù)據(jù)源代表從一個(gè)數(shù)據(jù)分區(qū)中提取的數(shù)據(jù);自定義函數(shù)calcOHLC為Map-Reduce算法中的map函數(shù),對(duì)每個(gè)數(shù)據(jù)源計(jì)算K線數(shù)據(jù),并將結(jié)果寫入數(shù)據(jù)庫,返回寫入數(shù)據(jù)庫的K線數(shù)據(jù)的行數(shù);"+"是Map-Reduce算法中的reduce函數(shù),將所有map函數(shù)的結(jié)果,亦即寫入數(shù)據(jù)庫的K線數(shù)據(jù)的行數(shù)相加,返回寫入數(shù)據(jù)庫的K線數(shù)據(jù)總數(shù)。
2. 實(shí)時(shí)K線計(jì)算
DolphinDB database 中計(jì)算實(shí)時(shí)K線的流程如下圖所示:
DolphinDB中計(jì)算實(shí)時(shí)K線流程圖
實(shí)時(shí)數(shù)據(jù)供應(yīng)商一般會(huì)提供基于Python、Java或其他常用語言的API的數(shù)據(jù)訂閱服務(wù)。本例中使用Python來模擬接收市場(chǎng)數(shù)據(jù),通過DolphinDB Python API寫入流數(shù)據(jù)表中。DolphinDB的流數(shù)據(jù)時(shí)序聚合引擎(TimeSeriesAggregator)可以對(duì)實(shí)時(shí)數(shù)據(jù)按照指定的頻率與移動(dòng)窗口計(jì)算K線。
本例使用的模擬實(shí)時(shí)數(shù)據(jù)源為文本文件trades.csv。該文件包含以下4列(一同給出一行樣本數(shù)據(jù)):
以下三小節(jié)介紹實(shí)時(shí)K線計(jì)算的三個(gè)步驟:
2.1 使用 Python 接收實(shí)時(shí)數(shù)據(jù),并寫入DolphinDB流數(shù)據(jù)表DolphinDB 中建立流數(shù)據(jù)表share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as TradePython程序從數(shù)據(jù)源 trades.csv 文件中讀取數(shù)據(jù)寫入DolphinDB。
實(shí)時(shí)數(shù)據(jù)中Datetime的數(shù)據(jù)精度是秒,由于pandas DataFrame中僅能使用DateTime[64]即nanatimestamp類型,所以下列代碼在寫入前有一個(gè)數(shù)據(jù)類型轉(zhuǎn)換的過程。這個(gè)過程也適用于大多數(shù)數(shù)據(jù)需要清洗和轉(zhuǎn)換的場(chǎng)景。import dolphindb as ddb
import pandas as pd
import numpy as np
csv_file = "trades.csv"
csv_data = pd.read_csv(csv_file, dtype={'Symbol':str} )
csv_df = pd.DataFrame(csv_data)
s = ddb.session();
s.connect("127.0.0.1",8848,"admin","123456")
#上傳DataFrame到DolphinDB,并對(duì)Datetime字段做類型轉(zhuǎn)換
s.upload({"tmpData":csv_df})
s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData")
s.run("tableInsert(Trade,data)")
2.2 實(shí)時(shí)計(jì)算K線
本例中使用時(shí)序聚合引擎createTimeSeriesAggregator函數(shù)實(shí)時(shí)計(jì)算K線數(shù)據(jù),并將計(jì)算結(jié)果輸出到流數(shù)據(jù)表OHLC中。
實(shí)時(shí)計(jì)算K線數(shù)據(jù),根據(jù)應(yīng)用場(chǎng)景不同,可以分為以下2種情況:僅在每次時(shí)間窗口結(jié)束時(shí)觸發(fā)計(jì)算
時(shí)間窗口完全不重合,例如每隔5分鐘計(jì)算過去5分鐘的K線數(shù)據(jù)
時(shí)間窗口部分重合,例如每隔1分鐘計(jì)算過去5分鐘的K線數(shù)據(jù)
在每個(gè)時(shí)間窗口結(jié)束時(shí)觸發(fā)計(jì)算,同時(shí)在每個(gè)時(shí)間窗口內(nèi)數(shù)據(jù)也會(huì)按照一定頻率更新
例如每隔1分鐘計(jì)算過去1分鐘的K線數(shù)據(jù),但最近1分鐘的K線不希望等到窗口結(jié)束后再計(jì)算。希望每隔1秒鐘更新一次
下面針對(duì)上述的幾種情況分別介紹如何使用createTimeSeriesAggregator函數(shù)實(shí)時(shí)計(jì)算K線數(shù)據(jù)。請(qǐng)根據(jù)實(shí)際需要選擇相應(yīng)場(chǎng)景創(chuàng)建時(shí)間序列聚合引擎。
2.2.1 僅在每次時(shí)間窗口結(jié)束時(shí)觸發(fā)計(jì)算
僅在每次時(shí)間窗口結(jié)束時(shí)觸發(fā)計(jì)算的情況下,又可以分為時(shí)間窗口完全不重合和部分重合兩種場(chǎng)景。這兩種情況可通過設(shè)定createTimeSeriesAggregator函數(shù)的windowSize參數(shù)和step參數(shù)以實(shí)現(xiàn)。下面具體說明。
首先定義輸出表:share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME, SYMBOL, DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC
然后根據(jù)使用場(chǎng)景不同,選擇以下任意一種場(chǎng)景創(chuàng)建時(shí)間序列聚合引擎。
場(chǎng)景一:每隔5分鐘計(jì)算一次過去5分鐘的K線數(shù)據(jù),使用以下腳本定義時(shí)序聚合引擎,其中,windowSize參數(shù)取值與step參數(shù)取值相等tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=300, metrics=, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)
場(chǎng)景二:每隔1分鐘計(jì)算過去5分鐘的K線數(shù)據(jù),可以使用以下腳本定義時(shí)序聚合引擎。其中,windowSize參數(shù)取值是step參數(shù)取值的倍數(shù)tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)
最后,定義流數(shù)據(jù)訂閱。若此時(shí)流數(shù)據(jù)表Trade中已經(jīng)有實(shí)時(shí)數(shù)據(jù)寫入,那么實(shí)時(shí)數(shù)據(jù)會(huì)馬上被訂閱并注入聚合引擎:subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)
場(chǎng)景一的輸出表前5行數(shù)據(jù):
2.2.2 在每個(gè)時(shí)間窗口結(jié)束觸發(fā)計(jì)算,同時(shí)按照一定頻率更新計(jì)算結(jié)果
以窗口時(shí)間1分鐘計(jì)算vwap價(jià)格為例,10:00更新了聚合結(jié)果以后,那么下一次更新至少要等到10:01。按照計(jì)算規(guī)則,這一分鐘內(nèi)即使發(fā)生了很多交易,也不會(huì)觸發(fā)任何計(jì)算。這在很多金融交易場(chǎng)景中是無法接受的,希望以更高的頻率更新信息,為此引入了時(shí)序聚合引擎的updateTime參數(shù)。
updateTime參數(shù)表示計(jì)算的時(shí)間間隔,如果沒有指定updateTime,只有在每個(gè)時(shí)間窗口結(jié)束時(shí),時(shí)間序列聚合引擎才會(huì)觸發(fā)一次計(jì)算。但如果指定了updateTime,在以下3種情況下都會(huì)觸發(fā)計(jì)算:在每個(gè)時(shí)間窗口結(jié)束時(shí),時(shí)間序列聚合引擎會(huì)觸發(fā)一次計(jì)算
每過updateTime個(gè)時(shí)間單位,時(shí)間序列聚合引擎都會(huì)觸發(fā)一次計(jì)算
如果數(shù)據(jù)進(jìn)入后超過2updateTime個(gè)時(shí)間單位(如果2updateTime不足2秒,則設(shè)置為2秒),當(dāng)前窗口中仍有未計(jì)算的數(shù)據(jù),時(shí)間序列聚合引擎會(huì)觸發(fā)一次計(jì)算
這樣就能保證時(shí)序聚合引擎能在每個(gè)時(shí)間窗口結(jié)束觸發(fā)計(jì)算,同時(shí)在每個(gè)時(shí)間窗口內(nèi)部也會(huì)按照一定頻率觸發(fā)計(jì)算。
需要說明的是,時(shí)序聚合引擎要求在使用updateTime參數(shù)時(shí),必須使用keyedTable作為輸出表。具體原因如下:若將普通的table或streamTable作為輸出表
table與streamTable不會(huì)對(duì)重復(fù)的數(shù)據(jù)進(jìn)行寫入限制,因此在數(shù)據(jù)滿足觸發(fā)updateTime的條件而還未滿足觸發(fā)step的條件時(shí),時(shí)序聚合引擎會(huì)不斷向輸出表添加同一個(gè)time的計(jì)算結(jié)果,最終得到的輸出表就會(huì)有大量時(shí)間相同的記錄,這個(gè)結(jié)果就沒有意義。
若將keyedStreamTable作為輸出表
keyedStreamTable不允許更新歷史記錄,也不允許往表中添加key值相同的記錄。往表中添加新記錄時(shí),系統(tǒng)會(huì)自動(dòng)檢查新記錄的主鍵值,如果新紀(jì)錄的主鍵值與已有記錄的主鍵值重復(fù)時(shí),新紀(jì)錄不會(huì)被寫入。在本場(chǎng)景下表現(xiàn)的結(jié)果是,在數(shù)據(jù)還沒有滿足觸發(fā)step的條件,但滿足觸發(fā)updateTime的條件時(shí),時(shí)序聚合引擎將最近窗口的計(jì)算結(jié)果寫入到輸出表,卻因?yàn)闀r(shí)間相同而被禁止寫入,updateTIme參數(shù)同樣失去了意義。
使用keyedTable作為輸出表
keyedTable允許更新,往表中添加新記錄時(shí),系統(tǒng)會(huì)自動(dòng)檢查新記錄的主鍵值,如果新紀(jì)錄的主鍵值與已有記錄的主鍵值重復(fù)時(shí),會(huì)更新表中對(duì)應(yīng)的記錄。在本場(chǎng)景下表現(xiàn)的結(jié)果是,同一個(gè)時(shí)間計(jì)算結(jié)果可能會(huì)發(fā)生更新。在數(shù)據(jù)還沒有滿足觸發(fā)step的條件,但滿足觸發(fā)updateTime的條件時(shí),計(jì)算結(jié)果會(huì)被修改為根據(jù)最近窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算的結(jié)果,而不是向輸出表中添加一條新的記錄。直到數(shù)據(jù)滿足觸發(fā)step的條件時(shí),才會(huì)向輸出表中添加新的記錄。而這個(gè)結(jié)果才是我們預(yù)期想要達(dá)到的效果,因此時(shí)序聚合引擎要求在使用updateTime參數(shù)時(shí),必須使用keyedTable作為輸出表。
例如,要計(jì)算窗口為1分鐘的K線,但最近1分鐘的K線不希望等到窗口結(jié)束后再計(jì)算。希望每隔1秒鐘都更新一次近1分鐘的K線數(shù)據(jù)。我們可以通過如下步驟實(shí)現(xiàn)這個(gè)場(chǎng)景。
首先,我們需要?jiǎng)?chuàng)建一個(gè)keyedTable作為輸出表,并將時(shí)間列和股票代碼列作為主鍵。當(dāng)有新的數(shù)據(jù)注入輸出表時(shí),如果新紀(jì)錄的時(shí)間在表中已存在,會(huì)更新表中對(duì)應(yīng)時(shí)間的記錄。這樣就能保證每次查詢時(shí)每個(gè)時(shí)刻的數(shù)據(jù)是最新的。share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC請(qǐng)注意:在使用時(shí)序聚合引擎時(shí)將keyedTable作為輸出表,若時(shí)序聚合引擎指定了keyColumn參數(shù),那么kyedTable需要同時(shí)將時(shí)間相關(guān)列和keyColumn列作為主鍵。
每隔1分鐘計(jì)算一次過去1分鐘的K線數(shù)據(jù),并且每隔1秒鐘都更新一次近1分鐘的K線數(shù)據(jù),可以使用以下腳本定義時(shí)序聚合引擎。其中,windowSize參數(shù)取值與step參數(shù)取值相等,并指定updateTime參數(shù)取值為1秒鐘,即每隔1秒種更新最近1分鐘的數(shù)據(jù)。下例中的useWindowStartTime參數(shù)則用于指定輸出表中的時(shí)間為數(shù)據(jù)窗口的起始時(shí)間。tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=60, step=60, metrics=, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol,updateTime=1, useWindowStartTime=true)請(qǐng)注意,在使用時(shí)間序列聚合引擎時(shí),windowSize必須是step的整數(shù)倍,并且step必須是updateTime的整數(shù)倍。
最后,定義流數(shù)據(jù)訂閱。若此時(shí)流數(shù)據(jù)表Trade中已經(jīng)有實(shí)時(shí)數(shù)據(jù)寫入,那么實(shí)時(shí)數(shù)據(jù)會(huì)馬上被訂閱并注入聚合引擎:subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)
輸出表的前5行數(shù)據(jù):
2.3 在Python中展示K線數(shù)據(jù)
在本例中,聚合引擎的輸出表也定義為流數(shù)據(jù)表,客戶端可以通過Python API訂閱輸出表,并將計(jì)算結(jié)果展現(xiàn)到Python終端。
以下代碼使用Python API訂閱實(shí)時(shí)聚合計(jì)算的輸出結(jié)果表OHLC,并將結(jié)果通過print函數(shù)打印出來。import dolphindb as ddb
import pandas as pd
import numpy as np
#設(shè)定本地端口20001用于訂閱流數(shù)據(jù)
s.enableStreaming(20001)
def handler(lst):
print(lst)
# 訂閱DolphinDB(本機(jī)8848端口)上的OHLC流數(shù)據(jù)表
s.subscribe("127.0.0.1", 8848, handler, "OHLC")
也可通過Grafana等可視化系統(tǒng)來連接DolphinDB database,對(duì)輸出表進(jìn)行查詢并將結(jié)果以圖表方式展現(xiàn)。
總結(jié)
以上是生活随笔為你收集整理的PHP实现量化交易,量化交易干货丨如何使用DolphinDB计算K线的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Tudor.CutViewer.Lath
- 下一篇: php之webservice限制ip,P