NATS--NATS Streaming持久化

前言

最近項目中須要使用到一個消息隊列,主要用來將原來一些操做異步化。根據本身的使用場景和熟悉程度,選擇了NATS Streaming。之因此,選擇NATS Streaming。一,由於我選型一些中間件,我會優先選取一些本身熟悉的語言編寫的,這樣方便排查問題和進一步的深究。二,由於本身一直作k8s等雲原生這塊,偏向於cncf基金會管理的項目,畢竟這些項目從一開始就考慮瞭如何部署在k8s當中。三,是評估項目在不斷髮展過程當中,引入的組件是否可以依舊知足需求。html

消息隊列的使用場景

若是問爲何這麼作,須要說一下消息隊列的使用場景。以前看知乎的時候,看到一些回答比較認同,暫時拿過來,更能形象表達。感謝ScienJus同窗的精彩解答。node

消息隊列的主要特色是異步處理,主要目的是減小請求響應時間和解耦。因此主要的使用場景就是將比較耗時並且不須要即時(同步)返回結果的操做做爲消息放入消息隊列。同時因爲使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不須要彼此聯繫,也不須要受對方的影響,即解耦和。mysql

使用場景的話,舉個例子:git

假設用戶在你的軟件中註冊,服務端收到用戶的註冊請求後,它會作這些操做:github

  • 校驗用戶名等信息,若是沒問題會在數據庫中添加一個用戶記錄
  • 若是是用郵箱註冊會給你發送一封註冊成功的郵件,手機註冊則會發送一條短信
  • 分析用戶的我的信息,以便未來向他推薦一些志同道合的人,或向那些人推薦他
  • 發送給用戶一個包含操做指南的系統通知等等……

可是對於用戶來講,註冊功能實際只須要第一步,只要服務端將他的帳戶信息存到數據庫中他即可以登陸上去作他想作的事情了。至於其餘的事情,非要在這一次請求中所有完成麼?值得用戶浪費時間等你處理這些對他來講可有可無的事情麼?因此實際當第一步作完後,服務端就能夠把其餘的操做放入對應的消息隊列中而後立刻返回用戶結果,由消息隊列異步的進行這些操做。算法

或者還有一種狀況,同時有大量用戶註冊你的軟件,再高併發狀況下注冊請求開始出現一些問題,例如郵件接口承受不住,或是分析信息時的大量計算使cpu滿載,這將會出現雖然用戶數據記錄很快的添加到數據庫中了,可是卻卡在發郵件或分析信息時的狀況,致使請求的響應時間大幅增加,甚至出現超時,這就有點不划算了。面對這種狀況通常也是將這些操做放入消息隊列(生產者消費者模型),消息隊列慢慢的進行處理,同時能夠很快的完成註冊請求,不會影響用戶使用其餘功能。sql

因此在軟件的正常功能開發中,並不須要去刻意的尋找消息隊列的使用場景,而是當出現性能瓶頸時,去查看業務邏輯是否存在能夠異步處理的耗時操做,若是存在的話即可以引入消息隊列來解決。不然盲目的使用消息隊列可能會增長維護和開發的成本卻沒法獲得可觀的性能提高,那就得不償失了。docker

其實,總結一下消息隊列的做用數據庫

  • 削峯,形象點的話,能夠比喻爲蓄水池。好比elk日誌收集系統中的kafka,主要在日誌高峯期的時候,在犧牲實時性的同時,保證了整個系統的安全。
  • 同步系統異構化。原先一個同步操做裏的諸多步驟,能夠考慮將一些不影響主線發展的步驟,經過消息隊列異步處理。好比,電商行業,一個訂單完成以後,通常除了直接返回給客戶購買成功的消息,還要通知帳戶組進行扣費,通知處理庫存變化,通知物流進行派送等,通知一些用戶組作一些增長會員積分等操做等。

NATS Streaming 簡介

NATS Streaming是一個由NATS驅動的數據流系統,用Go編程語言編寫。 NATS Streaming服務器的可執行文件名是nats-streaming-server。 NATS Streaming與核心NATS平臺無縫嵌入,擴展和互操做。 NATS Streaming服務器做爲Apache-2.0許可下的開源軟件提供。 Synadia積極維護和支持NATS Streaming服務器。編程

