python MySQL 插入Elasticsearch

1、需求分析

注意: 本環境使用 elasticsearch 7.0版本開發,切勿低於此版本html

mysql 表結構

有一張表,記錄的數據特別的多,須要將7天前的記錄,插入到Elasticsearch中,並刪除原有表7天前的記錄。python

表結構以下:mysql

CREATE TABLE `historic_records` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(50) NOT NULL DEFAULT '' COMMENT '用戶id',
  `time` bigint(20) NOT NULL DEFAULT '0' COMMENT '上線/下線時間',
  `create_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '建立時間',
  `update_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '更新時間',
  `online_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '在線狀態 默認1 0 離線 1 在線',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '軟刪除標誌:0-已刪除;1-正常',
  PRIMARY KEY (`id`),
  KEY `user_id` (`user_id`),
  KEY `order_index` (`time`,`create_time`,`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='歷史記錄表';
View Code

 

查詢sql:linux

select * from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000

 

刪除sql:sql

delete from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000

 

ES中的一些概念

index(索引)

至關於mysql中的數據庫docker

type(類型)

至關於mysql中的一張表shell

document(文檔)

至關於mysql中的一行(一條記錄)數據庫

field(域)

至關於mysql中的一列(一個字段)ubuntu

節點

一個服務器,由一個名字來標識bash

集羣

一個或多個節點組織在一塊兒

分片

將一份數據劃分爲多小份的能力,容許水平分割和擴展容量。多個分片能夠響應請求,提升性能和吞吐量。

副本

複製數據,一個節點出問題時,其他節點能夠頂上。

倒排索引

可參考 https://www.elastic.co/guide/cn/elasticsearch/guide/current/inverted-index.html

 

es數據結構

設定映射,規定好各個字段及其數據類型,便於es更好地進行管理。根據mysql表結構,映射以下:

# 建立映射
_index_mappings = {
    "settings": {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        # self.index_type : {
            "properties": {
                "id": {"type": "long"},
                "loid": {"type": "keyword"},
                "mac": {"type": "keyword"},
                "time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                "create_time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                "update_time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                "online_status": {"type": "short"},
                "status": {"type": "short"}
            }
        # }
    }
}
View Code

 

解釋:

索引設置,都在 settings{...} 中

number_of_shards
每一個索引的主分片數,默認值是 5 。這個配置在索引建立後不能修改。


number_of_replicas
每一個主分片的副本數,默認值是 1 。對於活動的索引庫,這個配置能夠隨時修改。

 

映射配置,都在mappings{...} 中

屬性設置,都在 properties{...} 中

 

Elasticsearch 支持 以下簡單域類型:

  • 字符串: string
  • 整數 : byteshortintegerlong
  • 浮點數: floatdouble
  • 布爾型: boolean
  • 日期: date

 

仔細看上面的mysql 表結構

因爲 id 的類型是 bigint(20),那麼在es中就是 long,表示長整形。

user_id 的類型是 varchar(50) ,在es中,有2中,分別是 text和 keyword。

這2種,是有區別的。text 會建立全文索引,支持模糊搜索。而keyword則不會,必須精確搜索才行。

因爲 user_id不須要模糊搜索,所以 設置 keyword纔是合理的。

 

create_time 雖然類型是 bigint(20),可是它存儲在mysql裏面,表示時間戳。

所以es中就是data,時間格式爲:epoch_millis,表示微秒時間戳。

 

online_status 的類型是tinyint(1),在es中是 short,表示短的數字

 

3、elasticsearch和kibana搭建

elasticsearch

新建目錄elasticsearch

mkdir /opt/elasticsearch-7.1.1

目錄結構以下:

./
├── dockerfile
├── elasticsearch-7.1.1-amd64.deb
├── run.sh
└── sources.list

 

dockerfile

