Prerequisites:
"Dapr in Practice"
"Dapr in Practice: Java gRPC Invocation"
"Dapr in Practice: Integrating ASP.NET Core gRPC Invocation"
Setting up RabbitMQ
Start a RabbitMQ service with Docker
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Create rabbitmq.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.rabbitmq
  metadata:
  - name: host
    value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
  - name: consumerID
    value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
  - name: durable
    value: "true" # Optional. Default: "false"
  - name: deletedWhenUnused
    value: "false" # Optional. Default: "false"
  - name: autoAck
    value: "false" # Optional. Default: "false"
  - name: deliveryMode
    value: "2" # Optional. Default: "0". Values between 0 - 2.
  - name: requeueInFailure
    value: "true" # Optional. Default: "false".
Modifying StorageService.Api
Goal: convert StorageService from a gRPC client into a gRPC server that subscribes to the Storage.Reduce topic and performs the stock reduction.
Change the CreateHostBuilder code in Program.cs to:
public static IHostBuilder CreateHostBuilder(string[] args)
{
    return Host.CreateDefaultBuilder(args)
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.ConfigureKestrel(options =>
            {
                // Listen on the loopback address, port 5003, over HTTP/2 only (required for gRPC).
                options.Listen(IPAddress.Loopback, 5003, listenOptions =>
                {
                    listenOptions.Protocols = HttpProtocols.Http2;
                });
            });
            webBuilder.UseStartup<Startup>();
        });
}
Add DaprClientService
public sealed class DaprClientService : DaprClient.DaprClientBase
{
    public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
    {
        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
        return Task.FromResult(topicSubscriptionsEnvelope);
    }
}
The Dapr runtime calls this method to obtain the list of topics that StorageService subscribes to.
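To make the handshake easier to picture, here is a minimal Go sketch of the idea: a runtime asks a subscriber for its topics once, then delivers only events published to those topics. It is a toy in-memory model, not Dapr's implementation. The Subscriber and Runtime types, the Handled field, and the "Other.Topic" name are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// Subscriber is a hypothetical stand-in for an app that reports its
// topics and receives events for them.
type Subscriber struct {
	Topics  []string
	Handled []string // "topic:payload" entries, recorded for inspection
}

// GetTopicSubscriptions only reports the topics of interest.
func (s *Subscriber) GetTopicSubscriptions() []string { return s.Topics }

// OnTopicEvent records an event that was delivered to this subscriber.
func (s *Subscriber) OnTopicEvent(topic, payload string) {
	s.Handled = append(s.Handled, topic+":"+payload)
}

// Runtime is a toy model of the runtime side (an assumption, not Dapr code).
type Runtime struct {
	routes map[string][]*Subscriber
}

// NewRuntime returns a runtime with an empty routing table.
func NewRuntime() *Runtime {
	return &Runtime{routes: map[string][]*Subscriber{}}
}

// Register asks the subscriber for its topics and stores one route per topic.
func (r *Runtime) Register(s *Subscriber) {
	for _, t := range s.GetTopicSubscriptions() {
		r.routes[t] = append(r.routes[t], s)
	}
}

// Publish delivers the payload to every subscriber of the topic and
// returns how many deliveries were made.
func (r *Runtime) Publish(topic, payload string) int {
	subs := r.routes[topic]
	for _, s := range subs {
		s.OnTopicEvent(topic, payload)
	}
	return len(subs)
}

func main() {
	storage := &Subscriber{Topics: []string{"Storage.Reduce"}}
	rt := NewRuntime()
	rt.Register(storage)

	fmt.Println(rt.Publish("Storage.Reduce", "demo")) // 1
	fmt.Println(rt.Publish("Other.Topic", "ignored")) // 0
	fmt.Println(storage.Handled)                      // [Storage.Reduce:demo]
}
```

In this model an event on a topic the subscriber never reported is simply dropped, which is why the topic list has to be available before any events arrive.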
Modify Startup.cs
/// <summary>
/// This method gets called by the runtime. Use this method to add services to the container.
/// </summary>
/// <param name="services">Services.</param>
public void ConfigureServices(IServiceCollection services)
{
    services.AddGrpc();
    services.AddDbContextPool<StorageContext>(options =>
    {
        options.UseMySql(Configuration.GetConnectionString("MysqlConnection"));
    });
}

/// <summary>
/// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
/// </summary>
/// <param name="app">app.</param>
/// <param name="env">env.</param>
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseRouting();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapSubscribeHandler();
        endpoints.MapGrpcService<DaprClientService>();
    });
}
Copy the rabbitmq.yaml file into the components folder and delete the redis_messagebus.yaml file
Start the StorageService service
dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
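In this context the focus is the order placement feature: after an order is placed successfully, the Java server publishes an event to the Storage.Reduce topic, that is, it reduces the stock.
Create the CreateOrder.proto file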
syntax = "proto3";

package daprexamples;

option java_outer_classname = "CreateOrderProtos";
option java_package = "generate.protos";

service OrderService {
    rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);
    rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);
    rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
}

message CreateOrderRequest {
    string ProductID = 1; //Product ID
    int32 Amount=2; //Product Amount
    string CustomerID=3; //Customer ID
}

message CreateOrderResponse {
    bool Succeed = 1; //Create Order Result,true:success,false:fail
}

message RetrieveOrderRequest{
    string OrderID=1;
}

message RetrieveOrderResponse{
    Order Order=1;
}

message GetOrderListRequest{
    string CustomerID=1;
}

message GetOrderListResponse{
    repeated Order Orders=1;
}

message Order{
    string ID=1;
    string ProductID=2;
    int32 Amount=3;
    string CustomerID=4;
}
Generate the Java code with protoc
protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
Implement the three functions createOrder(), getOrderList(), and retrieveOrder()
Start the OrderService service
dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
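Create a Golang gRPC client. This client makes the create-order gRPC call and, once the order has been created successfully, publishes a stock-deduction event
Reference the CreateOrder.proto file and generate the CreateOrder.pb.go file
If protoc-gen-gogo is not installed, fetch and install it with the following commands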
go get github.com/gogo/protobuf/gogoproto
Install protoc-gen-gogo
go install github.com/gogo/protobuf/gogoproto
Generate code from the proto file
protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
Client code: create the order
...
response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{
    Id:     "OrderService",
    Data:   createOrderRequestData,
    Method: "createOrder",
})
if err != nil {
    fmt.Println(err)
    return
}
...
Add the DataToPublish.proto file, which defines the data structure of the published event
syntax = "proto3";

package daprexamples;

option java_outer_classname = "DataToPublishProtos";
option java_package = "generate.protos";

message StorageReduceData {
    string ProductID = 1;
    int32 Amount=2;
}
Generate the DataToPublish code
protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
Modify main.go so that it decides, based on the createOrder result, whether to publish a message to the message queue
...
createOrderResponse := &daprexamples.CreateOrderResponse{}
if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {
    fmt.Println(err)
    return
}
fmt.Println(createOrderResponse.Succeed)

if !createOrderResponse.Succeed {
    // Order creation failed
    return
}

storageReduceData := &daprexamples.StorageReduceData{
    ProductID: createOrderRequest.ProductID,
    Amount:    createOrderRequest.Amount,
}
storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
if err != nil {
    fmt.Println(err)
    return
}

_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{
    Topic: "Storage.Reduce",
    Data:  &any.Any{Value: storageReduceDataData},
})

fmt.Println(storageReduceDataData)

if err != nil {
    fmt.Println(err)
} else {
    fmt.Println("Published message!")
}
...
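Note: before sending, the data is converted to a JSON string with jsoniter. The reason is that if the gRPC stream is transmitted directly, the current version (0.3.x) of the Dapr runtime packs the data as JSON but unpacks it as a string, so the data ends up inconsistent.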
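As a rough illustration of the payload this produces, the sketch below encodes a stock-reduction message to JSON and decodes it again, the way a subscriber would. It uses the standard library's encoding/json rather than jsoniter, and the struct is a hand-written stand-in for the type generated from DataToPublish.proto; the JSON tags and the "example-product-id" value are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StorageReduceData is a hand-written stand-in for the generated protobuf
// struct. The JSON field names used here are an assumption.
type StorageReduceData struct {
	ProductID string `json:"ProductID"`
	Amount    int32  `json:"Amount"`
}

// encode produces the JSON bytes that would be used as the event payload.
func encode(d StorageReduceData) ([]byte, error) {
	return json.Marshal(d)
}

// decode parses the JSON bytes back into the struct, as a subscriber would.
func decode(b []byte) (StorageReduceData, error) {
	var d StorageReduceData
	err := json.Unmarshal(b, &d)
	return d, err
}

func main() {
	in := StorageReduceData{ProductID: "example-product-id", Amount: 20}

	payload, err := encode(in)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(payload)) // {"ProductID":"example-product-id","Amount":20}

	out, err := decode(payload)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(out == in) // true
}
```

Because both sides agree on plain JSON text, the payload survives being treated as a string in transit.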
Start the Golang gRPC client
dapr run --app-id client go run main.go
Output
== APP == true
== APP == Published message!
RabbitMQ
Open http://localhost:15672/ in a browser; the username and password are both guest
View Exchanges
Name              Type     Features   Message rate in   Message rate out
(AMQP default)    direct   D
Storage.Reduce    fanout   D
amq.direct        direct   D
amq.fanout        fanout   D
...
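Look at Storage.Reduce in particular: the Dapr runtime has created an Exchange of type fanout, which means that messages sent to this Exchange are broadcast.
View Queues
The Dapr runtime has created the storageService-Storage.Reduce Queue. This Queue is bound to the Storage.Reduce Exchange, so it receives the data broadcast by Storage.Reduce.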
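The broadcast behaviour of a fanout exchange can be sketched in a few lines of Go. This is a toy in-memory model, not a RabbitMQ client: every queue bound to the exchange gets its own copy of each published message. The Exchange type and the second queue name, otherService-Storage.Reduce, are hypothetical.

```go
package main

import "fmt"

// Exchange is a toy in-memory fanout exchange (an assumption for
// illustration, not RabbitMQ's implementation).
type Exchange struct {
	Name   string
	queues map[string][]string // queue name -> delivered messages
}

// NewExchange creates an exchange with no bound queues.
func NewExchange(name string) *Exchange {
	return &Exchange{Name: name, queues: map[string][]string{}}
}

// Bind attaches a queue to the exchange; binding twice has no extra effect.
func (e *Exchange) Bind(queue string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = nil
	}
}

// Publish copies the message into every bound queue (fanout semantics).
func (e *Exchange) Publish(msg string) {
	for q := range e.queues {
		e.queues[q] = append(e.queues[q], msg)
	}
}

// Messages returns the messages delivered to the given queue so far.
func (e *Exchange) Messages(queue string) []string {
	return e.queues[queue]
}

func main() {
	ex := NewExchange("Storage.Reduce")
	ex.Bind("storageService-Storage.Reduce")
	ex.Bind("otherService-Storage.Reduce") // hypothetical second consumer

	ex.Publish("reduce stock")

	fmt.Println(ex.Messages("storageService-Storage.Reduce")) // [reduce stock]
	fmt.Println(ex.Messages("otherService-Storage.Reduce"))   // [reduce stock]
}
```

With only one bound queue, as in this walkthrough, the broadcast reaches a single consumer, but further subscribers could bind their own queues without the publisher changing.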
Modifying the DotNet Core StorageService.Api to handle the subscribed event
Open the DaprClientService.cs file and change its contents to
public sealed class DaprClientService : DaprClient.DaprClientBase
{
    private readonly StorageContext _storageContext;

    public DaprClientService(StorageContext storageContext)
    {
        _storageContext = storageContext;
    }

    public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
    {
        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
        return Task.FromResult(topicSubscriptionsEnvelope);
    }

    public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context)
    {
        if (request.Topic.Equals("Storage.Reduce"))
        {
            // The publisher sends JSON text, so parse the payload as a UTF-8 JSON string.
            StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());
            Console.WriteLine("ProductID:" + storageReduceData.ProductID);
            Console.WriteLine("Amount:" + storageReduceData.Amount);
            await HandlerStorageReduce(storageReduceData);
        }
        return new Empty();
    }

    private async Task HandlerStorageReduce(StorageReduceData storageReduceData)
    {
        Guid productID = Guid.Parse(storageReduceData.ProductID);
        Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));
        if (storageFromDb == null)
        {
            // Unknown product: nothing to reduce.
            return;
        }

        if (storageFromDb.Amount < storageReduceData.Amount)
        {
            // Not enough stock: leave the record unchanged.
            return;
        }

        storageFromDb.Amount -= storageReduceData.Amount;
        Console.WriteLine(storageFromDb.Amount);
        await _storageContext.SaveChangesAsync();
    }
}
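GetTopicSubscriptions() registers interest in the topic
OnTopicEvent() is overridden; this method handles the events of the subscribed topic
HandlerStorageReduce reduces the stock
Start the DotNet Core StorageService.Api gRPC service, the Java OrderService gRPC service, and the Go gRPC client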
DotNet Core
dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
Java
dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
Go
dapr run --app-id client go run main.go
The Go gRPC client outputs
== APP == true
== APP == Published message!
Check the Storage database in MySQL: the stock of the corresponding product has been reduced by 20
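The reduction seen in the database follows the guards in HandlerStorageReduce: an unknown product or insufficient stock leaves the record untouched, otherwise the amount is subtracted. A minimal Go sketch of that logic follows, with an in-memory map standing in for the MySQL table. The reduceStock function, the product key, and the starting stock of 100 are hypothetical.

```go
package main

import "fmt"

// reduceStock applies the same guards as the handler described above:
// it reports true only when the stock was actually reduced.
// The map is a stand-in for the Storage table (an assumption).
func reduceStock(stock map[string]int32, productID string, amount int32) bool {
	current, ok := stock[productID]
	if !ok {
		// Unknown product: nothing to reduce.
		return false
	}
	if current < amount {
		// Not enough stock: leave the record unchanged.
		return false
	}
	stock[productID] = current - amount
	return true
}

func main() {
	// Hypothetical starting stock for illustration.
	stock := map[string]int32{"example-product-id": 100}

	fmt.Println(reduceStock(stock, "example-product-id", 20), stock["example-product-id"])  // true 80
	fmt.Println(reduceStock(stock, "example-product-id", 500), stock["example-product-id"]) // false 80
	fmt.Println(reduceStock(stock, "missing-product", 1))                                   // false
}
```

Note that the sketch, like the handler it mirrors, silently ignores requests it cannot fulfil; a production service would probably want to report those cases back to the publisher.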
At this point, the gRPC call between Go and Java has been made through the Dapr runtime, and Pub/Sub has been carried out through the RabbitMQ component