量化編程技術—簡易回測系統

# -*- coding: utf-8 -*-
# @Date:   2017-08-26
# @Original:

from collections import namedtuple
from collections import OrderedDict
from functools import reduce
import itertools

class StockTradeDays(object):
    def __init__(self, price_array, start_date, date_array=None):
        # 私有價格序列
        self.__price_array = price_array
        # 私有日期序列
        self.__date_array = self._init_days(start_date, date_array)
        # 私有漲跌幅序列
        self.__change_array = self.__init_change()
        # 進行OrderedDict的組裝
        self.stock_dict = self._init_stock_dict()

    def __init_change(self):
        """
        從price_array生成change_array
        :return:
        """
        price_float_array = [float(price_str) for price_str in
                             self.__price_array]
        # 經過將時間平移造成兩個錯開的收盤價序列,經過zip打包成爲一個新的序列
        # 每一個元素爲相鄰的兩個收盤價格
        pp_array = [(price1, price2) for price1, price2 in
                    zip(price_float_array[:-1], price_float_array[1:])]
        change_array = list(map(lambda pp: reduce(lambda a, b: round((b - a) / a, 3), pp), pp_array))
        # list insert插入數據,將第一天的漲跌幅設置爲0
        change_array.insert(0, 0)
        return change_array

    def _init_days(self, start_date, date_array):
        """
        protect方法,
        :param start_date: 初始日期
        :param date_array: 給定日期序列
        :return:
        """
        if date_array is None:
            # 由start_date和self.__price_array來肯定日期序列
            date_array = [str(start_date + ind) for ind, _ in
                          enumerate(self.__price_array)]
        else:
            # 稍後的內容會使用外部直接設置的方式
            # 若是外面設置了date_array,就直接轉換str類型組成新date_array
            date_array = [str(date) for date in date_array]
        return date_array

    def _init_stock_dict(self):
        """
        使用namedtuple,OrderedDict將結果合併
        :return:
        """
        stock_namedtuple = namedtuple('stock',
                                      ('date', 'price', 'change'))

        # 使用以被賦值的__date_array等進行OrderedDict的組裝
        stock_dict = OrderedDict(
            (date, stock_namedtuple(date, price, change))
            for date, price, change in
            zip(self.__date_array, self.__price_array,
                self.__change_array))
        return stock_dict

    def filter_stock(self, want_up=True, want_calc_sum=False):
        """
        篩選結果子集
        :param want_up: 是否篩選上漲
        :param want_calc_sum: 是否計算漲跌和
        :return:
        """
        # Python中的三目表達式的寫法
        filter_func = (lambda p_day: p_day.change > 0) if want_up else (
            lambda p_day: p_day.change < 0)
        # 使用filter_func作篩選函數
        want_days = list(filter(filter_func, self.stock_dict.values()))

        if not want_calc_sum:
            return want_days

        # 須要計算漲跌幅和
        change_sum = 0.0
        for day in want_days:
            change_sum += day.change
        return change_sum

    """
        下面的__str__,__iter__, __getitem__, __len__稍後會詳細講解做
    """

    def __str__(self):
        return str(self.stock_dict)

    __repr__ = __str__

    def __iter__(self):
        """
        經過代理stock_dict的跌倒,yield元素
        :return:
        """
        for key in self.stock_dict:
            yield self.stock_dict[key]

    def __getitem__(self, ind):
        date_key = self.__date_array[ind]
        return self.stock_dict[date_key]

    def __len__(self):
        return len(self.stock_dict)


