Kafka學習筆記

Kafka 學習筆記

Kafka使用一個叫Franz Kafka的文學家的名字用來命名的。java

Kafka是一款開源的消息引擎系統。也是一個分佈式流處理平臺。node

Kafka同時支持點對點模型以及發佈/訂閱模型。算法

爲何要使用Kakfa?四個字:削峯填谷!apache

clipboard.png

Kafka 術語

  • Record:消息,指Kafka處理對象
  • Topic:主題,用來承載消息的容器
  • Partition:分區,一個有序不變的消息隊列,一個主題下能夠有多個分區
  • Offset:消息位移,表示分區中每條信息的位置,是一個單調遞增不變的值
  • Replica,副本,數據冗餘。bootstrap

    • 領導者副本:對外提供服務,與客戶端進行交互
    • 追隨者副本:不能與外界進行交互,只是被動地追隨領導者副本
  • Producer:生產者,向主題發佈新消息的應用程序
  • Consumer:消費者,向主題訂閱新消息的應用程序
  • Consumer Offset:消費者位移,表示消費者消費進度
  • Consumer Group:消費者組,多個消費者實例共同組成的一個組,同時消費多個分區來實現高吞吐。
  • Rebalance:重平衡,消費者組內某個消費者實例掛掉後,其餘消費者實例自動從新分配訂閱主題分區的過程。它是Kafka消費者端實現高可用的重要手段。

clipboard.png

Kafka 種類

  • Apache Kafka: 也稱社區版Kafka,迭代速度快,社區響應度高,使用它可讓你有更高的把控度;缺陷在於僅僅提供基礎核心組件,缺失一些高級特性
  • Confluent Kafka: 優點在於集成了不少高級特性且由Kafka原班人馬打造,質量保證;缺陷在於國內相關資料不全,普及率較低,沒有太多可參考的範例。
  • CDH/HDP Kafka: 優點在於操做簡單,節省運維成本;缺陷在於把控度低,演進速度慢

Kafka 版本號

一個題外話

Kafka新版本客戶端代碼開始徹底由java語言編寫,因而有些人開始「JAVA VS SCALA」的大討論。並從語言特性上分析爲何社區擯棄Scala轉而投向Java的懷抱。api

其實事情沒有那麼複雜,僅僅是由於社區來了一批Java程序猿,而之前老的scala程序猿隱退了罷了。緩存

版本演進

Kafka總共演進了7個大版本安全

  • 0.7版本: 上古版本,一旦有人向你推薦這個版本,懟他。
  • 0.8版本: 開始引入副本機制,另外老版本須要制定zookeeper地址而不是Broker地址。在0.8.2.0版本社區引入了新版本Producer API,即指定Broker地址的Producer。
  • 0.9版本: 重量級的大版本更迭。增長了基礎的安全認證/權限功能,引入了Kafka Connect,新版本Producer API穩定。
  • 0.10.0.0: 里程碑的大版本。該版本又有兩個小版本,0.10.1和0.10.2。引入Kafka streams,正式升級爲分佈式流處理平臺。0.10.2.2 新Consumer API穩定。
  • 0.11.0.0: 目前最主流的版本之一。引入兩個重量級功能變動:一個是提供冪等性Producer API以及事務 API, 另外一個是對Kafka消息格式作了重構。
  • 1.0和2.0: 若是你是Kafka Stream用戶,至少選擇2.0.0版本吧。

最後還有個建議,不論你使用的是哪一個版本,都請儘可能保持服務端版本和客戶端版本一致,不然你將損失不少Kafka爲你提供的性能優化收益。性能優化

江湖經驗:不要輕易成爲新版本的小白鼠。服務器

集羣部署

clipboard.png

磁盤容量舉例:

假設公司有個業務須要天天向Kafka集羣發送 1 億條信息。每條消息保存兩份來防止數據丟失。消息默認保存兩週時間。並假設消息的平均大小是1KB。問你的Kafka集羣須要爲這個業務預留多少磁盤空間?

總大小:1億 1KB 2備份 * 14 ~= 2800G
加上Kafka的一些索引數據,爲它預留10%,那麼總大小變爲 2800 * (1 + 10%) ~= 3TB

Kafka支持數據壓縮,壓縮比0.75的話,那麼應該預留的存儲空間爲2.25TB左右。

帶寬舉例

與其說是帶寬資源的規劃,其實真正要規劃的是Kafka服務器的數量。

