最近,須要使用 Python 對 MongodB 作一些簡單的操做,不想使用各類繁重的框架。出於可重用性的考慮,想對 MongoDB Python 官方驅動 PyMongo 作下簡單封裝,百度一如既往的未能給我一個滿意的結果,因而有了下文。html
【正文】python
PyMongo,MongoDB Python官方驅動git
- docs: https://api.mongodb.com/python/current/index.html
- github: https://github.com/mongodb/mongo-python-driver
PyMongo 驅動幾乎支持 MongoDB 的所有特性,能夠鏈接單個的 MongoDB 數據庫、副本集和分片集羣。從提供的API角度來看,pymongo package是其核心,包含對數據庫的各類操做。本文將介紹一個簡單封裝類 DBManager。主要特性:對數據庫和集合的操做確保其存在性;支持PyMongo的原生操做,包括基本的CRUD操做、批量操做、MapReduce、多線程和多進程等;支持因果一致性會話和事務的流水線操做,並給出簡單示例。github
mongo_client 提供了鏈接 MongoDB 的MongoClient類:
class pymongo.mongo_client.MongoClient(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
每一個 MongoClient 實例 client (下文簡稱 client)都維護一個內建的鏈接池,默認 maxPoolsize 大小100。對於多線程的操做,鏈接池會給每個線程一個 socket 鏈接,直到達到最大的鏈接數,後續的線程會阻塞以等待有可用的鏈接被釋放。client 對 MongoDB 拓撲結構中的每一個server 還維護一個額外的鏈接來監聽 server 的狀態。mongodb
下面的 new_mongo_client
函數用於獲取一個數據庫鏈接的 client。其中,client.admin.command('ismaster')
用來檢查 server 的可用狀態,簡單省事不須要認證。數據庫
def new_mongo_client(uri, **kwargs): """Create new pymongo.mongo_client.MongoClient instance. DO NOT USE IT DIRECTLY.""" try: client = MongoClient(uri, maxPoolSize=1024, **kwargs) client.admin.command('ismaster') # The ismaster command is cheap and does not require auth. except ConnectionFailure: logging.error("new_mongo_client(): Server not available, Please check you uri: {}".format(uri)) return None else: return client
PyMongo 不是進程(fork-safe)安全的,但在一個進程中是線程安全(thread-safe)的。所以常見的場景是,對於一個MongoDB 環境,爲每個進程中建立一個 client ,後面全部的數據庫操做都使用這一個實例,包括多線程操做。永遠不要爲每一次操做都建立一個 MongoClient 實例,使用完後調用 MongoClient.close() 方法,這樣沒有必要並且會很是浪費性能。api
鑑於以上緣由,通常不宜直接使用new_mongo_client
函數獲取 client,而是進一步封裝爲get_mongo_client
方法。 其中全局常量 URI_CLIENT_DICT
保持着數據庫 URI 字符串與對應 clinet 的字典,一個 URI 對應一個 client 。代碼以下:數組
MONGO_URI_DEFAULT = 'mongodb://localhost:27017/admin' URI_CLIENT_DICT = {} # a dictionary hold all client with uri as key def get_mongo_client(uri=MONGO_URI_DEFAULT, fork=False, **kwargs): """Get pymongo.mongo_client.MongoClient instance. One mongodb uri, one client. @:param uri: mongodb uri @:param fork: for fork-safe in multiprocess case, if fork=True, return a new MongoClient instance, default False. @:param kwargs: refer to pymongo.mongo_client.MongoClient kwargs """ if fork: return new_mongo_client(uri, **kwargs) global URI_CLIENT_DICT matched_client = URI_CLIENT_DICT.get(uri) if matched_client is None: # no matched client new_client = new_mongo_client(uri, **kwargs) if new_client is not None: URI_CLIENT_DICT[uri] = new_client return new_client return matched_client
PyMongo 有個特性:對於不存在的數據庫、集合上的查詢不會報錯。以下,Ipython中演示在不存在xxDB 數據庫和 xxCollection 集合上的操做:安全
In [1]: from pymongo import MongoClient In [2]: client = MongoClient() # default uri is 'mongodb://localhost:27017/admin' In [3]: db = client.get_database('xxDB') # Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB') In [4]: coll = db.get_collection('XXCollection') # Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB'), u'XXCollection') In [5]: coll.find_one() # note: no tip, no error, no exception, return None In [6]: coll.insert_one({'hello' : 'what a fucking feature'}) Out[6]: <pymongo.results.InsertOneResult at 0x524ccc8> In [7]: coll.find_one() Out[7]: {u'_id': ObjectId('5c31c807bb048515b814d719'), u'hello': u'what a fucking feature'}
這對於手誤寫錯數據庫或集合名字後進行的後續操做,簡直就是災難。鑑於此因,有必要對獲取數據庫或集合時加上確認保護。
下面對於獲取數據庫,使用 MongoClient.list_database_names() 獲取全部的數據庫名字,若是數據庫名稱不在其中,則返回None。一樣的道理,對於集合使用 Database.list_collection_names()。注:因爲用戶權限問題形成的獲取數據庫或集合列表的操做報錯的狀況,默認不加確認保護。session
def get_existing_db(client, db_name): """Get existing pymongo.database.Database instance. @:param client: pymongo.mongo_client.MongoClient instance @:param db_name: database name wanted """ if client is None: logging.error('client {} is None'.format(client)) return None try: db_available_list = client.list_database_names() except PyMongoError as e: logging.error('client: {}, db_name: {}, client.list_database_names() error: {}'. format(client, db_name, repr(e))) else: if db_name not in db_available_list: logging.error('client {} has no db named {}'.format(client, db_name)) return None db = client.get_database(db_name) return db def get_existing_coll(db, coll_name): """Get existing pymongo.collection.Collection instance. @:param client: pymongo.mongo_client.MongoClient instance @:param coll_name: collection name wanted """ if db is None: logging.error('db {} is None'.format(db)) return None try: coll_available_list = db.list_collection_names() except PyMongoError as e: logging.error('db: {}, coll_name: {}, db.list_collection_names() error: {}'. format(db, coll_name, repr(e))) else: if coll_name not in coll_available_list: logging.error('db {} has no collection named {}'.format(db, coll_name)) return None coll = db.get_collection(coll_name) return coll
前文的冗長鋪墊主要是爲了引入這個 PyMongo 驅動封裝類 DBManger。
DBManger 類的實例保持的狀態有MongoClient實例 self.client
, 數據庫self.db
和 集合self.coll
,並經過屬性(property)對外開放。PyMongo 原生的方法對這裏的 client, db 和 coll 一樣適用。client 由類的構造器調用上文的get_mongo_client
方法獲取,db 和 coll 便可經過類的構造器獲取也可經過 self.db_name
和 self.coll_name
這些 setter 來切換。
DBManger 類的實例持有的方法 self.create_coll(self, db_name, coll_name)
, session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
。後兩種方法在下一節再具體解釋。
class DBManager: """A safe and simple pymongo packaging class ensuring existing database and collection. Operations: MongoClient level operations: https://api.mongodb.com/python/current/api/pymongo/mongo_client.html Database level operations: https://api.mongodb.com/python/current/api/pymongo/database.html Collection level operations: https://api.mongodb.com/python/current/api/pymongo/collection.html """ __default_uri = 'mongodb://localhost:27017/admin' __default_db_name = 'test' __default_coll_name = 'test' def __init__(self, uri=__default_uri, db_name=__default_db_name, coll_name=__default_coll_name, **kwargs): self.__uri = uri self.__db_name = db_name self.__coll_name = coll_name self.__client = get_mongo_client(uri, **kwargs) self.__db = get_existing_db(self.__client, db_name) self.__coll = get_existing_coll(self.__db, coll_name) def __str__(self): return u'uri: {}, db_name: {}, coll_name: {}, id_client: {}, client: {}, db: {}, coll: {}'.format( self.uri, self.db_name, self.coll_name, id(self.client), self.client, self.db, self.coll) @property def uri(self): return self.__uri @property def db_name(self): return self.__db_name @property def coll_name(self): return self.__coll_name @db_name.setter def db_name(self, db_name): self.__db_name = db_name self.__db = get_existing_db(self.__client, db_name) @coll_name.setter def coll_name(self, coll_name): self.__coll_name = coll_name self.__coll = get_existing_coll(self.__db, coll_name) @property def client(self): return self.__client @property def db(self): return self.__db @property def coll(self): # always use the current instance self.__db self.__coll = get_existing_coll(self.__db, self.__coll_name) return self.__coll def create_coll(self, db_name, coll_name): """Create new collection with new or existing database""" if self.__client is None: return None try: return self.__client.get_database(db_name).create_collection(coll_name) except CollectionInvalid: logging.error('collection {} already exists in database {}'.format(coll_name, db_name)) return None def session_pipeline(self, pipeline): if self.__client is None: logging.error('client is None in session_pipeline: {}'.format(self.__client)) return None with self.__client.start_session(causal_consistency=True) as session: result = [] for operation in pipeline: try: if operation.level == 'client': target = self.__client elif operation.level == 'db': target = self.__db elif operation.level == 'coll': target = self.__coll operation_name = operation.operation_name args = operation.args kwargs = operation.kwargs operator = getattr(target, operation_name) if type(args) == tuple: ops_rst = operator(*args, session=session, **kwargs) else: ops_rst = operator(args, session=session, **kwargs) if operation.callback is not None: operation.out = operation.callback(ops_rst) else: operation.out = ops_rst except Exception as e: logging.error('{} {} Exception, session_pipeline args: {}, kwargs: {}'.format( target, operation, args, kwargs)) logging.error('session_pipeline Exception: {}'.format(repr(e))) result.append(operation) return result # https://api.mongodb.com/python/current/api/pymongo/client_session.html#transactions def transaction_pipeline(self, pipeline): if self.__client is None: logging.error('client is None in transaction_pipeline: {}'.format(self.__client)) return None with self.__client.start_session(causal_consistency=True) as session: with session.start_transaction(): result = [] for operation in pipeline: try: if operation.level == 'client': target = self.__client elif operation.level == 'db': target = self.__db elif operation.level == 'coll': target = self.__coll operation_name = operation.operation_name args = operation.args kwargs = operation.kwargs operator = getattr(target, operation_name) if type(args) == tuple: ops_rst = operator(*args, session=session, **kwargs) else: ops_rst = operator(args, session=session, **kwargs) if operation.callback is not None: operation.out = operation.callback(ops_rst) else: operation.out = ops_rst except Exception as e: logging.error('{} {} Exception, transaction_pipeline args: {}, kwargs: {}'.format( target, operation, args, kwargs)) logging.error('transaction_pipeline Exception: {}'.format(repr(e))) raise Exception(repr(e)) result.append(operation) return result
這裏給出一些例子來講明 DBManager的使用方法。
# get DBManger instance var dbm = DBManager('mongodb://localhost:27017/admin') # db_name, coll_name default 'test' dbm.create_coll('testDB', 'testCollection') # change db or coll dbm.db_name = 'testDB' # dbm.db (test -> testDB) and dbm.coll (test.testCollection-> testDB.testCollection) will be changed at the same time dbm.coll_nmae = 'testCollection' # dbm.coll (test.test-> test.testCollection) will be change at the same time
# simple manipulation operation dbm.coll.insert_one({'hello': 'world'}) print(dbm.coll.find_one()) # {'_id': ObjectId('...'), 'hello': 'world'} dbm.coll.update_one({'hello': 'world'}, {'hello': 'hell'}) # bulk operation from pymongo import InsertOne, DeleteOne, ReplaceOne, ReplaceOne dbm.coll.bulk_write([InsertOne({'y':1}), DeleteOne({'x':1}), ReplaceOne({{'w':1}, {'z':1}, upsert=True})]) # simple managing operation import pymongo dbm.coll.create_index([('hello', pymongo.DESCENDING)], background=True) dbm.client.list_database_names() dbm.db.list_collection_names()
# thread concurrent import threading def fun(uri, db_name, coll_name): # new DBManager instance avoid variable competition dbm = DBManager(uri, db_name, coll_name) pass t = threading.Thread(target=func, args=(uri, db_name, coll_name)) t.start() # multiprocess parallel import multiprocessing def func(uri, db_name, coll_name): # new process, new client with fork=True parameter, and new DBManager instance. dbm = DBManager(uri, db_name, coll_name, fork=True) # Do something with db. pass proc = multiprocessing.Process(target=func, args=(uri, db_name, coll_name)) proc.start()
# MapReduce from bson.code import Code mapper = Code(''' function () {...} ''') reducer = Code(''' function (key, value) {...} ''') rst = dbm.coll.inline_map_reduce(mapper, reducer)
MongoDB Reference
- docs: https://docs.mongodb.com/manual/
- causal-consistency session: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
- transation: https://docs.mongodb.com/manual/core/transactions/#transactions
會話(session),是對數據庫鏈接的一種邏輯表示。從MongoDB 3.6開始,MongoDB引入了客戶端會話(client session),並在其中加入了對操做的因果一致性(causal-consistency)的支持。所以,更準確地說,這裏 DBManger 類封裝的實際上是因果一致性的會話,即client.start_session(causal_consistency=True)
。不過,一致性可以保證的前提是客戶端的應用應保證在一個會話中只有一個線程(thread)在作這些操做。在一個客戶端會話中,多個順序的讀寫操做獲得的結果與它們的執行順序將是因果一致的,讀寫的設置都自動設爲 "majority"。應用場景:先寫後讀,先讀後寫,一致性的寫,一致性的讀(Read your writes,Writes follow reads,Monotonic writes, Monotonic reads)。客戶端會話與服務端會話(server session)進行交互。從3.6版本開始,MongoDB驅動將全部的操做都關聯到服務端會話。服務端會話是客戶端會話順序操做因果一致性和重試寫操做的得以支持的底層框架。
MongoDB 對單個文檔的操做時是原子性的(atomic)。原子性是指一個操做的結果要麼有要麼沒有,不可再切割,換句話說叫 「all or nothing」。從MongoDB 4.0開始,副本集(Replica set)開始支持多個文檔級別的原子性,即多文檔事務(muti-document transaction)。在同一個事務中,對跨越不一樣數據庫或集合下的多個文檔操做,若是所有操做成功,則該事務被成功提交(commit);若是某些操做出現失敗,則整個事務會終止(abort),操做中對數據庫的改動會被丟棄。只有在事務被成功提交以後,操做的結果才能被事務外看到,事務正在進行或者事務失敗,其中的操做對外都不可見。單個mongod服務和分片集羣(sharded cluster)暫不支持事務。MongoDB官方預計在4.2版本左右對分片集羣加入對事務的支持。另外,須要注意的是,多文檔事務會引入更大的性能開銷,在場景容許的狀況下,儘量考慮用嵌套文檔或數組的單文檔操做方式來解決問題。
會話和事務的主要應用場景其實都是多個的時序性操做,即流水線形式。所以 DBManager 加入了session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
的操做方法。首先引入表徵操做的類Operation,描述一個操做做用的層次(client, db或coll)、操做方法、參數和操做結果須要調用的回調函數,見名知意,再也不贅解。多個操做 Operation 類的實例構成的list 爲pipeline, 做爲session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
的輸入參數。pipeline 操做的每一步的輸出會寫入到對應Operation 類的實例的out屬性中。
class Operation: """Operation for constructing sequential pipeline. Only used in DBManager.session_pipeline() or transaction_pipeline(). Constructor parameters: level: <'client' | 'db' | 'coll'> indicating different operation level, MongoClient, Database, Collection operation_name: Literally, the name of operation on specific level args: position arguments the operation need. Require the first parameter or a tuple of parameters of the operation. kwargs: key word arguments the operation need. callback: callback function for operation result Examples: # pymongo.collection.Collection.find(filter, projection, skip=None, limit=None,...) Operation('coll', 'find', {'x': 5}) only filter parameter, equivalent to: Operation('coll', 'find', args={'x': 5}) or Operation('coll', 'find', kwargs={filter: {'x': 5}}) Operation('coll', 'find', ({'x': 5},{'_id': 0}) {'limit':100}), equivalent to: Operation('coll', 'find', args=({'x': 5},{'_id': 0}, None, {'limit':100}) ), OR Operation('coll', 'find', kwargs={'filter':{'x': 5}, 'projection': {'_id': 0},'limit':100}) def cursor_callback(cursor): return cursor.distinct('hello') Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback) """ def __init__(self, level, operation_name, args=(), kwargs={}, callback=None): self.level = level self.operation_name = operation_name self.args = args if kwargs is None: self.kwargs = None else: self.kwargs = kwargs self.callback = callback self.out = None
基於 DBManager 和 Operation 的因果一致性的會話和事務的簡單示例以下:
# causal-consistency session or transaction pipeline operation def cursor_callback(cursor): return cursor.distinct('hello') op_1 = Operation('coll', 'insert_one', {'hello': 'heaven'}) op_2 = Operation('coll', 'insert_one', {'hello': 'hell'}) op_3 = Operation('coll', 'insert_one', {'hello': 'god'}) op_4 = Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback) op_5 = Operation('coll', 'find_one', {'hello': 'god'}) pipeline = [op_1, op_2, op_3, op_4, op_5] ops = dbm.transaction_pipeline(pipeline) # only on replica set deployment # ops = dbm.session_pipeline(pipeline) # can be standalone, replica set or sharded cluster. for op in ops: print(op.out)
【正文完】
注:內容同步自同名CSDN博客:https://blog.csdn.net/fzlulee/article/details/85944967