1、flume簡單瞭解推薦網站(簡介包括簡單案例部署): http://www.aboutyun.com/thread-8917-1-1.html
2、個人需求是實現從ftp目錄下采集數據,目錄下文件名稱要符合特定正則,要求文件要一行一行讀取並解析後寫入數據庫。且實現斷點續傳(服務重啓後會從上次讀的位置繼續)。
flume1.7.0中taildirSource實現的是監控目錄下文件而且一行一行的讀取,我只需選用這個source就能夠實現。可是服務並不能直接部署在數據所在的服務器上,因此涉及到ftp的問題。這樣就須要重寫taildirSource,因爲本人能力有限,並無在源碼的基礎上修改。使用的別的實現方式:用taildirSource配置,可是使用流下載的方式讀取數據。
3、直接走步驟,不足之處請指出,目前服務已經部署在使用,有改進優化之處歡迎留言(服務器1,服務器2)
1.下載:從flume官網下載flume1.7.0 http://flume.apache.org/download.html
2.安裝:上傳至服務器1目錄/home/hadoop下,解壓到apache-flume-1.7.0-bin(安裝路徑/home/hadoop自行定義)
3.配置jdk:修改apache-flume-1.7.0-bin/conf/flume-env.sh.template中jdk的路徑:eg: export JAVA_HOME=/usr/java/jdk1.8.0_92
4.配置flume環境,在/etc/profile中增長:
export FLUME_HOME=/安裝路徑/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
5.重新加載文件/etc/profile,使用命令source /etc/profile
6.執行命令flume-ng version ,出現版本號說明安裝成功
-------以上步驟完成flume的安裝,下面開始自定義source------
7.寫flume配置文件,在apache-flume-1.7.0-bin/conf/下新建taildirsource.conf
# taildirsource.conf — agent1: custom FTP tail source -> memory channel -> Oracle JDBC sink.
# NOTE: Java properties files do not support trailing "//" comments — in the original
# they would have become part of the property values (e.g. filegroups = "f1 //監控文件...")
# and broken integer parsing; all annotations are therefore moved onto "#" lines.
agent1.sources = tail_dir_source
agent1.sinks = taildirsink
agent1.channels = channel1
# source: tail_dir_source -----------------
# custom source class
agent1.sources.tail_dir_source.type = com.esa.source.ftp.FtpSource3
agent1.sources.tail_dir_source.channels = channel1
# position file recording how far each file has been read (resume support)
agent1.sources.tail_dir_source.positionFile = /opt/flume/taildir_position.json
# monitored file groups, space separated (required by TaildirSource validation,
# but unused by the custom source, which reads "filePath"/"regex" instead)
agent1.sources.tail_dir_source.filegroups = f1
# path + file-name regex for group f1 (unused by the custom source)
agent1.sources.tail_dir_source.filegroups.f1 = /Desktop/studyflume3/.*
# FTP directory to monitor (custom property)
agent1.sources.tail_dir_source.filePath = /Desktop/studyflume3
# file-name regex (custom property)
agent1.sources.tail_dir_source.regex = .*
agent1.sources.tail_dir_source.batchSize = 100
agent1.sources.tail_dir_source.backoffSleepIncrement = 1000
agent1.sources.tail_dir_source.maxBackoffSleep = 5000
agent1.sources.tail_dir_source.recursiveDirectorySearch = true
agent1.sources.tail_dir_source.yarnApplicationHeader = true
agent1.sources.tail_dir_source.yarnContainerHeader = true
# FTP connection settings (custom properties)
agent1.sources.tail_dir_source.ftpip = 192.160.111.211
agent1.sources.tail_dir_source.ftpport = 21
agent1.sources.tail_dir_source.username = root
agent1.sources.tail_dir_source.password = root
# Describe taildirsink (custom JDBC sink class)
agent1.sinks.taildirsink.type = com.esa.sink.db.OracleSink
# database url / credentials
agent1.sinks.taildirsink.url = url
agent1.sinks.taildirsink.user = root
agent1.sinks.taildirsink.password = root
# regex whose capture groups map onto the insert parameters
agent1.sinks.taildirsink.regex = .*
# parameterized insert statement
agent1.sinks.taildirsink.sql = insert into table values(?,?,?)
agent1.sinks.taildirsink.channel = channel1
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
# fixed typo: was "transactionCapactiy", which Flume would silently ignore
agent1.channels.channel1.transactionCapacity = 100
8.新建maven工程,自定義taildirsource類FtpSource3繼承AbstractSource,實現PollableSource, Configurable
/**
 * Custom Flume pollable source that tails files on a remote FTP server.
 * On every poll it walks the fixed layout &lt;filePath&gt;/&lt;sub-dir&gt;/SESSIONLOG,
 * streams each matching file line by line into the channel, and records the
 * consumed line count per file in a local dom4j position file so that a
 * restart resumes where the previous run stopped.
 */
