flume自定義Source(taildirSource),自定義Sink(數據庫),開發完整步驟

1、flume簡單瞭解推薦網站(簡介包括簡單案例部署):  http://www.aboutyun.com/thread-8917-1-1.htmlhtml

2、個人需求是實現從ftp目錄下采集數據,目錄下文件名稱要符合特定正則,要求文件要一行一行讀取並解析後寫入數據庫。且實現斷點續傳(服務重啓後會從上次讀的位置繼續)。java

  flume1.7.0中taildirSource實現的是監控目錄下文件而且一行一行的讀取,我只需選用這個source就能夠實現。可是服務並不能直接部署在數據所在的服務器上,因此涉及到ftp的問題。這樣就須要重寫taildirSource,因爲本人能力有限,並無在源碼的基礎上修改。使用的別的實現方式:用taildirSource配置,可是使用流下載的方式讀取數據。sql

3、直接走步驟,不足之處請指出,目前服務已經部署在使用,有改進優化之處歡迎留言(服務器1,服務器2)數據庫

  1.下載:從flume官網下載flume1.7.0  http://flume.apache.org/download.htmlapache

  2.安裝:上傳至服務器1目錄/home/hadoop下,解壓到apache-flume-1.7.0-bin(安裝路徑/home/hadoop自行定義)json

  3.配置jdk:修改apache-flume-1.7.0-bin/conf/flume-env.sh.template中jdk的路徑:eg: export JAVA_HOME=/usr/java/jdk1.8.0_92服務器

  4.配置flume環境,在/etc/profile中增長:
      export FLUME_HOME=/安裝路徑/apache-flume-1.7.0-bin
      export PATH=$PATH:$FLUME_HOME/binoracle

  5.重啓文件/etc/profile,使用命令source /etc/profileapp

  6.執行命令flume-ng version ,出現版本號說明安裝成功dom

  -------以上步驟完成flume的安裝,下面開始自定義source------

  7.寫flume配置文件,在apache-flume-1.7.0-bin/conf/下新建taildirsource.conf

    agent1.sources = tail_dir_source
    agent1.sinks = taildirsink
    agent1.channels = channel1

    # source: tail_dir_source -----------------
    agent1.sources.tail_dir_source.type = com.esa.source.ftp.FtpSource3    //自定義類
    agent1.sources.tail_dir_source.channels = channel1
    agent1.sources.tail_dir_source.positionFile =/opt/flume/taildir_position.json  //json文件位置
    agent1.sources.tail_dir_source.filegroups =f1               //監控文件,配置多個以空格隔開(我是本身寫的路徑,這個屬性沒有用到)
    agent1.sources.tail_dir_source.filegroups.f1 =/Desktop/studyflume3/.*    //監控文件路徑和正則(我是本身寫的路徑,這個屬性沒有用到)
    agent1.sources.tail_dir_source.filePath =/Desktop/studyflume3       //監控ftp的目錄 (本身加的)
    agent1.sources.tail_dir_source.regex =.*                 //文件夾正則(本身加的)
    agent1.sources.tail_dir_source.batchSize = 100
    agent1.sources.tail_dir_source.backoffSleepIncrement  = 1000
    agent1.sources.tail_dir_source.maxBackoffSleep  = 5000
    agent1.sources.tail_dir_source.recursiveDirectorySearch = true
    agent1.sources.tail_dir_source.yarnApplicationHeader = true
    agent1.sources.tail_dir_source.yarnContainerHeader = true
    agent1.sources.tail_dir_source.ftpip = 192.160.111.211           //ftp地址
    agent1.sources.tail_dir_source.ftpport = 21                 //ftp端口
    agent1.sources.tail_dir_source.username = root              //用戶名
    agent1.sources.tail_dir_source.password = root               //密碼

    # Describe taildirsink
    agent1.sinks.taildirsink.type =com.esa.sink.db.OracleSink          //數據庫驅動
    agent1.sinks.taildirsink.url=url                       //數據庫url                 
    agent1.sinks.taildirsink.user=root                       //數據庫用戶名
    agent1.sinks.taildirsink.password=root                    //數據庫密碼
    agent1.sinks.taildirsink.regex=.*                      //數據解析規則
    agent1.sinks.taildirsink.sql=insert into table values(?,?,?)          //入庫sql
    agent1.sinks.taildirsink.channel = channel1

    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapactiy = 100

  8.新建maven工程,自定義taildirsource類FtpSource3繼承AbstractSource,實現PollableSource, Configurable

public class FtpSource3 extends AbstractSource implements PollableSource, Configurable {

