Python for Quantitative Finance

Stock Data Analysis with Python

I have recently been learning stock data analysis with Python, mainly using tushare and seaborn. tushare is a financial data API package; its coverage of China's domestic stock market is fairly complete.

Official site: http://tushare.waditu.com/index.html#id5. seaborn is a plotting library that makes it easy to draw clean, good-looking charts, and it also ships some statistical functionality of its own.

  Modules to import:

import matplotlib.pyplot as plt

import pandas as pd  # needed later for the multi-stock DataFrame

import seaborn as sns

import seaborn.linearmodels as snsl  # removed in later seaborn releases; see the heatmap alternative below

from datetime import datetime

import tushare as ts

Code:

  Closing-price curve

  sns.set_style("whitegrid")

  end = datetime.today() # start and end dates: take the most recent year of data

  start = datetime(end.year-1,end.month,end.day)

  end = str(end)[0:10]

  start = str(start)[0:10]

stock = ts.get_hist_data('300104',start,end) # pick a single stock

  stock['close'].plot(legend=True ,figsize=(10,4))

  plt.show()

Daily closing price

Similarly, we can plot the 5-, 10-, and 20-day moving averages:

  stock[['close','ma5','ma10','ma20']].plot(legend=True ,figsize=(10,4))

Daily close with 5-, 10-, and 20-day moving averages

Daily percentage change

  stock['Daily Return'] = stock['close'].pct_change()

  stock['Daily Return'].plot(legend=True,figsize=(10,4))

Daily returns

Kernel density estimate

  sns.kdeplot(stock['Daily Return'].dropna())

Kernel density estimate

Kernel density estimate plus histogram

  sns.distplot(stock['Daily Return'].dropna(),bins=100)

KDE with histogram
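In seaborn 0.11+ distplot is deprecated; a minimal equivalent with the newer API (an assumption, not part of the original code) would be:

sns.histplot(stock['Daily Return'].dropna(), bins=100, kde=True)  # histogram with a KDE overlay
plt.show()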

Pearson correlation between two stocks

  sns.jointplot(stock['Daily Return'],stock['Daily Return'],alpha=0.2) # note: this correlates the series with itself; pass two different return series to compare two stocks

Pearson correlation
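As written, the call above plots the series against itself, so the correlation is trivially perfect. A minimal sketch for two different stocks (the second code, '300113', is an arbitrary choice for illustration):

stock2 = ts.get_hist_data('300113', start, end)  # a second stock
ret = pd.concat([stock['close'].pct_change(), stock2['close'].pct_change()],
                axis=1, keys=['300104', '300113']).dropna()  # align the two return series by date
sns.jointplot(ret['300104'], ret['300113'], alpha=0.2)  # scatter plus marginal distributions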

Correlation among multiple stocks

  stock_lis=['300113','300343','300295','300315'] # four internet-related stocks, chosen arbitrarily

  df=pd.DataFrame()

  for stock in stock_lis:
      closing_df = ts.get_hist_data(stock,start,end)['close']
      df = df.join(pd.DataFrame({stock:closing_df}),how='outer')

  tech_rets = df.pct_change()

  snsl.corrplot(tech_rets.dropna()) # corrplot was removed from later seaborn releases; see the heatmap alternative below

  Correlation matrix
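seaborn.linearmodels.corrplot no longer exists in recent seaborn releases; a sketch of an equivalent plot with the current API (assuming a recent seaborn) is:

corr = tech_rets.dropna().corr()  # Pearson correlation matrix of the daily returns
sns.heatmap(corr, annot=True)     # annotated correlation heatmap
plt.show()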

As a simple measure of a stock's return and risk, take the mean and the standard deviation of its daily moves: a positive mean means the average return is positive, while a larger standard deviation means larger swings and hence higher risk.

  rets = tech_rets.dropna()

  plt.scatter(rets.mean(),rets.std())

  plt.xlabel('Expected Return')

  plt.ylabel('Risk')

  for label,x,y in zip(rets.columns,rets.mean(),rets.std()): # label each point
      plt.annotate(
          label,
          xy =(x,y),xytext=(15,15),
          textcoords = 'offset points',
          arrowprops = dict(arrowstyle = '-',connectionstyle = 'arc3,rad=-0.3'))


 

 

Screening for High-Split (高送轉) Candidates from Public Data with Python

Going by past experience, a wave of high-split anticipation tends to arrive near the end of every year. Today, 米哥 walks through how to use tushare to screen for stocks with high-split potential.

This article focuses on the screening approach; you can modify the conditions and parameters in the code as you see fit.

1. Screening rationale

通常來講,具有高送轉預期的個股,都具備總市值低、每股公積金高、每股收益大,流通股本少的特色。固然,也還有其它的因素,好比當前股價、經營收益變更狀況、以及以往分成送股習慣等等。

For now we consider only four factors: capital reserve per share, earnings per share, float, and total market cap. A stock is flagged as a high-split candidate when its reserve per share is at least 5 yuan, its EPS is at least 0.5 yuan, its float is under 300 million shares, and its total market cap is within 10 billion yuan (feel free to adjust these parameters from your own experience).

2. Data preparation

First, import tushare:

import tushare as ts

Fetch the fundamentals and the quote/market-cap data:

# fundamentals
basic = ts.get_stock_basics()
# quotes and market caps
hq = ts.get_today_all()

3. Data cleaning

Clean and organize the fetched data, keeping only the fields we need. (For the other fields and their meanings, see the documentation at http://tushare.org.)

#current price; if the stock is suspended, use the previous trading day's price
hq['trade'] = hq.apply(lambda x:x.settlement if x.trade==0 else x.trade, axis=1)
#select float shares, total shares, reserve per share, and EPS
basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']]
#select code, name, current price, total market cap, and float market cap
hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']]
#index the quote data by code
hqdata = hqdata.set_index('code')
#merge the two tables
data = basedata.merge(hqdata, left_index=True, right_index=True)

4. Screening conditions

Process the data further according to the parameters and conditions above.

Convert total and float market cap into units of 100 million yuan:
data['mktcap'] = data['mktcap'] / 10000
data['nmc'] = data['nmc'] / 10000

Set the parameters and filter values (adjust to taste):

#reserve per share >= 5 yuan
res = data.reservedPerShare >= 5
#float <= 300 million shares
out = data.outstanding <= 30000
#EPS >= 0.5 yuan
eps = data.esp >= 0.5
#total market cap <= 10 billion yuan
mktcap = data.mktcap <= 100

Take the intersection of all four conditions:

allcrit = res & out & eps & mktcap
selected = data[allcrit]

具備高送轉預期股票的結果呈現:

The fields above are: stock name, closing price, capital reserve per share, float, earnings per share (the field should be eps; esp is a typo carried over from an earlier tushare release), total market cap, and float market cap.
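A minimal sketch of how the screened result might be displayed (the column order is assumed from the field description above, and the sort key is an arbitrary choice):

print(selected[['name', 'trade', 'reservedPerShare', 'outstanding', 'esp', 'mktcap', 'nmc']]
      .sort_values('reservedPerShare', ascending=False))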

https://zhuanlan.zhihu.com/p/23829205

 

 

 

Python: Golden Cross Detection

def jincha(context, bar_dict, his):

    #above the 5-day moving average

    def zs5(context, bar_dict, his):

        ma_n = pd.rolling_mean(his, 5)

        temp = his - ma_n

        #temp_s holds the codes that closed above the 5-day MA on the previous day

        temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)

        return temp_s

    #above the 10-day moving average

    def zs10(context, bar_dict, his):

        ma_n = pd.rolling_mean(his, 10)

        temp = his - ma_n

        temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)

        return temp_s

    

    #golden-cross breakout

    def jc(context, bar_dict, his):

        mas = pd.rolling_mean(his,5)

        mal = pd.rolling_mean(his, 10)

        temp = mas - mal

        #temp_jc: codes whose 5-10 day MA spread was positive yesterday

        #temp_r: codes whose spread was positive the day before

        temp_jc = list(temp[temp>0].iloc[-1,:].dropna().index)

        temp_r = list(temp[temp>0].iloc[-2,:].dropna().index)

        temp = []

        for stock in temp_jc:

            if stock not in temp_r:

                temp.append(stock)

        return temp

    

    #intersect the codes satisfying all three conditions

    con1 = zs5(context, bar_dict, his)

    con2 = zs10(context, bar_dict, his)

    con3 = jc(context, bar_dict, his)

    tar_list=[con1,con2,con3]

    tarstock = tar_list[0]

    for i in tar_list:

        tarstock = list(set(tarstock).intersection(set(i)))

    return tarstock
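pd.rolling_mean was deprecated in pandas 0.18 and later removed; a sketch of the equivalent moving averages with the current rolling API, assuming his is a DataFrame of closes with one column per stock:

ma5 = his.rolling(5).mean()    # 5-day moving average per column
ma10 = his.rolling(10).mean()  # 10-day moving average per column
spread = ma5 - ma10            # positive where the short MA sits above the long MA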

 

Python: Filtering Recent IPOs, Suspensions, and Limit Moves

#filter out recent IPOs, limit-up/limit-down moves, suspensions, etc.

def filcon(context,bar_dict,tar_list):

    def zdt_trade(stock, context, bar_dict):

        yesterday = history(2,'1d', 'close')[stock].values[-1]

        zt = round(1.10 * yesterday,2)  #limit-up price: +10%

        dt = round(0.90 * yesterday,2)  #limit-down price: -10%

        #last: the latest traded price

        return dt < bar_dict[stock].last < zt

    filstock = []

    for stock in tar_list:

        con1 = ipo_days(stock,context.now) > 60  #listed more than 60 days, i.e. not a recent IPO

        con2 = bar_dict[stock].is_trading  #not suspended

        con3 = zdt_trade(stock,context,bar_dict)  #not pinned at the limit price

        if con1 and con2 and con3:

            filstock.append(stock)

    return filstock
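A hedged usage sketch tying the two helpers together inside a RiceQuant-style handle_bar (context, bar_dict, history, and ipo_days are platform objects assumed from the surrounding code):

def handle_bar(context, bar_dict):
    his = history(21, '1d', 'close')  # enough bars for the 10-day MA plus the crossover check
    candidates = jincha(context, bar_dict, his)       # golden-cross candidates
    tradable = filcon(context, bar_dict, candidates)  # drop IPOs, suspensions, and limit moves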

 

Python: Rebalancing to Equal Position Value

# rebalance so each position holds the same market value
def for_balance(context, bar_dict):
    #mvalues = context.portfolio.market_value
    #avalues = context.portfolio.portfolio_value
    #per = mvalues / avalues
    hlist = []
    for stock in context.portfolio.positions:
        #each stock and the market value of its position
        hlist.append([stock,bar_dict[stock].last * context.portfolio.positions[stock].quantity])
    
    if hlist:
        #sort by position value, descending
        hlist = sorted(hlist,key=lambda x:x[1], reverse=True)
        temp = 0
        for li in hlist:
            #accumulate the total position value
            temp += li[1]
        for li in hlist:
            #order each position to the average value
            if bar_dict[li[0]].is_trading:
                order_target_value(li[0], temp/len(hlist))
    return

 

Python: PCA (Principal Component Analysis)

PCA extracts the principal directions of variation from the samples in order to reduce the dimensionality of the data.

# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 10:04:26 2016
PCA source code
@author: liudiwei
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

#Column means; input is a numpy matrix with rows = samples, columns = features
def meanX(dataX):
    return np.mean(dataX,axis=0) #axis=0 averages down each column; for a plain list use axis=1


#Per-feature variance; expects an np.matrix with rows = samples, columns = features
def variance(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))    
    X1 = X - muAll
    variance = 1./m * np.diag(X1.T * X1) #X1.T * X1 is a matrix product only for np.matrix inputs
    return variance

#Normalization; expects an np.matrix with rows = samples, columns = features
def normalize(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))    
    X1 = X - muAll
    X2 = np.tile(np.diag(X.T * X), (m, 1))
    XNorm = X1/X2
    return XNorm

"""
參數:
    - XMat:傳入的是一個numpy的矩陣格式,行表示樣本數,列表示特徵    
    - k:表示取前k個特徵值對應的特徵向量
返回值:
    - finalData:參數一指的是返回的低維矩陣,對應於輸入參數二
    - reconData:參數二對應的是移動座標軸後的矩陣
"""  
def pca(XMat, k):
    average = meanX(XMat) 
    m, n = np.shape(XMat)
    data_adjust = []
    avgs = np.tile(average, (m, 1))
    data_adjust = XMat - avgs
    covX = np.cov(data_adjust.T)   #covariance matrix
    featValue, featVec=  np.linalg.eig(covX)  #eigenvalues and eigenvectors of the covariance matrix
    index = np.argsort(-featValue) #indices sorting featValue in descending order
    finalData = []
    if k > n:
        print "k must lower than feature number"
        return
    else:
        #eigenvectors are columns, but indexing a 2-D numpy array a[i] selects row i,
        selectVec = np.matrix(featVec.T[index[:k]]) #so transpose before taking the top-k rows
        finalData = data_adjust * selectVec.T 
        reconData = (finalData * selectVec) + average  
    return finalData, reconData

def loaddata(datafile):
    return np.array(pd.read_csv(datafile,sep="\t",header=None)).astype(np.float)


def plotBestFit(data1, data2):    
    dataArr1 = np.array(data1)
    dataArr2 = np.array(data2)
    
    m = np.shape(dataArr1)[0]
    axis_x1 = []
    axis_y1 = []
    axis_x2 = []
    axis_y2 = []
    for i in range(m):
        axis_x1.append(dataArr1[i,0])
        axis_y1.append(dataArr1[i,1])
        axis_x2.append(dataArr2[i,0]) 
        axis_y2.append(dataArr2[i,1])                
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s')
    ax.scatter(axis_x2, axis_y2, s=50, c='blue')
    plt.xlabel('x1'); plt.ylabel('x2');
    plt.savefig("outfile.png")
    plt.show() 

#quick test
#data source: http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html
def test():
    X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
         [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]
    XMat = np.matrix(X).T  
    k = 2
    return pca(XMat, k)

#run on the data.txt dataset
def main():    
    datafile = "data.txt"
    XMat = loaddata(datafile)
    k = 2
    return pca(XMat, k)
    
if __name__ == "__main__":
    finalData, reconMat = main()
    plotBestFit(finalData, reconMat)

The dimensionality-reduced data is shown in red, and the data reconstructed from it in blue. The differences between samples are more pronounced after the reduction.
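For comparison, a minimal sketch of the same reduction with scikit-learn (assuming sklearn is available; it is not used in the original code):

from sklearn.decomposition import PCA

pca_model = PCA(n_components=2)                      # keep the top two components
finalData = pca_model.fit_transform(XMat)            # low-dimensional projection
reconData = pca_model.inverse_transform(finalData)   # map back to the original space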

 

Python: KNN (k-Nearest Neighbor) Classification

The KNN algorithm classifies a sample by the distances between feature vectors.
Steps:
Step 1: compute the distance between the new sample and every known, labeled sample.
Step 2: sort those distances in ascending order.
Step 3: take the k nearest samples.
Step 4: assign the new sample to the majority class among those k samples.
The Python code for KNN classification follows:
The KNN classifier (a small test is included under __main__):
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    #k, the number of neighbors
    def __init__(self, k=3):
        self._k = k

    #compute distances from the new sample to the known samples, returned in ascending order
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2 #square each element
        sqDistances = sqDiffMat.sum(axis = 1)  #sum per row
        distances = sqDistances ** 0.5 #square root
        return distances.argsort()  #indices that sort the distances ascending

    
    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]                  
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet      
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)                  
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()            
        classCount={}                                      
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]
        
    #classify a single sample
    def _classify(self, sample, train_X, train_y):
        #type check
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]] #label of the i-th nearest point
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),\
                                    key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as",sortedClassCount[0][0]    
        return sortedClassCount[0][0]
    
    
    def classify(self, test_X, train_X, train_y):
        results = [] 
        #type check
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results
        
        
if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k = 3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)

 


 

Python: Decision Trees (ID3 & C4.5)

The decision tree algorithm classifies samples attribute by attribute, so that classification becomes faster and more effective. Each new split attribute can be chosen by information gain (IG), giving the basic ID3 algorithm, or by information gain ratio (IGR), giving the improved C4.5 algorithm.

Taking ID3 as the example, the training procedure is programmed as follows:

(1) Input x and y (x are the samples, y the labels); rows are samples, columns are features.

(2) Compute the information gain IG and pick the feature that maximizes it.

(3) Remove the chosen feature's column from the sample array.

(4) Partition the updated samples by the values of the chosen feature:

value 1: (x1, y1)    value 2: (x2, y2)    value 3: (x3, y3)

(5) Repeat the above on each partition (recursively) until a leaf node is reached.

A node is a leaf when:

(1) all labels y are identical, or

(2) no features remain to split on.

Prediction works as follows:

(1) Load the trained decision tree.

(2) Recurse from the root node down the tree until a leaf is reached.

The full code follows. The trained tree is a recursively nested dictionary indexed by feature values, with class labels at the leaves.
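For concreteness, a small hypothetical example of such a trained structure:

# hypothetical tree: feature x0 is tested first; when x0 == 1, feature x2 decides
tree = {'x0': {0: 'no', 1: {'x2': {0: 'no', 1: 'yes'}}}}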

 

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 07 09:06:37 2016

@author: yehx
"""
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 21 12:17:10 2016
Decision Tree Source Code
@author: liudiwei
"""
import os
import numpy as np

class DecitionTree():
    """This is a decision tree classifier. """
    
    def __init__(self, criteria='ID3'):
        self._tree = None
        if criteria == 'ID3' or criteria == 'C4.5':
            self._criteria = criteria
        else:
            raise Exception("criterion should be ID3 or C4.5")
    
    def _calEntropy(self, y):
        '''
        Compute the Shannon entropy e = -sum(pi * log pi)
        y: label array
        returns: the entropy
        '''
        n = y.shape[0]  
        labelCounts = {}
        for label in y:
            if label not in labelCounts.keys():
                labelCounts[label] = 1
            else:
                labelCounts[label] += 1
        entropy = 0.0
        for key in labelCounts:
            prob = float(labelCounts[key])/n
            entropy -= prob * np.log2(prob)
        return entropy
    
    def _splitData(self, X, y, axis, cutoff):
        """
        X: features; y: labels; axis: the index of a feature; cutoff: a value of that feature
        Returns the subset whose feature `axis` equals `cutoff`,
        with that feature column removed from the sample matrix
        """
        ret = []
        featVec = X[:,axis]
        n = X.shape[1]      #number of features
        #sample matrix with feature column `axis` removed
        X = X[:,[i for i in range(n) if i!=axis]]
        for i in range(len(featVec)):
            if featVec[i] == cutoff:
                ret.append(i)
        return X[ret, :], y[ret]   
            
    def _chooseBestSplit(self, X, y):
        """ID3 & C4.5
        X: features; y: labels
        Picks the best split feature by information gain (ID3) or gain ratio (C4.5)
        Returns the index of the best split feature
        """
        numFeat = X.shape[1]
        baseEntropy = self._calEntropy(y)
        bestSplit = 0.0
        best_idx  = -1
        for i in range(numFeat):
            featlist = X[:,i]   #the i-th feature column
            uniqueVals = set(featlist)
            curEntropy = 0.0
            splitInfo = 0.0
            for value in uniqueVals:
                sub_x, sub_y = self._splitData(X, y, i, value)
                prob = len(sub_y)/float(len(y))      #probability of this feature value
                curEntropy += prob * self._calEntropy(sub_y)    #accumulate the conditional entropy
                splitInfo -=  prob * np.log2(prob) #split information, used for the gain ratio
            IG = baseEntropy - curEntropy
            if self._criteria=="ID3":
                if IG > bestSplit:
                    bestSplit = IG
                    best_idx = i
            if self._criteria=="C4.5":
                if splitInfo == 0.0:
                    pass
                IGR = IG/splitInfo
                if IGR > bestSplit:
                    bestSplit = IGR
                    best_idx = i
        return best_idx
        
    def _majorityCnt(self, labellist):
        """
        labellist: a list of class labels
        Returns the most frequent label in labellist
        """
        labelCount={}
        for vote in labellist:
            if vote not in labelCount.keys(): 
                labelCount[vote] = 0
            labelCount[vote] += 1
        sortedClassCount = sorted(labelCount.iteritems(), key=lambda x:x[1], \
                                     reverse=True)
        return sortedClassCount[0][0]

    def _createTree(self, X, y, featureIndex):
        """
        X: features; y: labels; featureIndex: a tuple recording each feature's index in the original data
        Builds a complete tree for the current featureIndex
        """
        labelList = list(y)
        #if all labels are identical (leaf node), return the label
        if labelList.count(labelList[0]) == len(labelList): 
            return labelList[0]
        #if no features remain to split on, return the majority label
        if len(featureIndex) == 0:
            return self._majorityCnt(labelList)
        #index of the best split feature within X
        bestFeatIndex = self._chooseBestSplit(X,y)
        #that feature's index in the original data
        bestFeatAxis = featureIndex[bestFeatIndex]
        featureIndex = list(featureIndex)
        #drop the chosen feature from the index list
        featureIndex.remove(bestFeatAxis)
        featureIndex = tuple(featureIndex)
        myTree = {bestFeatAxis:{}}
        featValues = X[:, bestFeatIndex]
        uniqueVals = set(featValues)
        for value in uniqueVals:
            #recursively build a subtree for each value
            sub_X, sub_y = self._splitData(X,y, bestFeatIndex, value)
            myTree[bestFeatAxis][value] = self._createTree(sub_X, sub_y, \
                                            featureIndex)
        return myTree  
        
    def fit(self, X, y):
        """
        X: features; y: class labels
        X and y are coerced to numpy arrays if they are not already
        Returns: self
        """
        if isinstance(X, np.ndarray) and isinstance(y, np.ndarray):
            pass
        else:
            try:
                X = np.array(X)
                y = np.array(y)
            except:
                raise TypeError("numpy.ndarray required for X,y")
        featureIndex  = tuple(['x'+str(i) for i in range(X.shape[1])])
        self._tree = self._createTree(X,y,featureIndex)
        return self  #allow using: clf.fit().predict()
        
    def _classify(self, tree, sample):
        """
        Classify one sample with the trained model.
        Tree construction is recursive, and so is classification;
        _classify() handles a single sample at a time.
        """
        featIndex = tree.keys()[0] #the root node's key, e.g. 'x3'
        secondDict = tree[featIndex] #the subtree split on that feature
        axis=featIndex[1:] #the feature's index in the original data
        key = sample[int(axis)] #the sample's value for that feature
        valueOfKey = secondDict[key] #the branch for that value
        if type(valueOfKey).__name__=='dict': #recurse if the branch is another dict
            return self._classify(valueOfKey, sample)
        else: 
            return valueOfKey
        
    def predict(self, X):
        if self._tree==None:
            raise NotImplementedError("Estimator not fitted, call `fit` first")
        #check whether X is a numpy array
        if isinstance(X, np.ndarray): 
            pass
        else: 
            try:
                X = np.array(X)
            except:
                raise TypeError("numpy.ndarray required for X")
            
        if len(X.shape) == 1:
            return self._classify(self._tree, X)
        else:
            result = []
            for i in range(X.shape[0]):
                value = self._classify(self._tree, X[i])
                print str(i+1)+"-th sample is classfied as:", value 
                result.append(value)
            return np.array(result)

    def show(self, outpdf):
        if self._tree==None:
            pass
        #plot the tree using matplotlib
        import treePlotter
        treePlotter.createPlot(self._tree, outpdf)
    
if __name__=="__main__":
    trainfile=r"data\train.txt"
    testfile=r"data\test.txt"
    import sys
    sys.path.append(r"F:\CSU\Github\MachineLearning\lib")  
    import dataload as dload
    train_x, train_y = dload.loadData(trainfile)
    test_x, test_y = dload.loadData(testfile)
    
    clf = DecitionTree(criteria="C4.5")
    clf.fit(train_x, train_y)
    result = clf.predict(test_x)    
    outpdf = r"tree.pdf"
    clf.show(outpdf)

 

 

 

Python: K-Means Clustering

 

K-means is an unsupervised machine-learning algorithm that groups samples automatically.

The algorithm:

(1) Randomly generate K cluster centers, usually called centroids.

(2) Assign every sample to the cluster of its nearest centroid. (The distance can be Euclidean, Manhattan, cosine, etc.)

(3) Recompute each centroid after the assignment, e.g. as the mean of all samples in the cluster.

(4) Repeat (2) and (3) until one of the following holds:

    1) the assignment no longer changes;

    2) the error (e.g. squared error) falls within the required range;

    3) the number of iterations reaches the configured maximum.

A common variant is bisecting K-means, which works as follows:

(1) Start with all samples in a single cluster.

(2) Split it into two clusters with standard K-means.

(3) Try splitting each existing cluster into two, compute the error of each candidate split, and keep only the split with the smaller error; that is, one of the clusters from step (2) is kept as-is and the other is split again.

(4) Keep bisecting existing clusters, always keeping the lowest-error split, until the desired number of clusters is reached.

The Python code:

 

# -*- coding: utf-8 -*-
"""
Created on Tue Nov 08 14:01:44 2016
K - means cluster
"""

import numpy as np

class KMeansClassifier():
    "this is a k-means classifier"
    
    def __init__(self, k=3, initCent='random', max_iter=500 ):
        
        self._k = k
        self._initCent = initCent
        self._max_iter = max_iter
        self._clusterAssment = None
        self._labels = None
        self._sse = None
        
    def _calEDist(self, arrA, arrB):
        """
        Euclidean distance between two 1-D arrays
        """
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))
    
    def _calMDist(self, arrA, arrB):
        """
        Manhattan distance between two 1-D arrays
        """
        return sum(np.abs(arrA-arrB))


    def _randCent(self, data_X, k):
        """
        Pick k random centroids
        Returns: centroids, a k*n matrix
        """
        n = data_X.shape[1] #feature dimension
        centroids = np.empty((k,n))  #a k*n matrix to hold the centroids
        for j in range(n):
            minJ = min(data_X[:, j])
            rangeJ  = float(max(data_X[:, j] - minJ))
            #flatten the k*1 random array into the column
            centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten()
        return centroids 
    
    def fit(self, data_X):
        """
        data_X: an m*n matrix
        """
        if not isinstance(data_X, np.ndarray) or \
               isinstance(data_X, np.matrixlib.defmatrix.matrix):
            try:
                data_X = np.asarray(data_X)
            except:
                raise TypeError("numpy.ndarray required for data_X")
                
        m = data_X.shape[0]  #number of samples
        #an m*2 matrix: column 0 holds each sample's cluster index,
        #column 1 holds its squared error to that cluster's centroid
        self._clusterAssment = np.zeros((m,2)) 
        
        if self._initCent == 'random':
            self._centroids = self._randCent(data_X, self._k)
            
        clusterChanged = True
        for _ in range(self._max_iter): #"_" because the loop value is unused
            clusterChanged = False
            for i in range(m):   #assign each sample to its nearest centroid
                minDist = np.inf #start with an infinite minimum distance
                minIndex = -1    #and an invalid nearest-centroid index
                for j in range(self._k): #inner loop finds the nearest centroid
                    arrA = self._centroids[j,:]
                    arrB = data_X[i,:]
                    distJI = self._calEDist(arrA, arrB) #distance to centroid j
                    if distJI < minDist:
                        minDist = distJI
                        minIndex = j
                if self._clusterAssment[i,0] !=minIndex:
                    clusterChanged = True
                    self._clusterAssment[i,:] = minIndex, minDist**2
            if not clusterChanged: #no assignment changed, so we have converged
                break
            for i in range(self._k): #update each centroid to the mean of its cluster
                index_all = self._clusterAssment[:,0] #cluster index of every sample
                value = np.nonzero(index_all==i) #indices of samples in cluster i
                ptsInClust = data_X[value[0]]    #samples belonging to cluster i
                self._centroids[i,:] = np.mean(ptsInClust, axis=0) #their mean is the new centroid
        
        self._labels = self._clusterAssment[:,0]
        self._sse = sum(self._clusterAssment[:,1])
    
    def predict(self, X): #assign new samples to the learned clusters
        #type check
        if not isinstance(X,np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        
        m = X.shape[0] #number of samples
        preds = np.empty((m,))
        for i in range(m): #assign each sample to the nearest centroid
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:], X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds

        
class biKMeansClassifier():
    "this is a binary k-means classifier"
    
    def __init__(self, k=3):
        
        self._k = k
        self._centroids = None
        self._clusterAssment = None
        self._labels = None
        self._sse = None
        
    
    def _calEDist(self, arrA, arrB):
        """
        Euclidean distance between two 1-D arrays
        """
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))
        
    def fit(self, X):
        m = X.shape[0]
        self._clusterAssment = np.zeros((m,2))
        centroid0 = np.mean(X, axis=0).tolist()
        centList =[centroid0]
        for j in range(m): #initial squared error of each sample to the single centroid
            self._clusterAssment[j,1] = self._calEDist(np.asarray(centroid0), \
                                        X[j,:])**2
        
        while (len(centList) < self._k):
            lowestSSE = np.inf
            #try splitting every cluster; keep the split that minimizes the error
            for i in range(len(centList)):
                index_all = self._clusterAssment[:,0] #cluster index of every sample
                value = np.nonzero(index_all==i) #indices of samples in cluster i
                ptsInCurrCluster = X[value[0],:] #samples belonging to cluster i
                clf = KMeansClassifier(k=2)
                clf.fit(ptsInCurrCluster)
                #centroids, assignments, and errors after splitting this cluster
                centroidMat, splitClustAss = clf._centroids, clf._clusterAssment
                sseSplit = sum(splitClustAss[:,1])
                index_all = self._clusterAssment[:,0] 
                value = np.nonzero(index_all==i)
                sseNotSplit = sum(self._clusterAssment[value[0],1])
                if (sseSplit + sseNotSplit) < lowestSSE:
                    bestCentToSplit = i
                    bestNewCents = centroidMat
                    bestClustAss = splitClustAss.copy()
                    lowestSSE = sseSplit + sseNotSplit
            #after the split, one child keeps the parent's index and the
            #other gets index len(centList) before being appended to centList
            bestClustAss[np.nonzero(bestClustAss[:,0]==1)[0],0]=len(centList)
            bestClustAss[np.nonzero(bestClustAss[:,0]==0)[0],0]=bestCentToSplit
            centList[bestCentToSplit] = bestNewCents[0,:].tolist()
            centList.append(bestNewCents[1,:].tolist())
            self._clusterAssment[np.nonzero(self._clusterAssment[:,0] == \
                                        bestCentToSplit)[0],:]= bestClustAss 
                   
        self._labels = self._clusterAssment[:,0] 
        self._sse = sum(self._clusterAssment[:,1])
        self._centroids = np.asarray(centList)
                                
    def predict(self, X): #assign new samples to the learned clusters
        #type check
        if not isinstance(X,np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        
        m = X.shape[0] #number of samples
        preds = np.empty((m,))
        for i in range(m): #assign each sample to the nearest centroid
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:],X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds
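For reference, a minimal sketch of the same clustering with scikit-learn (an assumption; sklearn is not used in the original code, and data_X is the same sample matrix fit expects):

from sklearn.cluster import KMeans

km = KMeans(n_clusters=3, max_iter=500)  # mirrors KMeansClassifier(k=3, max_iter=500)
labels = km.fit_predict(data_X)          # a cluster index for every sample
centroids = km.cluster_centers_          # the learned centroids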

 

 

 

Python: Fetching Historical Stock Return Data

Historical return data is one of the basic materials for learning quantitative investing. The Python code below fetches the required history. The main steps are:

(1) #get the codes of the N smallest stocks by market cap, ascending (here N = 100);

(2) #loop over each of these 100 stocks;

(3) #fetch the returns from 2016-05-01 to 2016-11-17;

(4) #keep only stocks with more than 40 records, to drop recent IPOs;

(5) #save each as "<stock code>.csv".

The code:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch each stock's historical returns and save them as separate csv files
@author: yehx
"""

import numpy as np
import pandas as pd

#get the 100 smallest stocks by market cap
df = get_fundamentals(
        query(fundamentals.eod_derivative_indicator.market_cap)
        .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
        .limit(100),'2016-11-17', '1y'
    )

#loop over each of the 100 stocks
#fetch the returns from 2016-05-01 to 2016-11-17
#keep only stocks with more than 40 records, dropping recent IPOs
#save each as "<stock code>.csv"
for stock in range(100):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20160501', '20161117')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    if openDays > 40:
        tempPrice = priceChangeRate[39:(openDays - 1)]
        for rate in range(len(tempPrice)):
            tempPrice[rate] = "%.3f" %tempPrice[rate]
        #name the file "<stock code>.csv" (note the loop variable is `stock`)
        fileName = ''.join(df['market_cap'].columns[stock].split('.')) + '.csv'
        tempPrice.to_csv(fileName)

 

 

Python: Logistic Regression Classification

Logistic regression can be seen as an extension of linear regression: it is trained on two-class samples in order to predict the class of new samples.
Suppose we have an MxN sample matrix X with known classes, where M is the number of samples and N the feature dimension, plus an Mx1 matrix Y of class labels. Logistic regression then proceeds as follows:
(1) Apply a linear transform with a weight vector W (Nx1) to the features of X, giving transformed samples X' (Mx1); the goal is a clear one-dimensional boundary between X' values of different classes.
(2) Apply a further function (the sigmoid) to X' so that values on either side of the boundary map into the corresponding output ranges.
(3) Training adjusts W so that the transformed values fall on the correct sides of the boundary, in agreement with the known classes.
(4) For logistic regression, the boundary in the transformed space is x = 0.
Typical logistic regression code follows:

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 09 15:21:48 2016
Logistic regression classification
"""
import numpy as np


class LogisticRegressionClassifier():
    
    def __init__(self):
        self._alpha = None
    

    #the sigmoid function
    def _sigmoid(self, fx):
        return 1.0/(1 + np.exp(-fx))

    #alpha: step size (learning rate); maxCycles: maximum number of iterations
    def _gradDescent(self, featData, labelData, alpha, maxCycles):
        dataMat = np.mat(featData)                      #size: m*n
        labelMat = np.mat(labelData).transpose()        #size: m*1
        m, n = np.shape(dataMat)
        weigh = np.ones((n, 1)) 
        for i in range(maxCycles):
            hx = self._sigmoid(dataMat * weigh)
            error = labelMat - hx       #size:m*1
            weigh = weigh + alpha * dataMat.transpose() * error #update the weights from the error
        return weigh

    #train with gradient descent; substitute another optimizer here if preferred
    def fit(self, train_x, train_y, alpha=0.01, maxCycles=100):
        return self._gradDescent(train_x, train_y, alpha, maxCycles)

    #classify using the learned weights
    def predict(self, test_X, test_y, weigh):
        dataMat = np.mat(test_X)
        labelMat = np.mat(test_y).transpose()  #transpose to m*1
        hx = self._sigmoid(dataMat*weigh)  #size:m*1
        m = len(hx)
        error = 0.0
        for i in range(m):
            if hx[i] > 0.5:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 1' 
                if int(labelMat[i]) != 1:
                    error += 1.0
                    print "classify error."
            else:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 0' 
                if int(labelMat[i]) != 0:
                    error += 1.0
                    print "classify error."
        error_rate = error/m
        print "error rate is:", "%.4f" %error_rate
        return error_rate

 

 

Python: Naive Bayes Classification

The core of Naive Bayes classification is the conditional probability P(y|x), where y is a class and x a feature vector: the probability that a sample x belongs to class y. The classifier computes this probability for each class and assigns the sample to the most probable one.

From the definition of conditional probability we get:

P(y|x) = P(y) * P(x|y) / P(x)

Since the denominator P(x) is the same whichever class's probability is being computed, only the numerators need to be compared. Moreover, if the individual features are independently distributed given the class, then P(x|y) equals the product of the per-feature probabilities P(xi|y).

Text classification illustrates how Naive Bayes is applied. The approach:

(1) Build the vocabulary, i.e. a list of the unique words.

(2) Compute the probability P(y) of each class label.

(3) Compute, for each class label, the probability P(xi|y) of each word.

(4) For a sample to classify, multiply together its per-feature probabilities P(xi|y) under each class, then multiply by the corresponding P(y).

(5) Compare the results of (4) across classes and assign the sample to the class with the largest value.

Below is the Python code for Naive Bayes text classification; for computational convenience, it uses the log function to turn the products into sums.
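A quick standalone demonstration of why the log trick matters (not part of the classifier itself):

import numpy as np
probs = np.full(1000, 1e-5)    # many small conditional probabilities
print(np.prod(probs))          # 0.0: the direct product underflows
print(np.sum(np.log(probs)))   # about -11512.9: the log-sum stays usable for comparison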

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 14 11:15:47 2016
Naive Bayes Classification
"""
# -*- coding: utf-8 -*-
import numpy as np

class NaiveBayes:
    def __init__(self):
        self._creteria = "NB"
        
    def _createVocabList(self, dataList):
        """
        建立一個詞庫向量
        """
        vocabSet = set([])
        for line in dataList:
            print set(line)
            vocabSet = vocabSet | set(line)
        return list(vocabSet)
        
    #set-of-words document model
    def _setOfWords2Vec(self, vocabList, inputSet):
        """
        Map a line of words onto the vocabulary: 1 if the word occurs, 0 otherwise
        """
        outputVec = [0] * len(vocabList)
        for word in inputSet:
            if word in vocabList:
                outputVec[vocabList.index(word)] = 1
            else:
                print "the word:%s is not in my vocabulary!" % word
        return outputVec
    
        
    # bag-of-words variant of _setOfWords2Vec
    def _bagOfWords2VecMN(self, vocabList, inputSet):
        """
        Count how many times each word occurs and map the counts onto the vocabulary
        Returns an n-dimensional vector (n = vocabulary size) of word counts
        """
        returnVec = [0]*len(vocabList)
        for word in inputSet:
            if word in vocabList:
                returnVec[vocabList.index(word)] += 1 # count occurrences instead of a 0/1 flag
        return returnVec
    
    
    def _trainNB(self, trainMatrix, trainLabel):
        """
        Input: training matrix and class labels as numpy arrays
        Computes the conditional probabilities and the class prior
        """
        numTrainDocs = len(trainMatrix) #number of samples
        numWords = len(trainMatrix[0])  #number of features, i.e. the vocabulary size
        pNeg = sum(trainLabel)/float(numTrainDocs) #prior probability of the negative class
        
        p0Num = np.ones(numWords) #start counts at 1 so no conditional probability is 0     
        p1Num = np.ones(numWords) #same as above
       
        p0InAll = 2.0 #two classes, so the denominators start at 2 (Laplace smoothing)
        p1InAll = 2.0 
        
        # accumulate the positive/negative word counts over every document
        for i in range(numTrainDocs):
            if trainLabel[i] == 1:
                p1Num += trainMatrix[i]
                p1InAll += sum(trainMatrix[i])
            else:
                p0Num += trainMatrix[i]
                p0InAll += sum(trainMatrix[i])
        
        print p1InAll  #debug output
        #probability of each vocabulary word given the class,
        #taken as a log to avoid underflow in the products
        p0Vect = np.log(p0Num/p0InAll) #conditional probabilities of the features given class 0
        p1Vect = np.log(p1Num/p1InAll)  #np.log is base e  #p(xi|y=1)
        return p0Vect, p1Vect, pNeg
        
    def _classifyNB(self, vecSample, p0Vec, p1Vec, pNeg):
        """
        Classify with naive Bayes; returns 0 or 1
        """
        prob_y0 = sum(vecSample * p0Vec) + np.log(1-pNeg)
        prob_y1 = sum(vecSample * p1Vec) + np.log(pNeg) #log base e
        if prob_y0 < prob_y1:
            return 1
        else:
            return 0

    
    # test the NB algorithm
    def testingNB(self, testSample):
        listOPosts, listClasses = loadDataSet()
        myVocabList = self._createVocabList(listOPosts)
#        print myVocabList
        trainMat=[]
        for postinDoc in listOPosts:
            trainMat.append(self._bagOfWords2VecMN(myVocabList, postinDoc))
        p0V,p1V,pAb = self._trainNB(np.array(trainMat), np.array(listClasses))
        print trainMat
        thisSample = np.array(self._bagOfWords2VecMN(myVocabList, testSample))
        result = self._classifyNB(thisSample, p0V, p1V, pAb)
        print testSample,'classified as: ', result
        return result


###############################################################################
def loadDataSet():
        wordsList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                   ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                   ['my', 'dalmation', 'is', 'so', 'cute', ' and', 'I', 'love', 'him'],
                   ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                   ['mr', 'licks','ate','my', 'steak', 'how', 'to', 'stop', 'him'],
                   ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
        classLable = [0,1,0,1,0,1] # 0:good; 1:bad
        return wordsList, classLable
        
if __name__=="__main__":    
    clf = NaiveBayes()
    testEntry = [['love', 'my', 'girl', 'friend'],
                 ['stupid', 'garbage'],
                 ['Haha', 'I', 'really', "Love", "You"],
                 ['This', 'is', "my", "dog"]]
    clf.testingNB(testEntry[0])
#    for item in testEntry:
#        clf.testingNB(item)

 

Python: Preprocessing Historical Stock Data (Part 1)

 

Quantitative trading programs need historical stock data to work with. This section shows how to fetch that history with Python and store the result as a DataFrame. The processed data can be downloaded from http://download.csdn.net/detail/suiyingy/9688505.

The steps:

  • (1) Build the stock pool, chosen here by market cap.
  • (2) Read the historical returns of every stock in the pool.
  • (3) Store the return series in a DataFrame, one column per stock; returns for dates before a stock was listed are set to 0.
  • (4) Set the last row of the DataFrame to each stock's number of trading days.
  • (5) Save the DataFrame to a csv file.

The code:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch historical stock returns, merge them into one DataFrame, then save as csv
@author: yehx
"""

import numpy as np
import pandas as pd

#get the 50 smallest stocks by market cap
df = get_fundamentals(
        query(fundamentals.eod_derivative_indicator.market_cap)
        .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
        .limit(50),'2016-11-17', '1y'
    )
b1= {}
priceChangeRate_300 = get_price_change_rate('000300.XSHG', '20060101', '20161118')
df300 = pd.DataFrame(priceChangeRate_300)
lenReference =  len(priceChangeRate_300)
dfout = df300

dflen = pd.DataFrame()
dflen['000300.XSHG'] = [lenReference]
#loop over each of the 50 stocks
#fetch their returns (here from 2015-01-01 to 2016-11-18)
#store the returns in the DataFrame
#save the DataFrame as a csv file
for stock in range(50):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20150101', '20161118')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
        dftempPrice = pd.DataFrame(priceChangeRate)
        tempArr = []
        for i in range(lenReference):          
            if df300.index[i] in list(dftempPrice.index):
                #keep 4 decimal places
                tempArr.append( "%.4f" %((dftempPrice.loc[str(df300.index[i])][0])))
                pass
            else:
                tempArr.append(float(0.0))
        fileName = ''
        fileName = fileName.join(df['market_cap'].columns[stock].split('.')) 
        dfout[fileName] = tempArr
        dflen[fileName] = [len(priceChangeRate)]

dfout = dfout.append(dflen)
dfout.to_csv('00050.csv')

 

 

Python: Preprocessing Historical Stock Data (Part 2)

 

Stock history downloaded from the web often cannot be used as-is and has to be converted into the format you need. The Python code below extracts the history stored in a csv file and processes it. The result is a sub-database of 30-day return windows, downloadable from http://download.csdn.net/detail/suiyingy/9688605.

The main steps (csv reading and writing in Python):

  • #read the historical returns from the csv file;
  • #randomly sample windows of 30 daily returns;
  • #build our own sub-database;
  • #save the result as a new csv file.

The code:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Process historical stock returns stored in csv format
@author: yehx
"""

import numpy as np
import pandas as pd
import random
import csv
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

'''
    - load csv-format data
'''
def loadCSVfile1(datafile):
    filelist = []
    with open(datafile) as file:
        lines = csv.reader(file)
        for oneline in lines:
            filelist.append(oneline)
    filelist = np.array(filelist)
    return filelist

#data processing:
#randomly sample windows of 30 daily returns
#to build our own sub-database
    
def dataProcess(dataArr, subLen):
    totLen, totWid = np.shape(data)
    print totLen, totWid
    lenArr = dataArr[totLen-1,2:totWid]
    columnCnt = 1
    dataOut = []
    for lenData in lenArr:
        columnCnt = columnCnt + 1
        N60 = int(lenData) / (2 * subLen)
        print N60
        if N60 > 0:
            randIndex = random.sample(range(totLen-int(lenData)-1,totLen-subLen), N60)
            for i in randIndex:
                dataOut.append(dataArr[i:(i+subLen),columnCnt])
    dataOut = np.array(dataOut)
    
    return dataOut




if __name__=="__main__":
    datafile = "00100 (3).csv"
    data = loadCSVfile1(datafile) 
    df = pd.DataFrame(data)
    m, n = np.shape(data)
    dataOut = dataProcess(data, 30)
    m, n = np.shape(dataOut)
    #save the result
    csvfile = file('csvtest.csv', 'wb')
    writer = csv.writer(csvfile)
    writer.writerows(dataOut)
    csvfile.close()

http://blog.sina.com.cn/s/articlelist_6017673753_0_1.html
