[Spring cloud 一步步實現廣告系統] 16. 增量索引實現以及投送數據到MQ(kafka)

實現增量數據索引

上一節中,咱們爲實現增量索引的加載作了充足的準備,使用到mysql-binlog-connector-java 開源組件來實現MySQL 的binlog監聽,關於binlog的相關知識,你們能夠自行網絡查閱。或者能夠mailto:magicianisaac@gmail.comjava

本節咱們將根據binlog 的數據對象,來實現增量數據的處理,咱們構建廣告的增量數據,其實說白了就是爲了在後期能把廣告投放到索引服務,實現增量數據到增量索引的生成。Let's code.mysql

  • 定義一個投遞增量數據的接口(接收參數爲咱們上一節定義的binlog日誌的轉換對象)
/** * ISender for 投遞增量數據 方法定義接口 * * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a> */
public interface ISender {

    void sender(MysqlRowData rowData);
}
複製代碼
  • 建立增量索引監聽器
/** * IncrementListener for 增量數據實現監聽 * * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a> * @since 2019/6/27 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    //根據名稱選擇要注入的投遞方式
    @Resource(name = "indexSender")
    private ISender sender;

    /** * 標註爲 {@link PostConstruct}, * 即表示在服務啓動,Bean完成初始化以後,馬上初始化 */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        //包裝成最後須要投遞的數據
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());
        //將EventType轉爲OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        //獲取模版中該操做對應的字段列表
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}
複製代碼
開啓binlog監聽
  • 首先來配置監聽binlog的數據庫鏈接信息
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # 從當前位置開始監聽
複製代碼

編寫配置類:spring

/** * BinlogConfig for 定義監聽Binlog的配置信息 * * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a> */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}
複製代碼

在咱們實現 監聽binlog那節,咱們實現了一個自定義client CustomBinlogClient,須要實現binlog的監聽,這個監聽的客戶端就必須是一個獨立運行的線程,而且要在程序啓動的時候進行監聽,咱們來實現運行當前client的方式,這裏咱們會使用到一個新的Runnerorg.springframework.boot.CommandLineRunner,let's code.sql

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
複製代碼
增量數據投遞

在binlog監聽的過程當中,咱們看到針對於int, String 這類數據字段,mysql的記錄是沒有問題的,可是針對於時間類型,它被格式化成了字符串類型:Fri Jun 21 15:07:53 CST 2019數據庫

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
複製代碼

對於這個時間格式,咱們須要關注2點信息:bash

  • CST,這個時間格式會比咱們的時間+ 8h(中國標準時間 China Standard Time UT+8:00)
  • 須要對這個日期進行解釋處理

固然,咱們也能夠經過設置mysql的日期格式來改變該行爲,在此,咱們經過編碼來解析該時間格式:網絡

/** * Thu Jun 27 08:00:00 CST 2019 */
  public static Date parseBinlogString2Date(String dateString) {
      try {
          DateFormat dateFormat = new SimpleDateFormat(
                  "EEE MMM dd HH:mm:ss zzz yyyy",
                  Locale.US
          );
          return DateUtils.addHours(dateFormat.parse(dateString), -8);

      } catch (ParseException ex) {
          log.error("parseString2Date error:{}", dateString);
          return null;
      }
  }
複製代碼

由於咱們在定義索引的時候,是根據表之間的層級關係(Level)來設定的,根據代碼規範,不容許出現Magic Number, 所以咱們定義一個數據層級枚舉,來表達數據層級。ide

/** * AdDataLevel for 廣告數據層級 * * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a> */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
複製代碼
實現數據投遞

由於增量數據能夠投遞到不一樣的位置以及用途,咱們以前實現了一個投遞接口com.sxzhongf.ad.sender.ISender,接下來咱們實現一個投遞類:測試

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /** * 根據廣告級別,投遞Binlog數據 */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            Level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            Level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            Level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void Level2RowData(MysqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();

            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                //Map的第二種循環方式
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            //投遞推廣計劃
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();

            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            //投遞廣告創意
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void Level3RowData(MysqlRowData rowData) {
       ...
    }

    /** * 處理4級廣告 */
    private void Level4RowData(MysqlRowData rowData) {
        ...
    }
}
複製代碼
投放增量數據到MQ(kafka)

爲了咱們的數據投放更加靈活,方便數據統計,分析等系統的需求,咱們來實現一個投放到消息中的接口,其餘服務能夠訂閱當前MQ 的TOPIC來實現數據訂閱。ui

配置文件中配置TOPIC
adconf:
  kafka:
    topic: ad-search-mysql-data

--------------------------------------
/** * KafkaSender for 投遞Binlog增量數據到kafka消息隊列 * * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a> * @since 2019/7/1 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /** * 發送數據到kafka隊列 */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /** * 測試消費kafka消息 */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MysqlRowData.class
            );
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }

    }
}
複製代碼
相關文章
相關標籤/搜索