Note: this environment was developed against Elasticsearch 7.0; do not use an earlier version.
There is a table with a very large number of records. Rows older than 7 days need to be inserted into Elasticsearch and then deleted from the original table.
The table structure is as follows:
CREATE TABLE `historic_records` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(50) NOT NULL DEFAULT '' COMMENT 'user id',
  `time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'online/offline time',
  `create_time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'creation time',
  `update_time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'update time',
  `online_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'online status: 0 offline, 1 online',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT 'soft-delete flag: 0 deleted, 1 normal',
  PRIMARY KEY (`id`),
  KEY `user_id` (`user_id`),
  KEY `order_index` (`time`,`create_time`,`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='history records table';
Query SQL:
select * from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000
Delete SQL:
delete from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000
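Both statements share the same cutoff: midnight seven days ago, as epoch milliseconds. The same value can be computed in Python (a sketch; `seven_day_cutoff_millis` is a hypothetical helper, and it assumes MySQL's session time zone matches the local one):

```python
from datetime import datetime, timedelta

def seven_day_cutoff_millis(now=None):
    """Millisecond timestamp matching
    unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000."""
    now = now or datetime.now()
    # cast(sysdate() as date) truncates to midnight of the current day
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # date_add(..., interval -7 day) steps back one week
    cutoff = midnight - timedelta(days=7)
    # unix_timestamp(...) * 1000 yields epoch milliseconds
    return int(cutoff.timestamp() * 1000)
```

This is handy for sanity-checking query results outside of MySQL.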
Index: equivalent to a database in MySQL.
Type: equivalent to a table in MySQL.
Document: equivalent to a row (a single record) in MySQL.
Field: equivalent to a column (a single field) in MySQL.
Node: a single server, identified by a name.
Cluster: one or more nodes organized together.
Shards: the ability to split one dataset into smaller pieces, allowing horizontal partitioning and capacity scaling. Multiple shards can serve requests in parallel, improving performance and throughput.
Replicas: copies of the data; when one node fails, the remaining nodes can take over.
See https://www.elastic.co/guide/cn/elasticsearch/guide/current/inverted-index.html
Define a mapping, specifying each field and its data type, so ES can manage the data properly. Based on the MySQL table structure, the mapping is as follows:
# Create the mapping
_index_mappings = {
    "settings": {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "user_id": {"type": "keyword"},
            "time": {"type": "date", "format": "epoch_millis"},
            "create_time": {"type": "date", "format": "epoch_millis"},
            "update_time": {"type": "date", "format": "epoch_millis"},
            "online_status": {"type": "short"},
            "status": {"type": "short"}
        }
    }
}
Explanation:
Index settings all go in settings {...}
number_of_shards
The number of primary shards per index. This setting cannot be changed after the index is created. (Before Elasticsearch 7.0 the default was 5; in 7.x it is 1.)
number_of_replicas
The number of replicas per primary shard; the default is 1. For an active index, this setting can be changed at any time.
Mapping configuration all goes in mappings {...}
Field definitions all go in properties {...}
Common ES field data types:
String: text, keyword
Integer: byte, short, integer, long
Floating point: float, double
Boolean: boolean
Date: date
Look closely at the MySQL table structure above.
Since id is bigint(20), it maps to long in ES, a long integer.
user_id is varchar(50); ES offers two string types, text and keyword.
The two differ: text builds a full-text index and supports fuzzy matching, while keyword does not and only matches exact values.
Since user_id does not need fuzzy matching, keyword is the appropriate choice.
Although create_time is bigint(20), what it stores in MySQL is a timestamp.
So in ES it becomes date with format epoch_millis, i.e. a millisecond timestamp.
online_status is tinyint(1), which maps to short in ES, a short integer.
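The correspondences above can be collected into a small lookup table (an illustrative sketch; `MYSQL_TO_ES` and `es_field` are hypothetical names, covering only the types this table uses):

```python
# MySQL column type -> Elasticsearch field type, per the rules above
MYSQL_TO_ES = {
    "bigint": "long",      # e.g. id
    "varchar": "keyword",  # e.g. user_id: exact match only, no fuzzy search
    "tinyint": "short",    # e.g. online_status, status
}

def es_field(mysql_type, is_timestamp=False):
    """Pick the ES field definition for a column; millisecond-timestamp
    columns become date with format epoch_millis."""
    if is_timestamp:
        return {"type": "date", "format": "epoch_millis"}
    return {"type": MYSQL_TO_ES[mysql_type]}
```

For example, `es_field("bigint", is_timestamp=True)` reproduces the definition used for create_time and update_time.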
Create a directory for Elasticsearch
mkdir /opt/elasticsearch-7.1.1
The directory layout is as follows:
./
├── dockerfile
├── elasticsearch-7.1.1-amd64.deb
├── run.sh
└── sources.list
dockerfile
FROM ubuntu:16.04

# Switch the apt mirror to Aliyun
ADD sources.list /etc/apt/sources.list
ADD elasticsearch-7.1.1-amd64.deb ./

# Install the JDK and Elasticsearch
RUN apt-get update && \
    apt-get install -y openjdk-8-jdk --allow-unauthenticated && \
    apt-get clean all && \
    dpkg -i elasticsearch-7.1.1-amd64.deb && \
    rm -rf elasticsearch-7.1.1-amd64.deb

EXPOSE 9200

# Add the startup script
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh" ]
run.sh
#!/bin/bash
set -e

# Set the time zone
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# Back up the config file, then append overrides
cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
echo "transport.host: localhost
transport.tcp.port: 9300
http.port: 9200
network.host: 0.0.0.0" >> /etc/elasticsearch/elasticsearch.yml

# Patch the init script: drop the -d flag so Elasticsearch does not daemonize
sed -i '72s@-d -p $PID_FILE@-p $PID_FILE@g' /etc/init.d/elasticsearch

# Start Elasticsearch; the process must stay in the foreground, or the container exits immediately
/etc/init.d/elasticsearch start
sources.list
deb http://mirrors.aliyun.com/ubuntu/ xenial main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb http://mirrors.aliyun.com/ubuntu/ xenial multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-backports main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu xenial-security main restricted
deb http://mirrors.aliyun.com/ubuntu xenial-security universe
deb http://mirrors.aliyun.com/ubuntu xenial-security multiverse
Build the image
docker build -t elasticsearch-7.1.1 .
Start the container
docker run -d -it --restart=always -p 9200:9200 elasticsearch-7.1.1
Open the page to verify
Create a directory for Kibana
mkdir /opt/kibana-7.1.1
The directory layout is as follows:
./
├── dockerfile
├── kibana-7.1.1-amd64.deb
└── run.sh
dockerfile
FROM ubuntu:16.04

ADD kibana-7.1.1-amd64.deb ./

# Install Kibana
RUN dpkg -i kibana-7.1.1-amd64.deb && rm -rf kibana-7.1.1-amd64.deb

EXPOSE 5601

# Add the startup script
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh" ]
run.sh
#!/bin/bash

# Set the time zone
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# elasticsearch="192.168.91.128"
if [ -z "$elasticsearch" ];then
    echo "The elasticsearch parameter is empty! Example: 192.168.91.128"
    exit 1
fi

# Edit the config file
# Change the listen address
sed -i '7s@#server.host: "localhost"@server.host: "0.0.0.0"@g' /etc/kibana/kibana.yml

# Delete line 28 and insert the Elasticsearch address in its place
sed -i '28d' /etc/kibana/kibana.yml
sed -i "28i elasticsearch.hosts: [\"http://$elasticsearch:9200\"]" /etc/kibana/kibana.yml

# Start Kibana
/usr/share/kibana/bin/kibana -c /etc/kibana/kibana.yml
Build the image
docker build -t kibana-7.1.1 .
Start the container
docker run -d -it --restart=always -p 5601:5601 -e elasticsearch=192.168.10.104 kibana-7.1.1
Open the page to verify
To make MySQL operations convenient, a MySQL wrapper class handles queries and updates.
mysql.py
#!/usr/bin/env python3
# coding: utf-8
import pymysql


class Mysql(object):
    # Note: the MySQL port must be an int
    def __init__(self, host, user, passwd, db_name, port=3306):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db_name = db_name
        self.port = port

    def select(self, sql):
        """
        Run a query
        :param sql: SQL statement
        :return: tuple of rows, or False on error
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor
            )
            cur = conn.cursor()   # create a cursor
            cur.execute(sql)      # run the statement
            res = cur.fetchall()  # fetch the result set
            cur.close()
            conn.close()          # close the MySQL connection
            return res
        except Exception as e:
            print(e)
            return False

    def update(self, sql):
        """
        Write operations: insert, delete, update
        :param sql: SQL statement
        :return: affected row count, or False on error
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
            )
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # create a cursor
            sta = cur.execute(sql)  # run the statement; returns the affected row count
            if isinstance(sta, int):  # an int result means success
                pass
            else:
                print('Error: remote sql failed: %s' % sql)
                return False
            conn.commit()  # commit explicitly, or the statement takes no effect
            cur.close()
            conn.close()   # close the MySQL connection
            return sta
        except Exception as e:
            print(e)
            return False
Using it is simple: import the class and call its methods.
mysql_test.py
from mysql import Mysql

host = "192.168.0.179"
user = "sdn_db"
passwd = "Sdn@ujmyhn"
db_name = "terminalservice"
port = 3306

sql = "select * from terminals_record_0 where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000"
res = Mysql(host, user, passwd, db_name, port).select(sql)
print(res)
For lack of time, the code is not explained line by line. The complete code is attached:
./
├── conf.py
├── es_bulk.py
├── README.md
├── requirements.txt
└── utils
├── common.py
└── mysql.py
conf.py
#!/usr/bin/env python3
# coding: utf-8
"""
Configuration for MySQL and Elasticsearch
"""
import os

BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # project root

# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306

# elasticsearch
INDEX_NAME = "historic_records"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"

MAXIMUM = 100  # how many rows to insert per batch
es_bulk.py
#!/usr/bin/env python3
# coding: utf-8
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

import conf
from utils.mysql import Mysql
from utils.common import write_log, valid_ip, check_tcp


class ElasticObj:
    def __init__(self, timeout=3600):
        '''
        :param timeout: timeout in seconds
        '''
        self.index_name = conf.INDEX_NAME  # index name
        self.index_type = conf.INDEX_TYPE  # index type
        self.es_ip = conf.ES_IP            # es ip
        # without authentication
        self.es = Elasticsearch([self.es_ip], port=9200, timeout=timeout)
        # with authentication
        # self.es = Elasticsearch([self.es_ip], http_auth=('esadm', 'mdase123'), port=9200, timeout=timeout)

    def create_index(self):
        '''
        Create the index
        :return: bool
        '''
        # the mapping
        _index_mappings = {
            # index settings
            "settings": {
                "index": {
                    "number_of_shards": 3,    # number of primary shards
                    "number_of_replicas": 1   # number of replicas
                }
            },
            # field definitions
            "mappings": {
                "properties": {
                    "id": {"type": "long"},
                    "user_id": {"type": "keyword"},
                    "time": {"type": "date", "format": "epoch_millis"},
                    "create_time": {"type": "date", "format": "epoch_millis"},
                    "update_time": {"type": "date", "format": "epoch_millis"},
                    "online_status": {"type": "short"},
                    "status": {"type": "short"}
                }
            }
        }
        # only create the index if it does not exist yet
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            if not res:
                write_log("Error: failed to create index {}".format(self.index_name), "red")
                return False
            write_log("OK: created index {}".format(self.index_name), "green")
            return True
        else:
            write_log("OK: index {} already exists".format(self.index_name), "green")
            return True

    def bulk_insert(self, table, data_list):
        """
        Bulk-write rows to Elasticsearch
        :param table: table name
        :param data_list: list of row dicts, e.g.
            [
                {
                    'id': 1,
                    'user_id': '100010000123',
                    'time': 1556073035327,
                    'create_time': 1556073035327,
                    'update_time': 1556073035327,
                    'online_status': 1,
                    'status': 1
                },
                ...
            ]
        :return: bool
        """
        start_time = time.time()  # start time
        actions = []              # pending bulk actions
        i = 0                     # row counter
        try:
            for data in data_list:
                action = {
                    "_index": self.index_name,
                    "_type": self.index_type,
                    # "_id": i,  # _id can also be left out and auto-generated
                    "_source": {
                        'id': data['id'],
                        'user_id': data['user_id'],
                        'time': data['time'],
                        'create_time': data['create_time'],
                        'online_status': data['online_status'],
                        'status': data['status'],
                    }
                }
                i += 1
                actions.append(action)
                if len(actions) == conf.MAXIMUM:    # flush every MAXIMUM rows
                    helpers.bulk(self.es, actions)  # bulk insert
                    actions = []                    # reset the pending list
            if actions:  # flush the remainder (fewer than MAXIMUM rows)
                helpers.bulk(self.es, actions)
            end_time = time.time()
            t = round((end_time - start_time), 2)  # elapsed seconds
            write_log("OK: wrote {} rows from table {} to ES in {}s".format(i, table, t), "green")
            return True
        except Exception as e:
            print(e)
            return False

    def has_table(self, db_name, target_table):
        """
        Does the remote table exist?
        :return: bool
        """
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        sql = "select count(1) from {}.{}".format(db_name, target_table)
        res = mysql_obj.select(sql)
        if res is False:
            write_log("Error: remote table {}.{} does not exist".format(db_name, target_table), "red")
            return False
        else:
            return True

    def has_conf(self):
        """
        Check that the MySQL and ES addresses/ports in the config are reachable
        :return: bool
        """
        if not valid_ip(conf.HOST):
            write_log("Error: the MySQL IP is invalid", "red")
            return False
        if not valid_ip(conf.ES_IP):
            write_log("Error: the ES IP is invalid", "red")
            return False
        if not check_tcp(conf.HOST, conf.PORT):
            write_log("Error: MySQL port {} is unreachable".format(conf.PORT), "red")
            return False
        if not check_tcp(conf.ES_IP, 9200):
            write_log("Error: ES port 9200 is unreachable", "red")
            return False
        return True

    def read_mysql_es(self):
        """
        Read the records older than 7 days and write them to ES
        :return: bool
        """
        # check the config first
        if not self.has_conf():
            return False
        # create the index
        if self.create_index() is False:
            return False
        max = conf.MAXIMUM  # rows per query
        flag_list = []      # per-table status flags
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        for i in range(64):  # 64 sharded tables
            # make sure the table exists
            res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)
            if not res:
                flag_list.append(False)
                return False
            id = 0  # largest id seen so far
            while True:
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                data_list = mysql_obj.select(sql)
                if not data_list:  # empty result: this table is done
                    write_log("Warning: sql %s returned no rows, nothing to write to ES" % sql, "yellow")
                    break
                last_row = data_list[-1]  # last row of this page
                id = last_row['id']       # advance the cursor
                res = self.bulk_insert('historic_record_%s' % i, data_list)
                if not res:
                    write_log("Error: failed to write historic_record_%s to ES" % i, "red")
                    flag_list.append(False)
                    return False
        if False in flag_list:
            write_log("Error: some historic_record tables failed to write to ES, see above", "red")
            return False
        write_log("OK: all 64 historic_record tables written to ES", "green")
        return True

    def delete_record(self):
        """
        Delete the records older than 7 days from the tables
        :return: bool
        """
        max = conf.MAXIMUM  # rows per query
        flag_list = []
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        for i in range(64):  # 64 sharded tables
            # make sure the table exists
            res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)
            if not res:
                flag_list.append(False)
                return False
            ### query first
            id = 0  # largest id seen so far
            while True:
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                data_list = mysql_obj.select(sql)
                if not data_list:  # empty result: this table is done
                    write_log("Warning: sql %s returned no rows, nothing to delete" % sql, "yellow")
                    break
                ### then delete
                sql = "delete from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                res = mysql_obj.update(sql)
                if res is False:
                    write_log("Error: failed to delete from historic_record_%s" % i, "red")
                    flag_list.append(False)
                    break
                else:
                    write_log("OK: deleted from historic_record_%s" % i, "green")
                last_row = data_list[-1]  # last row of this page
                id = last_row['id']       # advance the cursor
        if False in flag_list:
            write_log("Error: deletes failed for some historic_record tables, see above", "red")
            return False
        write_log("OK: deleted old records from all 64 historic_record tables", "green")

    def main(self):
        self.read_mysql_es()
        self.delete_record()


ElasticObj().main()  # run
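Both read_mysql_es and delete_record page through each table with the same `id > last_id ... order by id limit N` pattern (keyset pagination), advancing the cursor from the last row of each page. Isolated from MySQL, the loop looks like this (a sketch; `paginate_by_id` and `fetch_page` are hypothetical names):

```python
def paginate_by_id(fetch_page, page_size=100):
    """Yield pages of rows; fetch_page(last_id, limit) must return rows
    with id > last_id, ordered by id, at most `limit` of them."""
    last_id = 0
    while True:
        rows = fetch_page(last_id, page_size)
        if not rows:          # empty page: the table is exhausted
            break
        yield rows
        last_id = rows[-1]["id"]  # advance the cursor past this page

# Simulate a table of 250 rows
table = [{"id": n} for n in range(1, 251)]

def fetch_page(last_id, limit):
    return [r for r in table if r["id"] > last_id][:limit]

pages = list(paginate_by_id(fetch_page))
```

Advancing `id` from the last row of each page is what keeps the loop from rescanning rows it has already processed, which is why the delete loop reuses the same cursor.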
common.py
#!/usr/bin/env python3
# coding: utf-8
"""
Shared helpers
"""
import sys
import io


def setup_io():
    # force utf-8 for stdout/stderr
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

import os
import time
import conf
import socket
import subprocess
import ipaddress
from multiprocessing import cpu_count


def write_log(content, colour='white', skip=False):
    """
    Append to the log file
    :param content: text to write
    :param colour: terminal colour
    :param skip: skip the timestamp prefix
    :return:
    """
    # ANSI colour codes
    colour_dict = {
        'red': 31,
        'green': 32,
        'yellow': 33,
        'blue': 34,
        'purple_red': 35,
        'bluish_blue': 36,
        'white': 37,
    }
    choice = colour_dict.get(colour)  # pick the colour
    path = os.path.join(conf.BASE_DIR, "output.log")  # log file
    with open(path, mode='a+', encoding='utf-8') as f:
        if skip is False:  # prefix the timestamp unless skipped
            content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content
        info = "\033[1;{};1m{}\033[0m".format(choice, content)
        print(info)
        f.write(content + "\n")


def execute_linux2(cmd, timeout=10, skip=False):
    """
    Run a linux command, return its output lines
    :param cmd: the command
    :param timeout: timeout in seconds (production boxes can be slow)
    :param skip: skip logging on timeout
    :return: list of output lines, or False on timeout
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
    t_beginning = time.time()  # start time
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            p.terminate()
            if not skip:
                write_log('Error: command {} timed out locally!'.format(cmd), "red")
            return False
    result = p.stdout.readlines()
    return result


def valid_ip(ip):
    """
    Is the ip valid? e.g. 192.168.1.256 is not a real ip
    :return: bool
    """
    try:
        # python 2 needs a unicode argument
        if sys.version_info[0] == 2:
            ipaddress.ip_address(ip.strip().decode("utf-8"))
        elif sys.version_info[0] == 3:
            ipaddress.ip_address(ip)
        return True
    except Exception as e:
        print(e)
        return False


def check_tcp(ip, port, timeout=1):
    """
    Probe a tcp port
    :param ip: ip address
    :param port: port number
    :param timeout: timeout in seconds
    :return: bool
    """
    flag = False
    try:
        socket.setdefaulttimeout(timeout)  # socket-level default timeout
        cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = (str(ip), int(port))
        status = cs.connect_ex(address)  # try to connect
        cs.settimeout(timeout)
        if not status:
            flag = True
        return flag
    except Exception as e:
        print(e)
        return flag


COROUTINE_NUMBER = cpu_count()  # coroutine pool size, tied to cpu cores to avoid spiking the cpu
mysql.py
#!/usr/bin/env python3
# coding: utf-8
import pymysql

from utils.common import write_log


class Mysql(object):
    # Note: the MySQL port must be an int
    def __init__(self, host, user, passwd, db_name, port=3306):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db_name = db_name
        self.port = port

    def select(self, sql):
        """
        Run a query
        :param sql: SQL statement
        :return: tuple of rows, or False on error
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor
            )
            cur = conn.cursor()   # create a cursor
            cur.execute(sql)      # run the statement
            res = cur.fetchall()  # fetch the result set
            cur.close()
            conn.close()          # close the MySQL connection
            return res
        except Exception as e:
            print(e)
            return False

    def update(self, sql):
        """
        Write operations: insert, delete, update
        :param sql: SQL statement
        :return: affected row count, or False on error
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
            )
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # create a cursor
            sta = cur.execute(sql)  # run the statement; returns the affected row count
            if isinstance(sta, int):  # an int result means success
                pass
                # write_log('OK: remote sql succeeded: %s' % sql, "green")
            else:
                write_log('Error: remote sql failed: %s' % sql, "red")
                return False
            conn.commit()  # commit explicitly, or the statement takes no effect
            cur.close()
            conn.close()   # close the MySQL connection
            return sta
        except Exception as e:
            print(e)
            # write_log('Error: remote sql raised: {}'.format(sql), "red")
            return False
requirements.txt
PyMySQL==0.9.2
elasticsearch==6.3.1
README.md
## Description

Writes the terminal history tables to Elasticsearch.

Writes rows older than 7 days from the 64 tables (terminal.historic_record_0~63) into Elasticsearch, then deletes those rows from the 64 tables.

`Note: developed against Elasticsearch 7.0; do not use an earlier version`

## Configuration

`conf.py` holds the environment config. The main settings to change:

```python
# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306

# elasticsearch
INDEX_NAME = "historic_record"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"
```

Adjust the variables above for your environment.

## Running

One command migrates all the tables:

`python es_bulk.py`

## Checking the result

Output goes to the `output.log` file; just read it.

Log in to `kibana` to verify the data exists.

<br/>
<br/>

Copyright (c) 2019-present, xiao You
Reference links for this article: