一 sftp搭建
略
這裏簡單說一下爲什麼使用sftp。ftp和sftp各有優勢,差異並不是太大。sftp安全性好,性能比ftp低。ftp對於java來說並不複雜,效率也高。之所以使用sftp,主要是可以使用spring-boot+apache-camel。camel框架將文件傳輸分爲filter、processor、路由、定時器等組件,模塊化開發,可以隨意將這些組件進行組合,耦合性低,開發較爲靈活,可以將更多的精力放到業務層面。
二 使用apache-camel來定時從sftp服務器下載文件
2.1 pom依賴
<!-- Keep all Camel artifacts on the same release: the original snippet mixed
     camel-spring-boot-starter 2.18.0 with camel-ftp 2.19.4, which puts two
     different Camel versions on the classpath. -->
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>2.19.4</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.19.4</version>
</dependency>
2.2 application.properties配置
# Camel SFTP consumer endpoint. The trailing backslashes continue the value on
# the next line; leading whitespace on continuation lines is ignored.
ftp.server.uri=sftp://${ftp.url}\
    ?username=${ftp.username}\
    &password=${ftp.password}\
    &useUserKnownHostsFile=false\
    &localWorkDirectory=${ftp.local.work.directory}\
    &delay=5m\
    &filter=#ftpDownloadFileFilter\
    &stepwise=false\
    &recursive=true
ftp.url=192.168.20.162:22/
ftp.username=test
ftp.password=123456
# File server directory (this value is passed to the localWorkDirectory option above)
ftp.local.work.directory=/
# Local directory where the pulled files are stored
ftp.local.data.dir=E://test/
其中:
readLock=rename 通過嘗試重命名文件來獲取讀鎖,防止讀取文件服務器正在寫入的文件(上面的示例配置未包含此項,可按需追加)
recursive=true 是否遞歸讀取子目錄
#有些地方說這裏需要顯式指定後臺運行
camel.springboot.main-run-controller=true
2.3 過濾器
自定義規則判斷哪些文件需要下載,哪些文件不需要下載
package com.test.comm; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.test.util.RedisTemplateUtil; @Component public class FtpDownloadFileFilter implements GenericFileFilter<Object> { private static Logger logger = LoggerFactory.getLogger(FtpDownloadFileFilter.class); @Value("${ftp.local.data.dir}") private String localDir; @Autowired private RedisTemplateUtil redisTemplateUtil; /** * 過濾下載文件 * * @author sunk */ @Override public boolean accept(GenericFile<Object> file) { try { return isDownloaded(file); } catch (Exception e) { logger.error("ftp download file filter error !", e); return false; } } /** * 根據時間戳來判斷是否下載過 * * @param fileName * * @return */ public boolean isDownloaded(GenericFile<Object> file) { String fileName = file.getFileName(); if (file.isDirectory()) { return true; } boolean bool = false; if (fileName.contains("_")) { long time = Long.parseLong(fileName.split("_")[3]); // 從redis中獲取上次的時間,當前文件時間大於當前時間則獲取,不然不獲取 Object preTime = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME); if (preTime == null) { bool = true; } else { if (Long.parseLong(preTime.toString()) < time) { bool = true; } } } return bool; } }
2.4 路由
自定義路由規則,一般是告訴程序從哪裏讀文件,並搬運到哪裏去
package com.test.comm; import java.net.InetAddress; import org.apache.camel.LoggingLevel; import org.apache.camel.builder.RouteBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class FtpDownloadRoute extends RouteBuilder { private static Logger logger = LoggerFactory.getLogger(FtpDownloadRoute.class); @Value("${ftp.server.uri}") private String ftpUri; @Value("${ftp.local.data.dir}") private String localDir; @Autowired LocationFileProcessor locationFileProcessor; @Override public void configure() throws Exception { logger.debug("開始鏈接 " + ftpUri); from(ftpUri).to("file:" + localDir).process(locationFileProcessor).log(LoggingLevel.INFO, logger, "download file ${file:name} complete."); logger.debug("鏈接成功"); } }
2.5 其它自定義處理器(Processor)
除了文件搬運之外,還允許自定義對文件的其它操作,比如入庫等等。自定義的類可添加在路由中。
package com.test.comm; import java.io.RandomAccessFile; import java.util.HashMap; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFileMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.test.config.ApplicationStartup; import com.test.model.Device; import com.test.model.Pictrue; import com.test.util.DateUtil; import com.test.util.ESRepository; import com.test.util.FileUtil; import com.test.util.RedisTemplateUtil; /** * camel 業務類 * * <p> * Title:LocationFileProcessor * </p> * <p> * Description:TODO * </p> * <p> * Copyright:Copyright(c)2005 * </p> * <p> * Company:stest * </p> * * @author * @date 2018年11月15日 上午9:02:29 */ @Component public class LocationFileProcessor implements Processor { private static Logger logger = LoggerFactory.getLogger(LocationFileProcessor.class); @Autowired private RedisTemplateUtil redisTemplateUtil; @Autowired private FastDFSClient fastDFSClient; @Value("${ftp.local.data.dir}") private String localDir; @Autowired private ESRepository eSRepository; @Value("${elasticsearch.index}") private String esIndex; @Value("${elasticsearch.type}") private String esType; @Autowired private ApplicationStartup applicationStartup; @Override public void process(Exchange exchange) throws Exception { @SuppressWarnings("unchecked") GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn(); String fileName = inFileMessage.getGenericFile().getFileName();// 文件名 logger.info(fileName);// 文件的絕對路徑 String subfileName = fileName.substring(fileName.lastIndexOf("/") + 1); long time = Long.parseLong(fileName.split("_")[3]); // 上傳到fastdfs String path = upload(fileName); // 將圖片地址等信息保存到es saveEs(subfileName, path); // 
獲取當前redis裏面保存的時間,若是爲空直接存入,若是不爲空且當前文件時間大於redis時間,那覆蓋 saveRedis(time); } /** * 將最後獲取圖片的時間標記保存至redis * * @param time */ private void saveRedis(long time) { Object redisKey = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME); if (redisKey == null || (redisKey != null && Long.parseLong(redisKey.toString()) < time)) { redisTemplateUtil.set(Constants.reids.YP_PICTRUE_TIME, time, 0); } } /** * 保存es * * @param subfileName * @param path */ private void saveEs(String subfileName, String path) { String[] fileNames = subfileName.split("_"); String deviceId = fileNames[0]; String plate = fileNames[2].substring(1); String captrue = fileNames[3]; String type = fileNames[4].split("\\.")[0]; String times = DateUtil.transForDate1(Integer.parseInt(captrue)); captrue = captrue + "000"; // 根據deviceId獲取經緯度 HashMap<Integer, Device> devices = applicationStartup.getDevices(); Device device = devices.get(Integer.parseInt(deviceId)); double latitude = 0; double longitude = 0; if (device != null) { latitude = device.getLat(); longitude = device.getLon(); } String deviceName = device.getDeviceName(); String address = device.getDeviceAddress(); Pictrue pictrue = new Pictrue(deviceId, plate, captrue, type, path, times, latitude, longitude, deviceName, address, "視頻數據"); Gson gson = new Gson(); eSRepository.addTargetDataALL(gson.toJson(pictrue), esIndex, esType, null); } /** * 上傳fastdfs * * @param fileName * @return * @throws Exception */ private String upload(String fileName) throws Exception { String path = fastDFSClient.uploadFile(FileUtil.getBytes(localDir + fileName), fileName); return path; } }