 private static final Logger logger = LoggerFactory.getLogger(FtpSource3.class);
    private String filePath;// 文件目錄
    private String regex;// 匹配的正則
    private Table<String, String, String> headerTable;
    private int batchSize;
    private static String positionFilePath;// 校驗文件目錄
    private boolean skipToEnd;
    private boolean byteOffsetHeader;

    private SourceCounter sourceCounter;
    private FtpReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    private int retryInterval = 1000;
    private int maxRetryInterval = 5000;
    private int idleTimeout;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private int writePosInterval;
    private boolean cachePatternMatching;

    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private boolean fileHeader;
    private String fileHeaderKey;

    private FTPClient ftp;// ftp對象

    private String ftpip;// 須要鏈接到的ftp端的ip

    private int ftpport;// 鏈接端口,默認21

    private String username;// 要鏈接到的ftp端的名字

    private String password;// 要鏈接到的ftp端的對應得密碼
    private Map<String, Long> map;

  //獲取配置文件中的配置

  @Override
    public void configure(Context context) {

        String fileGroups = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
        Preconditions.checkState(fileGroups != null,
                "Missing param: " + TaildirSourceConfigurationConstants.FILE_GROUPS);

        filePath = context.getString("filePath");// 文件目錄
        regex = context.getString("regex");
        Preconditions.checkState(!filePath.isEmpty(), "Mapping for tailing files is empty or invalid: '"
                + TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX + "'");
        positionFilePath = context.getString("positionFile");// 寫絕對路徑

        headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
        batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE,
                TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(TaildirSourceConfigurationConstants.SKIP_TO_END,
                TaildirSourceConfigurationConstants.DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT,
                TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL,
                TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(TaildirSourceConfigurationConstants.CACHE_PATTERN_MATCHING,
                TaildirSourceConfigurationConstants.DEFAULT_CACHE_PATTERN_MATCHING);

        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
                PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
                PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(TaildirSourceConfigurationConstants.FILENAME_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY,
                TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }

        ftpip = context.getString("ftpip");
        logger.info(ftpip + "---ftpip---");
        Preconditions.checkNotNull(ftpip, "ftpip must be set!!");
        ftpport = context.getInteger("ftpport");
        logger.info(ftpport + "---ftpport---");
        Preconditions.checkNotNull(ftpport, "ftpport must be set!!");
        username = context.getString("username");
        logger.info(username + "---username---");
        Preconditions.checkNotNull(username, "username must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
    }

  //服務開始時執行一遍

  @Override
    public void start() {

        logger.info("{} TaildirSource source starting with directory: {}", getName(), filePath);
        logger.info("filePath==" + filePath);
        logger.info("positionFilePath==" + positionFilePath);
        

        // 先鏈接ftp
        ftp = new FTPClient();

        // 驗證登陸
        try {
            ftp.connect(ftpip, ftpport);
            System.out.println(ftp.login(username, password));

            ftp.setCharset(Charset.forName("UTF-8"));
            ftp.setControlEncoding("UTF-8");
            logger.info("fileHeaderKey==" + fileHeaderKey);
            logger.info("ftp==" + ftp);
             ftp.enterLocalActiveMode();//ftp客戶端必定要切換主動模式,否則每次都會請求一個端口,ftp服務器端要設置成被動模式
             logger.info("轉換路徑是否成功" + ftp.changeWorkingDirectory("/"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 建立json文件(用dom4j寫的xml格式的文件,每一個節點記錄一個文件及位置)
        try {
            create();
        } catch (Exception e1) {
            e1.printStackTrace();
            logger.error("----建立文件失敗----");
        }
        super.start();
        logger.debug("TaildirSource started");
        sourceCounter.start();

    }

  @Override
    public Status process() throws EventDeliveryException {

        // 從ftp下載文件到本地
        Status status = Status.READY;
        logger.info("--進入主程序--");
        logger.info("--filePath--" + filePath);

        if (ftp.isConnected()) {
            logger.info("--ftp--" + ftp);
            try {
                FTPFile[] ftpFiles = null;
                // ftp.enterLocalPassiveMode();
                ftp.enterLocalActiveMode();
                boolean a = ftp.changeWorkingDirectory(filePath);
                logger.info("轉換路徑是否成功" + a);
                if (a) {
                    logger.info("---開始ftp.listFiles()---");
                    ftpFiles = ftp.listFiles(filePath);
                    logger.info("目錄下文件個數==" + ftpFiles.length);

      //我這裏規定好了文件結構,因此直接找結構裏的文件。
                    for (int i = 0; i < ftpFiles.length; i++) {//ANH
                        if(ftpFiles[i].isDirectory()){
                            System.out.println("---isdirectory---" + ftpFiles[i]);
                            try {
                                System.out.println("---開始ftp.listFiles()---" + filePath + "/" + ftpFiles[i].getName());
                                FTPFile[] files = ftp.listFiles(filePath + "/" + ftpFiles[i].getName());
                                System.out.println("目錄下文件個數---" + files.length);
                                for (int j = 0; j < files.length; j++) {//SESSIONLOG
                                    if(files[j].getName().equals("SESSIONLOG")){
                                        System.out.println("---isdirectory---" + "SESSIONLOG");
                                        System.out.println(ftp.changeWorkingDirectory(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG"));
                                        FTPFile[] files1 = ftp.listFiles(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        for(FTPFile sFile : files1){
                                            System.out.println(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG" + "下面的文件" + sFile);
                                            downloderFile(sFile, filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        }
                                        
                                    }
                                    
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        
                    }
                } else {
                    logger.error("鏈接的fpt上沒有制定下載路徑,無法下載數據!");
                }
            } catch (Exception e) {
                logger.error("下載失敗!");
                logger.error("詳情", e);
                status = Status.BACKOFF;
            }
        }
       
        return status;
    }

//下載文件

private void downloderFile(FTPFile ftpFile, String ftpFileDir) {
        // 篩選文件,下載那些文件
//        if (ftpFile.isDirectory()) {
//            logger.info("----isDirectory----");
//            try {
//                FTPFile[] files = ftp.listFiles(ftpFileDir + "/" + ftpFile.getName());
//                logger.info("---遞歸找文件---" + files.length);
//                for (int i = 0; i < files.length; i++) {
//                    System.out.println(files[i]);
//                    System.out.println(ftp.changeWorkingDirectory(ftpFileDir + "/" + ftpFile.getName()));
//                    ;
//                    downloderFile(files[i], ftpFileDir + "/" + ftpFile.getName());
//                }
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        } else
        if(ftpFile.isFile()) {
            logger.info("---isfile---");
            if (ftpFile.getName().matches(regex)) {
                logger.info("---匹配成功---");
                logger.info("---ftpfile--" + ftpFile);
                map = new HashMap<>();
                // 把文件中數據加載到map中
                try {
                    SAXReader reader = new SAXReader();
                    Document doc = reader.read(new File(positionFilePath));
                    Element root = doc.getRootElement();
                    Element foo;
                    for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                        foo = (Element) i.next();
                        if (!foo.elementText("name").isEmpty() && !foo.elementText("size").isEmpty()) {
                            long size = Long.parseLong(foo.elementText("size"));
                            map.put(foo.elementText("name"), size);
                        }

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }

                logger.info("map====" + map);
                if (map.containsKey(ftpFile.getName())) {
                    logger.info("--- 進入包含---" + ftpFile.getName());
                    // 要修改文件中的值
                    readAppointedLineNumber(ftpFile, map.get(ftpFile.getName()));
                } else {
                    logger.info("--- 進入不包含---" + ftpFile.getName());
                    // 要寫文件增長節點
                    InputStream iStream = null;
                    InputStreamReader inputStreamReader = null;
                    BufferedReader br = null;
                    try {

        // 這裏定義一個字符流的輸入流的節點流,用於讀取文件(一個字符一個字符的讀取)
                        iStream = ftp.retrieveFileStream(new String(ftpFile.getName().getBytes("gbk"),"utf-8"));                     
                        if (iStream != null) {
                            logger.info("---流不爲空---");
                            inputStreamReader = new InputStreamReader(iStream);
                            br = new BufferedReader(inputStreamReader); // 在定義好的流基礎上套接一個處理流,用於更加效率的讀取文件(一行一行的讀取)
                            long x = 0; // 用於統計行數,從0開始
                            String string = br.readLine();
                            while (string != null) { // readLine()方法是按行讀的,返回值是這行的內容
                                x++; // 每讀一行,則變量x累加1
                                getChannelProcessor().processEvent(EventBuilder.withBody(string.getBytes()));
                                string = br.readLine();
                            }
                            System.out.println(x);
                            try {
                                // 增長節點
                                add(ftpFile.getName(), x + "");
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (iStream != null) {
                            try {
                                iStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (inputStreamReader != null) {
                            try {
                                inputStreamReader.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            ftp.getReply();// 加上這個防止第二次獲取文件流爲空的問題
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                }

            }
        }

    }

/**
     * 建立文件,存儲文件及文件位置
     *
     * @param file
     * @throws Exception
     */
    public static void create() throws Exception {
        File file = new File(positionFilePath);
        if(file.exists()){
            logger.info("---json文件已存在---");
        }else{
            Document document = DocumentHelper.createDocument();
            Element root = document.addElement("rss");
            root.addAttribute("version", "2.0");
            XMLWriter writer = new XMLWriter(new FileOutputStream(new File(positionFilePath)), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否轉義,默認true
            writer.write(document);
            writer.close();
            logger.info("---建立文件成功---");
        }
    }

    /**
     * 增長文件節點
     */
    public void add(String name, String length) throws Exception {
        logger.info("---開始增長節點---");
        SAXReader reader = new SAXReader();
        Document document = reader.read(new File(positionFilePath));
        Element root = document.getRootElement();
        try {
            Element resource = root.addElement("files");
            Element nameNode = resource.addElement("name");
            Element sizeNode = resource.addElement("size");
            nameNode.setText(name);
            sizeNode.setText(length);
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否轉義,默認true
            writer.write(document);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("---結束增長節點---");
    }

    /**
     * 更新節點
     *
     * @throws IOException
     */
    public void Update(String name, String size) throws IOException {
        logger.info("---開始文件更新---");
        SAXReader reader = new SAXReader();
        Document document = null;
        try {
            document = reader.read(new File(positionFilePath));
            Element root = document.getRootElement();
            Element foo;
            for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                foo = (Element) i.next();

                if (name.equals(foo.elementText("name"))) {
                    foo.element("size").setText(size);

                }
            }
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否轉義,默認true
            writer.write(document);
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        OutputFormat format = OutputFormat.createPrettyPrint();
        format.setEncoding("UTF-8");
        XMLWriter writer = new XMLWriter(new OutputStreamWriter(new FileOutputStream(positionFilePath)), format);
        writer.write(document);
        writer.close();
        logger.info("---文件更新成功---");
    }

@Override
    public void stop() {
        logger.info("----執行結束---");
        super.stop();
        if (ftp != null) {
            try {
                ftp.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private Table<String, String, String> getTable(Context context, String prefix) {
        Table<String, String, String> table = HashBasedTable.create();
        for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
            String[] parts = e.getKey().split("\\.", 2);
            table.put(parts[0], parts[1], e.getValue());
        }
        return table;
    }

}

9.重寫sink,新建類OracleSink 繼承AbstractSink 實現Configurable

  public class OracleSink extends AbstractSink implements Configurable {

    private Logger logger = LoggerFactory.getLogger(OracleSink.class);
    private String driverClassName;
    private String url;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;
    private String regex;
    private String sql;
    public OracleSink() {
        logger.info("---start---");
    }


    @Override
    public void configure(Context context) {
        url = context.getString("url");
        logger.info(url + "---url---");
        Preconditions.checkNotNull(url, "url must be set!!");
        driverClassName = context.getString("driverClassName");
        logger.info(driverClassName + "---driverClassName---");
        Preconditions.checkNotNull(driverClassName, "driverClassName must be set!!");
        user = context.getString("user");
        logger.info(user + "---user---");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
        Preconditions.checkNotNull(password, "password must be set!!");
        regex = context.getString("regex");
        logger.info(regex + "---regex---");
        Preconditions.checkNotNull(regex, "regex must be set!!");
        sql = context.getString("sql");
        logger.info(sql + "---sql---");
        Preconditions.checkNotNull(sql, "sql must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        logger.info(batchSize + "---batchSize---");
        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");

    }

    @Override
    public void start() {
        super.start();
        try {
            //調用Class.forName()方法加載驅動程序
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
//        String url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + databaseName;
        //調用DriverManager對象的getConnection()方法,得到一個Connection對象
        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            //建立一個Statement對象
            preparedStatement = conn.prepareStatement(sql);

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }
    
    @Override
    public void stop() {
        logger.info("----執行結束---");
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<Info> infos = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {//對事件進行處理
                    logger.info("---讀取數據---");
                    logger.info("--數據庫鏈接是否關閉--" + conn.isClosed());
                    content = new String(event.getBody());
                    Info info=new Info();
                    if(content != null && content != ""){
                        logger.info("---" + content + "---");
                        logger.info("---" + regex + "---");
                        //文件的字段順序固定且有值?
                        Pattern p = Pattern.compile(regex);
                        Matcher m = p.matcher(content);
                        if (m.find()) {
                            logger.info("匹配上的長度==" + m.groupCount());
                            for (int j = 1; j <= m.groupCount(); j++) {
                                logger.info("---每一個匹配後的值" + j + "=" + m.group(j) + "---");
                                switch (j) {
                                case 1:
                                    info.setProvinceid(m.group(j));
                                    break;
                                case 2:
                                    info.setProvincename(m.group(j));
                                    break;
                                case 3:
                                    info.setSessionid(m.group(j));
                                    break;
                                case 4:
                                    info.setMainacctid(m.group(j));
                                    break;
                                case 5:
                                    info.setMainacctname(m.group(j));
                                    break;
                                case 6:
                                    info.setSlaveacctid(m.group(j));
                                    break;
                                case 7:
                                    info.setOrgid(m.group(j));
                                    break;
                                case 8:
                                    info.setOrgname(m.group(j));
                                    break;
                                case 9:
                                    info.setClientaddr(m.group(j));
                                    break;
                                case 10:
                                    info.setResid(m.group(j));
                                    break;
                                case 11:
                                    info.setResip(m.group(j));
                                    break;
                                case 12:
                                    info.setResport(m.group(j));
                                    break;
                                case 13:
                                    info.setBelongsysid(m.group(j));
                                    break;
                                case 14:
                                    info.setBelongsysname(m.group(j));
                                    break;
                                case 15:
                                    info.setLogintime(m.group(j));
                                    break;
                                case 16:
                                    info.setLogouttime(m.group(j));
                                    break;
                                case 17:
                                    info.setVideoname(m.group(j));
                                    break;
                                case 18:
                                    info.setVideofiledir(m.group(j));
                                    break;
                                case 19:
                                    info.setReserve(m.group(j));
                                    break;

                                default:
                                    break;
                                }
                            }
                            infos.add(info);
                        }
                        
                    }
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }

            if (infos.size() > 0) {
                logger.info("infos的長度==" + infos.size());
                preparedStatement.clearBatch();
                for (Info temp : infos) {
                    preparedStatement.setString(1, temp.getProvinceid() != null ? temp.getProvinceid() : "");
                    preparedStatement.setString(2, temp.getProvincename() != null ? temp.getProvincename() : "");
                    preparedStatement.setString(3, temp.getSessionid() != null ? temp.getSessionid() : "");
                    preparedStatement.setString(4, temp.getMainacctid() != null ? temp.getMainacctid() : "");
                    preparedStatement.setString(5, temp.getMainacctname() != null ? temp.getMainacctname() : "");
                    preparedStatement.setString(6, temp.getSlaveacctid() != null ? temp.getSlaveacctid() : "");
                    preparedStatement.setString(7, temp.getOrgid() != null ? temp.getOrgid() : "");
                    preparedStatement.setString(8, temp.getOrgname() != null ? temp.getOrgname() : "");
                    preparedStatement.setString(9, temp.getClientaddr() != null ? temp.getClientaddr() : "");
                    preparedStatement.setString(10, temp.getResid() != null ? temp.getResid() : "");
                    preparedStatement.setString(11, temp.getResip() != null ? temp.getResip() : "");
                    preparedStatement.setString(12, temp.getResport() != null ? temp.getResport() : "");
                    preparedStatement.setString(13, temp.getBelongsysid() != null ? temp.getBelongsysid() : "");
                    preparedStatement.setString(14, temp.getBelongsysname() != null ? temp.getBelongsysname() : "");
                    preparedStatement.setString(15, temp.getLogintime() != null ? temp.getLogintime() : "");
                    preparedStatement.setString(16, temp.getLogouttime() != null ? temp.getLogouttime() : "");
                    preparedStatement.setString(17, temp.getVideoname() != null ? temp.getVideoname() : "");
                    preparedStatement.setString(18, temp.getVideofiledir() != null ? temp.getVideofiledir() : "");
                    preparedStatement.setString(19, temp.getReserve() != null ? temp.getReserve() : "");
                    preparedStatement.addBatch();
                }
                try{
                    preparedStatement.executeBatch();
                }catch(SQLException e){
                    logger.error("------批量執行sql錯誤");
                    e.printStackTrace();
                }
                

                conn.commit();
            }
            transaction.commit();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                conn.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            try {
                transaction.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }

}

10.把maven打包,放在apache-flume-1.7.0-bin/lib下,把依賴的包好比dom4j、ojdbc6也放進去。

11.運行flume,在apache-flume-1.7.0-bin寫命令: 

  //運行flume(按ctrl+c能夠中止服務)

  flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console

  //後臺運行flume(後臺運行的flume,寫命令ps  -ef|grep taildirsource.conf或者ps  -ef|grep flume ,獲取進程id,而後kill掉)

  flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console &

相關文章
相關標籤/搜索