python異步加協程獲取比特幣市場信息

目標html

  選取幾個比特幣交易量大的幾個交易平臺,查看對應的API,獲取該市場下貨幣對的ticker和depth信息。咱們從網站上選取4個交易平臺:bitfinex、okex、binance、gdax。對應的交易對是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。python

 

、ccxtgit

  開始想着直接請求市場的API,而後再解析獲取下來的數據,但到github上發現一個比較好得python庫,裏面封裝好了獲取比特幣市場的相關函數,這樣一來就省掉分析API的時間了。所以我只要傳入市場以及對應的貨幣對,利用庫裏面的函數 fetch_ticker 和 fetch_order_book 就能夠獲取到市場的ticker和depth信息(具體的使用方法能夠查看ccxt手冊)。接下來以市場okex爲例,利用ccxt庫獲取okex的ticker和depth信息。github

# 引入庫
import ccxt

# 實例化市場
exchange = ccxt.okex()
# 交易對
symbol = 'BTC/USDT'

# 獲取ticker信息
ticker = exchange.fetch_ticker(symbol)
# 獲取depth信息
depth = exchange.fetch_order_book(symbol)

print('ticker:%s, depth:%s' % (ticker, depth))

   運行後會獲得結果以下圖,今後能夠看出已經獲取到了ticker和depth信息。mongodb

 

 

 2、獲取四個市場的信息(for循環)數據庫

   接下來咱們獲取四個市場的信息,深度裏面有asks和bids,數據量稍微有點兒多,這裏depth信息我只去前面五個,對於ticker我也只提取裏面的info信息(具體表明什麼含義就要參考一下對應市場的API啦)。將其簡單的封裝後,最開始我想的是for循環。想到啥就開始吧:編程

# 引入庫
import ccxt
import time

now = lambda: time.time()
start = now()

def getData(exchange, symbol):
    data = {}  # 用於存儲ticker和depth信息
    # 獲取ticker信息
    tickerInfo = exchange.fetch_ticker(symbol)
    # 獲取depth信息
    depth = {}
    # 獲取深度信息
    exchange_depth = exchange.fetch_order_book(symbol)
    # 獲取asks,bids 最低5個,最高5個信息
    asks = exchange_depth.get('asks')[:5]
    bids = exchange_depth.get('bids')[:5]
    depth['asks'] = asks
    depth['bids'] = bids

    data['ticker'] = tickerInfo
    data['depth'] = depth

    return data

def main():
    # 實例化市場
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易對
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']

    for i in range(len(exchanges)):
        exchange = exchanges[i]
        symbol = symbols[i]
        data = getData(exchange, symbol)
        print('exchange: %s data is %s' % (exchange.id, data))

if __name__ == '__main__':
    main()
    print('Run Time: %s' % (now() - start))

   運行後會發現雖然每一個市場的信息都獲取到了,執行完差很少花掉5.7秒,由於這是同步的,也就是按順序執行的,要是要想每隔必定時間同時獲取四個市場的信息,很顯然這種結果不符合咱們的要求。併發

 

 

3、異步加協程(coroutine)app

  前面講的循環雖然能夠輸出結果,但耗時長並且達不到想要的效果,接下來採用異步加協程(參考知乎上的一篇文章),要用到異步首先得引入asyncio庫,這個庫是3.4之後纔有的,它提供了一種機制,使得你能夠用協程(coroutines)、IO複用(multiplexing I/O)在單線程環境中編寫併發模型。這裏python文檔有個小例子。異步

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  

 

  當事件循環開始運行時,它會在Task中尋找coroutine來執行調度,由於事件循環註冊了print_sum(),所以print_sum()被調用,執行result = await compute(x, y)這條語句(等同於result = yield from compute(x, y)),由於compute()自身就是一個coroutine,所以print_sum()這個協程就會暫時被掛起,compute()被加入到事件循環中,程序流執行compute()中的print語句,打印」Compute %s + %s …」,而後執行了await asyncio.sleep(1.0),由於asyncio.sleep()也是一個coroutine,接着compute()就會被掛起,等待計時器讀秒,在這1秒的過程當中,事件循環會在隊列中查詢能夠被調度的coroutine,而由於此前print_sum()compute()都被掛起了,所以事件循環會停下來等待協程的調度,當計時器讀秒結束後,程序流便會返回到compute()中執行return語句,結果會返回到print_sum()中的result中,最後打印result,事件隊列中沒有能夠調度的任務了,此時loop.close()把事件隊列關閉,程序結束。

  接下來咱們採用異步和協程(ps:ccxt庫也有對應的異步),運行後發現時間只用了1.9秒,比以前快了好多倍。  

Run Time: 1.9661316871643066 

相關代碼:

