The core of the gateway is a Netty server. Client applications (web servers, mobile apps, and so on) connect to this Netty server to request data. On the data side, the gateway has to listen to multiple Kafka topics, and the topic list is dynamic, so Kafka consumers must be started and stopped on the fly. The data from all of these topics is then merged together and pushed to the client applications over the channel.
Most of the code is posted below for anyone who wants to use it as a reference. I will comment on the key technical points; feel free to skip over the business-specific parts.
Start the disruptor, listen to one fixed topic, and hand each received message to ConsumerProcessorGroup, which takes care of creating and stopping the Kafka consumers.
public static void main(String[] args) {
    DisruptorHelper.getInstance().start();
    Properties props = ConsumerProps.getConsumerProps();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("uavlst"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // Only the latest record matters: it carries the full, current topic list.
        ConsumerRecord<String, String> lastRecord = null;
        for (ConsumerRecord<String, String> record : records) {
            lastRecord = record;
        }
        if (lastRecord != null) {
            ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
        }
    }
}
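ConsumerProps.getConsumerProps() is not listed in this post. A minimal sketch of what such a helper might look like, assuming standard string-serialized consumers; the bootstrap address and group id below are placeholders, not values from the original project:

```java
import java.util.Properties;

// Hypothetical sketch of the ConsumerProps helper used above; the concrete
// property values are assumptions, not taken from the original code.
public class ConsumerProps {
    public static Properties getConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "gateway");                 // placeholder group id
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```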
DisruptorHelper is a singleton wrapping a single Disruptor instance. When the Disruptor is constructed it is given ProducerType.MULTI and a new BlockingWaitStrategy(): the former means multiple producers will publish to it concurrently, and the latter is simply the default wait strategy, to be tuned later based on real-world behavior.
public class DisruptorHelper {
    private static DisruptorHelper instance = null;

    // Synchronized because getInstance() is called from multiple consumer threads.
    public static synchronized DisruptorHelper getInstance() {
        if (instance == null) {
            instance = new DisruptorHelper();
        }
        return instance;
    }

    private final int BUFFER_SIZE = 1024;
    private Disruptor<MsgEvent> disruptor = null;

    private DisruptorHelper() {
        MsgEventHandler eventHandler = new MsgEventHandler();
        disruptor = new Disruptor<MsgEvent>(new MsgEventFactory(), BUFFER_SIZE,
                new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
    }

    public void start() {
        disruptor.start();
    }

    public void shutdown() {
        disruptor.shutdown();
    }

    public void produce(ConsumerRecord<String, String> record) {
        RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();               // claim the next slot
        try {
            ringBuffer.get(sequence).setRecord(record);  // fill the pre-allocated event
        } finally {
            ringBuffer.publish(sequence);                // make it visible to the handler
        }
    }
}
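MsgEvent and MsgEventFactory are also not listed here. Below is a dependency-free sketch of the event-holder pattern the Disruptor relies on. The real classes would carry a ConsumerRecord<String, String> payload and implement com.lmax.disruptor.EventFactory<MsgEvent>; the local EventFactory interface and the String payload below are stand-ins only, so the sketch compiles on its own:

```java
// Stand-in for com.lmax.disruptor.EventFactory, so this sketch has no external
// dependency; the real code would implement the Disruptor interface instead.
interface EventFactory<T> {
    T newInstance();
}

// Mutable holder that lives in a pre-allocated ring-buffer slot. The real
// MsgEvent would hold a ConsumerRecord<String, String>; String stands in here.
class MsgEvent {
    private String record;
    public String getRecord() { return record; }
    public void setRecord(String record) { this.record = record; }
}

// The factory populates every ring-buffer slot once, at startup, so publishing
// later only mutates existing objects and allocates nothing on the hot path.
class MsgEventFactory implements EventFactory<MsgEvent> {
    public MsgEvent newInstance() {
        return new MsgEvent();
    }
}
```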
ConsumerProcessorGroup is a singleton holding a fixedThreadPool; it dynamically starts threads that consume the Kafka topics.
public class ConsumerProcessorGroup {
    private static ConsumerProcessorGroup instance = null;

    // Synchronized because getInstance() is also called from the consumer threads.
    public static synchronized ConsumerProcessorGroup getInstance() {
        if (instance == null) {
            instance = new ConsumerProcessorGroup();
        }
        return instance;
    }

    private ConsumerProcessorGroup() {
    }

    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);
    // Vector is synchronized; the list is read concurrently by the consumer threads.
    public List<String> uavIDLst = new Vector<String>();

    public void recieveNewUavLst(String uavIDs) {
        List<String> newUavIDs = Arrays.asList(uavIDs.split(","));
        // Start a consumer for every ID we have not seen before.
        for (String uavID : newUavIDs) {
            if (!uavIDLst.contains(uavID)) {
                fixedThreadPool.execute(new ConsumerThread(uavID));
                uavIDLst.add(uavID);
            }
        }
        // Collect IDs that disappeared from the new list; removing them from
        // uavIDLst makes the corresponding ConsumerThread's loop exit.
        List<String> tmpLstForDel = new ArrayList<String>();
        for (String uavID : uavIDLst) {
            if (!newUavIDs.contains(uavID)) {
                tmpLstForDel.add(uavID);
            }
        }
        uavIDLst.removeAll(tmpLstForDel);
    }
}
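The start/stop diff inside recieveNewUavLst can be exercised on its own. A small plain-collections sketch of the same logic, with illustrative names that are not from the original code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Vector;

// Dependency-free sketch of the diff performed by recieveNewUavLst:
// IDs present in the incoming list but not the current one get a new consumer;
// IDs missing from the incoming list are stopped.
public class UavDiffDemo {
    // Mutates `current` in place and returns the IDs whose consumers would stop.
    public static List<String> applyUpdate(List<String> current, String uavIDs) {
        List<String> incoming = Arrays.asList(uavIDs.split(","));
        for (String id : incoming) {
            if (!current.contains(id)) {
                // In the real code: fixedThreadPool.execute(new ConsumerThread(id))
                current.add(id);
            }
        }
        List<String> toStop = new ArrayList<String>();
        for (String id : current) {
            if (!incoming.contains(id)) {
                toStop.add(id);
            }
        }
        // In the real code this removal is what makes those consumers' loops exit.
        current.removeAll(toStop);
        return toStop;
    }

    public static void main(String[] args) {
        List<String> current = new Vector<String>(Arrays.asList("uav1", "uav2"));
        List<String> stopped = applyUpdate(current, "uav2,uav3");
        System.out.println("stopped=" + stopped + " now=" + current);
        // prints: stopped=[uav1] now=[uav2, uav3]
    }
}
```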
Consume a Kafka topic and write each fetched record into the disruptor's ring buffer through DisruptorHelper.
public class ConsumerThread implements Runnable {
    private String uavID;

    public ConsumerThread(String uavID) {
        this.uavID = uavID;
    }

    public void run() {
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(uavID));
        System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
        try {
            // Keep polling until this uavID is removed from the shared list.
            while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    DisruptorHelper.getInstance().produce(record);
                }
            }
        } finally {
            consumer.close(); // release the consumer's connections once the loop exits
        }
        System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
    }
}
The disruptor's consumer reads events from the ring buffer in sequence and handles them accordingly.
public class MsgEventHandler implements EventHandler<MsgEvent> {
    private Map<Integer, String> converterMap;

    public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
        ConsumerRecord<String, String> record = event.getRecord();
        System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
To be continued~~~