用戶下單數據會經過業務系統實時產生入庫到mysql庫,咱們要統計通某個推廣渠道實時下單量,以便線上運營推廣人員查看不一樣渠道推廣效果進而執行不一樣推廣策略html
架構圖java
注:組件不瞭解的同窗可參考其餘文章,本文主要講項目的實現
一、某些同窗會問,直接在業務系統加入JS埋點經過發日誌不更好嗎?
答:第1、JS埋點業務系統涉及產品改造,不可能由於一個需求讓你去隨便改業務系統。第2、即便加入JS埋點也不可能得到業務系統的所有數據。因此業務系統核心數據還得去業務系統庫獲取。mysql
二、還有人問加入Kafka太多餘
答:第1、加入Kafka爲了使系統擴展性更強,可方便對接各類開源產品。第2、經過Kafka消息組可以使同一條消息被不一樣Consumer消費,用戶離線和實時兩條線。linux
主要邏輯git
1.建立Canal鏈接
2.解析Mysql binlog得到insert語句github
public static void main(String args[]) { //第一個參數爲Canal server服務IP地址若是使用windows開發鏈接linux Canal服務須要制定IP eg: new InetSocketAddress("192.168.61.132", 11111) //第二個參數爲Canal server服務端口號 Canal server IP和端口號在 /conf/canal.properties中配置 //第三個參數爲Canal instance名稱 /conf下目錄名稱 //第四第五個參數爲mysql用戶名和密碼,若是在 /conf/example/instance.properties中已經配置 這裏不用謝 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.61.132", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmtryCount = 120; while (emptyCount < totalEmtryCount) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數據 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } }
private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue()); KafkaProducer.sendMsg("canal", UUID.randomUUID().toString() ,column.getName() + " : " + column.getValue()); } }
以DStream中的數據進行按key作reduce操做,而後對各個批次的數據進行累加
在有新的數據信息進入或更新時,可讓用戶保持想要的任何狀。使用這個功能須要完成兩步:sql
val orders = resut_lines.updateStateByKey(updateRunningSum _) def updateRunningSum(values: Seq[Long], state: Option[Long]) = { /* state:存放的歷史數據 values:當前批次彙總值 */ Some(state.getOrElse(0L)+values.sum) }
實時彙總某渠道下單量須要根據渠道爲主鍵更新或插入新數據
1.當某個渠道第一單時,庫中沒有以此渠道爲主鍵的數據,須要insert into 訂單統計表
2.當某渠道在庫中已有該渠道下單量,須要更新此渠道下單量值 update 訂單統計表
因此咱們使用:apache
#有該渠道就更新,沒有就插入 REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)
orders.foreachRDD(rdd =>{ rdd.foreachPartition(rdd_partition =>{ rdd_partition.foreach(data=>{ if(!data.toString.isEmpty) { System.out.println("訂單量"+" : "+data._2) DataUtil.toMySQL(data._1.toString,data._2.toInt) } }) }) }) def toMySQL(name: String,orders:Int) = { var conn: Connection = null var ps: PreparedStatement = null val sql = "REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)" try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.20.126:3306/test", "root", "root") ps = conn.prepareStatement(sql) ps.setString(1, name) ps.setInt(2, orders) ps.executeUpdate() } catch { case e: Exception => e.printStackTrace() } finally { if (ps != null) { ps.close() } if (conn != null) { conn.close() } } }
1.canal依賴Canal protobuf版本爲2.4.1,而spark依賴的2.5版本windows
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.1</version> </dependency>
1.Canal wiki:
https://github.com/alibaba/canal/wiki
2.streaming關於轉化操做
http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#transformations-on-dstreams
3.mysql的replace into
http://blog.sina.com.cn/s/blog_5f53615f01016wy3.html架構
做者:MichaelFly 連接:https://www.jianshu.com/p/3ec093a9d584 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。