Reference (notes/principles): https://www.cnblogs.com/yinzhengjie/p/9780976.html
Deployment: www.cnblogs.com/yinzhengjie/p/9209319.html
Official site: look up the detailed Kafka parameters in the documentation for the matching version.
kafka.apache.org -> Documentation
Check the Kafka version (it is encoded in the jar names under the libs directory):
/data1/kafka/libs
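Since the Scala and Kafka versions are encoded in the jar names under libs/, they can be pulled out with a small parse. The jar name below is a hardcoded sample; in practice feed in the output of `ls /data1/kafka/libs`:

```shell
# Sample jar name; in a real check: jar=$(ls /data1/kafka/libs | grep -m1 '^kafka_')
jar="kafka_2.11-2.3.1.jar"
# First number group is the Scala version, second is the Kafka version
scala_ver=$(echo "$jar" | sed -E 's/kafka_([0-9.]+)-([0-9.]+)\.jar/\1/')
kafka_ver=$(echo "$jar" | sed -E 's/kafka_([0-9.]+)-([0-9.]+)\.jar/\2/')
echo "Scala $scala_ver, Kafka $kafka_ver"
```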
#Note: in production, put the Kafka files in a directory with plenty of space, such as /data
wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.11-2.3.1.tgz
tar -zxf kafka_2.11-2.3.1.tgz -C /hongfeng/software/
vim /etc/profile
export KAFKA_HOME=/hongfeng/software/kafka_2.11-2.3.1
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
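A quick sanity check that the profile change took effect, sketched with the paths above repeated inline:

```shell
# Re-declare the same values as /etc/profile (sketch; normally `source /etc/profile`)
KAFKA_HOME=/hongfeng/software/kafka_2.11-2.3.1
PATH=$PATH:$KAFKA_HOME/bin
# The bin directory should now appear as one of the PATH entries
echo "$PATH" | tr ':' '\n' | grep -q "$KAFKA_HOME/bin" && echo "kafka bin on PATH"
```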
vim /hongfeng/software/kafka_2.11-2.3.1/bin/kafka-server-start.sh
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
#The default Kafka heap is 1G, which is clearly not enough in production. "Kafka: The Definitive Guide" suggests 5G and "Apache Kafka实战" suggests 6G; the difference is small, so we use 6G here. Book advice is not gospel: if Kafka with a 6G heap still hits frequent Full GCs, be flexible and raise it further. This is how I configure it in production. Note that if the machine has less than 6G of available memory, this setting may throw an OOM error outright.
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
export JMX_PORT="9999"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
##As this line shows, the script delegates to kafka-run-class.sh. If you configure the heap in this file, do not configure it again in kafka-run-class.sh, or the heap set here will not take effect!
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
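The `-daemon` handling in the case statement above can be seen in isolation with a small sketch; the sample arguments are hypothetical:

```shell
# Simulate invoking the script as: kafka-server-start.sh -daemon server.properties --override broker.id=101
set -- -daemon server.properties --override broker.id=101
# Default EXTRA_ARGS unless the caller already exported one
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
case $1 in
  -daemon)
    # Prepend -daemon and drop it from the positional args
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
echo "EXTRA_ARGS=$EXTRA_ARGS first_remaining=$1"
```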
Distribute /etc/profile and the Kafka installation to the other nodes
scp /etc/profile 10.10.77.8:/etc/profile
scp /etc/profile 10.10.24.86:/etc/profile
scp -r kafka_2.11-2.3.1 10.10.77.8:/hongfeng/software/
scp -r kafka_2.11-2.3.1 10.10.24.86:/hongfeng/software/
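The two scp pairs above can be driven by one loop over the peer nodes; printed here as a dry run (drop the `echo` prefixes to actually execute):

```shell
# Peer broker IPs from the commands above
hosts="10.10.77.8 10.10.24.86"
for host in $hosts; do
  # Dry run: print the distribution commands instead of running them
  echo scp /etc/profile "$host":/etc/profile
  echo scp -r /hongfeng/software/kafka_2.11-2.3.1 "$host":/hongfeng/software/
done
```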
Edit the Kafka configuration file (server.properties)
#Edit it on every node; the settings noted below differ per node
vim /hongfeng/software/kafka_2.11-2.3.1/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=101 #must be different on each node
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://10.10.24.86:9092 #change to this node's IP
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://10.10.24.86:9092 #change to this node's IP
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=30 #number of threads handling network requests
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=30 #number of threads handling disk I/O
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs #define your own log directory; this must be changed in production
num.partitions=20 #default partition count per topic; a partition count given at topic creation overrides it
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 # log retention time (hours|minutes); the default is 7 days (168 hours). Older data is handled according to the cleanup policy. Size-based and time-based retention are independent; whichever limit is reached first triggers deletion.
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.10.37.27:2181,10.10.77.8:2181,10.10.24.86:2181
#In production, if there are multiple Kafka clusters, create a separate znode (chroot) in ZK for each cluster, as below
#zookeeper.connect=10.52.110.48:2182,10.52.48.92:2182,10.52.60.235:2182/kafka01
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
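With this many comments in server.properties, it helps to view only the effective settings. A sketch using a small inline sample file (the real path is the server.properties edited above):

```shell
# Build a tiny sample properties file to demonstrate the filter
cat > /tmp/server.sample.properties <<'EOF'
# The id of the broker.
broker.id=101

listeners=PLAINTEXT://10.10.24.86:9092
EOF
# Drop comment lines and blank lines, leaving only active settings
effective=$(grep -Ev '^[[:space:]]*(#|$)' /tmp/server.sample.properties)
echo "$effective"
```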
Start Kafka:
Run on each node:
kafka-server-start.sh /hongfeng/software/kafka_2.11-2.3.1/config/server.properties >> /dev/null &
Verify:
Use ansible to run jps on every node and check that a Kafka process is present.
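What the jps check is looking for, sketched against sample jps output (the PIDs are made up; on a real node just run `jps` and grep):

```shell
# Sample jps output: one JVM running the Kafka broker, plus jps itself
jps_out="12345 Kafka
23456 Jps"
# Count lines whose process name is exactly Kafka
found=$(echo "$jps_out" | grep -c ' Kafka$')
[ "$found" -ge 1 ] && echo "kafka process found"
```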
Adding nodes:
A newly added node holds no data at first. Either rely on the balancing configured in the Kafka settings, which evens things out after a while, or manually run the replica-reassignment command to move leaders.
Add new nodes one at a time, then retire old nodes one at a time. Decommissioning: stop brokers one by one with kafka-server-stop.sh.
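The manual replica move mentioned above is typically done with kafka-reassign-partitions.sh. A dry-run sketch of generating a reassignment plan; the topic name and broker ids are examples, not values from this cluster:

```shell
# Topics whose partitions should be moved onto the new broker set
cat > /tmp/topics-to-move.json <<'EOF'
{"version": 1, "topics": [{"topic": "my-topic"}]}
EOF
# Dry run: print the plan-generation command (remove `echo` to run against the cluster)
cmd="kafka-reassign-partitions.sh --zookeeper 10.10.37.27:2181 --topics-to-move-json-file /tmp/topics-to-move.json --broker-list 101,102,103 --generate"
echo "$cmd"
```

Feeding the generated plan back with `--execute` performs the move; `--verify` checks progress.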