kafka streams的join實例

本文簡單介紹一下kafka streams的join操做app

join

A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.ide

實例

KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> left = builder.stream("intpu-left");
        KStream<String, String> right = builder.stream("intpu-right");

        KStream<String, String> all = left.selectKey((key, value) -> value.split(",")[1])
                .join(right.selectKey((key, value) -> value.split(",")[0]), new ValueJoiner<String, String, String>() {
            @Override
            public String apply(String value1, String value2) {
                return value1 + "--" + value2;
            }
        }, JoinWindows.of(30000));

        all.print();

因爲join操做是根據key來,因此一般通常要再次映射一下key測試

測試

sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181

sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181


sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right

左邊輸入諸如ui

1,a
2,b
3,c
3,c
4,d
1,a
2,b
3,c
1,a
2,b
3,c
4,e
5,h
6,f
7,g

右邊輸入諸如code

a,hello
b,world
c,hehehe
c,aaa
d,eee
a,cccc
b,aaaaaa
c,332435
a,dddd
b,2324
c,ddddd
e,23453
h,2222222
f,0o0o0o0
g,ssss

輸出實例orm

[KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
[KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
2017-10-17 22:17:34.578  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:17:34.578  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:17:34.585  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_1

join類別

這裏使用的是inner join,也有left join,也有outer join。若是要記錄在時間窗口沒有匹配上的記錄,能夠使用outer join,額外存儲下來,而後再根據已經匹配的記錄再過濾一次。kafka

輸出實例it

[KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
[KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
2017-10-17 22:31:12.530  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:31:12.530  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-17 22:31:12.533  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-17 22:31:12.533  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_0
2017-10-17 22:31:12.539  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-17 22:31:12.540  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_1
2017-10-17 22:31:12.541  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_2
[KSTREAM-MERGE-0000000014]: g , 5,g--null
[KSTREAM-MERGE-0000000014]: h , 6,h--null
[KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd

小結

kafka streams的join操做,很是適合不一樣數據源的實時匹配操做。io

相關文章
相關標籤/搜索