消費阿里雲日誌服務SLS

此文檔只關心消費接入,不關心日誌接入,只關心消費如何接入,可直接跳轉到【sdk消費接入]

SLS簡介

  • 日誌服務:
    • 日誌服務(Log Service,簡稱 LOG)是針對日誌類數據的一站式服務,在阿里巴巴集團經歷大量大數據場景錘鍊而成。您無需開發就能快捷完成日誌數據採集、消費、投遞以及查詢分析等功能,提高運維、運營效率,創建 DT 時代海量日誌處理能力
  • 功能
    • 實時採集與消費(LogHub)
    • 投遞數倉(LogShipper)
    • 查詢與實時分析(Search/Analytics)

接入消費流程

帳號問題
  • 若是消費本身的日誌,直接使用本身阿里雲帳號的key
  • 若是消費他人提供的日誌,須要別人建立的子帳號或者帳號(推薦子帳號,無安全問題)中的key,使用本身帳號沒法鏈接通
接入點EndPoint

消費接入(java)

概念
對象 明細
Log 日誌、日誌組表示等基本概念
Project 項目
Config 配置
LogStore 日誌庫
Index 索引
Shard 分區
ConsumerGroup 消費組
配置

就如同使用 API 和日誌服務服務端交互同樣,使用 SDK 也須要指定一些基本配置。目前,全部語言的 SDK 都定義了一個 Client 類做爲入口類,這些基本配置信息在該入口類的構造時指定。html

具體包括以下幾項:java

  • 服務入口(Endpoint):確認 Client 須要訪問的服務入口
    • 當使用 SDK 時,首先須要明確訪問的日誌服務 Project 所在 Region(如「華東 1 (杭州)」、「華北 1 (青島)」等),而後選擇與其匹配的日誌服務入口初始化 Client。該服務入口與 API 中的 服務入口 定義一致
    • 當選擇 Client 的 Endpoint 時,必需要保證您須要訪問的 Project 的 Region 和 Endpoint 對應的 Region 一致,不然 SDK 將沒法訪問您指定的 Project
    • 因爲 Client 實例只能在構造時指定該服務入口,若是須要訪問不一樣 Region 裏的 Project,則須要用不一樣的 Endpoint 構建不一樣的 Client 實例
    • 目前,全部 API 的服務入口僅支持 HTTP 協議。
    • 若是在阿里雲 ECS 虛擬機內使用 SDK,您還可使用內網 Endpoint 避免公網帶寬開銷,具體請參考 服務入口
  • 阿里雲訪問祕鑰(AccessKeyId/AccessKeySecret):指定 Client 訪問日誌服務時使用的訪問祕鑰
skd消費接入
原始接入
  • 參見參考文檔,要消費日誌服務中的數據,請儘可能不要直接使用SDK的拉數據接口,咱們提供了一個高級消費庫消費組消費,該庫屏蔽了日誌服務的實現細節,而且提供了負載均衡、按序消費等高級功能
消費組接入
  • 同一個消費組下面的消費者名稱必須不一樣,不然相同的消費者會同時消費logstore同份數據,形成數據重複git

  • 協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日誌服務的實現細節,只須要專一於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心github

  • 消費組(ConsumerGroup)緩存

    • 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重複消費數據
  • 消費組(Consumer)安全

    • 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不一樣
  • shared消費組、消費組關係網絡

    • 一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者
      • 每一個shard只會分配到一個消費者
      • 一個消費者能夠同時擁有多個shard
      • 新的消費者加入一個消費組,這個消費組下面的shard從屬關係會調整,以達到消費負載均衡的目的,可是上面的分配原則不會變,分配過程對用戶透明
  • maven
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>

阿里雲client依賴log4j,若是項目中使用的logback,須要增長轉換log4j到logback的轉換負載均衡

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.25</version>
</dependency>
java文件
  • main
public class Main {
  // 日誌服務域名,根據實際狀況填寫
  private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
  // 日誌服務項目名稱,根據實際狀況填寫
  private static String sProject = "ali-cn-hangzhou-sls-admin";
  // 日誌庫名稱,根據實際狀況填寫
  private static String sLogstore = "sls_operation_log";
  // 消費組名稱,根據實際狀況填寫
  private static String sConsumerGroup = "consumerGroupX";
  // 消費數據的ak,根據實際狀況填寫
  private static String sAccessKeyId = "";
  private static String sAccessKey = "";
  public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
  {
       // 第二個參數是消費者名稱,同一個消費組下面的消費者名稱必須不一樣,可使用相同的消費組名稱,不一樣的消費者名稱在多臺機器上啓動多個進程,來均衡消費一個Logstore,這個時候消費者名稱可使用機器ip來區分。第9個參數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用默認值便可,若有調整請注意取值範圍(0,1000]
      LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
      ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread運行以後,Client Worker會自動運行,ClientWorker擴展了Runnable接口。
       thread.start();
       Thread.sleep(60 * 60 * 1000);
       //調用worker的Shutdown函數,退出消費實例,關聯的線程也會自動中止。
       worker.shutdown();
       //ClientWorker運行過程當中會生成多個異步的Task,Shutdown以後最好等待還在執行的Task安全退出,建議sleep 30s。
       Thread.sleep(30 * 1000);
  }
}
  • SampleLogHubProcessor
