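The ETL collector uses a Job manager to manage tasks, Spring to manage the collection and cleaning objects, and a JDBC manager to manage JDBC access.
Data processing flow: the Job manager schedules a job -> the collection layer collects data (and generates files) -> the cleaning layer reads the files -> the storage layer stores the normalized logs.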
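1. Main features of the ETL collector
- ETL collector: consists of three parts, the collection layer, the cleaning layer, and the storage layer
- Collection layer: its main task is to collect data and generate files
- The collection layer supports concurrent DB collection, concurrent FTP collection, syslog reception, and local file collection
- Supports re-collection after FTP and DB failures
- The collection layer supports Job task threshold configuration, DB connection pool settings, FTP connection settings, syslog batch file generation, and so on
- Provides a developer mode for the collection layer with a standard API
- Collection tasks are managed through database tables
- Cleaning layer: mainly reads the files, splits them into tasks, and runs the cleaning tasks concurrently
- The cleaning layer supports data appending, aggregation, completion, filtering, mapping, conversion, splitting, and parsing
- The cleaning layer supports cleaning task threshold configuration
- Provides a developer mode for the cleaning layer with a standard API
- Cleaning flows are managed through database tables
- Storage layer: receives the cleaned data and stores it in a custom target such as a database, a table, or Hive
- The storage layer supports custom multi-database storage and custom table storage
- Provides a developer mode for the storage layer with a standard API
- When storage fails, the data is saved to a file; the exception files are monitored and stored again.
- Logging: logs are recorded per collection ID and include the number of records collected, the number of records stored, collection throughput, normalization throughput, and exception details.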
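The storage-failure behaviour in the list above (save the batch to a file, then store it again later) can be sketched as follows. This is a minimal illustration, not the collector's code: the class `FallbackStoreSketch`, the `.failed` file suffix, and the `Consumer` used as the storage call are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical sketch: every name here is illustrative, not the collector's API.
class FallbackStoreSketch {
    private final Path failedDir;               // directory where failed batches are parked
    private final Consumer<List<String>> store; // stand-in for the real storage call

    FallbackStoreSketch(Path failedDir, Consumer<List<String>> store) {
        this.failedDir = failedDir;
        this.store = store;
    }

    /** Tries to store a batch; on failure writes it to a file. Returns true if stored. */
    boolean save(String batchId, List<String> rows) {
        try {
            store.accept(rows);
            return true;
        } catch (RuntimeException storeFailure) {
            try {
                Files.write(failedDir.resolve(batchId + ".failed"), rows, StandardCharsets.UTF_8);
            } catch (IOException io) {
                throw new UncheckedIOException(io);
            }
            return false;
        }
    }

    /** Re-stores every parked batch; a file is deleted only after a successful store. */
    int retryFailed() {
        List<Path> parked = new ArrayList<>();
        try (Stream<Path> files = Files.list(failedDir)) {
            files.filter(p -> p.getFileName().toString().endsWith(".failed")).forEach(parked::add);
        } catch (IOException io) {
            throw new UncheckedIOException(io);
        }
        int restored = 0;
        for (Path file : parked) {
            try {
                store.accept(Files.readAllLines(file, StandardCharsets.UTF_8));
                Files.delete(file);
                restored++;
            } catch (IOException io) {
                throw new UncheckedIOException(io);
            } catch (RuntimeException stillFailing) {
                // Storage is still failing: keep the file for the next monitoring pass.
            }
        }
        return restored;
    }
}
```

A monitoring thread would call `retryFailed()` periodically. Deleting the file only after a successful store means a crash between the two steps can store a batch twice but cannot lose it.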
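2. ETL collector architecture design
- Collection and cleaning architecture diagram
- Flowchart
- The table structure is designed as follows
3. ETL collector run flow
- Policy configuration
- Collection configuration
- Cleaning flow configuration (Config)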
    <columns key="1" method="splitThis" parameters="{{this}},{\$}"/>
    <columns key="2" method="info" parameters="{{1}[7]}{+}-{+}{{1}[9]}"/>
    <columns key="3" method="info" parameters="{{1}[18]}{+}${+}{{1}[19]}"/>
    <columns key="4" method="regexpArray" parameters="{{1}[0]},{(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})}"/>
    <columns key="SIP" method="iPIntID" parameters="{{4}}"/>
    <columns key="DIP" method="iPIntArray" parameters="{{1}[1]}"/>
    <columns key="DEVIP" method="iPIntArray" parameters="{{1}[1]}"/>
    <columns key="OPEROBJECT" method="subStringArray" parameters="{{1}[2]},{16}"/>
    <columns key="DEVTIME" method="dateUnixArrayTime" parameters="{{1}[3]},{yyyyMMddHHmmss}"/>
    <columns key="SECONDARYACCOUNT" method="textArray" parameters="{{1}[4]}"/>
    <columns key="USERORGANIZATION" method="textArray" parameters="{{1}[5]}"/>
    <columns key="OPERRESULT" method="textArray" parameters="{{1}[6]}"/>
    <columns key="RAWEVENTID" method="textArray" parameters="{{1}[7]}"/>
    <columns key="OPERCONTENT" method="subStringArray" parameters="{{1}[11]},{15}"/>
    <columns key="EVENTCOUNT" method="regexpArray" parameters="{{1}[11]},{.*金額:([0-9]+\\.[0-9]+).*}}"/>
    <columns key="FORMID" method="regexpArray" parameters="{{1}[11]},{.*備註:(.*)}"/>
    <columns key="FOTMID" method="textArray" parameters="{{1}[13]}"/>
    <columns key="DEVNAME" method="info" parameters="CRM1_1501"/>
    <columns key="FK_DEVTYPE" method="info" parameters="1501"/>
    <columns key="VERSION" method="info" parameters="3"/>
    <mapping method="mappingValueID" parameters="{{2}},{FK_EVENTTYPE},{EVENTNAME}"/>
    <mapping method="textID" parameters="{{2}}"/>
    <mapping method="mappingValueArray" parameters="{{1}[17]},{TREASURYSCENENUM},{TREASURYSCENE}"/>
    <mapping method="mappingArray" parameters="{{1}[18]},{TREASURYAUTHRESULT}"/>
    <mapping method="mappingID" parameters="{{3}},{TREASURYMODE}"/>
    <mapping method="completionData" parameters="{{opid}}"/>
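In the configuration above, key `4` pulls an IPv4 address out of the first split field with a regular expression, and `SIP` then passes that value to `iPIntID`. The sketch below shows what those two steps might do. It assumes that `iPIntID` packs a dotted-quad address into a 32-bit integer, which the method name suggests but the source does not confirm. The class `IpToIntSketch` and its method names are invented for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of "extract an IP with a regex, then convert it to an integer".
class IpToIntSketch {
    // Same pattern as the regexpArray step in the configuration.
    private static final Pattern IPV4 =
            Pattern.compile("(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})");

    /** Returns the first IPv4-looking substring of the field, or null if there is none. */
    static String firstIp(String field) {
        Matcher m = IPV4.matcher(field);
        return m.find() ? m.group(1) : null;
    }

    /** Packs a dotted-quad IPv4 address into an unsigned 32-bit value held in a long. */
    static long ipToLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + ip);
            }
            value = (value << 8) | octet; // shift previous octets left, append this one
        }
        return value;
    }

    public static void main(String[] args) {
        String field = "src=192.168.1.10;port=22";
        System.out.println(ipToLong(firstIp(field))); // prints 3232235786
    }
}
```

A `long` is used because Java's `int` is signed, so addresses from 128.0.0.0 upward would otherwise come out negative.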
- Add the Job task
- Execute the Job
    public void execute(JobExecutionContext context) {
        JobDataMap map = context.getJobDetail().getJobDataMap();
        Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
        IData<String, Object> jobParameters = new DataMap<String, Object>();
        // Copy the job parameters into the data map
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            jobParameters.put(entry.getKey(), entry.getValue());
        }
        BeanFactory beanFactory = BeanFactory.getBean();
        // Hand the parameters to the collector server, which runs the collection job
        beanFactory.getCollectorServer().execcCollectorJob(jobParameters, beanFactory);
    }
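Create the collection objects and register them in the Spring factory
- Create the Spring configuration XML so that the objects are loaded into memory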
    <!-- FTP configuration: generate files -->
    <bean id="collector" class="com.venustech.collector.main.Collector">
        <property name="collectorMap">
            <map>
                <!-- type 1: ftp -->
                <entry key="1" value="ftpCreateXmlConfigImpl" />
                <!-- type 2: db -->
                <entry key="2" value="dbCreateXmlConfigImpl" />
                <!-- type 3: syslog -->
                <entry key="3" value="collectDataSyslogServer" />
                <!-- type 4: local file -->
                <entry key="4" value="collectLocalImpl" />
            </map>
        </property>
    </bean>

    <?xml version="1.0" encoding="GBK"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:spring-beans.xsd">
        <bean id="SA_AUDITALERT_VIEW2" class="com.venustech.collector.service.impl.CollectDataFtpImpl" >
            <property name="mappingId" value="CRM"/>
            <property name="username" value="joy"/>
            <property name="password" value="go2hell"/>
            <property name="resourcename" value="/data/data/4A"/>
            <property name="datasource" value="HE_AuditLogging.log.[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}"/>
            <property name="localpath" value="E:\java\venus\AuditData2.0\config/collector/rawlog/FTP/SA_AUDITALERT_VIEW2/"/>
            <property name="hostname" value="10.70.41.126"/>
            <property name="port" value="21"/>
            <property name="tablename" value="SA_AUDITALERT_VIEW2"/>
            <property name="parameter">
                <map>
                    <entry key="startFile" value="${startFile}"/>
                    <entry key="fileEncode" value="${fileEncode}"/>
                    <entry key="startCount" value="${startCount}"/>
                    <entry key="saveId" value="${saveId}"/>
                    <entry key="isAuto" value="${isAuto}"/>
                </map>
            </property>
        </bean>
        <bean id="property" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <list>
                    <value>file:E:\java\venus\AuditData2.0\config/collector/py/properties/SA_AUDITALERT_VIEW2.properties</value>
                </list>
            </property>
        </bean>
    </beans>
Collect the data and generate files
    public String[] getAutoCollectFile(String id) throws Exception {
        // PARAMETER_FTP_DIR == 0 selects ContinueFTPImp; any other value selects ContinueFTPDirImp.
        // Both timeouts default to 60 when they are not configured.
        if (data.getInt(ConfigParameter.PARAMETER_FTP_DIR) == 0) {
            ftp = new ContinueFTPImp(hostname, port, username, password,
                    resourcename, localpath, datasource, id,
                    data.getInt(ConfigParameter.PARAMETER_FTP_CONN_TIMEOUT, 60),
                    data.getInt(ConfigParameter.PARAMETER_FTP_DATA_TIMEOUT, 60));
        } else {
            ftp = new ContinueFTPDirImp(hostname, port, username, password,
                    resourcename, localpath, datasource, id,
                    data.getInt(ConfigParameter.PARAMETER_FTP_CONN_TIMEOUT, 60),
                    data.getInt(ConfigParameter.PARAMETER_FTP_DATA_TIMEOUT, 60));
        }
        return ftp.ftpAutoDownload(data.getString(ConfigParameter.PARAMETER_START_FILE));
    }
Split the tasks
    public static List<FileAttribute> getFileAttribute(String[] fileName,
            int startCount, IData<String, Integer> fileData, int countData,
            String localpath, String mappingId, String tableName, String id,
            CollectorJobRunState collectorJobRunState, String dbId,
            int inserCount) {
        // Switch that controls the inner file loop
        boolean isFlag = true;
        // Container for the split tasks
        List<FileAttribute> listFile = new DatasetList<FileAttribute>();
        // Running total of log records
        int count = 0;
        BeanFactory beanFactory = BeanFactory.getBean();
        // Index of the current file
        int j = 0;
        // Files that belong to the current task
        List<String> fileList = new ArrayList<String>();
        // File model
        FileAttribute file;
        // Number of tasks to produce
        int countFor = getSize(countData, inserCount);
        // Sequence number of the task within this collection run
        int index = 1;
        for (int i = 0; i < countFor; i++) {
            while (isFlag) {
                count = count + fileData.getInt(fileName[j]);
                fileList.add(fileName[j]);
                if (count - startCount > inserCount - 1) {
                    // Enough records for a full task
                    file = new FileAttribute();
                    file.setStartCount(startCount);
                    startCount = fileData.getInt(fileName[j], 0)
                            - (count - inserCount - startCount);
                    file.setEndCount(startCount);
                    file.setFileName(fileList);
                    file.setMappingId(mappingId);
                    file.setDbId(dbId);
                    file.setId(id);
                    file.setTableName(tableName);
                    file.setForCount(index);
                    file.setJobExecCount(collectorJobRunState.getCount());
                    file.setLocalpath(localpath);
                    // Add the task for the collected logs
                    listFile.add(file);
                    fileList = new ArrayList<String>();
                    isFlag = false;
                    beanFactory.getLog().debug(id,
                            Thread.currentThread().getName() + ":" + file.toString(),
                            LogEvent.COLLECT_ADD_TASK_COUNT, inserCount,
                            collectorJobRunState.getCount(), index);
                    count = 0;
                    index++;
                } else if (countFor == i + 1 && fileName.length - 1 == j) {
                    // Last task and last file: take whatever records remain
                    file = new FileAttribute();
                    file.setStartCount(startCount);
                    int indexCount = count - startCount;
                    startCount = fileData.getInt(fileName[j], 0)
                            - (count - inserCount - startCount);
                    file.setEndCount(fileData.getInt(fileName[j], 0));
                    file.setFileName(fileList);
                    file.setMappingId(mappingId);
                    file.setDbId(dbId);
                    file.setId(id);
                    file.setTableName(tableName);
                    file.setForCount(index);
                    file.setJobExecCount(collectorJobRunState.getCount());
                    file.setLocalpath(localpath);
                    // Add the task for the collected logs
                    listFile.add(file);
                    isFlag = false;
                    beanFactory.getLog().debug(id,
                            Thread.currentThread().getName() + ":" + file.toString(),
                            LogEvent.COLLECT_ADD_TASK_COUNT, indexCount,
                            collectorJobRunState.getCount(), index);
                    count = 0;
                    index++;
                } else {
                    if (j < fileName.length - 1) {
                        j++;
                    }
                }
            }
            isFlag = true;
        }
        return listFile;
    }
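The idea behind the splitting step is to cut an ordered set of collected files into tasks that each cover a fixed number of log records, even when a task boundary falls in the middle of a file. The sketch below shows that idea in a simplified form. It is not a rewrite of `getFileAttribute`: the class `TaskSplitSketch`, the `Slice` type, and the half-open line ranges are assumptions chosen to keep the example small.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified illustration of splitting files into fixed-size tasks.
class TaskSplitSketch {
    /** A contiguous range of lines [from, to) inside one file. */
    static final class Slice {
        final String file;
        final int from;
        final int to;

        Slice(String file, int from, int to) {
            this.file = file;
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return file + "[" + from + "," + to + ")";
        }
    }

    /**
     * Splits the files (in iteration order) into tasks of at most batchSize lines.
     * Each task is a list of slices; only the last task may be smaller than batchSize.
     */
    static List<List<Slice>> split(Map<String, Integer> lineCounts, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<Slice>> tasks = new ArrayList<>();
        List<Slice> current = new ArrayList<>();
        int room = batchSize; // lines that still fit into the current task
        for (Map.Entry<String, Integer> e : lineCounts.entrySet()) {
            int pos = 0;
            while (pos < e.getValue()) {
                int take = Math.min(room, e.getValue() - pos);
                current.add(new Slice(e.getKey(), pos, pos + take));
                pos += take;
                room -= take;
                if (room == 0) { // task is full: emit it and start a new one
                    tasks.add(current);
                    current = new ArrayList<>();
                    room = batchSize;
                }
            }
        }
        if (!current.isEmpty()) {
            tasks.add(current); // trailing partial task
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("a.log", 5);
        counts.put("b.log", 7);
        // prints [[a.log[0,4)], [a.log[4,5), b.log[0,3)], [b.log[3,7)]]
        System.out.println(split(counts, 4));
    }
}
```

Each resulting task can then be handed to a worker thread, which reads only its own line ranges.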
Worker threads read the logs and pass them into the cleaning flow
    /**
     * Gets the normalized data.
     *
     * @param id        collection ID, used to look up the cleaning configuration
     * @param mappingId mapping ID
     * @param list      raw log records
     * @return the cleaned records
     */
    public static IDataset<IData<String, Object>> getForMatData(String id,
            String mappingId, IDataset<String> list) {
        Map<String, CleanDataMethod> method = BeanFactory.getBean()
                .getMainformat().getMethod();
        IDataset<IData<String, Object>> insertList = new DatasetList<IData<String, Object>>();
        Element beans = null;
        try {
            beans = com.venustech.collector.BeanFactory.getBean().getElement(id);
        } catch (Exception e) {
            new CollectorExceptionLog(Thread.currentThread().getName() + ": "
                    + "get xml content error! ", LogEvent.LOG_TYPE_FORMAT, e)
                    .error(id);
        }
        for (String valueThis : list) {
            try {
                IData<String, Object> data = new DataMap<String, Object>();
                // Normalize: run every configured step against the raw record
                for (Iterator<?> it = beans.elementIterator(); it.hasNext();) {
                    Element bean = (Element) it.next();
                    executeCleanData(mappingId, valueThis, bean, method, data);
                }
                data = ApplicationContextSynchronous.defaultData
                        .completionDefault(data);
                data.put(IDataHandle.RAWLOG_ID, valueThis);
                insertList.add(data);
                data = null;
            } catch (Exception e) {
                // Built-in filter: a record that throws during cleaning is dropped
            }
        }
        return insertList;
    }

    /**
     * Executes one normalization step.
     *
     * @param id        mapping ID
     * @param valueThis raw log record
     * @param bean      configuration element that describes the step
     * @param methodMap cleaning methods, keyed by method name
     * @param data      output record
     * @return the output record
     * @throws CollectorExceptionLog if the cleaning method fails
     */
    public static IData<String, Object> executeCleanData(String id,
            String valueThis, Element bean,
            Map<String, CleanDataMethod> methodMap, IData<String, Object> data)
            throws CollectorExceptionLog {
        // Method name
        String method = bean.attributeValue(INI_FORMAT_ATTRIBUTE_METHOD);
        // Parameters
        String parameters = bean.attributeValue(INI_FORMAT_ATTRIBUTE_PARAMETERS);
        // Key
        String key = bean.attributeValue(INI_FORMAT_ATTRIBUTE_KYE);
        CleanDataMethod cleanDataMethod = methodMap.get(method);
        if (cleanDataMethod == null) {
            new CollectorExceptionLog("[fromat->" + id + "]" + "[element->"
                    + bean.asXML() + "] error:java.lang.NullPointerException",
                    new Exception("can not find method !")).formatError(id, valueThis);
            return data;
        }
        return cleanDataMethod.executeCleanData(valueThis, parameters,
                bean.asXML(), data, key, id);
    }
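The code above looks each configured method name up in a map and delegates to the matching `CleanDataMethod`; an unknown name is logged and skipped. That dispatch pattern can be sketched on its own, as below. The types `Step` and `Rule`, the two sample methods `constant` and `field`, and the comma-separated input are all stand-ins invented for the example, not the collector's own classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of name-based dispatch to cleaning methods.
class CleanDispatchSketch {
    /** Stand-in for CleanDataMethod, reduced to the arguments this sketch needs. */
    interface Step {
        void apply(String raw, String parameters, String key, Map<String, Object> out);
    }

    /** One configured step: which method to run, with which parameters, writing which key. */
    static final class Rule {
        final String key;
        final String method;
        final String parameters;

        Rule(String key, String method, String parameters) {
            this.key = key;
            this.method = method;
            this.parameters = parameters;
        }
    }

    /** Builds a small registry; in the collector this map is wired by Spring. */
    static Map<String, Step> registry() {
        Map<String, Step> methods = new HashMap<>();
        // "constant": writes the parameter text unchanged
        methods.put("constant", (raw, p, key, out) -> out.put(key, p));
        // "field": writes the comma-separated field whose index is given by the parameter
        methods.put("field", (raw, p, key, out) -> out.put(key, raw.split(",")[Integer.parseInt(p)]));
        return methods;
    }

    /** Runs every rule against one raw record; unknown methods are reported and skipped. */
    static Map<String, Object> clean(String raw, List<Rule> rules,
                                     Map<String, Step> methods, List<String> errors) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Rule rule : rules) {
            Step step = methods.get(rule.method);
            if (step == null) {
                errors.add("can not find method: " + rule.method);
                continue;
            }
            step.apply(raw, rule.parameters, rule.key, out);
        }
        return out;
    }
}
```

Because the methods live in a map, adding a new cleaning operation only needs a new implementation and one registry entry. The loop that runs the rules stays the same.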
Store the cleaned data
    package com.venustech.collector.service.impl.thread.table;

    import com.venustech.collector.model.FileAttribute;
    import com.venustech.collector.service.SaveCollector;
    import com.venustech.dao.JdbcDaoManager;
    import com.venustech.data.IData;
    import com.venustech.data.IDataset;
    import com.venustech.model.JdbcDataSource;

    public class SaveCollectorDb implements SaveCollector {
        JdbcDataSource jdbcDataSource;

        @Override
        public String save(String id, String[] tableName,
                IDataset<IData<String, Object>> list, FileAttribute fileAttribute)
                throws Exception {
            JdbcDaoManager dao = new JdbcDaoManager(jdbcDataSource);
            try {
                // Insert the same batch into every configured table
                for (String table : tableName) {
                    dao.insert(table, list);
                }
                // Drop the references so the objects can be garbage collected
                list = null;
                fileAttribute = null;
            } catch (Exception e) {
                throw e;
            } finally {
                dao.cleanupConnections();
            }
            return jdbcDataSource.toString();
        }

        public JdbcDataSource getJdbcDataSource() {
            return jdbcDataSource;
        }

        public void setJdbcDataSource(JdbcDataSource jdbcDataSource) {
            this.jdbcDataSource = jdbcDataSource;
        }
    }
Developer mode for the collection, cleaning, and storage layers
Collection layer developer API
Implement the ICollectData interface and store the collected data as local files
    public interface ICollectData {
        // Execute the collection process
        public IData<String, String> execute(String id, CollectorJobRunState collectorJobRunState) throws Exception;

        // Collection synchronization
        public IData<String, String> synchronous(String id, CollectorJobRunState collectorJobRunState) throws Exception;

        // Automatic collection
        public IData<String, String> collectAuto(String id, CollectorJobRunState collectorJobRunState) throws Exception;

        // Get the collection parameters
        public Map<String, String> getParameter();

        // Initialize the collection parameters
        public void initCollectParameter(String id);

        // Set the collection parameters
        public void setCollectParameter(String id, IData<String, String> data);
    }
Register the implementation class in the Spring container
</bean>
Cleaning layer developer API
Implement the CleanDataMethod cleaning interface
    public interface CleanDataMethod {
        /**
         * Cleans one raw log record.
         *
         * @param valueThis  raw log record
         * @param parameters parameters of the cleaning step
         * @param elementXml XML of the configuration element
         * @param data       record that holds the output
         * @param key        output field
         * @param id         ID
         * @return the output record
         * @throws CollectorExceptionLog if cleaning fails
         */
        public IData<String, Object> executeCleanData(String valueThis, String parameters,
                String elementXml, IData<String, Object> data, String key, String id)
                throws CollectorExceptionLog;
    }
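As a rough illustration of what a custom cleaning method could look like, the sketch below implements a regular-expression capture step. It cannot compile against the real interface without the project's `IData` and `CollectorExceptionLog` types, so it uses a simplified stand-in, `SimpleCleanMethod`, with a plain `Map` in place of `IData`. Those names, and the class `RegexCapture`, are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of a custom cleaning method.
class RegexCleanMethodSketch {
    /** Simplified stand-in for CleanDataMethod: Map replaces IData, no checked exception. */
    interface SimpleCleanMethod {
        Map<String, Object> executeCleanData(String valueThis, String parameters,
                                             Map<String, Object> data, String key);
    }

    /** Stores the first capture group of the regex given in 'parameters' under 'key'. */
    static final class RegexCapture implements SimpleCleanMethod {
        @Override
        public Map<String, Object> executeCleanData(String valueThis, String parameters,
                                                    Map<String, Object> data, String key) {
            Matcher m = Pattern.compile(parameters).matcher(valueThis);
            if (m.groupCount() >= 1 && m.find()) {
                data.put(key, m.group(1));
            }
            // When nothing matches, the record is returned unchanged.
            return data;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        new RegexCapture().executeCleanData(
                "amount:12.50 note:refund", "amount:([0-9]+\\.[0-9]+)", record, "EVENTCOUNT");
        System.out.println(record); // prints {EVENTCOUNT=12.50}
    }
}
```

A real implementation would take the same approach with the full `executeCleanData` signature and would then be registered under a method name in the Spring configuration.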
Register the implementation classes in the Spring container
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:spring-beans.xsd">
        <bean id="mappingArray" class="com.venustech.collector.service.fromat.method.mapping.MappingArray" />
        <bean id="mappingID" class="com.venustech.collector.service.fromat.method.mapping.MappingID" />
        <bean id="mappingValueArray" class="com.venustech.collector.service.fromat.method.mapping.MappingValueArray" />
        <bean id="mappingValueID" class="com.venustech.collector.service.fromat.method.mapping.MappingValueID" />
        <bean id="dateUnix" class="com.venustech.collector.service.fromat.method.dateunix.DateUnix" />
        <bean id="dateUnixArray" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArray" />
        <bean id="dateUnixArrayTime" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArrayTime" />
        <bean id="dateUnixID" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixID" />
        <bean id="dateUnixIDTime" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixIDTime" />
        <bean id="iPIntArray" class="com.venustech.collector.service.fromat.method.iptoint.IPIntArray" />
        <bean id="iPIntID" class="com.venustech.collector.service.fromat.method.iptoint.IPIntID" />
        <bean id="regexpArray" class="com.venustech.collector.service.fromat.method.regexp.RegexpArray" />
        <bean id="regexpID" class="com.venustech.collector.service.fromat.method.regexp.RegexpID" />
        <bean id="regexpThis" class="com.venustech.collector.service.fromat.method.regexp.RegexpThis" />
        <bean id="splitArray" class="com.venustech.collector.service.fromat.method.split.SplitArray" />
        <bean id="splitID" class="com.venustech.collector.service.fromat.method.split.SplitID" />
        <bean id="splitThis" class="com.venustech.collector.service.fromat.method.split.SplitThis" />
        <bean id="textArray" class="com.venustech.collector.service.fromat.method.text.TextArray" />
        <bean id="textID" class="com.venustech.collector.service.fromat.method.text.TextID" />
        <bean id="info" class="com.venustech.collector.service.fromat.method.info.Info" />
        <bean id="subStringArray" class="com.venustech.collector.service.fromat.method.substring.SubStringArray" />
        <bean id="filterRegexpArray" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpArray" />
        <bean id="filterRegexpID" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpID" />
        <bean id="filterRegexpThis" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpThis" />
        <bean id="splitKeyValueID" class="com.venustech.collector.service.fromat.method.split.SplitKeyValueID" />
        <bean id="subStringID" class="com.venustech.collector.service.fromat.method.substring.SubStringID" />
        <bean id="completionData" class="com.venustech.collector.service.fromat.method.completion.CompletionData" />
        <bean id="completionDataDefault" class="com.venustech.collector.service.fromat.method.completion.CompletionDataDefault" />
        <bean id="subIndexStringArray" class="com.venustech.collector.service.fromat.method.substring.SubIndexStringArray" />
        <bean id="subIndexStringId" class="com.venustech.collector.service.fromat.method.substring.SubIndexStringId" />
        <bean id="urlToID" class="com.venustech.collector.service.fromat.method.urltoip.UrlToID" />
        <bean id="urlToIpArray" class="com.venustech.collector.service.fromat.method.urltoip.UrlToIpArray" />
        <bean id="mappingKey" class="com.venustech.collector.service.fromat.method.mapping.MappingKey" />
        <bean id="mappingValueKey" class="com.venustech.collector.service.fromat.method.mapping.MappingValueKey" />
        <bean id="toDateArray" class="com.venustech.collector.service.fromat.method.todate.ToDateArray" />
        <bean id="toDateIDTime" class="com.venustech.collector.service.fromat.method.todate.ToDateIDTime" />
        <bean id="mainformat" class="com.venustech.collector.service.fromat.MainForMat">
            <property name="method">
                <map>
                    <entry key="mappingValueKey" value-ref="mappingValueKey" />
                    <entry key="mappingKey" value-ref="mappingKey" />
                    <entry key="mappingArray" value-ref="mappingArray" />
                    <entry key="mappingID" value-ref="mappingID" />
                    <entry key="mappingValueArray" value-ref="mappingValueArray" />
                    <entry key="mappingValueID" value-ref="mappingValueID" />
                    <entry key="dateUnix" value-ref="dateUnix" />
                    <entry key="dateUnixArray" value-ref="dateUnixArray" />
                    <entry key="dateUnixArrayTime" value-ref="dateUnixArrayTime" />
                    <entry key="dateUnixID" value-ref="dateUnixID" />
                    <entry key="dateUnixIDTime" value-ref="dateUnixIDTime" />
                    <entry key="iPIntArray" value-ref="iPIntArray" />
                    <entry key="iPIntID" value-ref="iPIntID" />
                    <entry key="regexpArray" value-ref="regexpArray" />
                    <entry key="regexpID" value-ref="regexpID" />
                    <entry key="regexpThis" value-ref="regexpThis" />
                    <entry key="splitArray" value-ref="splitArray" />
                    <entry key="splitID" value-ref="splitID" />
                    <entry key="splitThis" value-ref="splitThis" />
                    <entry key="textArray" value-ref="textArray" />
                    <entry key="textID" value-ref="textID" />
                    <entry key="subStringID" value-ref="subStringID" />
                    <entry key="subStringArray" value-ref="subStringArray" />
                    <entry key="completionData" value-ref="completionData" />
                    <entry key="subIndexStringArray" value-ref="subIndexStringArray" />
                    <entry key="subIndexStringId" value-ref="subIndexStringId" />
                    <entry key="completionDataDefault" value-ref="completionDataDefault" />
                    <entry key="urlToID" value-ref="urlToID" />
                    <entry key="urlToIpArray" value-ref="urlToIpArray" />
                    <entry key="filterRegexpArray" value-ref="filterRegexpArray" />
                    <entry key="filterRegexpID" value-ref="filterRegexpID" />
                    <entry key="filterRegexpThis" value-ref="filterRegexpThis" />
                    <entry key="splitKeyValueID" value-ref="splitKeyValueID" />
                    <!-- Mapping -->
                    <entry key="mappingArray" value-ref="mappingArray" />
                    <entry key="mappingID" value-ref="mappingID" />
                    <entry key="mappingValueArray" value-ref="mappingValueArray" />
                    <entry key="mappingValueID" value-ref="mappingValueID" />
                    <entry key="mappingKey" value-ref="mappingKey" />
                    <entry key="mappingValueKey" value-ref="mappingValueKey" />
                    <entry key="info" value-ref="info" />
                    <!-- Completion -->
                    <entry key="completionData" value-ref="completionData" />
                    <entry key="completionDataDefault" value-ref="completionDataDefault" />
                    <!-- Date conversion -->
                    <entry key="toDateIDTime" value-ref="toDateIDTime" />
                    <entry key="toDateArray" value-ref="toDateArray" />
                </map>
            </property>
        </bean>
    </beans>
Storage layer developer API
Implement the SaveCollector interface
    public interface SaveCollector {
        public String save(String id, String[] tableName,
                IDataset<IData<String, Object>> list, FileAttribute fileAttribute) throws Exception;
    }
Register the implementation class in the Spring container