Spring Cloud Stream

Spring Cloud Stream

基本概念

Source:來源(近義詞:Producer、Publisher)異步

Sink:接收器(近義詞:Consumer、Subscriber)ide

Processor:對於上流而言是Sink,對於下游而言是Sourceui

Binder:kafka

消息大體分爲兩個部分:this

  • 消息頭(Headers)atom

  • 消息體(Body/Payload)spa

啓動Zookeeper

啓動kafka

Producer實現

  1. 定義發送通道線程

     public interface Source {
     
         /**
          * 需求通道
          */
         String OUT_PUT_DEMAND = "out_put_demand";
     
         /**
          * 任務通道
          */
         String OUT_PUT_TASK = "out_put_task";
     
         /**
          * 工做日誌通道
          */
         String OUT_PUT_WORK_LOG = "out_put_workLog";
     
         /**
          * 組織結構信息通道
          */
         String OUT_PUT_ORG = "out_put_org";
     
         /**
          * 代碼質量通道
          */
         String OUT_PUT_QUALITY = "out_put_quality";
     
     
         @Output(Source.OUT_PUT_DEMAND)
         MessageChannel demand();
     
         @Output(Source.OUT_PUT_TASK)
         MessageChannel task();
     
         @Output(Source.OUT_PUT_WORK_LOG)
         MessageChannel workLog();
     
         @Output(Source.OUT_PUT_ORG)
         MessageChannel org();
     
         @Output(Source.OUT_PUT_QUALITY)
         MessageChannel quality();
     }

     

  2. 生產類日誌

     public class Producer {
     
         /**
          * 默認發送消息
          *
          * @param message
          * @param channel
          * @return
          */
         public static Boolean send(Object message, MessageChannel channel) {
             return send(message, channel, 5000L);
        }
     
         /**
          * 帶超時時間
          *
          * @param message
          * @param timeout
          * @param channel
          * @return
          */
         public static Boolean send(Object message, MessageChannel channel, Long timeout) {
             return channel.send(MessageBuilder.withPayload(message).build(), timeout);
        }
     
     }

     

  3. Bindingcode

     @EnableBinding(Source.class)
     public class SourceAutoConfiguration {
     }

     

  4. 策略模式-消息類型接口

     public enum SendType {
     
         DEMAND_MESSAGE(new DemandMessage()),
         TASK_MESSAGE(new TaskMessage()),
         WORK_LOG_MESSAGE(new WorkLogMessage()),
         CODE_QUALITY_MESSAGE(new CodeQualityMessage());
     
         private MessageSend messageSend;
         SendType(MessageSend messageSend){
             this.messageSend = messageSend;
        }
     
         public MessageSend get(){
             return  this.messageSend;
        }
     
     }

     

  5. 消息發送接口

     public interface MessageSend {
     
         public  Boolean send(Object message);
     
     }

     

  6. 接口實現

     public class DemandMessage implements MessageSend {
     
         private static final Source SOURCE = SpringContextHelper.getBean(Source.class);
     
         @Override
         public Boolean send(Object message) {
             message = MaskMessage.messageHelper(message);
             if (null == message) {
                 return false;
            }
             return Producer.send(message, SOURCE.demand());
        }
     }

     

  7. 生產消息

     public class ProduceHelper {
     
         /**
          * 需求消息生產
          * @param sendType 發送類型
          * @param message 消息內容
          * @return boolean
          */
         public static Boolean produce(SendType sendType, Demand message) {
             return sendType.get().send(message);
        }
     
         /**
          * 任務消息生產
          * @param sendType 發送類型
          * @param message 消息內容
          * @return boolean
          */
         public static Boolean produce(SendType sendType, Task message) {
             return sendType.get().send(message);
        }
     
         /**
          * 工做日誌消息生產
          * @param sendType 發送類型
          * @param message 消息內容
          * @return boolean
          */
         public static Boolean produce(SendType sendType, WorkLog message) {
             return sendType.get().send(message);
        }
     
         /**
          * 代碼質量消息生產
          * @param sendType 發送類型
          * @param message 消息內容
          * @return boolean
          */
         public static Boolean produce(SendType sendType, CodeQuality message) {
             return sendType.get().send(message);
        }
     
     }

     

