因爲公司業務需求,需要搭建一套實時數據處理平臺,經過多方面調研後選擇了 Flink。
部署 ZooKeeper 集羣:基於 docker-compose 文件,使用 docker stack 部署在容器中。因爲 ZooKeeper 存在數據持久化存儲,這塊後面可以考慮共享存儲方案。
# ZooKeeper ensemble of three servers (zoo1..zoo3).
# Each server lists itself as 0.0.0.0 and its peers by hostname in ZOO_SERVERS,
# and publishes its client port 2181 on a distinct host port (2181/2182/2183).
#
# NOTE: `restart` is honoured by docker-compose but ignored by `docker stack deploy`;
# in swarm mode the restart behaviour comes from `deploy.restart_policy`.
# NOTE(review): ZooKeeper 3.5+ images expect ";2181" appended to every ZOO_SERVERS
# entry (e.g. server.1=0.0.0.0:2888:3888;2181) - confirm against the image tag in use.
version: "3"

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
# Flink cluster with one jobmanager and one taskmanager service.
# Both services use the same image; the taskmanager locates the jobmanager
# through JOB_MANAGER_RPC_ADDRESS, which is set to the jobmanager service name.
version: "3"

services:
  jobmanager:
    image: flink:1.7.2-scala_2.12-alpine
    command: jobmanager
    # Publish container port 8081 on host port 8081.
    ports:
      - "8081:8081"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

  taskmanager:
    image: flink:1.7.2-scala_2.12-alpine
    command: taskmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
此時只有一個 jobmanager,存在單點故障問題,可以考慮將容器內部的 flink-conf.yaml 掛載出來,配置 ZooKeeper HA。
Flink 案例 demo:讀取 Kafka 中的數據進行實時處理,然後將結果存儲到 InfluxDB 中展現。
/**
 * Entry point of the real-time streaming job.
 *
 * <p>Reads JSON strings from the Kafka topic {@code test0}, parses each into a
 * {@link MessageBody}, groups the messages by phone and user id, sums {@code value}
 * over 10-second processing-time windows and hands every window result to
 * {@link InfluxWriter}.
 */
public class SportRealTimeJob {

    /**
     * Separator placed between phone and user id when building the grouping key, so
     * that two different (phone, userId) pairs cannot concatenate to the same string.
     */
    private static final String KEY_DELIMITER = "|";

    /**
     * Builds the pipeline and submits it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        KafkaConnector connector = new KafkaConnector("192.168.30.60:9092", "big-data");

        env
            .addSource(connector.getConsumerConnector(Lists.newArrayList("test0")))
            .<MessageBody>flatMap((sentence, out) -> {
                MessageBody body = JSON.parseObject(sentence, MessageBody.class);
                // Skip input that did not yield an object instead of forwarding null downstream.
                if (body != null) {
                    out.collect(body);
                }
            })
            // The Collector's element type is erased for lambdas, so the output type
            // has to be declared explicitly for Flink's type extraction.
            .returns(MessageBody.class)
            // keyBy already repartitions by key; no separate shuffle is needed before it.
            .keyBy(body -> body.getPhone() + KEY_DELIMITER + body.getUserId())
            .timeWindow(Time.seconds(10))
            .reduce((left, right) -> new MessageBody(
                left.getUserId(),
                left.getPhone(),
                left.getValue() + right.getValue()))
            .addSink(new InfluxWriter())
            .setParallelism(1);

        env.execute("Window WordCount");
    }
}
/**
 * Demo entity carried through the stream and mapped to the {@code sport} measurement.
 *
 * <p>{@code userId} and {@code phone} are declared as tag columns, {@code value} as a
 * regular column. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Measurement(name = "sport")
public class MessageBody {

    /** User identifier; tag column {@code userId}. */
    @Column(name = "userId", tag = true)
    private String userId;

    /** Phone number; tag column {@code phone}. */
    @Column(name = "phone", tag = true)
    private String phone;

    /** Numeric payload; column {@code value}. */
    @Column(name = "value")
    private int value;

    /** Creates an instance with all fields at their default values. */
    public MessageBody() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param userId user identifier
     * @param phone  phone number
     * @param value  numeric payload
     */
    public MessageBody(String userId, String phone, int value) {
        this.value = value;
        this.phone = phone;
        this.userId = userId;
    }
}
/**
 * Custom sink that writes every {@link MessageBody} as one point of the
 * {@code sport} measurement, timestamped with the current processing time.
 */
public class InfluxWriter extends RichSinkFunction<MessageBody> {

    /**
     * InfluxDB client. Created in {@link #open(Configuration)} rather than in the
     * constructor, and marked transient so it is never part of the serialized function.
     */
    private transient InfluxTemplate template;

    /**
     * Connects to InfluxDB.
     *
     * <p>NOTE(review): URL, database and credentials are hard-coded; "localhost" is
     * resolved by whichever process runs this sink - confirm this is intended when
     * the job runs inside containers.
     *
     * @param parameters configuration passed to the function (unused)
     * @throws Exception if the client cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        InfluxBean bean = InfluxBean.builder()
            .dbName("game")
            .url("http://localhost:8086")
            .username("admin")
            .password("admin")
            .build();
        template = new SimpleInfluxTemplate(bean);
    }

    /**
     * Releases the InfluxDB client, if one was created.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void close() throws Exception {
        // Guard against close() running when open() never assigned the client.
        if (template != null) {
            template.close();
            template = null;
        }
    }

    /**
     * Writes one record to the {@code sport} measurement.
     *
     * @param value   record to write; {@code value} becomes a field, {@code userId} and
     *                {@code phone} become tags
     * @param context sink context supplying the processing-time timestamp
     * @throws Exception if the write fails
     */
    @Override
    public void invoke(MessageBody value, Context context) throws Exception {
        template.write(Point.measurement("sport")
            .addField("value", value.getValue())
            .tag("userId", String.valueOf(value.getUserId()))
            // Same null-safe conversion as userId, so a missing phone is written as
            // "null" instead of being passed on as a null tag value.
            .tag("phone", String.valueOf(value.getPhone()))
            .time(context.currentProcessingTime(), TimeUnit.MILLISECONDS)
            .build());
    }
}
// influxDb操做類 public class SimpleInfluxTemplate implements InfluxTemplate { private final InfluxDB db; public SimpleInfluxTemplate(InfluxBean bean){ this.db= InfluxDBFactory.connect(bean.getUrl(), bean.getUsername(), bean.getPassword()); db.setDatabase(bean.getDbName()); db.enableBatch(BatchOptions.DEFAULTS.exceptionHandler( (failedPoints, throwable) -> { /* custom error handling here */ }) .consistency(InfluxDB.ConsistencyLevel.ALL) .bufferLimit(100) ); } @Override public void write(Point point) { db.write(point); } @Override public void bentchWrite(BatchPoints points) { db.write(points); } @Override public <T> List<T> query(Query query, Class<T> tClass) { QueryResult result=db.query(query); InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); // thread-safe - can be reused return resultMapper.toPOJO(result, tClass); } @Override public void close() { db.close(); } public interface InfluxTemplate { void write(Point point); void bentchWrite(BatchPoints points); <T> List<T> query(Query query, Class<T> tClass); void close(); } @ToString @Getter @Setter @Builder public class InfluxBean { private String url; private String username; private String password; private String dbName; }