什麼是 Elasticsearch?
Elasticsearch 是一個開源的分佈式 RESTful 搜索和分析引擎,可以解決愈來愈多不一樣的應用場景。
本文主要介紹 ES GEO 數據寫入和空間檢索,ES 版本爲 7.3.1。
使用 QGIS 的漁網工具對範圍進行切割,獲得網格的 GeoJSON。
def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    """Drop and recreate ``index_name``, then install a typed mapping on it.

    Any existing index with the same name is deleted first, so its documents
    are lost. Failure to get an acknowledgement is only reported on stdout;
    nothing is raised by this function itself.

    Args:
        es: Elasticsearch client; only ``es.indices.delete``,
            ``es.indices.create`` and ``es.indices.put_mapping`` are used.
        index_name: Name of the index to recreate.
        doc_type_name: Mapping type name passed to ``put_mapping`` (sent with
            ``include_type_name=True``).
        my_mapping: Mapping body to install. When omitted or empty, a default
            mapping with a ``geo_shape`` field ``location`` and a ``long``
            field ``id`` is used.

    Returns:
        None.
    """
    # Build the default per call instead of sharing a mutable default
    # argument, and only fall back to it when the caller gave no mapping.
    if not my_mapping:
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"},
            }
        }

    # 400/404 are ignored so that a missing index does not abort the reset.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")

    create_index = es.indices.create(index=index_name)
    mapping_index = es.indices.put_mapping(
        index=index_name,
        doc_type=doc_type_name,
        body=my_mapping,
        include_type_name=True,
    )
    print("create_index")

    if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
        print("Index creation failed...")
使用 multiprocessing 和 elasticsearch.helpers.bulk 進行數據寫入:每 9500 條爲一組,剩下的爲一組,然後多進程並行寫入。分別寫入 4731254 條點數據和面數據。寫入時使用多核、SSD 和合適的批量大小能夠有效加快寫入速度,通過這些手段能夠在三分鐘左右寫入四百多萬條點或者面數據。
def mp_worker(features):
    """Bulk-index one batch of actions and return how many were indexed.

    Intended to run inside a pool worker, so it opens its own client rather
    than receiving one from the parent process.

    Args:
        features: List of bulk actions; each one names its target index in
            its own ``"_index"`` key.

    Returns:
        The success count reported by ``bulk``.

    Raises:
        Whatever ``bulk`` raises when an action fails
        (``raise_on_error=True``).
    """
    # `ip` is a module-level name defined elsewhere in this file.
    es = Elasticsearch(hosts=[ip], timeout=5000)
    # No default index is passed here: every action already carries
    # "_index", and this function has no index name of its own in scope.
    success, _ = bulk(es, features, raise_on_error=True)
    return success


def _batch_actions(features, index_name, doc_type_name, batch_size):
    """Turn GeoJSON features into bulk actions, grouped into lists of at most ``batch_size``."""
    batches = []
    current = []
    for feature in features:
        current.append({
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                "location": {
                    # Every geometry is written as a polygon shape.
                    "type": "polygon",
                    "coordinates": feature["geometry"]["coordinates"],
                },
            },
        })
        if len(current) == batch_size:
            batches.append(current)
            current = []
    if current:
        # Trailing partial batch.
        batches.append(current)
    return batches


