大數據統計分析平臺之1、Kafka單機搭建

一、zookeeper搭建html

  Kafka集羣依賴zookeeper,須要提早搭建好zookeeperjava

   單機模式(7步)(集羣模式進階請移步:http://blog.51cto.com/nileader/795230)

 Step1:python

cd /usr/local/software 

jdk-8u161-linux-x64.rpm
連接:https://pan.baidu.com/s/1i6iHIDJ 密碼:bgcc

rpm -ivh jdk-8u161-linux-x64.rpm

vi /etc/profile

JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATHlinux

source /etc/profilesql

echo $PATHshell

 

Step2:apache

# 下載zookeeperjson

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gzbootstrap

# 若是下載不到,可使用迅雷,或者使用百度雲盤服務器

連接:https://pan.baidu.com/s/1MXYd4UlKWvqB6EcVLyF8cg 密碼:an6t

 

# 解壓

tar -zxvf zookeeper-3.4.11.tar.gz

# 移動一下

mv zookeeper-3.4.11 /usr/local/zookeeper-3.4.11

 

Step3:重命名 zoo_sample.cfg文件

 mv /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg  /usr/local/zookeeper-3.4.11/conf/zoo.cfg

 Step4:vi /usr/local/zookeeper-3.4.11/conf/zoo.cfg,修改

dataDir=/usr/local/zookeeper-3.4.11/data

Step5:建立數據目錄

mkdir  /usr/local/zookeeper-3.4.11/data


Step6:啓動zookeeper:執行

/usr/local/zookeeper-3.4.11/bin/zkServer.sh start

Step7:檢測是否成功啓動:執行

/usr/local/zookeeper-3.4.11/bin/zkCli.sh 
或者
yum install nc -y
echo stat| nc localhost 2181

================================================================================================================

二、下載Kafka

# mkdir -p /usr/local/software
# cd /usr/local/software
# wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

# 百度雲下載地址:
連接:https://pan.baidu.com/s/1Kp0uD_5YjGKOLkbW_igm2g 密碼:v1q7
    kafka_2.12-1.0.0.tgz    //其中2.12-1.0.0爲Scala的版本, kafka-1.0.0-src.tgz爲kafka版本
 
三、解壓
# tar zxf kafka_2.12-1.0.0.tgz -C /usr/local/ # cd /usr/local/ # mv kafka_2.12-1.0.0/ kafka/
四、配置
mkdir -p /usr/local/kafka/kafkaLogs
複製代碼
# vi /usr/local/kafka/config/server.properties

# broker的ID,集羣中每一個broker ID不可相同
broker.id=0
# 監聽器,端口號和port一致便可
listeners=PLAINTEXT:/10.10.6.225/:9092
# Broker的監聽端口
port=9092

# 必須填寫當前服務器IP地址
host.name=10.10.6.225

# 必須填寫當前服務器IP地址
advertised.host.name=10.10.6.225
# 暫未配置集羣
zookeeper.connect=10.10.6.225:2181

# 消息持久化目錄
log.dirs=/usr/local/kafka/kafkaLogs

# 能夠刪除主題
delete.topic.enable=true

# 關閉自動建立topic
auto.create.topics.enable=false
複製代碼

 

五、配置Kafka的環境變量
# vi /etc/profile   export KAFKA_HOME=/usr/local/kafka   export PATH=$PATH:$KAFKA_HOME/bin # source /etc/profile


# vi /etc/hosts

# es爲主機名 ,這裏必定要注意,是主機名!!!!重要的話說三次!!!!!!!!
127.0.0.1 es
10.10.6.225 es
六、啓動與中止Kafka
# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  官方推薦啓動方式:
# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

但這種方式退出shell後會自動斷開

中止:

kafka-server-stop.sh
七、驗證
# jps 2608 Kafka
2236 QuorumPeerMain
2687 Jps
看到Kafka的進程,說明Kafka已經啓動
 
八、建立topic
    建立名爲test,partitions爲3,replication爲3的topic
# kafka-topics.sh --create --zookeeper 10.10.6.225:2181 --partitions 1 --replication-factor 1 --topic test
    查看topic狀態
# kafka-topics.sh --describe --zookeeper 10.10.6.225:2181 --topic test
  Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
   Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
 
    刪除topic
    執行以下命令
# kafka-topics.sh --delete --zookeeper 10.10.6.225:2181 --topic test
九、測試使用Kafka
    發送消息
# kafka-console-producer.sh --broker-list 10.10.6.225:9092 --topic test 輸入如下信息:   This is a message   This is another message
    接收消息
# kafka-console-consumer.sh --bootstrap-server 10.10.6.225:9092 --topic test --from-beginning 
    若看到上輸入的信息說明已經搭建成功。
 
更復雜配置參考:
 
黃海添加於2018-02-11 夜
連接:https://pan.baidu.com/s/1i6HnIzr 密碼:1soq
 
KafkaProducer.py
# http://kafka-python.readthedocs.io/en/master/
# 安裝辦法: # C:\Users\Administrator>pip install kafka-python # Collecting kafka-python # Downloading kafka_python-1.4.1-py2.py3-none-any.whl (235kB) # 100% |████████████████████████████████| 235kB 150kB/s # Installing collected packages: kafka-python # Successfully installed kafka-python-1.4.1 # http://blog.csdn.net/evankaka/article/details/52421314
from kafka import KafkaProducer
from Util.MySQLHelper import *
import json

producer = KafkaProducer(bootstrap_servers='10.10.6.225:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
db = MySQLHelper()
sql = "select ID,RESOURCE_ID_INT,RESOURCE_ID_CHAR,RESOURCE_TITLE,RESOURCE_TYPE_NAME,RESOURCE_FORMAT,RESOURCE_PAGE,CAST(CREATE_TIME AS CHAR) AS CREATE_TIME,DOWN_COUNT,FILE_ID,RESOURCE_TYPE,STRUCTURE_ID,PERSON_ID,PERSON_NAME,IDENTITY_ID from t_resource_info limit 100"
dt = db.query(sql)

print(len(dt))

for row in dt:
producer.send('t_resource_info', row)

producer.flush()

print('恭喜,完成!')
 
 

 

不依賴於MYSQL的數據提交:

import json from kafka import KafkaProducer import datetime # kafka的服務器位置
kafka_servers = '10.10.6.194:9092'

# 日期的轉換器
class DateEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) # 黃海定義的輸出信息的辦法,帶當前時間
def logInfo(msg): i = datetime.datetime.now() print(" %s %s" % (i, msg)) # 統一的topic名稱
topicName = 'test' dt=[{"id":1,"name":"劉備"},{"id":2,"name":"關羽"},{"id":3,"name":"張飛"}] # kafka的生產者
producer = KafkaProducer(bootstrap_servers=kafka_servers) # # 將字段大寫轉爲小寫
for row in dt: new_dics = {} for k, v in row.items(): new_dics[k.lower()] = v jstr = json.dumps(new_dics, cls=DateEncoder) producer.send(topic=topicName, partition=0, value=jstr.encode('utf-8')) # 提交一下
producer.flush() print('恭喜,完成!')

 

 

KafkaConsumer.py

from kafka import KafkaConsumer import time def log(str): t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime()) print("[%s]%s" % (t, str)) log('start consumer') # 消費192.168.120.11:9092上的world 這個Topic,指定consumer group是consumer-20171017 consumer = KafkaConsumer('foobar', bootstrap_servers=['localhost:9092']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) log(recv)

 若是是想讀取kafka記得的全部消費記錄:

from kafka import KafkaConsumer import time # kafka的服務器位置
kafka_servers = '10.10.6.194:9092'
# 統一的topic名稱
topicName = 'test'

def log(str): t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime()) print("[%s]%s" % (t, str)) log('啓動消費者...') # auto_offset_reset='earliest' 這個參數很重要,若是加上了,就是kafka記錄的最後一條位置,若是不加,就是之後要插入的數據了。 #consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', bootstrap_servers=kafka_servers)
consumer = KafkaConsumer(topicName, bootstrap_servers=kafka_servers) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) log(recv)
相關文章
相關標籤/搜索