FROM ubuntu:16.04
# 修改更新源爲阿里雲
ADD sources.list /etc/apt/sources.list
ADD elasticsearch-7.1.1-amd64.deb ./
# 安裝jdk和elasticsearch
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && dpkg -i elasticsearch-7.1.1-amd64.deb && rm -rf elasticsearch-7.1.1-amd64.deb
EXPOSE 9200
# 添加啓動腳本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
View Code

 

run.sh

#!/bin/bash
set -e

# 添加時區
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# 覆蓋配置文件
cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
echo "transport.host: localhost
transport.tcp.port: 9300
http.port: 9200
network.host: 0.0.0.0" >> /etc/elasticsearch/elasticsearch.yml

# 修改啓動文件,去掉-d參數,避免後臺運行
sed -i 72's@-d -p $PID_FILE@-p $PID_FILE@g' /etc/init.d/elasticsearch

# 啓動elasticsearch,要hold住,不然容器啓動就退出了!
/etc/init.d/elasticsearch start
View Code

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main restricted
deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb http://mirrors.aliyun.com/ubuntu/ xenial multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates multiverse
deb http://mirrors.aliyun.com/ubuntu/ xenial-backports main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu xenial-security main restricted
deb http://mirrors.aliyun.com/ubuntu xenial-security universe
deb http://mirrors.aliyun.com/ubuntu xenial-security multiverse
View Code

 

生成鏡像

docker build -t elasticsearch-7.1.1 .

 

啓動容器

docker run -d -it --restart=always -p 9200:9200 elasticsearch-7.1.1

 

訪問頁面

 

 

kibana

新建目錄kibana

mkdir /opt/kibana-7.1.1

目錄結構以下:

./
├── dockerfile
├── kibana-7.1.1-amd64.deb
└── run.sh

 

dockerfile

FROM ubuntu:16.04
ADD kibana-7.1.1-amd64.deb ./
# 安裝jdk和elasticsearch
RUN dpkg -i kibana-7.1.1-amd64.deb && rm -rf kibana-7.1.1-amd64.deb
EXPOSE 5601
# 添加啓動腳本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
View Code

 

run.sh

#!/bin/bash

# 添加時區
TZ=Asia/Shanghai
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

#elasticsearch="192.168.91.128"
if [ -z $elasticsearch ];then
    echo "elasticsearch參數爲空!好比: 192.168.91.128"
    exit
fi

# 修改配置文件
# 修改監聽地址
sed -i '7s@#server.host: "localhost"@server.host: "0.0.0.0"@g' /etc/kibana/kibana.yml
# 刪除行,並添加一行內容
sed -i '28d' /etc/kibana/kibana.yml
sed -i "N;28 i elasticsearch.hosts: ["http://$elasticsearch:9200"]" /etc/kibana/kibana.yml

# 啓動
/usr/share/kibana/bin/kibana "-c /etc/kibana/kibana.yml"
View Code

 

生成鏡像

docker build -t kibana-7.1.1 .

 

啓動鏡像

docker run -d -it --restart=always -p 5601:5601 -e elasticsearch=192.168.10.104 kibana-7.1.1

 

訪問頁面

 

2、查詢mysql數據

爲了方便操做 mysql,封裝了一個mysql工具類,用來查詢和更新數據。

mysql.py

#!/usr/bin/env python3
# coding: utf-8

import pymysql