圖片描述

特色

除了核心NATS平臺的功能外,NATS Streaming還提供如下功能:

  • 加強消息協議

NATS Streaming使用谷歌協議緩衝區實現本身的加強型消息格式。這些消息經過二進制數據流在NATS核心平臺進行傳播,所以不須要改變NATS的基本協議。NATS Streaming信息包含如下字段:

  - 序列 - 一個全局順序序列號爲主題的通道
  - 主題 - 是NATS Streaming 交付對象
  - 答覆內容 - 對應"reply-to"對應的對象內容
  - 數據 - 真是數據內容
  - 時間戳 - 接收的時間戳,單位是納秒
  - 重複發送 - 標誌這條數據是否須要服務再次發送
  - CRC32 - 一個循環冗餘數據校驗選項,在數據存儲和數據通信領域裏,爲了保證數據的正確性所採用的檢錯手段,這裏使用的是 IEEE CRC32 算法

 - 消息/事件的持久性
  NATS Streaming提供了可配置的消息持久化,持久目的地能夠爲內存或者文件。另外,對應的存儲子系統使用了一個公共接口容許咱們開發本身自定義實現來持久化對應的消息

 - 至少一次的發送
  NATS Streaming提供了發佈者和服務器之間的消息確認(發佈操做) 和訂閱者和服務器之間的消息確認(確認消息發送)。其中消息被保存在服務器端內存或者輔助存儲(或其餘外部存儲器)用來爲須要從新接受消息的訂閱者進行重發消息。

 - 發佈者發送速率限定
  NATS Streaming提供了一個鏈接選項叫 MaxPubAcksInFlight,它能有效的限制一個發佈者可能隨意的在任什麼時候候發送的未被確認的消息。當達到這個配置的最大數量時,異步發送調用接口將會被阻塞,直到未確認消息降到指定數量之下。

- 每一個訂閱者的速率匹配/限制
  NATS Streaming運行指定的訂閱中設置一個參數爲 MaxInFlight,它用來指定已確認但未消費的最大數據量,當達到這個限制時,NATS Streaming 將暫停發送消息給訂閱者,直到未確認的數據量小於設定的量爲止

  • 以主題重發的歷史數據

  新訂閱的能夠在已經存儲起來的訂閱的主題頻道指定起始位置消息流。經過使用這個選項,消息就能夠開始發送傳遞了:

  1. 訂閱的主題存儲的最先的信息
  2. 與當前訂閱主題以前的最近存儲的數據,這一般被認爲是 "最後的值" 或 "初值" 對應的緩存
  3. 一個以納秒爲基準的 日期/時間
  4. 一個歷史的起始位置相對當前服務的 日期/時間,例如:最後30秒
  5. 一個特定的消息序列號
  • 持久訂閱

  訂閱也能夠指定一個「持久化的名稱」能夠在客戶端重啓時不受影響。持久訂閱會使得對應服務跟蹤客戶端最後確認消息的序列號和持久名稱。當這個客戶端重啓或者從新訂閱的時候,使用相同的客戶端ID 和 持久化的名稱,對應的服務將會從最先的未被確認的消息處恢復。

docker 運行NATS Streaming

在運行以前,前面已經講過NATS Streaming 相比nats,多了持久化的一個future。因此咱們在接下來的demo演示中,會重點說這點。

運行基於memory的持久化示例:

docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

你將會看到以下的輸出:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

能夠看出默認的是基於內存的持久化。

運行基於file的持久化示例:

docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

你將會看到以下的輸出:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------

PS

  • 若是部署在k8s當中,那麼就能夠採起基於file的持久化,經過掛載一個塊存儲來保證,數據可靠。好比,aws的ebs或是ceph的rbd。
  • 4222爲客戶端鏈接的端口。8222爲監控端口。

啓動之後訪問:localhost:8222,能夠看到以下的網頁:

圖片描述

啓動參數解析

