Elasticsearch Backup and Restore

Introduction

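Apart from its snapshot and restore API, Elasticsearch does not ship a dedicated tool for dumping an index to a file, yet backups are a real requirement in production.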

This article shows how to back up, restore, and delete index data using PHP scripts we wrote ourselves together with a third-party tool.

Full backup

 

elasticdump --input  http://127.0.0.1:9200/logstash-postback --output  logstash-postback_2017.01.17.json --limit 1000
 
Installation instructions for elasticdump are given further down.

Restore

Example command
elasticdump --input logstash-postback_2017.01.14.json --output http://127.0.0.1:9200/logstash-postback --limit 1000
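Explanation
The elasticdump command must be installed separately.
--input specifies the source: either a file or the Elasticsearch cluster to back up.
--output specifies the destination: either a file or the Elasticsearch cluster to restore into.
logstash-postback_2017.01.14.json is the backup file created earlier.
http://127.0.0.1:9200/logstash-postback is the Elasticsearch cluster to restore into, given as IP, port, and index name. Note that the index name carries no date suffix.
--limit 1000 transfers 1,000 documents per batch, which speeds up the import.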
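The backup and restore commands differ only in which side is the file and which side is the cluster. A minimal sketch of a wrapper that builds both, assuming the same host and index as in the examples above and GNU date; the function names dump_file, backup_cmd, and restore_cmd are hypothetical, and the commands are only printed, not executed:

```shell
#!/bin/bash
# Hypothetical helpers: they only print elasticdump commands (dry run).
ES_URL="http://127.0.0.1:9200"   # assumed cluster address, as in the examples above
INDEX="logstash-postback"         # assumed index name

# Build a dated file name such as logstash-postback_2017.01.17.json
dump_file() {
    # $1: date in YYYY-MM-DD form
    echo "${INDEX}_$(date +%Y.%m.%d --date "$1").json"
}

# Print the full-backup command for the given date
backup_cmd() {
    echo "elasticdump --input ${ES_URL}/${INDEX} --output $(dump_file "$1") --limit 1000"
}

# Print the restore command for the given date
restore_cmd() {
    echo "elasticdump --input $(dump_file "$1") --output ${ES_URL}/${INDEX} --limit 1000"
}

backup_cmd 2017-01-17
restore_cmd 2017-01-17
```

Printing the command first makes it easy to review the file name and target index before running anything against a production cluster.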
 
Installing elasticdump
yum install npm
npm install elasticdump -g
The command is now installed and can be tested.
If it reports an error about the Node.js version, upgrade Node.js:
npm install -g n
n stable
elasticdump is now ready to use.
 

Deleting index data

 php delete.php --index http://127.0.0.1:9200/<index> --start 2017-01-01 --end 2017-01-02  
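--start selects the date whose documents are deleted from the index.
--end is optional; if omitted, one day is deleted, namely the --start day. The end date itself is excluded from the range.
<index> is the name of the index to delete from.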
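The date window that delete.php derives from --start and --end can be sketched in shell. This is only an illustration, assuming GNU date and input already in YYYY-MM-DD form; delete_window is a hypothetical helper, and the +0800 offset mirrors the Asia/Shanghai timezone set in the script:

```shell
#!/bin/bash
# Hypothetical helper mirroring how delete.php turns --start/--end into a range.
# Prints "<gte> <lt>": the start is inclusive, the end is exclusive.
delete_window() {
    local start="$1" end="$2"
    if [ -z "$end" ]; then
        # No --end given: default to the day after --start (one day of data)
        end=$(date +%Y-%m-%d --date "$start +1 day")
    fi
    echo "${start}T00:00:00+0800 ${end}T00:00:00+0800"
}

delete_window 2017-01-01 2017-01-02
delete_window 2017-01-31
```

Checking the window this way before running the delete helps avoid removing an extra day by accident.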
 
cat delete.php
 
<?php
date_default_timezone_set('Asia/Shanghai');

$longopts  = array(
    'index:',
    'query:',
    'start:',
    'end:',
);
$options = getopt('a', $longopts);

if(!isset($options['index'])) {
        fwrite(STDERR, "The --index option (index URL) is required\n");
        exit(1);
}

$components = parse_url($options['index']);
if(!isset($components['path'])) {
        fwrite(STDERR, "The index URL must include an index name\n");
        exit(1);
}

$host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
$index = basename($components['path']);

$start = $end = null; // remain null (printed as empty) unless --start is given
$query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
if(isset($options['start'])) {
        $start_time = strtotime($options['start']);
        $start = date('Y-m-d', $start_time).'T00:00:00+0800';
        if(isset($options['end'])) {
                $end_time = strtotime($options['end']);
                $end = date('Y-m-d', $end_time).'T00:00:00+0800';
        } else {
                $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
        }

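        // Indices whose name contains "analysis" keep the timestamp in create_time, all others in date
        $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
        // Note: the "filtered" query was deprecated in Elasticsearch 2.0 and removed in 5.0;
        // on newer clusters use {"query":{"bool":{"filter":{...}}}} instead
        $query = '{"size":1000,"_source":false,"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';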
}

$scroll_id = null;
$retry     = 0;
$num       = 0;
while(true) {
        if(is_null($scroll_id)) {
                $result = post("{$host}/{$index}/_search?scroll=2m", $query);
        } else {
                $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
        }

        $json = json_decode($result, true);
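        if(!isset($json['_scroll_id'])) {
                fwrite(STDERR, "Query failed: index={$index} start={$start} end={$end}\n");
                sleep(5);
                $retry++;
                if($retry>10) {
                        exit(4);
                }
                continue; // retry the same request instead of falling through with an empty result
        }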

        $scroll_id = $json['_scroll_id'];
        $bulk = [];
        foreach($json['hits']['hits'] as $row) {
                unset($row['_score']);
                $bulk[] = json_encode(['delete'=>$row]);
                $num++;
        }

        if(count($json['hits']['hits'])==0)
        {
                break;
        }

        $check = post("{$host}/_bulk", implode("\n", $bulk)."\n");
        fwrite(STDOUT, "{$num}\n");
        usleep(100000);
}

echo "deleted:{$num}\n";

function get($url)
{
        $handle = curl_init();
        //curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
}

function post($url, $data)
{
        $handle = curl_init();
        curl_setopt($handle, CURLOPT_POST, 1);
        curl_setopt($handle, CURLOPT_HEADER, 0);
        curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handle, CURLOPT_URL, $url);
        curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        return curl_exec($handle);
}

 

 
 

Incremental index backup

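In our experience, incremental backups with elasticdump were buggy and lost data unpredictably.
So we wrote our own scripts for incremental backups.
The backup consists of two parts: a PHP script talks to Elasticsearch to read and export the data, and a shell script computes the arguments and passes them to the PHP script.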
 
The shell script is as follows:
 
#!/bin/bash

# If an argument is given, it is the date to back up; otherwise back up yesterday's data
if [ -z "$1" ];then
        start=$(date +%Y-%m-%d --date '-1 day 00:00:00')
        end=$(date +%Y-%m-%d --date 'today 00:00:00')
else
        start=$(date +%Y-%m-%d --date "$1")
        end=$(date +%Y-%m-%d --date "$1 +1 day 00:00:00")
fi

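# If the end date is the 1st of a month, add one more day to work around data crossing index boundaries because of the timezone
if [ "$(date +%d --date "$end")" = "01" ]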
then
        end=$(date +%Y-%m-%d --date "$end +1 day 00:00:00")
fi

php esdump.php --input=http://127.0.0.1:9200/logstash-event-$(date +%Y.%m --date $start) --start $start --end $end  2>> backup_error_$start.log | gzip > event/logstash-event-$(date +%Y.%m_%d --date $start).json.gz  2>> backup_error_$start.log &

 

 
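Note: in the last line of the script (line 18), replace logstash-event with the name of the index you want to back up. The event/ output directory must already exist.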
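The date handling in the shell script can be exercised on its own before pointing it at a cluster. A minimal sketch, assuming GNU date and a YYYY-MM-DD argument; backup_window is a hypothetical function name that reproduces the start, end, and monthly index suffix the script computes:

```shell
#!/bin/bash
# Hypothetical helper reproducing the script's date logic for a given day.
# Prints "<start> <end> <monthly index suffix>".
backup_window() {
    local day="$1" start end
    start=$(date +%Y-%m-%d --date "$day")
    end=$(date +%Y-%m-%d --date "$day +1 day")
    # When the window ends on the 1st, extend it by one day, as the script does
    if [ "$(date +%d --date "$end")" = "01" ]; then
        end=$(date +%Y-%m-%d --date "$end +1 day")
    fi
    echo "$start $end $(date +%Y.%m --date "$start")"
}

backup_window 2017-01-17
backup_window 2017-01-31
```

For the last day of a month the window spans two days, while the index suffix still refers to the month of the start date.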
 
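The PHP script (esdump.php) follows. It needs no changes as long as your documents carry their timestamp in a date field (create_time for indices whose name contains analysis), since that is the field the script filters on.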
<?php
date_default_timezone_set('Asia/Shanghai');

$longopts  = array(
    'input:',
    'output:',
    'query:',
    'start:',
    'end:',
);
$options = getopt('a', $longopts);

if(!isset($options['input'])) {
    fwrite(STDERR, "The --input option (index URL) is required\n");
    exit(1);
}

$check = get($options['input'].'/_count');
if($check===false) {
    fwrite(STDERR, "Invalid input index URL: {$options['input']}\n");
    exit(2);
}

$check = json_decode($check, true);
if(!isset($check['count'])) {
    fwrite(STDERR, "Invalid input index URL: {$options['input']}\n");
    exit(3);
}

$components = parse_url($options['input']);
$host  = "{$components['scheme']}://{$components['host']}:{$components['port']}";
$index = basename($components['path']);

$start = $end = null; // remain null (printed as empty) unless --start is given
$query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}';
if(isset($options['start'])) {
    $start_time = strtotime($options['start']);
    $start = date('Y-m-d', $start_time).'T00:00:00+0800';
    if(isset($options['end'])) {
        $end_time = strtotime($options['end']);
        $end = date('Y-m-d', $end_time).'T00:00:00+0800';
    } else {
        $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800';
    }

    $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date';
    $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}';

    if(strpos($index, 'eventlogs')!==false) {
        $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"bool":{'.
            '"must":[{"range":{"date":{"gte":"'.$start.'","lte":"'.$end.'"}}}],'.
                        '"must_not":[{"exists": {"field":"nsp3hq"}},{"exists": {"field":"q0i8u1"}},{"exists": {"field":"eyn916"}},{"exists": {"field":"20mqd8"}},'.
                        '{"exists": {"field":"wwbkux"}},{"exists": {"field":"r5ua96"}},{"exists": {"field":"easiz"}},{"exists": {"field":"dexusu"}},{"exists": {"field":"earts"}},'.
                        '{"exists": {"field":"ealu"}},{"exists": {"field":"ealf"}},{"exists": {"field":"eal"}},{"exists": {"field":"ears"}},{"exists": {"field":"ealuf"}},'.
                        '{"exists": {"field":"ealus"}},{"exists": {"field":"eaatf"}},{"exists": {"field":"enail"}},{"exists": {"field":"enuail"}},{"exists": {"field":"test"}}]'.
                        '}}}}}';
         }
}

$scroll_id = null;
$retry     = 0;
$num       = 0;
while(true) {
    if(is_null($scroll_id)) {
        $result = post("{$host}/{$index}/_search?scroll=2m", $query);
    } else {
        $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}");
    }

    $json = json_decode($result, true);
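    if(!isset($json['_scroll_id'])) {
        fwrite(STDERR, "Query failed: index={$index} start={$start} end={$end}\n");
        sleep(5);
        $retry++;
        if($retry>10) {
            exit(4);
        }
        continue; // retry the same request instead of falling through with an empty result
    }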

    $scroll_id = $json['_scroll_id'];

    foreach($json['hits']['hits'] as $row) {
        fwrite(STDOUT, json_encode($row)."\n");
        $num++;
    }

    if(count($json['hits']['hits'])==0)
    {
        break;
    }

    usleep(100000);
}

// Verify that the exported document count matches the count reported by the index
$query = json_decode($query, true);
unset($query['size'], $query['sort']);
$result = post("{$host}/{$index}/_count", json_encode($query));
$json = json_decode($result, true);
if(!isset($json['count']) or intval($json['count'])!==$num) {
    fwrite(STDERR, "Verification failed: index={$index} start={$start} end={$end} count in index={$json['count']} exported={$num}\n");
}

function get($url)
{
    $handle = curl_init();
    //curl_setopt($handle, CURLOPT_POST, 1);
    curl_setopt($handle, CURLOPT_HEADER, 0);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($handle, CURLOPT_URL, $url);
    //curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
    return curl_exec($handle);
}

function post($url, $data)
{
    $handle = curl_init();
    curl_setopt($handle, CURLOPT_POST, 1);
    curl_setopt($handle, CURLOPT_HEADER, 0);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($handle, CURLOPT_URL, $url);
    curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
    return curl_exec($handle);
}

To back up, simply run the shell script; with no argument it exports yesterday's incremental data.
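Because esdump.php writes one JSON document per line, a finished backup can be sanity-checked by counting the lines in the compressed file and comparing the number with what the index reports. A minimal sketch; count_docs is a hypothetical helper, and the demonstration uses a throwaway file rather than a real backup:

```shell
#!/bin/bash
# Hypothetical helper: count the documents in a gzip-compressed backup,
# relying on esdump.php writing exactly one JSON document per line.
count_docs() {
    gzip -dc "$1" | wc -l | tr -d ' '
}

# Self-contained demonstration with a throwaway file of three fake documents
tmp=$(mktemp)
printf '{"_id":"1"}\n{"_id":"2"}\n{"_id":"3"}\n' | gzip > "$tmp"
count_docs "$tmp"
rm -f "$tmp"
```

This complements the count check inside esdump.php, which only writes to the error log and does not inspect the file that gzip produced.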


This article is an original work by kerwin; please credit the source when reposting.

 http://www.cnblogs.com/kerwinC/p/6296675.html
