ETL集羣採集器基於ETL採集器(單機版)的一個升級,主要分爲五大組件:分別爲JOB任務組件、採集器組件、KAFKA消息組件、ETL清洗組件、存儲組件,每臺節點每秒清洗3W-5W條日誌。 java
-
JOB任務組件
-
JOB任務組件簡要介紹
-
技術簡要說明:基於JOB管理器、zookeeper、hadoop RPC 開發 node
-
JOB任務組件分三部分組成:MasterJob、SlaveJob、zookeeper 數據庫
-
MasterJob主要責任是生產任務,創建RPC服務 服務器
-
SlaveJob主要責任是消費執行任務,經過RPC獲取任務 架構
-
zookeeper主要責任監控MasterJob、SlaveJob 快速切換 Master,並廣播任務 併發
-
負載均衡消費任務 負載均衡
-
支持任務的啓用、停用 ide
-
支持MySql、Oracle、DB2等多種數據庫管理任務 函數
-
靈活便利的管理quartz任務 oop
-
支持任務參數的傳遞
-
具體實現參考單機版ETL http://my.oschina.net/u/2470985/blog/509714
-
支持任務參數的傳遞
-
JOB任務組件架構設計
-
zookeeper 監控Master 、Slave ,選舉Master,選舉RPC服務端
papublic void init() throws Exception { client = CuratorFrameworkFactory.newClient(zookQuorum, // 服務器列表 createTimeout, // 會話超時時間,單位毫秒 connTimeout, // 鏈接建立超時時間,單位毫秒 new ExponentialBackoffRetry(time, timeoutCount) // 重試策略 ); nodeFactory = NodeFactory.getNodeFactory(); // 啓動zk client.start(); // 監控分發job任務監控 jobMonitor(); // 添加master、slave監控 addMasterMonitor(); // 初始化節點 initCreateNode(client, nodePath, nodePathName, jobTaskPath, jobTaskPathName, nodeFactory); } /** * * @Title: initCreateNode * @Description: 初始化節點 * @param client * @param nodePath * @param nodePathName * @param jobTaskPath * @param jobTaskPathName * @param nodeFactory * @throws Exception * @return: void */ public void initCreateNode(CuratorFramework client, String nodePath, String nodePathName, String jobTaskPath, String jobTaskPathName, NodeFactory nodeFactory) throws Exception { // 建立node節點 client.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(nodePath + nodePathName); if (client.checkExists().forPath(jobTaskPath + jobTaskPathName) == null) { client.create() .creatingParentsIfNeeded() .forPath(jobTaskPath + jobTaskPathName, nodeFactory.getIp().getBytes()); } } /** * * @Title: jobMonitor * @Description: 監控JOB * @throws Exception * @return: void */ public void jobMonitor() throws Exception { zookeeperjobTask = new PathChildrenCache(client, jobTaskPath, true); zookeeperjobTask.getListenable().addListener( new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent enEvent) throws Exception { nodeFactory.getZookManage().masterMonitor(client, jobTaskPath, rpcPort, nodeFactory); } }); zookeeperjobTask.start(StartMode.BUILD_INITIAL_CACHE); } /** * * @Title: addmasterMonitor * @Description: 監控Master * @throws Exception * @return: void */ public void addMasterMonitor() throws Exception { zookeeperNodeEvent = new PathChildrenCache(client, nodePath, true); zookeeperNodeEvent.getListenable().addListener( new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent enEvent) throws Exception { nodeFactory.getZookManage().masterMonitor(client, nodePath, rpcPort, nodeFactory); } }); zookeeperNodeEvent.start(StartMode.BUILD_INITIAL_CACHE); }
@Override public void masterMonitor(CuratorFramework client, String nodePath, int port, NodeFactory nodeFactory) throws Exception { // 狀態 Map<String, String> nodeState = nodeFactory.getNodeState(); // 獲取當前IP String ip = nodeFactory.getIp(); // 獲取masterNode String masterNode = nodeFactory.getMasterNode(); // 設置job節點 List<String> nodeList = client.getChildren().forPath(nodePath); Collections.sort(nodeList); String master = (new String(client.getData().forPath( nodePath + "/" + nodeList.get(0)))); // 選本機爲master if (ip.equals(master)) { // 判斷本機是否新的master null則爲新的master if (nodeFactory.getRpcServiceNode() == null) { nodeFactory.setMasterNode(master); RpcServiceNode rpcServiceNode = new RpcServiceNode(); rpcServiceNode.setPort(port); rpcServiceNode.setHost(ip); rpcServiceNode.start(); // 設置rpc對象 nodeFactory.setRpcCommand(getRpcCommandProxy(master, port)); // 設置RPC客戶端 nodeFactory.setRpcServiceNode(rpcServiceNode); // 開啓策略 BeanFactory.getBean().getLoadingService().init(); // 設置服務 nodeState.put(ip + PROCESS_CUT + StartNode.HOST_MASTER, PROCESS_START); nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE, PROCESS_START); } } else { // 假如本機不是當前master則關閉RPC服務 if (nodeFactory.getRpcServiceNode() != null) { nodeFactory.getRpcServiceNode().stop(); nodeFactory.setRpcServiceNode(null); //關閉JOB BeanFactory.getBean().getLoadingService().stop(); nodeState.remove(ip + PROCESS_CUT + StartNode.HOST_MASTER); } // 假如當前master發生改變則切換 if ((!masterNode.equals(master))) { nodeFactory.setRpcCommand(getRpcCommandProxy(master, port)); nodeFactory.setMasterNode(master); nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE, PROCESS_START); } } } @Override public RpcCommand getRpcCommandProxy(String ip, int port) throws Exception { return RPC.getProxy(RpcCommand.class, RpcCommand.versionID, new InetSocketAddress(ip, port), new Configuration()); }
-
採集器組件
-
採集組件簡要介紹
-
採集層支持DB併發採集、FTP併發採集、syslog接收、本地文件採集
-
支持FTP、DB 異常補採
-
採集層支持JOB任務閥值配置,DB鏈接池設置、Ftp鏈接設置、syslog 批量生產文件等
-
提供採集層開發者模式,標準API接口
-
數據庫表管理採集任務
-
將採集的數據負載均衡到KAFKA中間件中
-
kafka組件
-
kafka組件簡要介紹
-
接收採集器消息存放分區中
-
kafka負載均衡消息,ETL負載均衡消費分區中消息
-
kafka支持訂閱、消費消息、ETL實時分析消息
-
etl組件
-
etl組件簡要介紹
-
清洗層支持數據追加、數據彙總、數據補全、過濾、映射、轉換、拆分、解析
-
清洗層支持清洗任務閥值配置
-
清洗層清洗開發者模式 ,標準API接口
-
清洗層支持庫表管理清洗流程
-
接收清洗完成的數據,自定義存儲,庫、表、hive 等
-
存儲層支持自定義多庫存儲、自定義表存儲
-
提供存儲層開發者模式,標準API接口
-
存儲異常保存文件,監控異常文件從新存儲。
-
支持實時分析,支持開發etl函數庫
ETL集羣採集器設計
ETL採集清洗應用(審計系統架構)