price_array = '413.05,416.51,420.47,410.01,411.87,415.91,415.5,417.28,418.75,407.86,408.68,411.25,411.88,417.7,418.12,415.3,416,416.71,427.36,424.06,416,413.12,416.02,417.9,420.3,420.6,420.46,423.75,422.57,422.28,418.5,418.47,421.32,423.74,426.59,424.75,426.01,431.48,432.04,428.51,430.03,437.76,443.85,452.26,447.8,453.69,463.02,461.77,468.14,444.85,450.46,455.32,446.6,451.11,443.73,450.39,447.38,448.4,461.18,460.2,459.87,461.56,450.7,452.28,455.01,455.76,455.8,457.89,453.01,453.24,453.52,434.55,441.57,440.81,437.48,443.51,445.03,449.09,453.95,472.01,526.02,531,532.89,530.69,536.79,538.8,570.87,572.87,574.02,585.34,576.88,583.05,575.52,580.09,614.51,672,705.87,684.5,696.99,769.5,747.95,757.6,767.3,743.9,668.6,605.85,625.8,665.5,664.87,627.42,662.33,646.61,640,674.74,674.75,705.99,659.29,681.34,667.8,677.04,640.51,664.8,648.11,649.72,647.95,667.19,653.7,659.78,665.5,665.33,683.2,674.3,675,665.85,665.01,648.04,654.03,661.82,654.17,648.47,655.51,655.93,658.34,654.99,622.83,608.29,604.1,590.28,591.27,585.5,583.73,569.41,564.64,574.17,571.83,572.21,573.51,582.1,581.42,588.01,583.54,580.32,577.2,578.02,568.55,574.17,574.78,579.49,576.15,572.73,579.85,609.89,614.52,611.5,615.23,619.75,631.73,626.25,628,612.08,611.62,614.23,613.88,611.81,610.01,607.69,613.03,609.79,600.14,597.43,597.08,603.29,602.55,600.36,609.14,605.53,603.76,604.6,611.1,614.09,609.09,612.67,610.98,614.09,613.51,620.13,620.5,616.56,618.87,641.87,637.63,637.01,643,640.2,644.18,639.79,638.68,631.77,632.46,636.73,664.99,659.03,654.65,659.52,678.7,688.67,693.47,714.51,702.55,701.02,734.6,750.85,692.51,707.62,708.89,713.95,705.55,711.99,724.54,714.87,716.22,703.57,703.64,707.43,712.17,744.98,740.67,753.97,752.9,729.67,738.99,749.85,742,737.61,740.36,731.19,724.9,731.52,731.05,739,755.36,774.88,765.46,768.5,750.62,757.36,765.01,765.01,770.5,772.9,770.21,777.99,775,774.49,777.43,784.17,790.99,790.21,790.59,797.99,829.34,859.2,918.99,895.24,898,906.4,936.43,981.7,974.74,959.26,966.58,998.99,1019.3,1037.5,1139.6,1003.2,898.5,908,915.9,903,905.76,779.54,804.58,828.12,815.3,820.74,830.1,903.99,887.46,900.29,895.74,924.02,923.72,908.52,886.1,893.35,915.12,916.7,919.43,912.55,917.35,966.19,983.73,1007,1015.7,1031.1,1006.6,1022.6,1052.1,1048.8,984.97,992,1000.1,996.01,996.5,1013.3,1013.9,1038.5,1056.2,1059.7,1056.2,1091.2,1129.6,1125.5,1189.8,1185.4,1153,1178.3,1195.5,1189.1,1233.2,1258,1289.2,1267.8,1278.4,1279.2,1232.4,1150,1190.4,1115.4,1172.4,1224.4,1238.5,1245,1256.2,1168.6,1070.4,971.51,1016.5,1040.5,1115.9,1039.1,1032.7,942.13,972.17,968.9,1042.7,1044.7,1041.8,1041.2,1081.5,1093.5,1107.5,1150.1,1145.8,1140.4,1191.5,1196.6,1188.1,1215.9,1220.3,1235.6,1227.4,1186.9,1206.8,1193.3,1212,1240,1265.4,1260.5,1308.5,1327,1346.4,1355.2,1345,1371.1,1400,1440.3,1415.6,1423.6,1435,1533,1558.5,1619,1607.1,1545.1,1597,1619.9,1703.5,1760,1796.9,1853.9,1735,1819.5,1827.3,1772,1786.2,1870,1941.5,1966.5,2059.3,2026.6,2087.3,2249.6,2395.5,2268.1,2125.9,1980.2,2056.9,2207.4,2146.7,2191.8,2312,2405.9,2461,2488.2,2636.9,2844.6,2644,2781.5,2809,2806,2941.8,2569.6,2677.1,2394.3,2377.5,2437.5,2610.1,2491.4,2582,2714.5,2624.4,2672.8,2674.9,2502.6,2483.3,2393.6,2521.2,2518.2,2472.4,2420.6,2346.2,2445.1,2524,2579.9,2598.6,2593.2,2479.3,2542,2477.9,2318.3,2283.8,2375.6,2330.1,2206.5,1978.6,1925,2220,2302.8,2253.4,2865.1,2659,2844.7,2750.1,2769.7,2560.9,2527.7381,2664.6,2784.8,2713.1,2748.2,2854.3,2731.3,2702,2790.3,2860,3252.3,3232.1,3396,3415,3340.4,3405,3643.4,3866.2,4061.6,4320.8,4151.8,4386.4,4263,4090.1,4145,4063.1,3998.2,4081.9,4130.2,4322.1,4351.5,4340.4,4332.8,4385.1,4587.1,4568,4718.3,4907.7,4532.3,4598.5,4205,4375,4595.8,4613.7,4304,4315.9,4233.9,4198.7,4149.4,3849.7,3235.3,3697.1,3681.5,3666.6,4084.4,3892.2,3872.4,3596.7,3602.3,3779.6,3654.7,3930.1,3881.5,4209.7,4190,4168,4367.1,4404.3,4400.2,4310.6,4215.9,4312,4370,4435.6,4611.9,4782.3,4777.7,4824.9,5440,5636.8,5833.5,5713.9,5764.8,5597.1,5567,5694.2,5983.8,6005.1,5981.3,5907.3,5510,5724.1,5890,5759.7,5720.3,6150,6130,6455.1'.split(',')