Comsumer實現

  1. 定義接收通道

     public interface Sink {
     
         /**
          * 需求通道
          */
         String IN_PUT_DEMAND = "in_put_demand";
     
         /**
          * 任務通道
          */
         String IN_PUT_TASK = "in_put_task";
     
         /**
          * 工做日誌通道
          */
         String IN_PUT_WORK_LOG = "in_put_workLog";
     
         /**
          * 組織結構信息通道
          */
         String IN_PUT_ORG = "in_put_org";
     
     
         /**
          * 代碼質量通道
          */
         String IN_PUT_QUALITY = "in_put_quality";
     
     
         @Input(Sink.IN_PUT_DEMAND)
         SubscribableChannel demand();
     
         @Input(Sink.IN_PUT_TASK)
         SubscribableChannel task();
     
         @Input(Sink.IN_PUT_WORK_LOG)
         SubscribableChannel workLog();
     
         @Input(Sink.IN_PUT_ORG)
         SubscribableChannel org();
     
         @Input(Sink.IN_PUT_QUALITY)
         SubscribableChannel quality();
     }

     

  2. 消費類

     public interface Consumer<T> {
         void onMessage(T message);
     }

     

  3. 消息監聽

    • @StreamListener方式

       @Slf4j
       @Component
       public class MessageListener {
       
           @Autowired
           private MessageHandler messageHandler;
       
       
           /**
            * 監聽需求消息
            *
            * @param message
            */
           @StreamListener(Sink.IN_PUT_DEMAND)
           public void task(Message message) {
       
               LOGGER.info("監聽到任務信息:{}", message.getPayload());
               //調用demand入庫
               messageHandler.demandSave(message);
       
          }
       
           /**
            * 監放任務消息
            *
            * @param message
            */
           @StreamListener(Sink.IN_PUT_TASK)
           public void bug(Message message) {
       
               LOGGER.info("監聽到缺陷信息:{}", message.getPayload());
               //任務消息入庫
               messageHandler.taskSave(message);
       
          }
       
           /**
            * 監聽工做日誌消息
            *
            * @param message
            */
           @StreamListener(Sink.IN_PUT_WORK_LOG)
           public void workLog(Message message) {
       
               LOGGER.info("監聽到工做日誌信息:{}", message.getPayload());
               //工做日誌消息入庫
               messageHandler.worklogSave(message);
          }
       
       
           /**
            * 監聽組織消息
            *
            * @param message
            */
           @StreamListener(Sink.IN_PUT_ORG)
           public void org(Message message) {
       
               LOGGER.info("監聽到組織信息:{}", message.getPayload());
               //組織消息入庫
               messageHandler.orgSave(message);
          }
       
           /**
            * 監聽質量消息
            *
            * @param message
            */
           @StreamListener(Sink.IN_PUT_QUALITY)
           public void quality(Message message) {
               LOGGER.info("接收到質量信息:{}", message.getPayload());
               //質量消息入庫
               messageHandler.codeQualitySave(message);
          }
       }

       

    • @ServiceActivator

       @ServiceActivator(Sink.IN_PUT_DEMAND)
       public void onMessage(String message){
           System.out.printIn("@ServiceActivator:"+message);
       }

       

    • @PostConstruct

       @PostConstruct
       public void init(){
           //實現異步回調
           subscribableChannel.subscribe(new MessageHandler){
               @Override
               public void handleMessage(Message<?> message) throws MessagingException{
                   System.out.printIn("@PostConstruct:"+message);
              }
          }
       }

       

  4. 消息處理

     @Slf4j
     @Component
     public class MessageHandler {
     
         @Autowired
         private CodeQualityRepository codeQualityRepository;
     
         @Autowired
         private DemandRepository demandRepository;
     
         @Autowired
         private TaskRepository taskRepository;
     
         @Autowired
         private WorkLogRepository workLogRepository;
     
         @Autowired
         private CompanyRepository companyRepository;
     
         @Autowired
         private OrgInfoRepository orgInfoRepository;
     
     
     
     
         /**
          * 需求消息入庫
          */
         public void demandSave(Message message) {
             Demand demand = JSONObject.parseObject(message.getPayload().toString(), Demand.class);
             LOGGER.info("demand {}",demand);
             MongoNameGet.setCompanyId(demand.getCompanyId());
             if (null != demand.getId() && null != demand.getCompanyId()) {
                 demand.setGrabDate(new Date());
                 demandRepository.save(demand);
                 saveCompany(demand.getCompanyId(),"");
                 LOGGER.info("線程名:{}",Thread.currentThread().getName());
                 LOGGER.info("數據存儲完畢");
            }
        }
     
         /**
          * 任務消息入庫
          */
         public void taskSave(Message message) {
             Task task = JSONObject.parseObject(message.getPayload().toString(), Task.class);
             MongoNameGet.setCompanyId(task.getCompanyId());
             if (null != task.getId() && null != task.getCompanyId() && !StringUtils.isEmpty(task.getDemandId())) {
     
                 task.setGrabDate(new Date());
     
                 //查詢部門id和組id 補充數據
                 Optional<Demand> demand =  demandRepository.findById(task.getDemandId());
                 if(demand.isPresent()){
                     task.setDepartId(demand.get().getDepartId());
                     task.setTeamId(demand.get().getTeamId());
                }
     
                 taskRepository.save(task);
                 saveCompany(task.getCompanyId(),"");
                 LOGGER.info("數據存儲完畢");
            }
        }
     
     
         /**
          * 工做日誌消息入庫
          */
         public void worklogSave(Message message) {
             WorkLog workLog = JSONObject.parseObject(message.getPayload().toString(), WorkLog.class);
             MongoNameGet.setCompanyId(workLog.getCompanyId());
             if (null != workLog.getId() && null != workLog.getCompanyId() && !StringUtils.isEmpty(workLog.getDemandId())) {
                 workLog.setGrabDate(new Date());
     
                 //查詢部門id和組id 補充數據
                 Optional<Demand> demand =  demandRepository.findById(workLog.getDemandId());
                 if(demand.isPresent()){
                     workLog.setDepartId(demand.get().getDepartId());
                     workLog.setTeamId(demand.get().getTeamId());
                }
     
                 workLogRepository.save(workLog);
                 saveCompany(workLog.getCompanyId(),"");
                 LOGGER.info("數據存儲完畢");
     
            }
        }
     
     
         /**
          * 質量消息入庫
          */
         public void codeQualitySave(Message message) {
             CodeQuality codeQuality = JSONObject.parseObject(message.getPayload().toString(), CodeQuality.class);
             MongoNameGet.setCompanyId(codeQuality.getCompanyId());
             if (null != codeQuality.getId() && null != codeQuality.getCompanyId()) {
     
                 codeQuality.setGrabDate(new Date());
                 codeQualityRepository.save(codeQuality);
                 saveCompany(codeQuality.getCompanyId(),"");
                 LOGGER.info("數據存儲完畢");
            }
        }
     
         /**
          * 部門信息存儲
          * @param message
          */
         public void orgSave(Message message) {
             OrgInfo orgInfo = JSONObject.parseObject(message.getPayload().toString(), OrgInfo.class);
             if (orgInfo.getId()!=0) {
     
                 orgInfoRepository.save(orgInfo);
                 LOGGER.info("數據存儲完畢");
            }
        }
     
         /**
          * 保存企業id
          * @param companyId
          * @param companyName
          */
         private void saveCompany(String companyId,String companyName){
             if(!StringUtils.isEmpty(companyId)){
                 Company c = Company.builder().id(companyId).companyName(companyName).build();
                 companyRepository.save(c);
            }
     
        }
     }
相關文章
相關標籤/搜索