替代Flume——Kafka Connect簡介

file 咱們知道過去對於Kafka的定義是分佈式,分區化的,帶備份機制的日誌提交服務。也就是一個分佈式的消息隊列,這也是他最多見的用法。可是Kafka不止於此,打開最新的官網。正則表達式

file

咱們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform數據庫

分佈式流處理平臺。apache

file

這裏也清晰的描述了Kafka的特色:Kafka用於構建實時數據管道和流式應用程序。它具備水平可擴展性、容錯性、速度極快,並在數千家公司投入生產。json

因此如今的Kafka已經不只是一個分佈式的消息隊列,更是一個流處理平臺。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的組件Kafka Connect與Kafka Streaming。bootstrap

Kafka Connect簡介

咱們知道消息隊列必須存在上下游的系統,對消息進行搬入搬出。好比經典的日誌分析系統,經過flume讀取日誌寫入kafka,下游由storm進行實時的數據處理。服務器

file

Kafka Connect的做用就是替代Flume,讓數據傳輸這部分工做能夠由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其餘系統之間可靠且可靠地傳輸數據的工具。它能夠快速地將大量數據集合移入和移出Kafka。架構

Kafka Connect的導入做業能夠將數據庫或從應用程序服務器收集的數據傳入到Kafka,導出做業能夠將Kafka中的數據傳遞到查詢系統,也能夠傳輸到批處理系統以進行離線分析。框架

Kafka Connect功能包括:分佈式

  • 一個通用的Kafka鏈接的框架 - Kafka Connect規範化了其餘數據系統與Kafka的集成,簡化了鏈接器開發,部署和管理
  • 分佈式和獨立模式 - 支持大型分佈式的管理服務,也支持小型生產環境的部署
  • REST界面 - 經過易用的REST API提交和管理Kafka Connect
  • 自動偏移管理 - 只需從鏈接器獲取一些信息,Kafka Connect就能夠自動管理偏移量提交過程,所以鏈接器開發人員無需擔憂鏈接器開發中偏移量提交這部分的開發
  • 默認狀況下是分佈式和可擴展的 - Kafka Connect構建在現有的組管理協議之上。能夠添加擴展集羣
  • 流媒體/批處理集成 - 利用Kafka現有的功能,Kafka Connect是橋接流媒體和批處理數據系統的理想解決方案

file

運行Kafka Connect

Kafka Connect目前支持兩種運行模式:獨立和集羣。ide

獨立模式

在獨立模式下,只有一個進程,這種更容易設置和使用。可是沒有容錯功能。

啓動:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
獨立模式配置

第一個參數config/connect-standalone.properties是一些基本的配置:

這幾個在獨立和集羣模式下都須要設置:

#bootstrap.servers   kafka集羣列表
bootstrap.servers=localhost:9092
#key.converter       key的序列化轉換器  好比json的  key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter     value的序列化轉換器
value.converter=org.apache.kafka.connect.json.JsonConverter

#獨立模式特有的配置:
#offset.storage.file.filename       用於存儲偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
獨立模式鏈接器配置(配置文件)

後面的參數connector1.properties [connector2.properties ...] 能夠多個,是鏈接器配置內容

這裏咱們配置一個從文件讀取數據並存入kafka的配置:

connect-file-sink.properties

  • name - 鏈接器的惟一名稱。嘗試再次使用相同名稱註冊將失敗。

  • connector.class - 鏈接器的Java類 此鏈接器的類的全名或別名。這裏咱們選擇FileStreamSink

  • tasks.max - 應爲此鏈接器建立的最大任務數。若是鏈接器沒法達到此級別的並行性,則可能會建立更少的任務。

  • key.converter - (可選)覆蓋worker設置的默認密鑰轉換器。

  • value.converter - (可選)覆蓋worker設置的默認值轉換器。

    下面兩個必須設置一個:

    • topics - 以逗號分隔的主題列表,用做此鏈接器的輸入
    • topics.regex - 用做此鏈接器輸入的主題的Java正則表達式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

能夠在鏈接器中配置轉換器

須要指定參數:

  • transforms - 轉換的別名列表,指定將應用轉換的順序。
  • transforms.$alias.type - 轉換的徹底限定類名。
  • transforms.$alias.$transformationSpecificConfig 轉換的配置屬性

例如,咱們把剛纔的文件轉換器的內容添加字段

首先設置connect-standalone.properties

key.converter.schemas.enable = false
value.converter.schemas.enable = false

設置connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

沒有轉換前的結果:

"foo"
"bar"
"hello world"

轉換後:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

經常使用轉換類型:

  • InsertField - 使用靜態數據或記錄元數據添加字段
  • ReplaceField - 過濾或重命名字段
  • MaskField - 用類型的有效空值替換字段(0,空字符串等)
  • ValueToKey Value轉換爲Key
  • HoistField - 將整個事件做爲單個字段包裝在Struct或Map中
  • ExtractField - 從Struct和Map中提取特定字段,並在結果中僅包含此字段
  • SetSchemaMetadata - 修改架構名稱或版本
  • TimestampRouter - 根據原始主題和時間戳修改記錄主題
  • RegexRouter - 根據原始主題,替換字符串和正則表達式修改記錄主題