date_base = 20170118
# 從StockTradeDays類初始化一個實例對象trade_days,內部會調用__init__
trade_days = StockTradeDays(price_array, date_base)
# # 打印對象信息
# print(trade_days)

# print('trade_days對象長度爲: {}'.format(len(trade_days)))

# from collections import Iterable
# # 若是是trade_days是可迭代對象,依次打印出
# if isinstance(trade_days, Iterable) :
    # for day in trade_days:
        # print(day)

# print(trade_days.filter_stock())


import six
from abc import ABCMeta, abstractmethod

"""
    交易策略抽象基類
"""
class TradeStrategyBase(six.with_metaclass(ABCMeta, object)):


    @abstractmethod
    def buy_strategy(self, *args, **kwargs):
        # 買入策略基類
        pass

    @abstractmethod
    def sell_strategy(self, *args, **kwargs):
        # 賣出策略基類
        pass


"""
    交易策略1: 追漲策略,當股價上漲一個閥值默認爲7%時
    買入股票並持有s_keep_stock_threshold(20)天
"""
class TradeStrategy1(TradeStrategyBase):

    s_keep_stock_threshold = 20

    def __init__(self):
        self.keep_stock_day = 0
        # 7%上漲幅度做爲買入策略閥值
        self.__buy_change_threshold = 0.07

    def buy_strategy(self, trade_ind, trade_day, trade_days):
        if self.keep_stock_day == 0 and trade_day.change > self.__buy_change_threshold:
            # 當沒有持有股票的時候self.keep_stock_day == 0 而且
            # 符合買入條件上漲一個閥值,買入
            self.keep_stock_day += 1
        elif self.keep_stock_day > 0:
            # self.keep_stock_day > 0表明持有股票,持有股票天數遞增
            self.keep_stock_day += 1

    def sell_strategy(self, trade_ind, trade_day, trade_days):
        if self.keep_stock_day >= TradeStrategy1.s_keep_stock_threshold:
            # 當持有股票天數超過閥值s_keep_stock_threshold,賣出股票
            self.keep_stock_day = 0

    """
        property屬性稍後會講到
    """
    @property
    def buy_change_threshold(self):
        return self.__buy_change_threshold

    @buy_change_threshold.setter
    def buy_change_threshold(self, buy_change_threshold):
        if not isinstance(buy_change_threshold, float):
            """
                上漲閥值須要爲float類型
            """
            raise TypeError('buy_change_threshold must be float!')
        # 上漲閥值只取小數點後兩位
        self.__buy_change_threshold = round(buy_change_threshold, 2)

"""
    交易回測系統
"""
class TradeLoopBack(object):

    def __init__(self, trade_days, trade_strategy):
        """
        使用上一節封裝的StockTradeDays類和本節編寫的交易策略類
        TradeStrategyBase類初始化交易系統
        :param trade_days: StockTradeDays交易數據序列
        :param trade_strategy: TradeStrategyBase交易策略
        """
        self.trade_days = trade_days
        self.trade_strategy = trade_strategy
        # 交易盈虧結果序列
        self.profit_array = []

    def execute_trade(self):
        """
        執行交易回測
        :return:
        """
        for ind, day in enumerate(self.trade_days):
            """
                以時間驅動,完成交易回測
            """
            if self.trade_strategy.keep_stock_day > 0:
                # 若是有持有股票,加入交易盈虧結果序列
                self.profit_array.append(day.change)

            # hasattr: 用來查詢對象有沒有實現某個方法
            if hasattr(self.trade_strategy, 'buy_strategy'):
                # 買入策略執行
                self.trade_strategy.buy_strategy(ind, day, self.trade_days)

            if hasattr(self.trade_strategy, 'sell_strategy'):
                # 賣出策略執行
                self.trade_strategy.sell_strategy(ind, day, self.trade_days)