假設公司機房環境1Gbps,現有個業務,須要在1小時內處理1TB的業務數據。

通常單臺服務器 規劃使用70%的帶寬資源的1/3 ~= 240Mbps。

1TB須要1小時處理,則每秒差很少須要處理2336Mbps的數據,除 240Mbps,則差很少須要10臺機器。若是消息還須要額外複製的話,那麼還要對應乘上備份數。

集羣配置參數

配置名稱 示例 建議值
log.dirs /home/kafka1,/home/kafka2 kafka寫日誌多路徑,不只能提高寫性能,在1.1版本中還能支持故障轉移功能。
zookeeper.connect zk1:2181,zk2:2181,zk3:2181/kafka1
listens listeners=PLAINTEXT://dn1.ambari:6667
auto.create.topics.enable true false,不建議能夠自動建立主題
unclean.leader.election.enable false false,若是設置爲true有丟數據風險
auto.leader.rebalance.enable false false,不按期進行leader副本的選舉
log.retention.hours 168 默認保持7天數據
log.retention.bytes -1 保存多少數據均可以
message.max.bytes 1000000 默認值建議調大。該值表明Broker能處理的最大消息大小

生產者分區策略

輪詢策略

clipboard.png

隨機策略

clipboard.png

按消息保存鍵策略

clipboard.png

自定義策略

生產者壓縮

壓縮配置

compression.type

壓縮算法

clipboard.png

總結一下壓縮和解壓縮,Producer端壓縮,Broker端保持,Consumer端解壓縮。

無消息丟失最佳實踐

  1. 不要使用producer.send(msg),而要使用producer.send(msg,callback)
  2. 設置acks=all,代表全部副本Broker都要接受消息,該消息纔算是「已提交」
  3. 設置retries>0,代表Producer自動重試,當網絡順斷時,防止消息丟失。
  4. 設置unclean.leader.election.enable=false
  5. 設置replication.factor >=3,增長副本數,保證數據冗餘
  6. 設置min.insync.replicas > 1,控制的是消息至少要被寫入多少個副本纔算是 已提交。
  7. 確保replication.factor > min.insync.replicas。若是二者相等,那麼只要有一個副本掛機,整個分區就沒法正常工做了。推薦設置replication.factor = min.insync.replicas + 1
  8. 確保消息消費完再提交。設置enable.aoto.commit=false

Kafka 攔截器

分爲生產者攔截器和消費者攔截器。

典型的應用場景能夠應用於客戶端監控、端到端系統性能測試、消息審計等多種功能在內的場景。

Kafka是如何管理TCP鏈接的

java生產者是如何管理TCP鏈接的

  1. KafkaProducer實例建立時啓動Sender線程,從而建立與bootstrap.servers中全部的Broker的TCP鏈接。
  2. KafkaProducer實例首次更新元數據信息以後,還會再次建立與集羣中全部Broker的TCP鏈接
  3. 若是Producer端發送信息到某臺Broker時,發現沒有與該Broker的TCP鏈接,那麼也會建立鏈接
  4. 若是設置connections.max.idle.ms > 0,則步驟一中的TCP鏈接會被自動關閉;若是設置該參數-1,那麼步驟一中建立的鏈接沒法被關閉,會成爲殭屍進程。

Java消費者是如何管理TCP鏈接的

建立的3個時機

  1. 發起FindCoordinator請求時
  2. 鏈接協調者時
  3. 消費數據時

消費者程序會建立3類TCP鏈接

  1. 肯定協調者和獲取集羣元數據
  2. 鏈接協調者,令其執行組成員管理操做
  3. 執行實際的消息獲取

冪等生產者和事務生產者

消息交付可靠性保障,常見的承諾有如下三種

  1. 最多一次:消息可能會丟失,但毫不會重複發送
  2. 至少一次:消息不會丟失,但有可能被重複發送
  3. 精確一次:消息不會丟失,也不會被重複發送

Kafka默認是最少一次

要保證精確一次,就須要冪等和事務。不過性能會想對較差。

冪等生產者

冪等性有不少好處。其最大的優點在於咱們能夠安全地重試任何冪等性操做,反正它們不會破壞咱們的系統狀態。

在0.11.0.0版本引入了冪等生產者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)

使用冪等生產者要注意

  1. 它只能保證單分區的冪等,多分區沒法實現
  2. 只能實現單會話上的冪等,重啓以後冪等消失

事務生產者

設置事務型Producer

  1. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
  2. 設置producer端參數transctional.id。最好爲其設置一個有意義的名字

