物聯網架構成長之路(8)-EMQ-Hook瞭解、鏈接Kafka發送消息

1. 前言

  按照我自己設計的物聯網框架,MQTT集羣中的所有消息都要持久化到磁盤。這裏採用消息隊列中間件Kafka作爲數據緩衝,緩衝結果存到數據倉庫中,以供後續數據分析。由於MQTT集羣中的消息比較分散,所以使用Kafka來聚合、採集消息。

2. 下載&編譯&安裝

  Kafka依賴ZooKeeper

  在這裏下載 http://mirrors.shuosc.org/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

  http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka-1.0.0-src.tgz

  http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

  學習的話,可以參考這個文檔 http://orchome.com/kafka/index

  配置使用過程可以參考官網的 http://kafka.apache.org/quickstart 和 http://zookeeper.apache.org/doc/current/zookeeperStarted.html 。有些資料由於版本升級的緣故,已經不是以前的那種啓動方式了。客戶端列表見 https://cwiki.apache.org/confluence/display/KAFKA/Clients

3. 啓動Zookeeper

1 cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
2 ./bin/zkServer.sh start


  上圖,就表示啓動單機模式。

  ./bin/zkCli.sh -server 127.0.0.1:2181 進行鏈接

  而後進入命令行模式,能夠慢慢玩了

  單機模式就沒有什麼要配置的。最多修改zoo.cfg中的dataDir目錄

  ZooKeeper啓動replicated模式,集羣模式

  zoo.cfg 增長服務器集羣信息

1 server.1=172.16.20.229:2888:3888
2 server.2=172.16.23.203:2888:3888
3 server.3=172.16.23.204:2888:3888
1 ./bin/zkServer.sh start-foreground #啓動

  注意:每一臺服務器都要建立一個myid文件,例如第一臺執行 echo "1" > %dataDir%/myid(數字與zoo.cfg中server.N的N對應)

  啓動都是會有一些奇奇怪怪的問題,上網找資料就能夠了。

  通常第一臺ZooKeeper啓動時會有Connection refused出錯,這個是正常的,因爲後面的兩臺還沒有啓動;等後面的節點一個一個啓動之後,錯誤就會消失。

  如果過程中有一個節點斷開了,期間數據被修改,然後這個斷開的節點又連上了,那麼ZooKeeper集羣內部會進行diff同步

1 2017-12-28 16:30:38,570 [myid:3] - INFO  [QuorumPeer[myid=3]/0.0.0.0:2181:Learner@332] - Getting a diff from the leader 0x100000005

  而後就用客戶端A

1 ./bin/zkCli.sh -server 172.16.20.229:2181
2 ls /
3 create /zk_test my_data
4 ls /

  而後用客戶端B

1 ./bin/zkCli.sh -server 172.16.23.203:2181
2 ls /
3 get /zk_test
4 ls /

  能夠看到ZooKeeper信息在內部進行了共享

  具體能夠參考這篇博客 http://www.cnblogs.com/sunddenly/p/4018459.html 

4. 啓動 Kafka

  由於Kafka依賴ZooKeeper,所以有了上面一步的ZooKeeper瞭解。

  實際中,能夠直接下載kafka的二進制包,直接使用,http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 

   啓動Zookeeper

1 ./bin/zookeeper-server-start.sh config/zookeeper.properties

  啓動Kafka

1 ./bin/kafka-server-start.sh config/server.properties

  建立主題

1 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  查看主題

1 ./bin/kafka-topics.sh --list --zookeeper localhost:2181

  發送消息

1 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  消費消息

1 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

5. Erlang 鏈接 kafka

  主要參考這個 https://github.com/msdevanms/emqttd_plugin_kafka_bridge 

  增長依賴 https://github.com/helpshift/ekaf.git

  首先在Makefile增長

 

1 DEPS = eredis ecpool clique ekaf
2 dep_ekaf = git https://github.com/helpshift/ekaf master

 

  而後在rebar.config 增長

1 {ekaf, ".*", {git, "https://github.com/helpshift/ekaf", "master"}}

  在etc/emq_plugin_wunaozai.conf 中增長

1 ## kafka config
2 wunaozai.msg.kafka.server = 127.0.0.1:9092
3 wunaozai.msg.kafka.topic = test

  在priv/emq_plugin_wunaozai.schema 中增長

 1 %% wunaozai.msg.kafka.server = 127.0.0.1:9092
 2 {
 3     mapping,
 4     "wunaozai.msg.kafka.server",
 5     "emq_plugin_wunaozai.kafka",
 6     [
 7         {default, {"127.0.0.1", 9092}},
 8         {datatype, [integer, ip, string]}
 9     ]
10 }.
11 
12 %% wunaozai.msg.kafka.topic = test
13 {
14     mapping,
15     "wunaozai.msg.kafka.topic",
16     "emq_plugin_wunaozai.kafka",
17     [
18         {default, "test"},
19         {datatype, string},
20         hidden
21     ]
22 }.
23 
24 %% translation
25 {
26     translation,
27     "emq_plugin_wunaozai.kafka",
28     fun(Conf) ->
29             {RHost, RPort} = case cuttlefish:conf_get("wunaozai.msg.kafka.server", Conf) of
30                                  {Ip, Port} -> {Ip, Port};
31                                  S          -> case string:tokens(S, ":") of
32                                                    [Domain]       -> {Domain, 9092};
33                                                    [Domain, Port] -> {Domain, list_to_integer(Port)}
34                                                end
35                              end,
36             Topic = cuttlefish:conf_get("wunaozai.msg.kafka.topic", Conf),
37             [
38              {host, RHost},
39              {port, RPort},
40              {topic, Topic}
41             ]
42     end
43 }.