def mp_handler(input_file, index_name, doc_type_name="en", batch_size=9500, processes=4):
    """Load a GeoJSON file and bulk-index its features using a process pool.

    Each feature becomes one document holding ``properties.id`` as ``id`` and
    ``geometry.coordinates`` as a polygon-typed ``location``. Progress is
    printed to stdout; nothing is returned.

    Args:
        input_file: Path to a GeoJSON file with a top-level ``"features"``
            list.
        index_name: Target index written into every action.
        doc_type_name: Document type written into every action.
        batch_size: Maximum number of actions sent per ``bulk`` call.
        processes: Number of worker processes in the pool.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    with open(input_file, 'rb') as f:
        features = json.load(f)["features"]

    batches = _batch_actions(features, index_name, doc_type_name, batch_size)
    # Only the batches are needed from here on.
    del features
    count = sum(len(batch) for batch in batches)
    print('read all %s data ' % count)

    # The context manager shuts the pool down once all results are consumed,
    # so worker processes are not left behind.
    with multiprocessing.Pool(processes) as pool:
        written = sum(pool.imap(mp_worker, batches))
    print('write all %s data ' % written)
# Point search: stream every document of the point index that lies within a
# fixed radius of a centre, print each hit, then print the elapsed seconds.
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Radius selection: everything within 9 km of the given lat/lon.
_geo_filter = {
    "geo_distance": {
        "distance": "9km",
        "location": {
            "lat": 18.1098857850465471,
            "lon": 109.1271036098896730,
        },
    }
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": _geo_filter,
        }
    }
}

# Alternative: bounding-box selection.
# NOTE(review): bottom_right lon (105.12...) lies west of top_left lon
# (109.00...) - confirm the corners before enabling this variant.
# _body={
#     "query": {
#         "geo_bounding_box": {
#             "location": {
#                 "top_left": {
#                     "lat": 18.4748659238899933,
#                     "lon": 109.0007435371629470
#                 },
#                 "bottom_right": {
#                     "lat": 18.1098857850465471,
#                     "lon": 105.1271036098896730
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(
    es,
    query=_body,
    scroll="10m",
    index=_index,
    timeout="10m",
)
for hit in scanResp:
    print(hit)

endtime = time.time()
print(endtime - starttime)
# Shape search: stream every document whose shape lies within an envelope,
# print each hit, then print the elapsed seconds.
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

starttime = time.time()

_index = "gis"
_doc_type = "20190823"
ip = "127.0.0.1:9200"

# Envelope format: [[minlon, maxlat], [maxlon, minlat]]
_envelope = {
    "type": "envelope",
    "coordinates": [
        [108.987103609889, 18.474865923889993],
        [109.003537162947, 18.40988578504],
    ],
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_shape": {
                    "location": {
                        "shape": _envelope,
                        "relation": "within",
                    }
                }
            },
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(
    es,
    query=_body,
    scroll="1m",
    index=_index,
    timeout="1m",
)
for hit in scanResp:
    print(hit)

endtime = time.time()
print(endtime - starttime)
# Distance aggregation: bucket the points of the index into three rings
# around an origin, print each bucket, then print the elapsed seconds.
import time

from elasticsearch import Elasticsearch

starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Ring boundaries carry no unit, so Elasticsearch's default distance unit
# applies (metres according to the Elasticsearch reference).
_rings = [
    {"to": 100000},
    {"from": 100000, "to": 300000},
    {"from": 300000},
]
_body = {
    "aggs": {
        "rings_around_amsterdam": {
            "geo_distance": {
                "field": "location",
                "origin": "18.1098857850465471,109.1271036098896730",
                "ranges": _rings,
            }
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

buckets = scanResp['aggregations']['rings_around_amsterdam']['buckets']
for bucket in buckets:
    print(bucket)

endtime = time.time()
print(endtime - starttime)
# Centroid aggregation over the "location" field.
# Reuses `Elasticsearch`, `ip` and `_index` defined earlier in this file.
_centroid_agg = {"geo_centroid": {"field": "location"}}
_body = {"aggs": {"centroid": _centroid_agg}}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

aggregations = scanResp['aggregations']
print(aggregations)
# Bounds aggregation: the bounding box enclosing every "location" value.
# Reuses `Elasticsearch`, `ip` and `_index` defined earlier in this file.
_bounds_agg = {"geo_bounds": {"field": "location"}}
_body = {"aggs": {"viewport": _bounds_agg}}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

viewport = scanResp['aggregations']['viewport']
print(viewport)
# Geohash grid aggregation.
# Reuses `Elasticsearch`, `ip` and `_index` defined earlier in this file.

# Low-precision aggregation; "precision" is the geohash length.
_grid_agg = {
    "geohash_grid": {
        "field": "location",
        "precision": 3,
    }
}
_body = {"aggregations": {"large-grid": _grid_agg}}

# High-precision alternative: bounding-box filter combined with a geohash
# grid aggregation.
# NOTE(review): bottom_right lon (108.997...) lies west of top_left lon
# (109.000...) - confirm the corners before enabling this variant.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9971036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geohash_grid": {
#                         "field": "location",
#                         "precision": 7
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

buckets = scanResp['aggregations']['large-grid']['buckets']
for bucket in buckets:
    print(bucket)

# Matching loop for the high-precision alternative:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
# Geotile grid aggregation.
# Reuses `Elasticsearch`, `ip` and `_index` defined earlier in this file.

# Low-precision tile aggregation; "precision" is the tile level.
_tile_agg = {
    "geotile_grid": {
        "field": "location",
        "precision": 8,
    }
}
_body = {"aggregations": {"large-grid": _tile_agg}}

# High-precision alternative: bounding-box filter combined with a geotile
# grid aggregation.
# NOTE(review): bottom_right lon (108.999...) lies west of top_left lon
# (109.000...) - confirm the corners before enabling this variant.
# _body={
#     "aggregations" : {
#         "zoomed-in" : {
#             "filter" : {
#                 "geo_bounding_box" : {
#                     "location" : {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9991036098896730"
#                     }
#                 }
#             },
#             "aggregations":{
#                 "zoom1":{
#                     "geotile_grid" : {
#                         "field": "location",
#                         "precision": 18
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

buckets = scanResp['aggregations']['large-grid']['buckets']
for bucket in buckets:
    print(bucket)

# Matching loop for the high-precision alternative:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
參考資料:
Elasticsearch(GEO)空間檢索查詢
Elasticsearch官網
elasticsearch