Elasticsearch(GEO)數據寫入和空間檢索

Elasticsearch簡介

什麼是 Elasticsearch?
Elasticsearch 是一個開源的分佈式 RESTful 搜索和分析引擎,能夠應對越來越多不同的應用場景。

本文內容

本文主要介紹 ES GEO 數據的寫入和空間檢索,ES 版本爲 7.3.1。

數據準備

使用 QGIS 的漁網工具對範圍進行切割,獲得網格的 GeoJSON。

新建索引設置映射

def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    """Drop and recreate ``index_name``, then install a mapping on it.

    Args:
        es: Elasticsearch client exposing ``indices.delete``,
            ``indices.create`` and ``indices.put_mapping``.
        index_name: Name of the index to (re)create.
        doc_type_name: Document type the mapping is registered under
            (sent together with ``include_type_name=True``).
        my_mapping: Mapping body to install. When omitted or empty, a default
            mapping with a ``geo_shape`` field ``location`` and a ``long``
            field ``id`` is used.

    Returns:
        None. Progress and failure are reported with ``print``.
    """
    # Fall back to the default geo mapping only when the caller gave none;
    # a mapping passed in explicitly is honoured instead of being overwritten.
    if not my_mapping:
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"},
            }
        }

    # Ignore 400/404 so that deleting an index that does not exist is not an error.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")

    create_index = es.indices.create(index=index_name)
    mapping_index = es.indices.put_mapping(
        index=index_name,
        doc_type=doc_type_name,
        body=my_mapping,
        include_type_name=True,
    )
    print("create_index")

    # Both calls must be acknowledged for the index to be usable.
    if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
        print("Index creation failed...")

數據插入

使用multiprocessing和elasticsearch.helpers.bulk進行數據寫入:每9500條(約一萬條)爲一組,剩下的爲一組,而後多進程並行寫入。分別寫入4731254條點數據和面數據。寫入時使用多核、SSD以及合適的批量大小可以有效加快寫入速度,通過這些手段可以在三分鐘左右寫入四百多萬條點或者面數據。

def mp_worker(features):
    """Bulk-index one batch of actions and return the success count.

    Creates its own client for the call. ``ip`` and ``index_name`` are not
    parameters; they are read from the enclosing (module) scope and must be
    defined there before this function runs.

    Args:
        features: Iterable of bulk actions handed to ``bulk``.

    Returns:
        The first element of the tuple returned by ``bulk`` (the number of
        successfully executed actions).
    """
    client = Elasticsearch(hosts=[ip], timeout=5000)
    indexed, _errors = bulk(client, features, index=index_name, raise_on_error=True)
    return indexed
