目標html
選取幾個比特幣交易量大的幾個交易平臺,查看對應的API,獲取該市場下貨幣對的ticker和depth信息。咱們從網站上選取4個交易平臺:bitfinex、okex、binance、gdax。對應的交易對是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。python
一、ccxt庫git
開始想着直接請求市場的API,而後再解析獲取下來的數據,但到github上發現一個比較好得python庫,裏面封裝好了獲取比特幣市場的相關函數,這樣一來就省掉分析API的時間了。所以我只要傳入市場以及對應的貨幣對,利用庫裏面的函數 fetch_ticker 和 fetch_order_book 就能夠獲取到市場的ticker和depth信息(具體的使用方法能夠查看ccxt手冊)。接下來以市場okex爲例,利用ccxt庫獲取okex的ticker和depth信息。github
# 引入庫 import ccxt # 實例化市場 exchange = ccxt.okex() # 交易對 symbol = 'BTC/USDT' # 獲取ticker信息 ticker = exchange.fetch_ticker(symbol) # 獲取depth信息 depth = exchange.fetch_order_book(symbol) print('ticker:%s, depth:%s' % (ticker, depth))
運行後會獲得結果以下圖,今後能夠看出已經獲取到了ticker和depth信息。mongodb
2、獲取四個市場的信息(for循環)數據庫
接下來咱們獲取四個市場的信息,深度裏面有asks和bids,數據量稍微有點兒多,這裏depth信息我只去前面五個,對於ticker我也只提取裏面的info信息(具體表明什麼含義就要參考一下對應市場的API啦)。將其簡單的封裝後,最開始我想的是for循環。想到啥就開始吧:編程
# 引入庫 import ccxt import time now = lambda: time.time() start = now() def getData(exchange, symbol): data = {} # 用於存儲ticker和depth信息 # 獲取ticker信息 tickerInfo = exchange.fetch_ticker(symbol) # 獲取depth信息 depth = {} # 獲取深度信息 exchange_depth = exchange.fetch_order_book(symbol) # 獲取asks,bids 最低5個,最高5個信息 asks = exchange_depth.get('asks')[:5] bids = exchange_depth.get('bids')[:5] depth['asks'] = asks depth['bids'] = bids data['ticker'] = tickerInfo data['depth'] = depth return data def main(): # 實例化市場 exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()] # 交易對 symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD'] for i in range(len(exchanges)): exchange = exchanges[i] symbol = symbols[i] data = getData(exchange, symbol) print('exchange: %s data is %s' % (exchange.id, data)) if __name__ == '__main__': main() print('Run Time: %s' % (now() - start))
運行後會發現雖然每一個市場的信息都獲取到了,執行完差很少花掉5.7秒,由於這是同步的,也就是按順序執行的,要是要想每隔必定時間同時獲取四個市場的信息,很顯然這種結果不符合咱們的要求。併發
3、異步加協程(coroutine)app
前面講的循環雖然能夠輸出結果,但耗時長並且達不到想要的效果,接下來採用異步加協程(參考知乎上的一篇文章),要用到異步首先得引入asyncio庫,這個庫是3.4之後纔有的,它提供了一種機制,使得你能夠用協程(coroutines)、IO複用(multiplexing I/O)在單線程環境中編寫併發模型。這裏python文檔有個小例子。異步
import asyncio async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
當事件循環開始運行時,它會在Task中尋找coroutine來執行調度,由於事件循環註冊了print_sum(),所以print_sum()被調用,執行result = await compute(x, y)這條語句(等同於result = yield from compute(x, y)),由於compute()自身就是一個coroutine,所以print_sum()這個協程就會暫時被掛起,compute()被加入到事件循環中,程序流執行compute()中的print語句,打印」Compute %s + %s …」,而後執行了await asyncio.sleep(1.0),由於asyncio.sleep()也是一個coroutine,接着compute()就會被掛起,等待計時器讀秒,在這1秒的過程當中,事件循環會在隊列中查詢能夠被調度的coroutine,而由於此前print_sum()與compute()都被掛起了,所以事件循環會停下來等待協程的調度,當計時器讀秒結束後,程序流便會返回到compute()中執行return語句,結果會返回到print_sum()中的result中,最後打印result,事件隊列中沒有能夠調度的任務了,此時loop.close()把事件隊列關閉,程序結束。
接下來咱們採用異步和協程(ps:ccxt庫也有對應的異步),運行後發現時間只用了1.9秒,比以前快了好多倍。
Run Time: 1.9661316871643066
相關代碼:
# 引入庫 import ccxt.async as ccxt import asyncio import time now = lambda: time.time() start = now() async def getData(exchange, symbol): data = {} # 用於存儲ticker和depth信息 # 獲取ticker信息 tickerInfo = await exchange.fetch_ticker(symbol) # 獲取depth信息 depth = {} # 獲取深度信息 exchange_depth = await exchange.fetch_order_book(symbol) # 獲取asks,bids 最低5個,最高5個信息 asks = exchange_depth.get('asks')[:5] bids = exchange_depth.get('bids')[:5] depth['asks'] = asks depth['bids'] = bids data['ticker'] = tickerInfo data['depth'] = depth return data def main(): # 實例化市場 exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()] # 交易對 symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD'] tasks = [] for i in range(len(exchanges)): task = getData(exchanges[i], symbols[i]) tasks.append(asyncio.ensure_future(task)) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) if __name__ == '__main__': main() print('Run Time: %s' % (now() - start))
3、定時爬取並用mongodb保存數據
在前面的基礎上,添加一個定時任務,實現每隔一段時間爬取一次數據,並將數據保存到mongodb數據庫。只需再前面的代碼上稍微改改就能夠啦,代碼和運行結果以下:
import asyncio import ccxt.async as ccxt import time import pymongo # 獲取ticker和depth信息 async def get_exchange_tickerDepth(exchange, symbol): # 其中exchange爲實例化後的市場 # print('start get_ticker') while True: print('%s is run %s' % (exchange.id, time.ctime())) # 獲取ticher信息 tickerInfo = await exchange.fetch_ticker(symbol) ticker = tickerInfo.get('info') if type(ticker) == type({}): ticker['timestamp'] = tickerInfo.get('timestamp') ticker['high'] = tickerInfo.get('high') ticker['low'] = tickerInfo.get('low') ticker['last'] = tickerInfo.get('last') else: ticker = tickerInfo # print(ticker) # 獲取深度信息 depth = {} exchange_depth = await exchange.fetch_order_book(symbol) # 獲取asks,bids 最低5個,最高5個信息 asks = exchange_depth.get('asks')[:5] bids = exchange_depth.get('bids')[:5] depth['asks'] = asks depth['bids'] = bids # print('depth:{}'.format(depth)) data = { 'exchange': exchange.id, 'countries': exchange.countries, 'symbol': symbol, 'ticker': ticker, 'depth': depth } # 保存數據 save_exchangeDate(exchange.id, data) print('********* %s is finished, time %s *********' % (exchange.id, time.ctime())) # 等待時間 await asyncio.sleep(2) # 存庫 def save_exchangeDate(exchangeName, data): # 連接MongoDB connect = pymongo.MongoClient(host='localhost', port=27017) # 建立數據庫 exchangeData = connect['exchangeDataAsyncio'] # 建立表 exchangeInformation = exchangeData[exchangeName] # print(table_name) # 數據去重後保存 count = exchangeInformation.count() if not count > 0: exchangeInformation.insert_one(data) else: for item in exchangeInformation.find().skip(count - 1): lastdata = item if lastdata['ticker']['timestamp'] != data['ticker']['timestamp']: exchangeInformation.insert_one(data) def main(): exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()] symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD'] tasks = [] for i in range(len(exchanges)): task = get_exchange_tickerDepth(exchanges[i], symbols[i]) tasks.append(asyncio.ensure_future(task)) loop = asyncio.get_event_loop() try: # print(asyncio.Task.all_tasks(loop)) loop.run_forever() except Exception as e: print(e) loop.stop() loop.run_forever() finally: loop.close() if __name__ == '__main__': main()
5、小結
使用協程能夠實現高效的併發任務。Python在3.4中引入了協程的概念,但是這個仍是以生成器對象爲基礎,3.5則肯定了協程的語法。這裏只簡單的使用了asyncio。固然實現協程的不單單是asyncio,tornado和gevent都實現了相似的功能。這裏我有一個問題,就是運行一段時間後,裏面的市場可能有請求超時等狀況致使協程中止運行,我要怎樣才能獲取到錯誤而後重啓對應的協程。若是有大神知道的話請指點指點。
6、參考連接
1. Python黑魔法 --- 異步IO( asyncio) 協程 http://python.jobbole.com/87310/
2. Python併發編程之協程/異步IO https://www.ziwenxie.site/2016/12/19/python-asyncio/
3. 從0到1,Python異步編程的演進之路 https://zhuanlan.zhihu.com/p/25228075
4. Tasks and coroutines https://docs.python.org/3/library/asyncio-task.html