Pulsar IO 中 Schema 的調用流程

背景

Pulsar Connector 系列的前幾篇文章已介紹了 Source、Sink 與 Producer、Consumer 的關係。內建的 Source 封裝了 Consumer、內建的 Sink 封裝了 Producer,所以,Source 與 Sink 是對 pub/sub 模式的一個深層次的應用。
Pulsar 在與其餘系統集成時,Schema 幫助 Pulsar 保留了數據在其餘系統中原有的含義。例如,數據庫中表的各個字段和信息均可以經過 Schema 表達。數據庫

Schema 的內容比較多,本文簡單介紹 Source、 Sink 與 Schema 的關係;其餘部分,例如, Schema 的版本、註冊中心和兼容性等會在以後的文章中介紹。ui

Schema 定義

Schema 是一種描述數據的數據。例如,數據庫中表的信息和字段類型等都是 Schema。Pulsar 對 Schema 也有比較好的支持。this

Schema 簡單應用

在使用 pub/sub 生產和消費消息時,能夠經過如下代碼使用 Schema:code

public class SensorReading {
    public float temperature;

    public SensorReading(float temperature) {
        this.temperature = temperature;
    }

    // A no-arg constructor is required
    public SensorReading() {
    }

    public float getTemperature() {
        return temperature;
    }

    public void setTemperature(float temperature) {
        this.temperature = temperature;
    }
}
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("my-topic")
    .create();
Consumer consumer = client.newConsumer(JSONSchema.of(SensorReading.class))
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

經過以上操做,生產者和消費者能夠識別出關於 SensorReading 這個類的含義。這是 Schema 在客戶端的應用,也是比較廣泛的使用方法。ip

前文已經提到,Source 和 Sink 是對 pub/sub 的封裝,所以,Schema 的應用也是基於以上原理。如下爲詳細說明。get

Source 中的 Schema

在內建的 Sink 中,實現了一個 Consumer,用於接收從 Pulsar 發來的數據。input

if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
    schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);
} else {
    schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);
}

getSerdeClassName 會獲取用戶指定的用於序列化與反序列化的類,經過指定 --custom-serde-inputs 參數,從而構建真正的 Schema。it

case NONE:
  return (Schema<T>) Schema.BYTES;

case AUTO_CONSUME:
case AUTO:
  return (Schema<T>) Schema.AUTO_CONSUME();

case STRING:
  return (Schema<T>) Schema.STRING;

case AVRO:
  return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());

case JSON:
  return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());

case KEY_VALUE:
  return (Schema<T>)Schema.KV_BYTES();

case PROTOBUF:
  return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());
}

目前,在 Source 中支持的 Schema 包括以上示例中的 JSONSchema。該 Schema 生成以後,會被 Consumer 進行初始化,用於解析 Producer 中發出的數據。io

Sink 中的 Schema

在 Sink 中,Schema 的使用流程與原理同上,在此就再也不贅述。class

總結

Schema 幫助保留了數據的元信息,Source 和 Sink 做爲 與外部系統集成的組件,對 Schema 的良好的支持具備重要的意義。

相關文章
相關標籤/搜索