此外代碼也要作一些調整變化。

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

重平衡

怎麼避免Rebalance

Rebalance發生的時機有三個

  1. 組成員數據量發生變化
  2. 訂閱主題數量發生變化
  3. 訂閱主題的分區數發生變化

後面兩個一般是運維的主動操做,沒法避免。主要仍是針對組成員數量減小的狀況。增長通常也是人爲主動的。

那麼避免由於參數或邏輯不合理而致使的成員退出,與之相關的主要參數

  1. session.timeout.ms,推薦設置6s
  2. heartbeat.interval.ms,推薦設置2s
  3. max.poll.interval.ms,推薦設置比你的業務邏輯處理要長
  4. GC參數,避免頻繁的FULL GC

重平衡通知

重平衡過程是經過 消費者端的心跳線程來通知到其餘消費者實例的。

0.10.1.0版本以前,發送心跳請求是在消費者主線程完成的,也就是kafkaConsumer.poll方法的那個線程。這樣作有諸多弊端,由於消息處理也是在這個線程中完成的。所以當業務邏輯處理消耗了較長時間,心跳請求就沒法及時發送到協調者那邊了。致使協調者 錯誤地認爲該消費者已經死了。

0.10.1.0版本開始,社區引入了一個單獨的線程來專門執行心跳發送。

消費者組狀態機

定義了5種狀態

clipboard.png

各個狀態的流轉

clipboard.png

一個消費者組最開始是Empty狀態,當重平衡過程開啓後,它會被置爲PreparingRebalance狀態等待成員加入,以後變動到CompletingRebalance狀態等待分配方案,最後流轉到Stable狀態完成重平衡。

當有新成員或已有成員退出時,消費者組的狀態從Stable直接跳到PreparingRebalance狀態,此時,全部現存成員就必須從新申請加入組。

當全部成員都退出組後,消費者組狀態變動爲Empty。

Kafka自動按期刪除過時位移的條件就是,組要處於Empty狀態。

重平衡流程

消費者端重平衡流程

JoinGroup請求

clipboard.png

SyncGroup請求

clipboard.png

Broker端重平衡場景分析

  • 新成員入組

clipboard.png

  • 組成員主動離組

clipboard.png

  • 組成員崩潰離組

clipboard.png

  • 重平衡時協調者對組內成員提交位移的處理

clipboard.png

位移提交

clipboard.png

CommitFailedException怎麼處理?

  1. 縮短消息處理的時間,該方法優先處理
  2. 增長Consumer端容許下游系統消費一批數據的最大時長。設置參數max.poll.interval.ms,新版本默認是5分鐘。
  3. 減小下游系統一次性消費的消息總數。max.poll.records
  4. 下游系統使用多線程來加速消費

多消費者實例

鑑於KafkaConsumer不是線程安全的事實,制定兩套多線程方案。

  1. 每一個線程維護專屬的KafkaConsumer實例,負責完整的消息獲取、消息處理流程

    clipboard.png

核心代碼

```
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
            ConsumerRecords records = 
                consumer.poll(Duration.ofMillis(10000));
                 //  執行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
```
  1. 消費者程序使用單或多線程獲取消息,建立多個消費者線程執行消息處理邏輯

    clipboard.png

核心代碼

```
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
    workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), 
    new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofSeconds(1));
    for (final ConsumerRecord record : records) {
        executors.submit(new Worker(record));
    }
}

```

兩種方案各有特色。

clipboard.png

監控消費進度的3種方法

  1. 使用Kafka自帶命令行工具kafka-consumer-groups腳本
  2. 使用Kafka Consumer API
  3. 使用Kafka自帶的JMX監控指標

Kafka副本詳解

副本機制的好處:

  1. 提供數據冗餘
  2. 提供高伸縮性
  3. 改善數據局部性

但Kafka只有第一種好處,緣由是這樣的設計,Kafka有兩點好處

  1. 方便實現 Read-your-writes

    指當你用生產者API向Kafka成功寫入消息後,立刻使用消費者API去讀取剛纔生產的消息

  2. 方便實現單調讀(Monotonic Reads)

    在屢次消費信息時,不會看到該消息一會存在一會不存在的狀況。

判斷Follower副本與Leader副本是否同步的標準,Broker參數replia.lag.time.max.ms的參數值。Kafka有一個in-sync Replicas(ISR)集合的概念。

Kafka控制器