class Mysql(object):
    # mysql 端口號,注意:必須是int類型
    def __init__(self,host,user,passwd,db_name,port=3306):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db_name = db_name
        self.port = port

    def select(self,sql):
        """
        執行sql命令
        :param sql: 命令
        :return: 元祖
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor
            )
            cur = conn.cursor()  # 建立遊標
            cur.execute(sql)  # 執行sql命令
            res = cur.fetchall()  # 獲取執行的返回結果
            cur.close()
            conn.close()  # 關閉mysql 鏈接
            return res
        except Exception as e:
            print(e)
            return False

    def update(self,sql):
        """
        更新操做,好比insert, delete,update
        :param sql: sql命令
        :return: bool
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
            )
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 建立遊標
            # conn.cursor()
            # print("ip: {} insert 執行命令: {}".format(self.host,sql))
            sta = cur.execute(sql)  # 執行sql命令,返回影響的行數
            # print("sta",sta,type(sta))
            #res = cur.fetchall()  # 獲取執行的返回結果
            if isinstance(sta,int):  # 判斷返回結果, 是數字就是正常的
                #print('插入記錄 Done')
                pass
                # write_log('正常,遠程執行sql: %s 成功'%sql, "green")
            else:
                write_log('錯誤,遠程執行sql: %s 失敗'%sql, "red")
                return False

            conn.commit()  # 主動提交,不然執行sql不生效
            cur.close()
            conn.close()  # 關閉mysql 鏈接
            return sta
        except Exception as e:
            print(e)
            # write_log('錯誤,遠程mysql執行命令: {} 異常'.format(sql), "red")
            return False
View Code

 

使用時,就簡單了。導入這個類,調用相關方法。

mysql_test.py

from mysql import Mysql


host = "192.168.0.179"
user = "sdn_db"
passwd = "Sdn@ujmyhn"
db_name = "terminalservice"
port = 3306

sql = "select * from terminals_record_0 where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000"
res = Mysql(host,user,passwd,db_name,port).select(sql)
print(res)
View Code

 

 

3、完整代碼

因爲時間關係,代碼不一一解釋了。附上完整代碼:

./
├── conf.py
├── es_bulk.py
├── README.md
├── requirements.txt
└── utils
    ├── common.py
    └── mysql.py

 

conf.py

#!/usr/bin/env python3
# coding: utf-8
"""
配置文件,用於mysql和elasticsearch
"""
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # 項目根目錄

# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306

# elasticsearch
INDEX_NAME = "historic_records"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"

MAXIMUM = 100  # 一次性插入多少條
View Code

 

es_bulk.py

#!/usr/bin/env python3
# coding: utf-8

import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

import conf
from utils.mysql import Mysql
from utils.common import write_log,valid_ip,check_tcp


