I have recently been learning stock data analysis with Python, relying mainly on tushare and seaborn. tushare is a financial data API package with fairly complete coverage of Chinese stock data; the official site is http://tushare.waditu.com/index.html#id5. seaborn is a plotting library that makes it easy to produce clean, good-looking charts and also ships with some statistical functionality.
Modules to import:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import seaborn.linearmodels as snsl
from datetime import datetime
import tushare as ts
The code:
Closing-price curve
sns.set_style("whitegrid")
end = datetime.today()   # date range: the most recent year
start = datetime(end.year-1, end.month, end.day)
end = str(end)[0:10]
start = str(start)[0:10]
stock = ts.get_hist_data('300104', start, end)   # pick one stock
stock['close'].plot(legend=True, figsize=(10,4))
plt.show()
Daily closing price
Similarly, we can plot the 5-day, 10-day, and 20-day moving averages:
stock[['close','ma5','ma10','ma20']].plot(legend=True, figsize=(10,4))
Daily close with 5-, 10- and 20-day moving averages
Daily percentage change
stock['Daily Return'] = stock['close'].pct_change()
stock['Daily Return'].plot(legend=True,figsize=(10,4))
Daily returns
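For reference, pct_change() computes the simple daily return from consecutive closes: r_t = (close_t - close_{t-1}) / close_{t-1}.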
Kernel density estimate
sns.kdeplot(stock['Daily Return'].dropna())
Kernel density estimate
Kernel density estimate plus histogram
sns.distplot(stock['Daily Return'].dropna(),bins=100)
Kernel density plus histogram
Pearson correlation of two stocks' daily returns
sns.jointplot(stock['Daily Return'],stock['Daily Return'],alpha=0.2)
Pearson correlation
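The call above plots a return series against itself, so the correlation is trivially 1. To compare two different stocks, fetch a second series and pass both in; a minimal sketch, reusing '300113' from the list below purely as an example:
stock2 = ts.get_hist_data('300113', start, end)   # a second stock, for illustration
ret1 = stock['close'].pct_change().dropna()
ret2 = stock2['close'].pct_change().dropna()
sns.jointplot(ret1, ret2, alpha=0.2)              # scatter plus marginal distributions
print(ret1.corr(ret2))                            # Pearson correlation coefficient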
Correlation across several stocks
stock_lis = ['300113','300343','300295','300315']   # four internet-related stocks, chosen arbitrarily
df=pd.DataFrame()
for stock in stock_lis:
    closing_df = ts.get_hist_data(stock, start, end)['close']
    df = df.join(pd.DataFrame({stock: closing_df}), how='outer')
tech_rets = df.pct_change()
snsl.corrplot(tech_rets.dropna())
Correlation matrix
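Note that seaborn.linearmodels.corrplot has been removed from newer seaborn releases; if it is not available, a correlation heatmap gives the same picture:
corr = tech_rets.dropna().corr()     # pairwise Pearson correlations
sns.heatmap(corr, annot=True)        # annotated heatmap of the correlation matrix
plt.show()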
A simple way to quantify a stock's return and risk is the mean and standard deviation of its daily returns: a positive mean means a positive average return, while a larger standard deviation means larger swings and therefore higher risk.
rets = tech_rets.dropna()
plt.scatter(rets.mean(),rets.std())
plt.xlabel('Expected Return')
plt.ylabel('Risk')
for label, x, y in zip(rets.columns, rets.mean(), rets.std()):   # label each point
    plt.annotate(
        label,
        xy=(x, y), xytext=(15, 15),
        textcoords='offset points',
        arrowprops=dict(arrowstyle='-', connectionstyle='arc3,rad=-0.3'))
Experience suggests that a rally driven by expectations of generous share bonuses and splits ("high send-transfer", 高送轉) appears toward the end of every year. Below we use tushare to screen for stocks with high send-transfer potential.
The emphasis is on the screening approach; you can adjust the conditions and parameters in the code as you see fit.
1. Screening rationale
Generally, stocks with high send-transfer potential share a few traits: low total market capitalization, high capital reserves per share, high earnings per share, and a small float. Other factors matter as well, such as the current share price, changes in operating performance, and the company's past record of bonus shares.
For now we only consider four factors: capital reserves per share, earnings per share, float, and total market capitalization. The screen keeps stocks with reserves per share >= 5 yuan, earnings per share >= 0.5 yuan, a float below 300 million shares, and a total market cap below 10 billion yuan (adjust these parameters to your own experience).
2. Data preparation
First import tushare:
import tushare as ts
Fetch fundamentals and quote data:
# fundamentals
basic = ts.get_stock_basics()
# quotes and market-cap data
hq = ts.get_today_all()
3. Data cleaning
Clean the downloaded data and keep only the fields we need (for the other fields and their meanings, see the documentation at http://tushare.org).
# current price; if the stock is suspended, fall back to the previous trading day's price
hq['trade'] = hq.apply(lambda x: x.settlement if x.trade == 0 else x.trade, axis=1)
# float, total shares, capital reserves per share, earnings per share
basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']]
# code, name, current price, total market cap, circulating market cap
hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']]
# index the quote data by stock code
hqdata = hqdata.set_index('code')
# merge the two tables
data = basedata.merge(hqdata, left_index=True, right_index=True)
4. Screening conditions
Using the parameters and conditions above, we process the data further.
Convert total and circulating market cap into units of 100 million yuan:
data['mktcap'] = data['mktcap'] / 10000
data['nmc'] = data['nmc'] / 10000
Set the filter thresholds (adjust them as you see fit):
# capital reserves per share >= 5
res = data.reservedPerShare >= 5
# float <= 300 million shares
out = data.outstanding <= 30000
# earnings per share >= 0.5 yuan
eps = data.esp >= 0.5
# total market cap <= 10 billion yuan
mktcap = data.mktcap <= 100
Take the intersection of all four conditions:
allcrit = res & out & eps & mktcap
selected = data[allcrit]
具備高送轉預期股票的結果呈現:
以上字段的含義分別爲:股票名稱、收盤價格、每股公積金、流通股本、每股收益(應該爲eps,以前發佈筆誤)、總市值和流通市值。
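To inspect the candidates, something along these lines works (a sketch using the columns selected above):
result = selected[['name', 'trade', 'reservedPerShare', 'outstanding', 'esp', 'mktcap', 'nmc']]
print(result.sort_values('reservedPerShare', ascending=False))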
The functions below are written for a backtesting framework: jincha picks stocks that are above their 5-day and 10-day moving averages and have just formed a golden cross (the 5-day MA crossing above the 10-day MA), filcon filters out recent IPOs, suspended stocks and stocks at their price limits, and for_balance rebalances the portfolio towards equal position values.
def jincha(context, bar_dict, his):
    # above the 5-day moving average
    def zs5(context, bar_dict, his):
        ma_n = pd.rolling_mean(his, 5)
        temp = his - ma_n
        # temp_s: codes of stocks that closed above the 5-day MA on the previous day
        temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)
        return temp_s
    # above the 10-day moving average
    def zs10(context, bar_dict, his):
        ma_n = pd.rolling_mean(his, 10)
        temp = his - ma_n
        temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)
        return temp_s
    # golden cross breakout
    def jc(context, bar_dict, his):
        mas = pd.rolling_mean(his, 5)
        mal = pd.rolling_mean(his, 10)
        temp = mas - mal
        # temp_jc: codes where the 5-day MA was above the 10-day MA yesterday
        # temp_r: codes where it was already above the day before yesterday
        temp_jc = list(temp[temp>0].iloc[-1,:].dropna().index)
        temp_r = list(temp[temp>0].iloc[-2,:].dropna().index)
        temp = []
        for stock in temp_jc:
            if stock not in temp_r:
                temp.append(stock)
        return temp
    # intersect the three conditions
    con1 = zs5(context, bar_dict, his)
    con2 = zs10(context, bar_dict, his)
    con3 = jc(context, bar_dict, his)
    tar_list = [con1, con2, con3]
    tarstock = tar_list[0]
    for i in tar_list:
        tarstock = list(set(tarstock).intersection(set(i)))
    return tarstock

# filter out recent IPOs, limit-up/limit-down stocks, suspended stocks, etc.
def filcon(context, bar_dict, tar_list):
    def zdt_trade(stock, context, bar_dict):
        yesterday = history(2, '1d', 'close')[stock].values[-1]
        zt = round(1.10 * yesterday, 2)
        dt = round(0.99 * yesterday, 2)
        # last: the latest traded price
        return dt < bar_dict[stock].last < zt
    filstock = []
    for stock in tar_list:
        con1 = ipo_days(stock, context.now) > 60
        con2 = bar_dict[stock].is_trading
        con3 = zdt_trade(stock, context, bar_dict)
        if con1 & con2 & con3:
            filstock.append(stock)
    return filstock

# rebalance so every position has roughly the same market value
def for_balance(context, bar_dict):
    #mvalues = context.portfolio.market_value
    #avalues = context.portfolio.portfolio_value
    #per = mvalues / avalues
    hlist = []
    for stock in context.portfolio.positions:
        # stock code and the market value of the position
        hlist.append([stock, bar_dict[stock].last * context.portfolio.positions[stock].quantity])
    if hlist:
        # sort by position value, descending
        hlist = sorted(hlist, key=lambda x: x[1], reverse=True)
        temp = 0
        for li in hlist:
            # total market value of all positions
            temp += li[1]
        for li in hlist:
            # target the average value for every position
            if bar_dict[li[0]].is_trading:
                order_target_value(li[0], temp/len(hlist))
    return
The purpose of principal component analysis (PCA) in Python is to extract the main feature directions of a sample set and thereby reduce its dimensionality.
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 10:04:26 2016
PCA source code
@author: liudiwei
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# mean of each feature; the input is a numpy matrix with rows = samples, columns = features
def meanX(dataX):
    return np.mean(dataX, axis=0)   # axis=0 averages over the columns; for a list input use axis=1

# variance of each feature; the input is a numpy matrix with rows = samples, columns = features
def variance(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    variance = 1./m * np.diag(X1.T * X1)
    return variance

# normalization; the input is a numpy matrix with rows = samples, columns = features
def normalize(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    X2 = np.tile(np.diag(X.T * X), (m, 1))
    XNorm = X1/X2
    return XNorm

"""
Parameters:
    - XMat: a numpy matrix with rows = samples and columns = features
    - k: keep the eigenvectors of the k largest eigenvalues
Returns:
    - finalData: the data projected onto the first k principal components
    - reconData: the reconstruction shifted back to the original coordinate system
"""
def pca(XMat, k):
    average = meanX(XMat)
    m, n = np.shape(XMat)
    data_adjust = []
    avgs = np.tile(average, (m, 1))
    data_adjust = XMat - avgs
    covX = np.cov(data_adjust.T)              # covariance matrix
    featValue, featVec = np.linalg.eig(covX)  # eigenvalues and eigenvectors of the covariance matrix
    index = np.argsort(-featValue)            # sort the eigenvalues in descending order
    finalData = []
    if k > n:
        print "k must be lower than the feature number"
        return
    else:
        # the eigenvectors are column vectors, while a[1] indexes a row of a 2-D numpy array,
        # so transpose before selecting the top-k vectors
        selectVec = np.matrix(featVec.T[index[:k]])
        finalData = data_adjust * selectVec.T
        reconData = (finalData * selectVec) + average
    return finalData, reconData

def loaddata(datafile):
    return np.array(pd.read_csv(datafile, sep="\t", header=None)).astype(np.float)

def plotBestFit(data1, data2):
    dataArr1 = np.array(data1)
    dataArr2 = np.array(data2)
    m = np.shape(dataArr1)[0]
    axis_x1 = []
    axis_y1 = []
    axis_x2 = []
    axis_y2 = []
    for i in range(m):
        axis_x1.append(dataArr1[i,0])
        axis_y1.append(dataArr1[i,1])
        axis_x2.append(dataArr2[i,0])
        axis_y2.append(dataArr2[i,1])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s')
    ax.scatter(axis_x2, axis_y2, s=50, c='blue')
    plt.xlabel('x1'); plt.ylabel('x2')
    plt.savefig("outfile.png")
    plt.show()

# simple test
# data source: http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html
def test():
    X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
         [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]
    XMat = np.matrix(X).T
    k = 2
    return pca(XMat, k)

# using the data set data.txt
def main():
    datafile = "data.txt"
    XMat = loaddata(datafile)
    k = 2
    return pca(XMat, k)

if __name__ == "__main__":
    finalData, reconMat = main()
    plotBestFit(finalData, reconMat)
In the resulting plot, the red squares are the data after PCA projection and the blue dots are the reconstructed original data; the differences between the samples are more pronounced after dimensionality reduction.
A K-nearest-neighbour (KNN) classifier implementation:
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    # set k
    def __init__(self, k=3):
        self._k = k

    # distances from a new sample to every known sample, returned as indices sorted by distance
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2                # square each element
        sqDistances = sqDiffMat.sum(axis=1)   # sum over features
        distances = sqDistances ** 0.5        # square root
        return distances.argsort()            # indices sorted by increasing distance

    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()
        classCount = {}
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(),
                                  key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]

    # classify a single sample
    def _classify(self, sample, train_X, train_y):
        # type checks
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]]   # label of the i-th nearest point
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),
                                  key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as", sortedClassCount[0][0]
        return sortedClassCount[0][0]

    def classify(self, test_X, train_X, train_y):
        results = []
        # type checks
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results

if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k=3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)
The decision tree algorithm classifies samples by their attributes step by step, aiming to make classification faster and more effective. Each new splitting attribute can be chosen by information gain (IG), which gives the basic ID3 algorithm, or by information gain ratio (IGR), which gives the improved C4.5 algorithm.
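For reference, the quantities involved are the standard ones: with class proportions p_c, the entropy is H(Y) = -Σ_c p_c·log2(p_c); splitting on attribute A with subsets Y_v gives the information gain IG(Y,A) = H(Y) - Σ_v (|Y_v|/|Y|)·H(Y_v); and the gain ratio is IGR(Y,A) = IG(Y,A) / SplitInfo(A), where SplitInfo(A) = -Σ_v (|Y_v|/|Y|)·log2(|Y_v|/|Y|).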
Taking ID3 as an example, the training procedure is programmed as follows:
(1) Input x and y (x are the samples, y the labels), with rows as samples and columns as features.
(2) Compute the information gain IG of each feature and pick the feature with the largest IG.
(3) Build the sample matrix with the best splitting feature removed.
(4) Partition the updated samples according to the values of the best splitting feature:
attribute value 1 -> (x1, y1); attribute value 2 -> (x2, y2); attribute value 3 -> (x3, y3)
(5) Repeat the steps above for each partition until a leaf node is reached (a recursive call).
A node is a leaf when:
(1) all of its labels y are identical, or
(2) no features are left to split on.
The testing procedure is programmed as follows:
(1) Load the trained decision tree.
(2) Starting from the root, traverse the tree recursively until a leaf node is reached.
The full code follows; the trained tree is stored as nested dictionaries keyed by feature indices and feature values, with class labels at the leaves.
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 07 09:06:37 2016
@author: yehx

Created on Sun Feb 21 12:17:10 2016
Decision Tree Source Code
@author: liudiwei
"""
import os
import numpy as np

class DecitionTree():
    """This is a decision tree classifier. """

    def __init__(self, criteria='ID3'):
        self._tree = None
        if criteria == 'ID3' or criteria == 'C4.5':
            self._criteria = criteria
        else:
            raise Exception("criterion should be ID3 or C4.5")

    def _calEntropy(self, y):
        '''
        Shannon entropy e = -sum(pi * log pi)
        y: an array of labels
        Returns the entropy.
        '''
        n = y.shape[0]
        labelCounts = {}
        for label in y:
            if label not in labelCounts.keys():
                labelCounts[label] = 1
            else:
                labelCounts[label] += 1
        entropy = 0.0
        for key in labelCounts:
            prob = float(labelCounts[key])/n
            entropy -= prob * np.log2(prob)
        return entropy

    def _splitData(self, X, y, axis, cutoff):
        """
        X: features, y: labels, axis: feature index, cutoff: one value of that feature.
        Returns the subset whose feature `axis` equals `cutoff`, with that feature
        column removed from X.
        """
        ret = []
        featVec = X[:,axis]
        n = X.shape[1]   # number of features
        # sample matrix with the axis-th feature column removed
        X = X[:,[i for i in range(n) if i!=axis]]
        for i in range(len(featVec)):
            if featVec[i] == cutoff:
                ret.append(i)
        return X[ret, :], y[ret]

    def _chooseBestSplit(self, X, y):
        """ID3 & C4.5
        Pick the best splitting feature by information gain (ID3) or
        information gain ratio (C4.5); returns its column index.
        """
        numFeat = X.shape[1]
        baseEntropy = self._calEntropy(y)
        bestSplit = 0.0
        best_idx = -1
        for i in range(numFeat):
            featlist = X[:,i]            # the i-th feature column
            uniqueVals = set(featlist)
            curEntropy = 0.0
            splitInfo = 0.0
            for value in uniqueVals:
                sub_x, sub_y = self._splitData(X, y, i, value)
                prob = len(sub_y)/float(len(y))               # probability of this feature value
                curEntropy += prob * self._calEntropy(sub_y)  # accumulate the conditional entropy
                splitInfo -= prob * np.log2(prob)             # split information, for the gain ratio
            IG = baseEntropy - curEntropy
            if self._criteria=="ID3":
                if IG > bestSplit:
                    bestSplit = IG
                    best_idx = i
            if self._criteria=="C4.5":
                if splitInfo == 0.0:
                    continue   # skip features with zero split information
                IGR = IG/splitInfo
                if IGR > bestSplit:
                    bestSplit = IGR
                    best_idx = i
        return best_idx

    def _majorityCnt(self, labellist):
        """
        labellist: list of class labels.
        Returns the most frequent label.
        """
        labelCount = {}
        for vote in labellist:
            if vote not in labelCount.keys():
                labelCount[vote] = 0
            labelCount[vote] += 1
        sortedClassCount = sorted(labelCount.iteritems(), key=lambda x:x[1],
                                  reverse=True)
        return sortedClassCount[0][0]

    def _createTree(self, X, y, featureIndex):
        """
        X: features, y: labels; featureIndex is a tuple recording the original
        column index of each remaining feature.
        Builds a complete tree for the current featureIndex.
        """
        labelList = list(y)
        # if all labels are identical (leaf node), return that label
        if labelList.count(labelList[0]) == len(labelList):
            return labelList[0]
        # if no feature is left to split on, return the majority label
        if len(featureIndex) == 0:
            return self._majorityCnt(labelList)
        # index of the best splitting feature
        bestFeatIndex = self._chooseBestSplit(X,y)
        bestFeatAxis = featureIndex[bestFeatIndex]
        featureIndex = list(featureIndex)
        # remove the chosen feature from the index list
        featureIndex.remove(bestFeatAxis)
        featureIndex = tuple(featureIndex)
        myTree = {bestFeatAxis:{}}
        featValues = X[:, bestFeatIndex]
        uniqueVals = set(featValues)
        for value in uniqueVals:
            # recursively build a subtree for every value
            sub_X, sub_y = self._splitData(X, y, bestFeatIndex, value)
            myTree[bestFeatAxis][value] = self._createTree(sub_X, sub_y,
                                                           featureIndex)
        return myTree

    def fit(self, X, y):
        """
        X: features, y: class labels.
        X and y are converted to numpy arrays if necessary.
        Returns self, so calls can be chained: clf.fit().predict()
        """
        if isinstance(X, np.ndarray) and isinstance(y, np.ndarray):
            pass
        else:
            try:
                X = np.array(X)
                y = np.array(y)
            except:
                raise TypeError("numpy.ndarray required for X,y")
        featureIndex = tuple(['x'+str(i) for i in range(X.shape[1])])
        self._tree = self._createTree(X,y,featureIndex)
        return self   # allow using: clf.fit().predict()

    def _classify(self, tree, sample):
        """
        Classify one sample with the trained tree.
        The tree was built recursively, so classification is also recursive;
        _classify() handles a single sample at a time.
        """
        featIndex = tree.keys()[0]     # feature at the root of the (sub)tree
        secondDict = tree[featIndex]   # subtrees split on featIndex
        axis = featIndex[1:]           # original column index of the root feature
        key = sample[int(axis)]        # value of that feature in the sample
        valueOfKey = secondDict[key]   # the corresponding subtree or label
        if type(valueOfKey).__name__=='dict':   # a dict means another subtree: recurse
            return self._classify(valueOfKey, sample)
        else:
            return valueOfKey

    def predict(self, X):
        if self._tree==None:
            raise NotImplementedError("Estimator not fitted, call `fit` first")
        # make sure X is an array
        if isinstance(X, np.ndarray):
            pass
        else:
            try:
                X = np.array(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        if len(X.shape) == 1:
            return self._classify(self._tree, X)
        else:
            result = []
            for i in range(X.shape[0]):
                value = self._classify(self._tree, X[i])
                print str(i+1)+"-th sample is classified as:", value
                result.append(value)
            return np.array(result)

    def show(self, outpdf):
        if self._tree==None:
            pass
        # plot the tree using matplotlib
        import treePlotter
        treePlotter.createPlot(self._tree, outpdf)

if __name__=="__main__":
    trainfile = r"data\train.txt"
    testfile = r"data\test.txt"
    import sys
    sys.path.append(r"F:\CSU\Github\MachineLearning\lib")
    import dataload as dload
    train_x, train_y = dload.loadData(trainfile)
    test_x, test_y = dload.loadData(testfile)
    clf = DecitionTree(criteria="C4.5")
    clf.fit(train_x, train_y)
    result = clf.predict(test_x)
    outpdf = r"tree.pdf"
    clf.show(outpdf)
K-means clustering in Python is an unsupervised machine-learning algorithm that groups samples automatically.
The algorithm proceeds as follows (a minimal sketch appears after the list):
(1) Randomly generate K cluster centres, usually called centroids.
(2) Assign every sample to the class of its nearest centroid (the distance can be Euclidean, Manhattan, cosine, and so on).
(3) Recompute each centroid, typically as the mean of all samples in the class.
(4) Repeat steps (2) and (3) until one of the following holds:
1) the assignments no longer change;
2) the error (for example the squared error) falls within the required range;
3) the maximum number of iterations is reached.
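A minimal NumPy sketch of steps (1)-(4), assuming a data matrix X with one sample per row (the full class-based implementation appears further below):
import numpy as np

def kmeans_sketch(X, k, max_iter=100):
    # (1) pick k distinct samples as the initial centroids
    centroids = X[np.random.choice(len(X), k, replace=False)]
    for _ in range(max_iter):
        # (2) assign every sample to its nearest centroid (Euclidean distance)
        dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # (3) move each centroid to the mean of the samples assigned to it
        new_centroids = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        # (4) stop once the centroids (and hence the assignments) no longer change
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return labels, centroids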
A common variant is bisecting K-means, which works as follows:
(1) Start with all samples in a single cluster.
(2) Split that cluster into two with ordinary K-means.
(3) Try splitting each of the resulting clusters into two, compute the error of each candidate split, and keep only the split with the smaller error; that is, of the two clusters produced in step (2), one is kept as is and the other is split again.
(4) Keep bisecting the existing clusters, each time keeping the split with the smallest error, until the desired number of clusters is reached.
The Python code is as follows:
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 08 14:01:44 2016
K-means cluster
"""
import numpy as np

class KMeansClassifier():
    "this is a k-means classifier"

    def __init__(self, k=3, initCent='random', max_iter=500):
        self._k = k
        self._initCent = initCent
        self._max_iter = max_iter
        self._clusterAssment = None
        self._labels = None
        self._sse = None

    def _calEDist(self, arrA, arrB):
        """Euclidean distance between two 1-D arrays."""
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

    def _calMDist(self, arrA, arrB):
        """Manhattan distance between two 1-D arrays."""
        return sum(np.abs(arrA-arrB))

    def _randCent(self, data_X, k):
        """Pick k random centroids; returns a k*n centroid matrix."""
        n = data_X.shape[1]          # number of features
        centroids = np.empty((k,n))  # a k*n matrix holding the centroids
        for j in range(n):
            minJ = min(data_X[:, j])
            rangeJ = float(max(data_X[:, j] - minJ))
            # flatten the nested list
            centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten()
        return centroids

    def fit(self, data_X):
        """data_X: an m*n matrix."""
        if not isinstance(data_X, np.ndarray) or \
               isinstance(data_X, np.matrixlib.defmatrix.matrix):
            try:
                data_X = np.asarray(data_X)
            except:
                raise TypeError("numpy.ndarray required for data_X")
        m = data_X.shape[0]   # number of samples
        # an m*2 matrix: the first column stores the index of the assigned cluster,
        # the second column the squared error to that cluster's centroid
        self._clusterAssment = np.zeros((m,2))
        if self._initCent == 'random':
            self._centroids = self._randCent(data_X, self._k)
        clusterChanged = True
        for _ in range(self._max_iter):   # "_" because the loop variable is unused
            clusterChanged = False
            for i in range(m):            # assign each sample to the nearest centroid
                minDist = np.inf          # start with an infinitely large distance
                minIndex = -1             # index of the nearest centroid
                for j in range(self._k):  # search for the nearest centroid
                    arrA = self._centroids[j,:]
                    arrB = data_X[i,:]
                    distJI = self._calEDist(arrA, arrB)   # distance to centroid j
                    if distJI < minDist:
                        minDist = distJI
                        minIndex = j
                if self._clusterAssment[i,0] != minIndex:
                    clusterChanged = True
                    self._clusterAssment[i,:] = minIndex, minDist**2
            if not clusterChanged:   # converged: no assignment changed
                break
            for i in range(self._k): # update each centroid to the mean of its cluster
                index_all = self._clusterAssment[:,0]   # cluster index of every sample
                value = np.nonzero(index_all==i)        # samples belonging to cluster i
                ptsInClust = data_X[value[0]]           # the points of cluster i
                self._centroids[i,:] = np.mean(ptsInClust, axis=0)   # new centroid
        self._labels = self._clusterAssment[:,0]
        self._sse = sum(self._clusterAssment[:,1])

    def predict(self, X):   # assign new samples to the learned clusters
        # type check
        if not isinstance(X, np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        m = X.shape[0]   # number of samples
        preds = np.empty((m,))
        for i in range(m):   # assign each sample to the nearest centroid
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:], X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds


class biKMeansClassifier():
    "this is a binary k-means classifier"

    def __init__(self, k=3):
        self._k = k
        self._centroids = None
        self._clusterAssment = None
        self._labels = None
        self._sse = None

    def _calEDist(self, arrA, arrB):
        """Euclidean distance between two 1-D arrays."""
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

    def fit(self, X):
        m = X.shape[0]
        self._clusterAssment = np.zeros((m,2))
        centroid0 = np.mean(X, axis=0).tolist()
        centList = [centroid0]
        for j in range(m):   # initial squared error of every sample to the single centroid
            self._clusterAssment[j,1] = self._calEDist(np.asarray(centroid0),
                                                       X[j,:])**2
        while (len(centList) < self._k):
            lowestSSE = np.inf
            # try splitting every cluster and keep the split with the lowest error
            for i in range(len(centList)):
                index_all = self._clusterAssment[:,0]   # cluster index of every sample
                value = np.nonzero(index_all==i)        # samples belonging to cluster i
                ptsInCurrCluster = X[value[0],:]        # the points of cluster i
                clf = KMeansClassifier(k=2)
                clf.fit(ptsInCurrCluster)
                # centroids, assignments and errors obtained by splitting this cluster
                centroidMat, splitClustAss = clf._centroids, clf._clusterAssment
                sseSplit = sum(splitClustAss[:,1])
                index_all = self._clusterAssment[:,0]
                value = np.nonzero(index_all==i)
                sseNotSplit = sum(self._clusterAssment[value[0],1])
                if (sseSplit + sseNotSplit) < lowestSSE:
                    bestCentToSplit = i
                    bestNewCents = centroidMat
                    bestClustAss = splitClustAss.copy()
                    lowestSSE = sseSplit + sseNotSplit
            # after the split, one sub-cluster keeps the original index and the other
            # gets index len(centList); the new centroids are stored in centList
            bestClustAss[np.nonzero(bestClustAss[:,0]==1)[0],0] = len(centList)
            bestClustAss[np.nonzero(bestClustAss[:,0]==0)[0],0] = bestCentToSplit
            centList[bestCentToSplit] = bestNewCents[0,:].tolist()
            centList.append(bestNewCents[1,:].tolist())
            self._clusterAssment[np.nonzero(self._clusterAssment[:,0] == \
                bestCentToSplit)[0],:] = bestClustAss
        self._labels = self._clusterAssment[:,0]
        self._sse = sum(self._clusterAssment[:,1])
        self._centroids = np.asarray(centList)

    def predict(self, X):   # assign new samples to the learned clusters
        # type check
        if not isinstance(X, np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        m = X.shape[0]   # number of samples
        preds = np.empty((m,))
        for i in range(m):   # assign each sample to the nearest centroid
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:], X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds
Historical percentage price changes are one of the basic data sets for learning quantitative investing. Below we use Python to collect the historical data we need. The main steps are:
(1) get the codes of the N smallest stocks by market capitalization (N = 100 here);
(2) loop over each of these 100 stocks;
(3) fetch its percentage price changes from 2016-05-01 to 2016-11-17;
(4) keep only stocks with more than 40 records, which removes recent IPOs;
(5) save each result to a file named "<stock code>.csv".
The code is as follows:
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch historical percentage price changes and save each stock to its own csv file
@author: yehx
"""
import numpy as np
import pandas as pd

# get the codes of the 100 smallest stocks by market capitalization
df = get_fundamentals(
    query(fundamentals.eod_derivative_indicator.market_cap)
    .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
    .limit(100),
    '2016-11-17', '1y'
)

# loop over the 100 stocks
# fetch the percentage price changes from 2016-05-01 to 2016-11-17
# keep only stocks with more than 40 records, to exclude recent IPOs
# save each result as "<stock code>.csv"
for stock in range(100):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock],
                                            '20160501', '20161117')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    if openDays > 40:
        tempPrice = priceChangeRate[39:(openDays - 1)]
        for rate in range(len(tempPrice)):
            tempPrice[rate] = "%.3f" % tempPrice[rate]
        fileName = ''
        fileName = fileName.join(df['market_cap'].columns[stock].split('.')) + '.csv'
        tempPrice.to_csv(fileName)
Logistic regression can be seen as an extension of linear regression: it is trained on samples with two known classes and then used to predict the class of new samples.
Suppose we have an MxN sample matrix X with known classes, where M is the number of samples and N the number of features, and the corresponding known labels form an Mx1 matrix Y. Logistic regression then works as follows:
(1) Apply a set of weights W (Nx1) to the features of X as a linear transformation, giving the transformed samples X' (Mx1); the goal is that samples from different classes end up separated by a clear one-dimensional boundary.
(2) Apply a further function transformation to X' so that the values on the two sides of that boundary are mapped into the corresponding ranges.
(3) Training consists of adjusting W so that as many transformed values as possible fall on the correct side of the boundary, in agreement with the known classes.
(4) For logistic regression, the boundary in the original space is mapped to x = 0.
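Written out, the transformation and the weight update used in the code below are:
sigmoid(z) = 1 / (1 + exp(-z))
h = sigmoid(X·W)                 (X is the M x N sample matrix, W the N x 1 weight vector)
W <- W + alpha · X^T · (Y - h)   (alpha is the learning rate; the update is repeated maxCycles times)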
Typical logistic regression code:
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 09 15:21:48 2016
Logistic regression classification
"""
import numpy as np

class LogisticRegressionClassifier():
    def __init__(self):
        self._alpha = None

    # the sigmoid function
    def _sigmoid(self, fx):
        return 1.0/(1 + np.exp(-fx))

    # alpha is the step size (learning rate); maxCycles is the maximum number of iterations
    def _gradDescent(self, featData, labelData, alpha, maxCycles):
        dataMat = np.mat(featData)                 # size: m*n
        labelMat = np.mat(labelData).transpose()   # size: m*1
        m, n = np.shape(dataMat)
        weigh = np.ones((n, 1))
        for i in range(maxCycles):
            hx = self._sigmoid(dataMat * weigh)
            error = labelMat - hx                  # size: m*1
            weigh = weigh + alpha * dataMat.transpose() * error   # adjust the weights by the error
        return weigh

    # train the model with gradient descent; swap in another optimizer here if needed
    def fit(self, train_x, train_y, alpha=0.01, maxCycles=100):
        return self._gradDescent(train_x, train_y, alpha, maxCycles)

    # classify with the learned weights
    def predict(self, test_X, test_y, weigh):
        dataMat = np.mat(test_X)
        labelMat = np.mat(test_y).transpose()   # transpose to m*1
        hx = self._sigmoid(dataMat*weigh)       # size: m*1
        m = len(hx)
        error = 0.0
        for i in range(m):
            if hx[i] > 0.5:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 1'
                if int(labelMat[i]) != 1:
                    error += 1.0
                    print "classify error."
            else:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 0'
                if int(labelMat[i]) != 0:
                    error += 1.0
                    print "classify error."
        error_rate = error/m
        print "error rate is:", "%.4f" % error_rate
        return error_rate
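A quick usage sketch with made-up, linearly separable toy data (the class and method names are those defined above):
train_x = [[-1.0, -2.0], [-1.5, -1.0], [-2.0, -1.5],
           [ 1.0,  2.0], [ 1.5,  1.0], [ 2.0,  1.5]]
train_y = [0, 0, 0, 1, 1, 1]
clf = LogisticRegressionClassifier()
weigh = clf.fit(train_x, train_y, alpha=0.01, maxCycles=500)   # returns the learned weight vector
clf.predict(train_x, train_y, weigh)                           # prints each prediction and the overall error rate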
The core of Naïve Bayes classification is the conditional probability P(y|x), where y is the class and x the feature vector: it measures how likely the sample x is to belong to class y. We compute this probability for every class and assign the sample to the class with the largest value.
From the definition of conditional probability,
P(y|x) = P(y)*P(x|y)/P(x).
Because the denominator on the right-hand side is the same for every class, only the numerators need to be compared. Furthermore, if the individual features are independently distributed, P(x|y) equals the product of the P(xi|y).
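Putting these two observations together, the decision rule becomes: assign x to the class y that maximizes P(y)·Π_i P(xi|y), or equivalently log P(y) + Σ_i log P(xi|y); the log form is what the code below uses to avoid underflow.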
Below, Naïve Bayes classification is applied to text classification. The idea is:
(1) Build the vocabulary, i.e. the list of unique words.
(2) Compute the prior probability P(y) of each class label.
(3) Compute the probability P(xi|y) of each word under each class label.
(4) For each class, multiply together the probabilities P(xi|y) of the features of the sample to be classified, then multiply by the corresponding P(y).
(5) Compare the results from (4) across classes and assign the sample to the class with the largest value.
The Python code for Naïve Bayes text classification follows; for convenience it uses logarithms to turn the products into sums.
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 14 11:15:47 2016
Naive Bayes Classification
"""
import numpy as np

class NaiveBayes:
    def __init__(self):
        self._creteria = "NB"

    def _createVocabList(self, dataList):
        """Build the vocabulary (the set of unique words)."""
        vocabSet = set([])
        for line in dataList:
            print set(line)
            vocabSet = vocabSet | set(line)
        return list(vocabSet)

    # set-of-words model
    def _setOfWords2Vec(self, vocabList, inputSet):
        """
        Map a list of words onto the vocabulary:
        1 if the word appears, 0 otherwise.
        """
        outputVec = [0] * len(vocabList)
        for word in inputSet:
            if word in vocabList:
                outputVec[vocabList.index(word)] = 1
            else:
                print "the word:%s is not in my vocabulary!" % word
        return outputVec

    # bag-of-words model (a variant of _setOfWords2Vec)
    def _bagOfWords2VecMN(self, vocabList, inputSet):
        """
        Count how many times each word occurs and map the counts onto the vocabulary.
        Returns an n-dimensional vector, n being the vocabulary size.
        """
        returnVec = [0]*len(vocabList)
        for word in inputSet:
            if word in vocabList:
                returnVec[vocabList.index(word)] += 1
        return returnVec

    def _trainNB(self, trainMatrix, trainLabel):
        """
        Input: training matrix and class labels as numpy arrays.
        Computes the conditional probabilities and the class prior.
        """
        numTrainDocs = len(trainMatrix)   # number of samples
        numWords = len(trainMatrix[0])    # number of features (the vocabulary size)
        pNeg = sum(trainLabel)/float(numTrainDocs)   # probability of the negative (label 1) class
        p0Num = np.ones(numWords)   # start the counts at 1 so no conditional probability is 0
        p1Num = np.ones(numWords)   # (Laplace smoothing)
        p0InAll = 2.0               # two classes, so the denominators start at 2 (Laplace smoothing)
        p1InAll = 2.0
        # update the per-class word counts and totals over all documents
        for i in range(numTrainDocs):
            if trainLabel[i] == 1:
                p1Num += trainMatrix[i]
                p1InAll += sum(trainMatrix[i])
            else:
                p0Num += trainMatrix[i]
                p0InAll += sum(trainMatrix[i])
        print p1InAll
        # probability of each vocabulary word given the class,
        # in log form to avoid underflow when multiplying
        p0Vect = np.log(p0Num/p0InAll)   # conditional probabilities for class 0
        p1Vect = np.log(p1Num/p1InAll)   # np.log is the natural logarithm
        return p0Vect, p1Vect, pNeg

    def _classifyNB(self, vecSample, p0Vec, p1Vec, pNeg):
        """Classify one sample with naive Bayes; returns 0 or 1."""
        prob_y0 = sum(vecSample * p0Vec) + np.log(1-pNeg)
        prob_y1 = sum(vecSample * p1Vec) + np.log(pNeg)
        if prob_y0 < prob_y1:
            return 1
        else:
            return 0

    # test the NB algorithm
    def testingNB(self, testSample):
        listOPosts, listClasses = loadDataSet()
        myVocabList = self._createVocabList(listOPosts)
        # print myVocabList
        trainMat = []
        for postinDoc in listOPosts:
            trainMat.append(self._bagOfWords2VecMN(myVocabList, postinDoc))
        p0V, p1V, pAb = self._trainNB(np.array(trainMat), np.array(listClasses))
        print trainMat
        thisSample = np.array(self._bagOfWords2VecMN(myVocabList, testSample))
        result = self._classifyNB(thisSample, p0V, p1V, pAb)
        print testSample, 'classified as: ', result
        return result

###############################################################################
def loadDataSet():
    wordsList = [['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                 ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                 ['my', 'dalmation', 'is', 'so', 'cute', ' and', 'I', 'love', 'him'],
                 ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                 ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
                 ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
    classLable = [0, 1, 0, 1, 0, 1]   # 0: good; 1: bad
    return wordsList, classLable

if __name__=="__main__":
    clf = NaiveBayes()
    testEntry = [['love', 'my', 'girl', 'friend'],
                 ['stupid', 'garbage'],
                 ['Haha', 'I', 'really', "Love", "You"],
                 ['This', 'is', "my", "dog"]]
    clf.testingNB(testEntry[0])
#    for item in testEntry:
#        clf.testingNB(item)
When programming quantitative trading strategies we need historical stock data as the basis for analysis. Below we use Python to fetch historical data and store the result as a DataFrame. The processed data can be downloaded from http://download.csdn.net/detail/suiyingy/9688505.
The main steps are:
(1) get the codes of the 50 smallest stocks by market capitalization;
(2) fetch the percentage changes of the CSI 300 index (000300.XSHG) to serve as the reference date axis;
(3) fetch each stock's percentage changes and align them with the reference dates, filling missing days with 0;
(4) merge everything into a single DataFrame;
(5) save the DataFrame as a csv file.
The code is as follows:
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch historical percentage price changes, merge them into one DataFrame and save as csv
@author: yehx
"""
import numpy as np
import pandas as pd

# get the codes of the 50 smallest stocks by market capitalization
df = get_fundamentals(
    query(fundamentals.eod_derivative_indicator.market_cap)
    .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
    .limit(50),
    '2016-11-17', '1y'
)

b1 = {}
priceChangeRate_300 = get_price_change_rate('000300.XSHG', '20060101', '20161118')
df300 = pd.DataFrame(priceChangeRate_300)
lenReference = len(priceChangeRate_300)
dfout = df300
dflen = pd.DataFrame()
dflen['000300.XSHG'] = [lenReference]

# loop over the 50 stocks
# fetch each stock's percentage price changes
# store the data in the DataFrame
# save the DataFrame as a csv file
for stock in range(50):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock],
                                            '20150101', '20161118')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    dftempPrice = pd.DataFrame(priceChangeRate)
    tempArr = []
    for i in range(lenReference):
        if df300.index[i] in list(dftempPrice.index):
            # keep 4 significant digits
            tempArr.append("%.4f" % ((dftempPrice.loc[str(df300.index[i])][0])))
        else:
            tempArr.append(float(0.0))
    fileName = ''
    fileName = fileName.join(df['market_cap'].columns[stock].split('.'))
    dfout[fileName] = tempArr
    dflen[fileName] = [len(priceChangeRate)]

dfout = dfout.append(dflen)
dfout.to_csv('00050.csv')
Stock history downloaded from the web often cannot be used directly and has to be converted into the format we need. Below, Python is used to extract and process the historical data stored in a csv file. The result is a sub-database of 30-day percentage-change segments, downloadable from http://download.csdn.net/detail/suiyingy/9688605.
The main steps (csv reading and writing in Python) are: load the csv file, randomly sample 30-day segments from each stock's percentage-change history, and write the resulting sub-database back to a csv file.
The code is as follows:
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Process csv-format historical percentage price changes
@author: yehx
"""
import numpy as np
import pandas as pd
import random
import csv
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

'''
- load csv-format data
'''
def loadCSVfile1(datafile):
    filelist = []
    with open(datafile) as file:
        lines = csv.reader(file)
        for oneline in lines:
            filelist.append(oneline)
    filelist = np.array(filelist)
    return filelist

# data processing:
# randomly pick 30-day segments of the historical change rates
# to build our own sub-database
def dataProcess(dataArr, subLen):
    totLen, totWid = np.shape(dataArr)
    print totLen, totWid
    lenArr = dataArr[totLen-1, 2:totWid]
    columnCnt = 1
    dataOut = []
    for lenData in lenArr:
        columnCnt = columnCnt + 1
        N60 = int(lenData) / (2 * subLen)
        print N60
        if N60 > 0:
            randIndex = random.sample(range(totLen-int(lenData)-1, totLen-subLen), N60)
            for i in randIndex:
                dataOut.append(dataArr[i:(i+subLen), columnCnt])
    dataOut = np.array(dataOut)
    return dataOut

if __name__=="__main__":
    datafile = "00100 (3).csv"
    data = loadCSVfile1(datafile)
    df = pd.DataFrame(data)
    m, n = np.shape(data)
    dataOut = dataProcess(data, 30)
    m, n = np.shape(dataOut)
    # save the result
    csvfile = file('csvtest.csv', 'wb')
    writer = csv.writer(csvfile)
    writer.writerows(dataOut)
    csvfile.close()