kafka-node+socket.io 測試配置

1. 安裝需要的插件

   npm install express

   npm install socket.io

   npm install kafka-node

2. kafkatest.js 文件

// Express application; it hosts the HTTP routes and the static files.
var express = require('express');
var app = express();

// socket.io is attached to the same HTTP server that wraps the Express app,
// so both share one listening port.
var http = require('http');
var server = http.createServer(app);
var socketIo = require('socket.io');
var io = socketIo(server);

var kafka = require('kafka-node');

// NOTE(review): `users` is not referenced anywhere in the visible part of
// this file; kept because other code may rely on it.
var users = [];

// Serve every file that sits next to this script under the "/" prefix.
var staticRoot = __dirname + '/';
app.use('/', express.static(staticRoot));

/**
 * GET /send?msg=<text>
 *
 * Publishes the `msg` query parameter to partition 0 of the Kafka topic
 * "test" and echoes the submitted text back to the caller.
 *
 * The HTTP response is sent immediately; the outcome of the Kafka write is
 * only logged on the server. Requests without a `msg` parameter are rejected
 * with HTTP 400 instead of publishing `undefined`.
 */
app.get('/send', function (req, res) {
    var msg = req.query.msg;

    // Nothing to publish when the parameter is absent.
    if (msg === undefined) {
        res.status(400).type('text/plain').send('missing query parameter: msg');
        return;
    }

    // A dedicated client/producer pair is created per request, so it must be
    // closed once the send attempt has finished; otherwise every request
    // leaks its connections.
    var client = new kafka.Client('localhost:2181');
    var producer = new kafka.Producer(client);
    var payloads = [
        { topic: 'test', messages: msg, partition: 0 }
    ];

    producer.on('ready', function () {
        producer.send(payloads, function (err, data) {
            if (err) {
                console.error('kafka send failed', err);
            } else {
                console.log(data);
            }
            client.close();
        });
    });

    // Surface connection/producer failures instead of silently dropping them.
    producer.on('error', function (err) {
        console.error('kafka producer error', err);
    });

    // Reply as text/plain so that markup contained in `msg` is never
    // interpreted as HTML by the browser.
    res.type('text/plain').send('輸入消息=' + msg);
});
server.listen(8080);


// One second after start-up, attach a Kafka consumer to partition 0 of the
// "test" topic and forward every consumed message to all connected socket.io
// clients through the 'server_counter' event.
setTimeout(function () {
    var topicName = 'test';
    var zkClient = new kafka.Client('localhost:2181');
    var subscriptions = [{ topic: topicName, partition: 0 }];
    var consumerOptions = { autoCommit: false };

    var consumer = new kafka.Consumer(zkClient, subscriptions, consumerOptions);
    var offsetApi = new kafka.Offset(zkClient);

    // Log the raw message and broadcast it to every connected socket.
    function broadcastMessage(message) {
        console.log(message);
        io.sockets.emit('server_counter', message);
    }

    // Consumer errors are only logged.
    function reportError(err) {
        console.log('error', err);
    }

    // When the requested offset is out of range, look up the offsets known
    // for that topic/partition and move the consumer to the smallest one.
    function recoverOffset(outOfRange) {
        // presumably limits how many offsets the lookup returns — confirm
        // against the kafka-node Offset documentation
        outOfRange.maxNum = 2;
        offsetApi.fetch([outOfRange], function (err, offsets) {
            if (err) {
                return console.error(err);
            }
            var known = offsets[outOfRange.topic][outOfRange.partition];
            var smallest = Math.min.apply(null, known);
            consumer.setOffset(outOfRange.topic, outOfRange.partition, smallest);
        });
    }

    consumer.on('message', broadcastMessage);
    consumer.on('error', reportError);
    consumer.on('offsetOutOfRange', recoverOffset);
}, 1000);

 

3. counter.html

<!DOCTYPE html>
<html>

<head>
    <title>socket</title>
</head>

<body>

<div style="margin: 0 auto;" id='msg'>

</div>
<!-- Scripts are kept inside <body>; content after </body> is invalid HTML. -->
<script type="text/javascript" src="./node_modules/socket.io-client/dist/socket.io.js"></script>
<script type="text/javascript">
    // Connect to the socket.io server.
    var socket = io.connect('localhost:8080');
    var messageList = document.getElementById('msg');

    // Append every message pushed by the server as a new entry.
    socket.on('server_counter', function (msg) {
        var div = document.createElement('div');
        // textContent instead of innerHTML: the message text comes from
        // outside this page and must not be parsed as markup.
        div.textContent = JSON.stringify(msg);
        messageList.appendChild(div);
    });
</script>
</body>

</html>

 

4. 效果展現

 啓動 kafkatest.js

 打開 http://localhost:8080/counter.html

 打開 http://localhost:8080/send?msg=謝大神你好

相關文章
相關標籤/搜索