Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

前置條件:
《Dapr運用》
《Dapr 運用之 Java gRPC 調用篇》
《Dapr 運用之集成 Asp.Net Core Grpc 調用篇》html


  1. 搭建 RabbitMQjava

    • Docker 搭建 RabbitMQ 服務git

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    • 建立 rabbiqmq.yamlgithub

      apiVersion: dapr.io/v1alpha1
      kind: Component
      metadata:
      name: messagebus
      spec:
      type: pubsub.rabbitmq
      metadata:
      - name: host
          value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
      - name: consumerID
          value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
      - name: durable
          value: "true" # Optional. Default: "false"
      - name: deletedWhenUnused
          value: "false" # Optional. Default: "false"
      - name: autoAck
          value: "false" # Optional. Default: "false"
      - name: deliveryMode
          value: "2" # Optional. Default: "0". Values between 0 - 2.
      - name: requeueInFailure
          value: "true" # Optional. Default: "false".
  2. 改造 StorageService.Apigolang

    目的:把 StorageService 從 Grpc 客戶端改造爲 Grpc 服務端,並 Sub Storage.Reduce 主題,完成減庫存操做。web

    • 刪除 Storage 中無用的代碼 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代碼爲redis

      public static IHostBuilder CreateHostBuilder(string[] args)
      {
          return Host.CreateDefaultBuilder(args)
              .ConfigureWebHostDefaults(webBuilder =>
              {
                  webBuilder.ConfigureKestrel(options =>
                  {
                      options.Listen(IPAddress.Loopback, 5003, listenOptions =>
                      {
                          listenOptions.Protocols = HttpProtocols.Http2;
                      });
                  });
                  webBuilder.UseStartup<Startup>();
              });
      }
    • 添加 DaprClientServicesql

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {
          public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
          {
              var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
              topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
              return Task.FromResult(topicSubscriptionsEnvelope);
          }
      }

      Dapr 運行時將調用此方法獲取 StorageServcie 關注的主題列表docker

    • 修改 Startup.cs數據庫

      /// <summary>
      /// This method gets called by the runtime. Use this method to add services to the container.
      /// </summary>
      /// <param name="services">Services.</param>
      public void ConfigureServices(IServiceCollection services)
      {
          services.AddGrpc();
          services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });
      }
      /// <summary>
      /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
      /// </summary>
      /// <param name="app">app.</param>
      /// <param name="env">env.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {
          if (env.IsDevelopment())
          {
              app.UseDeveloperExceptionPage();
          }
      
          app.UseRouting();
      
          app.UseEndpoints(endpoints =>
          {
              endpoints.MapSubscribeHandler();
              endpoints.MapGrpcService<DaprClientService>();
          });
      }
    • 複製 rabbimq.yaml 文件到 components 文件夾中,刪除 redis_messagebus.yaml 文件

    • 啓動 StorageService 服務

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
  3. 使用 Java 開發一個 Order 服務端,Order 服務提供的功能爲
    • 下單
    • 查看訂單詳情
    • 獲取訂單列表

    在當前上下文中着重處理的是下單功能,以及下單成功後 Java 服務端將發佈一個事件到 Storage.Reduce 主題,即減小庫存。

    • 建立 CreateOrder.proto 文件

      syntax = "proto3";
      
      package daprexamples;
      
      option java_outer_classname = "CreateOrderProtos";
      option java_package = "generate.protos";
      
      service OrderService {
          rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);
          rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);
          rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
      }
      
      message CreateOrderRequest {
          string ProductID = 1; //Product ID
          int32 Amount=2; //Product Amount
          string CustomerID=3; //Customer ID
      }
      
      message CreateOrderResponse {
          bool Succeed = 1; //Create Order Result,true:success,false:fail
      }
      
      message RetrieveOrderRequest{
          string OrderID=1;
      }
      
      message RetrieveOrderResponse{
          Order Order=1;
      }
      
      message GetOrderListRequest{
          string CustomerID=1;
      }
      
      message GetOrderListResponse{
          repeated Order Orders=1;
      }
      
      message Order{
          string ID=1;
          string ProductID=2;
          int32 Amount=3;
          string CustomerID=4;
      }
    • 使用 protoc 生成 Java 代碼

      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
    • 引用 MyBatis 作爲 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到單獨的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三個函數的實現
    • 複製 rabbimq.yaml 文件到 components 文件夾中,刪除原有 redis_messagebus.yaml 文件
    • 啓動 OrderService 服務

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
  4. 建立 Golang Grpc 客戶端,該客戶端須要完成建立訂單 Grpc 調用,訂單建立成功發佈扣除庫存事件

    • 引用 CreateOrder.proto 文件,並生成 CreateOrder.pb.go 文件

      如未安裝 protoc-gen-gogo ,經過一下命令獲取並安裝

      go get github.com/gogo/protobuf/gogoproto

      安裝 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto

      根據 proto 文件生成代碼

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 客戶端代碼,建立訂單

      ...
      
       response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{
          Id:     "OrderService",
          Data:   createOrderRequestData,
          Method: "createOrder",
          })
          if err != nil {
              fmt.Println(err)
              return
          }
      
      ...
    • 添加 DataToPublish.proto 文件,此文件做爲事件發佈數據結構

      syntax = "proto3";
      
      package daprexamples;
      
      option java_outer_classname = "DataToPublishProtos";
      option java_package = "generate.protos";
      
      message StorageReduceData {
          string ProductID = 1;
          int32 Amount=2;
      }
    • 生成 DataToPublish 代碼

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 修改 main.go 代碼,根據 createOrder 結果判斷是否要發佈信息到消息隊列

      ...
      
      createOrderResponse := &daprexamples.CreateOrderResponse{}
      
      if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {
          fmt.Println(err)
          return
      }
      fmt.Println(createOrderResponse.Succeed)
      
      if !createOrderResponse.Succeed {
          //下單失敗
          return
      }
      
      storageReduceData := &daprexamples.StorageReduceData{
          ProductID: createOrderRequest.ProductID,
          Amount:    createOrderRequest.Amount,
      }
      storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
      if err != nil {
          fmt.Println(err)
          return
      }
      
      _, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{
          Topic: "Storage.Reduce",
          Data:  &any.Any{Value: storageReduceDataData},
      })
      
      fmt.Println(storageReduceDataData)
      
      if err != nil {
          fmt.Println(err)
      } else {
          fmt.Println("Published message!")
      }
      ...

      注意: 發送數據前,使用 jsoniter 轉換數據爲 json 字符串,緣由是若是直接傳輸 Grpc 流,當前版本(0.3.x) Dapr runtime 打包數據時使用 Json 打包,解包使用 String ,致使數據不一致。

    • 複製 rabbimq.yaml 文件到 components 文件夾,刪除原有 redis_messagebus.yaml 文件
    • 啓動 golang Grpc 客戶端

      dapr run --app-id client go run main.go

      輸出

      == APP == true
      == APP == Published message!
  5. RabbitMQ

    • 在瀏覽器中輸入 http://localhost:15672/ ,帳號和密碼均爲 guest
    • 查看 Connections ,有3個鏈接
      • 這個3個鏈接來自配置了 messagebus.yaml 組件的三個服務
    • 查看 Exchanges

      Name            Type    Features    Message rate in Message rate out
      (AMQP default)  direct  D
      Storage.Reduce  fanout  D
      amq.direct      direct  D
      amq.fanout      fanout  D
      ...

      着重看 Storage.Reduce ,能夠看出 Dapr 運行時建立了一個 fanout 類型的 Exchange ,這代表該 Exhange 中的數據是廣播的。

    • 查看 Queues

      Dapr 運行時建立了 storageService-Storage.Reduce ,該 Queue 綁定了 Storage.Reduce Exchange ,因此能夠收到 Storage.Reduce 的廣播數據。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打開 DaprClientService.cs 文件,更改內容爲

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {
          private readonly StorageContext _storageContext;
      
          public DaprClientService(StorageContext storageContext)
          {
              _storageContext = storageContext;
          }
      
          public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
          {
              var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
              topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
              return Task.FromResult(topicSubscriptionsEnvelope);
          }
      
          public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context)
          {
              if (request.Topic.Equals("Storage.Reduce"))
              {
                  StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());
                  Console.WriteLine("ProductID:" + storageReduceData.ProductID);
                  Console.WriteLine("Amount:" + storageReduceData.Amount);
                  await HandlerStorageReduce(storageReduceData);
              }
              return new Empty();
          }
      
          private async Task HandlerStorageReduce(StorageReduceData storageReduceData)
          {
              Guid productID = Guid.Parse(storageReduceData.ProductID);
              Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));
              if (storageFromDb == null)
              {
                  return;
              }
      
              if (storageFromDb.Amount < storageReduceData.Amount)
              {
                  return;
              }
      
              storageFromDb.Amount -= storageReduceData.Amount;
              Console.WriteLine(storageFromDb.Amount);
              await _storageContext.SaveChangesAsync();
          }
    • 說明
      • 添加 GetTopicSubscriptions() 將完成對主題的關注
        • 當應用中止時,RabbitMQ 中的 Queue 自動刪除
        • 添加 OnTopicEvent() 重寫,此方法將完成對 Sub 主題的事件處理
      • HandlerStorageReduce 用於減小庫存
  7. 啓動 DotNet Core StorageService.Api Grpc 服務,啓動 Java OrderService Grpc 服務,啓動 Go Grpc 客戶端

    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
    • Java

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
    • go

      dapr run --app-id client  go run main.go

      go grpc 輸出爲

      == APP == true
      == APP == Published message!

    查看 MySql Storage 數據庫,對應產品庫存減小 20

至此,經過 Dapr runtime 完成了 Go 和 Java 之間的 Grpc 調用,並經過 RabbitMQ 組件完成了 Pub/Sub

源碼地址

相關文章
相關標籤/搜索