咱們知道過去對於Kafka的定義是分佈式,分區化的,帶備份機制的日誌提交服務。也就是一個分佈式的消息隊列,這也是他最多見的用法。可是Kafka不止於此,打開最新的官網。正則表達式
咱們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform數據庫
分佈式流處理平臺。apache
這裏也清晰的描述了Kafka的特色:Kafka用於構建實時數據管道和流式應用程序。它具備水平可擴展性、容錯性、速度極快,並在數千家公司投入生產。json
因此如今的Kafka已經不只是一個分佈式的消息隊列,更是一個流處理平臺。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的組件Kafka Connect與Kafka Streaming。bootstrap
咱們知道消息隊列必須存在上下游的系統,對消息進行搬入搬出。好比經典的日誌分析系統,經過flume讀取日誌寫入kafka,下游由storm進行實時的數據處理。服務器
Kafka Connect的做用就是替代Flume,讓數據傳輸這部分工做能夠由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其餘系統之間可靠且可靠地傳輸數據的工具。它能夠快速地將大量數據集合移入和移出Kafka。架構
Kafka Connect的導入做業能夠將數據庫或從應用程序服務器收集的數據傳入到Kafka,導出做業能夠將Kafka中的數據傳遞到查詢系統,也能夠傳輸到批處理系統以進行離線分析。框架
Kafka Connect功能包括:分佈式
Kafka Connect目前支持兩種運行模式:獨立和集羣。ide
在獨立模式下,只有一個進程,這種更容易設置和使用。可是沒有容錯功能。
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一個參數config/connect-standalone.properties是一些基本的配置:
這幾個在獨立和集羣模式下都須要設置:
#bootstrap.servers kafka集羣列表 bootstrap.servers=localhost:9092 #key.converter key的序列化轉換器 好比json的 key.converter=org.apache.kafka.connect.json.JsonConverter #value.converter value的序列化轉換器 value.converter=org.apache.kafka.connect.json.JsonConverter #獨立模式特有的配置: #offset.storage.file.filename 用於存儲偏移量的文件 offset.storage.file.filename =/home/kafka/connect.offsets
後面的參數connector1.properties [connector2.properties ...] 能夠多個,是鏈接器配置內容
這裏咱們配置一個從文件讀取數據並存入kafka的配置:
connect-file-sink.properties
name
- 鏈接器的惟一名稱。嘗試再次使用相同名稱註冊將失敗。
connector.class
- 鏈接器的Java類 此鏈接器的類的全名或別名。這裏咱們選擇FileStreamSink
tasks.max
- 應爲此鏈接器建立的最大任務數。若是鏈接器沒法達到此級別的並行性,則可能會建立更少的任務。
key.converter
- (可選)覆蓋worker設置的默認密鑰轉換器。
value.converter
- (可選)覆蓋worker設置的默認值轉換器。
下面兩個必須設置一個:
topics
- 以逗號分隔的主題列表,用做此鏈接器的輸入topics.regex
- 用做此鏈接器輸入的主題的Java正則表達式name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
能夠在鏈接器中配置轉換器
須要指定參數:
transforms
- 轉換的別名列表,指定將應用轉換的順序。transforms.$alias.type
- 轉換的徹底限定類名。transforms.$alias.$transformationSpecificConfig
轉換的配置屬性例如,咱們把剛纔的文件轉換器的內容添加字段
首先設置connect-standalone.properties
key.converter.schemas.enable = false value.converter.schemas.enable = false
設置connect-file-source.properties
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test transforms=MakeMap, InsertSource transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value transforms.MakeMap.field=line transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value transforms.InsertSource.static.field=data_source transforms.InsertSource.static.value=test-file-source
沒有轉換前的結果:
"foo" "bar" "hello world"
轉換後:
{"line":"foo","data_source":"test-file-source"} {"line":"bar","data_source":"test-file-source"} {"line":"hello world","data_source":"test-file-source"}
經常使用轉換類型:
集羣模式下,能夠擴展,容錯。
> bin/connect-distributed.sh config/connect-distributed.properties
在集羣模式下,Kafka Connect在Kafka主題中存儲偏移量,配置和任務狀態。
connect-distributed.properties
#也須要基本的配置 bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter #還有一些配置要注意 #group.id(默認connect-cluster) - Connect的組id 請注意,這不得與使用者的組id 衝突 group.id=connect-cluster #用於存儲偏移的主題; 此主題應具備許多分區 offset.storage.topic=connect-offsets offset.storage.replication.factor=1 #用於存儲鏈接器和任務配置的主題 只能一個分區 config.storage.topic=connect-configs config.storage.replication.factor=1 #用於存儲狀態的主題; 此主題能夠有多個分區 status.storage.topic=connect-status status.storage.replication.factor=1
在集羣模式下,配置並不會在命令行傳進去,而是須要REST API來建立,修改和銷燬鏈接器。
能夠配置REST API服務器,支持http與https
listeners=http://localhost:8080,https://localhost:8443
默認狀況下,若是未listeners
指定,則REST服務器使用HTTP協議在端口8083上運行。
如下是當前支持的REST API:
GET /connectors
- 返回活動鏈接器列表POST /connectors
- 建立一個新的鏈接器; 請求主體應該是包含字符串name
字段的JSON對象和包含config
鏈接器配置參數的對象字段GET /connectors/{name}
- 獲取有關特定鏈接器的信息GET /connectors/{name}/config
- 獲取特定鏈接器的配置參數PUT /connectors/{name}/config
- 更新特定鏈接器的配置參數GET /connectors/{name}/status
- 獲取鏈接器的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪一個工做人員,錯誤信息(若是失敗)以及全部任務的狀態GET /connectors/{name}/tasks
- 獲取當前爲鏈接器運行的任務列表GET /connectors/{name}/tasks/{taskid}/status
- 獲取任務的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪一個工做人員,以及錯誤信息是否失敗PUT /connectors/{name}/pause
- 暫停鏈接器及其任務,這將中止消息處理,直到恢復鏈接器PUT /connectors/{name}/resume
- 恢復暫停的鏈接器(若是鏈接器未暫停,則不執行任何操做)POST /connectors/{name}/restart
- 從新啓動鏈接器(一般是由於它已經失敗)POST /connectors/{name}/tasks/{taskId}/restart
- 重啓個別任務(一般由於失敗)DELETE /connectors/{name}
- 刪除鏈接器,暫停全部任務並刪除其配置kakfa容許開發人員本身去開發一個鏈接器。
要在Kafka和其餘系統之間複製數據,用戶須要建立一個Connector
Connector有兩種形式:
SourceConnectors
從另外一個系統導入數據,例如,JDBCSourceConnector
將關係數據庫導入Kafka
SinkConnectors
導出數據,例如,HDFSSinkConnector
將Kafka主題的內容導出到HDFS文件
和對應的Task:
SourceTask
和SinkTask
Task造成輸入輸出流,開發Task要注意偏移量的問題。
每一個流應該是一系列鍵值記錄。還須要按期提交已處理的數據的偏移量,以便在發生故障時,處理能夠從上次提交的偏移量恢復。Connector還須要是動態的,實現還負責監視外部系統是否存在任何更改。
開發鏈接器只須要實現兩個接口,即Connector
和Task
。
這裏咱們簡單開發一個FileStreamConnector。
此鏈接器是爲在獨立模式下使用,SourceConnector/
SourceTask讀取文件的每一行,
SinkConnector/
SinkTask每一個記錄寫入一個文件。
繼承SourceConnector,添加字段(要讀取的文件名和要將數據發送到的主題)
public class FileStreamSourceConnector extends SourceConnector { private String filename; private String topic;
定義實際讀取數據的類
@Override public Class<? extends Task> taskClass() { return FileStreamSourceTask.class; }
在FileStreamSourceTask
下面定義該類。接下來,咱們添加一些標準的生命週期方法,start()
和stop()
@Override public void start(Map<String, String> props) { // The complete version includes error handling as well. filename = props.get(FILE_CONFIG); topic = props.get(TOPIC_CONFIG); } @Override public void stop() { // Nothing to do since no background monitoring is required. }
最後,實施的真正核心在於taskConfigs()
@Override public List<Map<String, String>> taskConfigs(int maxTasks) { ArrayList<Map<String, String>> configs = new ArrayList<>(); // Only one input stream makes sense. Map<String, String> config = new HashMap<>(); if (filename != null) config.put(FILE_CONFIG, filename); config.put(TOPIC_CONFIG, topic); configs.add(config); return configs; }
實現SourceTask
建立FileStreamSourceTask繼承SourceTask
public class FileStreamSourceTask extends SourceTask { String filename; InputStream stream; String topic; @Override public void start(Map<String, String> props) { filename = props.get(FileStreamSourceConnector.FILE_CONFIG); stream = openOrThrowError(filename); topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); } @Override public synchronized void stop() { stream.close(); }
接下來,咱們實現任務的主要功能,即poll()
從輸入系統獲取事件並返回如下內容的方法List
:
@Override public List<SourceRecord> poll() throws InterruptedException { try { ArrayList<SourceRecord> records = new ArrayList<>(); while (streamValid(stream) && records.isEmpty()) { LineAndOffset line = readToNextLine(stream); if (line != null) { Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename); Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset); records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line)); } else { Thread.sleep(1); } } return records; } catch (IOException e) { // Underlying stream was killed, probably as a result of calling stop. Allow to return // null, and driving thread will handle any shutdown if necessary. } return null; }
不像SourceConnector
和SinkConnector
,SourceTask
並SinkTask
有很是不一樣的接口,由於SourceTask
採用的是拉接口,並SinkTask
使用推接口。二者共享公共生命週期方法,但SinkTask
徹底不一樣:
public abstract class SinkTask implements Task { public void initialize(SinkTaskContext context) { this.context = context; } public abstract void put(Collection<SinkRecord> records); public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { }
這是一個簡單的例子,它們有簡單的結構化數據 - 每一行只是一個字符串。幾乎全部實用的鏈接器都須要具備更復雜數據格式的模式。要建立更復雜的數據,您須要使用Kafka Connect data
API。
Schema schema = SchemaBuilder.struct().name(NAME) .field("name", Schema.STRING_SCHEMA) .field("age", Schema.INT_SCHEMA) .field("admin", new SchemaBuilder.boolean().defaultValue(false).build()) .build(); Struct struct = new Struct(schema) .put("name", "Barbara Liskov") .put("age", 75);
更多Kafka相關技術文章:
什麼是Kafka? Kafka監控工具彙總 Kafka快速入門 Kafka核心之Consumer Kafka核心之Producer
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算