layout: post
title: 任務隊列和異步接口的正確打開方式(.NET Core版本)
category: dotnet core
date: 2019-01-12
tags:git
<h2 id="asynchronous-operations">Asynchronous Operations</h2>github
Certain types of operations might require processing of the request in an asynchronous manner (e.g. validating a bank account, processing an image, etc.) in order to avoid long delays on the client side and prevent long-standing open client connections waiting for the operations to complete. For such use cases, APIs MUST employ the following pattern:web
_For POST
requests_:redis
202 Accepted
HTTP response code.In the response body, include one or more URIs as hypermedia links, which could include:api
GET
request to that URI in order to obtain the completed resource. Until the resource is ready, the final URI SHOULD return the HTTP status code 404 Not Found
.`{ "rel": "self", "href": "/v1/namespace/resources/{resource_id}", "method": "GET" }` * A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP `GET` request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed. `{ "rel": "self", "href": "/v1/queue/requests/{request_id}, "method": "GET" }"`
_For PUT
/PATCH
/DELETE
/GET
requests_:異步
Like POST
, you can support PUT/PATCH
/DELETE
/GET
to be asynchronous. The behaviour would be as follows:async
202 Accepted
HTTP response code.In the response body, include one or more URIs as hypermedia links, which could include:ide
GET
request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.`{ "rel": "self", "href": "/v1/queue/requests/{request_id}, "method": "GET" }"`
_APIs that support both synchronous and asynchronous processing for an URI_:post
APIs that support both synchronous and asynchronous operations for a particular URI and an HTTP method combination, MUST recognize the Prefer
header and exhibit following behavior:ui
Prefer=respond-async
header, the service MUST switch the processing to asynchronous mode.Prefer=respond-async
header, the service MUST process the request synchronously.It is desirable that all APIs that implement asynchronous processing, also support webhooks as a mechanism of pushing the processing status to the client.
資料引自:paypal/API Design Patterns And Use Cases:asynchronous-operations
好像也沒什麼講了....
全文結束吧.
時序圖大概長這樣:
RequestService.cs
// RequestService.cs using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using CorrelationId; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using StackExchange.Redis; using static StackExchange.Redis.RedisChannel; namespace MTQueue.Service { public class RequestService { private readonly ICorrelationContextAccessor _correlationContext; private readonly ConnectionMultiplexer _redisMultiplexer; private readonly IServiceProvider _services; private readonly ILogger<RequestService> _logger; public RequestService(ICorrelationContextAccessor correlationContext, ConnectionMultiplexer redisMultiplexer, IServiceProvider services, ILogger<RequestService> logger) { _correlationContext = correlationContext; _redisMultiplexer = redisMultiplexer; _services = services; _logger = logger; } public long? AddRequest(JToken data) { var requestId = _correlationContext.CorrelationContext.CorrelationId; var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB); var index = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId); if (index == null) { data["requestId"] = requestId; redisDB.SortedSetAdd(CommonConst.REQUESTS_SORT_SETKEY, requestId, GetTotalSeconds()); PushRedisMessage(data.ToString()); } return redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId); } public static long GetTotalSeconds() { return (long)(DateTime.Now.ToLocalTime() - new DateTime(1970, 1, 1).ToLocalTime()).TotalSeconds; } private void PushRedisMessage(string message) { Task.Run(() => { try { using (var scope = _services.CreateScope()) { var multiplexer = scope.ServiceProvider.GetRequiredService<ConnectionMultiplexer>(); multiplexer.GetSubscriber().PublishAsync(CommonConst.REQUEST_CHANNEL, message); } } catch (Exception ex) { _logger.LogError(-1, ex, message); } }); } public Tuple<JToken, long?> GetRequest(string requestId) { var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB); var keyIndex = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId); var response = redisDB.StringGet(requestId); if (response.IsNull) { return Tuple.Create<JToken, long?>(default(JToken), keyIndex); } return Tuple.Create<JToken, long?>(JToken.Parse(response), keyIndex); } } }
// RedisMQListener.cs using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using MTQueue.Model; using MTQueue.Service; using Newtonsoft.Json.Linq; using StackExchange.Redis; using static StackExchange.Redis.RedisChannel; namespace MTQueue.Listener { public class RedisMQListener : IHostedService { private readonly ConnectionMultiplexer _redisMultiplexer; private readonly IServiceProvider _services; private readonly ILogger<RedisMQListener> _logger; public RedisMQListener(IServiceProvider services, ConnectionMultiplexer redisMultiplexer, ILogger<RedisMQListener> logger) { _services = services; _redisMultiplexer = redisMultiplexer; _logger = logger; } public Task StartAsync(CancellationToken cancellationToken) { Register(); return Task.CompletedTask; } public virtual bool Process(RedisChannel ch, RedisValue message) { _logger.LogInformation("Process start,message: " + message); var redisDB = _services.GetRequiredService<ConnectionMultiplexer>() .GetDatabase(CommonConst.DEFAULT_DB); var messageJson = JToken.Parse(message); var requestId = messageJson["requestId"]?.ToString(); if (string.IsNullOrEmpty(requestId)) { _logger.LogWarning("requestId not in message."); return false; } var mtAgent = _services.GetRequiredService<ZhihuClient>(); var text = mtAgent.GetZhuanlan(messageJson); redisDB.StringSet(requestId, text.ToString(), CommonConst.RESPONSE_TS); _logger.LogInformation("Process finish,requestId:" + requestId); redisDB.SortedSetRemove(CommonConst.REQUESTS_SORT_SETKEY, requestId); return true; } public void Register() { var sub = _redisMultiplexer.GetSubscriber(); var channel = CommonConst.REQUEST_CHANNEL; sub.SubscribeAsync(channel, (ch, value) => { Process(ch, value); }); } public void DeRegister() { // this.connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { // this.connection.Close(); return Task.CompletedTask; } } }
// RequestsController.cs using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using CorrelationId; using Microsoft.AspNetCore.Mvc; using MTQueue.Service; using Newtonsoft.Json.Linq; namespace MTQueue.Controllers { [Route("v1/[controller]")] [ApiController] public class RequestsController : ControllerBase { private readonly ICorrelationContextAccessor _correlationContext; private readonly RequestService _requestService; private readonly ZhihuClient _mtAgentClient; public RequestsController(ICorrelationContextAccessor correlationContext, RequestService requestService, ZhihuClient mtAgentClient) { _correlationContext = correlationContext; _requestService = requestService; _mtAgentClient = mtAgentClient; } [HttpGet("{requestId}")] public IActionResult Get(string requestId) { var result = _requestService.GetRequest(requestId); var resource = $"/v1/requests/{requestId}"; if (result.Item1 == default(JToken)) { return NotFound(new { rel = "self", href = resource, method = "GET", index = result.Item2 }); } return Ok(result.Item1); } [HttpPost] public IActionResult Post([FromBody] JToken data, [FromHeader(Name = "Prefer")]string prefer) { if (!string.IsNullOrEmpty(prefer) && prefer == "respond-async") { var index = _requestService.AddRequest(data); var requestId = _correlationContext.CorrelationContext.CorrelationId; var resource = $"/v1/requests/{requestId}"; return Accepted(resource, new { rel = "self", href = resource, method = "GET", index = index }); } return Ok(_mtAgentClient.GetZhuanlan(data)); } } }