存儲引擎天然是隻支持select insert 沒有索引的archive
。若是確實有update需求,也能夠採用myISAM。主鍵採用bigint,自增便可
。以寫爲主,統計採用離線計算,字段均不要出現索引
,由於一方面可能會影響插入數據效率,另外讀時候會形成死鎖,影響寫數據。/** * 使用隊列生成reids測試數據 * 成功:執行 RPUSH操做後,返回列表的長度:8 */ public function createRedisList($listKey = 'message01') { $redis = RedisInstance::MasterInstance(); $redis->select(1); $message = [ 'type' => 'say', 'userId' => $redis->incr('user_id'), 'userName' => 'Tinywan' . mt_rand(100, 9999), //是否正在錄像 'userImage' => '/res/pub/user-default-w.png', //是否正在錄像 'openId' => 'openId' . mt_rand(100000, 9999999999999999), 'roomId' => 'openId' . mt_rand(30, 50), 'createTime' => date('Y-m-d H:i:s', time()), 'content' => $redis->incr('content') //當前是否正在打流狀態 ]; $rPushResul = $redis->rPush($listKey, json_encode($message)); //執行成功後返回當前列表的長度 9 return $rPushResul; }
第一種思路:php
/** * 消息Redis方法保存到Mysql數據庫 * @param string $liveKey */ public function RedisSaveToMysql($listKey = 'message01') { if (empty($listKey)) { $result = ["errcode" => 500, "errmsg" => "this parameter is empty!"]; exit(json_encode($result)); } $redis = RedisInstance::MasterInstance(); $redis->select(1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); $model = M("User"); while ($dataLength > 65970) { try { $model->startTrans(); $redis->watch($listKey); $arrList = []; foreach ($redisInfo as $key => $val) { $arrList[] = array( 'username' => json_decode($val, true)['userName'], 'logintime' => json_decode($val, true)['createTime'], 'description' => json_decode($val, true)['content'], 'pido' => json_decode($val, true)['content'] ); } $insertResult = $model->addAll($arrList); if (!$insertResult) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!", 'data' => 'dataLength:' . $dataLength); exit(json_encode($result)); } $model->commit(); $redis->lTrim($listKey, 6, -1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); } catch (Exception $e) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!"); exit(json_encode($result)); } } $result = array("errcode" => 200, "errmsg" => "Data Insert into Success!", 'data' => 'dataLength:' . $dataLength . 'liveKey:' . $listKey); exit(json_encode($result)); }
第二種思路(供參考,非框架) mysql
<?php $redis_xx = new Redis(); $redis_xx->connect('ip', port); $redis_xx->auth("password"); // 獲取現有消息隊列的長度 $count = 0; $max = $redis_xx->lLen("call_log"); // 獲取消息隊列的內容,拼接sql $insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values "; // 回滾數組 $roll_back_arr = array(); while ($count < $max) { $log_info = $redis_cq01->lPop("call_log"); $roll_back_arr = $log_info; if ($log_info == 'nil' || !isset($log_info)) { $insert_sql .= ";"; break; } // 切割出時間和info $log_info_arr = explode("%", $log_info); $insert_sql .= " ('" . $log_info_arr[0] . "','" . $log_info_arr[1] . "'),"; $count++; } // 斷定存在數據,批量入庫 if ($count != 0) { $link_2004 = mysql_connect('ip:port', 'user', 'password'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); $insert_sql = rtrim($insert_sql, ",") . ";"; $res = mysql_query($insert_sql); // 輸出入庫log和入庫結果; echo date("Y-m-d H:i:s") . "insert " . $count . " log info result:"; echo json_encode($res); echo "</br>\n"; // 數據庫插入失敗回滾 if (!$res) { foreach ($roll_back_arr as $k) { $redis_xx->rPush("call_log", $k); } } // 釋放鏈接 mysql_free_result($res); mysql_close($link_2004); } $redis_cq01->close(); ?>
/** * [0]檢查當前Redis是否鏈接成功 * [1]獲取數據,首先從Redis中去獲取,沒有的話再從數據庫中去獲取 */ public function findDataRedisOrMysql($listKey = 'message01') { //Check the current connection status 查看服務是否運行 if (RedisInstance::MasterInstance() != false) { $redis = RedisInstance::MasterInstance(); $redis->select(2); /** * 首先從Redis中去獲取數據 * lRange 獲取爲空的話,則表示沒有數據,不然返回一個非空數組 */ $redisData = $redis->lRange($listKey, 0, 9); $resultData = []; if (!empty($redisData)) { $resultData['status_code'] = 200; $resultData['msg'] = 'Data Source from Redis Cache'; foreach ($redisData as $key => $val) { $resultData['listData'][] = json_decode($val, true); } } else { $resultData['redis_msg'] = 'Redis is Expire'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); if ($mysqlData) { $resultData['status_code'] = 200; $resultData['mysql_msg'] = 'Data Source from Mysql is Success'; $redis->select(2); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; //寫入Redis做爲緩存 $redis->rPush($listKey, json_encode($val)); } //同時設置一個過時時間 $redis->expire($listKey,30); } else { $resultData['status_code'] = 500; $resultData['mysql_msg'] = 'Data Source from Mysql is Fail'; } } } else { $resultData['redis_msg'] = 'Redis server went away'; $resultData['mysql_msg'] = 'Mysql Data2'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; } } homePrint($resultData); }
<?php /** * static log :天天離線統計代碼日誌和刪除五天前的日誌 * */ // 離線統計 $link_2004 = mysql_connect('ip:port', 'user', 'pwd'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); // 統計昨天的數據 $day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1); $static_sql = "get sql"; $res = mysql_query($static_sql, $link_2004); // 獲取結果入庫略 // 清理15天以前的數據 $before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15); $delete_sql = "delete from xxx where createtime < '" . $before_15_day . "'"; try { $res = mysql_query($delete_sql); }catch(Exception $e){ echo json_encode($e)."\n"; echo "delete result:".json_encode($res)."\n"; } mysql_close($link_2004); ?>
主要是部署,批量入庫腳本的調用和天級統計腳本,crontab例行運行。git
# 批量入庫腳本 */2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log # 天級統計腳本 0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log
總結:相對於其餘複雜的方式處理高併發,這個解決方案簡單有效:經過redis緩存抗壓,mysql批量入庫解決數據庫瓶頸,離線計算解決統計數據,經過按期清理保證庫的大小。github