1、功能描述html
對log進行數據篩取,用判斷和調用函數的方式,獲取record_time、function_name、Error_state、client_ip、tv_id、description等數據並插入到本地數據庫中error_log表中mysql
2、log模型git
[2016-03-09 16:54:05:726] HttpSendLiveStream ERROR!!github
[473 58.254.216.34:18297 TV4016]:: caught up by live stream! curr:17775[19886->19885]sql
[2016-03-09 16:54:07:219] LiveStateCheck:: TV6047 not received pkt for 15 seconds!!數據庫
[2016-03-09 16:54:07:951] LiveStateCheck:: TV6046 not received pkt for 15 seconds!!多線程
[2016-03-09 16:54:08:105] HlsCheckStealingLink:: HLS Relay :client 209.126.64.42 is stealing!函數
修改後的log模型ui
[2016-03-10 14:52:32:187] SystemInit INFO1!!
ReadConfig ........url
[2016-03-10 14:52:32:188] ReadConfig ERROR!!
uknown element pName AdvertisementChannel, content TV999999999
[2016-03-10 14:52:32:800] OpenBWFileList ERROR!!
open bwlist file[../../tools/BWList/BlackList/BlackList-0.ini] failed.
最終效果;
3、錯誤和修改
1.發送和接收MSG時,發現消息不全,或空 兩個線程send、rcv MSG時,都要本身定義MSG結構體變量和分配內存
2.發送、接收少許消息後,程序掛掉
多線程下對消息隊列進行操做時,要lock互斥鎖,且必須的語句才鎖上,減小互斥時間
msgsnd、msgrcv函數要指定IPC_NOWAIT函數控制位,表示不忽略0,操做失敗時不等待而致使消息隊列阻塞
3.msgid無效,提示消息隊列不存在
msgget去獲取msgid時,指定的key若是是用自定義的get_key去獲取已經存在的消息隊列,這個key若是不是指定的消息隊列若是已經被銷燬,則會出錯;指定key爲0,老是建立新的消息隊列
把msgid當第四個參數經過pthread_create傳入線程函數時,可能建立線程函數已經結束,而線程因爲系統時間片分配問題未啓動,msgid就null傳入線程函數;提高msgid爲全局變量
4.其餘
建立線程後,應本身設置爲死循環存在,來不斷的處理數據
編譯語句:gcc log_c_multithread.c -o work -I /usr/local/mysql/include -L /usr/local/mysql/lib/ -lmysqlclient
4、完整代碼
//Scriptname:log_analyse //Author:charlotte //date:2016/03/17 //Purpose: sleve log and connect save on database
/*defina head file*/ #include "stdio.h" #include "stdlib.h" #include "string.h" /*use function about string*/ #include "mysql.h" /*use mysql DB*/ #include "pthread.h" /*use multi threads*/ #include "sys/ipc.h" /*call message queue*/ #include "sys/time.h" /*call sleep*/
#define MAX_LINE 1024 /*define variable*/ int sign=0,sign_exit=1; /*sign to mark no.2error,sign_exit to mark thread1 exit*/ char log_name[MAX_LINE]="record_model.log"; /*no use*/ char log_path[MAX_LINE]="record_model.log"; int record_time = 0; /*store YMD*/ int record_time_day = 0; /*store hms*/ char record_day_time[MAX_LINE]; char function_name[MAX_LINE]; int sid; char IP[MAX_LINE]; char tv_id[MAX_LINE]; char description[MAX_LINE]; const char* error_status="ERROR"; const char mysqlServer[20] = "localhost"; int lines=0; /*order to check source maybe occur error*/ int msgid; /*set global variable to avoid lose*/ pthread_t thread[2]; /*define thread variable,two threads*/ pthread_mutex_t mut; /*define mutex locks*/ typedef struct mymsg /*define msg struct */ { /*long ltype;*/ char record_day_time[MAX_LINE]; char function_name[MAX_LINE]; char description[MAX_LINE]; }MSG;
/*performance function*/
int check_error( char buf[]) { int i,j; for ( i=0; buf[i] != '\0'; i++) { if ( buf[i] != error_status[0]) continue; j = 0; while ( error_status[j] != '\0' && buf[i+j] != '\0') { j++; if ( error_status[j] != buf[i+j]) break; } if ( error_status[j] == '\0') return 1; } return 0; } void save_error(char buf[]) { int i,j,colon_count; colon_count = 0; for ( i=1; buf[i] != '\0'; i++) /* skip no.1 char [*/ { if ( buf[i-1] == '[') /* save date time,begin*/ { j = 0; while ( buf[i+j] != ' ') /*separate by blank*/ { if ( buf[ i+j] != '-') /*gain date*/ record_time = record_time * 10 + buf[i+j]-48; /*ASCII conversion*/ j++; } j++; while ( buf[i+j] != ']') /* gain time*/ { if ( buf[ i+j] != ':' && colon_count<3) record_time_day = record_time_day * 10 + buf[i+j]-48; else colon_count++; j++; } } i=i+j+2; /*move to right step*/ j=0; while ( buf[ i+j] != ' ') /* gain function name*/ { function_name[j] = buf[i+j]; j++; } break; } } void save_error2(char buf[]) { int i,j; for ( i=1; buf[i] != '\0'; i++) /* skip no.1 char [*/ { if ( buf[i-1] == '[') /* save ip*/ { j = 0; while ( buf[i+j] != ' ') /* skip number*/ { j++; } i=i+j+1; j=0; while ( buf[i+j] != ' ') /* save ip*/ { IP[j] = buf[i+j]; j++; } i=i+j+1; j=0; while ( buf[i+j] != ']') /* save tv id */ { tv_id[j] = buf[i+j]; j++; } } j++; for( ; buf[ i+j] != ' '; j++) /* move right step*/ continue; i=i+j+1; j=0; for ( ; buf[i] != '\0'; i++) {description[j] = buf[i]; /* save description*/ j++;} } } void analyse_log(char buf[]) { int length,sign_send=1; MSG *new_msg; new_msg=(MSG *)malloc(sizeof(MSG)); if ( sign == 0) /*judge sign,half deal or full deal*/ { if ( check_error(buf) == 0) /* search error ,not return*/ ; else /* process error line*/ { save_error(buf); sign=1; } } else if ( sign == 1) /* half deal,process ip line*/ { save_error2(buf); /* get complete data*/ sign=0; sprintf(record_day_time,"%d%d",record_time,record_time_day); /* send data by message*/ strcpy(new_msg->record_day_time,record_day_time); strcpy(new_msg->function_name,function_name); strcpy(new_msg->description,description); /*new_msg->ltype='L';*/ length = sizeof(MSG); while(sign_send) /*set sign,until send ok*/ { pthread_mutex_lock(&mut); /* mutex cpu locks*/ if(msgsnd(msgid,new_msg,length,IPC_NOWAIT)==0) {sign_send=0;pthread_mutex_unlock(&mut);} pthread_mutex_unlock(&mut); /* unlock cpu */ } printf("send ok.\n"); /*initialize data before send ok*/ record_time=0; record_time_day=0; memset(function_name,'\0',sizeof(function_name)); memset(IP,'\0',sizeof(IP)); memset(tv_id,'\0',sizeof(tv_id)); memset(description,'\0',sizeof(description)); }
} void read_log() /*read log and store the required data*/ { char buf[MAX_LINE]; FILE *fp; int len,i; if ((fp = fopen(log_path,"r")) == NULL) /* continue before valid log*/ { perror("fail to read"); exit(1); } i=1; while(fgets(buf,MAX_LINE,fp) != NULL ) /* process log line by line*/ { lines++; len = strlen(buf); buf[len-1] = '\0'; i++; analyse_log(buf); /* call function to do second step :analyse*/ } pthread_mutex_lock(&mut); /* mutex cpu locks*/ sign_exit=0; pthread_mutex_unlock(&mut); printf("read_thread exit,%d\n",sign_exit); }
void *thread1() /*define threads function1 to read log and get data*/ { read_log(); } void insert_into_DB() { int length; /*use to count msg size*/ MSG *new_msg; /*use to save received msg*/ new_msg=(MSG *)malloc(sizeof(MSG)); length = sizeof(MSG); while(1) /*dead circulation to receive msg and insert into DB*/ { pthread_mutex_lock(&mut); /* mutex cpu locks*/ if(msgrcv(msgid,new_msg,length,0,IPC_NOWAIT)==-1) /*MQid,msg vessel,msg type(no require),set to avoid MQ block;success return no negative num*/ { pthread_mutex_unlock(&mut); /*unlock MQ*/ if(sign_exit==0)break; /*if no message and the sign show no continue send msg,exit*/ printf("no message,wait a monment.\n"); /*if no message,continue,wait*/ } else { pthread_mutex_unlock(&mut); /*unlock MQ*/ printf("rcv msg successful.\n"); char ibuf[1024]; MYSQL mysql,*sock; mysql_init(&mysql); /*initialize mysql handle*/ if ( !(sock=mysql_real_connect( &mysql,"127.0.0.1","root","123456","operation_monitor",3306,NULL,0))) /*return sock*/ {/*connect DB*/ fprintf(stderr,"Couldn't connect to engine!\n%s\n\n",mysql_error(&mysql)); perror(" connect mysql error!\n"); exit(1); }/*save inserts statement into ibuf*/ sprintf( ibuf,"insert into error_log(id,function_name,description,record_time) values('','%s','%s','%s')",new_msg->function_name,new_msg->description,new_msg->record_day_time); if ( mysql_query(sock,ibuf)) /*operate executive*/ { printf("insert data error!i No.%d\n",lines); } else { printf("insert ok.\n"); } mysql_close(&mysql); mysql_close(sock); } } printf("insert_thread exit.\n"); } void *thread2() /*define threads function2 to insert data into DB*/ { insert_into_DB(); }
void thread_create() { pthread_create(&thread[0],NULL,thread1,NULL); /*create thread,but maybe threads no start,attention incomming local parameters lose*/ pthread_create(&thread[1],NULL,thread2,NULL); }
void thread_wait() { pthread_join(thread[0],NULL); /*wait thread finish*/ pthread_join(thread[1],NULL); }
int get_key() { int key; key=ftok(".",'s'); return key; }
/*function main*/ int main() { /*MSG *msg; /define about msg no use/ key_t key; /define key to get existed message queue/ key=get_key(); if(key<0) { perror("get key error."); exit(1); }*/ msgid=msgget(0,IPC_CREAT|0644); /*get a new MQ*/ if(msgid<0) { perror("msgget error."); exit(1); } printf("get msg success\n"); /*msg=(MSG*)malloc(sizeof(MSG)); if(msg==NULL) { perror("malloc error."); exit(1); } */ pthread_mutex_init(&mut,NULL); /*initialize mutex locks variable mut*/ thread_create(); /*call function to create thread*/ thread_wait(); /*wait thread run end*/ return 0; }
修改的,修改了名稱,匹配的log模型,按條,正則匹配
//Scriptname:log_analyse //Author:charlotte //date:2016/03/17 //Purpose: sleve log and connect save on database
/*defina head file*/ #include "stdio.h" #include "stdlib.h" #include "string.h" /*use function about string*/ #include "mysql.h" /*use mysql DB*/ #include "pthread.h" /*use multi threads*/ #include "sys/ipc.h" /*call message queue*/ #include "sys/time.h" /*call sleep*/ #include "regex.h" #include "memory.h"
#define MAX_LINE 1024 /*define variable*/ int sign=0,sign_exit=1; /*sign to mark no.2error,sign_exit to mark thread1 exit*/ char log_path[MAX_LINE]="record.log"; char record_day_time[MAX_LINE]; char function_name[MAX_LINE]; int sid; char description[MAX_LINE]; const char mysqlServer[20] = "127.0.0.1"; const char mysqlUser[20] = "root"; const char mysqlPasswd[20] = "123456"; const char mysqlDBName[20] = "operation_monitor"; int lines=0; /*order to check source maybe occur error*/ int msgid; /*set global variable to avoid lose*/ pthread_t thread[2]; /*define thread variable,two threads*/ pthread_mutex_t mut; /*define mutex locks*/ typedef struct mymsg /*define msg struct */ { char record_day_time[MAX_LINE]; char function_name[MAX_LINE]; char description[MAX_LINE]; }MSG;
/*performance function*/
int check_error( char buf[]) { char errbuf[1024]; regex_t reg; int err,pmatch_size=10; regmatch_t pmatch[pmatch_size]; //Regular Expression ERROR if ( regcomp( ®, "(.*)ERROR(.*)",REG_EXTENDED|REG_ICASE) < 0) //compile RE { regerror( err,®, errbuf, sizeof( errbuf)); printf ( "err:%s\n", errbuf); } err = regexec( ®, buf, pmatch_size, pmatch, 0); // RE object string if ( err == REG_NOMATCH) { return 0; } else if ( err) { regerror( err, ®, errbuf, sizeof(errbuf)); printf ( "err:%s\n",errbuf); } else { printf ("match ok.\n"); return 1; } } int save_error(char buf[]) { char record_day_time_mid[1024]; char errbuf[1024]; regex_t reg; int err,pmatch_size=10,i,mid_size; regmatch_t pmatch[pmatch_size]; // RE ERROR and save data that need if ( regcomp( ®, "\\[(.*)-(.*)-(.*)[ ](.*):(.*):(.*):(.*)\\](.*) ERROR(.*)",REG_EXTENDED|REG_ICASE) < 0) { regerror( err,®, errbuf, sizeof( errbuf)); printf ( "err:%s\n", errbuf); } err = regexec( ®, buf, pmatch_size, pmatch, 0); if ( err == REG_NOMATCH) { return 0; } else if ( err) { regerror( err, ®, errbuf, sizeof(errbuf)); printf ( "err:%s\n",errbuf); } for ( i=0; i<pmatch_size && pmatch[i].rm_so != -1; i++) // read buf by point_match { int se_len = pmatch[i].rm_eo - pmatch[i].rm_so; if( se_len && i >=1 && i <=6) { memset( record_day_time_mid,'\0',sizeof(record_day_time_mid)); memcpy( record_day_time_mid, buf+pmatch[i].rm_so, se_len); mid_size = sizeof(record_day_time_mid); strncat( record_day_time, record_day_time_mid, mid_size); } else if( se_len && i == 8) { memset( function_name, '\0', sizeof( function_name)); memcpy( function_name, buf+pmatch[i].rm_so, se_len); } } } void save_error2(char buf[]) { char errbuf[1024]; regex_t reg; int err,pmatch_size=10,i,buf_size; regmatch_t pmatch[pmatch_size]; // RE ERROR next part if ( regcomp( ®, "^\\[(.*)\\](.*)",REG_EXTENDED|REG_ICASE) < 0) // end by next record { regerror( err,®, errbuf, sizeof( errbuf)); printf ( "err:%s\n", errbuf); } err = regexec( ®, buf, pmatch_size, pmatch, 0); if ( err == REG_NOMATCH) { buf_size = strlen( buf); strncat( description, buf, buf_size); } else if ( err) { regerror( err, ®, errbuf, sizeof(errbuf)); printf ( "err:%s\n",errbuf); } else { sign = 0; // set sign ,a full record } } int analyse_log(char buf[]) { int length,sign_send=1; MSG *new_msg; new_msg=(MSG *)malloc(sizeof(MSG)); if ( sign == 0) /*judge sign,half deal or full deal*/ { if ( check_error(buf) == 0) /* search error ,not return*/ ; else /* process error line*/ { save_error(buf); sign=1; } } else if ( sign == 1) /* half deal,process ip line*/ { save_error2(buf); /* get complete data*/ if ( sign == 1) return 0; strcpy(new_msg->record_day_time,record_day_time); strcpy(new_msg->function_name,function_name); strcpy(new_msg->description,description); length = sizeof(MSG); while(sign_send) /*set sign,until send ok*/ { pthread_mutex_lock(&mut); /* mutex cpu locks*/ if(msgsnd(msgid,new_msg,length,IPC_NOWAIT)==0) {sign_send=0;pthread_mutex_unlock(&mut);} else pthread_mutex_unlock(&mut); /* unlock cpu */ } printf("send ok.\n"); /*initialize data before send ok*/ memset(function_name,'\0',sizeof(function_name)); memset(record_day_time,'\0',sizeof(record_day_time)); memset(description,'\0',sizeof(description)); analyse_log(buf); }
} void read_log() /*read log and store the required data*/ { char buf[MAX_LINE]; FILE *fp; int len,i; if ((fp = fopen(log_path,"r")) == NULL) /* continue before valid log*/ { perror("fail to read"); exit(1); } i=1; while(fgets(buf,MAX_LINE,fp) != NULL ) /* process log line by line*/ { lines++; len = strlen(buf); buf[len-1] = '\0'; i++; analyse_log(buf); /* call function to do second step :analyse*/ } pthread_mutex_lock(&mut); /* mutex cpu locks*/ sign_exit=0; pthread_mutex_unlock(&mut); printf("read_thread exit,%d\n",sign_exit); }
void *read_log_analyse() /*define threads function1 to read log and get data*/ { read_log(); } void insert_into_DB() { int length; /*use to count msg size*/ MSG *new_msg; /*use to save received msg*/ new_msg=(MSG *)malloc(sizeof(MSG)); length = sizeof(MSG); while(1) /*dead circulation to receive msg and insert into DB*/ { pthread_mutex_lock(&mut); /* mutex cpu locks*/ if(msgrcv(msgid,new_msg,length,0,IPC_NOWAIT)==-1) /*MQid,msg vessel,msg type(no require),set to avoid MQ block;success return no negative num*/ { pthread_mutex_unlock(&mut); /*unlock MQ*/ if(sign_exit==0)break; /*if no message and the sign show no continue send msg,exit*/ printf("no message,wait a monment.\n"); /*if no message,continue,wait*/ sleep(1); } else { pthread_mutex_unlock(&mut); /*unlock MQ*/ printf("rcv msg successful.\n"); char ibuf[1024]; MYSQL mysql,*sock; mysql_init(&mysql); /*initialize mysql handle*/ if ( !(sock=mysql_real_connect( &mysql,"127.0.0.1","root","123456","operation_monitor",3306,NULL,0))) /*return sock*/ {/*connect DB*/ fprintf(stderr,"Couldn't connect to engine!\n%s\n\n",mysql_error(&mysql)); perror(" connect mysql error!\n"); exit(1); }/*save inserts statement into ibuf*/ sprintf( ibuf,"insert into error_log(id,function_name,description,record_time) values('','%s','%s','%s')",new_msg->function_name,new_msg->description,new_msg->record_day_time); if ( mysql_query(sock,ibuf)) /*operate executive*/ { printf("insert data error!i No.%d\n",lines); } else { printf("insert ok.\n"); } mysql_close(&mysql); mysql_close(sock); } } printf("insert_thread exit.\n"); } void *insert_data_DB() /*define threads function2 to insert data into DB*/ { insert_into_DB(); }
void thread_create() { pthread_create(&thread[0],NULL,read_log_analyse,NULL); /*create thread,but maybe threads no start,attention incomming local parameters lose*/ pthread_create(&thread[1],NULL,insert_data_DB,NULL); }
void thread_wait() { pthread_join(thread[0],NULL); /*wait thread finish*/ pthread_join(thread[1],NULL); }
int get_key() { int key; key=ftok(".",'s'); return key; }
/*function main*/ int main() { msgid=msgget(0,IPC_CREAT|0644); /*get a new MQ*/ if(msgid<0) { perror("msgget error."); exit(1); } printf("get msg success\n"); pthread_mutex_init(&mut,NULL); /*initialize mutex locks variable mut*/ thread_create(); /*call function to create thread*/ thread_wait(); /*wait thread run end*/ return 0; }