6. 數據發往Kafka

  接下來,因爲功能基本上是基於EMQ框架的Hook鉤子設計,在EMQ接收到客戶端上下線、主題訂閱或消息發佈確認時,觸發鉤子順序執行回調函數,因此大部分功能在 src/emq_plugin_wunaozai.erl 文件進行修改。

  1 -module(emq_plugin_wunaozai).
  2 
  3 -include("emq_plugin_wunaozai.hrl").
  4 
  5 -include_lib("emqttd/include/emqttd.hrl").
  6 
  7 -export([load/1, unload/0]).
  8 
  9 %% Hooks functions
 10 
 11 -export([on_client_connected/3, on_client_disconnected/3]).
 12 
 13 -export([on_client_subscribe/4, on_client_unsubscribe/4]).
 14 
 15 -export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
 16 
 17 -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
 18 
 19 %% Called when the plugin application start
 20 load(Env) ->
 21     ekaf_init([Env]),
 22     emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
 23     emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
 24     emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
 25     emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
 26     emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
 27     emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
 28     emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
 29     emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
 30     emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
 31     emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
 32     emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]),
 33     io:format("start wunaozai Test Reload.~n", []).
 34 
 35 on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
 36     io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
 37     ekaf_send(<<"connected">>, ClientId, {}, _Env),
 38     {ok, Client}.
 39 
 40 on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
 41     io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
 42     ekaf_send(<<"disconnected">>, ClientId, {}, _Env),
 43     ok.
 44 
 45 on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
 46     io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
 47     {ok, TopicTable}.
 48     
 49 on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
 50     io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
 51     {ok, TopicTable}.
 52 
 53 on_session_created(ClientId, Username, _Env) ->
 54     io:format("session(~s/~s) created.", [ClientId, Username]).
 55 
 56 on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
 57     io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
 58     ekaf_send(<<"subscribed">>, ClientId, {Topic, Opts}, _Env),
 59     {ok, {Topic, Opts}}.
 60 
 61 on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
 62     io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
 63     ekaf_send(<<"unsubscribed">>, ClientId, {Topic, Opts}, _Env),
 64     ok.
 65 
 66 on_session_terminated(ClientId, Username, Reason, _Env) ->
 67     io:format("session(~s/~s) terminated: ~p.~n", [ClientId, Username, Reason]),
 68     stop.
 69 
 70 %% transform message and return
 71 on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
 72     {ok, Message};
 73 on_message_publish(Message, _Env) ->
 74     io:format("publish ~s~n", [emqttd_message:format(Message)]),
 75     ekaf_send(<<"public">>, {}, Message, _Env),
 76     {ok, Message}.
 77 
 78 on_message_delivered(ClientId, Username, Message, _Env) ->
 79     io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
 80     {ok, Message}.
 81 
 82 on_message_acked(ClientId, Username, Message, _Env) ->
 83     io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
 84     {ok, Message}.
 85 
 86 %% Called when the plugin application stop
 87 unload() ->
 88     emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
 89     emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
 90     emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
 91     emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
 92     emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
 93     emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
 94     emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
 95     emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
 96     emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
 97     emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
 98     emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
 99 
