對象 | 明細 |
---|---|
Log | 日誌、日誌組表示等基本概念 |
Project | 項目 |
Config | 配置 |
LogStore | 日誌庫 |
Index | 索引 |
Shard | 分區 |
ConsumerGroup | 消費組 |
就如同使用 API 和日誌服務服務端交互同樣,使用 SDK 也須要指定一些基本配置。目前,全部語言的 SDK 都定義了一個 Client 類做爲入口類,這些基本配置信息在該入口類的構造時指定。html
具體包括以下幾項:java
同一個消費組下面的消費者名稱必須不一樣,不然相同的消費者會同時消費logstore同份數據,形成數據重複git
協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日誌服務的實現細節,只須要專一於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心github
消費組(ConsumerGroup)緩存
消費組(Consumer)安全
shared消費組、消費組關係網絡
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.15</version> </dependency>
阿里雲client依賴log4j,若是項目中使用的logback,須要增長轉換log4j到logback的轉換負載均衡
<dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.25</version> </dependency>
public class Main { // 日誌服務域名,根據實際狀況填寫 private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; // 日誌服務項目名稱,根據實際狀況填寫 private static String sProject = "ali-cn-hangzhou-sls-admin"; // 日誌庫名稱,根據實際狀況填寫 private static String sLogstore = "sls_operation_log"; // 消費組名稱,根據實際狀況填寫 private static String sConsumerGroup = "consumerGroupX"; // 消費數據的ak,根據實際狀況填寫 private static String sAccessKeyId = ""; private static String sAccessKey = ""; public static void main(String []args) throws LogHubClientWorkerException, InterruptedException { // 第二個參數是消費者名稱,同一個消費組下面的消費者名稱必須不一樣,可使用相同的消費組名稱,不一樣的消費者名稱在多臺機器上啓動多個進程,來均衡消費一個Logstore,這個時候消費者名稱可使用機器ip來區分。第9個參數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用默認值便可,若有調整請注意取值範圍(0,1000] LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); //Thread運行以後,Client Worker會自動運行,ClientWorker擴展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); //調用worker的Shutdown函數,退出消費實例,關聯的線程也會自動中止。 worker.shutdown(); //ClientWorker運行過程當中會生成多個異步的Task,Shutdown以後最好等待還在執行的Task安全退出,建議sleep 30s。 Thread.sleep(30 * 1000); } }
public class SampleLogHubProcessor implements ILogHubProcessor { private int mShardId; // 記錄上次持久化 check point 的時間 private long mLastCheckTime = 0; public void initialize(int shardId) { mShardId = shardId; } // 消費數據的主邏輯,這裏面的全部異常都須要捕獲,不能拋出去。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 這裏簡單的將獲取到的數據打印出來 for(LogGroupData logGroup: logGroups){ FastLogGroup flg = logGroup.GetFastLogGroup(); System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s", flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID())); System.out.println("Tags"); for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) { FastLogTag logtag = flg.getLogTags(tagIdx); System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue())); } for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) { FastLog log = flg.getLogs(lIdx); System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) { FastLogContent content = log.getContents(cIdx); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔 30 秒,寫一次 check point 到服務端,若是 30 秒內,worker crash, // 新啓動的 worker 會從上一個 checkpoint 其消費數據,有可能有少許的重複數據 if (curTime - mLastCheckTime > 30 * 1000) { try { //參數true表示當即將checkpoint更新到服務端,爲false會將checkpoint緩存在本地,後臺默認隔60s會將checkpoint刷新到服務端。 checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } mLastCheckTime = curTime; } return null; } // 當 worker 退出的時候,會調用該函數,用戶能夠在此處作些清理工做。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { //將消費斷點保存到服務端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } } class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一個消費實例 return new SampleLogHubProcessor(); } }
SDK 可能出現的異常錯誤能夠分紅以下幾類:運維
目前,各個語言 SDK 的實現都採起拋出異常的方式處理錯誤。具體原則以下:異步
API錯誤重試
原文出處:https://www.cnblogs.com/guozp/p/10327607.html