public class FtpSource3 extends AbstractSource implements PollableSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(FtpSource3.class);
    private String filePath; // FTP directory to monitor
    private String regex; // file-name regex a file must match before it is ingested
    private Table<String, String, String> headerTable;
    private int batchSize;
    // Path of the position file. Static because the static create() uses it;
    // it is assigned per-instance in configure().
    private static String positionFilePath;
    private boolean skipToEnd;
    private boolean byteOffsetHeader;
    private SourceCounter sourceCounter;
    private FtpReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    private int retryInterval = 1000;
    private int maxRetryInterval = 5000;
    private int idleTimeout;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private int writePosInterval;
    private boolean cachePatternMatching;
    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private boolean fileHeader;
    private String fileHeaderKey;
    private FTPClient ftp; // FTP client, connected once in start()
    private String ftpip; // IP of the FTP server to connect to
    private int ftpport; // FTP control port, normally 21
    private String username; // FTP login name
    private String password; // FTP login password
    private Map<String, Long> map; // file name -> lines already consumed (loaded from position file)

    // Reads every setting from the agent configuration (taildirsource.conf).
    @Override
    public void configure(Context context) {
        String fileGroups = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
        Preconditions.checkState(fileGroups != null,
                "Missing param: " + TaildirSourceConfigurationConstants.FILE_GROUPS);
        filePath = context.getString("filePath"); // FTP directory to monitor
        regex = context.getString("regex");
        Preconditions.checkState(!filePath.isEmpty(), "Mapping for tailing files is empty or invalid: '"
                + TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX + "'");
        positionFilePath = context.getString("positionFile"); // absolute path of the position file
        headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
        batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE,
                TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(TaildirSourceConfigurationConstants.SKIP_TO_END,
                TaildirSourceConfigurationConstants.DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT,
                TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL,
                TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(TaildirSourceConfigurationConstants.CACHE_PATTERN_MATCHING,
                TaildirSourceConfigurationConstants.DEFAULT_CACHE_PATTERN_MATCHING);
        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
                PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
                PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(TaildirSourceConfigurationConstants.FILENAME_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY,
                TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
        ftpip = context.getString("ftpip");
        logger.info(ftpip + "---ftpip---");
        Preconditions.checkNotNull(ftpip, "ftpip must be set!!");
        ftpport = context.getInteger("ftpport");
        logger.info(ftpport + "---ftpport---");
        Preconditions.checkNotNull(ftpport, "ftpport must be set!!");
        username = context.getString("username");
        logger.info(username + "---username---");
        Preconditions.checkNotNull(username, "username must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
    }

    // Runs once when the source starts: connects and logs in to the FTP server,
    // then makes sure the position file exists.
    @Override
    public void start() {
        logger.info("{} TaildirSource source starting with directory: {}", getName(), filePath);
        logger.info("filePath==" + filePath);
        logger.info("positionFilePath==" + positionFilePath);
        // connect to the FTP server first
        ftp = new FTPClient();
        // authenticate
        try {
            ftp.connect(ftpip, ftpport);
            System.out.println(ftp.login(username, password));
            ftp.setCharset(Charset.forName("UTF-8"));
            ftp.setControlEncoding("UTF-8");
            logger.info("fileHeaderKey==" + fileHeaderKey);
            logger.info("ftp==" + ftp);
            // The client must switch to active mode here, otherwise a new port is
            // requested on every transfer; the FTP server side must be passive.
            ftp.enterLocalActiveMode();
            logger.info("轉換路徑是否成功" + ftp.changeWorkingDirectory("/"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Create the position file (XML written with dom4j despite the .json name;
        // one node per file records the consumed line count).
        try {
            create();
        } catch (Exception e1) {
            e1.printStackTrace();
            logger.error("----建立文件失敗----");
        }
        super.start();
        logger.debug("TaildirSource started");
        sourceCounter.start();
    }

    // Polled repeatedly by the Flume runner: walks the fixed directory layout
    // <filePath>/<sub-dir>/SESSIONLOG on the FTP server and ingests every file found.
    @Override
    public Status process() throws EventDeliveryException {
        // download files from the FTP server
        Status status = Status.READY;
        logger.info("--進入主程序--");
        logger.info("--filePath--" + filePath);
        if (ftp.isConnected()) {
            logger.info("--ftp--" + ftp);
            try {
                FTPFile[] ftpFiles = null;
                // ftp.enterLocalPassiveMode();
                ftp.enterLocalActiveMode();
                boolean a = ftp.changeWorkingDirectory(filePath);
                logger.info("轉換路徑是否成功" + a);
                if (a) {
                    logger.info("---開始ftp.listFiles()---");
                    ftpFiles = ftp.listFiles(filePath);
                    logger.info("目錄下文件個數==" + ftpFiles.length);
                    // The directory structure is fixed, so we look straight for the
                    // expected sub-directories instead of a generic recursion.
                    for (int i = 0; i < ftpFiles.length; i++) { // per sub-directory (e.g. ANH)
                        if (ftpFiles[i].isDirectory()) {
                            System.out.println("---isdirectory---" + ftpFiles[i]);
                            try {
                                System.out.println("---開始ftp.listFiles()---" + filePath + "/" + ftpFiles[i].getName());
                                FTPFile[] files = ftp.listFiles(filePath + "/" + ftpFiles[i].getName());
                                System.out.println("目錄下文件個數---" + files.length);
                                for (int j = 0; j < files.length; j++) { // look for the SESSIONLOG folder
                                    if (files[j].getName().equals("SESSIONLOG")) {
                                        System.out.println("---isdirectory---" + "SESSIONLOG");
                                        System.out.println(ftp.changeWorkingDirectory(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG"));
                                        FTPFile[] files1 = ftp.listFiles(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        for (FTPFile sFile : files1) {
                                            System.out.println(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG" + "下面的文件" + sFile);
                                            downloderFile(sFile, filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        }
                                    }
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                } else {
                    logger.error("鏈接的fpt上沒有制定下載路徑,無法下載數據!");
                }
            } catch (Exception e) {
                logger.error("下載失敗!");
                logger.error("詳情", e);
                status = Status.BACKOFF;
            }
        }
        return status;
    }

    // Ingests one FTP file: checks the name against the configured regex, loads the
    // per-file line offsets from the position file, then either resumes (known file)
    // or streams the whole file from the beginning (new file) into the channel.
    private void downloderFile(FTPFile ftpFile, String ftpFileDir) {
        // Disabled generic recursive-download variant, kept for reference:
        // if (ftpFile.isDirectory()) {
        // logger.info("----isDirectory----");
        // try {
        // FTPFile[] files = ftp.listFiles(ftpFileDir + "/" + ftpFile.getName());
        // logger.info("---遞歸找文件---" + files.length);
        // for (int i = 0; i < files.length; i++) {
        // System.out.println(files[i]);
        // System.out.println(ftp.changeWorkingDirectory(ftpFileDir + "/" + ftpFile.getName()));
        // ;
        // downloderFile(files[i], ftpFileDir + "/" + ftpFile.getName());
        // }
        // } catch (IOException e) {
        // e.printStackTrace();
        // }
        // } else
        if (ftpFile.isFile()) {
            logger.info("---isfile---");
            if (ftpFile.getName().matches(regex)) {
                logger.info("---匹配成功---");
                logger.info("---ftpfile--" + ftpFile);
                map = new HashMap<>();
                // Load every <files> node (name + size) from the position file into the map.
                // NOTE(review): this re-parses the whole position file for every single
                // FTP file — consider loading it once per process() call.
                try {
                    SAXReader reader = new SAXReader();
                    Document doc = reader.read(new File(positionFilePath));
                    Element root = doc.getRootElement();
                    Element foo;
                    for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                        foo = (Element) i.next();
                        if (!foo.elementText("name").isEmpty() && !foo.elementText("size").isEmpty()) {
                            long size = Long.parseLong(foo.elementText("size"));
                            map.put(foo.elementText("name"), size);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                logger.info("map====" + map);
                if (map.containsKey(ftpFile.getName())) {
                    logger.info("--- 進入包含---" + ftpFile.getName());
                    // Known file: skip the already-consumed lines, then update its node.
                    readAppointedLineNumber(ftpFile, map.get(ftpFile.getName()));
                } else {
                    logger.info("--- 進入不包含---" + ftpFile.getName());
                    // New file: read it from the beginning and add a node for it.
                    InputStream iStream = null;
                    InputStreamReader inputStreamReader = null;
                    BufferedReader br = null;
                    try {
                        // Open a byte stream for the remote file.
                        // NOTE(review): re-encoding the file name from gbk to utf-8 looks
                        // fragile for non-ASCII names — verify against the server's encoding.
                        iStream = ftp.retrieveFileStream(new String(ftpFile.getName().getBytes("gbk"), "utf-8"));
                        if (iStream != null) {
                            logger.info("---流不爲空---");
                            inputStreamReader = new InputStreamReader(iStream);
                            br = new BufferedReader(inputStreamReader); // buffered so the file can be read line by line
                            long x = 0; // line counter, starts at 0
                            String string = br.readLine();
                            while (string != null) { // readLine() returns one line per call
                                x++; // one more line consumed
                                // Each line becomes one Flume event pushed to the channel.
                                // NOTE(review): the position node is only written after the
                                // whole file is read — a crash mid-file would re-deliver
                                // every line on restart.
                                getChannelProcessor().processEvent(EventBuilder.withBody(string.getBytes()));
                                string = br.readLine();
                            }
                            System.out.println(x);
                            try {
                                // record the consumed line count in the position file
                                add(ftpFile.getName(), x + "");
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (iStream != null) {
                            try {
                                iStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (inputStreamReader != null) {
                            try {
                                inputStreamReader.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            // Consume the pending FTP completion reply; without this the
                            // next retrieveFileStream() call returns null.
                            ftp.getReply();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * Creates the position file if it does not exist yet: an XML document
     * (root &lt;rss version="2.0"&gt;) whose &lt;files&gt; nodes later store
     * file name + consumed line count.
     *
     * @throws Exception when the file cannot be written
     */
    public static void create() throws Exception {
        File file = new File(positionFilePath);
        if (file.exists()) {
            logger.info("---json文件已存在---");
        } else {
            Document document = DocumentHelper.createDocument();
            Element root = document.addElement("rss");
            root.addAttribute("version", "2.0");
            XMLWriter writer = new XMLWriter(new FileOutputStream(new File(positionFilePath)), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false); // whether characters are escaped, default true
            writer.write(document);
            writer.close();
            logger.info("---建立文件成功---");
        }
    }

    /**
     * Appends a &lt;files&gt; node (name + size) to the position file
     * for a newly seen file.
     *
     * @param name   remote file name
     * @param length number of lines consumed, stored as text
     * @throws Exception when the position file cannot be read
     */
    public void add(String name, String length) throws Exception {
        logger.info("---開始增加節點---");
        SAXReader reader = new SAXReader();
        Document document = reader.read(new File(positionFilePath));
        Element root = document.getRootElement();
        try {
            Element resource = root.addElement("files");
            Element nameNode = resource.addElement("name");
            Element sizeNode = resource.addElement("size");
            nameNode.setText(name);
            sizeNode.setText(length);
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false); // whether characters are escaped, default true
            writer.write(document);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("---結束增加節點---");
    }

    /**
     * Updates the stored line count (&lt;size&gt;) for an already-known file.
     *
     * NOTE(review): the document is written twice — once inside the try block and
     * once after it; the second write looks redundant. Confirm before removing.
     *
     * @throws IOException when the position file cannot be rewritten
     */
    public void Update(String name, String size) throws IOException {
        logger.info("---開始文件更新---");
        SAXReader reader = new SAXReader();
        Document document = null;
        try {
            document = reader.read(new File(positionFilePath));
            Element root = document.getRootElement();
            Element foo;
            for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                foo = (Element) i.next();
                if (name.equals(foo.elementText("name"))) {
                    foo.element("size").setText(size);
                }
            }
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false); // whether characters are escaped, default true
            writer.write(document);
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        OutputFormat format = OutputFormat.createPrettyPrint();
        format.setEncoding("UTF-8");
        XMLWriter writer = new XMLWriter(new OutputStreamWriter(new FileOutputStream(positionFilePath)), format);
        writer.write(document);
        writer.close();
        logger.info("---文件更新成功---");
    }

    // Disconnects from the FTP server when the source stops.
    @Override
    public void stop() {
        logger.info("----執行結束---");
        super.stop();
        if (ftp != null) {
            try {
                ftp.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Splits the "a.b = v" sub-properties under the given prefix into a Guava
    // table keyed by the first and second dot-separated parts of each key.
    private Table<String, String, String> getTable(Context context, String prefix) {
        Table<String, String, String> table = HashBasedTable.create();
        for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
            String[] parts = e.getKey().split("\\.", 2);
            table.put(parts[0], parts[1], e.getValue());
        }
        return table;
    }
}
9.重寫sink,新建類OracleSink 繼承AbstractSink 實現Configurable
/**
 * Custom Flume sink that parses each event body with a configured regex
 * (capture groups 1..19 map positionally onto an Info row) and writes the
 * rows to Oracle in one JDBC batch per process() call.
 */
public class OracleSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(OracleSink.class);
    // JDBC settings, all supplied via the sink configuration (see configure()).
    private String driverClassName;
    private String url;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;   // max events drained from the channel per process() call
    private String regex;    // capture groups 1..19 map positionally onto Info fields
    private Pattern pattern; // compiled once in configure() instead of per event
    private String sql;      // parameterized INSERT with 19 "?" placeholders

    public OracleSink() {
        logger.info("---start---");
    }

    /**
     * Reads and validates the mandatory sink properties
     * (url, driverClassName, user, password, regex, sql; batchSize defaults to 100).
     */
    @Override
    public void configure(Context context) {
        url = context.getString("url");
        logger.info(url + "---url---");
        Preconditions.checkNotNull(url, "url must be set!!");
        driverClassName = context.getString("driverClassName");
        logger.info(driverClassName + "---driverClassName---");
        Preconditions.checkNotNull(driverClassName, "driverClassName must be set!!");
        user = context.getString("user");
        logger.info(user + "---user---");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
        Preconditions.checkNotNull(password, "password must be set!!");
        regex = context.getString("regex");
        logger.info(regex + "---regex---");
        Preconditions.checkNotNull(regex, "regex must be set!!");
        pattern = Pattern.compile(regex); // hoisted: the original recompiled the regex per event
        sql = context.getString("sql");
        logger.info(sql + "---sql---");
        Preconditions.checkNotNull(sql, "sql must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        logger.info(batchSize + "---batchSize---");
        // Fixed: the original called checkNotNull(batchSize > 0, ...) which can never
        // fail (a boxed Boolean is never null); checkArgument actually enforces > 0.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    /**
     * Loads the JDBC driver and opens the connection + prepared statement.
     * Exits the JVM when the connection cannot be established (original behavior:
     * the sink cannot do anything useful without a connection).
     */
    @Override
    public void start() {
        super.start();
        try {
            Class.forName(driverClassName); // register the JDBC driver
        } catch (ClassNotFoundException e) {
            logger.error("JDBC driver not found: " + driverClassName, e);
        }
        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false); // commits are issued explicitly per batch
            preparedStatement = conn.prepareStatement(sql);
        } catch (SQLException e) {
            logger.error("could not open database connection: " + url, e);
            System.exit(1);
        }
    }

    /** Closes the prepared statement and the connection. */
    @Override
    public void stop() {
        logger.info("----執行結束---");
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                logger.error("error closing prepared statement", e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error("error closing connection", e);
            }
        }
    }

    /**
     * Drains up to batchSize events from the channel inside one Flume transaction,
     * parses each body, executes a single JDBC batch insert and commits both the
     * database and the channel transaction. Returns BACKOFF when the channel is empty.
     *
     * @throws EventDeliveryException propagated (via Throwables) when commit fails
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<Info> infos = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // channel drained — tell the runner to back off before polling again
                    result = Status.BACKOFF;
                    break;
                }
                logger.info("---讀取數據---");
                logger.info("--數據庫鏈接是否關閉--" + conn.isClosed());
                // NOTE(review): uses the platform default charset, matching the source
                // side which also calls String.getBytes() without a charset — confirm
                // both agents share the same default before making UTF-8 explicit.
                String content = new String(event.getBody());
                // Fixed: the original tested content != "" — a reference comparison
                // that is always true; isEmpty() is the intended check.
                if (content != null && !content.isEmpty()) {
                    logger.info("---" + content + "---");
                    logger.info("---" + regex + "---");
                    Info info = parseInfo(content);
                    if (info != null) {
                        infos.add(info);
                    }
                }
            }
            if (!infos.isEmpty()) {
                logger.info("infos的長度==" + infos.size());
                preparedStatement.clearBatch();
                for (Info temp : infos) {
                    addToBatch(temp);
                }
                try {
                    preparedStatement.executeBatch();
                } catch (SQLException e) {
                    logger.error("------批量執行sql錯誤", e);
                }
                conn.commit();
            }
            transaction.commit();
        } catch (Exception e) {
            try {
                conn.rollback();
            } catch (SQLException e1) {
                logger.error("database rollback failed", e1);
            }
            try {
                transaction.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }

    /**
     * Parses one event body against the configured pattern. Capture group j
     * (1..19) is copied onto the j-th Info field; returns null when the body
     * does not match so the caller can skip it.
     */
    private Info parseInfo(String content) {
        Matcher m = pattern.matcher(content);
        if (!m.find()) {
            return null;
        }
        logger.info("匹配上的長度==" + m.groupCount());
        Info info = new Info();
        for (int j = 1; j <= m.groupCount(); j++) {
            logger.info("---每一個匹配後的值" + j + "=" + m.group(j) + "---");
            switch (j) {
                case 1:  info.setProvinceid(m.group(j));    break;
                case 2:  info.setProvincename(m.group(j));  break;
                case 3:  info.setSessionid(m.group(j));     break;
                case 4:  info.setMainacctid(m.group(j));    break;
                case 5:  info.setMainacctname(m.group(j));  break;
                case 6:  info.setSlaveacctid(m.group(j));   break;
                case 7:  info.setOrgid(m.group(j));         break;
                case 8:  info.setOrgname(m.group(j));       break;
                case 9:  info.setClientaddr(m.group(j));    break;
                case 10: info.setResid(m.group(j));         break;
                case 11: info.setResip(m.group(j));         break;
                case 12: info.setResport(m.group(j));       break;
                case 13: info.setBelongsysid(m.group(j));   break;
                case 14: info.setBelongsysname(m.group(j)); break;
                case 15: info.setLogintime(m.group(j));     break;
                case 16: info.setLogouttime(m.group(j));    break;
                case 17: info.setVideoname(m.group(j));     break;
                case 18: info.setVideofiledir(m.group(j));  break;
                case 19: info.setReserve(m.group(j));       break;
                default: break;
            }
        }
        return info;
    }

    /** Binds the 19 columns of one Info row (null becomes "") and adds it to the batch. */
    private void addToBatch(Info temp) throws SQLException {
        String[] columns = {
                temp.getProvinceid(), temp.getProvincename(), temp.getSessionid(),
                temp.getMainacctid(), temp.getMainacctname(), temp.getSlaveacctid(),
                temp.getOrgid(), temp.getOrgname(), temp.getClientaddr(),
                temp.getResid(), temp.getResip(), temp.getResport(),
                temp.getBelongsysid(), temp.getBelongsysname(), temp.getLogintime(),
                temp.getLogouttime(), temp.getVideoname(), temp.getVideofiledir(),
                temp.getReserve()
        };
        for (int i = 0; i < columns.length; i++) {
            preparedStatement.setString(i + 1, columns[i] != null ? columns[i] : "");
        }
        preparedStatement.addBatch();
    }
}
10.把maven打包,放在apache-flume-1.7.0-bin/lib下,把依賴的包好比dom4j、ojdbc6也放進去。
11.運行flume,在apache-flume-1.7.0-bin寫命令:
//運行flume(按ctrl+c能夠中止服務)
flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console
//後臺運行flume(後臺運行的flume,寫命令ps -ef|grep taildirsource.conf或者ps -ef|grep flume ,獲取進程id,而後kill掉)
flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console &