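ETL Collector (Standalone Edition)

The ETL collector uses a Job manager to schedule tasks, Spring to manage the collection and cleansing objects, and a JDBC manager to manage JDBC connections.

Data processing flow: the Job manager schedules a task -> the collection layer collects data and writes files -> the cleansing layer reads the files -> the storage layer stores the normalized logs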
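The flow above can be sketched in plain Java to show how the three layers are decoupled by files. This is a minimal illustration, not the collector's code: the class and method names (`FilePipeline`, `collect`, `cleanse`, `store`) are hypothetical, the cleansing rule is passed in as a function, and the storage layer is an in-memory list standing in for a database table.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the three-layer flow: collect -> file -> cleanse -> store.
class FilePipeline {
    // Collection layer: write the raw records to a new file in the staging directory.
    static Path collect(List<String> rawRecords, Path stagingDir) throws IOException {
        Path file = Files.createTempFile(stagingDir, "raw-", ".log");
        Files.write(file, rawRecords, StandardCharsets.UTF_8);
        return file;
    }

    // Cleansing layer: read the file back and apply the cleansing rule to each line.
    static List<String> cleanse(Path file, Function<String, String> rule) throws IOException {
        return Files.readAllLines(file, StandardCharsets.UTF_8).stream()
                .map(rule)
                .collect(Collectors.toList());
    }

    // Storage layer: an in-memory list standing in for a database table.
    static void store(List<String> rows, List<String> table) {
        table.addAll(rows);
    }
}
```

Because each layer only communicates through the staged file, a failure in cleansing or storage leaves the collected file on disk to be processed again.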

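1. Main Features of the ETL Collector

  • ETL collector: consists of three layers: the collection layer, the cleansing layer, and the storage layer

  • Collection layer: collects data and writes it to files

    1. Supports concurrent DB collection, concurrent FTP collection, syslog reception, and local file collection

    2. Supports re-collecting FTP and DB data after a failure

    3. Supports configuring job thresholds, the DB connection pool, FTP connections, batch file generation for syslog, etc.

    4. Provides a developer mode for the collection layer with a standard API

    5. Collection tasks are managed in database tables

  • Cleansing layer: reads the files, splits them into tasks, and runs the cleansing tasks concurrently

    1. Supports appending, aggregating, and completing data, as well as filtering, mapping, conversion, splitting, and parsing

    2. Supports configuring cleansing task thresholds

    3. Provides a developer mode for the cleansing layer with a standard API

    4. Cleansing pipelines are managed in database tables

  • Storage layer: receives the cleansed data and stores it to custom targets such as databases, tables, Hive, etc.

    1. Supports custom storage across multiple databases and custom table storage

    2. Provides a developer mode for the storage layer with a standard API

    3. If storing fails, the data is saved to a file; failed files are monitored and stored again.

  • Logging: logs are recorded per collection ID, including the number of records collected and stored, collection throughput, normalization throughput, and exception details.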
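The storage-failure handling listed above (save the data to a file, then monitor failed files and store them again) could look roughly like the following. This is a hypothetical sketch: `FallbackStore`, its `Sink` interface, and the one-row-per-line file format are assumptions for illustration; the collector's real file layout and monitoring schedule are not described in this article.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: on a storage failure, park the batch in a file and retry it later.
class FallbackStore {
    interface Sink { void save(List<String> rows) throws Exception; }

    private final Sink sink;
    private final Path failedDir;

    FallbackStore(Sink sink, Path failedDir) {
        this.sink = sink;
        this.failedDir = failedDir;
    }

    // Try to store the batch; if that fails, write it to a ".failed" file instead.
    void store(List<String> rows) throws IOException {
        try {
            sink.save(rows);
        } catch (Exception e) {
            Path f = Files.createTempFile(failedDir, "batch-", ".failed");
            Files.write(f, rows, StandardCharsets.UTF_8);
        }
    }

    // Called periodically by a monitor: re-store each failed file and delete it on success.
    int retryFailed() throws IOException {
        List<Path> pending = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(failedDir, "*.failed")) {
            for (Path f : files) {
                pending.add(f); // collect first, so we do not delete while iterating
            }
        }
        int recovered = 0;
        for (Path f : pending) {
            try {
                sink.save(Files.readAllLines(f, StandardCharsets.UTF_8));
                Files.delete(f);
                recovered++;
            } catch (Exception e) {
                // Leave the file in place; it will be retried on the next run.
            }
        }
        return recovered;
    }
}
```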

2. ETL Collector Architecture Design

  • Collection and cleansing architecture diagram

  • Flowchart

  • Table schema design

3. ETL Collector Runtime Flow

  1. Configuration

Configure collection

Configure the cleansing pipeline (Config):

    <columns key="1" method="splitThis" parameters="{{this}},{\$}"/>
    <columns key="2"  method="info" parameters="{{1}[7]}{+}-{+}{{1}[9]}"/>
    <columns key="3"  method="info" parameters="{{1}[18]}{+}${+}{{1}[19]}"/>
    <columns key="4" method="regexpArray" parameters="{{1}[0]},{(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})}"/>
    <columns key="SIP" method="iPIntID" parameters="{{4}}"/>
    <columns key="DIP" method="iPIntArray" parameters="{{1}[1]}"/>
    <columns key="DEVIP" method="iPIntArray" parameters="{{1}[1]}"/>
    <columns key="OPEROBJECT" method="subStringArray" parameters="{{1}[2]},{16}"/>
    <columns key="DEVTIME" method="dateUnixArrayTime" parameters="{{1}[3]},{yyyyMMddHHmmss}"/>
    <columns key="SECONDARYACCOUNT" method="textArray" parameters="{{1}[4]}"/>
    <columns key="USERORGANIZATION" method="textArray" parameters="{{1}[5]}"/>
    <columns key="OPERRESULT" method="textArray" parameters="{{1}[6]}"/>
    <columns key="RAWEVENTID" method="textArray" parameters="{{1}[7]}"/>
    <columns key="OPERCONTENT" method="subStringArray" parameters="{{1}[11]},{15}"/>
    <columns key="EVENTCOUNT" method="regexpArray" parameters="{{1}[11]},{.*金額:([0-9]+\\.[0-9]+).*}"/>
    <columns key="FORMID" method="regexpArray" parameters="{{1}[11]},{.*備註:(.*)}"/>
    <columns key="FOTMID" method="textArray" parameters="{{1}[13]}"/>
    <columns key="DEVNAME" method="info"  parameters="CRM1_1501"/>
    <columns key="FK_DEVTYPE" method="info"  parameters="1501"/> 
    <columns key="VERSION" method="info" parameters="3"/> 
    <mapping method="mappingValueID" parameters="{{2}},{FK_EVENTTYPE},{EVENTNAME}"/>
    <mapping method="textID" parameters="{{2}}"/>
    <mapping method="mappingValueArray" parameters="{{1}[17]},{TREASURYSCENENUM},{TREASURYSCENE}"/>
    <mapping method="mappingArray" parameters="{{1}[18]},{TREASURYAUTHRESULT}"/>
    <mapping method="mappingID" parameters="{{3}},{TREASURYMODE}"/>
    <mapping method="completionData" parameters="{{opid}}"/>
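Judging from the example, the `parameters` mini-language appears to use `{{this}}` for the raw log line, `{{N}}` for the value already stored under key `N`, `{{N}[i]}` for element `i` of an array stored under `N`, and `{+}` to concatenate pieces (as in the `info` columns). The sketch below resolves expressions under exactly those assumptions; `ParamResolver` is hypothetical, and the real grammar may differ (for example, escapes such as `{\$}` and comma-separated argument lists are not handled).

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for parameter expressions such as "{{1}[7]}{+}-{+}{{1}[9]}".
class ParamResolver {
    // Matches {{this}}, {{KEY}} or {{KEY}[INDEX]}
    private static final Pattern REF = Pattern.compile("\\{\\{(\\w+)\\}(?:\\[(\\d+)\\])?\\}");

    // Resolve a single reference, or return the literal text unchanged.
    static String resolveToken(String token, String rawLine, Map<String, Object> data) {
        Matcher m = REF.matcher(token);
        if (!m.matches()) {
            return token; // plain literal, e.g. "-" or "CRM1_1501"
        }
        String key = m.group(1);
        Object value = "this".equals(key) ? rawLine : data.get(key);
        if (m.group(2) != null && value instanceof String[]) {
            String[] arr = (String[]) value;
            int i = Integer.parseInt(m.group(2));
            return i < arr.length ? arr[i] : "";
        }
        return value == null ? "" : value.toString();
    }

    // "info"-style concatenation: pieces separated by "{+}" are resolved and joined.
    static String concat(String expression, String rawLine, Map<String, Object> data) {
        StringBuilder out = new StringBuilder();
        for (String piece : expression.split(Pattern.quote("{+}"), -1)) {
            out.append(resolveToken(piece, rawLine, data));
        }
        return out.toString();
    }
}
```

With key `1` holding the split fields `{"a", "b", "c"}`, the expression `{{1}[0]}{+}-{+}{{1}[2]}` resolves to `a-c`.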

  • Add the job

  • Execute the job

public void execute(JobExecutionContext context) {
	JobDataMap map = context.getJobDetail().getJobDataMap();
	// Copy the job's parameters into an IData map
	IData<String, Object> jobParameters = new DataMap<String, Object>();
	for (String key : map.getKeys()) {
		jobParameters.put(key, map.get(key));
	}
	// Hand the parameters to the collector service, which runs the collection job
	BeanFactory beanFactory = BeanFactory.getBean();
	beanFactory.getCollectorServer().execcCollectorJob(jobParameters,
			beanFactory);
}
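`execute` above appears to implement a Quartz `Job` (it uses `JobExecutionContext` and `JobDataMap`): each run copies the job's parameters into a fresh map and delegates to the collector service. As a dependency-free illustration of the same pattern (periodic trigger, per-run parameter copy, delegation), this sketch swaps Quartz for the JDK's `ScheduledExecutorService`; `JobRunner` and its `Consumer` callback are hypothetical stand-ins for the Job manager and `execcCollectorJob`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the Job manager, using the JDK scheduler instead of Quartz.
class JobRunner implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Run the collector job every periodMillis; each run gets its own copy of the
    // parameters, mirroring how execute() copies the JobDataMap into a fresh map.
    ScheduledFuture<?> schedule(Map<String, String> jobData, long periodMillis,
                                Consumer<Map<String, Object>> collectorJob) {
        return scheduler.scheduleAtFixedRate(
                () -> collectorJob.accept(new HashMap<String, Object>(jobData)),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Copying per run means a job that mutates its parameters cannot affect the next scheduled run.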


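Create the collection objects and register them with the Spring factory

  1. Create the Spring configuration XML so that the objects are loaded into memory
<!-- Configure the collectors that generate files (FTP, DB, syslog, local file) -->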
	<bean id="collector" class="com.venustech.collector.main.Collector">
		<property name="collectorMap">
			<map>
				<!--type 1 ftp -->
				<entry key="1" value="ftpCreateXmlConfigImpl" />
				<!--type 2 db -->
				<entry key="2" value="dbCreateXmlConfigImpl" />
				<!--type 3 syslog -->
				<entry key="3" value="collectDataSyslogServer" />
				<!--type 4 local file -->
				<entry key="4" value="collectLocalImpl" />
			</map>
		</property>
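	</bean>

Each collection task is then defined in its own Spring configuration file, for example the FTP task SA_AUDITALERT_VIEW2:

<?xml version="1.0" encoding="GBK"?> 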
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:spring-beans.xsd"> 
<bean id="SA_AUDITALERT_VIEW2" class="com.venustech.collector.service.impl.CollectDataFtpImpl" >
   <property name="mappingId" value="CRM"/>
   <property name="username" value="joy"/>
   <property name="password" value="go2hell"/>
   <property name="resourcename" value="/data/data/4A"/>
   <property name="datasource" value="HE_AuditLogging.log.[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}"/>
   <property name="localpath" value="E:\java\venus\AuditData2.0\config/collector/rawlog/FTP/SA_AUDITALERT_VIEW2/"/>
   <property name="hostname" value="10.70.41.126"/>
   <property name="port" value="21"/>
   <property name="tablename" value="SA_AUDITALERT_VIEW2"/>
   <property name="parameter">
      <map>
         <entry key="startFile" value="${startFile}"/>
         <entry key="fileEncode" value="${fileEncode}"/>
         <entry key="startCount" value="${startCount}"/>
         <entry key="saveId" value="${saveId}"/>
         <entry key="isAuto" value="${isAuto}"/>
      </map>
 </property>
</bean>
<bean id="property" 
	class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>
					file:E:\java\venus\AuditData2.0\config/collector/py/properties/SA_AUDITALERT_VIEW2.properties
				</value>
			</list>
		</property>
	</bean>

</beans>
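The `collectorMap` in the `collector` bean maps a collection type (1 FTP, 2 DB, 3 syslog, 4 local file) to the name of the bean that handles it. A minimal sketch of that dispatch, with a plain `Map` standing in for the Spring container; `CollectorRegistry` and the `Handler` interface are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of type-based dispatch, mirroring the collectorMap entries.
class CollectorRegistry {
    interface Handler { String collect(String taskId); }

    // type -> bean name, as in the collectorMap property
    private final Map<String, String> typeToBean = new HashMap<>();
    // bean name -> bean instance, standing in for the Spring container
    private final Map<String, Handler> beans = new HashMap<>();

    void register(String type, String beanName, Handler bean) {
        typeToBean.put(type, beanName);
        beans.put(beanName, bean);
    }

    // Look up the handler for the task's collection type and run it.
    String dispatch(String type, String taskId) {
        String beanName = typeToBean.get(type);
        Handler handler = beanName == null ? null : beans.get(beanName);
        if (handler == null) {
            throw new IllegalArgumentException("no collector registered for type " + type);
        }
        return handler.collect(taskId);
    }
}
```

Keeping the type-to-name map separate from the instances matches the XML, where the map stores bean names rather than bean references.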



Collecting data into files

public String[] getAutoCollectFile(String id) throws Exception {
	// Connection and data timeouts default to 60 when not configured
	int connTimeout = data.getInt(ConfigParameter.PARAMETER_FTP_CONN_TIMEOUT, 60);
	int dataTimeout = data.getInt(ConfigParameter.PARAMETER_FTP_DATA_TIMEOUT, 60);
	if (data.getInt(ConfigParameter.PARAMETER_FTP_DIR) == 0) {
		// PARAMETER_FTP_DIR == 0: plain resumable FTP download
		ftp = new ContinueFTPImp(hostname, port, username, password,
				resourcename, localpath, datasource, id, connTimeout, dataTimeout);
	} else {
		// Otherwise: the directory-based variant
		ftp = new ContinueFTPDirImp(hostname, port, username, password,
				resourcename, localpath, datasource, id, connTimeout, dataTimeout);
	}
	// Download automatically, starting from the configured start file
	return ftp.ftpAutoDownload(data.getString(ConfigParameter.PARAMETER_START_FILE));
}
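`ftpAutoDownload(startFile)` itself is not shown. Given the `datasource` pattern in the bean above (`HE_AuditLogging.log.` followed by a date-and-time stamp) and the `startFile` parameter, one plausible selection rule is: keep remote names that match the pattern and sort after the last file already collected. The sketch implements only that assumed rule over a list of names; it does no FTP I/O, and `RemoteFileSelector` is a hypothetical name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical selection rule for resumable ("continue") downloads.
class RemoteFileSelector {
    // Keep names that match the datasource regex and come after startFile, in name order.
    // Assumes the timestamp in the name makes lexicographic order equal chronological order.
    static List<String> select(List<String> remoteNames, String datasourceRegex, String startFile) {
        Pattern pattern = Pattern.compile(datasourceRegex);
        return remoteNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .filter(name -> startFile == null || startFile.isEmpty()
                        || name.compareTo(startFile) > 0)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

The zero-padded `yyyy-MM-dd-HH-mm` stamp in the configured pattern is what makes plain string comparison a valid ordering here.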

Splitting into tasks

public static List<FileAttribute> getFileAttribute(String[] fileName,
			int startCount, IData<String, Integer> fileData, int countData,
			String localpath, String mappingId, String tableName, String id,
			CollectorJobRunState collectorJobRunState, String dbId,
			int inserCount) {
		// Loop flag for the current batch
		boolean isFlag = true;
		// Split tasks produced by this method
		List<FileAttribute> listFile = new DatasetList<FileAttribute>();
		// Running log (line) count
		int count = 0;
		BeanFactory beanFactory = BeanFactory.getBean();
		// Index of the current file
		int j = 0;
		// Files belonging to the current batch
		List<String> fileList = new ArrayList<String>();
		// Task model
		FileAttribute file;
		// Number of batches to process
		int countFor = getSize(countData, inserCount);
		// Batch number within this collection run
		int index = 1;
		for (int i = 0; i < countFor; i++) {
			while (isFlag) {
				count = count + fileData.getInt(fileName[j]);
				fileList.add(fileName[j]);
				if (count - startCount > inserCount - 1) {
					file = new FileAttribute();
					file.setStartCount(startCount);
					startCount = fileData.getInt(fileName[j], 0)
							- (count - inserCount - startCount);
					file.setEndCount(startCount);
					file.setFileName(fileList);
					file.setMappingId(mappingId);
					file.setDbId(dbId);
					file.setId(id);
					file.setTableName(tableName);
					file.setForCount(index);
					file.setJobExecCount(collectorJobRunState.getCount());
					file.setLocalpath(localpath);
					// Add the task to the result list
					listFile.add(file);
					fileList = new ArrayList<String>();
					isFlag = false;
					beanFactory.getLog().debug(
							id,
							Thread.currentThread().getName() + ":"
									+ file.toString(),
							LogEvent.COLLECT_ADD_TASK_COUNT, inserCount,
							collectorJobRunState.getCount(), index);
					count = 0;
					index++;
				} else if (countFor == i + 1 && fileName.length - 1 == j) {
					file = new FileAttribute();
					file.setStartCount(startCount);
					int indexCount = count - startCount;
					startCount = fileData.getInt(fileName[j], 0)
							- (count - inserCount - startCount);
					file.setEndCount(fileData.getInt(fileName[j], 0));
					file.setFileName(fileList);
					file.setMappingId(mappingId);
					file.setDbId(dbId);
					file.setId(id);
					file.setTableName(tableName);
					file.setForCount(index);
					file.setJobExecCount(collectorJobRunState.getCount());
					file.setLocalpath(localpath);
					// Add the task to the result list
					listFile.add(file);
					isFlag = false;
					beanFactory.getLog().debug(
							id,
							Thread.currentThread().getName() + ":"
									+ file.toString(),
							LogEvent.COLLECT_ADD_TASK_COUNT, indexCount,
							collectorJobRunState.getCount(), index);
					count = 0;
					index++;
				} else {
					if (j < fileName.length - 1) {
						j++;
					}
				}
			}
			isFlag = true;
		}
		return listFile;
	}
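The loop above walks the files in order, adding up per-file line counts until a batch of `inserCount` lines is full, and records where each batch starts and ends. Below is a simplified restatement of that idea, not a drop-in replacement: `Batch` and `BatchSplitter` are hypothetical, offsets are 0-based line positions, the start offset is taken as the number of lines already consumed from the first file, and the last batch may be shorter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified splitter: cut an ordered set of files into batches of batchSize lines.
class BatchSplitter {
    // One batch: the files it touches, the start line in the first file
    // and the end line (exclusive) in the last file.
    static final class Batch {
        final List<String> files = new ArrayList<>();
        int startLine;
        int endLine;
    }

    static List<Batch> split(LinkedHashMap<String, Integer> lineCounts, int startLine, int batchSize) {
        List<Batch> batches = new ArrayList<>();
        Batch current = null;
        int filled = 0;
        int offset = startLine; // first unread line in the current file
        for (Map.Entry<String, Integer> e : lineCounts.entrySet()) {
            int lines = e.getValue();
            while (offset < lines) {
                if (current == null) {
                    current = new Batch();
                    current.startLine = offset;
                }
                current.files.add(e.getKey());
                int take = Math.min(batchSize - filled, lines - offset);
                offset += take;
                filled += take;
                current.endLine = offset;
                if (filled == batchSize) { // batch full: close it
                    batches.add(current);
                    current = null;
                    filled = 0;
                }
            }
            offset = 0; // the next file is read from its first line
        }
        if (current != null) {
            batches.add(current); // final, possibly short, batch
        }
        return batches;
    }
}
```

For files `a` (5 lines), `b` (3) and `c` (4), a start offset of 2 and a batch size of 4, the 10 remaining lines become batches of 4, 4 and 2 lines: `[a, b]` from line 2 to line 1, `[b, c]` from line 1 to line 2, and `[c]` from line 2 to line 4.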

Threads dispatch the log lines they read into the cleansing pipeline

	/**
	 * Get the normalized data.
	 * 
	 * @param id
	 *            collection task id, used to look up the cleansing configuration
	 * @param mappingId
	 *            mapping id passed on to each cleansing method
	 * @param list
	 *            raw log lines
	 * @return one normalized row per raw line that survived cleansing
	 */
	public static IDataset<IData<String, Object>> getForMatData(String id,
			String mappingId, IDataset<String> list) {
		Map<String, CleanDataMethod> method = BeanFactory.getBean()
				.getMainformat().getMethod();
		IDataset<IData<String, Object>> insertList = new DatasetList<IData<String, Object>>();
		Element beans;
		try {
			beans = com.venustech.collector.BeanFactory.getBean()
					.getElement(id);
		} catch (Exception e) {
			new CollectorExceptionLog(Thread.currentThread().getName() + ": "
					+ "get xml content error! ", LogEvent.LOG_TYPE_FORMAT, e)
					.error(id);
			// Without the configuration no line can be cleansed
			return insertList;
		}
		for (String valueThis : list) {
			try {
				IData<String, Object> data = new DataMap<String, Object>();
				// Normalize: apply every configured element in order
				for (Iterator<?> it = beans.elementIterator(); it.hasNext();) {
					Element bean = (Element) it.next();
					executeCleanData(mappingId, valueThis, bean, method, data);
				}
				data = ApplicationContextSynchronous.defaultData
						.completionDefault(data);
				data.put(IDataHandle.RAWLOG_ID, valueThis);
				insertList.add(data);
			} catch (Exception e) {
				// Built-in filtering: a line whose cleansing throws is dropped
			}
		}
		return insertList;
	}

	/**
	 * Execute one cleansing (normalization) step.
	 * 
	 * @param id
	 *            mapping id, used in error logs
	 * @param valueThis
	 *            raw log line
	 * @param bean
	 *            configuration element (columns or mapping) to apply
	 * @param methodMap
	 *            cleansing methods by name (the mainformat "method" map)
	 * @param data
	 *            row being built
	 * @return the updated row
	 * @throws CollectorExceptionLog
	 *             if the cleansing method fails
	 */
	public static IData<String, Object> executeCleanData(String id,
			String valueThis, Element bean,
			Map<String, CleanDataMethod> methodMap, IData<String, Object> data)
			throws CollectorExceptionLog {
		// Method name
		String method = bean.attributeValue(INI_FORMAT_ATTRIBUTE_METHOD);
		// Parameters
		String parameters = bean
				.attributeValue(INI_FORMAT_ATTRIBUTE_PARAMETERS);
		// Target key
		String key = bean.attributeValue(INI_FORMAT_ATTRIBUTE_KYE);
		CleanDataMethod cleanDataMethod = methodMap.get(method);
		if (cleanDataMethod == null) {
			// Unknown method: log the error and leave the row unchanged
			new CollectorExceptionLog("[format->" + id + "]" + "[element->"
					+ bean.asXML() + "] error:java.lang.NullPointerException",
					new Exception("can not find method !")).formatError(id,
					valueThis);
			return data;
		}
		return cleanDataMethod.executeCleanData(valueThis, parameters,
				bean.asXML(), data, key, id);
	}
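Together, the two methods above apply every configured element, in document order, to each raw line by looking up its `method` attribute in the method map; a line whose processing throws is dropped. Here is a stripped-down sketch of that control flow, with the dom4j `Element` replaced by a plain `Step` class and `CleanDataMethod` by a functional interface. All names are hypothetical, and the example methods in the usage are simplified guesses at what `splitThis` and `textArray` do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the cleansing loop: apply configured steps to each raw line.
class CleanPipeline {
    // Simplified stand-in for CleanDataMethod.
    interface Method {
        void apply(String rawLine, String key, String param, Map<String, Object> row);
    }

    // One configured element, like <columns key=".." method=".." parameters=".."/>
    static final class Step {
        final String key, method, param;

        Step(String key, String method, String param) {
            this.key = key;
            this.method = method;
            this.param = param;
        }
    }

    static List<Map<String, Object>> run(List<String> lines, List<Step> steps, Map<String, Method> methods) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (String line : lines) {
            try {
                Map<String, Object> row = new HashMap<>();
                for (Step s : steps) {
                    Method m = methods.get(s.method);
                    if (m == null) {
                        continue; // unknown method: skip the element (the original also logs it)
                    }
                    m.apply(line, s.key, s.param, row);
                }
                row.put("RAWLOG", line); // keep the raw line, as RAWLOG_ID does above
                out.add(row);
            } catch (RuntimeException e) {
                // A step that throws filters the line out.
            }
        }
        return out;
    }
}
```

In the usage below, a line without a second field makes the `text` step throw, so that line is filtered out while the well-formed line is kept.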



Storing the cleansed data

package com.venustech.collector.service.impl.thread.table;

import com.venustech.collector.model.FileAttribute;
import com.venustech.collector.service.SaveCollector;
import com.venustech.dao.JdbcDaoManager;
import com.venustech.data.IData;
import com.venustech.data.IDataset;
import com.venustech.model.JdbcDataSource;

public class SaveCollectorDb implements SaveCollector {
	private JdbcDataSource jdbcDataSource;

	@Override
	public String save(String id, String[] tableName,
			IDataset<IData<String, Object>> list, FileAttribute fileAttribute)
			throws Exception {
		JdbcDaoManager dao = new JdbcDaoManager(jdbcDataSource);
		try {
			// Write the same cleansed rows to every target table
			for (String table : tableName) {
				dao.insert(table, list);
			}
		} finally {
			// Always release the connections, even if an insert fails
			dao.cleanupConnections();
		}
		return jdbcDataSource.toString();
	}

	public JdbcDataSource getJdbcDataSource() {
		return jdbcDataSource;
	}

	public void setJdbcDataSource(JdbcDataSource jdbcDataSource) {
		this.jdbcDataSource = jdbcDataSource;
	}

}
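`dao.insert(table, list)` belongs to the project's own `JdbcDaoManager`, whose implementation is not shown. With plain JDBC, inserting a list of column-to-value maps is typically done with a single `PreparedStatement` and `addBatch`/`executeBatch`. Here is a sketch under the assumption that every row has the same columns as the first row; `JdbcBatchInsert` is a hypothetical name, and because table and column names are concatenated into the SQL, they must come from trusted configuration, not from log content.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical plain-JDBC batch insert of rows represented as column -> value maps.
class JdbcBatchInsert {
    // Build "INSERT INTO T (A, B) VALUES (?, ?)" for the given columns.
    static String buildInsertSql(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    static int[] insert(Connection conn, String table, List<Map<String, Object>> rows) throws SQLException {
        if (rows.isEmpty()) {
            return new int[0];
        }
        List<String> columns = new ArrayList<>(rows.get(0).keySet()); // assumes uniform rows
        try (PreparedStatement ps = conn.prepareStatement(buildInsertSql(table, columns))) {
            for (Map<String, Object> row : rows) {
                for (int i = 0; i < columns.size(); i++) {
                    ps.setObject(i + 1, row.get(columns.get(i))); // JDBC parameters are 1-based
                }
                ps.addBatch();
            }
            return ps.executeBatch();
        }
    }
}
```

Some drivers reject `setObject` with a `null` value and need `setNull` with an explicit SQL type instead.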

 

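Developer mode for the collection, cleansing, and storage layers

Collection layer developer API

Implement the ICollectData interface; the implementation writes the collected data to local files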

public interface ICollectData {
	// Run the collection process
	public IData<String, String> execute(String id, CollectorJobRunState collectorJobRunState) throws Exception;
	// Synchronize the collection
	public IData<String, String> synchronous(String id, CollectorJobRunState collectorJobRunState) throws Exception;
	// Automatic collection
	public IData<String, String> collectAuto(String id, CollectorJobRunState collectorJobRunState) throws Exception;
	// Get the collection parameters
	public Map<String, String> getParameter();
	// Initialize the collection parameters
	public void initCollectParameter(String id);
	// Set the parameters
	public void setCollectParameter(String id, IData<String, String> data);
}
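To illustrate the collection-layer contract (collect, then leave local files for the cleansing layer), here is a sketch of a "local file" collector. It deliberately does not implement `ICollectData`, whose `IData` and `CollectorJobRunState` types are not shown; `LocalFileCollector` is hypothetical. It copies the files in a source directory whose names match a regex into a staging directory and returns their line counts, the kind of per-file count the task splitter consumes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical local-file collector: stage matching files and report their line counts.
class LocalFileCollector {
    static TreeMap<String, Integer> collect(Path sourceDir, String nameRegex, Path stagingDir) throws IOException {
        Pattern pattern = Pattern.compile(nameRegex);
        TreeMap<String, Integer> lineCounts = new TreeMap<>(); // sorted by file name
        try (DirectoryStream<Path> files = Files.newDirectoryStream(sourceDir)) {
            for (Path f : files) {
                String name = f.getFileName().toString();
                if (!Files.isRegularFile(f) || !pattern.matcher(name).matches()) {
                    continue; // skip directories and non-matching names
                }
                Path target = stagingDir.resolve(name);
                Files.copy(f, target, StandardCopyOption.REPLACE_EXISTING);
                lineCounts.put(name, Files.readAllLines(target, StandardCharsets.UTF_8).size());
            }
        }
        return lineCounts;
    }
}
```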

Register the implementation class in the Spring container



 Cleansing layer developer API

  Implement the CleanDataMethod cleansing interface

public interface CleanDataMethod {

	/**
	 * Execute one cleansing step.
	 * 
	 * @param valueThis raw log line
	 * @param parameters value of the "parameters" attribute
	 * @param elementXml XML of the configuration element
	 * @param data row being built; results are stored here
	 * @param key target field (the "key" attribute)
	 * @param id mapping id
	 * @return the updated row
	 * @throws CollectorExceptionLog if cleansing fails
	 */
	public IData<String, Object> executeCleanData(String valueThis, String parameters, String elementXml, IData<String, Object> data, String key, String id) throws CollectorExceptionLog;
}
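The configuration earlier uses `iPIntID` and `iPIntArray` to store IP addresses as numbers. The exact output type of those classes is not shown; a common convention, assumed here, is to pack the four octets of an IPv4 address into an unsigned 32-bit value held in a `long`. `IpToLong` is a hypothetical helper that a custom `CleanDataMethod` implementation could call before putting the result into `data` under `key`.

```java
// Hypothetical helper: pack a dotted IPv4 address into an unsigned 32-bit value.
class IpToLong {
    static long convert(String ip) {
        String[] parts = ip.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + ip);
            }
            value = (value << 8) | octet; // shift previous octets left, append this one
        }
        return value;
    }
}
```

For example, `10.70.41.126` becomes 10·2^24 + 70·2^16 + 41·2^8 + 126 = 172370302. Using a `long` avoids negative values for addresses at or above `128.0.0.0`.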

Register the implementation class in the Spring container

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
        classpath:spring-beans.xsd">
        
	<bean id="mappingArray"
		class="com.venustech.collector.service.fromat.method.mapping.MappingArray" />
	<bean id="mappingID"
		class="com.venustech.collector.service.fromat.method.mapping.MappingID" />
	<bean id="mappingValueArray"
		class="com.venustech.collector.service.fromat.method.mapping.MappingValueArray" />
	<bean id="mappingValueID"
		class="com.venustech.collector.service.fromat.method.mapping.MappingValueID" />
	<bean id="dateUnix"
		class="com.venustech.collector.service.fromat.method.dateunix.DateUnix" />
	<bean id="dateUnixArray"
		class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArray" />
	<bean id="dateUnixArrayTime"
		class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArrayTime" />
	<bean id="dateUnixID"
		class="com.venustech.collector.service.fromat.method.dateunix.DateUnixID" />
	<bean id="dateUnixIDTime"
		class="com.venustech.collector.service.fromat.method.dateunix.DateUnixIDTime" />
	<bean id="iPIntArray"
		class="com.venustech.collector.service.fromat.method.iptoint.IPIntArray" />
	<bean id="iPIntID"
		class="com.venustech.collector.service.fromat.method.iptoint.IPIntID" />
	<bean id="regexpArray"
		class="com.venustech.collector.service.fromat.method.regexp.RegexpArray" />
	<bean id="regexpID"
		class="com.venustech.collector.service.fromat.method.regexp.RegexpID" />
	<bean id="regexpThis"
		class="com.venustech.collector.service.fromat.method.regexp.RegexpThis" />
	<bean id="splitArray"
		class="com.venustech.collector.service.fromat.method.split.SplitArray" />
	<bean id="splitID"
		class="com.venustech.collector.service.fromat.method.split.SplitID" />
	<bean id="splitThis"
		class="com.venustech.collector.service.fromat.method.split.SplitThis" />
	<bean id="textArray"
		class="com.venustech.collector.service.fromat.method.text.TextArray" />
	<bean id="textID"
		class="com.venustech.collector.service.fromat.method.text.TextID" />
	<bean id="info"
		class="com.venustech.collector.service.fromat.method.info.Info" />
	<bean id="subStringArray"
		class="com.venustech.collector.service.fromat.method.substring.SubStringArray" />
	<bean id="filterRegexpArray"
		class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpArray" />
	<bean id="filterRegexpID"
		class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpID" />
	<bean id="filterRegexpThis"
		class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpThis" />
	<bean id="splitKeyValueID"
		class="com.venustech.collector.service.fromat.method.split.SplitKeyValueID" />
	<bean id="subStringID"
		class="com.venustech.collector.service.fromat.method.substring.SubStringID" />
	<bean id="completionData"
		class="com.venustech.collector.service.fromat.method.completion.CompletionData" />
	<bean id="completionDataDefault"
		class="com.venustech.collector.service.fromat.method.completion.CompletionDataDefault" />
	<bean id="subIndexStringArray"
		class="com.venustech.collector.service.fromat.method.substring.SubIndexStringArray" />
	<bean id="subIndexStringId"
		class="com.venustech.collector.service.fromat.method.substring.SubIndexStringId" />
	<bean id="urlToID"
		class="com.venustech.collector.service.fromat.method.urltoip.UrlToID" />
	<bean id="urlToIpArray"
		class="com.venustech.collector.service.fromat.method.urltoip.UrlToIpArray" />
	<bean id="mappingKey"
		class="com.venustech.collector.service.fromat.method.mapping.MappingKey" />
	<bean id="mappingValueKey"
		class="com.venustech.collector.service.fromat.method.mapping.MappingValueKey" />
	<bean id="toDateArray"
		class="com.venustech.collector.service.fromat.method.todate.ToDateArray" />
	<bean id="toDateIDTime"
		class="com.venustech.collector.service.fromat.method.todate.ToDateIDTime" />

	<bean id="mainformat" class="com.venustech.collector.service.fromat.MainForMat">
		<property name="method">
			<map>
				<entry key="mappingValueKey" value-ref="mappingValueKey" />
				<entry key="mappingKey" value-ref="mappingKey" />
				<entry key="mappingArray" value-ref="mappingArray" />
				<entry key="mappingID" value-ref="mappingID" />
				<entry key="mappingValueArray" value-ref="mappingValueArray" />
				<entry key="mappingValueID" value-ref="mappingValueID" />
				<entry key="dateUnix" value-ref="dateUnix" />
				<entry key="dateUnixArray" value-ref="dateUnixArray" />
				<entry key="dateUnixArrayTime" value-ref="dateUnixArrayTime" />
				<entry key="dateUnixID" value-ref="dateUnixID" />
				<entry key="dateUnixIDTime" value-ref="dateUnixIDTime" />
				<entry key="iPIntArray" value-ref="iPIntArray" />
				<entry key="iPIntID" value-ref="iPIntID" />
				<entry key="regexpArray" value-ref="regexpArray" />
				<entry key="regexpID" value-ref="regexpID" />
				<entry key="regexpThis" value-ref="regexpThis" />
				<entry key="splitArray" value-ref="splitArray" />
				<entry key="splitID" value-ref="splitID" />
				<entry key="splitThis" value-ref="splitThis" />
				<entry key="textArray" value-ref="textArray" />
				<entry key="textID" value-ref="textID" />
				<entry key="subStringID" value-ref="subStringID" />
				<entry key="subStringArray" value-ref="subStringArray" />
				<entry key="completionData" value-ref="completionData" />
				<entry key="subIndexStringArray" value-ref="subIndexStringArray" />
				<entry key="subIndexStringId" value-ref="subIndexStringId" />
				<entry key="completionDataDefault" value-ref="completionDataDefault" />
				<entry key="urlToID" value-ref="urlToID" />
				<entry key="urlToIpArray" value-ref="urlToIpArray" />
				<entry key="filterRegexpArray" value-ref="filterRegexpArray" />
				<entry key="filterRegexpID" value-ref="filterRegexpID" />
				<entry key="filterRegexpThis" value-ref="filterRegexpThis" />
				<entry key="splitKeyValueID" value-ref="splitKeyValueID" />
				<entry key="info" value-ref="info" />
				<!-- date conversion -->
				<entry key="toDateIDTime" value-ref="toDateIDTime" />
				<entry key="toDateArray" value-ref="toDateArray" />
			</map> 
		</property>
	</bean> 
	
</beans>

 Storage layer developer API

    Implement the SaveCollector interface

public interface SaveCollector{
	public String save(String id, String[] tableName,
			IDataset<IData<String, Object>> list,FileAttribute fileAttribute) throws Exception;
	
}

Register the implementation class in the Spring container
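Because `SaveCollector.save` only receives the task id, the target table names, the rows, and the `FileAttribute`, a custom storage target is straightforward to plug in. Here is a sketch of one that appends rows to one tab-separated file per table, using simplified stand-in types (`List<Map<String, Object>>` instead of `IDataset<IData<String, Object>>`, and no `FileAttribute`); `TsvStore` and its fixed column list are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical custom storage target: append each row as a tab-separated line, one file per table.
class TsvStore {
    private final Path dir;
    private final List<String> columns; // fixed column order for every table

    TsvStore(Path dir, List<String> columns) {
        this.dir = dir;
        this.columns = columns;
    }

    // Mirrors the shape of SaveCollector.save: write the same rows to every target table.
    // The id is unused here; a real implementation might use it for logging.
    String save(String id, String[] tableNames, List<Map<String, Object>> rows) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            List<String> cells = new ArrayList<>();
            for (String c : columns) {
                Object v = row.get(c);
                cells.add(v == null ? "" : v.toString());
            }
            lines.add(String.join("\t", cells));
        }
        for (String table : tableNames) {
            Files.write(dir.resolve(table + ".tsv"), lines, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
        return dir.toString();
    }
}
```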
