The purpose of this job is to read messages from a specified Kafka topic and write them into HBase. Each message body contains `project` (the project the message belongs to), `table` (the HBase table to write to), and `data` (a JSON string).

The approach: a Kafka source plus an HBase sink that writes the data in batches.
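For illustration, one message on the topic might look like the sample below; the project name, table name, and the fields inside `data` are all hypothetical:

```json
{"project": "demo", "table": "events", "data": "{\"name\":\"foo\",\"age\":\"18\"}"}
```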
The local test environment is described by the following docker-compose file:

```yaml
version: '3'
services:
  mysql:
    image: "docker.io/mysql:5.7"
    environment:
      MYSQL_ROOT_PASSWORD: "123456"
    ports:
      - "3306:3306"
  zookeeper:
    image: harbor.oneitfarm.com/cidata/zookeeper:3.4.14
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888
      ENABLE_SASL: "true"
      SUPER_PASSWORD: admin
      USER_KAFKA_PASSWORD: 123456
    ports:
      - "2182:2181"
  kafka_broker:
    image: "harbor.oneitfarm.com/cidata/kafka:2.4.0"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ZOOKEEPER_SASL_ENABLE: "true"
      ZOOKEEPER_SASL_USER_KAFKA_PASSWORD: 123456
      KAFKA_SASL_ENABLE: "true"
      KAFKA_ADMIN_PASSWORD: 123456
      KAFKA_BROKER_ID: 1
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms256M"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9094,OUTSIDE://${HOST_IP}:19092
      KAFKA_LISTENERS: INSIDE://:9094,OUTSIDE://:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    ports:
      - "19092:9092"
```
The same directory must also contain a `.env` file:

```
# Change this to the host machine's IP
HOST_IP=192.168.50.187
```
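With the compose file and `.env` in place, `docker-compose up -d` brings up MySQL, ZooKeeper, and the SASL-enabled Kafka broker; from the host, Kafka is then reachable at `${HOST_IP}:19092` via the OUTSIDE listener.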
The relevant parts of the project's `pom.xml` (Flink 1.10.0, the HBase 1.3.1 client, fastjson, commons-cli, and the assembly plugin for building a fat jar):

```xml
<properties>
    <mainClass>xxx.flinkjob.kafka.Application</mainClass>
    <flink-version>1.10.0</flink-version>
    <hbase-version>1.3.1</hbase-version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink-version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase-version}</version>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>${mainClass}</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <!-- the "assembly" goal is deprecated; "single" is the supported goal -->
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
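With this build section, `mvn clean package` produces a `*-jar-with-dependencies.jar` under `target/` whose manifest main class is the `Application` class configured above; that fat jar is what gets submitted to Flink.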
The message model. `getFullTable()` concatenates `project:table`, which HBase treats as a namespace-qualified table name:

```java
package xxx.flinkjob.kafka.model;

public class HttpDataModel {

    private String project;
    private String table;
    private String data;

    public HttpDataModel() {
    }

    public HttpDataModel(String project, String table, String data) {
        this.project = project;
        this.table = table;
        this.data = data;
    }

    public String getProject() {
        return project;
    }

    public String getTable() {
        return table;
    }

    public String getData() {
        return data;
    }

    public String getFullTable() {
        return project + ":" + table;
    }

    public void setProject(String project) {
        this.project = project;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "HttpDataModel{" +
                "project='" + project + '\'' +
                ", table='" + table + '\'' +
                ", data='" + data + '\'' +
                '}';
    }
}
```
The HBase sink receives an entire batch as a `List<HttpDataModel>`, groups it by `project:table`, creates the namespace and table on demand, and writes each record as a `Put` into the fixed `data` column family:

```java
package xxx.flinkjob.kafka.sink;

import com.alibaba.fastjson.JSONObject;
import xxx.flinkjob.kafka.model.HttpDataModel;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HbaseSink extends RichSinkFunction<List<HttpDataModel>> implements Serializable {

    private Logger log;
    private String hbase_zookeeper_host;
    private String hbase_zookeeper_port;
    private Connection connection;
    private Admin admin;

    public HbaseSink(String hbase_zookeeper_host, String hbase_zookeeper_port) {
        this.hbase_zookeeper_host = hbase_zookeeper_host;
        this.hbase_zookeeper_port = hbase_zookeeper_port;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        log = Logger.getLogger(HbaseSink.class);
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", hbase_zookeeper_port);
        configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }

    @Override
    public void invoke(List<HttpDataModel> datas, Context context) throws Exception {
        // Group the batch by project:table
        Map<String, List<HttpDataModel>> map = new HashMap<String, List<HttpDataModel>>();
        for (HttpDataModel data : datas) {
            if (!map.containsKey(data.getFullTable())) {
                map.put(data.getFullTable(), new ArrayList<HttpDataModel>());
            }
            map.get(data.getFullTable()).add(data);
        }
        // Write each group
        for (Map.Entry<String, List<HttpDataModel>> entry : map.entrySet()) {
            // Create the table if it does not exist yet
            createTable(entry.getKey());
            // Build the puts; note that the current-millisecond row key can collide
            // when several records arrive within the same millisecond
            List<Put> list = new ArrayList<Put>();
            for (HttpDataModel item : entry.getValue()) {
                Put put = new Put(Bytes.toBytes(String.valueOf(System.currentTimeMillis())));
                JSONObject object = JSONObject.parseObject(item.getData());
                for (String key : object.keySet()) {
                    put.addColumn("data".getBytes(), key.getBytes(), object.getString(key).getBytes());
                }
                list.add(put);
            }
            try (Table table = connection.getTable(TableName.valueOf(entry.getKey()))) {
                table.put(list);
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release HBase resources when the sink is disposed
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Create the HBase table (and its namespace) if it does not exist.
     */
    private void createTable(String tableName) throws Exception {
        createNamespace(tableName.split(":")[0]);
        TableName table = TableName.valueOf(tableName);
        if (!admin.tableExists(table)) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
            // Only a single, fixed "data" column family
            hTableDescriptor.addFamily(new HColumnDescriptor("data"));
            admin.createTable(hTableDescriptor);
        }
    }

    /**
     * Create the namespace if it does not exist.
     */
    private void createNamespace(String namespace) throws Exception {
        try {
            admin.getNamespaceDescriptor(namespace);
        } catch (NamespaceNotFoundException e) {
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
        }
    }
}
```
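Assuming the hypothetical `demo`/`events` message from earlier, its rows should land in the HBase table `demo:events`, which can be checked with `scan 'demo:events'` in the HBase shell.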
A custom trigger makes the window fire either when `threshold` elements have been buffered or when the window's own timer fires with data still pending, so a slow trickle of messages is still flushed at the window boundary:

```java
package xxx.flinkjob.kafka.trigger;

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountTrigger<T> extends Trigger<T, TimeWindow> {

    // Current element count (kept in the trigger instance; acceptable here because
    // the non-keyed all-window runs with parallelism 1)
    private int flag = 0;
    // Number of elements that triggers an early fire
    private int threshold = 0;

    public CountTrigger(Integer threshold) {
        this.threshold = threshold;
    }

    /**
     * Called for every element added to the window.
     */
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Register a processing-time timer so the window still fires at its end
        // even when the count threshold is never reached
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        flag++;
        if (flag >= threshold) {
            flag = 0;
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered processing-time timer fires.
     */
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (flag > 0) {
            // Window end reached with pending data: flush it
            flag = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered event-time timer fires.
     */
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time >= window.maxTimestamp() && flag > 0) {
            // Window end reached with pending data: flush it
            flag = 0;
            return TriggerResult.FIRE_AND_PURGE;
        } else if (time >= window.maxTimestamp() && flag == 0) {
            // Purge the window without firing
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Clean up any timers registered for the window.
     */
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```
Finally, the entry point parses the command-line options, builds the SASL-enabled Kafka consumer, filters out malformed messages, collects them into 10-second windows that fire every 100 elements, and hands each batch to the HBase sink:

```java
package xxx.flinkjob.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import xxx.flinkjob.kafka.model.HttpDataModel;
import xxx.flinkjob.kafka.sink.HbaseSink;
import xxx.flinkjob.kafka.trigger.CountTrigger;
import org.apache.commons.cli.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Application {

    @SuppressWarnings(value = {"unchecked"})
    public static void main(String[] args) throws Exception {
        // Defaults for the Kafka parameters
        String brokers = "127.0.0.1:9092";
        String username = "admin";
        String password = "123456";
        String topic = "test";
        // Defaults for the HBase parameters
        String hbase_zookeeper_host = "hbase";
        String hbase_zookeeper_port = "2181";

        // Command-line options override the defaults
        Options options = new Options();
        options.addOption("kafka_brokers", true, "kafka cluster hosts, such as 127.0.0.1:9092");
        options.addOption("kafka_username", true, "kafka cluster username, default: admin");
        options.addOption("kafka_user_password", true, "kafka cluster user password, default: 123456");
        options.addOption("kafka_topic", true, "kafka cluster topic, default: test");
        options.addOption("hbase_zookeeper_host", true, "hbase zookeeper host, default: hbase");
        options.addOption("hbase_zookeeper_port", true, "hbase zookeeper port, default: 2181");

        CommandLineParser parser = new DefaultParser();
        CommandLine line = parser.parse(options, args);
        if (line.hasOption("kafka_brokers")) {
            brokers = line.getOptionValue("kafka_brokers");
        } else {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("flink write hbase job", options);
            System.exit(1);
        }
        if (line.hasOption("kafka_username")) {
            username = line.getOptionValue("kafka_username");
        }
        if (line.hasOption("kafka_user_password")) {
            password = line.getOptionValue("kafka_user_password");
        }
        if (line.hasOption("kafka_topic")) {
            topic = line.getOptionValue("kafka_topic");
        }
        if (line.hasOption("hbase_zookeeper_host")) {
            hbase_zookeeper_host = line.getOptionValue("hbase_zookeeper_host");
        }
        if (line.hasOption("hbase_zookeeper_port")) {
            hbase_zookeeper_port = line.getOptionValue("hbase_zookeeper_port");
        }

        // Run the job
        doExecute(brokers, username, password, topic, hbase_zookeeper_host, hbase_zookeeper_port);
    }

    /**
     * Build and execute the streaming job.
     */
    public static void doExecute(String kafka_brokers, String kafka_username, String kafka_password, String topic,
                                 String hbase_zookeeper_host, String hbase_zookeeper_port) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source (the very long checkpoint interval effectively disables checkpointing)
        env.enableCheckpointing(5000 * 100000);
        Properties props = getKafkaProperties(kafka_username, kafka_password);
        props.setProperty("bootstrap.servers", kafka_brokers);
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props));

        // Filter out messages that do not match the expected format, then map them to the model
        DataStream<HttpDataModel> formated_stream = stream.filter(s -> {
            JSONObject obj = JSONObject.parseObject(s);
            return obj.containsKey("project") && obj.containsKey("table") && obj.containsKey("data");
        }).map(s -> {
            return JSON.parseObject(s, HttpDataModel.class);
        });

        // Within a 10-second window, fire every 100 elements and emit the batch as a list
        DataStream<List<HttpDataModel>> batch_stream = formated_stream
                .timeWindowAll(Time.seconds(10))
                .trigger(new CountTrigger(100))
                .apply(new AllWindowFunction<HttpDataModel, List<HttpDataModel>, Window>() {
                    public void apply(Window window, Iterable<HttpDataModel> values, Collector<List<HttpDataModel>> out) throws Exception {
                        List<HttpDataModel> lists = new ArrayList<HttpDataModel>();
                        for (HttpDataModel value : values) {
                            lists.add(value);
                        }
                        out.collect(lists);
                    }
                });
        batch_stream.addSink(new HbaseSink(hbase_zookeeper_host, hbase_zookeeper_port));

        // Print to the console instead, for debugging
        // batch_stream.print();

        env.execute("integration-http");
    }

    /**
     * Default Kafka consumer configuration (SASL/SCRAM).
     */
    public static Properties getKafkaProperties(String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "dataworks-integration");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasCfg);
        return props;
    }
}
```
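To smoke-test the pipeline end to end, a small producer along the lines of the sketch below can push a correctly shaped message into the topic. This is only a sketch: the class name, the `demo`/`events` values, and the broker address are assumptions taken from the docker-compose and `.env` files above, not part of the original job.

```java
package xxx.flinkjob.kafka;

import com.alibaba.fastjson.JSON;
import xxx.flinkjob.kafka.model.HttpDataModel;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Hypothetical helper class for manual testing; not part of the job itself.
public class TestProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // OUTSIDE listener advertised by the docker-compose broker (HOST_IP from .env)
        props.put("bootstrap.servers", "192.168.50.187:19092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same SASL/SCRAM settings the job uses on the consumer side
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"admin\" password=\"123456\";");

        // Build a message in the format the job expects: project / table / data (JSON string)
        HttpDataModel model = new HttpDataModel("demo", "events", "{\"name\":\"foo\",\"age\":\"18\"}");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<String, String>("test", JSON.toJSONString(model))).get();
        }
    }
}
```

Once packaged, the job itself can be submitted with something like `flink run target/<artifact>-jar-with-dependencies.jar -kafka_brokers 192.168.50.187:19092 -kafka_topic test -hbase_zookeeper_host <hbase-zk-host>`; the remaining options fall back to the defaults defined in `Application`.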