-- ---------------------------- -- Table structure for t_name -- ---------------------------- DROP TABLE IF EXISTS `t_name`; CREATE TABLE `t_name` ( `id` int(11) NOT NULL AUTO_INCREMENT, `sip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `dip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `sport` int(11) NULL DEFAULT NULL, `dport` int(11) NULL DEFAULT NULL, `protocol` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `flowvalue` int(11) NULL DEFAULT NULL, `createtime` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 21 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of t_name -- ---------------------------- INSERT INTO `t_name` VALUES (1, '76.58.692.49', '23.28.380.27', 53, 17, 'TCP', 10, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (2, '36.30.754.95', '21.19.847.60', 92, 61, 'TCP', 56, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (3, '29.65.205.81', '61.41.360.21', 79, 44, 'TCP', 45, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (4, '69.65.715.32', '90.60.887.73', 82, 89, 'TCP', 25, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (5, '92.51.427.29', '86.42.538.10', 98, 96, 'TCP', 11, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (6, '10.43.459.69', '42.16.826.51', 77, 32, 'TCP', 53, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (7, '40.52.822.52', '37.87.208.90', 79, 77, 'TCP', 12, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (8, '99.49.363.76', '53.13.402.25', 81, 90, 'TCP', 30, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (9, '94.90.526.47', '80.29.188.65', 29, 62, 'TCP', 62, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (10, '37.84.816.99', '31.64.935.94', 27, 45, 'TCP', 30, '2019-03-05 12:25:47');
mkdir /home/lwenhao/flume cd /home/lwenhao/flume touch sql-source.status chmod -R 777 /home/lwenhao/flume
hdfs fs -mkdir /flume/mysql hdfs fs -chmod -R 777 /flume/mysql
我安裝的是MySQL5.7
版本須要flume-ng-sql-source-1.x.x.jar
與mysql-connector-java-5.x.x-bin.jar
java
下載完成以後,把這兩個jar包複製到/apache-flume-1.9.0-bin/lib/
目錄下mysql
# Channel名稱 agent.channels = ch1 # Sink名稱 agent.sinks = HDFS # Source名稱 agent.sources = sql-source # Agent的channel類型 agent.channels.ch1.type = memory # Source對應的channel名稱 agent.sources.sql-source.channels = ch1 # Source類型 agent.sources.sql-source.type = org.keedio.flume.source.SQLSource # 數據庫URL agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop # 數據庫用戶名 agent.sources.sql-source.hibernate.connection.user = root # 數據庫密碼 agent.sources.sql-source.hibernate.connection.password = root # 數據庫表名 agent.sources.sql-source.table = t_name # 查詢的列 agent.sources.sql-source.columns.to.select = * # 查詢的列 agent.sources.sql-source.incremental.column.name = id # 增量初始值 agent.sources.sql-source.incremental.value = 0 # 發起查詢的時間間隔,單位是毫秒 agent.sources.sql-source.run.query.delay=5000 # 狀態文件路徑 agent.sources.sql-source.status.file.path = /home/lwenhao/flume # 狀態文件名稱 agent.sources.sql-source.status.file.name = sql-source.status # Sink對應的channel名稱 agent.sinks.HDFS.channel = ch1 # Sink類型 agent.sinks.HDFS.type = hdfs # Sink路徑 agent.sinks.HDFS.hdfs.path = hdfs://192.168.1.67:9001/flume/mysql # 流數據的文件類型 agent.sinks.HDFS.hdfs.fileType = DataStream # 數據寫入格式 agent.sinks.HDFS.hdfs.writeFormat = Text # 目標文件輪轉大小,單位是字節 agent.sinks.HDFS.hdfs.rollSize = 268435456 # hdfs sink間隔多長將臨時文件滾動成最終目標文件,單位是秒;若是設置成0,則表示不根據時間來滾動文件 agent.sinks.HDFS.hdfs.rollInterval = 0 # 當events數據達到該數量時候,將臨時文件滾動成目標文件;若是設置成0,則表示不根據events數據來滾動文件 agent.sinks.HDFS.hdfs.rollCount = 0
bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-conf.conf