Flume將MySQL表數據存入到HDFS

Flume將MySQL表數據存入到HDFS

1、建立MySQL表

-- ----------------------------
-- Table structure for t_name
-- ----------------------------
-- Network flow record table consumed by the Flume SQLSource below.
-- `id` is the AUTO_INCREMENT surrogate key and also serves as the
-- incremental column for Flume's incremental polling (see the agent
-- config: incremental.column.name = id, incremental.value = 0).
-- NOTE(review): int(11)/datetime(0) display-width syntax targets
-- MySQL 5.7; MySQL 8.x deprecates integer display widths.
DROP TABLE IF EXISTS `t_name`;
CREATE TABLE `t_name`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `sip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `dip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `sport` int(11) NULL DEFAULT NULL,
  `dport` int(11) NULL DEFAULT NULL,
  `protocol` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `flowvalue` int(11) NULL DEFAULT NULL,
  `createtime` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 21 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_name
-- ----------------------------
-- Seed rows so the Flume SQLSource has data to pull on first run.
-- NOTE(review): the sip/dip values are synthetic sample strings, not
-- valid IPv4 addresses (several octets exceed 255, e.g. '76.58.692.49');
-- harmless for the tutorial since the columns are plain varchar.
INSERT INTO `t_name` VALUES (1, '76.58.692.49', '23.28.380.27', 53, 17, 'TCP', 10, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (2, '36.30.754.95', '21.19.847.60', 92, 61, 'TCP', 56, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (3, '29.65.205.81', '61.41.360.21', 79, 44, 'TCP', 45, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (4, '69.65.715.32', '90.60.887.73', 82, 89, 'TCP', 25, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (5, '92.51.427.29', '86.42.538.10', 98, 96, 'TCP', 11, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (6, '10.43.459.69', '42.16.826.51', 77, 32, 'TCP', 53, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (7, '40.52.822.52', '37.87.208.90', 79, 77, 'TCP', 12, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (8, '99.49.363.76', '53.13.402.25', 81, 90, 'TCP', 30, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (9, '94.90.526.47', '80.29.188.65', 29, 62, 'TCP', 62, '2019-03-05 12:25:47');
INSERT INTO `t_name` VALUES (10, '37.84.816.99', '31.64.935.94', 27, 45, 'TCP', 30, '2019-03-05 12:25:47');

2、建立相關狀態文件與HDFS目標目錄

1. 建立狀態文件

# Create the directory and empty status file that the Flume SQLSource
# uses to persist its last-read incremental value between restarts
# (matches status.file.path / status.file.name in the agent config).
mkdir /home/lwenhao/flume
cd /home/lwenhao/flume
touch sql-source.status
# 777 so the Flume agent user can read/write the status file regardless
# of which account runs the agent; tighten in production.
chmod -R 777 /home/lwenhao/flume

2. 建立HDFS目錄

# Create the HDFS target directory the Flume HDFS sink writes to.
# Fix: the HDFS file-system shell subcommand is `dfs`, not `fs` —
# `hdfs fs` is rejected by the hdfs launcher (`hadoop fs` also works).
# -p avoids an error if /flume already exists.
hdfs dfs -mkdir -p /flume/mysql
# 777 so the Flume agent user can write; tighten in production.
hdfs dfs -chmod -R 777 /flume/mysql

3、導入JAR包

我安裝的是 MySQL 5.7 版本,須要 flume-ng-sql-source-1.x.x.jar 與 mysql-connector-java-5.x.x-bin.jar 這兩個 jar 包。

下載完成以後,把這兩個 jar 包複製到 /apache-flume-1.9.0-bin/lib/ 目錄下。

4、配置Flume

# Flume agent wiring: one memory channel (ch1) carrying events from the
# keedio SQLSource (incremental poll of MySQL table t_name) to an HDFS sink.

# Channel name
agent.channels = ch1

# Sink name
agent.sinks = HDFS

# Source name
agent.sources = sql-source

# Channel type (in-memory: fast, but events are lost on agent crash)
agent.channels.ch1.type = memory

# Channel the source writes into
agent.sources.sql-source.channels = ch1

# Source type (third-party flume-ng-sql-source plugin)
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

# JDBC URL of the source database
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop

# Database user
agent.sources.sql-source.hibernate.connection.user = root

# Database password
agent.sources.sql-source.hibernate.connection.password = root

# Table to poll
agent.sources.sql-source.table = t_name

# Columns to select
agent.sources.sql-source.columns.to.select = *

# Column used for incremental polling (only rows with a larger value
# than the last recorded one are fetched on each query)
agent.sources.sql-source.incremental.column.name = id

# Initial incremental value (start from the beginning of the table)
agent.sources.sql-source.incremental.value = 0

# Interval between queries, in milliseconds
agent.sources.sql-source.run.query.delay=5000

# Directory holding the status file (must exist and be writable —
# created in step 2 above)
agent.sources.sql-source.status.file.path = /home/lwenhao/flume

# Status file name (stores the last incremental value across restarts)
agent.sources.sql-source.status.file.name = sql-source.status

# Channel the sink reads from
agent.sinks.HDFS.channel = ch1

# Sink type
agent.sinks.HDFS.type = hdfs

# HDFS target path (NameNode RPC address + directory created in step 2)
agent.sinks.HDFS.hdfs.path = hdfs://192.168.1.67:9001/flume/mysql

# File type for streamed data (plain stream, no sequence-file framing)
agent.sinks.HDFS.hdfs.fileType = DataStream

# Write format
agent.sinks.HDFS.hdfs.writeFormat = Text

# Roll the target file when it reaches this size, in bytes (256 MiB)
agent.sinks.HDFS.hdfs.rollSize = 268435456

# Seconds before rolling the temporary file into the final target file;
# 0 disables time-based rolling
agent.sinks.HDFS.hdfs.rollInterval = 0

# Roll after this many events; 0 disables count-based rolling
agent.sinks.HDFS.hdfs.rollCount = 0

5、 啓動flume

bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-conf.conf

6、 效果

相關文章
相關標籤/搜索