文件實時對比,將數據組裝入庫(SQLITE)

這個是本身以前實現的一個功能,主要給本身用。由於這部分邏輯,要改了。可是以爲挺惋惜的。先在播客中留下,後續在整理。(框架:php laravel)php

<?php
namespace App\Services;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

class LogService {
private $log_path = '';

private $last_read_path = '';

public function __construct() {
$this->log_path = base_path(config('log_record_path'));
$this->last_read_path = base_path(config('log_last_record_path'));
}

/*
定義單獨調用的方法
用於每次PHP層寫入LOG文件,以及後臺寫入時實時推動入庫
*/
public function pushDatabase() {
//獲取出當前日誌更新的行數
$lines = $this->getLastLine();
//取出最新更新的日誌文件並整理好關係。存入數據庫
$this->readFileByLine($this->log_path,$lines);
}

public function readFileByLine($filename,$lines) {
$offset = 1;
$line_stream = array();
$cur_last_str = '';
if(file_exists($filename)) {
$fp = fopen($filename, "r");
while(!feof($fp)){
$stream = fgets($fp); //可能須要指定字節數,目前少不須要。默認是1024
if($lines >= $offset) {
$offset++;
continue;
}
//$line_stream .= rtrim($stream,"\r\n") . ' ';
//剔除換行符
$cur_stream = rtrim($stream);
if(!empty($cur_stream)) {
$line_stream[] = json_decode($cur_stream,true);
}
}
fclose($fp);
if(!empty($line_stream)) {
// 記錄最後一條
$cur_last_str = json_encode($line_stream[count($line_stream) -1]);
//整合
$relate_array = array();
foreach($line_stream as $key => &$val) {
$mark = count($val) > 6 ? 'web_log' : 'response';
$relate_array[$val[0]][$mark] = $val;
}
//入庫準備
$save_rel_array = array();
$save_rel_multiple = array();
foreach($relate_array as $key => $val) {
//有返回的日誌直接整合入庫
if(count($relate_array[$key]) == 2 || (count($relate_array[$key]) == 1 && isset($val['web_log']))) {
$save_rel_array['uuid'] = $key;
$save_rel_array['log_desc'] = $val['web_log'][0];
$save_rel_array['log_dec_code'] = $val['web_log'][1];
$save_rel_array['host_name'] = $val['web_log'][2];
$save_rel_array['host_uid'] = $val['web_log'][3];
$save_rel_array['operation_type'] = $val['web_log'][4];
$save_rel_array['operator_ip'] = $val['web_log'][5];
$save_rel_array['operator_name'] = $val['web_log'][6];
$save_rel_array['operator_uuid'] = $val['web_log'][7];
$save_rel_array['progress'] = $val['web_log'][8];
$save_rel_array['start_datetime'] = $val['web_log'][9];
//todo 目前只知道更新狀態
if(count($val) == 2) {
$save_rel_array['status'] = isset($val['response'][5]) ? (int)$val['response'][5] : (int)$val['web_log'][10];
}
$save_rel_array['target_type'] = $val['web_log'][11];
$save_rel_array['target_name'] = $val['web_log'][12];
$save_rel_array['target_uid'] = $val['web_log'][13];
array_push($save_rel_multiple,$save_rel_array);
} else {
//執行更新操做
DB::table('log')->where('uuid', $key)->update(['status' => $val['response'][5]]);
}
}
//批量插入(須要作分批,目前本地測試是50條+就炸了,因此分批30)
$part_array = array_chunk($save_rel_multiple,30);
DB::beginTransaction();
try {
foreach($part_array as $val) {
$result = DB::table('log')->insert($val);
}
DB::commit();
//沒有入庫成功不記錄
$this->recordLastLine($cur_last_str);
} catch (\Illuminate\Database\QueryException $e){
DB::rollback();
throw $e;
Log::info('push database error: '. dd($val));
}
}
}
}

public function recordLastLine($cur_last_str = '') {
//記錄最後讀取的文件
$fp = fopen($this->last_read_path,'w');
if($fp !== false) {
$fw = fwrite($fp, $cur_last_str);
if($fw === false) {
//寫入失敗,能夠記錄失敗
Log::info('record last line failed: '.$cur_last_str);
}
}
fclose($fp);
}
 
public function getLastLine() {
$last_line = 0;
$last_record = '';
$log_array = array();
if(file_exists($this->log_path)) {
$log_array = file($this->log_path);
}
if(is_array($log_array) && !empty($log_array)) {
if(file_exists($this->last_read_path)) {
$last_record = file_get_contents($this->last_read_path);
}
foreach($log_array as $key => $val) {
if(rtrim($val) == rtrim($last_record)) {
$last_line = (int)($key + 1); //array 從0開始
}
}
// $last_line = !array_search($last_record,$log_array) ? 0 : array_search($last_record,$log_array) + 1; //array 從0開始
}
return $last_line;
}

public function writeLog($logContent) {
if (lock_file($this->log_path)) {
file_put_contents($this->log_path, $logContent . PHP_EOL, FILE_APPEND);
unlock_file($this->log_path);
return true;
} else {
return false;
}
}
}
相關文章
相關標籤/搜索