# 引入庫
import ccxt.async as ccxt
import asyncio
import time

now = lambda: time.time()
start = now()

async def getData(exchange, symbol):
    data = {}  # 用於存儲ticker和depth信息
    # 獲取ticker信息
    tickerInfo = await exchange.fetch_ticker(symbol)
    # 獲取depth信息
    depth = {}
    # 獲取深度信息
    exchange_depth = await exchange.fetch_order_book(symbol)
    # 獲取asks,bids 最低5個,最高5個信息
    asks = exchange_depth.get('asks')[:5]
    bids = exchange_depth.get('bids')[:5]
    depth['asks'] = asks
    depth['bids'] = bids

    data['ticker'] = tickerInfo
    data['depth'] = depth

    return data

def main():
    # 實例化市場
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易對
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']

    tasks = []
    for i in range(len(exchanges)):
        task = getData(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

if __name__ == '__main__':
    main()
    print('Run Time: %s' % (now() - start))

  

 3、定時爬取並用mongodb保存數據

  在前面的基礎上,添加一個定時任務,實現每隔一段時間爬取一次數據,並將數據保存到mongodb數據庫。只需再前面的代碼上稍微改改就能夠啦,代碼和運行結果以下:

 

import asyncio
import ccxt.async as ccxt
import time
import pymongo

# 獲取ticker和depth信息
async def get_exchange_tickerDepth(exchange, symbol):  # 其中exchange爲實例化後的市場
    # print('start get_ticker')
    while True:
        print('%s is run %s' % (exchange.id, time.ctime()))

        # 獲取ticher信息
        tickerInfo = await exchange.fetch_ticker(symbol)
        ticker = tickerInfo.get('info')

        if type(ticker) == type({}):
            ticker['timestamp'] = tickerInfo.get('timestamp')
            ticker['high'] = tickerInfo.get('high')
            ticker['low'] = tickerInfo.get('low')
            ticker['last'] = tickerInfo.get('last')
        else:
            ticker = tickerInfo
        # print(ticker)

        # 獲取深度信息
        depth = {}
        exchange_depth = await exchange.fetch_order_book(symbol)
        # 獲取asks,bids 最低5個,最高5個信息
        asks = exchange_depth.get('asks')[:5]
        bids = exchange_depth.get('bids')[:5]
        depth['asks'] = asks
        depth['bids'] = bids
        # print('depth:{}'.format(depth))
        data = {
            'exchange': exchange.id,
            'countries': exchange.countries,
            'symbol': symbol,
            'ticker': ticker,
            'depth': depth
        }

        # 保存數據
        save_exchangeDate(exchange.id, data)
        print('********* %s is finished, time %s *********' % (exchange.id, time.ctime()))

        # 等待時間
        await asyncio.sleep(2)


# 存庫
def save_exchangeDate(exchangeName, data):
    # 連接MongoDB
    connect = pymongo.MongoClient(host='localhost', port=27017)
    # 建立數據庫
    exchangeData = connect['exchangeDataAsyncio']
    # 建立表
    exchangeInformation = exchangeData[exchangeName]
    # print(table_name)
    # 數據去重後保存
    count = exchangeInformation.count()
    if not count > 0:
        exchangeInformation.insert_one(data)
    else:
        for item in exchangeInformation.find().skip(count - 1):
            lastdata = item
        if lastdata['ticker']['timestamp'] != data['ticker']['timestamp']:
            exchangeInformation.insert_one(data)

def main():
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(),
                  ccxt.gdax()]
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
    tasks = []
    for i in range(len(exchanges)):
        task = get_exchange_tickerDepth(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()

    try:
        # print(asyncio.Task.all_tasks(loop))
        loop.run_forever()

    except Exception as e:
        print(e)
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

 

5、小結

  使用協程能夠實現高效的併發任務。Python在3.4中引入了協程的概念,但是這個仍是以生成器對象爲基礎,3.5則肯定了協程的語法。這裏只簡單的使用了asyncio。固然實現協程的不單單是asyncio,tornado和gevent都實現了相似的功能。這裏我有一個問題,就是運行一段時間後,裏面的市場可能有請求超時等狀況致使協程中止運行,我要怎樣才能獲取到錯誤而後重啓對應的協程。若是有大神知道的話請指點指點。

 

6、參考連接

1. Python黑魔法 --- 異步IO( asyncio) 協程  http://python.jobbole.com/87310/

2. Python併發編程之協程/異步IO  https://www.ziwenxie.site/2016/12/19/python-asyncio/

3. 從0到1,Python異步編程的演進之路  https://zhuanlan.zhihu.com/p/25228075

4. Tasks and coroutines  https://docs.python.org/3/library/asyncio-task.html

相關文章
相關標籤/搜索