Storing MySQL Table Data in HBase with Flume

The three serialization modes of the HBase sink

  • SimpleHbaseEventSerializer
  • RegexHbaseEventSerializer
  • SimpleAsyncHbaseEventSerializer

Using the SimpleHbaseEventSerializer serialization mode

1. Create table1 in HBase

hbase(main):021:0> create 'default:table1', 'info'
Created table default:table1
Took 1.3042 seconds
=> Hbase::Table - table1

2. Flume configuration file

agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource


agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status


# Sink: HBaseSink with SimpleHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
# HBase table name
agent.sinks.hbase-sink.table = table1
# Column family of the HBase table
agent.sinks.hbase-sink.columnFamily = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Column name under the column family; SimpleHbaseEventSerializer writes the whole event body into this one column
agent.sinks.hbase-sink.serializer.payloadColumn = id,sip,dip,sport,dport,protocol,flowvalue,createtime
# Bind the sink to the channel
agent.sinks.hbase-sink.channel = ch1
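The incremental settings in the source section (incremental.column.name = id, incremental.value = 0, run.query.delay=5000 and the status file) make SQLSource poll t_name repeatedly and, as I understand the keedio source, only pick up rows whose id is larger than the last one it recorded. The Python sketch below simulates that loop in memory. It is not the keedio implementation: the sample rows are made up, and the fully quoted CSV body format is an assumption about what SQLSource emits (it is the format the regex in the second configuration expects).

```python
import csv
import io

# Hypothetical sample rows standing in for the MySQL table t_name;
# the column names come from the article, the values are made up.
COLUMNS = ["id", "sip", "dip", "sport", "dport", "protocol", "flowvalue", "createtime"]
ROWS = [
    {"id": 1, "sip": "10.0.0.1", "dip": "10.0.0.2", "sport": 5000, "dport": 80,
     "protocol": "TCP", "flowvalue": 1200, "createtime": "2019-01-01 00:00:00"},
    {"id": 2, "sip": "10.0.0.3", "dip": "10.0.0.4", "sport": 5001, "dport": 443,
     "protocol": "TCP", "flowvalue": 800, "createtime": "2019-01-01 00:00:05"},
]


def to_event_body(row):
    """Render one row as a fully quoted CSV line (assumed SQLSource body format)."""
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="")
    writer.writerow([row[c] for c in COLUMNS])
    return buf.getvalue()


def poll_once(rows, last_value):
    """One polling cycle: emit rows whose id exceeds last_value, in id order.

    Returns the event bodies and the new last_value, which the real source
    would persist to its status file between cycles.
    """
    new_rows = sorted((r for r in rows if r["id"] > last_value), key=lambda r: r["id"])
    bodies = [to_event_body(r) for r in new_rows]
    if new_rows:
        last_value = new_rows[-1]["id"]
    return bodies, last_value


# The first cycle starts from incremental.value = 0 and picks up both rows.
bodies, last = poll_once(ROWS, 0)
# A later cycle (run.query.delay ms afterwards) finds nothing new.
more, last_after = poll_once(ROWS, last)
```

Keeping the last id outside the loop is what lets a restarted agent resume where it stopped instead of re-reading the whole table.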

3. Start Flume

bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf -Dflume.root.logger=DEBUG,console

4. Result

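The values do not line up with their fields. The data did reach HBase, but SimpleHbaseEventSerializer only performs a simple mapping: it writes the whole event body into the single column named by payloadColumn. What if you want to map the data onto multiple fields? Use RegexHbaseEventSerializer or SimpleAsyncHbaseEventSerializer, or write a custom serializer.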
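To see why the fields do not line up, here is a minimal Python model of the cell SimpleHbaseEventSerializer produces. It is a sketch, not Flume's Java code: row key generation is left out, and the sample body is a made-up row. The point it shows is that the payloadColumn value is used verbatim as one column qualifier, so the comma-separated list in the configuration becomes a single oddly named column holding the entire row.

```python
def simple_serializer_cells(body, column_family, payload_column):
    """Model of SimpleHbaseEventSerializer's Put: one cell with the whole body.

    The payloadColumn value is used verbatim as a single column qualifier;
    it is never split on commas. Row key generation is omitted here.
    """
    return {f"{column_family}:{payload_column}": body}


# Hypothetical event body for one row of t_name.
body = '"1","10.0.0.1","10.0.0.2","5000","80","TCP","1200","2019-01-01 00:00:00"'
cells = simple_serializer_cells(
    body, "info", "id,sip,dip,sport,dport,protocol,flowvalue,createtime"
)
for column, value in cells.items():
    print(column, "=>", value)
```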

Using the RegexHbaseEventSerializer serialization mode

RegexHbaseEventSerializer can split an event with a regular expression and then store the pieces in multiple columns of the HBase table.

First, empty table1:

truncate 'table1'

1. Modify the Flume configuration file

agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = table1
agent.sinks.hbase-sink.columnFamily  = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent.sinks.hbase-sink.serializer.colNames = id,sip,dip,sport,dport,protocol,flowvalue,createtime
agent.sinks.hbase-sink.channel = ch1
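The regex and colNames settings work together: capture group i of serializer.regex is written to the column named by the i-th entry of serializer.colNames. The Python sketch below models that mapping for the configuration above. It is an illustration, not Flume's code; the sample body is made up, and the "no match means no cells" behaviour reflects my reading of the serializer rather than anything stated in this article.

```python
import re

# serializer.regex from the config above, after Java Properties loading has
# turned each \" into a plain " character.
REGEX = re.compile(r'^"(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)"$')
# serializer.colNames, split on commas.
COL_NAMES = "id,sip,dip,sport,dport,protocol,flowvalue,createtime".split(",")


def regex_serializer_cells(body, column_family="info"):
    """Model of RegexHbaseEventSerializer: capture group i goes to colNames[i].

    fullmatch mirrors Java's Matcher.matches(); a body that does not match,
    or a group count that differs from the number of column names, yields
    no cells at all.
    """
    m = REGEX.fullmatch(body)
    if m is None or len(m.groups()) != len(COL_NAMES):
        return {}
    return {f"{column_family}:{name}": value for name, value in zip(COL_NAMES, m.groups())}


# Hypothetical event body for one row of t_name.
body = '"1","10.0.0.1","10.0.0.2","5000","80","TCP","1200","2019-01-01 00:00:00"'
cells = regex_serializer_cells(body)
for column, value in cells.items():
    print(column, "=>", value)
```

The non-greedy groups stop at the first `","`, so a column value that itself contains that sequence would be split in the wrong place; for ordinary IPs, ports and timestamps this is not an issue.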

2. Start Flume

bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf

3. Result
