日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

#记录 配对交易学习

發布時間:2024/3/12 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 #记录 配对交易学习 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
import pandas as pd import numpy as np from pandas_datareader import data as web import fix_yahoo_finance as yf import tushare as ts

#提取中國銀行和浦發銀行的調整后收盤數據

PAf = web.get_data_yahoo('601988.SS', start = '2014-01-01', end = '2015-01-01') PBf = web.get_data_yahoo('600000.SS', start ='2014-01-01', end = '2015-01-01')

#PAF= ts.get_k_data(code=‘601988’,start=‘2014-01-01’,end=‘2015-01-01’)

PAf = PAf['Adj Close'] PBf = PBf['Adj Close'] #將兩只股票數據合在一起形成Dataframe pairf = pd.concat([PAf,PBf],axis=1)

#求形成期長度

len(pairf)

#最小距離法
#構造標準化價格之差平方累積SSD函數

def SSD(priceX,priceY):if priceX is None or priceY is None:print('缺少價格序列')returnX= (priceX - priceX.shift(1))/priceX.shift(1)[1:]returnY= (priceY - priceY.shift(1))/priceY.shift(1)[1:]standardX = (returnX +1).cumprod()standardY = (returnY +1).cumprod()SSD= np.sum((standardX-standardY)**2)return(SSD)

#協整模型

from arch.unitroot import ADF

#檢驗中國銀行對數價格的一階單整性

PAflog = np.log(PAf) PBflog = np.log(PBf)

#對中國銀行對數價格進行單位根檢驗

adfA = ADF(PAflog) print(adfA.summary().as_text())adfB = ADF(PBflog) print(adfB.summary().as_text())

#將中國銀行對數價格差分

retA = PAflog.diff()[1:] retB = PBflog.diff()[1:]adfretA = ADF(retA) print(adfretA.summary().as_text())adfretB = ADF(retB) print(adfretB.summary().as_text())

#畫出中國銀行和浦發銀行股票對數價格時序圖

import matplotlib.pyplot as plt plt.rcParams['font.sans-serif']=['SimHei']PAflog.plot(label='601988ZGYH',style='--') PBflog.plot(label='600000ZGLT',style='-') plt.legend(loc='upper left') plt.title('中國銀行與浦發銀行的對數價格時序圖') plot.show()

#繪制股票對數價格差分的時序圖

retA.plot(label='601988ZGYH') retB.plot(label='600000ZGLT') plt.legend(loc='lower left') plt.title('中國銀行與浦發銀行的對數價格差分(收益率)') plt.show()

#回歸分析
#因變量是中國銀行(A)股票的對數價格
#自變量是浦發銀行(B)股票的對數價格

import statsmodels.api as smmodel=sm.OLS(PBflog, sm.add_constant(PAflog)) results = model.fit() print(results.summary())

#將浦發銀行股票的對數價格與中國銀行股票的對數價格做線型回歸后,對回歸殘差進行平穩性檢驗。

#提取回歸截距項和系數

alpha = results.params[0] beta = results.params[1] #求殘差 spread = PBflog-beta*PAflog-alpha spread.plot() plt.title('價差序列') plt.show()

#價差序列單位根檢驗

adfspread = ADF(spread,trend='nc') print(adfspread.summary().as_text())

‘最小距離法交易策略’
#中國銀行和浦發銀行標準化價格

standardA = (1+retA).cumprod() standardB = (1+retB).cumprod()

#求中國銀行和浦發銀行標準化價格序列的價差

SSD_pair = standardB-standardA SSD_pair.head()meanSSD_pair = np.mean(SSD_pair) sdSSD_pair = np.std(SSD_pair) thresholdUp = meanSSD_pair+1.2*sdSSD_pair thresholdDown = meanSSD_pair-1.2*sdSSD_pairSSD_pair.plot() plt.title('中國銀行與浦發銀行標準化價差序列(形成期)') plt.axhline(y=meanSSD_pair,color='black') plt.axhline(y=thresholdUp,color='green') plt.axhline(y=thresholdDown,color='green') plt.show()

#構建pairtrading 類

