Flink: computing the per-account average amount over 30-second windows
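The job below consumes JSON events from a Kafka topic and, keyed by account, emits the average amount over 30-second event-time tumbling windows. Judging from the fields the code reads, each input event is shaped roughly like this (values are illustrative, not from the original post):

    {"accountId": "A001", "amount": 25.5, "eventTime": "2018/04/10 11:22:33"}

For a quick test, events in this shape can be pushed with Kafka's console producer, e.g.:

    bin/kafka-console-producer.sh --broker-list 192.168.44.101:9092 --topic accountId-avg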

package com.zetyun.streaming.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;

/**
 * Created by jyt on 2018/4/10.
 * Computes the per-account average amount over 30-second event-time windows.
 */
public class EventTimeAverage {

    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String topic = parameterTool.get("topic", "accountId-avg");
        Properties properties = parameterTool.getProperties();
        properties.setProperty("bootstrap.servers", "192.168.44.101:9092");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read JSON events from Kafka; each record is deserialized into an ObjectNode
        SingleOutputStreamOperator<ObjectNode> source = env.addSource(new FlinkKafkaConsumer010<>(
                topic,
                new JSONDeserializationSchema(),
                properties));

        // Watermark option 1: BoundedOutOfOrdernessTimestampExtractor with 15s of allowed lateness
        /*SingleOutputStreamOperator<ObjectNode> objectNodeOperator = source.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(15)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                        Date eventTime = null;
                        try {
                            eventTime = format.parse(element.get("eventTime").asText());
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return eventTime.getTime();
                    }
                });*/

        // Watermark option 2: a hand-rolled periodic watermark assigner
        SingleOutputStreamOperator<ObjectNode> objectNodeOperator = source.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<ObjectNode>() {
                    public static final long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness: 10s
                    public long currentMaxTimestamp = 0L;
                    SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // The watermark trails the highest timestamp seen so far by the out-of-orderness bound
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }

                    @Override
                    public long extractTimestamp(ObjectNode jsonNodes, long previousElementTimestamp) {
                        String time = jsonNodes.get("eventTime").asText();
                        long timestamp = 0;
                        try {
                            timestamp = format.parse(time).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        return timestamp;
                    }
                });

        // Project each event to (accountId, amount, eventTime) and key the stream by accountId
        KeyedStream<Tuple3<String, Double, String>, Tuple> keyBy = objectNodeOperator
                .map(new MapFunction<ObjectNode, Tuple3<String, Double, String>>() {
                    @Override
                    public Tuple3<String, Double, String> map(ObjectNode jsonNodes) throws Exception {
                        System.out.println(jsonNodes.get("accountId").asText() + "==map===="
                                + jsonNodes.get("amount").asDouble() + "===map==="
                                + jsonNodes.get("eventTime").asText());
                        return new Tuple3<>(jsonNodes.get("accountId").asText(),
                                jsonNodes.get("amount").asDouble(),
                                jsonNodes.get("eventTime").asText());
                    }
                })
                .keyBy(0);

        // 30-second tumbling event-time windows; emit (accountId, average amount) per window
        SingleOutputStreamOperator<Object> apply = keyBy
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .apply(new WindowFunction<Tuple3<String, Double, String>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow,
                                      Iterable<Tuple3<String, Double, String>> iterable,
                                      Collector<Object> collector) throws Exception {
                        Iterator<Tuple3<String, Double, String>> iterator = iterable.iterator();
                        int count = 0;
                        double sum = 0.0;
                        Tuple3<String, Double, String> next = null;
                        String accountId = null;
                        while (iterator.hasNext()) {
                            next = iterator.next();
                            System.out.println(next);
                            accountId = next.f0;
                            sum += next.f1;
                            count++;
                        }
                        if (next != null) {
                            collector.collect(new Tuple2<>(accountId, sum / count));
                        }
                    }
                });

        apply.print();
        //apply.addSink(new FlinkKafkaProducer010<String>("192.168.44.101:9092", "wiki-result", new SimpleStringSchema()));

        env.execute("AverageDemo");
    }
}
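Two notes on the code above.

First, AssignerWithPeriodicWatermarks was deprecated in Flink 1.11 in favor of WatermarkStrategy. A minimal sketch of the equivalent assignment on a newer Flink, assuming the rest of the job has also been migrated off FlinkKafkaConsumer010 and JSONDeserializationSchema:

    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // Bounded out-of-orderness of 10s, matching maxOutOfOrderness above
    SingleOutputStreamOperator<ObjectNode> objectNodeOperator = source.assignTimestampsAndWatermarks(
            WatermarkStrategy.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((event, previousTimestamp) ->
                            LocalDateTime.parse(event.get("eventTime").asText(),
                                            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"))
                                    .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));

Second, the WindowFunction buffers every element of a window in state just to compute a sum and a count. Flink's AggregateFunction does the same work incrementally, keeping only a small accumulator per key and window. A sketch (the accumulator layout is my own choice, not from the original post):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Accumulator: (accountId, running sum, element count)
    public class AverageAggregate implements
            AggregateFunction<Tuple3<String, Double, String>, Tuple3<String, Double, Long>, Tuple2<String, Double>> {

        @Override
        public Tuple3<String, Double, Long> createAccumulator() {
            return new Tuple3<>(null, 0.0, 0L);
        }

        @Override
        public Tuple3<String, Double, Long> add(Tuple3<String, Double, String> value,
                                                Tuple3<String, Double, Long> acc) {
            // Remember the key, add the amount, bump the count
            return new Tuple3<>(value.f0, acc.f1 + value.f1, acc.f2 + 1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> acc) {
            return new Tuple2<>(acc.f0, acc.f1 / acc.f2);
        }

        @Override
        public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a,
                                                  Tuple3<String, Double, Long> b) {
            return new Tuple3<>(a.f0 != null ? a.f0 : b.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }

It would plug into the existing pipeline as keyBy.window(TumblingEventTimeWindows.of(Time.seconds(30))).aggregate(new AverageAggregate()).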