Streaming Server Options:
    -cid, --cluster_id  <string>         Cluster ID (default: test-cluster)
    -st,  --store <string>               Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir <string>                 For FILE store type, this is the root directory
    -mc,  --max_channels <int>           Max number of channels (0 for unlimited)
    -msu, --max_subs <int>               Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs <int>               Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes <size>             Max messages total size per channel (0 for unlimited)
    -ma,  --max_age <duration>           Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity <duration>    Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server <string>         Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config <string>         Streaming server configuration file
    -hbi, --hb_interval <duration>       Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout <duration>        How long server waits for a heartbeat response
    -hbf, --hb_fail_count <int>          Number of failed heartbeats before server closes the client connection
          --ft_group <string>            Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal <signal>[=<pid>]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt <bool>               Specify if server should use encryption at rest
          --encryption_cipher <string>   Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key <sting>       Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered <bool>                   Run the server in a clustered configuration (default: false)
    --cluster_node_id <string>           ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap <bool>           Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers <string>             List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path <string>          Directory to store log replication data
    --cluster_log_cache_size <int>       Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots <int>        Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs <int>        Number of log entries to leave after a snapshot and compaction
    --cluster_sync <bool>                Do a file sync after every write to the replication log and message store
    --cluster_raft_logging <bool>        Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled <bool>        Enable file compaction
    --file_compact_frag <int>            File fragmentation threshold for compaction
    --file_compact_interval <int>        Minimum interval (in seconds) between file compactions
    --file_compact_min_size <size>       Minimum file size for compaction
    --file_buffer_size <size>            File buffer size (in bytes)
    --file_crc <bool>                    Enable file CRC-32 checksum
    --file_crc_poly <int>                Polynomial used to make the table used for CRC-32 checksum
    --file_sync <bool>                   Enable File.Sync on Flush
    --file_slice_max_msgs <int>          Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes <size>        Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age <duration>      Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed
    --file_fds_limit <int>               Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery <int>       On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof <bool>       Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver <string>            Name of the SQL Driver ("mysql" or "postgres")
    --sql_source <string>            Datasource used when opening an SQL connection to the database
    --sql_no_caching <bool>          Enable/Disable caching for improved performance
    --sql_max_open_conns <int>       Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure <bool>                   Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key <string>         Client key for the streaming server
    -tls_client_cert <string>        Client certificate for the streaming server
    -tls_client_cacert <string>      Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=<bool>         Enable STAN debugging output
    -SV, --stan_trace=<bool>         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr <string>              Bind to host address (default: 0.0.0.0)
    -p, --port <int>                 Use port for clients (default: 4222)
    -P, --pid <string>               File to store PID
    -m, --http_port <int>            Use port for http monitoring
    -ms,--https_port <int>           Use port for https monitoring
    -c, --config <string>            Configuration file

Logging Options:
    -l, --log <string>               File to redirect log output
    -T, --logtime=<bool>             Timestamp log entries (default: true)
    -s, --syslog <string>            Enable syslog as log method
    -r, --remote_syslog <string>     Syslog server addr (udp://localhost:514)
    -D, --debug=<bool>               Enable debugging output
    -V, --trace=<bool>               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user <string>              User required for connections
        --pass <string>              Password required for connections
        --auth <string>              Authorization token required for connections

TLS Options:
        --tls=<bool>                 Enable TLS, do not verify clients (default: false)
        --tlscert <string>           Server certificate file
        --tlskey <string>            Private key for server certificate
        --tlsverify=<bool>           Enable TLS, verify client certificates
        --tlscacert <string>         Client certificate CA for verification

NATS Clustering Options:
        --routes <string, ...>       Routes to solicit and connect
        --cluster <string>           Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.

源碼簡單分析NATS Streaming 持久化

目前NATS Streaming支持如下4種持久化方式:

  • MEMORY
  • FILE
  • SQL
  • RAFT

其實看源碼能夠知道:NATS Streaming的store基於接口實現,很容易擴展到更多的持久化方式。具體的接口以下:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don't apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server's information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PeChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

官方也提供了mysql和pgsql兩種數據的支持:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;

總結

後續會詳細解讀一下代碼實現和一些集羣部署。固然確定少不了如何部署高可用的集羣在k8s當中。

參閱文章:

NATS Streaming詳解

相關文章
相關標籤/搜索