"""
    交易策略2: 均值回覆策略,當股價連續兩個交易日下跌,
    且下跌幅度超過閥值默認s_buy_change_threshold(-10%),
    買入股票並持有s_keep_stock_threshold(10)天
"""
class TradeStrategy2(TradeStrategyBase):

    # 買入後持有天數
    s_keep_stock_threshold = 10
    # 下跌買入閥值
    s_buy_change_threshold = -0.10

    def __init__(self):
        self.keep_stock_day = 0

    def buy_strategy(self, trade_ind, trade_day, trade_days):
        if self.keep_stock_day == 0 and trade_ind >= 1:
            """
                當沒有持有股票的時候self.keep_stock_day == 0 而且
                trade_ind >= 1, 不是交易開始的第一天,由於須要yesterday數據
            """
            # trade_day.change < 0 bool:今天是否股價下跌
            today_down = trade_day.change < 0
            # 昨天是否股價下跌
            yesterday_down = trade_days[trade_ind - 1].change < 0
            # 兩天總跌幅
            down_rate = trade_day.change + trade_days[trade_ind - 1].change
            if today_down and yesterday_down and down_rate < TradeStrategy2.s_buy_change_threshold:
                # 買入條件成立:連跌兩天,跌幅超過s_buy_change_threshold
                self.keep_stock_day += 1
        elif self.keep_stock_day > 0:
            # self.keep_stock_day > 0表明持有股票,持有股票天數遞增
            self.keep_stock_day += 1

    def sell_strategy(self, trade_ind, trade_day, trade_days):
        if self.keep_stock_day >= TradeStrategy2.s_keep_stock_threshold:
            # 當持有股票天數超過閥值s_keep_stock_threshold,賣出股票
            self.keep_stock_day = 0

    #類方法,第一個參數爲表示自身類的cls參數
    @classmethod
    def set_keep_stock_threshold(cls, keep_stock_threshold):
        cls.s_keep_stock_threshold = keep_stock_threshold

    #靜態方法
    @staticmethod
    def set_buy_change_threshold(buy_change_threshold):
        TradeStrategy2.s_buy_change_threshold = buy_change_threshold


def calc(keep_stock_threshold, buy_change_threshold):
    """
    :param keep_stock_threshold: 持股天數
    :param buy_change_threshold: 下跌買入閥值
    :return: 盈虧狀況,輸入的持股天數, 輸入的下跌買入閥值
    """
    # 實例化TradeStrategy2
    trade_strategy2 = TradeStrategy2()
    # 經過類方法設置買入後持股天數
    TradeStrategy2.set_keep_stock_threshold(keep_stock_threshold)
    # 經過類方法設置下跌買入閥值
    TradeStrategy2.set_buy_change_threshold(buy_change_threshold)
    # 進行回測
    trade_loop_back = TradeLoopBack(trade_days, trade_strategy2)
    trade_loop_back.execute_trade()
    # 計算回測結果的最終盈虧值profit
    profit = 0.0 if len(trade_loop_back.profit_array) == 0 else \
        reduce(lambda a, b: a + b, trade_loop_back.profit_array)
    # 返回值profit和函數的兩個輸入參數
    return profit, keep_stock_threshold, buy_change_threshold


#1. 繼續和多態
trade_loop_back = TradeLoopBack(trade_days, TradeStrategy1())
trade_loop_back.execute_trade()
print('回測策略1 總盈虧爲:{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))


#2. 使用@property給私有變量賦值
trade_strategy1 = TradeStrategy1()
# 買入閥值從0.07上升到0.1
trade_strategy1.buy_change_threshold = 0.1
trade_loop_back = TradeLoopBack(trade_days, trade_strategy1)
trade_loop_back.execute_trade()
print('回測策略1 總盈虧爲:{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))

trade_strategy2 = TradeStrategy2()
trade_loop_back = TradeLoopBack(trade_days, trade_strategy2)
trade_loop_back.execute_trade()
print('回測策略2 總盈虧爲:{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))

# 實例化一個新的TradeStrategy2類對象
trade_strategy2 = TradeStrategy2()
# 修改成買入後持有股票20天,默認爲10天
TradeStrategy2.set_keep_stock_threshold(20)
# 修改股價下跌買入閥值爲-0.08(下跌8%),默認爲-0.10(下跌10%)
TradeStrategy2.set_buy_change_threshold(-0.08)
# 實例化新的回測對象trade_loop_back
trade_loop_back = TradeLoopBack(trade_days, trade_strategy2)
# 執行回測
trade_loop_back.execute_trade()
print('回測策略2 總盈虧爲:{}%'.format(reduce(lambda a, b: a + b, trade_loop_back.profit_array) * 100))

print(calc(20, -0.08))

# range集合:買入後持股天數從2天-30天,間隔兩天
keep_stock_list = list(range(2, 30, 2))
print('持股天數參數組:{}'.format(keep_stock_list))
# 下跌買入閥值從-0.05到-0.15,即從下跌5%到15%
buy_change_list = [buy_change / 100.0 for buy_change in range(-5, -16, -1)]
print('下跌閥值參數組:{}'.format(buy_change_list))


result = []
for keep_stock_threshold, buy_change_threshold in itertools.product(
        keep_stock_list, buy_change_list):
    # 使用calc計算參數對應的最終盈利,結果加入result序列
    result.append(calc(keep_stock_threshold, buy_change_threshold))
print('笛卡爾積參數集合總共結果爲:{}個'.format(len(result)))


print(sorted(result)[::-1][:20])
相關文章
相關標籤/搜索