【13】把 Elasticsearch 當數據庫使:Join

使用 https://github.com/taowen/es-monitor 能夠用 SQL 進行 elasticsearch 的查詢。要真正把Elasticsearch看成數據庫來使,Join是一個繞不過的話題。關於Elasticsearch如何支持join,這個slide總結得很好:http://www.slideshare.net/sirensolutions/searching-relational-data-with-elasticsearch。整體來講有這麼幾種方式:python

  • 徹底不join,把關聯表的字段融合到一張表裏。固然這會形成數據的冗餘git

  • 錄入的時候join:使用 nested documents(nested document和主文檔是同segment存儲的,對於一個symbol,幾千萬個quote這樣的場景就不適合了)github

  • 錄入的時候join:使用 sirensql

  • 查詢時join:使用 parent/child (這個是elasticsearch的特性,要求parent/child同shard存在)數據庫

  • 查詢時join:使用 siren-joins(就是一個在服務端求值的filter,而後把結果發佈給每一個shard去作二次match)服務器

  • 查詢時join:在客戶端拼裝第二個查詢(和siren-joins差很少,可是多了一次客戶端到服務器的來回)elasticsearch

  • 查詢時join:在coordinate節點上作兩個查詢的join合併(https://github.com/NLPchina/elasticsearch-sql分佈式

我我的喜歡的是siren-joins和客戶端拼裝這兩種方案。這兩種方案都是先作了一次查詢,把查詢結果再次分發到每一個分佈式節點上再次去作分佈式的聚合。相比在coordinate節點上去作join合併更scalable。ide

客戶端求值

首先我來看如何在客戶端完成結果集的求值ui

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
EOF

這裏引入的 SAVE RESULT AS 就是用於觸發前面的SQL的求值,並把結果集命名爲 finance_symbols。若是由於一些中間結果咱們不須要,咱們也能夠用REMOVE 命令把求值結果刪除

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
    REMOVE RESULT finance_symbols;
EOF

甚至咱們可使用任意的python代碼來修改result_map。

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
    result_map['finance_symbols'] = result_map['finance_symbols'][1:-1];
EOF

客戶端Join

在客戶端求值的基礎上,咱們能夠利用客戶端保留的結果集來發第二個請求。

cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5;
    SAVE RESULT AS finance_symbols;
    SELECT MAX(adj_close) FROM quote 
        JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
    REMOVE RESULT finance_symbols;
EOF

這個產生的Elaticsearch請求是這樣的兩條:

{
  "query": {
    "term": {
      "sector": "Finance"
    }
  }, 
  "size": 5
}

而後根據其返回,產生了第二個請求

{
  "query": {
    "bool": {
      "filter": [
        {}, 
        {
          "terms": {
            "symbol": [
              "TFSC", 
              "TFSCR", 
              "TFSCU", 
              "TFSCW", 
              "PIH"
            ]
          }
        }
      ]
    }
  }, 
  "aggs": {
    "MAX(adj_close)": {
      "max": {
        "field": "adj_close"
      }
    }
  }, 
  "size": 0
}

能夠看到,所謂客戶端join,就是用前一次的查詢結果拼出了第二次查詢的條件(terms filter)。

服務端Join

有了 siren-join 插件(https://github.com/sirensolutions/siren-join),咱們能夠在服務端完成一樣的join操做

cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    WITH finance_symbols AS (SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5);
    SELECT MAX(adj_close) FROM quote 
        JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
EOF

前面第一個查詢是用SAVE RESULT AS求值並命名爲finance_symbols,這裏咱們並無求值而是給其取了一個名字(WITH AS),而後就能夠引用了。

{
  "query": {
    "bool": {
      "filter": [
        {}, 
        {
          "filterjoin": {
            "symbol": {
              "indices": "symbol*", 
              "path": "symbol", 
              "query": {
                "term": {
                  "sector": "Finance"
                }
              }
            }
          }
        }
      ]
    }
  }, 
  "aggs": {
    "MAX(adj_close)": {
      "max": {
        "field": "adj_close"
      }
    }
  }, 
  "size": 0
}

可見產生的filterjoin把兩步合爲一步了。注意對於filterjoin查詢,須要POST _coordinate_search 而不是_search這個URL。
Profile

[
  {
    "query": [
      {
        "query_type": "BoostQuery",
        "lucene": "ConstantScore(BytesFieldDataTermsQuery::[size=8272])^0.0",
        "time": "29.32334300ms",
        "breakdown": {
          "score": 0,
          "create_weight": 360426,
          "next_doc": 137906,
          "match": 0,
          "build_scorer": 15027540,
          "advance": 0
        },
        "children": [
          {
            "query_type": "BytesFieldDataTermsQuery",
            "lucene": "BytesFieldDataTermsQuery::[size=8272]",
            "time": "13.79747100ms",
            "breakdown": {
              "score": 0,
              "create_weight": 14903,
              "next_doc": 168010,
              "match": 0,
              "build_scorer": 13614558,
              "advance": 0
            }
          }
        ]
      }
    ],
    "rewrite_time": 30804,
    "collector": [
      {
        "name": "MultiCollector",
        "reason": "search_multi",
        "time": "1.529236000ms",
        "children": [
          {
            "name": "TotalHitCountCollector",
            "reason": "search_count",
            "time": "0.08967800000ms"
          },
          {
            "name": "MaxAggregator: [MAX(adj_close)]",
            "reason": "aggregation",
            "time": "0.1675550000ms"
          }
        ]
      }
    ]
  }
]

從profile的結果來看,其原理也是 terms filter(BytesFieldDataTermsQuery)。因此這也就決定了這種join只是僞join。真正的join不單單能夠用第一個表去filter第二個表,並且要可以在第二個查詢的計算階段引用第一個階段的結果。這個是僅僅用terms filter沒法完成的。固然全部這些join的努力僅僅是讓數據維護變得更加容易而已,若是咱們真的要求Elasticsearch的join和傳統SQL同樣強大,那麼咱們也沒法期望那麼複雜的join能夠快到哪裏去,也就失去了使用Elasticsearch的意義了。有了上面兩種Join方式,咱們能夠在極度快速和極度靈活之間得到必定的選擇權利。

相關文章
相關標籤/搜索