100 %% ==================== ekaf_init STA.===============================%%
101 ekaf_init(_Env) ->
102     % clique 方式讀取配置文件
103     Env = application:get_env(?APP, kafka),
104     {ok, Kafka} = Env,
105     Host = proplists:get_value(host, Kafka),
106     Port = proplists:get_value(port, Kafka),
107     Broker = {Host, Port},
108     Topic = proplists:get_value(topic, Kafka),
109     io:format("~w ~w ~w ~n", [Host, Port, Topic]),
110 
111     % init kafka
112     application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
113     application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
114     application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
115     %application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}),
116     %application:set_env(ekaf, ekaf_bootstrap_topics, <<"test">>),
117 
118     %io:format("Init ekaf with ~s:~b~n", [Host, Port]),
119     %%ekaf:produce_async_batched(<<"test">>, list_to_binary(Json)),
120     ok.
121 %% ==================== ekaf_init END.===============================%%
122 
123 
124 %% ==================== ekaf_send STA.===============================%%
125 ekaf_send(Type, ClientId, {}, _Env) ->
126     Json = mochijson2:encode([
127                               {type, Type},
128                               {client_id, ClientId},
129                               {message, {}},
130                               {cluster_node, node()},
131                               {ts, emqttd_time:now_ms()}
132                              ]),
133     ekaf_send_sync(Json);
134 ekaf_send(Type, ClientId, {Reason}, _Env) ->
135     Json = mochijson2:encode([
136                               {type, Type},
137                               {client_id, ClientId},
138                               {cluster_node, node()},
139                               {message, Reason},
140                               {ts, emqttd_time:now_ms()}
141                              ]),
142     ekaf_send_sync(Json);
143 ekaf_send(Type, ClientId, {Topic, Opts}, _Env) ->
144     Json = mochijson2:encode([
145                               {type, Type},
146                               {client_id, ClientId},
147                               {cluster_node, node()},
148                               {message, [
149                                          {topic, Topic},
150                                          {opts, Opts}
151                                         ]},
152                               {ts, emqttd_time:now_ms()}
153                              ]),
154     ekaf_send_sync(Json);
155 ekaf_send(Type, _, Message, _Env) ->
156     Id = Message#mqtt_message.id,
157     From = Message#mqtt_message.from, %須要登陸和不須要登陸這裏的返回值是不同的
158     Topic = Message#mqtt_message.topic,
159     Payload = Message#mqtt_message.payload,
160     Qos = Message#mqtt_message.qos,
161     Dup = Message#mqtt_message.dup,
162     Retain = Message#mqtt_message.retain,
163     Timestamp = Message#mqtt_message.timestamp,
164 
165     ClientId = c(From),
166     Username = u(From),
167 
168     Json = mochijson2:encode([
169                               {type, Type},
170                               {client_id, ClientId},
171                               {message, [
172                                          {username, Username},
173                                          {topic, Topic},
174                                          {payload, Payload},
175                                          {qos, i(Qos)},
176                                          {dup, i(Dup)},
177                                          {retain, i(Retain)}
178                                         ]},
179                               {cluster_node, node()},
180                               {ts, emqttd_time:now_ms()}
181                              ]),
182     ekaf_send_sync(Json).
183 
184 ekaf_send_async(Msg) ->
185     Topic = ekaf_get_topic(),
186     ekaf_send_async(Topic, Msg).
187 ekaf_send_async(Topic, Msg) ->
188     ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Msg)).
189 ekaf_send_sync(Msg) ->
190     Topic = ekaf_get_topic(),
191     ekaf_send_sync(Topic, Msg).
192 ekaf_send_sync(Topic, Msg) ->
193     ekaf:produce_sync_batched(list_to_binary(Topic), list_to_binary(Msg)).
194 
195 i(true) -> 1;
196 i(false) -> 0;
197 i(I) when is_integer(I) -> I.
198 c({ClientId, Username}) -> ClientId;
199 c(From) -> From.
200 u({ClientId, Username}) -> Username;
201 u(From) -> From.
202 %% ==================== ekaf_send END.===============================%%
203 
204 
205 %% ==================== ekaf_set_host STA.===============================%%
206 ekaf_set_host(Host) ->
207     ekaf_set_host(Host, 9092).
208 ekaf_set_host(Host, Port) ->
209     Broker = {Host, Port},
210     application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
211     io:format("reset ekaf Broker ~s:~b ~n", [Host, Port]),
212     ok.
213 %% ==================== ekaf_set_host END.===============================%%
214 
215 %% ==================== ekaf_set_topic STA.===============================%%
216 ekaf_set_topic(Topic) ->
217     application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
218     ok.
219 ekaf_get_topic() ->
220     Env = application:get_env(?APP, kafka),
221     {ok, Kafka} = Env,
222     Topic = proplists:get_value(topic, Kafka),
223     Topic.
224 %% ==================== ekaf_set_topic END.===============================%%

  上面是全部源代碼,下面對其進行簡單說明

  ekaf_init 函數,主要對配置文件的讀取和解析並存放到application的環境變量中

  ekaf_send 函數,主要是封裝成對應的JSON數據,而後發到Kafka

  ekaf_send_async 函數,主要是異步發送JSON數據,不確保發往Kafka的順序與Kafka消費者的接收時的順序

  ekaf_send_sync 函數,是同步發送JSON數據,確保按照順序發往Kafka,Kafka消費者有序接收數據

  ekaf_set_host 函數,設置kafka的域名與端口

  ekaf_set_topic 函數,設置發往kafka時的主題

  ekaf_get_topic 函數,獲取當前主題

  load函數增長ekaf_init調用

  剩下的在每一個鉤子回調中調用 ekaf_send函數

7. 測試

  (1)啓動Zookeeper ./bin/zookeeper-server-start.sh config/zookeeper.properties

  (2)啓動Kafka ./bin/kafka-server-start.sh config/server.properties

  (3)啓動消費者 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

  (4)啓動一個生產者 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  (5)啓動EMQ ./_rel/emqttd/bin/emqttd console

  (6)打開MQTT客戶端並鏈接、訂閱、發佈等操做

  (7)能夠在消費者界面上看到獲取到的信息

8. 插件源碼

  最後給出本次插件開發的全部源碼

  https://files.cnblogs.com/files/wunaozai/emq_plugin_wunaozai.zip

相關文章
相關標籤/搜索