控制器組件(Controller),是Kafka的核心組件,它的主要做用是在Apache Zookeeper的幫助下管理和協調整個Kafka集羣。

控制器是怎麼被選出來的

每臺Broker都能充當控制器,在Broker啓動時,會嘗試去Zookeeper中建立/controller節點。Kafka當前選舉規則,第一個成功建立/controller節點的Broker會被指定爲控制器。

控制器能作什麼?

  1. 主題管理
  2. 分區重分配
  3. Prefered領導者選舉
  4. 集羣成員管理
  5. 數據服務,控制器上保存最全的集羣元數據信息

控制器保存了什麼數據?

clipboard.png

這些數據其實也在Zookeeper中存儲了一份。

控制器的故障轉移

clipboard.png

總結

小竅門分享:當你以爲控制器出現問題時,好比主題沒法刪除了,重分區hang住了,你能夠不用重啓broker或者控制器,快速簡便的方法,直接去Zookeeper手動刪除/controller節點。

這樣作的好處是,既能夠引起控制器的重選舉,又能夠避免重啓Broker致使的消息中斷。

Kafka請求處理

請求方案

Kafka方案相似於Reactor模式

clipboard.png

那麼Kafka相似的方案是這樣的。網絡線程池默認參數num.network.threads=3

clipboard.png

好了,客戶端發來的請求會被Aceptor線程分發到任意一個網絡線程中,由他們進行處理。你可能會認爲,網絡線程池是順序處理不就行了?實際上,Kafka在這個環節上又作了一層異步線程池的處理。

clipboard.png

IO線程池執行真正的處理。若是是PRODUCER生產請求,則將消息寫入到底層的磁盤日誌中;若是是FETCH請求,則從磁盤或頁緩存中讀取消息。當IO請求處理完請求後,會將生成的響應放入網絡線程池的響應隊列中,並由對應的網絡線程負責將Response反還給客戶端。

請求隊列是全部網絡線程共享的,而響應隊列則是每一個網絡線程專屬的。

IO線程池默認參數num.io.threads=8

圖中還有一個Purgatory的組件,這是Kafka中著名的「煉獄」組件。

它是用來緩存延時請求的,所謂延時請求,就是那些一時未知足條件的不可馬上處理的請求。

Kafka高水位和Leader Epoch

高水位

高水位做用:

  • 定義消息可見性,即用來標識分區下哪些消息是能夠被消費者消費的
  • 幫助kafka完成副本同步

clipboard.png

高水位更新機制

clipboard.png

clipboard.png

副本更新機制

clipboard.png

Leader Epoch

這塊感受有點晦澀難懂!須要有時間再研究!

該機制是用來防止數據丟失和不一致的問題。由於僅僅依靠高水位更新,是會有時間錯配的問題的。

沒有使用Leader Epoch前,丟失數據的一個場景。

clipboard.png

假設生產者成功向Kafka發送了兩條信息,Leader和副本B都寫成功了。

Leader的高水位更新完成,但副本B高水位還未更新。

副本B所在的Broker宕機了,當它重啓後,此時LEO值會變動爲高水位值1。也就是說位移值爲1的那條信息被副本B從磁盤中刪除了。此時副本B的底層磁盤文件中只保存有一條信息,即位移值爲0的那條消息。

執行完日誌截斷操做以後,副本B開始向Leader拉取消息,執行正常的消息同步。那麼就在這個節骨眼,Leader又掛掉了。那麼Kafka就別無選擇,讓副本B成爲了Leader。那麼當副本A回來以後,就會將高水位調整爲與B相同的值,也是1。

這樣操做以後,位移值爲1的那條消息就完全從這兩個副本中被永遠地抹去了。

clipboard.png

場景和以前相似,可是引用了Leader Epoch機制後,副本B重啓回來後,會向A發送一個特殊的請求去獲取Leader的LEO值。

發現Leader LEO的值不比它本身的小,且緩存中也沒有起始位移>2的Epoch條目,所以B不須要執行任何的日誌截斷操做。這就是對高水位機制的一個明顯改進,副本是否執行日誌截斷再也不依賴高水位進行判斷。

A宕機了,B成爲了Leader,一樣地發送一個特殊的請求,發現也不須要執行日誌截斷操做。當後面生產者往B寫入消息後,B生成了新的Epoch條目。

主題管理

kafka 2.2版本示例

建立主題

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1

查詢主題

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

查詢某主題的詳細信息

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

修改主題分區

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >

這裏要注意的是,分區數必須大於原有分區數,不然會報錯InvalidPartitionsException.