class ElasticObj:
    def __init__(self,timeout=3600):
        '''
        :param timeout: 超時時間
        '''
        self.index_name = conf.INDEX_NAME  # 索引名稱
        self.index_type = conf.INDEX_TYPE  # 索引類型
        self.es_ip = conf.ES_IP  # es ip

        # 無用戶名密碼狀態
        self.es = Elasticsearch([self.es_ip], port=9200, timeout=timeout)
        # 用戶名密碼狀態
        # self.es = Elasticsearch([self.es_ip], http_auth=('esadm', 'mdase123'), port=9200, timeout=timeout)

    def create_index(self):
        '''
        建立索引
        :return: bool
        '''
        # 建立映射
        _index_mappings = {
            # 索引配置
            "settings": {
                "index": {
                    "number_of_shards": 3,  # 分片數
                    "number_of_replicas": 1  # 副本數
                }
            },
            # 設置字段
            "mappings": {
                "properties": {
                    "id": {"type": "long"},
                    "loid": {"type": "keyword"},
                    "mac": {"type": "keyword"},
                    "time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "create_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "online_status": {"type": "short"},
                    "status": {"type": "short"}
                }
            }
        }
        # 判斷索引不存在時
        if self.es.indices.exists(index=self.index_name) is not True:
            # 建立索引
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            # print(res)
            if not res:
                write_log("錯誤,建立索引{}失敗".format(self.index_name),"red")
                return False

            write_log("正常,建立索引{}成功".format(self.index_name), "green")
            return True
        else:
            write_log("正常,索引{}已存在".format(self.index_name), "green")
            return True

    def bulk_insert(self,table,data_list):
        """
        批量寫入數據
        :param table: 表名
        :param data_list: 數據列表
        [
            {
                'online_status': 1,
                'update_time': 1556073035327,
                'create_time': 1556073035327,
                'id': 1, 'status': 1,
                'time': 1556073035327,
                'loid': '100010000123',
                'mac': '60:45:cb:87:c9:93'
            },
            ...
        ]
        :return: bool
        """
        # 批量插入
        start_time = time.time()  # 開始時間
        actions = []  # 臨時數據列表
        i = 0  # 計數值

        try:
            # 循環數據列表
            for data in data_list:
                action = {
                    "_index": self.index_name,
                    "_type": self.index_type,
                    #"_id": i,  #_id 也能夠默認生成,不賦值
                    "_source": {
                        'id': data['id'],
                        'user_id': data['user_id'],
                        'time': data['time'],
                        'create_time': data['create_time'],
                        'online_status': data['online_status'],
                        'status': data['status'],
                    }
                }
                i += 1
                actions.append(action)  # 添加到列表
                if len(action) == conf.MAXIMUM:  # 列表數量達到100時
                    helpers.bulk(self.es, actions)  # 批量插入數據
                    del actions[0:len(action)]  # 刪除列表元素

            if i > 0:  # 不足100時,插入剩餘數據
                helpers.bulk(self.es, actions)

            end_time = time.time()  # 結束時間
            t = round((end_time - start_time),2)  # 計算耗時
            # print('本次共寫入{}條數據,用時{}s'.format(i, t))
            write_log("正常,{} 表寫入ES {}條數據,用時{}s".format(table,i, t), "green")
            return True
        except Exception as e:
            print(e)
            return False

    def has_table(self,db_name,target_table):
        """
        遠程表是否存在
        :return: bool
        """
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        sql = "select count(1) from {}.{}".format(db_name, target_table)
        res = mysql_obj.select(sql)
        # print("表是否存在",res,type(res))

        if res is False:
            write_log("錯誤,遠程表 {}.{} 不存在".format(db_name,target_table),"red")
            return False
        else:
            return True

    def has_conf(self):
        """
        判斷配置文件中的mysql和es 端口是否正常
        :return:
        """
        if not valid_ip(conf.HOST):
            write_log("錯誤,MySQL IP配置不正確","red")
            return False

        if not valid_ip(conf.ES_IP):
            write_log("錯誤,ES IP配置不正確","red")
            return False

        if not check_tcp(conf.HOST,conf.PORT):
            write_log("錯誤,MySQL {} 端口不可達".format(conf.PORT),"red")
            return False

        if not check_tcp(conf.ES_IP,9200):
            write_log("錯誤,ES 9200 端口不可達","red")
            return False

        return True

    def read_mysql_es(self):
        """
        讀取7天的記錄,並寫入es
        :return: bool
        """
        # 判斷配置文件中的mysql和es 端口是否正常
        if not self.has_conf():
            # print(1)
            return False

        # 建立索引
        if self.create_index() is False:
            # print(2)
            return False

        max = conf.MAXIMUM  # 一次性查詢多少條
        
        flag_list = []  # 標誌位列表
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        for i in range(64):  # 寫入64張表
            # 判斷表是否存在
            res = self.has_table(conf.DB_NAME,'historic_record_%s'%i)
            if not res:
                flag_list.append(False)
                return False

            id = 0  # 每一次查詢後的最大id
            while True:
                # 查詢數據
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                i, id, max)
                # print(sql)
                data_list = mysql_obj.select(sql)
                # print(data_list)
                if not data_list:  # 當結果爲空時,結束循環
                    write_log("警告,執行sql: %s 記錄爲空,無需寫入es" %(sql), "yellow")
                    break  # 跳出循環

                last_row = data_list[-1]  # 最後一行記錄
                # print(last_row)
                id = last_row['id']  # 修改最大id

                res = self.bulk_insert('historic_record_%s' % i, data_list)
                if not res:
                    write_log("錯誤,historic_record_%s 寫入ES 失敗"%i,"red")
                    flag_list.append(False)
                    return False

        if False in flag_list:
            write_log("錯誤,historic_record 部分表寫入ES錯誤,請查看上文","red")
            return False

        write_log("正常,historic_record 64張表所有寫入ES成功", "green")
        return True

    def delete_record(self):
        """
        刪除7天的表數據
        :return: bool
        """
        max = conf.MAXIMUM  # 一次性查詢多少條
        flag_list = []
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        for i in range(64):  # 64張表
            # 判斷表是否存在
            res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)
            if not res:
                flag_list.append(False)
                return False

            ### 先查詢數據
            id = 0  # 每一次查詢後的最大id
            while True:
                # 查詢數據
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                # print(sql)
                data_list = mysql_obj.select(sql)
                # print(data_list)
                if not data_list:  # 當結果爲空時,結束循環
                    write_log("警告,執行sql: %s 記錄爲空,無需刪除" % sql, "yellow")
                    break  # 跳出循環

                ### 再刪除數據
                sql = "delete from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                # print(sql)
                res = mysql_obj.update(sql)
                if res is False:
                    write_log("錯誤,刪除 historic_record_%s 記錄失敗" % i, "red")
                    flag_list.append(False)
                    break
                else:
                    write_log("正常,刪除 historic_record_%s 記錄成功" % i, "green")
                    
                last_row = data_list[-1]  # 最後一行記錄
                # print(last_row)
                id = last_row['id']  # 修改最大id


        if False in flag_list:
            write_log("錯誤,刪除 historic_record 部分表失敗,請查看上文", "red")
            return False

        write_log("正常,刪除 historic_record 64張表記錄所有成功", "green")

    def main(self):
        self.read_mysql_es()
        self.delete_record()