集羣模式

集羣模式下,能夠擴展,容錯。

啓動:
> bin/connect-distributed.sh config/connect-distributed.properties

在集羣模式下,Kafka Connect在Kafka主題中存儲偏移量,配置和任務狀態。

集羣模式配置

connect-distributed.properties

#也須要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#還有一些配置要注意
#group.id(默認connect-cluster) - Connect的組id 請注意,這不得與使用者的組id 衝突
group.id=connect-cluster

#用於存儲偏移的主題; 此主題應具備許多分區
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用於存儲鏈接器和任務配置的主題  只能一個分區
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用於存儲狀態的主題; 此主題能夠有多個分區
status.storage.topic=connect-status
status.storage.replication.factor=1

在集羣模式下,配置並不會在命令行傳進去,而是須要REST API來建立,修改和銷燬鏈接器。

集羣模式鏈接器配置(REST API)

能夠配置REST API服務器,支持http與https

listeners=http://localhost:8080,https://localhost:8443

默認狀況下,若是未listeners指定,則REST服務器使用HTTP協議在端口8083上運行。

如下是當前支持的REST API:

  • GET /connectors - 返回活動鏈接器列表
  • POST /connectors - 建立一個新的鏈接器; 請求主體應該是包含字符串name字段的JSON對象和包含config鏈接器配置參數的對象字段
  • GET /connectors/{name} - 獲取有關特定鏈接器的信息
  • GET /connectors/{name}/config - 獲取特定鏈接器的配置參數
  • PUT /connectors/{name}/config - 更新特定鏈接器的配置參數
  • GET /connectors/{name}/status - 獲取鏈接器的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪一個工做人員,錯誤信息(若是失敗)以及全部任務的狀態
  • GET /connectors/{name}/tasks - 獲取當前爲鏈接器運行的任務列表
  • GET /connectors/{name}/tasks/{taskid}/status - 獲取任務的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪一個工做人員,以及錯誤信息是否失敗
  • PUT /connectors/{name}/pause - 暫停鏈接器及其任務,這將中止消息處理,直到恢復鏈接器
  • PUT /connectors/{name}/resume - 恢復暫停的鏈接器(若是鏈接器未暫停,則不執行任何操做)
  • POST /connectors/{name}/restart - 從新啓動鏈接器(一般是由於它已經失敗)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重啓個別任務(一般由於失敗)
  • DELETE /connectors/{name} - 刪除鏈接器,暫停全部任務並刪除其配置

鏈接器開發指南

kakfa容許開發人員本身去開發一個鏈接器。

核心概念

要在Kafka和其餘系統之間複製數據,用戶須要建立一個Connector

Connector有兩種形式:

SourceConnectors從另外一個系統導入數據,例如,JDBCSourceConnector將關係數據庫導入Kafka

SinkConnectors導出數據,例如,HDFSSinkConnector將Kafka主題的內容導出到HDFS文件

和對應的Task:

SourceTaskSinkTask

Task造成輸入輸出流,開發Task要注意偏移量的問題。

每一個流應該是一系列鍵值記錄。還須要按期提交已處理的數據的偏移量,以便在發生故障時,處理能夠從上次提交的偏移量恢復。Connector還須要是動態的,實現還負責監視外部系統是否存在任何更改。

開發一個簡單的鏈接器

開發鏈接器只須要實現兩個接口,即ConnectorTask

這裏咱們簡單開發一個FileStreamConnector。

此鏈接器是爲在獨立模式下使用,SourceConnectorSourceTask讀取文件的每一行,SinkConnectorSinkTask每一個記錄寫入一個文件。

鏈接器示例:

繼承SourceConnector,添加字段(要讀取的文件名和要將數據發送到的主題)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

定義實際讀取數據的類

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

FileStreamSourceTask下面定義該類。接下來,咱們添加一些標準的生命週期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}
 
@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最後,實施的真正核心在於taskConfigs()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

任務示例:

源任務

實現SourceTask 建立FileStreamSourceTask繼承SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
 
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }
 
    @Override
    public synchronized void stop() {
        stream.close();
    }

接下來,咱們實現任務的主要功能,即poll()從輸入系統獲取事件並返回如下內容的方法List

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
接收任務

不像SourceConnectorSinkConnectorSourceTaskSinkTask有很是不一樣的接口,由於SourceTask採用的是拉接口,並SinkTask使用推接口。二者共享公共生命週期方法,但SinkTask徹底不一樣:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
 
    public abstract void put(Collection<SinkRecord> records);
 
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

這是一個簡單的例子,它們有簡單的結構化數據 - 每一行只是一個字符串。幾乎全部實用的鏈接器都須要具備更復雜數據格式的模式。要建立更復雜的數據,您須要使用Kafka Connect dataAPI。

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
 
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

更多Kafka相關技術文章:

什麼是Kafka? Kafka監控工具彙總 Kafka快速入門 Kafka核心之Consumer Kafka核心之Producer

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

file

相關文章
相關標籤/搜索