kafka模擬生產-消費者以及自定義分區

基本概念

kafka中的重要角色
  broker:一臺kafka服務器就是一個broker,一個集羣可有多個broker,一個broker能夠容納多個topic
  topic:能夠理解爲一個消息隊列的名字
  partition:分區,爲了實現擴展性,一個topic能夠分佈到多個broker上,一個topic能夠被分紅多個partition,partition中的每條消息 都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體的順序。也就是說,一個topic在集羣中能夠有多個partition 。kafka有Key Hash算法和Round Robin算法兩種分區策略。
  producer:消息的生產者,是向kafka發消息的客戶端
  consumer:消息消費者,向broker取消息的客戶端
  offset:偏移量,用來記錄consumer消費消息的位置
  Consumer Group:消費組,消息系統有兩類,一是廣播,二是訂閱發佈。java

編碼實現

  建立一個生產者算法

package sancen.kafka

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

/**
  * 類名  ProducerDemo
  * 做者   彭三青
  * 建立時間  2018-11-26 9:49
  * 版本  1.0
  * 描述: $ 實現一個生產者,把模擬數據發送到kafka集羣
  */

object ProducerDemo {
  def main(args: Array[String]): Unit = {
    // 定義一個接收數據的topic
    val topic = "test"
    // 建立配置信息
    val props = new Properties()
    // 指定序列化類
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // 指定kafka列表
    props.put("metadata.broker.list", "SC01:9092, SC01:9092, SC03:9092")
    // 設置發送數據後的響應方式
    props.put("request.required.acks", "0")
    // 指定分區器
    // props.put("partitioner.class", "kafka.producer.DefaultPartitioner
    // 自定義分區器
    props.put("partitioner.class", "day01.kafka.CustomPartitioner")
    // 建立producer對象
    val config: ProducerConfig = new ProducerConfig(props)
    // 建立生產者對象
    val producer: Producer[String, String] = new Producer(config)

    // 模擬數據
    for(i <- 1 to 10000){
      val msg = s"$i : producer send data"
      producer.send(new KeyedMessage[String, String](topic, msg)) //key偏移量,也能夠給空 v實際的數據
      Thread.sleep(500)
    }
  }
}

  建立消費者apache

package sancen.kafka

import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import kafka.consumer._

import scala.collection.mutable

/**
  * 類名  ConsumerDemo
  * 做者   彭三青
  * 建立時間  2018-11-26 10:08
  * 版本  1.0
  * 描述: $ 建立一個Consumer消費kafka的數據
  */

class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable{
  override def run(): Unit = {
    val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
    while (it.hasNext()){
      val data = it.next()
      val topic = data.topic
      val partition = data.partition
      val offset = data.offset
      val msg: String = new String(data.message())
      println(s"Consumer:$consumer, topic:$topic, partiton:$partition, offset:$offset, msg:$msg")
    }
  }
}

object ConsumerDemo {
  def main(args: Array[String]): Unit = {
    // 定義獲取的topic
    val topic = "test"
    // 定義一個map,用來存儲多個topic key:topic名稱,value:指定線程數用來獲取topic的數據
    val topics = new mutable.HashMap[String, Int]() // 要求就要傳一個map,能夠放一個或者多個topic
    topics.put(topic, 2)
    // 配置信息
    val props = new Properties()
    // 指定consumer組名
    props.put("group.id", "group02")
    // 指定zk列表
    props.put("zookeeper.connect", "SC01:2181,SC02:2181,SC03:2181")
    // 指定offset異常時須要獲取的offset值
    props.put("auto.offset.reset", "smallest")
    // 建立配置信息
    val config = new ConsumerConfig(props)
    // 建立consumer對象
    val consumer: ConsumerConnector = Consumer.create(config)
    // 獲取數據,返回的map類型中key:topic名稱,value:topic對應的數據
    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics)
    // 獲取指定topic的數據
    val stream: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic)
    // 建立固定大小的線程池
    val pool: ExecutorService = Executors.newFixedThreadPool(3)
     for(i <- 0 until stream.size){
       pool.execute(new ConsumerDemo(s"Consumer:$i", stream.get(i)))
     }
  }
}

  建立自定義分區類服務器

package sancen.kafka

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties
import org.apache.kafka.common.utils.Utils

/**
  * 類名  CustomPartitioner
  * 做者   彭三青
  * 建立時間  2018-11-26 20:29
  * 版本  1.0
  * 描述: $
  */

// 要實現自定義分區器必需要繼承Partitioner
class CustomPartitioner(props: VerifiableProperties) extends Partitioner{
  override def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

程序測試

  後臺啓動kafka集羣ide

kafka-server-start.sh kafka_2.11-0.9.0.1/config/server.properties &

  在kafka集羣上建立一個名爲test的topic,指定分區爲2,通常一個topic對應一個分區測試

kafka-topics.sh --create --zookeeper SC01:2181 --replication-factor 2 --partitions 2 --topic test

  分別運行ProducerDemo和ConsumerDemo則能夠在ConsumerDemo端窗口打印出信息
在這裏插入圖片描述ui

相關文章
相關標籤/搜索