pycapa網絡數據採集器是metron-sensors模塊下的一個低效能網絡數據採集。其雖然採集的效率比不上fastcapa,可是其結構簡單,實用性比較強。特別方便測試使用。在筆者的另一篇關於Metron的文章中已經對pycapa進行了一些簡單的介紹,這裏就再也不累述,而是重點說明在使用它的過程當中遇到的一些問題以及介紹其代碼實現的邏輯。html
在前面的文章中,筆者採集了網絡上的數據,並利用pycapa自產自消,將消費的數據轉化成了pcap的格式。那麼是否能夠認爲pcap發送到kafka中的數據就是pcap格式的了,答案是否認的。 咱們採用和上文同樣的方法,只是將消費的代碼替換爲Kafka自帶的程序,此時發現,雖然是ASCII的格式,可是從內容上仍然能夠明顯看出這不是pcap格式的。可是咱們又確實利用其自帶的consumer程序,生成了一個pcap 文件,這彷佛是矛盾的,那麼producer發送到kafa中的格式是怎樣了? 先看官方對於發送和消費的說明python
Pycapa has two primary runtime modes.git
Producer Mode: Pycapa can capture packets from a network interface and forward those packets to a Kafka topic. Pycapa embeds the raw network packet data in the Kafka message body. The message key contains the timestamp indicating when the packet was captured in microseconds from the epoch, in network byte order.github
根據其上的說法,kafka的message中用原始的packet做爲數據負載,一個時間戳做爲key值。bootstrap
Consumer Mode: Pycapa can also perform the reverse operation. It can consume packets from Kafka and reconstruct each network packet. This can then be used to create a libpcap-compliant file or even to feed directly into a tool like Wireshark to monitor activity。網絡
這裏特別說明了,在消費模式下,它消費了Kafka中的數據之後,能夠生成一個兼容libpcap的文件。這樣看來,能夠猜想,發送到kafka中的數據,並非標準的pcap的格式,之因此用pycap自帶的腳本消費能夠生成標準的pcap文件,是由於在消費的腳本里面作了相關的動做所致,而kafka原始的消費者因爲沒有作對應的動做,因此生成的文件也不能是pcap文件。函數
雖然上面的解釋說明了,設計就是這樣,並非BUG或者使用上的問題,可是仍然沒有說明其具體的格式,沒辦法,只能看代碼了。 下面是producer.py的核心發送代碼:測試
def producer(args, sniff_timeout_ms=500, sniff_promisc=True): """ Captures packets from a network interface and sends them to a Kafka topic. """ # setup the signal handler signal.signal(signal.SIGINT, signal_handler) global producer_args producer_args = args # connect to kafka logging.info("Connecting to Kafka; %s", args.kafka_configs) kafka_producer = Producer(args.kafka_configs) # initialize packet capture logging.info("Starting packet capture") capture = pcapy.open_live(args.interface, args.snaplen, sniff_promisc, sniff_timeout_ms) pkts_in = 0 try: while not finished.is_set() and (args.max_packets <= 0 or pkts_in < args.max_packets): # capture a packet (pkt_hdr, pkt_raw) = capture.next()
注意上面的pcap_open_live函數,其主要的功能就是採集數據。之前提到過,pcapy本質上就是在原來的libpcap上面封裝的python庫,因此其返回和定義都有C的影子在裏面。
首先說明一下pcap_open_live的參數 第一個參數:device,抓取的網卡名稱.
第二個參數:snaplen(maximum number of bytes to capture _per_packet)抓取的數據包的最大長度,如以太網的話,通常就是1500 第三個參數:promiscious mode (1 for true)混雜模式否。
第四個參數: timeout 時間(單位是毫秒)。 返回值是一個對象指針。由於是一個對象指針,因此能夠在下面循環的代碼中能夠看到其實pycap只取了pkt_hdr和pkt_raw這兩個結構體。 先看pkt_hdr,當本地化的時候,該結構體裏面其實就是3個參數:ts,caplen,lenspa
當調用p_getts的時候,就將返回兩個時間。
具體代碼能夠參考: https://github.com/CoreSecurity/pcapy.net
在獲取了對象之後,其實真正採集的是
# capture a packet (pkt_hdr, pkt_raw) = capture.next()
這裏面的pkt_hdr表明pcap裏面的一些基礎信息, pkt_raw 就是真正的數據包。從下面連接的這段代碼中能夠比較明顯的看出區別:https://gist.github.com/dbrown29/3816908 在瞭解了pcap的大體邏輯後,咱們繼續追蹤producer代碼:
if pkt_hdr is not None: logging.debug("Packet received: pkts_in=%d, pkt_len=%s", pkts_in, pkt_hdr.getlen()) pkts_in += 1 pkt_ts = timestamp(pkt_hdr) kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback) # pretty print, if needed if args.pretty_print > 0 and pkts_in % args.pretty_print == 0: print 'Packet received[%s]' % (pkts_in) # serve the callback queue kafka_producer.poll(0)
咱們先看Kafka的key值得來源:
pkt_ts = timestamp(pkt_hdr) def timestamp(pkt_hdr): """ Returns the timestamp of the packet in epoch milliseconds. """ (epoch_secs, delta_micros) = pkt_hdr.getts() epoch_micros = (epoch_secs * 1000000.0) + delta_micros return epoch_micros
結合上面提到的pkt_hdr.getts返回的將是兩個時間,能夠得出這個轉換爲ms的時間戳,並將其做爲messages的key發送到kafka。 這裏就能夠看出使用這個Key的優越性了,因爲Kafka的消息分發,默認是按照即hash(key) % numPartitions 的方式進行分發的,根據時間的話也就是說必定時間範圍內的消息基本上都是分發到同一個分區的,這樣就加快了Kafka檢索的速度。(能夠參看:https://www.cnblogs.com/huxi2b/p/4757098.html) 因此Kafka中的消息其實是與其餘Kafka消息字段相同,只是Key(時間戳)+Value(raw packet)而已。 具體能夠看下面的運行狀況:
$ pycapa --producer \ --interface eth0 \ --kafka-broker localhost:9092 \ --kafka-topic pcap \ --pretty-print 5 INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'UAWINMBDNQEH'} INFO:root:Starting packet capture Packet received[5] Packet delivered[5]: date=2017-05-08 14:48:54.474031 topic=pcap partition=0 offset=29086 len=42 Packet received[10] Packet received[15] Packet delivered[10]: date=2017-05-08 14:48:58.879710 topic=pcap partition=0 offset=0 len=187 Packet delivered[15]: date=2017-05-08 14:48:59.633127 topic=pcap partition=0 offset=0 len=43 Packet received[20] Packet delivered[20]: date=2017-05-08 14:49:01.949628 topic=pcap partition=0 offset=29101 len=134 Packet received[25] ^C INFO:root:Clean shutdown process started Packet delivered[25]: date=2017-05-08 14:49:03.589940 topic=pcap partition=0 offset=0 len=142 INFO:root:Waiting for '1' message(s) to flush INFO:root:'27' packet(s) in, '27' packet(s) out
從上面咱們能夠看到時間戳,分區編號等。對應到代碼上則是這樣:
def delivery_callback(err, msg): """ Callback executed when message delivery either succeeds or fails. """ # initialize counter, if needed if not hasattr(delivery_callback, "pkts_out"): delivery_callback.pkts_out = 0 if err: logging.error("message delivery failed: error=%s", err) elif msg is not None: delivery_callback.pkts_out += 1 pretty_print = 0 pretty_print = producer_args.pretty_print if pretty_print > 0 and delivery_callback.pkts_out % pretty_print == 0: print 'Packet delivered[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % ( delivery_callback.pkts_out, to_date(unpack_ts(msg.key())), msg.topic(), msg.partition(), msg.offset(), len(msg.value()))
仔細看起打印的日誌,時間戳的來源其實就是msg中的key。分區長度等就是kafk消息中的內容。其餘相似,整個kafka中的消息以下所示: 須要注意的是,在Kafka的文檔以及源碼中,消息(Message)並不包括它的offset。Kafka的log是由一條一條的記錄構成的,Kafka並無給這種記錄起個專門的名字,可是須要記住的是這個「記錄」並不等於"Message"。Offset MessageSize Message加在一塊兒,構成一條記錄。而在Kafka Protocol中,Message具體的格式爲
Message => Crc MagicByte Attributes Key Value Crc => int32 MagicByte => int8 Attributes => int8 Key => bytes Value => bytes
爲了進一步覈實代碼的狀況,咱們將producer.py的代碼略作修改,將value的值單獨寫成一個文件:
pkt_ts = timestamp(pkt_hdr) kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback) f=open("test123.pcap","ab+") f.write(pkt_raw) f.close()
將生成的文件用Hex方式打開查看以下: 能夠看到,value 就是一個完整的數據包。而不是一個pcap格式的數據。
前面解釋了Kafka中的消息,若是要解析其中的消息,顯然主要就是要解析其中的value值。前面提到,在使用pycpa的時候,是能夠生成標準的pcap文件的,但根據producer的代碼看,發出的msg_value是原始數據,key是時間,若是要生成pcap的格式,須要從新彙總一下應該才能夠的。 查看conusmer.Py發現:
# write the packet header and packet sys.stdout.write(packet_header(msg)) sys.stdout.write(msg.value()) sys.stdout.flush()
consumer在輸出的時候,除了輸出msg.value(實際就是發送的原始包之外)還輸出了2個包頭。
#構建pcap global header def global_header(args, magic=0xa1b2c3d4L, version_major=2, version_minor=4, zone=0, sigfigs=0, network=1): """ Returns the global header used in libpcap-compliant file. """ return struct.pack("IHHIIII", magic, version_major, version_minor, zone, sigfigs, args.snaplen, network) # if 'pretty-print' not set, write libpcap global header if args.pretty_print == 0: sys.stdout.write(global_header(args)) sys.stdout.flush() #包信息 def packet_header(msg): """ Returns the packet header used in a libpcap-compliant file. """ epoch_micros = struct.unpack_from(">Q", bytes(msg.key()), 0)[0] secs = epoch_micros / 1000000 usec = epoch_micros % 1000000 caplen = wirelen = len(msg.value()) hdr = struct.pack('IIII', secs, usec, caplen, wirelen) return hdr
第一個全局包,就是標準的pcap格式包(參見:https://blog.csdn.net/u013793399/article/details/51474831)共24byte,後面一部分16個字節組成了一個packer包頭,其包含secs,usec,caplen,wirelen幾個參數。在consumer文件中的位置如圖所示:。正由於這兩個header的存在,因此consumer才能夠轉存成pcap的文件格式。