This post gives a quick introduction to the join operation in Kafka Streams.
A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> left = builder.stream("intpu-left");
KStream<String, String> right = builder.stream("intpu-right");

// Re-key both streams on the field they should be joined on,
// then join them within a 30-second window.
KStream<String, String> all = left.selectKey((key, value) -> value.split(",")[1])
        .join(right.selectKey((key, value) -> value.split(",")[0]),
                new ValueJoiner<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) {
                        return value1 + "--" + value2;
                    }
                },
                JoinWindows.of(30000));
all.print();
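To actually run this topology, the builder still has to be wrapped in a KafkaStreams instance with an application id, bootstrap servers, and default serdes. The following is only a minimal sketch: the application id "my-join-app" is an assumption, the broker address matches the console commands below, and the DEFAULT_*_SERDE config constants assume a 0.10.2+/0.11-era client (older releases use KEY_SERDE_CLASS_CONFIG / VALUE_SERDE_CLASS_CONFIG instead).

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-join-app");        // assumed application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // same broker as the commands below
// Both keys and values are plain Strings in this example, so use String serdes as the defaults;
// the join repartitions the streams by the new key using these serdes.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();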
Since the join is performed on the record keys, you usually have to re-map the key first, which is what the selectKey calls above do.
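As a hypothetical standalone illustration of that key extraction (not part of the topology): a left value like "1,a" is re-keyed by its second field and a right value like "a,hello" by its first field, so both records end up under the key "a" and can be paired by the windowed join.

String leftValue = "1,a";       // a record value from intpu-left
String rightValue = "a,hello";  // a record value from intpu-right

String leftKey = leftValue.split(",")[1];   // "a"
String rightKey = rightValue.split(",")[0]; // "a"
// Both records now carry the key "a", so the join above emits "1,a--a,hello".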
sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right
On the left producer (intpu-left), type records such as the following, one per line:
1,a
2,b
3,c
3,c
4,d
1,a
2,b
3,c
1,a
2,b
3,c
4,e
5,h
6,f
7,g
On the right producer (intpu-right), type records such as the following, one per line:
a,hello
b,world
c,hehehe
c,aaa
d,eee
a,cccc
b,aaaaaa
c,332435
a,dddd
b,2324
c,ddddd
e,23453
h,2222222
f,0o0o0o0
g,ssss
Example output:
[KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
[KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:17:34.585 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
The example above uses an inner join; leftJoin and outerJoin are also available. If you need to capture records that were not matched within the time window, you can use an outer join, store the unmatched results separately, and then filter them once more against the records that did match.
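A rough sketch of that variant, built on the same left and right streams as above, just swapping join for outerJoin: unmatched records arrive with null on the side that had no match within the window, so they can be filtered out and stored for later reconciliation.

// Outer join: emits a result even when only one side has a record in the window.
KStream<String, String> outer = left.selectKey((key, value) -> value.split(",")[1])
        .outerJoin(right.selectKey((key, value) -> value.split(",")[0]),
                (value1, value2) -> value1 + "--" + value2,  // value1 or value2 may be null
                JoinWindows.of(30000));

// Keep only the results where one side is missing, e.g. to persist them for a later pass.
outer.filter((key, value) -> value.startsWith("null--") || value.endsWith("--null"))
        .print();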
Example output:
[KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
[KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_0
2017-10-17 22:31:12.539 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-17 22:31:12.540 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_1
2017-10-17 22:31:12.541 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_2
[KSTREAM-MERGE-0000000014]: g , 5,g--null
[KSTREAM-MERGE-0000000014]: h , 6,h--null
[KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd
The Kafka Streams join operation is a good fit for matching data from different sources in real time.