/*
HTTP Simple Queue Service - httpsqs v1.7
Author: Zhang Yan (http://blog.s135.com), E-mail: net@s135.com
This is free software, and you are welcome to modify and redistribute it under the New BSD License
*/
#include <sys/types.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <sys/types.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <time.h>
#include <sys/ioctl.h>
#include <errno.h>
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <pthread.h>
#include <err.h>
#include <event.h>
#include <evhttp.h>
#include <tcbdb.h>
#include "prename.h"
#define VERSION "1.7"
/* 每一個隊列的默認最大長度爲100萬條 */
#define HTTPSQS_DEFAULT_MAXQUEUE 1000000
/* 全局設置 */
TCBDB *httpsqs_db_tcbdb; /* 數據表 */
int httpsqs_settings_syncinterval; /* 同步更新內容到磁盤的間隔時間 */
char *httpsqs_settings_pidfile; /* PID文件 */
char *httpsqs_settings_auth; /* 驗證密碼 */
/* 建立多層目錄的函數 */
void create_multilayer_dir( char *muldir )
{
int i,len;
char str[512];
strncpy( str, muldir, 512 );
len=strlen(str);
for( i=0; i<len; i++ )
{
if( str[i]=='/' )
{
str[i] = '\0';
//判斷此目錄是否存在,不存在則建立
if( access(str, F_OK)!=0 )
{
mkdir( str, 0777 );
}
str[i]='/';
}
}
if( len>0 && access(str, F_OK)!=0 )
{
mkdir( str, 0777 );
}
return;
}
char *urldecode(char *input_str)
{
int len = strlen(input_str);
char *str = strdup(input_str);
char *dest = str;
char *data = str;
int value;
int c;
while (len--) {
//if (*data == '+') {
// *dest = ' ';
//}
//else if (*data == '%' && len >= 2 && isxdigit((int) *(data + 1))
if (*data == '%' && len >= 2 && isxdigit((int) *(data + 1)) && isxdigit((int) *(data + 2)))
{
c = ((unsigned char *)(data+1))[0];
if (isupper(c))
c = tolower(c);
value = (c >= '0' && c <= '9' ? c - '0' : c - 'a' + 10) * 16;
c = ((unsigned char *)(data+1))[1];
if (isupper(c))
c = tolower(c);
value += c >= '0' && c <= '9' ? c - '0' : c - 'a' + 10;
*dest = (char)value ;
data += 2;
len -= 2;
} else {
*dest = *data;
}
data++;
dest++;
}
*dest = '\0';
return str;
}
/* 讀取隊列寫入點的值 */
static int httpsqs_read_putpos(const char* httpsqs_input_name)
{
int queue_value = 0;
char *queue_value_tmp;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%s", httpsqs_input_name, "putpos");
queue_value_tmp = tcbdbget2(httpsqs_db_tcbdb, queue_name);
if(queue_value_tmp){
queue_value = atoi(queue_value_tmp);
free(queue_value_tmp);
}
return queue_value;
}
/* 讀取隊列讀取點的值 */
static int httpsqs_read_getpos(const char* httpsqs_input_name)
{
int queue_value = 0;
char *queue_value_tmp;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%s", httpsqs_input_name, "getpos");
queue_value_tmp = tcbdbget2(httpsqs_db_tcbdb, queue_name);
if(queue_value_tmp){
queue_value = atoi(queue_value_tmp);
free(queue_value_tmp);
}
return queue_value;
}
/* 讀取用於設置的最大隊列數 */
static int httpsqs_read_maxqueue(const char* httpsqs_input_name)
{
int queue_value = 0;
char *queue_value_tmp;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%s", httpsqs_input_name, "maxqueue");
queue_value_tmp = tcbdbget2(httpsqs_db_tcbdb, queue_name);
if(queue_value_tmp){
queue_value = atoi(queue_value_tmp);
free(queue_value_tmp);
} else {
queue_value = HTTPSQS_DEFAULT_MAXQUEUE; /* 默認隊列長度 */
}
return queue_value;
}
/* 設置最大的隊列數量,返回值爲設置的隊列數量。若是返回值爲0,則表示設置取消(取消緣由爲:設置的最大的隊列數量小於」當前隊列寫入位置點「和」當前隊列讀取位置點「,或者」當前隊列寫入位置點「小於」當前隊列的讀取位置點) */
static int httpsqs_maxqueue(const char* httpsqs_input_name, int httpsqs_input_num)
{
int queue_put_value = 0;
int queue_get_value = 0;
int queue_maxnum_int = 0;
/* 讀取當前隊列寫入位置點 */
queue_put_value = httpsqs_read_putpos(httpsqs_input_name);
/* 讀取當前隊列讀取位置點 */
queue_get_value = httpsqs_read_getpos(httpsqs_input_name);
/* 設置最大的隊列數量,最小值爲10條,最大值爲10億條 */
queue_maxnum_int = httpsqs_input_num;
/* 設置的最大的隊列數量必須大於等於」當前隊列寫入位置點「和」當前隊列讀取位置點「,而且」當前隊列寫入位置點「必須大於等於」當前隊列讀取位置點「 */
if (queue_maxnum_int >= queue_put_value && queue_maxnum_int >= queue_get_value && queue_put_value >= queue_get_value) {
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
char queue_maxnum[16] = {0};
sprintf(queue_name, "%s:%s", httpsqs_input_name, "maxqueue");
sprintf(queue_maxnum, "%d", queue_maxnum_int);
tcbdbput2(httpsqs_db_tcbdb, queue_name, queue_maxnum);
tcbdbsync(httpsqs_db_tcbdb); /* 實時刷新到磁盤 */
return queue_maxnum_int;
}
return 0;
}
/* 重置隊列,0表示重置成功 */
static int httpsqs_reset(const char* httpsqs_input_name)
{
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%s", httpsqs_input_name, "putpos");
tcbdbout2(httpsqs_db_tcbdb, queue_name);
memset(queue_name, '\0', 300);
sprintf(queue_name, "%s:%s", httpsqs_input_name, "getpos");
tcbdbout2(httpsqs_db_tcbdb, queue_name);
memset(queue_name, '\0', 300);
sprintf(queue_name, "%s:%s", httpsqs_input_name, "maxqueue");
tcbdbout2(httpsqs_db_tcbdb, queue_name);
tcbdbsync(httpsqs_db_tcbdb); /* 實時刷新到磁盤 */
return 0;
}
/* 查看單條隊列內容 */
char *httpsqs_view(const char* httpsqs_input_name, int pos)
{
char *queue_value;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%d", httpsqs_input_name, pos);
queue_value = tcbdbget2(httpsqs_db_tcbdb, queue_name);
return queue_value;
}
/* 修改定時更新內存內容到磁盤的間隔時間,返回間隔時間(秒) */
static int httpsqs_synctime(int httpsqs_input_num)
{
if (httpsqs_input_num >= 1) {
httpsqs_settings_syncinterval = httpsqs_input_num;
}
return httpsqs_settings_syncinterval;
}
/* 獲取本次「入隊列」操做的隊列寫入點 */
static int httpsqs_now_putpos(const char* httpsqs_input_name)
{
int maxqueue_num = 0;
int queue_put_value = 0;
int queue_get_value = 0;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
char queue_input[32] = {0};
/* 獲取最大隊列數量 */
maxqueue_num = httpsqs_read_maxqueue(httpsqs_input_name);
/* 讀取當前隊列寫入位置點 */
queue_put_value = httpsqs_read_putpos(httpsqs_input_name);
/* 讀取當前隊列讀取位置點 */
queue_get_value = httpsqs_read_getpos(httpsqs_input_name);
sprintf(queue_name, "%s:%s", httpsqs_input_name, "putpos");
/* 隊列寫入位置點加1 */
queue_put_value = queue_put_value + 1;
if (queue_put_value == queue_get_value) { /* 若是隊列寫入ID+1以後追上隊列讀取ID,則說明隊列已滿,返回0,拒絕繼續寫入 */
queue_put_value = 0;
}
else if (queue_get_value <= 1 && queue_put_value > maxqueue_num) { /* 若是隊列寫入ID大於最大隊列數量,而且從未進行過出隊列操做(=0)或進行過1次出隊列操做(=1),返回0,拒絕繼續寫入 */
queue_put_value = 0;
}
else if (queue_put_value > maxqueue_num) { /* 若是隊列寫入ID大於最大隊列數量,則重置隊列寫入位置點的值爲1 */
if(tcbdbput2(httpsqs_db_tcbdb, queue_name, "1")) {
queue_put_value = 1;
}
} else { /* 隊列寫入位置點加1後的值,回寫入數據庫 */
sprintf(queue_input, "%d", queue_put_value);
tcbdbput2(httpsqs_db_tcbdb, queue_name, (char *)queue_input);
}
return queue_put_value;
}
/* 獲取本次「出隊列」操做的隊列讀取點,返回值爲0時隊列所有讀取完成 */
static int httpsqs_now_getpos(const char* httpsqs_input_name)
{
int maxqueue_num = 0;
int queue_put_value = 0;
int queue_get_value = 0;
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
/* 獲取最大隊列數量 */
maxqueue_num = httpsqs_read_maxqueue(httpsqs_input_name);
/* 讀取當前隊列寫入位置點 */
queue_put_value = httpsqs_read_putpos(httpsqs_input_name);
/* 讀取當前隊列讀取位置點 */
queue_get_value = httpsqs_read_getpos(httpsqs_input_name);
/* 若是queue_get_value的值不存在,重置隊列讀取位置點爲1 */
sprintf(queue_name, "%s:%s", httpsqs_input_name, "getpos");
/* 若是queue_get_value的值不存在,重置爲1 */
if (queue_get_value == 0 && queue_put_value > 0) {
queue_get_value = 1;
tcbdbput2(httpsqs_db_tcbdb, queue_name, "1");
/* 若是隊列的讀取值(出隊列)小於隊列的寫入值(入隊列) */
} else if (queue_get_value < queue_put_value) {
queue_get_value = queue_get_value + 1;
char queue_input[32] = {0};
sprintf(queue_input, "%d", queue_get_value);
tcbdbput2(httpsqs_db_tcbdb, queue_name, queue_input);
/* 若是隊列的讀取值(出隊列)大於隊列的寫入值(入隊列),而且隊列的讀取值(出隊列)小於最大隊列數量 */
} else if (queue_get_value > queue_put_value && queue_get_value < maxqueue_num) {
queue_get_value = queue_get_value + 1;
char queue_input[32] = {0};
sprintf(queue_input, "%d", queue_get_value);
tcbdbput2(httpsqs_db_tcbdb, queue_name, queue_input);
/* 若是隊列的讀取值(出隊列)大於隊列的寫入值(入隊列),而且隊列的讀取值(出隊列)等於最大隊列數量 */
} else if (queue_get_value > queue_put_value && queue_get_value == maxqueue_num) {
queue_get_value = 1;
tcbdbput2(httpsqs_db_tcbdb, queue_name, "1");
/* 隊列的讀取值(出隊列)等於隊列的寫入值(入隊列),即隊列中的數據已所有讀出 */
} else {
queue_get_value = 0;
}
return queue_get_value;
}
/* 處理模塊 */
void httpsqs_handler(struct evhttp_request *req, void *arg)
{
struct evbuffer *buf;
buf = evbuffer_new();
/* 分析URL參數 */
const char *httpsqs_query_part;
struct evkeyvalq httpsqs_http_query;
httpsqs_query_part = evhttp_uri_get_query(req->uri_elems);
evhttp_parse_query_str(httpsqs_query_part, &httpsqs_http_query);
/* 接收GET表單參數 */
const char *httpsqs_input_auth = evhttp_find_header (&httpsqs_http_query, "auth"); /* 隊列名稱 */
const char *httpsqs_input_name = evhttp_find_header (&httpsqs_http_query, "name"); /* 隊列名稱 */
const char *httpsqs_input_charset = evhttp_find_header (&httpsqs_http_query, "charset"); /* 操做類別 */
const char *httpsqs_input_opt = evhttp_find_header (&httpsqs_http_query, "opt"); /* 操做類別 */
const char *httpsqs_input_data = evhttp_find_header (&httpsqs_http_query, "data"); /* 操做類別 */
const char *httpsqs_input_pos_tmp = evhttp_find_header (&httpsqs_http_query, "pos"); /* 隊列位置點 字符型 */
const char *httpsqs_input_num_tmp = evhttp_find_header (&httpsqs_http_query, "num"); /* 隊列總長度 字符型 */
int httpsqs_input_pos = 0;
int httpsqs_input_num = 0;
/* 返回給用戶的Header頭信息 */
if (httpsqs_input_charset != NULL && strlen(httpsqs_input_charset) <= 40) {
char content_type[64] = {0};
sprintf(content_type, "text/plain; charset=%s", httpsqs_input_charset);
evhttp_add_header(req->output_headers, "Content-Type", content_type);
} else {
evhttp_add_header(req->output_headers, "Content-Type", "text/plain");
}
evhttp_add_header(req->output_headers, "Connection", "keep-alive");
evhttp_add_header(req->output_headers, "Cache-Control", "no-cache");
//evhttp_add_header(req->output_headers, "Connection", "close");
/* 權限校驗 */
bool is_auth_pass = false; /* 是否驗證經過 */
if (httpsqs_settings_auth != NULL) {
/* 若是命令行啓動參數設置了驗證密碼 */
if (httpsqs_input_auth != NULL && strcmp(httpsqs_settings_auth, httpsqs_input_auth) == 0) {
is_auth_pass = true;
} else {
is_auth_pass = false;
}
} else {
/* 若是命令行啓動參數沒有設置驗證密碼 */
is_auth_pass = true;
}
if (is_auth_pass == false) {
/* 校驗失敗 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_AUTH_FAILED");
}
else
{
/* 校驗成功,或者命令行啓動參數沒有設置校驗密碼 */
if (httpsqs_input_pos_tmp != NULL) {
httpsqs_input_pos = atoi(httpsqs_input_pos_tmp); /* 隊列位置點 數值型 */
}
if (httpsqs_input_num_tmp != NULL) {
httpsqs_input_num = atoi(httpsqs_input_num_tmp); /* 隊列總長度 數值型 */
}
/*參數是否存在判斷 */
if (httpsqs_input_name != NULL && httpsqs_input_opt != NULL && strlen(httpsqs_input_name) <= 256) {
/* 入隊列 */
if (strcmp(httpsqs_input_opt, "put") == 0) {
/* 優先接收POST正文信息 */
/*Mod by Yang for 遠鑑 2017-12-7*/
/*修改說明:1) 匹配入隊列時多隊列內容相同時,一次性分配到 以分隔符"_"爲限定的多個httpsqs_input_name中。*/
int n=0;
char *p[20];
char *buf_temp = httpsqs_input_name;
char *ptr = NULL;
while ((p[n] = strtok_r(buf_temp,"_",&ptr)) != NULL)
{
n++;
buf_temp = NULL;
}
buf_temp = NULL;
int buffer_data_len;
buffer_data_len = EVBUFFER_LENGTH(req->input_buffer);
if (buffer_data_len > 0) {
int i,queue_put_value;
for ( i=0;i<n;i++)
{
httpsqs_input_name = p[i];
queue_put_value = httpsqs_now_putpos((char *)httpsqs_input_name);
if (queue_put_value > 0) {
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%d", httpsqs_input_name, queue_put_value);
char *httpsqs_input_postbuffer;
char *buffer_data = (char *)tccalloc(1, buffer_data_len + 1);
memcpy (buffer_data, EVBUFFER_DATA(req->input_buffer), buffer_data_len);
httpsqs_input_postbuffer = urldecode(buffer_data);
tcbdbput2(httpsqs_db_tcbdb, queue_name, httpsqs_input_postbuffer);
memset(queue_name, '\0', 300);
sprintf(queue_name, "%d", queue_put_value);
evhttp_add_header(req->output_headers, "Pos", queue_name);
//evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_OK");
free(httpsqs_input_postbuffer);
free(buffer_data);
} else {
//evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_END");
}
}
if (queue_put_value > 0)
{
evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_OK");
}
else
{
evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_END");
}
/* 若是POST正文無內容,則取URL中data參數的值 */
} else if (httpsqs_input_data != NULL) {
int k,queue_put_value;
for (k=0;k<n;k++)
{
httpsqs_input_name = p[k];
queue_put_value = httpsqs_now_putpos((char *)httpsqs_input_name);
if (queue_put_value > 0) {
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%d", httpsqs_input_name, queue_put_value);
buffer_data_len = strlen(httpsqs_input_data);
char *httpsqs_input_postbuffer;
char *buffer_data = (char *)tccalloc(1, buffer_data_len + 1);
memcpy (buffer_data, httpsqs_input_data, buffer_data_len);
httpsqs_input_postbuffer = urldecode(buffer_data);
tcbdbput2(httpsqs_db_tcbdb, queue_name, httpsqs_input_postbuffer);
memset(queue_name, '\0', 300);
sprintf(queue_name, "%d", queue_put_value);
evhttp_add_header(req->output_headers, "Pos", queue_name);
//evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_OK");
free(httpsqs_input_postbuffer);
free(buffer_data);
} else {
//evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_END");
}
}
if (queue_put_value > 0)
{
evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_OK");
}
else
{
evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_END");
}
} else {
evbuffer_add_printf(buf, "%s", "HTTPSQS_PUT_ERROR");
}
}
/* 出隊列 */
else if (strcmp(httpsqs_input_opt, "get") == 0) {
int queue_get_value = 0;
queue_get_value = httpsqs_now_getpos((char *)httpsqs_input_name);
if (queue_get_value == 0) {
evbuffer_add_printf(buf, "%s", "HTTPSQS_GET_END");
} else {
char queue_name[300] = {0}; /* 隊列名稱的總長度,用戶輸入的隊列長度少於256字節 */
sprintf(queue_name, "%s:%d", httpsqs_input_name, queue_get_value);
char *httpsqs_output_value;
httpsqs_output_value = tcbdbget2(httpsqs_db_tcbdb, queue_name);
if (httpsqs_output_value) {
memset(queue_name, '\0', 300);
sprintf(queue_name, "%d", queue_get_value);
evhttp_add_header(req->output_headers, "Pos", queue_name);
evbuffer_add_printf(buf, "%s", httpsqs_output_value);
free(httpsqs_output_value);
} else {
evbuffer_add_printf(buf, "%s", "HTTPSQS_GET_END");
}
}
}
/* 查看隊列狀態(普通瀏覽方式) */
else if (strcmp(httpsqs_input_opt, "status") == 0) {
int maxqueue = httpsqs_read_maxqueue((char *)httpsqs_input_name); /* 最大隊列數量 */
int putpos = httpsqs_read_putpos((char *)httpsqs_input_name); /* 入隊列寫入位置 */
int getpos = httpsqs_read_getpos((char *)httpsqs_input_name); /* 出隊列讀取位置 */
int ungetnum;
const char *put_times;
const char *get_times;
if (putpos >= getpos) {
ungetnum = abs(putpos - getpos); /* 還沒有出隊列條數 */
put_times = "1st lap";
get_times = "1st lap";
} else if (putpos < getpos) {
ungetnum = abs(maxqueue - getpos + putpos); /* 還沒有出隊列條數 */
put_times = "2nd lap";
get_times = "1st lap";
}
evbuffer_add_printf(buf, "HTTP Simple Queue Service v%s\n", VERSION);
evbuffer_add_printf(buf, "------------------------------\n");
evbuffer_add_printf(buf, "Queue Name: %s\n", httpsqs_input_name);
evbuffer_add_printf(buf, "Maximum number of queues: %d\n", maxqueue);
evbuffer_add_printf(buf, "Put position of queue (%s): %d\n", put_times, putpos);
evbuffer_add_printf(buf, "Get position of queue (%s): %d\n", get_times, getpos);
evbuffer_add_printf(buf, "Number of unread queue: %d\n", ungetnum);
}
/* 查看隊列狀態(JSON方式,方便客服端程序處理) */
else if (strcmp(httpsqs_input_opt, "status_json") == 0) {
int maxqueue = httpsqs_read_maxqueue((char *)httpsqs_input_name); /* 最大隊列數量 */
int putpos = httpsqs_read_putpos((char *)httpsqs_input_name); /* 入隊列寫入位置 */
int getpos = httpsqs_read_getpos((char *)httpsqs_input_name); /* 出隊列讀取位置 */
int ungetnum;
const char *put_times;
const char *get_times;
if (putpos >= getpos) {
ungetnum = abs(putpos - getpos); /* 還沒有出隊列條數 */
put_times = "1";
get_times = "1";
} else if (putpos < getpos) {
ungetnum = abs(maxqueue - getpos + putpos); /* 還沒有出隊列條數 */
put_times = "2";
get_times = "1";
}
evbuffer_add_printf(buf, "{\"name\":\"%s\",\"maxqueue\":%d,\"putpos\":%d,\"putlap\":%s,\"getpos\":%d,\"getlap\":%s,\"unread\":%d}\n", httpsqs_input_name, maxqueue, putpos, put_times, getpos, get_times, ungetnum);
}
/* 查看單條隊列內容 */
else if (strcmp(httpsqs_input_opt, "view") == 0 && httpsqs_input_pos >= 1 && httpsqs_input_pos <= 1000000000) {
char *httpsqs_output_value;
httpsqs_output_value = httpsqs_view ((char *)httpsqs_input_name, httpsqs_input_pos);
if (httpsqs_output_value) {
evbuffer_add_printf(buf, "%s", httpsqs_output_value);
free(httpsqs_output_value);
}
}
/* 重置隊列 */
else if (strcmp(httpsqs_input_opt, "reset") == 0) {
int reset = httpsqs_reset((char *)httpsqs_input_name);
if (reset == 0) {
evbuffer_add_printf(buf, "%s", "HTTPSQS_RESET_OK");
} else {
evbuffer_add_printf(buf, "%s", "HTTPSQS_RESET_ERROR");
}
}
/* 設置最大的隊列數量,最小值爲10條,最大值爲10億條 */
else if (strcmp(httpsqs_input_opt, "maxqueue") == 0 && httpsqs_input_num >= 10 && httpsqs_input_num <= 1000000000) {
if (httpsqs_maxqueue((char *)httpsqs_input_name, httpsqs_input_num) != 0) {
/* 設置成功 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_MAXQUEUE_OK");
} else {
/* 設置取消 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_MAXQUEUE_CANCEL");
}
}
/* 設置定時更新內存內容到磁盤的間隔時間,最小值爲1秒,最大值爲10億秒 */
else if (strcmp(httpsqs_input_opt, "synctime") == 0 && httpsqs_input_num >= 1 && httpsqs_input_num <= 1000000000) {
if (httpsqs_synctime(httpsqs_input_num) >= 1) {
/* 設置成功 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_SYNCTIME_OK");
} else {
/* 設置取消 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_SYNCTIME_CANCEL");
}
} else {
/* 命令錯誤 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_ERROR");
}
} else {
/* 命令錯誤 */
evbuffer_add_printf(buf, "%s", "HTTPSQS_ERROR");
}
}
/* 輸出內容給客戶端 */
evhttp_send_reply(req, HTTP_OK, "OK", buf);
/* 內存釋放 */
evhttp_clear_headers(&httpsqs_http_query);
evbuffer_free(buf);
}
/* 子進程信號處理 */
static void kill_signal_worker(const int sig) {
/* 同步內存數據到磁盤,並關閉數據庫 */
tcbdbsync(httpsqs_db_tcbdb);
tcbdbclose(httpsqs_db_tcbdb);
tcbdbdel(httpsqs_db_tcbdb);
exit(0);
}
/* 父進程信號處理 */
static void kill_signal_master(const int sig) {
/* 刪除PID文件 */
remove(httpsqs_settings_pidfile);
/* 給進程組發送SIGTERM信號,結束子進程 */
kill(0, SIGTERM);
exit(0);
}
/* 定時同步線程,定時將內存中的內容寫入磁盤 */
static void sync_worker(const int sig) {
pthread_detach(pthread_self());
while(1)
{
/* 間隔httpsqs_settings_syncinterval秒同步一次數據到磁盤 */
sleep(httpsqs_settings_syncinterval);
/* 同步內存數據到磁盤 */
tcbdbsync(httpsqs_db_tcbdb);
}
}
static void show_help(void)
{
char *b = "--------------------------------------------------------------------------------------------------\n"
"HTTP Simple Queue Service - httpsqs v" VERSION " (April 14, 2011)\n\n"
"Author: Zhang Yan (http://blog.s135.com), E-mail: net@s135.com\n"
"This is free software, and you are welcome to modify and redistribute it under the New BSD License\n"
"\n"
"-l <ip_addr> interface to listen on, default is 0.0.0.0\n"
"-p <num> TCP port number to listen on (default: 1218)\n"
"-x <path> database directory (example: /opt/httpsqs/data)\n"
"-t <second> keep-alive timeout for an http request (default: 60)\n"
"-s <second> the interval to sync updated contents to the disk (default: 5)\n"
"-c <num> the maximum number of non-leaf nodes to be cached (default: 1024)\n"
"-m <size> database memory cache size in MB (default: 100)\n"
"-i <file> save PID in <file> (default: /tmp/httpsqs.pid)\n"
"-a <auth> the auth password to access httpsqs (example: mypass123)\n"
"-d run as a daemon\n"
"-h print this help and exit\n\n"
"Use command \"killall httpsqs\", \"pkill httpsqs\" and \"kill `cat /tmp/httpsqs.pid`\" to stop httpsqs.\n"
"Please note that don't use the command \"pkill -9 httpsqs\" and \"kill -9 PID of httpsqs\"!\n"
"\n"
"Please visit \"http://code.google.com/p/httpsqs\" for more help information.\n\n"
"--------------------------------------------------------------------------------------------------\n"
"\n";
fprintf(stderr, b, strlen(b));
}
int main(int argc, char *argv[], char *envp[])
{
int c;
/* 默認參數設置 */
char *httpsqs_settings_listen = "0.0.0.0";
int httpsqs_settings_port = 1218;
char *httpsqs_settings_datapath = NULL;
bool httpsqs_settings_daemon = false;
int httpsqs_settings_timeout = 60; /* 單位:秒 */
httpsqs_settings_syncinterval = 5; /* 單位:秒 */
int httpsqs_settings_cachenonleaf = 1024; /* 緩存非葉子節點數。單位:條 */
int httpsqs_settings_cacheleaf = 2048; /* 緩存葉子節點數。葉子節點緩存數爲非葉子節點數的兩倍。單位:條 */
int httpsqs_settings_mappedmemory = 104857600; /* 單位:字節 */
httpsqs_settings_pidfile = "/tmp/httpsqs.pid";
httpsqs_settings_auth = NULL; /* 驗證密碼 */
/* 命令行參數,暫時存儲下面,便於進程重命名 */
int httpsqs_prename_num = 1;
char httpsqs_path_file[1024] = { 0 }; // httpsqs_path_file 爲 httpsqs 程序的絕對路徑
struct evbuffer *httpsqs_prename_buf; /* 原命令行參數 */
httpsqs_prename_buf = evbuffer_new();
readlink("/proc/self/exe", httpsqs_path_file, sizeof(httpsqs_path_file));
evbuffer_add_printf(httpsqs_prename_buf, "%s", httpsqs_path_file);
for (httpsqs_prename_num = 1; httpsqs_prename_num < argc; httpsqs_prename_num++) {
evbuffer_add_printf(httpsqs_prename_buf, " %s", argv[httpsqs_prename_num]);
}
/* process arguments */
while ((c = getopt(argc, argv, "l:p:x:t:s:c:m:i:a:dh")) != -1) {
switch (c) {
case 'l':
httpsqs_settings_listen = strdup(optarg);
break;
case 'p':
httpsqs_settings_port = atoi(optarg);
break;
case 'x':
httpsqs_settings_datapath = strdup(optarg); /* httpsqs數據庫文件存放路徑 */
if (access(httpsqs_settings_datapath, W_OK) != 0) { /* 若是目錄不可寫 */
if (access(httpsqs_settings_datapath, R_OK) == 0) { /* 若是目錄可讀 */
chmod(httpsqs_settings_datapath, S_IWOTH); /* 設置其餘用戶具可寫入權限 */
} else { /* 若是不存在該目錄,則建立 */
create_multilayer_dir(httpsqs_settings_datapath);
}
if (access(httpsqs_settings_datapath, W_OK) != 0) { /* 若是目錄不可寫 */
fprintf(stderr, "httpsqs database directory not writable\n");
}
}
break;
case 't':
httpsqs_settings_timeout = atoi(optarg);
break;
case 's':
httpsqs_settings_syncinterval = atoi(optarg);
break;
case 'c':
httpsqs_settings_cachenonleaf = atoi(optarg);
httpsqs_settings_cacheleaf = httpsqs_settings_cachenonleaf * 2;
break;
case 'm':
httpsqs_settings_mappedmemory = atoi(optarg) * 1024 * 1024; /* 單位:M */
break;
case 'i':
httpsqs_settings_pidfile = strdup(optarg);
break;
case 'a':
httpsqs_settings_auth = strdup(optarg);
break;
case 'd':
httpsqs_settings_daemon = true;
break;
case 'h':
default:
show_help();
return 1;
}
}
/* 判斷是否加了必填參數 -x */
if (httpsqs_settings_datapath == NULL) {
show_help();
fprintf(stderr, "Attention: Please use the indispensable argument: -x <path>\n\n");
exit(1);
}
/* 數據表路徑 */
int httpsqs_settings_dataname_len = 1024;
char *httpsqs_settings_dataname = (char *)tccalloc(1, httpsqs_settings_dataname_len);
sprintf(httpsqs_settings_dataname, "%s/httpsqs.db", httpsqs_settings_datapath);
/* 打開數據表 */
httpsqs_db_tcbdb = tcbdbnew();
tcbdbsetmutex(httpsqs_db_tcbdb); /* 開啓線程互斥鎖 */
tcbdbtune(httpsqs_db_tcbdb, 1024, 2048, 50000000, 8, 10, BDBTLARGE);
tcbdbsetcache(httpsqs_db_tcbdb, httpsqs_settings_cacheleaf, httpsqs_settings_cachenonleaf);
tcbdbsetxmsiz(httpsqs_db_tcbdb, httpsqs_settings_mappedmemory); /* 內存緩存大小 */
/* 判斷表是否能打開 */
if(!tcbdbopen(httpsqs_db_tcbdb, httpsqs_settings_dataname, BDBOWRITER|BDBOCREAT)){
show_help();
fprintf(stderr, "Attention: Unable to open the database.\n\n");
exit(1);
}
/* 釋放變量所佔內存 */
free(httpsqs_settings_dataname);
/* 若是加了-d參數,以守護進程運行 */
if (httpsqs_settings_daemon == true) {
pid_t pid;
/* Fork off the parent process */
pid = fork();
if (pid < 0) {
exit(EXIT_FAILURE);
}
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0) {
exit(EXIT_SUCCESS);
}
}
/* 將進程號寫入PID文件 */
FILE *fp_pidfile;
fp_pidfile = fopen(httpsqs_settings_pidfile, "w");
fprintf(fp_pidfile, "%d\n", getpid());
fclose(fp_pidfile);
/* 重命名httpsqs主進程,便於ps -ef命令查看 */
prename_setproctitle_init(argc, argv, envp);
prename_setproctitle("[httpsqs: master process] %s", (char *)EVBUFFER_DATA(httpsqs_prename_buf));
/* 派生httpsqs子進程(工做進程) */
pid_t httpsqs_worker_pid_wait;
pid_t httpsqs_worker_pid = fork();
/* 若是派生進程失敗,則退出程序 */
if (httpsqs_worker_pid < 0)
{
fprintf(stderr, "Error: %s:%d\n", __FILE__, __LINE__);
exit(EXIT_FAILURE);
}
/* httpsqs父進程內容 */
if (httpsqs_worker_pid > 0)
{
/* 處理父進程接收到的kill信號 */
/* 忽略Broken Pipe信號 */
signal(SIGPIPE, SIG_IGN);
/* 處理kill信號 */
signal (SIGINT, kill_signal_master);
signal (SIGKILL, kill_signal_master);
signal (SIGQUIT, kill_signal_master);
signal (SIGTERM, kill_signal_master);
signal (SIGHUP, kill_signal_master);
/* 處理段錯誤信號 */
signal(SIGSEGV, kill_signal_master);
/* 若是子進程終止,則從新派生新的子進程 */
while (1)
{
httpsqs_worker_pid_wait = wait(NULL);
if (httpsqs_worker_pid_wait < 0)
{
continue;
}
usleep(100000);
httpsqs_worker_pid = fork();
if (httpsqs_worker_pid == 0)
{
break;
}
}
}
/* ---------------如下爲httpsqs子進程內容------------------- */
/* 忽略Broken Pipe信號 */
signal(SIGPIPE, SIG_IGN);
/* 處理kill信號 */
signal (SIGINT, kill_signal_worker);
signal (SIGKILL, kill_signal_worker);
signal (SIGQUIT, kill_signal_worker);
signal (SIGTERM, kill_signal_worker);
signal (SIGHUP, kill_signal_worker);
/* 處理段錯誤信號 */
signal(SIGSEGV, kill_signal_worker);
/* 建立定時同步線程,定時將內存中的內容寫入磁盤 */
pthread_t sync_worker_tid;
pthread_create(&sync_worker_tid, NULL, (void *) sync_worker, NULL);
/* 重命名httpsqs子進程,便於ps -ef命令查看 */
prename_setproctitle_init(argc, argv, envp);
prename_setproctitle("[httpsqs: worker process] %s", (char *)EVBUFFER_DATA(httpsqs_prename_buf));
evbuffer_free(httpsqs_prename_buf);
/* 請求處理部分 */
struct evhttp *httpd;
event_init();
httpd = evhttp_start(httpsqs_settings_listen, httpsqs_settings_port);
if (httpd == NULL) {
fprintf(stderr, "Error: Unable to listen on %s:%d\n\n", httpsqs_settings_listen, httpsqs_settings_port);
kill(0, SIGTERM);
exit(1);
}
evhttp_set_timeout(httpd, httpsqs_settings_timeout);
/* Set a callback for requests to "/specific". */
/* evhttp_set_cb(httpd, "/select", select_handler, NULL); */
/* Set a callback for all other requests. */
evhttp_set_gencb(httpd, httpsqs_handler, NULL);
event_dispatch();
/* Not reached in this code as it is now. */
evhttp_free(httpd);
return 0;
}