HBase Sink 的三種序列化模式
hbase(main):021:0> create 'default:table1', 'info' Created table default:table1 Took 1.3042 seconds => Hbase::Table - table1
# Flume topology: one memory channel wiring a SQL source to an HBase sink
agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source

agent.channels.ch1.type = memory

# Source: incrementally poll MySQL (by id) via the keedio flume-ng-sql-source plugin
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
# Status file tracks the last incremental value between restarts
agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status

# Sink: HBaseSink with SimpleHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
# Target HBase table
agent.sinks.hbase-sink.table = table1
# Column family of the HBase table
agent.sinks.hbase-sink.columnFamily = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Column under the column family.
# NOTE(review): SimpleHbaseEventSerializer supports only a single payload column;
# listing several names here is exactly the mismatch this article goes on to describe.
agent.sinks.hbase-sink.serializer.payloadColumn = id,sip,dip,sport,dport,protocol,flowvalue,createtime
# Bind the sink to the channel
agent.sinks.hbase-sink.channel = ch1
# Start the Flume agent named "agent" with the HBase-sink configuration,
# logging at DEBUG level to the console so serializer behavior can be inspected.
bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf -Dflume.root.logger=DEBUG,console
字段對應的值存在問題,緣由是:SimpleHbaseEventSerializer 只能進行簡單的匹配(數據雖然已經存入 HBase,但列與值對應不上)。若是想匹配多個字段怎麼辦?能夠使用 RegexHbaseEventSerializer 或 SimpleAsyncHbaseEventSerializer,也能夠自定義序列化器。
RegexHbaseEventSerializer 能夠使用正則表達式匹配並切割 event,而後存入 HBase 表的多個列。
先清空 table1:
truncate 'table1'
# Flume topology: identical source/channel as before; the sink now uses
# RegexHbaseEventSerializer to split each event into multiple HBase columns.
agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source

agent.channels.ch1.type = memory

# Source: incrementally poll MySQL (by id) via the keedio flume-ng-sql-source plugin
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status

# Sink: HBaseSink with RegexHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = table1
agent.sinks.hbase-sink.columnFamily = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# One capture group per quoted CSV field; groups map positionally onto colNames
agent.sinks.hbase-sink.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent.sinks.hbase-sink.serializer.colNames = id,sip,dip,sport,dport,protocol,flowvalue,createtime
# Bind the sink to the channel
agent.sinks.hbase-sink.channel = ch1
# Restart the Flume agent with the updated (regex-serializer) configuration.
bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf