服務啟動後,用戶就可以向proxy發送請求了。我們都知道,proxy會把請求轉發到相應的object-server上進行處理。一個請求到來之後,主要流程是先經過鑑權,然後由server.py中的handle_request()方法判斷鑑權結果;如果通過了鑑權,就執行相應的請求。下面我們就通過一個PUT請求(上傳文件)的例子來分析服務轉發的過程。
服務轉發模型
對於一個PUT請求,proxy會先判斷應由哪種controller(object、container或account)處理,之後調用相應controller的PUT方法。比如你PUT一個object,就會執行obj.py中的PUT方法。
PUT請求的主要功能大概分為三個部分:第一部分解析請求,並進行相應的設置;第二部分轉發請求;第三部分根據轉發請求得到的響應製作最終的響應,然後返回。
如圖,根據你要上傳的object所屬的account、container以及object名稱,通過ring文件會得到三個node;
然後為這三個node分別建立連接(conn),每個conn初始化一個Queue,並啟動一個線程函數;
接著程序會向隊列(Queue)中put數據,線程函數則從隊列中get數據並轉發。
數據全部轉發完畢後,跳出put操作,接收響應,然後進入第三步。
源碼:
def __call__(self, env, start_response):
    """
    WSGI entry point.
    Wraps env in webob.Request object and passes it down.

    :param env: WSGI environment dictionary
    :param start_response: WSGI callable
    :returns: the WSGI response body iterable
    """
    # Bind req before the try block. The UnicodeError handler below must be
    # able to reference it even when building the Request itself raised.
    # Previously that case produced a NameError instead of a 412 response.
    req = None
    try:
        if self.memcache is None:
            # On first use, obtain the cache client from the WSGI
            # environment via cache_from_env().
            self.memcache = cache_from_env(env)
        req = self.update_request(Request(env))
        # handle_request returns a WSGI callable; invoke it to produce the
        # response body.
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(request=req, body='Invalid UTF8')
        return err(env, start_response)
    except (Exception, Timeout):
        # Last-resort handler: emit a bare 500 without touching the request.
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']
重點來了:handle_request()
def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as webob.Response).

    Steps: validate Content-Length and the path encoding, resolve and
    instantiate the controller for the path, assign a transaction id, look
    up the controller method named after the HTTP verb, run the
    ``swift.authorize`` hook if one is installed, then call the method.

    :param req: webob.Request object
    :returns: the controller's response. On any unhandled exception the
              error is logged and HTTPServerError is returned.
    """
    try:
        self.logger.set_statsd_prefix('proxy-server')
        # A Content-Length that is present but negative is malformed.
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        # Reject request paths that are NOT valid UTF-8.
        try:
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req,
                                              body='Invalid UTF8')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Invalid UTF8')

        # Map the path to a controller class plus the keyword arguments
        # used to construct it. (An unused, py2-only `p = req.path_info`
        # re-encoding step that used to sit here has been removed.)
        try:
            controller, path_parts = self.get_controller(req.path)
        except ValueError:
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # Instantiate the controller for this request.
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now
            trans_id = 'tx' + uuid.uuid4().hex
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        self.logger.client_ip = get_remote_client(req)

        # Only methods marked publicly_accessible may be dispatched to.
        try:
            handler = getattr(controller, req.method)
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            return HTTPMethodNotAllowed(request=req)
        if path_parts['version']:
            req.path_info_pop()

        # swift.authorize is a hook installed by the auth middleware
        # (e.g. swift_auth).
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp:
                # No resp means authorized, no delayed recheck required.
                del req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                if not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling. This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        # Run the controller method (e.g. ObjectController.PUT).
        return handler(req)
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)
最終執行obj.py中的PUT方法。
其中_send_file()是負責轉發數據的線程函數,_connect_put_node()方法負責建立連接。
def _send_file(self, conn, path):
    """Method for a file PUT coro

    Worker loop that PUT spawns once per backend connection. It takes
    chunks off ``conn.queue`` and writes them to ``conn``. After the first
    write failure it sets ``conn.failed`` and stops sending. It still keeps
    acknowledging queued items, so PUT's ``conn.queue.join()`` can return.

    The loop has no exit condition of its own. It relies on the ContextPool
    in PUT to stop it (presumably the pool kills its green threads on
    exit; confirm against ContextPool).

    :param conn: backend connection with ``queue``, ``failed`` and ``node``
                 attributes (set by PUT and _connect_put_node)
    :param path: request path, used only in the error message
    """
    while True:
        chunk = conn.queue.get()
        if not conn.failed:
            try:
                with ChunkWriteTimeout(self.app.node_timeout):
                    conn.send(chunk)
            except (Exception, ChunkWriteTimeout):
                conn.failed = True
                self.exception_occurred(conn.node, _('Object'),
                                        _('Trying to write to %s') % path)
        # Always mark the item done, even if it was not sent, so that
        # conn.queue.join() in PUT does not block forever.
        conn.queue.task_done()


def _connect_put_node(self, nodes, part, path, headers,
                      logger_thread_locals):
    """Method for a file PUT connect

    Try ``nodes`` in order. Return the first connection whose object server
    answers the ``Expect: 100-continue`` request with 100 Continue.

    :param nodes: iterable of node dicts with ``ip``, ``port``, ``device``
    :param part: partition number for the object
    :param path: object path to PUT
    :param headers: headers for the backend PUT request
    :param logger_thread_locals: logger thread-local state to install in
                                 this green thread
    :returns: the connection (with ``conn.node`` set), or None if no node
              accepted the request
    """
    self.app.logger.thread_locals = logger_thread_locals
    for node in nodes:
        try:
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                                    node['device'], part, 'PUT', path,
                                    headers)
            with Timeout(self.app.node_timeout):
                # Read the interim reply to the 'Expect: 100-continue'
                # header that PUT adds. A 100 means the server is ready to
                # receive the body.
                resp = conn.getexpect()
            if resp.status == HTTP_CONTINUE:
                conn.node = node
                return conn
            elif resp.status == HTTP_INSUFFICIENT_STORAGE:
                self.error_limit(node)
        except (Exception, Timeout):
            # Previously a bare ``except:``. Catching Exception plus Timeout
            # matches the rest of the proxy and no longer swallows
            # SystemExit / KeyboardInterrupt / green-thread kill signals.
            self.exception_occurred(node, _('Object'),
                                    _('Expect: 100-continue on %s') % path)
    return None


@public
@delay_denial
def PUT(self, req):  # the PUT method
    """HTTP PUT request handler.

    Works in three phases:

    1. Validate the request and prepare headers: container lookup, deferred
       authorization, X-Delete-After/X-Delete-At, X-Timestamp,
       Content-Type, object versioning and X-Copy-From.
    2. Open one backend connection per container entry via
       _connect_put_node. Stream the body to those connections through
       per-connection queues that _send_file workers drain.
    3. Collect the backend responses and build the client response with
       best_response.

    :param req: webob.Request object
    :returns: the response for the client
    """
    # ---- Phase 1: validate and prepare ---------------------------------
    # Fetch container partition/nodes, ACL, sync key and versions location.
    (container_partition, containers, _junk, req.acl,
     req.environ['swift_sync_key'], object_versions) = \
        self.container_info(self.account_name, self.container_name,
                            account_autocreate=self.app.account_autocreate)
    if 'swift.authorize' in req.environ:
        # PUT is @delay_denial, so handle_request did not return a denial
        # right away. It also left the hook installed, which it only
        # removes on success. req.acl has now been filled from the
        # container info above, so re-run the check and return any denial.
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp
    if not containers:
        # The target container does not exist.
        return HTTPNotFound(request=req)

    # X-Delete-After (relative seconds) becomes an absolute X-Delete-At.
    if 'x-delete-after' in req.headers:
        try:
            x_delete_after = int(req.headers['x-delete-after'])
        except ValueError:
            return HTTPBadRequest(request=req, content_type='text/plain',
                                  body='Non-integer X-Delete-After')
        req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
    if 'x-delete-at' in req.headers:
        try:
            x_delete_at = int(req.headers['x-delete-at'])
            if x_delete_at < time.time():
                return HTTPBadRequest(body='X-Delete-At in past',
                                      request=req,
                                      content_type='text/plain')
        except ValueError:
            return HTTPBadRequest(request=req, content_type='text/plain',
                                  body='Non-integer X-Delete-At')
        # Round the expiry time down to a multiple of the divisor to name
        # the expiring-objects container. Floor division keeps the result
        # an integer on both Python 2 and 3 (assumes an int divisor).
        delete_at_container = str(
            x_delete_at // self.app.expiring_objects_container_divisor *
            self.app.expiring_objects_container_divisor)
        delete_at_part, delete_at_nodes = \
            self.app.container_ring.get_nodes(
                self.app.expiring_objects_account, delete_at_container)
    else:
        delete_at_part = delete_at_nodes = None

    # Find the object's partition and primary nodes in the object ring.
    partition, nodes = self.app.object_ring.get_nodes(
        self.account_name, self.container_name, self.object_name)

    # do a HEAD request for container sync and checking object versions
    if 'x-timestamp' in req.headers or \
            (object_versions and
             not req.environ.get('swift_versioned_copy')):
        hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
                             environ={'REQUEST_METHOD': 'HEAD'})
        hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
                                    hreq.path_info, len(nodes))
    # Used by container sync feature
    if 'x-timestamp' in req.headers:
        try:
            req.headers['X-Timestamp'] = \
                normalize_timestamp(float(req.headers['x-timestamp']))
            # If the stored object is at least as new, accept without
            # writing anything.
            if hresp.environ and 'swift_x_timestamp' in hresp.environ and \
                    float(hresp.environ['swift_x_timestamp']) >= \
                    float(req.headers['x-timestamp']):
                return HTTPAccepted(request=req)
        except ValueError:
            return HTTPBadRequest(
                request=req, content_type='text/plain',
                body='X-Timestamp should be a UNIX timestamp float value; '
                     'was %r' % req.headers['x-timestamp'])
    else:
        req.headers['X-Timestamp'] = normalize_timestamp(time.time())

    # Sometimes the 'content-type' header exists, but is set to None.
    content_type_manually_set = True
    if not req.headers.get('content-type'):
        guessed_type, _junk = mimetypes.guess_type(req.path_info)
        req.headers['Content-Type'] = guessed_type or \
            'application/octet-stream'
        content_type_manually_set = False
    error_response = check_object_creation(req, self.object_name)
    if error_response:
        return error_response

    if object_versions and not req.environ.get('swift_versioned_copy'):
        # hresp comes from the HEAD request above (same condition).
        is_manifest = 'x-object-manifest' in req.headers or \
            'x-object-manifest' in hresp.headers
        if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
            # This is a version manifest and needs to be handled
            # differently. First copy the existing data to a new object,
            # then write the data from this request to the version manifest
            # object.
            lcontainer = object_versions.split('/')[0]
            # Versioned copies are named
            # <3 hex digits of name length><name>/<timestamp>.
            prefix_len = '%03x' % len(self.object_name)
            lprefix = prefix_len + self.object_name + '/'
            ts_source = hresp.environ.get('swift_x_timestamp')
            if ts_source is None:
                ts_source = time.mktime(time.strptime(
                    hresp.headers['last-modified'],
                    '%a, %d %b %Y %H:%M:%S GMT'))
            new_ts = normalize_timestamp(ts_source)
            vers_obj_name = lprefix + new_ts
            copy_headers = {
                'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
            copy_environ = {'REQUEST_METHOD': 'COPY',
                            'swift_versioned_copy': True}
            copy_req = Request.blank(req.path_info, headers=copy_headers,
                                     environ=copy_environ)
            copy_resp = self.COPY(copy_req)
            if is_client_error(copy_resp.status_int):
                # missing container or bad permissions
                return HTTPPreconditionFailed(request=req)
            elif not is_success(copy_resp.status_int):
                # could not copy the data, bail
                return HTTPServiceUnavailable(request=req)

    # By default, the body is read from the client's wsgi.input in
    # client_chunk_size pieces.
    reader = req.environ['wsgi.input'].read
    data_source = iter(lambda: reader(self.app.client_chunk_size), '')
    source_header = req.headers.get('X-Copy-From')
    source_resp = None
    if source_header:
        # Server-side copy: the body comes from a GET of the source object.
        source_header = unquote(source_header)
        acct = req.path_info.split('/', 2)[1]
        if isinstance(acct, unicode):
            acct = acct.encode('utf-8')
        if not source_header.startswith('/'):
            source_header = '/' + source_header
        source_header = '/' + acct + source_header
        try:
            src_container_name, src_obj_name = \
                source_header.split('/', 3)[2:]
        except ValueError:
            return HTTPPreconditionFailed(
                request=req,
                body='X-Copy-From header must be of the form '
                     '<container name>/<object name>')
        source_req = req.copy_get()
        source_req.path_info = source_header
        source_req.headers['X-Newest'] = 'true'
        orig_obj_name = self.object_name
        orig_container_name = self.container_name
        self.object_name = src_obj_name
        self.container_name = src_container_name
        try:
            source_resp = self.GET(source_req)
        finally:
            # Always point the controller back at the destination object,
            # including when the source GET fails or raises.
            self.object_name = orig_obj_name
            self.container_name = orig_container_name
        if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
            return source_resp
        new_req = Request.blank(req.path_info,
                                environ=req.environ, headers=req.headers)
        data_source = source_resp.app_iter
        new_req.content_length = source_resp.content_length
        if new_req.content_length is None:
            # This indicates a transfer-encoding: chunked source object,
            # which currently only happens because there are more than
            # CONTAINER_LISTING_LIMIT segments in a segmented object. In
            # this case, we're going to refuse to do the server-side copy.
            return HTTPRequestEntityTooLarge(request=req)
        new_req.etag = source_resp.etag
        # we no longer need the X-Copy-From header
        del new_req.headers['X-Copy-From']
        if not content_type_manually_set:
            new_req.headers['Content-Type'] = \
                source_resp.headers['Content-Type']
        if new_req.headers.get('x-fresh-metadata', 'false').lower() \
                not in TRUE_VALUES:
            # Carry over the source metadata, then let the request's own
            # metadata override it.
            for k, v in source_resp.headers.items():
                if k.lower().startswith('x-object-meta-'):
                    new_req.headers[k] = v
            for k, v in req.headers.items():
                if k.lower().startswith('x-object-meta-'):
                    new_req.headers[k] = v
        req = new_req

    # ---- Phase 2: connect and stream ------------------------------------
    node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
    pile = GreenPile(len(nodes))
    for container in containers:
        nheaders = dict(req.headers.iteritems())
        nheaders['Connection'] = 'close'
        # Give each object server one container replica's location
        # (presumably so it can update that container's listing).
        nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
        nheaders['X-Container-Partition'] = container_partition
        nheaders['X-Container-Device'] = container['device']
        nheaders['Expect'] = '100-continue'
        if delete_at_nodes:
            node = delete_at_nodes.pop(0)
            nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
            nheaders['X-Delete-At-Partition'] = delete_at_part
            nheaders['X-Delete-At-Device'] = node['device']
        pile.spawn(self._connect_put_node, node_iter, partition,
                   req.path_info, nheaders, self.app.logger.thread_locals)
    conns = [conn for conn in pile if conn]
    # A strict majority of the nodes must accept the upload.
    if len(conns) <= len(nodes) // 2:
        self.app.logger.error(
            _('Object PUT returning 503, %(conns)s/%(nodes)s '
              'required connections'),
            {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
        return HTTPServiceUnavailable(request=req)

    chunked = req.headers.get('transfer-encoding')
    bytes_transferred = 0
    try:
        with ContextPool(len(nodes)) as pool:
            # One bounded queue plus one _send_file worker per connection;
            # the loop below is the producer.
            for conn in conns:
                conn.failed = False
                conn.queue = Queue(self.app.put_queue_depth)
                pool.spawn(self._send_file, conn, req.path)
            while True:
                with ChunkReadTimeout(self.app.client_timeout):
                    try:
                        chunk = next(data_source)
                    except StopIteration:
                        if chunked:
                            # Terminating zero-length chunk.
                            for conn in conns:
                                conn.queue.put('0\r\n\r\n')
                        break
                bytes_transferred += len(chunk)
                if bytes_transferred > MAX_FILE_SIZE:
                    return HTTPRequestEntityTooLarge(request=req)
                # Iterate over a copy because failed connections are
                # removed from conns.
                for conn in list(conns):
                    if not conn.failed:
                        conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
                                       if chunked else chunk)
                    else:
                        conns.remove(conn)
                if len(conns) <= len(nodes) // 2:
                    self.app.logger.error(
                        _('Object PUT exceptions during'
                          ' send, %(conns)s/%(nodes)s required connections'),
                        {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
                    return HTTPServiceUnavailable(request=req)
            # Wait until every worker has processed its queued chunks.
            for conn in conns:
                if conn.queue.unfinished_tasks:
                    conn.queue.join()
        conns = [conn for conn in conns if not conn.failed]
    except ChunkReadTimeout as err:
        self.app.logger.warn(
            _('ERROR Client read timeout (%ss)'), err.seconds)
        self.app.logger.increment('client_timeouts')
        return HTTPRequestTimeout(request=req)
    except (Exception, Timeout):
        self.app.logger.exception(
            _('ERROR Exception causing client disconnect'))
        return HTTPClientDisconnect(request=req)
    if req.content_length and bytes_transferred < req.content_length:
        req.client_disconnect = True
        self.app.logger.warn(
            _('Client disconnected without sending enough data'))
        self.app.logger.increment('client_disconnects')
        return HTTPClientDisconnect(request=req)

    # ---- Phase 3: collect backend responses and build the reply ---------
    statuses = []
    reasons = []
    bodies = []
    etags = set()
    for conn in conns:
        try:
            with Timeout(self.app.node_timeout):
                response = conn.getresponse()
                statuses.append(response.status)
                reasons.append(response.reason)
                bodies.append(response.read())
                if response.status >= HTTP_INTERNAL_SERVER_ERROR:
                    self.error_occurred(
                        conn.node,
                        _('ERROR %(status)d %(body)s From Object Server '
                          're: %(path)s') %
                        {'status': response.status,
                         'body': bodies[-1][:1024], 'path': req.path})
                elif is_success(response.status):
                    etags.add(response.getheader('etag').strip('"'))
        except (Exception, Timeout):
            self.exception_occurred(
                conn.node, _('Object'),
                _('Trying to get final status of PUT to %s') % req.path)
    # Successful backends must all agree on the stored data's etag.
    if len(etags) > 1:
        self.app.logger.error(
            _('Object servers returned %s mismatched etags'), len(etags))
        return HTTPServerError(request=req)
    etag = len(etags) and etags.pop() or None
    # Count nodes that never answered (or were never connected) as 503s.
    while len(statuses) < len(nodes):
        statuses.append(HTTP_SERVICE_UNAVAILABLE)
        reasons.append('')
        bodies.append('')
    resp = self.best_response(req, statuses, reasons, bodies,
                              _('Object PUT'), etag=etag)
    if source_header:
        resp.headers['X-Copied-From'] = quote(
            source_header.split('/', 2)[2])
        if 'last-modified' in source_resp.headers:
            resp.headers['X-Copied-From-Last-Modified'] = \
                source_resp.headers['last-modified']
        for k, v in req.headers.items():
            if k.lower().startswith('x-object-meta-'):
                resp.headers[k] = v
    resp.last_modified = float(req.headers['X-Timestamp'])
    return resp