ETL採集器(負載均衡)

     ETL集羣採集器基於ETL採集器(單機版)的一個升級,主要分爲五大組件:分別爲JOB任務組件、採集器組件、KAFKA消息組件、ETL清洗組件、存儲組件,每臺節點每秒清洗3W-5W條日誌。 java

  1. JOB任務組件

  • JOB任務組件簡要介紹

    1. 技術簡要說明:基於JOB管理器、zookeeper、hadoop RPC 開發 node

    2. JOB任務組件分三部分組成:MasterJob、SlaveJob、zookeeper 數據庫

      1. MasterJob主要責任是生產任務,創建RPC服務 服務器

      2. SlaveJob主要責任是消費執行任務,經過RPC獲取任務 架構

      3. zookeeper主要責任監控MasterJob、SlaveJob 快速切換 Master,並廣播任務 併發

    3.  負載均衡消費任務 負載均衡

    4.  支持任務的啓用、停用 ide

    5.  支持MySql、Oracle、DB2等多種數據庫管理任務 函數

    6.  靈活便利的管理quartz任務 oop

    7. 支持任務參數的傳遞 

    8. 具體實現參考單機版ETL http://my.oschina.net/u/2470985/blog/509714

  1. 支持任務參數的傳遞

  • JOB任務組件架構設計

  • zookeeper 監控Master 、Slave ,選舉Master,選舉RPC服務端

 papublic void init() throws Exception {
  client = CuratorFrameworkFactory.newClient(zookQuorum, // 服務器列表
    createTimeout, // 會話超時時間,單位毫秒
    connTimeout, // 鏈接建立超時時間,單位毫秒
    new ExponentialBackoffRetry(time, timeoutCount) // 重試策略
    );
  nodeFactory = NodeFactory.getNodeFactory();
  // 啓動zk
  client.start();
  // 監控分發job任務監控
  jobMonitor();
  // 添加master、slave監控
  addMasterMonitor();
  // 初始化節點
  initCreateNode(client, nodePath, nodePathName, jobTaskPath,
    jobTaskPathName, nodeFactory);
 }
 /**
  * 
  * @Title: initCreateNode
  * @Description: 初始化節點
  * @param client
  * @param nodePath
  * @param nodePathName
  * @param jobTaskPath
  * @param jobTaskPathName
  * @param nodeFactory
  * @throws Exception
  * @return: void
  */
 public void initCreateNode(CuratorFramework client, String nodePath,
   String nodePathName, String jobTaskPath, String jobTaskPathName,
   NodeFactory nodeFactory) throws Exception {
  // 建立node節點
  client.create().creatingParentsIfNeeded()
    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    .forPath(nodePath + nodePathName);
  if (client.checkExists().forPath(jobTaskPath + jobTaskPathName) == null) {
   client.create()
     .creatingParentsIfNeeded()
     .forPath(jobTaskPath + jobTaskPathName,
       nodeFactory.getIp().getBytes());
  }
 }
 /**
  * 
  * @Title: jobMonitor
  * @Description: 監控JOB
  * @throws Exception
  * @return: void
  */
 public void jobMonitor() throws Exception {
  zookeeperjobTask = new PathChildrenCache(client, jobTaskPath, true);
  zookeeperjobTask.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        jobTaskPath, rpcPort, nodeFactory);
     }
    });
  zookeeperjobTask.start(StartMode.BUILD_INITIAL_CACHE);
 }
 /**
  * 
  * @Title: addmasterMonitor
  * @Description: 監控Master
  * @throws Exception
  * @return: void
  */
 public void addMasterMonitor() throws Exception {
  zookeeperNodeEvent = new PathChildrenCache(client, nodePath, true);
  zookeeperNodeEvent.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        nodePath, rpcPort, nodeFactory);
     }
    });
  zookeeperNodeEvent.start(StartMode.BUILD_INITIAL_CACHE);
 }
@Override
 public void masterMonitor(CuratorFramework client, String nodePath,
   int port, NodeFactory nodeFactory) throws Exception {
  // 狀態
  Map<String, String> nodeState = nodeFactory.getNodeState();
  // 獲取當前IP
  String ip = nodeFactory.getIp();
  // 獲取masterNode
  String masterNode = nodeFactory.getMasterNode();
  // 設置job節點
  List<String> nodeList = client.getChildren().forPath(nodePath);
  Collections.sort(nodeList);
  String master = (new String(client.getData().forPath(
    nodePath + "/" + nodeList.get(0))));
  // 選本機爲master
  if (ip.equals(master)) {
   // 判斷本機是否新的master null則爲新的master
   if (nodeFactory.getRpcServiceNode() == null) {
    nodeFactory.setMasterNode(master);
    RpcServiceNode rpcServiceNode = new RpcServiceNode();
    rpcServiceNode.setPort(port);
    rpcServiceNode.setHost(ip);
    rpcServiceNode.start();
    // 設置rpc對象
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    // 設置RPC客戶端
    nodeFactory.setRpcServiceNode(rpcServiceNode);
    // 開啓策略
    BeanFactory.getBean().getLoadingService().init();
    // 設置服務
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_MASTER,
      PROCESS_START);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  } else {
   // 假如本機不是當前master則關閉RPC服務
   if (nodeFactory.getRpcServiceNode() != null) {
    nodeFactory.getRpcServiceNode().stop();
    nodeFactory.setRpcServiceNode(null);
    //關閉JOB
    BeanFactory.getBean().getLoadingService().stop();
    nodeState.remove(ip + PROCESS_CUT + StartNode.HOST_MASTER);
   }
   // 假如當前master發生改變則切換
   if ((!masterNode.equals(master))) {
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    nodeFactory.setMasterNode(master);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  }
 }
 @Override
 public RpcCommand getRpcCommandProxy(String ip, int port) throws Exception {
  return RPC.getProxy(RpcCommand.class, RpcCommand.versionID,
    new InetSocketAddress(ip, port), new Configuration());
 }
  • 採集器組件

  • 採集組件簡要介紹

    1. 採集層支持DB併發採集、FTP併發採集、syslog接收、本地文件採集

    2. 支持FTP、DB 異常補採

    3. 採集層支持JOB任務閥值配置,DB鏈接池設置、Ftp鏈接設置、syslog 批量生產文件等

    4. 提供採集層開發者模式,標準API接口

    5. 數據庫表管理採集任務

    6. 將採集的數據負載均衡到KAFKA中間件中

  • kafka組件

  • kafka組件簡要介紹

    1. 接收採集器消息存放分區中

    2. kafka負載均衡消息,ETL負載均衡消費分區中消息

    3. kafka支持訂閱、消費消息、ETL實時分析消息

  • etl組件

  • etl組件簡要介紹

    1. 清洗層支持數據追加、數據彙總、數據補全、過濾、映射、轉換、拆分、解析 

    2. 清洗層支持清洗任務閥值配置

    3. 清洗層清洗開發者模式 ,標準API接口

    4. 清洗層支持庫表管理清洗流程

    5. 接收清洗完成的數據,自定義存儲,庫、表、hive 等

    6. 存儲層支持自定義多庫存儲、自定義表存儲

    7. 提供存儲層開發者模式,標準API接口

    8. 存儲異常保存文件,監控異常文件從新存儲。

    9. 支持實時分析,支持開發etl函數庫

ETL集羣採集器設計

 

ETL採集清洗應用(審計系統架構)

相關文章
相關標籤/搜索