互聯網快速發展的今天,社交類應用、消息類功能大行其道,佔據了大量網絡流量。大至釘釘、微信、微博、知乎,小至各種App的推送通知,消息類功能幾乎成爲全部應用的標配。根據場景特色,咱們能夠將消息類場景概括成三大類:IM(釘釘、微信)、Feed流(微博、知乎)以及常規消息隊列。所以,如何開發一個簡便而又高效IM或Feed流功能,成爲了不少架構師、開發人員不得不面對的問題。git
針對消息類場景,表格存儲團隊針對JAVA語言打造了一個TableStore-Timeline 1.0版數據模型模型(簡稱Timeline模型)。基於場景經驗與理解,將消息場景封裝成一個數據模型,提供了表結構設計,讀寫方式等解決方案供需求者使用。用戶只需依託模型API,直接忽略Timeline到底層存儲系統之間的架構方案,直接基於接口實現業務邏輯。它能知足消息數據場景對消息保序、海量消息存儲、實時同步等特殊需求。Timeline 1.0是定義在表格存儲之上抽象出來的數據模型,具體內容參見《TableStore Timeline:輕鬆構建千萬級IM和Feed流系統》。github
在表格存儲的Timeline模型受到普遍使用的過程當中,咱們也逐漸發現消息類數據的全文檢索、模糊查詢這一很強需求。而原有模型的在線查詢能力存在必定短板。隨着表格存儲支持了SearchIndex能力,使得Timeline模型支持在線全文檢索、模糊查詢成爲了可能。因此咱們基於原有的架構設計,從新打造了Timeline 2.0模型,引入了強大的查詢能力與數據管理新方案。數據庫
項目代碼目前已經開源在了GitHub上:Timeline@GitHub。微信
這次推出的Timeline模型2.0版,沒有直接基於1.X版本直接改造。而是在兼容原有模型架構之上,定義、封裝了新的使用接口。從新打造升級新的模型,增長了以下功能:網絡
Timeline作爲表格存儲直接支持的一種數據模型,以『簡單』爲設計目標,其存核心模塊構成比較清晰明瞭。Timeline儘可能提高用戶的使用自由度,讓用戶可以根據自身場景需求選擇更爲合適的實現方案。模型的模塊架構如上圖,主要包括以下重要部分:架構
Timeline 模型是基於Tablestore抽象、封裝出的一類場景數據模型,於是具備Tablestore自身的全部優勢。同時結合場景設計的接口,讓用戶更直觀、清晰的實現業務邏輯,總結以下:併發
<dependency> <groupId>com.aliyun.openservices.tablestore</groupId> <artifactId>Timeline</artifactId> <version>2.0.0</version> </dependency>
TableStore Java SDK異步
Timeline模型在TableStore Java SDK >= 4.12.1做爲基本數據模型直接提供,表格存儲老用戶可升級SDK直接使用分佈式
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>tablestore</artifactId> <version>4.12.1</version> </dependency>
入手指南ide
用戶將SyncClient做爲參數,初始化StoreFactory,經過工廠建立Meta數據、Timeline數據的管理Store。錯誤重試的實現依賴SyncClient的重試策略,用戶經過設置SyncClient實現重試。若有特殊需求,可自定義策略(只需實現RetryStrategy接口)。
/** * 重試策略設置 * Code: configuration.setRetryStrategy(new DefaultRetryStrategy()); * */ ClientConfiguration configuration = new ClientConfiguration(); SyncClient client = new SyncClient( "http://instanceName.cn-shanghai.ots.aliyuncs.com", "accessKeyId", "accessKeySecret", "instanceName", configuration); TimelineStoreFactory factory = new TimelineStoreFactoryImpl(client);
初始化MetaStore
構建meta表的Schema(包含Identifier、MetaIndex等參數),經過Store工廠建立並獲取Meta的管理Store;配置參數包含:Meta表名、索引,表名、主鍵字段、索引名、索引類型等參數。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder() .addStringField("timeline_id").build(); IndexSchema metaIndex = new IndexSchema(); metaIndex.addFieldSchema( //配置索引字段、類型 new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord) new FieldSchema("create_time", FieldType.Long).setIndex(true) ); TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema) .withIndex("metaIndex", metaIndex); //設置索引 TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
初始化TimelineStore
構建timeline表的Schema配置,包含Identifier、TimelineIndex等參數,經過Store工廠建立並獲取Timeline的管理Store;配置參數包含:Timeline表名、索引,表名、主鍵字段、索引名、索引類型等參數。
消息的批量寫入,基於Tablestore的DefaultTableStoreWriter提高併發,用戶能夠根據本身需求設置線程池數目。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder() .addStringField("timeline_id").build(); IndexSchema timelineIndex = new IndexSchema(); timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的字段、類型 new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord), new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true) )); TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema) .autoGenerateSeqId() //SequenceId 設置爲自增列方式 .setCallbackExecuteThreads(5) //設置Writer初始線程數爲5 .withIndex("metaIndex", timelineIndex); //設置索引 TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
Meta管理
Meta管理提供了增、刪、改、單行讀、多條件組合查詢等接口。其中多條件組合查詢功能基於多元索引,只有設置了IndexSchema的MetaStore才支持組合查詢功能。索引類型支持LONG、DOUBLE、BOOLEAN、KEYWORD、GEO_POINT等類型,屬性包含Index、Store和Array,其含義與多元索引相同。
TimelineIdentifer是區分Timeline的惟一標識,重複的Identifier會被覆蓋。
/** * 接口使用參數 * */ TimelineIdentifier identifier = new TimelineIdentifier.Builder() .addField("timeline_id", "group") .build(); TimelineMeta meta = new TimelineMeta(identifier) .setField("filedName", "fieldValue"); /** * 建立Meta表(若是設置索引則會建立索引) * */ timelineMetaStore.prepareTables(); /** * 插入Meta數據 * */ timelineMetaStore.insert(meta); /** * 根據id讀取單行Meta數據 * */ timelineMetaStore.read(identifier); /** * 更新Meta數據 * */ meta.setField("fieldName", "newValue"); timelineMetaStore.update(meta); /** * 根據id刪除單行Meta數據 * */ timelineMetaStore.delete(identifier); /** * 經過SearchParameter參數檢索 * */ SearchParameter parameter = new SearchParameter( field("fieldName").equals("fieldValue") ); timelineMetaStore.search(parameter); /** * 經過SearchQuery參數檢索(SearchQuery是SDK原生類型,支持全部多元索引檢索條件) * */ TermQuery query = new TermQuery(); query.setFieldName("fieldName"); query.setTerm(ColumnValue.fromString("fieldValue")); SearchQuery searchQuery = new SearchQuery().setQuery(query); timelineMetaStore.search(searchQuery); /** * 刪除Meta表(若是存在索引,同時刪除索引) * */ timelineMetaStore.dropAllTables();
Timeline管理
Timeline管理提供了消息模糊查詢、多條件組合查詢接口。消息的全文檢索依託多元索引,用戶只需將相應字段索引類型設置爲TEXT,便可經過Search接口實現消息的全文檢索。Timeline管理包含消息表的建立、檢索、刪除等。
/** * 接口使用參數 * */ SearchParameter searchParameter = new SearchParameter( field("text").equals("fieldValue") ); TermQuery query = new TermQuery(); query.setFieldName("text"); query.setTerm(ColumnValue.fromString("fieldValue")); SearchQuery searchQuery = new SearchQuery().setQuery(query).setLimit(10); /** * 建立Meta表(若是設置索引則會建立索引) * */ timelineStore.prepareTables(); /** * 經過SearchParameter參數檢索 * */ timelineStore.search(searchParameter); /** * 經過SearchQuery參數檢索(SearchQuery是SDK原生類型,支持全部多元索引檢索條件) * */ timelineStore.search(searchQuery); /** * 將Writer隊列中未發的請求主動觸發,同步等待直到全部消息完成存儲 * */ timelineStore.flush(); /** * 關閉Writer與Writer中的線程池 * */ timelineStore.close(); /** * 刪除Timeline表(若是存在索引,同時刪除索引) * */ timelineStore.dropAllTables();
Queue管理
Queue是單個消息隊列的抽象概念,對應一個Store下單個Identifier的全部消息。經過Queue實例管理相應Identifer的消息隊列,支持基本的增、刪、改、單行查、範圍查等接口。
/** * 接口使用參數 * */ TimelineIdentifier identifier = new TimelineIdentifier.Builder() .addField("timeline_id", "group") .build(); long sequenceId = 1557133858994L; TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine."); ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0); TimelineCallback callback = new TimelineCallback() { @Override public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) { // do something when succeed. } @Override public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) { // do something when failed. } }; /** * 單個Identifier對應的消息隊列 * */ timelineQueue = timelineStore.createTimelineQueue(identifier); /** * 存儲消息 * */ //同步 timelineQueue.store(message); timelineQueue.store(sequenceId, message); //異步,支持callback timelineQueue.storeAsync(message, callback); timelineQueue.storeAsync(sequenceId, message, callback); //異步批量 timelineQueue.batchStore(message); timelineQueue.batchStore(sequenceId, message); //異步批量,支持callback timelineQueue.batchStore(message, callback); timelineQueue.batchStore(sequenceId, message, callback); /** * 單行讀取、獲取最新一行、獲取最新SequenceId * */ timelineQueue.get(sequenceId); timelineQueue.getLatestTimelineEntry(); timelineQueue.getLatestSequenceId(); /** * 根據SequenceId更新消息 * */ message.setField("text", "newValue"); timelineQueue.update(sequenceId, message); timelineQueue.updateAsync(sequenceId, message, callback); /** * 根據SequenceId刪除消息 * */ timelineQueue.delete(sequenceId); /** * 根據範圍參數、Filter獲取批量消息 * */ timelineQueue.scan(scanParameter);
本文爲雲棲社區原創內容,未經容許不得轉載。