public class SampleLogHubProcessor implements ILogHubProcessor 
{
  private int mShardId;
  // 記錄上次持久化 check point 的時間
  private long mLastCheckTime = 0; 
  public void initialize(int shardId) 
  {
      mShardId = shardId;
  }
  // 消費數據的主邏輯,這裏面的全部異常都須要捕獲,不能拋出去。
  public String process(List<LogGroupData> logGroups,
          ILogHubCheckPointTracker checkPointTracker) 
  {
      // 這裏簡單的將獲取到的數據打印出來
      for(LogGroupData logGroup: logGroups){
          FastLogGroup flg = logGroup.GetFastLogGroup();
          System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                  flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
          System.out.println("Tags");
          for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
              FastLogTag logtag = flg.getLogTags(tagIdx);
              System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
          }
          for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
              FastLog log = flg.getLogs(lIdx);
              System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
              for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                  FastLogContent content = log.getContents(cIdx);
                  System.out.println(content.getKey() + "\t:\t" + content.getValue());
              }
          }
      }
      long curTime = System.currentTimeMillis();
      // 每隔 30 秒,寫一次 check point 到服務端,若是 30 秒內,worker crash,
      // 新啓動的 worker 會從上一個 checkpoint 其消費數據,有可能有少許的重複數據
      if (curTime - mLastCheckTime >  30 * 1000) 
      {
          try 
          {
              //參數true表示當即將checkpoint更新到服務端,爲false會將checkpoint緩存在本地,後臺默認隔60s會將checkpoint刷新到服務端。
              checkPointTracker.saveCheckPoint(true);
          } 
          catch (LogHubCheckPointException e) 
          {
              e.printStackTrace();
          }
          mLastCheckTime = curTime;
      } 
      return null;  
  }
  // 當 worker 退出的時候,會調用該函數,用戶能夠在此處作些清理工做。
  public void shutdown(ILogHubCheckPointTracker checkPointTracker) 
  {
      //將消費斷點保存到服務端。
      try {
          checkPointTracker.saveCheckPoint(true);
      } catch (LogHubCheckPointException e) {
          e.printStackTrace();
      }
  }
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory 
{
  public ILogHubProcessor generatorProcessor()
  {   
      // 生成一個消費實例
      return new SampleLogHubProcessor();
  }
}
  • 上述代碼,工廠類能夠用lambda替換
  • client繼承Runnable,必須以thread方式啓動
  • client原理:是啓動線程,底層定時發送心跳給服務端,拿到要消費的必要信息,異步提交http請求任務(線程池),請求處理數據。因此調用client.shutdown,方法並不能立馬把全部任務關閉,最好有個時間差,同時client中運行線程標記是否關閉的變量不是線程安全的,因此關閉的時候,依然有可能提交請求任務處理
錯誤處理
  • SDK 可能出現的異常錯誤能夠分紅以下幾類:運維

    • 由日誌服務端返回的錯誤。這類錯誤由日誌服務端返回並由 SDK 處理。關於這類錯誤的詳細細節能夠參考日誌服務 API 的通用錯誤碼和各個 API 接口的具體說明。
    • 由 SDK 在向服務端發出請求時出現的網絡錯誤。這類錯誤包括網絡鏈接不通,服務端返回超時等。日誌服務內部並未對此作任何重試邏輯,因此,您在使用 SDK 時須要本身定義相應的處理邏輯(重試請求或者直接報錯等)
    • 由 SDK 自身產生的、與平臺及語言相關的錯誤,如內存溢出等。
  • 目前,各個語言 SDK 的實現都採起拋出異常的方式處理錯誤。具體原則以下:異步

    • 由如上第一或者第二類錯誤將會被 SDK 處理幷包裝在統一的 LogException 類拋出給用戶處理
    • 由如上第三類錯誤不會被 SDK 處理,而是直接拋出平臺及語言的 Native Exception 類給用戶處理
  • API錯誤重試

    • 在ILogHubProcessor的process方法中,方法返回空表示正常處理數據, 若是須要回滾到上個check point的點進行重試的話,能夠return checkPointTracker.getCheckpoint(),可是這裏有可能會形成重複消費
    • 本身增長重試策略,避免重複消費
參考文檔

原文出處:https://www.cnblogs.com/guozp/p/10327607.html

相關文章
相關標籤/搜索