ElasticObj().main()  # 執行主程序
View Code

 

common.py

#!/usr/bin/env python3
# coding: utf-8
"""
共有的方法
"""

import sys
import io

def setup_io():  # 設置默認屏幕輸出爲utf-8編碼
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()


import os
import time
import conf
import socket
import subprocess
import ipaddress
from multiprocessing import cpu_count

def write_log(content,colour='white',skip=False):
    """
    寫入日誌文件
    :param content: 寫入內容
    :param colour: 顏色
    :param skip: 是否跳過打印時間
    :return:
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色


    path = os.path.join(conf.BASE_DIR,"output.log") # 日誌文件
    with open(path, mode='a+', encoding='utf-8') as f:
        if skip is False:  # 不跳過打印時間時
            content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content

        info = "\033[1;{};1m{}\033[0m".format(choice, content)
        print(info)
        f.write(content+"\n")
        
def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
    # print(p)
    # timeout = 1  # 超時時間
    t_beginning = time.time()  # 開始時間
    # seconds_passed = 0  # 執行時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            p.terminate()
            # raise TimeoutError(cmd, timeout)
            if not skip:
                # self.res.code = 500
                # print('命令: {},執行超時!'.format(cmd))
                write_log('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # return self.res.__dict__
                return False
                # return '命令: {},執行超時!'.format(cmd)

    # result = p.stdout.read().decode('utf-8').strip()  # 命令運行結果
    # print("result",result)
    # self.res.data = result
    # return self.res.__dict__
    result = p.stdout.readlines()
    return result

def valid_ip(ip):
    """
    驗證ip是否有效,好比192.168.1.256是一個不存在的ip
    :return: bool
    """
    try:
        # 判斷 python 版本
        if sys.version_info[0] == 2:
            ipaddress.ip_address(ip.strip().decode("utf-8"))
        elif sys.version_info[0] == 3:
            # ipaddress.ip_address(bytes(ip.strip().encode("utf-8")))
            ipaddress.ip_address(ip)

        return True
    except Exception as e:
        print(e)
        return False

def check_tcp(ip, port, timeout=1):
    """
    檢測tcp端口
    :param ip: ip地址
    :param port: 端口號
    :param timeout: 超時時間
    :return: bool
    """
    flag = False
    try:
        socket.setdefaulttimeout(timeout)  # 整個socket層設置超時時間
        cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = (str(ip), int(port))
        status = cs.connect_ex((address))  # 開始鏈接
        cs.settimeout(timeout)

        if not status:
            flag = True

        return flag
    except Exception as e:
        print(e)
        return flag

COROUTINE_NUMBER = cpu_count()  # 協程池數量,根據cpu核心數來開,避免cpu飆高
View Code

 

mysql.py

#!/usr/bin/env python3
# coding: utf-8

import pymysql
from utils.common import write_log

class Mysql(object):
    # mysql 端口號,注意:必須是int類型
    def __init__(self,host,user,passwd,db_name,port=3306):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db_name = db_name
        self.port = port

    def select(self,sql):
        """
        執行sql命令
        :param sql: 命令
        :return: 元祖
        """
        try:
            # print(host,self.user,self.passwd,self.port,self.db_name)
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor
            )
            cur = conn.cursor()  # 建立遊標
            # conn.cursor()
            cur.execute(sql)  # 執行sql命令
            res = cur.fetchall()  # 獲取執行的返回結果
            cur.close()
            conn.close()  # 關閉mysql 鏈接
            return res
        except Exception as e:
            print(e)
            return False

    def update(self,sql):
        """
        更新操做,好比insert, delete,update
        :param sql: sql命令
        :return: bool
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
            )
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 建立遊標
            # conn.cursor()
            # print("ip: {} insert 執行命令: {}".format(self.host,sql))
            sta = cur.execute(sql)  # 執行sql命令,返回影響的行數
            # print("sta",sta,type(sta))
            #res = cur.fetchall()  # 獲取執行的返回結果
            if isinstance(sta,int):  # 判斷返回結果, 是數字就是正常的
                #print('插入記錄 Done')
                pass
                # write_log('正常,遠程執行sql: %s 成功'%sql, "green")
            else:
                write_log('錯誤,遠程執行sql: %s 失敗'%sql, "red")
                return False

            conn.commit()  # 主動提交,不然執行sql不生效
            cur.close()
            conn.close()  # 關閉mysql 鏈接
            #Migration.flag_list.append(True)
            return sta
        except Exception as e:
            print(e)
            # write_log('錯誤,遠程mysql執行命令: {} 異常'.format(sql), "red")
            # Migration.flag_list.append(False)
            return False
