Pulsar Connector 系列的前幾篇文章已介紹了 Source、Sink 與 Producer、Consumer 的關係。內建的 Source 封裝了 Consumer、內建的 Sink 封裝了 Producer,所以,Source 與 Sink 是對 pub/sub 模式的一個深層次的應用。
Pulsar 在與其餘系統集成時,Schema 幫助 Pulsar 保留了數據在其餘系統中原有的含義。例如,數據庫中表的各個字段和信息均可以經過 Schema 表達。數據庫
Schema 的內容比較多,本文簡單介紹 Source、 Sink 與 Schema 的關係;其餘部分,例如, Schema 的版本、註冊中心和兼容性等會在以後的文章中介紹。ui
Schema 是一種描述數據的數據。例如,數據庫中表的信息和字段類型等都是 Schema。Pulsar 對 Schema 也有比較好的支持。this
在使用 pub/sub 生產和消費消息時,能夠經過如下代碼使用 Schema:code
public class SensorReading { public float temperature; public SensorReading(float temperature) { this.temperature = temperature; } // A no-arg constructor is required public SensorReading() { } public float getTemperature() { return temperature; } public void setTemperature(float temperature) { this.temperature = temperature; } } Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class)) .topic("my-topic") .create(); Consumer consumer = client.newConsumer(JSONSchema.of(SensorReading.class)) .topic("my-topic") .subscriptionName("my-subscription") .subscribe();
經過以上操做,生產者和消費者能夠識別出關於 SensorReading 這個類的含義。這是 Schema 在客戶端的應用,也是比較廣泛的使用方法。ip
前文已經提到,Source 和 Sink 是對 pub/sub 的封裝,所以,Schema 的應用也是基於以上原理。如下爲詳細說明。get
在內建的 Sink 中,實現了一個 Consumer,用於接收從 Pulsar 發來的數據。input
if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true); } else { schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true); }
getSerdeClassName 會獲取用戶指定的用於序列化與反序列化的類,經過指定 --custom-serde-inputs 參數,從而構建真正的 Schema。it
case NONE: return (Schema<T>) Schema.BYTES; case AUTO_CONSUME: case AUTO: return (Schema<T>) Schema.AUTO_CONSUME(); case STRING: return (Schema<T>) Schema.STRING; case AVRO: return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build()); case JSON: return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build()); case KEY_VALUE: return (Schema<T>)Schema.KV_BYTES(); case PROTOBUF: return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap()); }
目前,在 Source 中支持的 Schema 包括以上示例中的 JSONSchema。該 Schema 生成以後,會被 Consumer 進行初始化,用於解析 Producer 中發出的數據。io
在 Sink 中,Schema 的使用流程與原理同上,在此就再也不贅述。class
Schema 幫助保留了數據的元信息,Source 和 Sink 做爲 與外部系統集成的組件,對 Schema 的良好的支持具備重要的意義。