import re import pandas as pd import numpy as np from arch.unitroot import ADF import statsmodels.api as sm class PairTrading:def SSD(self,priceX,priceY):if priceX is None or priceY is None:print('缺少價格序列')returnX = (priceX - priceX.shift(1)) / priceX.shift(1)[1:]returnY = (priceY - priceY.shift(1)) / priceY.shift(1)[1:]standardX = (returnX + 1).cumprod()standardY = (returnY + 1).cumprod()SSD = np.sum((standardX - standardY) ** 2)return (SSD)def SSDSpread(self,priceX,priceY):if priceX is None or priceY is None:print('缺少價格序列')retx = (priceX - priceX.shift(1)) / priceX.shift(1)[1:]rety = (priceY - priceY.shift(1)) / priceY.shift(1)[1:]standardX = (1+retx).cumprod()standardY = (1+rety).cumprod()spread = standardY - standardXreturn(spread)def cointegration(self,priceX,priceY):if priceX is None or priceY is None:print('缺少價格序列')priceX = np.log(priceX)priceY = np.log(priceY)results = sm.OLS(priceY,sm.add_constant(priceX)).fit()resid = results.residadfSpread = ADF(resid)if adfSpread.pvalue >= 0.05:print('''交易價格不具有協整關系,P-value of ADF test: %fCoefficients of regression: Intercept: %fBeta: %f'''% (adfSpread.pvalue, results.params[0],results.params[1]))return(None)else:print('''交易價格具有協整關系,P-value of ADF test: %fCoefficients of regression: Intercept: %fBeta: %f''' % (adfSpread.pvalue, results.params[0], results.params[1]))return(results.params[0], results.params[1])def CointegrationSpread(self,priceX,priceY,formPeriod, tradePeriod):if priceX is None or priceY is None:print('缺少價格序列')if not (re.fullmatch('\d{4}-\d{2}-\d{2}:\d{4}-\d{2}-\d{2}',formPeriod)or re.fullmatch('\d{4}-\d{2}-\d{2}:\d{4}-\d{2}-\d{2}',tradePeriod)):print('形成期交易期格式錯誤.')formX = priceX[formPeriod.split(':')[0]:formPeriod.split(':')[1]]formY = priceY[formPeriod.split(':')[0]:formPeriod.split(':')[1]]coefficients = self.cointegration(formX,formY)if coefficients is None:print('未形成協整關系,無法配對.')else:spread=(np.log(priceY[tradePeriod.split(':')[0]:tradePeriod.split(':')[1]])-coefficients[0]-coefficients[1]*np.log(priceX[tradePeriod.split(':')[0]:tradePeriod.split(':')[1]]))return(spread)def calBound(self,priceX,priceY,method,formPeriod,width=1.5):if not (re.fullmatch('\d{4}-\d{2}-\d{2}:\d{4}-\d{2}-\d{2}',formPeriod)or re.fullmatch('\d{4}-\d{2}-\d{2}:\d{4}-\d{2}-\d{2}',tradePeriod)):print('形成期格式錯誤.')if method=='SSD':spread=self.SSDSpread(priceX[formPeriod.split(':')[0]:formPeriod.split(':')[1]],priceY[formPeriod.split(':')[0]:formPeriod.split(':')[1]])mu = np.mean(spread)sd = np.std(spread)UpperBound = mu+width*sdLowerBound = mu-width*sdreturn(UpperBound,LowerBound)elif method=='Cointegration':spread=self.CointegrationSpread(priceX,priceY,formPeriod,formPeriod)mu = np.mean(spread)sd = np.std(spread)UpperBound = mu + width * sdLowerBound = mu - width * sdreturn (UpperBound, LowerBound)else:print('不存在該方法,請選擇“SSD"或是”Cointegration".')

testing

import pandas as pd import numpy as np from pandas_datareader import data as web import fix_yahoo_finance as yf import tushare as tsformPeriod = '2014-01-01:2015-01-01' tradePeriod = '2015-01-01:2015-06-30' priceA = web.get_data_yahoo('601988.SS', start = '2014-01-01', end = '2015-06-30') priceB = web.get_data_yahoo('600000.SS', start ='2014-01-01', end = '2015-06-30') priceA = priceA['Adj Close'] priceB = priceB['Adj Close'] priceAf = priceA[formPeriod.split(':')[0]:formPeriod.split(":")[1]] priceBf = priceB[formPeriod.split(':')[0]:formPeriod.split(":")[1]] priceAt =priceA[tradePeriod.split(':')[0]:tradePeriod.split(":")[1]] priceBt = priceB[tradePeriod.split(':')[0]:tradePeriod.split(":")[1]] pt=PairTrading()SSD=pt.SSD(priceAf,priceBf)SSDspread = pt.SSDSpread(priceAf,priceBf) SSDspread.describe() SSDspread.head()coefficients=pt.cointegration(priceAf,priceBf)CoSpreadF=pt.CointegrationSpread(priceA,priceB,formPeriod,formPeriod) CoSpreadTr=pt.CointegrationSpread(priceA,priceB,formPeriod,tradePeriod)bound = pt.calBound(priceA,priceB,'Cointegration',formPeriod,width=1.2)CoSpreadTr.plot() plt.axhline(bound[0],color='black') plt.axhline(bound[1],color='black') plt.show()logPBf=np.log(priceBf) logPAf=np.log(priceAf)spreadf=logPBf-beta*logPAf-alpha adfSpread=ADF(spreadf) print(adfSpread.summary().as_text())CoSpreadT=logPBf-beta*logPAf-alpha mu=np.mean(spreadf) sd=np.std(spreadf)