View Code

 

requirements.txt

PyMySQL==0.9.2
elasticsearch==6.3.1

 

README.md

## 說明
終端歷史記錄表,寫入到elasticsearch中。

主要將(terminal.historic_record_0~63) 這64張表的7天前數據寫入到elasticsearch中

並刪除 64張表的7天前記錄

`注意: 本環境使用 elasticsearch 7.0版本開發,切勿低於此版本`


## 配置說明
`conf.py` 是環境配置

主要修改 如下信息
```python
# mysql
HOST = "192.168.0.136"
USER = "root"
PASSWD = "123456"
DB_NAME = "terminal"
PORT = 3306

# elasticsearch
INDEX_NAME = "historic_record"
INDEX_TYPE = "_doc"
ES_IP = "192.169.3.133"
```

請根據實際狀況修改以上變量

## 運行說明
## 一鍵執行,遷移相關全部表
`python es_bulk.py`

## 查看結果
結果會輸出到`output.log`文件,直接查看便可!

登陸到`kibana`,查看數據是否存在

<br/>
<br/>
Copyright (c) 2019-present, xiao You
View Code

 

本文參考連接:

http://www.javashuo.com/article/p-fhilhfqj-cn.html

https://blog.csdn.net/m0_37673307/article/details/81153700

相關文章
相關標籤/搜索