def mp_handler(input_file, index_name, doc_type_name="en", batch_size=9500, processes=4):
    """Load a GeoJSON file and bulk-index its features with a process pool.

    Every feature becomes one bulk action whose ``_source`` carries the
    feature's ``properties.id`` and a ``polygon`` location built from the
    feature's ``geometry.coordinates``. The actions are split into batches and
    each batch is passed to ``mp_worker`` in a worker process.

    Args:
        input_file: Path of a JSON file with a top-level ``"features"`` list.
        index_name: Target index written into every action's ``_index``.
        doc_type_name: Value written into every action's ``_type``.
        batch_size: Maximum number of actions per batch (default 9500).
        processes: Number of worker processes in the pool (default 4).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    with open(input_file, 'rb') as f:
        # Keep only the feature list; the rest of the document is not needed.
        features = json.load(f)["features"]

    actions = [
        {
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                "location": {
                    "type": "polygon",
                    "coordinates": feature["geometry"]["coordinates"],
                },
            },
        }
        for feature in features
    ]
    # Release the raw features before the pool is started to keep peak memory down.
    del features

    total = len(actions)
    batches = [actions[start:start + batch_size] for start in range(0, total, batch_size)]
    del actions
    print('read all %s data ' % total)

    pool = multiprocessing.Pool(processes)
    try:
        # imap yields one success count per batch; summing consumes all of them.
        written = sum(pool.imap(mp_worker, batches))
    finally:
        # Every result has been consumed on the success path, so terminating
        # loses no work; on an error it stops the remaining workers promptly.
        pool.terminate()
        pool.join()
    print('write all %s data ' % written)

GEO(point)查詢距離nkm附近的點和範圍選擇

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time
# Wall-clock start; the elapsed time is printed at the end of the snippet.
starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"  # not referenced by the query below
ip = "127.0.0.1:9200"

# "Nearby" search: geo_distance filter of 9km around a fixed lat/lon.
_center = {
    "lat": 18.1098857850465471,
    "lon": 109.1271036098896730,
}
_distance_filter = {
    "geo_distance": {
        "distance": "9km",
        "location": _center,
    }
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": _distance_filter,
        }
    }
}
# Alternative: bounding-box selection.
# NOTE(review): bottom_right lon (105.127...) is west of top_left lon
# (109.000...) -- possibly a typo for 109.127...; confirm before enabling.
# _body={
#   "query": {
#     "geo_bounding_box": {
#       "location": {
#         "top_left": {
#           "lat": 18.4748659238899933,
#           "lon": 109.0007435371629470
#         },
#         "bottom_right": {
#           "lat": 18.1098857850465471,
#           "lon": 105.1271036098896730
#         }
#       }
#     }
#   }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
# scan() returns an iterable of hits; each one is printed as it arrives.
scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
for resp in scanResp:
    print(resp)
endtime = time.time()
print(endtime - starttime)

GEO(shape)範圍選擇

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time
# Wall-clock start; the elapsed time is printed at the end of the snippet.
starttime = time.time()
_index = "gis"
_doc_type = "20190823"  # not referenced by the query below
ip = "127.0.0.1:9200"

# Envelope corners are given as [[minlon, maxlat], [maxlon, minlat]].
_envelope = {
    "type": "envelope",
    "coordinates": [[108.987103609889, 18.474865923889993], [109.003537162947, 18.40988578504]],
}
# geo_shape filter on `location` with relation "within" against the envelope.
_shape_filter = {
    "geo_shape": {
        "location": {
            "shape": _envelope,
            "relation": "within",
        }
    }
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": _shape_filter,
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
# scan() returns an iterable of hits; each one is printed as it arrives.
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
for resp in scanResp:
    print(resp)
endtime = time.time()
print(endtime - starttime)

GEO(point)距離聚合

from elasticsearch import Elasticsearch
import time
# Wall-clock start; the elapsed time is printed at the end of the snippet.
starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"  # not referenced by the request below
ip = "127.0.0.1:9200"
# Distance aggregation: bucket documents by the distance of `location`
# from `origin`, using the three ranges listed below.
_body = {
    # Only the aggregation buckets are read below, so do not fetch any hits.
    "size": 0,
    "aggs": {
        "rings_around_amsterdam": {
            "geo_distance": {
                "field": "location",
                "origin": "18.1098857850465471,109.1271036098896730",
                "ranges": [
                    {"to": 100000},
                    {"from": 100000, "to": 300000},
                    {"from": 300000},
                ],
            }
        }
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['rings_around_amsterdam']['buckets']:
    print(i)
endtime = time.time()
print(endtime - starttime)

中心點聚合

# Centroid aggregation over the `location` field.
# `ip` and `_index` are not set here; they must already be defined.
_body = {
    # Only the aggregation result is printed below, so do not fetch any hits.
    "size": 0,
    "aggs": {
        "centroid": {
            "geo_centroid": {
                "field": "location",
            }
        }
    },
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations'])

範圍聚合

# Bounds aggregation over the `location` field.
# `ip` and `_index` are not set here; they must already be defined.
_body = {
    # Only the aggregation result is printed below, so do not fetch any hits.
    "size": 0,
    "aggs": {
        "viewport": {
            "geo_bounds": {
                "field": "location",
            }
        }
    },
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations']['viewport'])

geohash聚合

# Low-precision aggregation; `precision` is the geohash length.
# `ip` and `_index` are not set here; they must already be defined.
_body = {
    # Only the aggregation buckets are read below, so do not fetch any hits.
    "size": 0,
    "aggregations": {
        "large-grid": {
            "geohash_grid": {
                "field": "location",
                "precision": 3,
            }
        }
    },
}
# High-precision variant: a bounding-box filter with a geohash aggregation
# nested inside it.
# NOTE(review): bottom_right lon (108.997...) is west of top_left lon
# (109.000...) -- confirm the corners before enabling.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9971036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geohash_grid": {
#                         "field": "location",
#                         "precision": 7
#                     }
#                 }
#             }
#         }
#     }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
# Result loop for the high-precision variant above:
#for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#    print(i)


切片聚合

# Low-precision tile aggregation; `precision` is the tile zoom level.
# `ip` and `_index` are not set here; they must already be defined.
_body = {
    # Only the aggregation buckets are read below, so do not fetch any hits.
    "size": 0,
    "aggregations": {
        "large-grid": {
            "geotile_grid": {
                "field": "location",
                "precision": 8,
            }
        }
    },
}
# High-precision variant: a bounding-box filter with a tile aggregation
# nested inside it.
# NOTE(review): bottom_right lon (108.999...) is west of top_left lon
# (109.000...) -- confirm the corners before enabling.
# _body={
#     "aggregations" : {
#         "zoomed-in" : {
#             "filter" : {
#                 "geo_bounding_box" : {
#                     "location" : {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                          "bottom_right": "18.4698857850465471,108.9991036098896730"
#                     }
#                 }
#             },
#             "aggregations":{
#                 "zoom1":{
#                     "geotile_grid" : {
#                         "field": "location",
#                         "precision": 18
#                     }
#                 }
#             }
#         }
#     }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
# Result loop for the high-precision variant above:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#      print(i)



參考資料:

Elasticsearch(GEO)空間檢索查詢

Elasticsearch官網
elasticsearch

相關文章
相關標籤/搜索