#設定交易期

tradeStart = '2015-01-01' tradeEnd='2015-06-30'

#繪制價差區間圖

CoSpreadTr.plot() plt.title('交易期價差序列(協整配對)') plt.axhline(y=mu,color='black') plt.axhline(y=mu+0.2*sd,color='blue',ls='-',lw=2) plt.axhline(y=mu-0.2*sd,color='blue',ls='-',lw=2) plt.axhline(y=mu+1.5*sd,color='green',ls='--',lw=2.5) plt.axhline(y=mu-1.5*sd,color='green',ls='--',lw=2.5) plt.axhline(y=mu+2.5*sd,color='red',ls='-.',lw=3) plt.show()

#根據開倉平倉點制定交易策略,并模擬交易賬戶。

level =(float('-inf'),mu-2.5*sd,mu-1.5*sd,mu-0.2*sd,mu+0.2*sd,mu+1.5*sd,mu+2.5*sd,float('inf')) prcLevel = pd.cut(CoSpreadTr,level,labels=False)-3 prcLevel.head()

#構造交易新號函數

def TradeSig(prcLevel):n= len(prcLevel)signal = np.zeros(n)for i in range(1,n):if prcLevel[i-1]==1 and prcLevel[i]==2:signal[i]=-2elif prcLevel[i-1]==1 and prcLevel[i]==0:signal[i]=2elif prcLevel[i-1]==2 and prcLevel[i]==3:signal[i]=3elif prcLevel[i-1]==-1 and prcLevel[i]==-2:signal[i]=1elif prcLevel[i-1]==-1 and prcLevel[i]==0:signal[i]=-1elif prcLevel[i-1]==-2 and prcLevel[i]==-3:signal[i]=-3return(signal)signal = TradeSig(prcLevel) position=[signal[0]] ns=len(signal)for i in range (1,ns):position.append(position[-1])if signal[i]==1:position[i]=1elif signal[i]==-2:position[i]==-1elif signal[i]==1 and postion[i-1]==1:position[i]=0elif signal[i]==2 and position[i-1]==-1:position[i]=0elif signal[i]==3:position[i]=0elif signal[i]==-3:position[i]=0position = pd.Series(position,index=CoSpreadTr.index) position.tail()def TraddeSim(priceX, priceY, position):n=len(position)size=1000shareY = size*positionshareX = [(-beta)*shareY[0]*priceY[0]/priceX[0]]cash=[2000]for i in range (1,n):shareX.append(shareX[i-1])cash.append(cash[i-1])if position[i-1]==0 and position[i]==1:shareX[i]=[(-beta)*shareY[i]*priceY[i]/priceX[i]]cash[i]=cash[i-1]-(shareY[i]*priceY[i]+ np.array(shareX[i])*priceX[i])elif position[i-1]==0 and position[i]==-1:shareX[i] = [(-beta) * shareY[i] * priceY[i] / priceX[i]]cash[i]=cash[i-1]-(shareY[i]*priceY[i]+np.array(shareX[i])*priceX[i])elif position[i-1]==1 and position[i]==0:shareX[i]=0cash[i]=cash[i-1]+(shareY[i-1]*priceY[i]+np.array(shareX[i-1])*priceX[i])elif position[i-1]==-1 and position[i]==0:shareX[i]=0cash[i]=cash[i-1]+(shareY[i-1]*priceY[i]+np.array(shareX[i-1])*priceX[i])cash = pd.Series(cash,index=position.index)shareY = pd.Series(shareY,index=position.index)shareX = pd.Series(shareX,index=position.index)asset = cash + shareY*priceY +pd.Series(shareX)*priceXaccount =pd.DataFrame({'Position':position,'ShareY':shareY,'ShareX':shareX,'Cash':cash,'Asset':asset})account.index=position.indexreturn(account)account = TradeSim(priceAt,priceBt,position) account.iloc[:,0].plot() account.iloc[:,1].plot() account.iloc[:,2].plot() plt.show()

總結

以上是生活随笔為你收集整理的#记录 配对交易学习的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。