工做的項目直接用了pymongo來操做直接操做mongodb。對於用慣了Djongo ORM的我來講,這種方式簡直太不優雅。在網上找了一下,發現有一個mongoengine的 orm庫。雖然項目暫時沒辦法改造。我就想先來研究一下這個庫的源碼。由於這是第一次把本身閱讀源碼的心得寫出來。因此可能會比較囉嗦。請各位見諒!mongodb
在 __init__ 中,定義了__all__,在包級別就能夠方便的導如全部模塊的符號數據庫
#coding:utf-8 # Import submodules so that we can expose their __all__ from mongoengine import connection from mongoengine import document from mongoengine import errors from mongoengine import fields from mongoengine import queryset from mongoengine import signals # Import everything from each submodule so that it can be accessed via # mongoengine, e.g. instead of `from mongoengine.connection import connect`, # users can simply use `from mongoengine import connect`, or even # `from mongoengine import *` and then `connect('testdb')`. from mongoengine.connection import * from mongoengine.document import * from mongoengine.errors import * from mongoengine.fields import * from mongoengine.queryset import * from mongoengine.signals import * """ 定義了當你使用 from module import * 導入 某個模塊的時候能導出的符號(這裏表明變量,函數,類等) """ __all__ = (list(document.__all__) + list(fields.__all__) + list(connection.__all__) + list(queryset.__all__) + list(signals.__all__) + list(errors.__all__)) VERSION = (0, 11, 0) def get_version(): """Return the VERSION as a string, e.g. for VERSION == (0, 10, 7), return '0.10.7'. """ """ map()函數接收兩個參數, 一個是函數,一個是序列,map將傳入的函數依次做用到序列的每一個元素, 並把結果做爲新的list返回 """ return '.'.join(map(str, VERSION)) __version__ = get_version()
from pymongo import MongoClient, ReadPreference, uri_parser import six from mongoengine.python_support import IS_PYMONGO_3 __all__ = ['MongoEngineConnectionError', 'connect', 'register_connection', 'DEFAULT_CONNECTION_NAME'] DEFAULT_CONNECTION_NAME = 'default' if IS_PYMONGO_3: READ_PREFERENCE = ReadPreference.PRIMARY else: from pymongo import MongoReplicaSetClient READ_PREFERENCE = False class MongoEngineConnectionError(Exception): """Error raised when the database connection can't be established or when a connection with a requested alias can't be retrieved. """ pass _connection_settings = {} _connections = {} _dbs = {} def register_connection(alias, name=None, host=None, port=None, read_preference=READ_PREFERENCE, username=None, password=None, authentication_source=None, authentication_mechanism=None, **kwargs): """Add a connection. :param alias: the name that will be used to refer to this connection throughout MongoEngine :param name: the name of the specific database to use :param host: the host name of the :program:`mongod` instance to connect to :param port: the port that the :program:`mongod` instance is running on :param read_preference: The read preference for the collection ** Added pymongo 2.1 :param username: username to authenticate with :param password: password to authenticate with :param authentication_source: database to authenticate against :param authentication_mechanism: database authentication mechanisms. By default, use SCRAM-SHA-1 with MongoDB 3.0 and later, MONGODB-CR (MongoDB Challenge Response protocol) for older servers. :param is_mock: explicitly use mongomock for this connection (can also be done by using `mongomock://` as db host prefix) :param kwargs: allow ad-hoc parameters to be passed into the pymongo driver .. versionchanged:: 0.10.6 - added mongomock support """ conn_settings = { 'name': name or 'test', 'host': host or 'localhost', 'port': port or 27017, 'read_preference': read_preference, 'username': username, 'password': password, 'authentication_source': authentication_source, 'authentication_mechanism': authentication_mechanism } # Handle uri style connections conn_host = conn_settings['host'] # host can be a list or a string, so if string, force to a list if isinstance(conn_host, six.string_types): conn_host = [conn_host] resolved_hosts = [] for entity in conn_host: # Handle Mongomock if entity.startswith('mongomock://'): conn_settings['is_mock'] = True # `mongomock://` is not a valid url prefix and must be replaced by `mongodb://` resolved_hosts.append(entity.replace('mongomock://', 'mongodb://', 1)) # Handle URI style connections, only updating connection params which # were explicitly specified in the URI. elif '://' in entity: uri_dict = uri_parser.parse_uri(entity) resolved_hosts.append(entity) if uri_dict.get('database'): conn_settings['name'] = uri_dict.get('database') for param in ('read_preference', 'username', 'password'): if uri_dict.get(param): conn_settings[param] = uri_dict[param] uri_options = uri_dict['options'] if 'replicaset' in uri_options: conn_settings['replicaSet'] = True if 'authsource' in uri_options: conn_settings['authentication_source'] = uri_options['authsource'] if 'authmechanism' in uri_options: conn_settings['authentication_mechanism'] = uri_options['authmechanism'] else: resolved_hosts.append(entity) conn_settings['host'] = resolved_hosts # Deprecated parameters that should not be passed on kwargs.pop('slaves', None) kwargs.pop('is_slave', None) conn_settings.update(kwargs) _connection_settings[alias] = conn_settings def disconnect(alias=DEFAULT_CONNECTION_NAME): """Close the connection with a given alias.""" if alias in _connections: get_connection(alias=alias).close() del _connections[alias] if alias in _dbs: del _dbs[alias] def get_connection(alias=DEFAULT_CONNECTION_NAME, reconnect=False): """Return a connection with a given alias.""" # Connect to the database if not already connected if reconnect: disconnect(alias) # If the requested alias already exists in the _connections list, return # it immediately. if alias in _connections: return _connections[alias] # Validate that the requested alias exists in the _connection_settings. # Raise MongoEngineConnectionError if it doesn't. if alias not in _connection_settings: if alias == DEFAULT_CONNECTION_NAME: msg = 'You have not defined a default connection' else: msg = 'Connection with alias "%s" has not been defined' % alias raise MongoEngineConnectionError(msg) def _clean_settings(settings_dict): irrelevant_fields = set([ 'name', 'username', 'password', 'authentication_source', 'authentication_mechanism' ]) return { k: v for k, v in settings_dict.items() if k not in irrelevant_fields } # Retrieve a copy of the connection settings associated with the requested # alias and remove the database name and authentication info (we don't # care about them at this point). conn_settings = _clean_settings(_connection_settings[alias].copy()) # Determine if we should use PyMongo's or mongomock's MongoClient. is_mock = conn_settings.pop('is_mock', False) if is_mock: try: import mongomock except ImportError: raise RuntimeError('You need mongomock installed to mock ' 'MongoEngine.') connection_class = mongomock.MongoClient else: connection_class = MongoClient # Handle replica set connections if 'replicaSet' in conn_settings: # Discard port since it can't be used on MongoReplicaSetClient conn_settings.pop('port', None) # Discard replicaSet if it's not a string if not isinstance(conn_settings['replicaSet'], six.string_types): del conn_settings['replicaSet'] # For replica set connections with PyMongo 2.x, use # MongoReplicaSetClient. # TODO remove this once we stop supporting PyMongo 2.x. if not IS_PYMONGO_3: connection_class = MongoReplicaSetClient conn_settings['hosts_or_uri'] = conn_settings.pop('host', None) # Iterate over all of the connection settings and if a connection with # the same parameters is already established, use it instead of creating # a new one. existing_connection = None connection_settings_iterator = ( (db_alias, settings.copy()) for db_alias, settings in _connection_settings.items() ) for db_alias, connection_settings in connection_settings_iterator: connection_settings = _clean_settings(connection_settings) if conn_settings == connection_settings and _connections.get(db_alias): existing_connection = _connections[db_alias] break # If an existing connection was found, assign it to the new alias if existing_connection: _connections[alias] = existing_connection else: # Otherwise, create the new connection for this alias. Raise # MongoEngineConnectionError if it can't be established. try: _connections[alias] = connection_class(**conn_settings) except Exception as e: raise MongoEngineConnectionError( 'Cannot connect to database %s :\n%s' % (alias, e)) return _connections[alias] def get_db(alias=DEFAULT_CONNECTION_NAME, reconnect=False): if reconnect: disconnect(alias) if alias not in _dbs: conn = get_connection(alias) conn_settings = _connection_settings[alias] db = conn[conn_settings['name']] auth_kwargs = {'source': conn_settings['authentication_source']} if conn_settings['authentication_mechanism'] is not None: auth_kwargs['mechanism'] = conn_settings['authentication_mechanism'] # Authenticate if necessary if conn_settings['username'] and (conn_settings['password'] or conn_settings['authentication_mechanism'] == 'MONGODB-X509'): db.authenticate(conn_settings['username'], conn_settings['password'], **auth_kwargs) _dbs[alias] = db return _dbs[alias] def connect(db=None, alias=DEFAULT_CONNECTION_NAME, **kwargs): """Connect to the database specified by the 'db' argument. Connection settings may be provided here as well if the database is not running on the default port on localhost. If authentication is needed, provide username and password arguments as well. Multiple databases are supported by using aliases. Provide a separate `alias` to connect to a different instance of :program:`mongod`. .. versionchanged:: 0.6 - added multiple database support. """ if alias not in _connections: register_connection(alias, db, **kwargs) return get_connection(alias) # Support old naming convention _get_connection = get_connection _get_db = get_db