Subscribing to binlog changes with Canal and buffering the messages with Kafka

If you are not familiar with Alibaba's Canal project, please read up on it first: canal

Consider what happens when the binlog changes in large batches: if we subscribe to the binlog directly through Canal, the CanalClient can be overwhelmed in an instant. To solve this, we can introduce Kafka as a buffering layer between Canal and the consumers.
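The overall pipeline described in this post is:

MySQL binlog -> Canal server -> CanalClient (this framework) -> Kafka topic -> BinlogKafkaConsumer -> @BinlogListener methods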

Our company has built a framework around this, which I am sharing here. Thanks to the original author.

1. Server side: wrap the Canal binlog subscription and push the messages to Kafka

The binlogService server entry point:
import java.util.concurrent.Executors

import com.today.data.transfer.UTIL._
import com.today.data.transfer.canal.CanalClient
import com.today.data.transfer.kafka.BinlogKafkaProducer
import com.today.data.transfer.util.SysEnvUtil
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

/**
  * Description: binlogService server entry point
  *
  * @author hz.lei
  * @since 2018-03-07
  */
object BinLogServer {

  val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    startServer()
  }

  /**
    * Start the server, reading its configuration from Java environment variables.
    */
  def startServer(): Unit = {
    logger.info(s"starting service binlogServer...")

    val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
    val topic = SysEnvUtil.CANAL_KAFKA_TOPIC
    val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
    val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt
    val destination = SysEnvUtil.CANAL_DESTINATION
    val username = SysEnvUtil.CANAL_USERNAME
    val password = SysEnvUtil.CANAL_PASSWORD

    val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
    kafkaProducer.init()

    val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password)
    canalClient.registerBinlogListener(kafkaProducer)

    val executorService = Executors.newFixedThreadPool(1)
    executorService.execute(canalClient)

    logger.info("service binlogService started successfully...")
  }

  /**
    * Alternative startup that reads its configuration from a Typesafe Config file.
    * (getStringProxy presumably comes from the UTIL._ enrichment imported above.)
    */
  def startServerWithScala(): Unit = {
    logger.info(s"starting service binlogServer...")

    val config = ConfigFactory.load()
    val producerBrokerHost = config.getStringProxy("kafka.producerBrokerHost")
    val topic = config.getStringProxy("kafka.topic")
    val canalServerIp = config.getStringProxy("canal.canalServerIp")
    val canalServerPort = config.getStringProxy("canal.canalServerPort").toInt
    val destination = config.getStringProxy("canal.destination")
    val username = config.getStringProxy("canal.username")
    val password = config.getStringProxy("canal.password")

    val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
    kafkaProducer.init()

    val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password)
    canalClient.registerBinlogListener(kafkaProducer)

    val executorService = Executors.newFixedThreadPool(1)
    executorService.execute(canalClient)

    logger.info("service binlogService started successfully...")
  }
}
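SysEnvUtil is not shown in the post. A minimal sketch, assuming it simply reads these settings from JVM environment variables (the variable names and defaults below are my assumptions, inferred from the field names):

import java.util.Optional;

/** Hypothetical sketch of SysEnvUtil: reads configuration from environment variables. */
public class SysEnvUtil {
    public static final String CANAL_KAFKA_HOST = get("CANAL_KAFKA_HOST", "127.0.0.1:9092");
    public static final String CANAL_KAFKA_TOPIC = get("CANAL_KAFKA_TOPIC", "Binlog");
    public static final String CANAL_SERVER_IP = get("CANAL_SERVER_IP", "127.0.0.1");
    public static final String CANAL_SERVER_PORT = get("CANAL_SERVER_PORT", "11111");
    public static final String CANAL_DESTINATION = get("CANAL_DESTINATION", "example");
    public static final String CANAL_USERNAME = get("CANAL_USERNAME", "canal");
    public static final String CANAL_PASSWORD = get("CANAL_PASSWORD", "canal");

    private static String get(String key, String defaultValue) {
        // fall back to a default when the environment variable is unset
        return Optional.ofNullable(System.getenv(key)).orElse(defaultValue);
    }
}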


Sending the received Canal messages to Kafka:
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.today.data.transfer.kafka.KafkaConfigBuilder; // assumed package; this import is missing from the original listing
import com.today.data.transfer.listener.CanalBinaryListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Description: sends the received Canal messages to Kafka.
 *
 * @author hz.lei
 * @date 2018-03-07
 */
public class BinlogKafkaProducer implements CanalBinaryListener {

    private static Logger logger = LoggerFactory.getLogger(BinlogKafkaProducer.class);

    private String topic;
    private String host;
    protected Producer<Integer, byte[]> producer;

    public BinlogKafkaProducer(String kafkaHost, String topic) {
        this.topic = topic;
        this.host = kafkaHost;
    }

    public void init() {
        logger.info("[BinlogKafkaProducer] [init] broker-list(" + host + ")");
        Properties properties = KafkaConfigBuilder.defaultProducer()
                .bootstrapServers(host)
                .withKeySerializer(IntegerSerializer.class)
                .withValueSerializer(ByteArraySerializer.class)
                .build();
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Sends a message asynchronously, with a callback.
     *
     * @param topic   target topic
     * @param message serialized binlog entry
     */
    public void send(String topic, byte[] message) {
        producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
            if (e != null) {
                logger.error("[" + getClass().getSimpleName() + "]: failed to send message, cause: " + e.getMessage(), e);
                return; // metadata may be null on failure, so do not touch it
            }
            logger.info("[binlog]: message sent, topic:{}, offset:{}, partition:{}, time:{}",
                    metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());
        });
    }

    @Override
    public void onBinlog(CanalEntry.Entry entry) {
        send(topic, entry.toByteArray());
    }
}
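The CanalBinaryListener interface itself is not reproduced in the post; judging from BinlogKafkaProducer implementing it and CanalClient invoking it below, it is presumably just a single-method callback, roughly:

import com.alibaba.otter.canal.protocol.CanalEntry;

/** Hypothetical sketch: callback invoked for every binlog entry, in binary (protobuf) form. */
public interface CanalBinaryListener {
    void onBinlog(CanalEntry.Entry entry);
}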


The Canal client, with its listening and processing logic:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.Message;
import com.today.data.transfer.listener.CanalBinaryListener;
import com.today.data.transfer.listener.CanalGsonListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Description: the Canal client, with its listening and processing logic.
 *
 * @author hz.lei
 * @date 2018-03-06
 */
public class CanalClient implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private String hostname;
    private int port;
    private String destination;
    private String username;
    private String password;
    private CanalConnector connector;

    private final static int BATCH_SIZE = 1000;
    private final static long SLEEP_MS = 1000;
    private boolean running = false;

    private List<CanalGsonListener> gsonListeners = new ArrayList<>();
    private List<CanalBinaryListener> binaryListeners = new ArrayList<>();

    /**
     * Constructor.
     *
     * @param hostname    IP of the Canal server
     * @param port        port of the Canal server
     * @param destination Canal instance destination
     * @param username    Canal username
     * @param password    Canal password
     */
    public CanalClient(String hostname, int port, String destination, String username, String password) {
        this.hostname = hostname;
        this.port = port;
        this.destination = destination;
        this.username = username;
        this.password = password;
        init();
    }

    public void init() {
        try {
            logger.info(new StringBuffer("[Canal instance info CanalClient] [start] ")
                    .append("hostname: (").append(hostname)
                    .append("), port: (").append(port)
                    .append("), destination: (").append(destination)
                    .append("), username: (").append(username)
                    .append("), password: (").append(password).append(")").toString());

            connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port),
                    destination, username, password);
            connector.connect();
            connector.subscribe(".*\\..*");
        } catch (Exception e) {
            logger.error("[CanalClient] [init] " + e.getMessage(), e);
        }
    }

    public void registerBinlogListener(CanalBinaryListener listener) {
        if (listener != null) {
            binaryListeners.add(listener);
        }
    }

    public void unregisterBinlogListener(CanalBinaryListener listener) {
        if (listener != null) {
            binaryListeners.remove(listener);
        }
    }

    @Override
    public void run() {
        logger.info("[CanalClient] [run] ");
        running = true;
        work();
    }

    /**
     * Main work loop: fetch batches from the Canal server and dispatch them.
     */
    private void work() {
        try {
            while (running) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(SLEEP_MS);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("read binlog batch batchId: {}, size: {}, name: {}, offset: {}",
                                batchId, size,
                                message.getEntries().get(0).getHeader().getLogfileName(),
                                message.getEntries().get(0).getHeader().getLogfileOffset());
                    }
                    // process the messages
                    process(message.getEntries());
                }
                // acknowledge the batch
                connector.ack(batchId);
            }
        } catch (Exception e) {
            connector.disconnect();
            logger.error("[CanalClient] [run] " + e.getMessage(), e);
        } finally {
            reconnect();
        }
    }

    /**
     * Reconnection strategy: retry every 3 seconds until the connection succeeds,
     * then re-enter the work loop.
     */
    private void reconnect() {
        logger.info("[CanalClient reconnect] reconnecting ...");
        running = false;
        while (!running) {
            try {
                connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port),
                        destination, username, password);
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                running = true;
            } catch (Exception e) {
                connector.disconnect();
                logger.error("[CanalClient] [reconnect] " + e.getMessage(), e);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    logger.error(e1.getMessage(), e1);
                }
            }
        }
        logger.info("[CanalClient reconnect] reconnected successfully!");
        work();
    }

    private void process(List<Entry> entries) {
        try {
            for (Entry entry : entries) {
                if (logger.isDebugEnabled()) {
                    logger.debug("mysql binlog : " + entry.getHeader().getLogfileName()
                            + "=>" + entry.getHeader().getLogfileOffset());
                }
                // skip transaction begin/end markers and QUERY events
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND
                        || entry.getHeader().getEventType() == EventType.QUERY) {
                    continue;
                }

                logger.info("parsed offset: " + entry.getHeader().getLogfileName()
                        + "=>" + entry.getHeader().getLogfileOffset()
                        + ", table [" + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + "]"
                        + ", event type [" + entry.getHeader().getEventType() + "]"
                        + ", executed at: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .format(new Date(entry.getHeader().getExecuteTime())));

                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    logger.error("[CanalClient] [process] failed to parse RowChange event: "
                            + e.getMessage() + ", entry: " + entry.toString(), e);
                    continue;
                }

                log(entry.getHeader(), rowChange);

                if (gsonListeners.size() > 0) {
                    GsonEntry binlog = new GsonEntry(entry.getHeader(), rowChange);
                    for (CanalGsonListener listener : gsonListeners) {
                        listener.onBinlog(binlog);
                    }
                }
                if (binaryListeners.size() > 0) {
                    for (CanalBinaryListener listener : binaryListeners) {
                        listener.onBinlog(entry);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("[CanalClient] [process] " + e.getMessage(), e);
        }
    }

    private void log(Header header, RowChange rowChange) {
        EventType eventType = rowChange.getEventType();
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), header.getTableName(), eventType));
        }
        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                log(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                log(rowData.getAfterColumnsList());
            } else {
                log(rowData.getBeforeColumnsList());
                log(rowData.getAfterColumnsList());
            }
        }
    }

    private void log(List<Column> columns) {
        for (Column column : columns) {
            if (logger.isDebugEnabled()) {
                logger.debug(new StringBuffer()
                        .append(column.getName()).append(" = ").append(column.getValue())
                        .append(" update[").append(column.getUpdated()).append("]").toString());
            }
        }
    }
}


The Kafka message entity definition:

import java.sql.Timestamp

import com.alibaba.otter.canal.protocol.CanalEntry.EventType

/**
  * desc: BinlogEvent bean
  *
  * @author hz.lei 2018-03-07
  */
case class BinlogEvent(schema: String,
                       tableName: String,
                       eventType: EventType,
                       timestamp: Timestamp,
                       before: String,
                       after: String)


2. Client side: subscribe to the Kafka messages to pick up the binlog changes you care about


The binlog listener class:

import com.today.binlog.BinlogEvent
import com.today.eventbus.annotation.{BinlogListener, KafkaConsumer}
import com.today.service.binlog.action._
import org.springframework.transaction.annotation.Transactional

import scala.collection.JavaConverters._

/**
  * Description: binlog listener class
  *
  * @author hz.lei
  * @since 2018-03-08
  */
@KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
@Transactional(readOnly = true)
class GoodsBinlogListener {

  @BinlogListener
  def onBinlog(event: java.util.List[BinlogEvent]): Unit = {
    event.asScala.foreach(new GoodsOnBinlogAction(_).action())
  }
}


Note that the following two annotations are required:

@KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
@BinlogListener

Register the annotation-scanning bean:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:soa="http://soa-springtag.dapeng.com/schema/service"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://soa-springtag.dapeng.com/schema/service http://soa-springtag.dapeng.com/schema/service/service.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
    <bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>
</beans>


The annotation scanner definition:

import com.today.eventbus.ConsumerEndpoint;
import com.today.eventbus.annotation.BinlogListener;
import com.today.eventbus.annotation.KafkaConsumer;
import com.today.eventbus.annotation.KafkaListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;
import com.today.eventbus.utils.Constant;

import java.lang.reflect.Method;
import java.util.*;

/**
 * Description: MsgAnnotationBeanPostProcessor, a bean post-processor that scans
 * for the custom @KafkaConsumer / @KafkaListener / @BinlogListener annotations.
 *
 * @author hz.lei
 * @see KafkaListenerRegistrar,BeanFactory,BeanPostProcessor,SmartInitializingSingleton
 * @since 2018-03-01
 */
public class MsgAnnotationBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware, Ordered, SmartInitializingSingleton {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Holds the beanFactory; the real implementation is {@link DefaultListableBeanFactory}.
     * Used to create the {@link KafkaListenerRegistrar} bean dynamically.
     */
    private BeanFactory beanFactory;

    /**
     * Handles registration and creation of the Kafka consumers.
     */
    private KafkaListenerRegistrar registrar;

    /**
     * BeanFactory callback: gives this bean a reference to the container.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        createKafkaRegistryBean();
    }

    /**
     * Dynamically creates the KafkaListenerRegistrar bean.
     */
    private void createKafkaRegistryBean() {
        // cast the bean factory to DefaultListableBeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
        // build the bean definition via BeanDefinitionBuilder
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaListenerRegistrar.class);
        // register the bean
        defaultListableBeanFactory.registerBeanDefinition(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME,
                beanDefinitionBuilder.getRawBeanDefinition());
        this.registrar = (KafkaListenerRegistrar) beanFactory.getBean(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME);
    }

    /**
     * Called once all singleton beans have been initialized.
     */
    @Override
    public void afterSingletonsInstantiated() {
        this.registrar.afterPropertiesSet();
    }

    /**
     * Called after instantiation and dependency injection, before any init code
     * (e.g. an init-method configured in XML).
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Called after instantiation and dependency injection, after any init code.
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        logger.debug("access to postProcessAfterInitialization bean {}, beanName {}", bean, beanName);

        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // is the class annotated with @KafkaConsumer?
        Optional<KafkaConsumer> kafkaConsumer = findListenerAnnotations(targetClass);
        final boolean hasKafkaConsumer = kafkaConsumer.isPresent();

        if (hasKafkaConsumer) {
            // collect methods annotated with @KafkaListener
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            // collect methods annotated with @BinlogListener
            Map<Method, Set<BinlogListener>> binlogMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<BinlogListener>>) method -> {
                        Set<BinlogListener> listenerMethods = findBinlogListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });

            if (annotatedMethods.isEmpty() && binlogMethods.isEmpty()) {
                throw new IllegalArgumentException("@KafkaConsumer found on class type, " +
                        "but no @KafkaListener or @BinlogListener found on any method; please set one on a method");
            }
            if (!annotatedMethods.isEmpty() && !binlogMethods.isEmpty()) {
                throw new IllegalArgumentException("only one of @KafkaListener and @BinlogListener may be used on the same bean class");
            }

            if (!annotatedMethods.isEmpty()) {
                // non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        // process the annotation metadata
                        processKafkaListener(kafkaConsumer.get(), listener, method, bean, beanName);
                    }
                }
                logger.info("there are {} methods with @KafkaListener on this bean", annotatedMethods.size());
            }

            if (!binlogMethods.isEmpty()) {
                // non-empty set of methods
                for (Map.Entry<Method, Set<BinlogListener>> entry : binlogMethods.entrySet()) {
                    Method method = entry.getKey();
                    // process the annotation metadata
                    processBinlogListener(kafkaConsumer.get(), method, bean);
                }
                logger.info("there are {} methods with @BinlogListener on this bean", binlogMethods.size());
            }

            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        } else {
            this.logger.info("No @KafkaConsumer annotations found on bean type: " + bean.getClass());
        }
        return bean;
    }

    /**
     * Checks whether the bean class carries @KafkaConsumer; only such beans are Kafka message consumers.
     */
    private Optional<KafkaConsumer> findListenerAnnotations(Class<?> clazz) {
        KafkaConsumer ann = AnnotationUtils.findAnnotation(clazz, KafkaConsumer.class);
        return Optional.ofNullable(ann);
    }

    /**
     * Checks whether a bean method carries @KafkaListener.
     */
    private Set<KafkaListener> findListenerAnnotations(Method method) {
        Set<KafkaListener> listeners = new HashSet<>();
        KafkaListener ann = AnnotationUtils.findAnnotation(method, KafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    /**
     * Checks whether a bean method carries @BinlogListener.
     */
    private Set<BinlogListener> findBinlogListenerAnnotations(Method method) {
        Set<BinlogListener> listeners = new HashSet<>();
        BinlogListener ann = AnnotationUtils.findAnnotation(method, BinlogListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    /**
     * Wraps the metadata of a @KafkaListener-annotated method into a ConsumerEndpoint and registers it.
     */
    protected void processKafkaListener(KafkaConsumer consumer, KafkaListener listener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        ConsumerEndpoint endpoint = new ConsumerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBean(bean);
        endpoint.setParameterTypes(Arrays.asList(method.getParameterTypes()));
        // class annotation information
        endpoint.setGroupId(consumer.groupId());
        endpoint.setTopic(consumer.topic());
        endpoint.setKafkaHostKey(consumer.kafkaHostKey());
        // method annotation information
        endpoint.setSerializer(listener.serializer());
        // session timeout
        if (consumer.sessionTimeout() < Constant.DEFAULT_SESSION_TIMEOUT) {
            throw new RuntimeException("the kafkaConsumer session timeout is too small; set it to at least 10000 ms");
        }
        endpoint.setTimeout(consumer.sessionTimeout());
        this.registrar.registerEndpoint(endpoint);
    }

    private void processBinlogListener(KafkaConsumer consumer, Method method, Object bean) {
        Method methodToUse = checkProxy(method, bean);
        ConsumerEndpoint endpoint = new ConsumerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBean(bean);
        // class annotation information
        endpoint.setGroupId(consumer.groupId());
        endpoint.setTopic(consumer.topic());
        endpoint.setKafkaHostKey(consumer.kafkaHostKey());
        // method annotation information
        endpoint.setBinlog(true);
        this.registrar.registerEndpoint(endpoint);
    }

    /**
     * Returns the target method; if the bean is proxied, resolves the method against the proxy.
     */
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @KafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    } catch (NoSuchMethodException noMethod) {
                        // try the next proxied interface
                    }
                }
            } catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            } catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@KafkaListener method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}


Handling the buffered binlog messages on the consumer side:
import com.github.dapeng.core.SoaException;
import com.today.binlog.BinlogEvent;
import com.today.binlog.BinlogMsgProcessor; // assumed to live alongside BinlogEvent; not shown in the original imports
import com.today.eventbus.common.MsgConsumer;
import com.today.eventbus.common.retry.BinlogRetryStrategy;
import com.today.eventbus.ConsumerEndpoint;
import com.today.eventbus.config.KafkaConfigBuilder;
import com.today.eventbus.serializer.KafkaIntDeserializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Properties;

/**
 * Description: handles the buffered binlog listener events.
 *
 * @author hz.lei
 * @since 2018-03-07
 */
public class BinlogKafkaConsumer extends MsgConsumer<Integer, byte[], ConsumerEndpoint> {

    /**
     * @param kafkaHost host1:port1,host2:port2,...
     * @param groupId   consumer group id
     * @param topic     topic to subscribe to
     */
    public BinlogKafkaConsumer(String kafkaHost, String groupId, String topic) {
        super(kafkaHost, groupId, topic);
    }

    @Override
    protected void init() {
        logger.info("[KafkaConsumer] [init] " +
                "kafkaConnect(" + kafkaConnect +
                ") groupId(" + groupId +
                ") topic(" + topic + ")");

        KafkaConfigBuilder.ConsumerConfiguration builder = KafkaConfigBuilder.defaultConsumer();

        final Properties props = builder.bootstrapServers(kafkaConnect)
                .group(groupId)
                .withKeyDeserializer(KafkaIntDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withOffsetCommitted(false)
                .excludeInternalTopic(false)
                .maxPollSize("100")
                .build();

        consumer = new KafkaConsumer<>(props);
    }

    @Override
    protected void buildRetryStrategy() {
        retryStrategy = new BinlogRetryStrategy();
    }

    @Override
    protected void dealMessage(ConsumerEndpoint consumer, byte[] value, Integer keyId) throws SoaException {
        List<BinlogEvent> binlogEvents = BinlogMsgProcessor.process(value);
        // only dispatch non-empty batches
        if (binlogEvents.size() > 0) {
            try {
                consumer.getMethod().invoke(consumer.getBean(), binlogEvents);
            } catch (IllegalAccessException e) {
                logger.error("BinlogConsumer:: error invoking the @BinlogListener-annotated method", e);
            } catch (InvocationTargetException e) {
                throwRealException(e, consumer.getMethod().getName());
            }
            logger.info("BinlogConsumer::[dealMessage(id: {})] end, method: {}, groupId: {}, topic: {}, bean: {}",
                    keyId, consumer.getMethod().getName(), groupId, topic, consumer.getBean());
        }
    }
}
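BinlogMsgProcessor is not shown in the post. Conceptually it reverses what the server side did: deserialize the CanalEntry.Entry protobuf bytes and map each row change into BinlogEvent beans. A rough sketch under that assumption (the JSON rendering here is deliberately simplified):

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import com.today.binlog.BinlogEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: turns the raw binlog bytes consumed from Kafka back into BinlogEvent beans. */
public class BinlogMsgProcessor {

    private static final Logger logger = LoggerFactory.getLogger(BinlogMsgProcessor.class);

    public static List<BinlogEvent> process(byte[] value) {
        List<BinlogEvent> events = new ArrayList<>();
        try {
            // reverse of BinlogKafkaProducer#onBinlog: the payload is a serialized CanalEntry.Entry
            CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(value);
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                events.add(new BinlogEvent(
                        entry.getHeader().getSchemaName(),
                        entry.getHeader().getTableName(),
                        rowChange.getEventType(),
                        new Timestamp(entry.getHeader().getExecuteTime()),
                        toJson(rowData.getBeforeColumnsList()),
                        toJson(rowData.getAfterColumnsList())));
            }
        } catch (InvalidProtocolBufferException e) {
            logger.error("failed to parse binlog entry from Kafka message", e);
        }
        return events;
    }

    /** Naive JSON rendering of a column list; a real implementation would honour column types and escaping. */
    private static String toJson(List<CanalEntry.Column> columns) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append('"').append(columns.get(i).getName()).append("\":\"")
              .append(columns.get(i).getValue()).append('"');
        }
        return sb.append("}").toString();
    }
}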


匹配binlog變更,獲取變動先後信息:bootstrap

import com.alibaba.otter.canal.protocol.CanalEntry
import com.today.binlog.BinlogEvent
import com.today.service.GoodsDataSource
import com.today.service.binlog.bean.SkuBean
import com.today.service.commons.cache.MemcacheProcessor
import com.today.service.commons.cache.dto.RedisBean
import redis.clients.jedis.JedisPool
import spray.json._
import wangzx.scala_commons.sql._
import com.today.service.commons.help.BizHelp.withJedis
import com.today.service.commons.help.SqlHelp
import com.today.service.commons.util.DateTools
import com.today.service.commons.`implicit`.Implicits._

import scala.collection.JavaConverters._

class GoodsOnBinlogAction(binlogEvent: BinlogEvent) extends MemcacheProcessor("goods_db_sku") {

  def action() = {
    logger.info(s"${getClass.getSimpleName} onBinlog ")
    logger.info(s"binlogEvent:$binlogEvent")

    binlogEvent match {
      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.INSERT, timestamp, before,
          json"""{"id": $id,"sku_no":${skuNoJsValue}}""") =>
        logger.info(s"${getClass.getSimpleName} onInsert...")
        val skuNo = toStringValue(skuNoJsValue)
        val skuBean = reloadBySkuNo(skuNo)
        if (skuBean.isDefined) insertSortSet(skuBean.get)

      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.UPDATE, timestamp,
          json"""{"id": $beforeSkuId,"sku_no":${beforeSkuNoJsValue}}""",
          json"""{"id": $afterSkuId,"sku_no":${afterSkuNoJsValue}}""") =>
        logger.info(s"${getClass.getSimpleName} onUpdate...")
        val beforeSkuNo = toStringValue(beforeSkuNoJsValue)
        val afterSkuNo = toStringValue(afterSkuNoJsValue)
        // if the sku_no itself changed, drop the old cache entry first
        if (!beforeSkuNo.equals(afterSkuNo)) {
          deleteSortSet(beforeSkuNo)
        }
        val skuBean = reloadBySkuNo(afterSkuNo)
        if (skuBean.isDefined) updateSortSet(skuBean.get.primaryKey, skuBean.get)

      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.DELETE, timestamp,
          json"""{"id": $id,"sku_no":${skuNo}}""", after) =>
        logger.info(s"${getClass.getSimpleName} onDelete...")
        deleteSortSet(toStringValue(skuNo))

      case _ =>
    }
  }
}