修改主題級別參數

假設咱們要設置主題的級別參數爲max.message.bytes

bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

修改主題限速

先設置Broker端參數

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

注意--entity-name指的是Broker ID.若是該主題副本分別上0,1,2,3上,那麼你還要爲Broker 1,2,3執行這條命令。

設置好這個參數以後,還須要爲該主題設置要限速的副本。示例中但願對全部的副本進行限速,使用通配符*。

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

刪除主題

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>

一般這個步驟,你須要耐心等待一段時間,Kafka只是標記成已刪除狀態,須要一段時間在後臺默默刪除。

當你碰到主題沒法刪除的狀況,你能夠採用這樣的方法:

  1. 手動刪除Zookeeper節點/admin/delete_topics下待刪除的主題名的znode
  2. 手動刪除該主題的磁盤上的分區目錄
  3. 在Zookeeper中執行rmr /controller,觸發Controller重選舉。刷新Controller緩存。

事實上第三步要當心,其實執行前面兩步就夠了。

查看消費組提交的位移數據

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

查看消費組的狀態信息

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

__consumer_offsets佔用太多磁盤空間

要顯示的使用jstack命令查看下kafka-log-cleaner-thread前綴的線程的狀態。一般狀況下,是因爲該線程掛掉致使的。你只能重啓相應的Broker了。

動態參數

1.1.0版本以前,都是經過配置server.properties。

當一些配置要更改的時候,咱們須要修改這個配置文件,而後重啓Broker,以此來達到修改配置生效。

可是生產環境的機子怎麼能說重啓就重啓呢。針對這個痛點,社區1.1.0版本正式引入了動態參數。

動態參數種類

kafka broker config官網增長了DYNAMIC UPDATE MODE列。而該列有三類值。

  • read-only: 被標記爲read-only的參數跟以前同樣,只有重啓Broker,才能令修改生效。
  • pre-broker: 被標記爲pre-broker的參數屬於動態參數,只會在對應的Broker上生效。
  • cluster-wide: 屬於動態參數,會在整個集羣範圍內生效。固然你也能夠爲具體的Broker修改cluster-wide值。

使用場景

  1. 動態調整Broker端各類線程池大小,實時應對突發量(最爲經常使用)
  2. 動態調整Broker端鏈接信息或安全信息
  3. 動態更新SSL KeyStore有效期
  4. 動態調整Broker端Compact操做性能
  5. 實時變動JMX指標收集器

如何實現動態參數的?

clipboard.png

其中config/brokers是真正保存動態Broker參數的地方。

  • <default> : cluster-wide範圍的動態參數
  • broker.id : pre-broker範圍的動態參數。該類參數子節點可能存在多個

clipboard.png

參數優先級:pre-broker > cluster-wide > read-only > 默認值

如何配置?

設置cluster-wide值

bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true

刪除cluster-wide值

`# 刪除 cluster-wide 範圍參數
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable`

設置pre-broker值

bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false

刪除pre-broker值

`# 刪除 per-broker 範圍參數
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable`

經常使用的動態參數

  • log.retention.ms
  • num.io.threads和num.network.threads
  • SSL相關參數
  • num.relica.fetchers

如何重設消費者位移

重設位移策略

clipboard.png

重設位移方法

  • java consumer api,主要就是seek方法
  • 經過kafka-consumer-groups腳本實現

Kafka運維利器:KafkaAdminClient

引入緣由

  1. 命令腳本很差集成到應用程序中
  2. 不少命令腳本都是直接鏈接zookeeper的,這會繞過kafka的安全設置
  3. 社區但願在client端就能運維管理集羣,而不須要內部的server端代碼

基於這些緣由,社區於0.11版本正式推出了Java客戶端版本的AdminClient。

它的功能

  • 主題管理:包括主題的建立、刪除和查詢
  • 權限管理:包括具體權限的配置與刪除
  • 配置參數管理:包括kafka各類資源的參數設置、詳情查詢。Kafka資源主要有Broker、主題、用戶、Client-id等
  • 副本日誌管理:包括副本底層日誌路徑的變動和詳情查詢
  • 分區管理:即建立額外的主題分區
  • 消息刪除:即刪除指定位移以前的分區信息
  • Delegation Token管理
  • 消費者組管理:包括消費者組的查詢、位移查詢和刪除
  • Prefered領導者選舉:推選指定主題分區的Prefered Broker爲領導者
相關文章
相關標籤/搜索