Source:來源(近義詞:Producer、Publisher)異步
ide
Processor:對於上流而言是Sink,對於下游而言是Sourceui
消息大體分爲兩個部分:this
消息頭(Headers)atom
消息體(Body/Payload)spa
定義發送通道線程
public interface Source {
/**
* 需求通道
*/
String OUT_PUT_DEMAND = "out_put_demand";
/**
* 任務通道
*/
String OUT_PUT_TASK = "out_put_task";
/**
* 工做日誌通道
*/
String OUT_PUT_WORK_LOG = "out_put_workLog";
/**
* 組織結構信息通道
*/
String OUT_PUT_ORG = "out_put_org";
/**
* 代碼質量通道
*/
String OUT_PUT_QUALITY = "out_put_quality";
生產類日誌
public class Producer {
/**
* 默認發送消息
*
* @param message
* @param channel
* @return
*/
public static Boolean send(Object message, MessageChannel channel) {
return send(message, channel, 5000L);
}
/**
* 帶超時時間
*
* @param message
* @param timeout
* @param channel
* @return
*/
public static Boolean send(Object message, MessageChannel channel, Long timeout) {
return channel.send(MessageBuilder.withPayload(message).build(), timeout);
}
}
Bindingcode
策略模式-消息類型接口
public enum SendType {
DEMAND_MESSAGE(new DemandMessage()),
TASK_MESSAGE(new TaskMessage()),
WORK_LOG_MESSAGE(new WorkLogMessage()),
CODE_QUALITY_MESSAGE(new CodeQualityMessage());
private MessageSend messageSend;
SendType(MessageSend messageSend){
this.messageSend = messageSend;
}
public MessageSend get(){
return this.messageSend;
}
}
消息發送接口
public interface MessageSend {
public Boolean send(Object message);
}
接口實現
public class DemandMessage implements MessageSend {
private static final Source SOURCE = SpringContextHelper.getBean(Source.class);
生產消息
public class ProduceHelper {
/**
* 需求消息生產
* @param sendType 發送類型
* @param message 消息內容
* @return boolean
*/
public static Boolean produce(SendType sendType, Demand message) {
return sendType.get().send(message);
}
/**
* 任務消息生產
* @param sendType 發送類型
* @param message 消息內容
* @return boolean
*/
public static Boolean produce(SendType sendType, Task message) {
return sendType.get().send(message);
}
/**
* 工做日誌消息生產
* @param sendType 發送類型
* @param message 消息內容
* @return boolean
*/
public static Boolean produce(SendType sendType, WorkLog message) {
return sendType.get().send(message);
}
/**
* 代碼質量消息生產
* @param sendType 發送類型
* @param message 消息內容
* @return boolean
*/
public static Boolean produce(SendType sendType, CodeQuality message) {
return sendType.get().send(message);
}
}
定義接收通道
public interface Sink {
/**
* 需求通道
*/
String IN_PUT_DEMAND = "in_put_demand";
/**
* 任務通道
*/
String IN_PUT_TASK = "in_put_task";
/**
* 工做日誌通道
*/
String IN_PUT_WORK_LOG = "in_put_workLog";
/**
* 組織結構信息通道
*/
String IN_PUT_ORG = "in_put_org";
/**
* 代碼質量通道
*/
String IN_PUT_QUALITY = "in_put_quality";
消費類
public interface Consumer<T> {
void onMessage(T message);
}
消息監聽
@StreamListener方式
@ServiceActivator
@PostConstruct
消息處理