#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include "fifo.h" struct fifo_t { uint8_t *buf; uint32_t size; uint32_t in; uint32_t out; }; struct fifo_t * fifo_create(uint32_t size) { struct fifo_t * fifo = malloc(sizeof(struct fifo_t)); if(fifo == NULL) { fprintf(stderr, "[%s][%d]malloc fifo failed:%s", __FUNCTION__, __LINE__, strerror(errno)); return NULL; } memset(fifo, 0, sizeof(struct fifo_t)); fifo->buf = malloc(size + 1); if(fifo->buf == NULL) { fprintf(stderr, "[%s][%d]malloc fifo->buf fialed:%s",__FUNCTION__, __LINE__, strerror(errno)); free(fifo); return NULL; } memset(fifo->buf, 0, size); fifo->size = size; fifo->in = 0; fifo->out = 0; return fifo; } /* init/empty @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@- * in * out * * in==out @@@@@@@-@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ * empty in * out * * in > out @@@@@@@-#################@@@@@@@@@@@@@@ * in * out * * in < out ######@@@@@@@@@@@@@@@@@@@@@@@-######### * in * out * * in+1=out #######-############################### * full in * out * * in=size-1 ######################################- * full in * out */ void * fifo_write_prepare(struct fifo_t *fifo, uint32_t *size) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return NULL; } uint8_t *addr = fifo->buf + fifo->in; if(fifo->in >= fifo->out) { if(fifo->out == 0) *size = fifo->size - fifo->in - 1; else *size = fifo->size - fifo->in; } else *size = fifo->out - fifo->in - 1; return addr; } uint32_t fifo_write_done(struct fifo_t *fifo, uint32_t size) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return 0; } if(fifo->in >= fifo->out) { if(fifo->out == 0) { if(fifo->in + size > fifo->size - 1) { fifo->in = fifo->size - 1; fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->in,fifo->size); return fifo->size - 1 - fifo->in; } } else { if(fifo->in + size > fifo->size) { fifo->in = 0; fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->in,fifo->size); return fifo->size - fifo->in; } } } else { if(fifo->in + size >= fifo->out) { fifo->in = fifo->out - 1; fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u,fifo->out=%u\n",__FUNCTION__,__LINE__,fifo->in,size,fifo->out); return fifo->out - fifo->in - 1; } } fifo->in += size; return size; } void * fifo_read_prepare(struct fifo_t *fifo, uint32_t *size) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return 0; } uint8_t *addr = fifo->buf + fifo->out; if(fifo->in >= fifo->out) *size = fifo->in - fifo->out; else *size = fifo->size - fifo->out; return addr; } uint32_t fifo_read_done(struct fifo_t *fifo, uint32_t size) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return 0; } if(fifo->in >= fifo->out) { if(fifo->out + size > fifo->in) { fifo->out = fifo->in; fprintf(stderr, "[%s][%d]overreaded!!! out=%u,fifo->in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->out,fifo->in,size); return fifo->in - fifo->out; } } else { if(fifo->out + size >= fifo->size) { fifo->out = 0; if(fifo->out + size > fifo->size) fprintf(stderr, "[%s][%d]overreaded!!! out=%u,fifo->size=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->out,fifo->size,size); return fifo->size - fifo->out; } } fifo->out += size; if(fifo->out == fifo->size) fifo->out = 0; return size; } uint32_t fifo_read_data(struct fifo_t *fifo, uint32_t offset, uint32_t len, void * buf) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return 0; } if(buf == NULL) { fprintf(stderr, "[%s][%d]buf == NULL\n",__FUNCTION__,__LINE__); return 0; } if(fifo->in >= fifo->out) { if(fifo->out + offset + len > fifo->in || fifo->out + offset > fifo->in) { // fprintf(stderr, "[%s][%d]overreaded!!! fifo->out=%u,fifo->in=%u,offset=%u,size=%u\n", __FUNCTION__, __LINE__, fifo->out, fifo->in, offset, size); return 0; } memcpy(buf, &fifo->buf[fifo->out + offset], len); } else { if(offset + len > fifo->size - fifo->out + fifo->in || offset > fifo->size - fifo->out + fifo->in) { // fprintf(stderr, "[%s][%d]overreaded!!! fifo->out=%u,fifo->in=%u,offset=%u,size=%u\n", __FUNCTION__, __LINE__, fifo->out, fifo->in, offset, size); return 0; } if(offset > fifo->size - fifo->out) { memcpy( buf, &fifo->buf[fifo->out + offset - fifo->size], len); } else { if(offset + len > fifo->size - fifo->out) { memcpy( buf, &fifo->buf[fifo->out + offset], fifo->size - fifo->out - offset); uint8_t *buf_l = buf; memcpy( &buf_l[fifo->size - fifo->out], fifo->buf, fifo->out + offset + len - fifo->size); } else { memcpy( buf, &fifo->buf[fifo->out + offset, len); } } } return len; } void fifo_destroy(struct fifo_t *fifo) { if(fifo == NULL) { fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__); return; } free(fifo->buf); free(fifo); } void fifo_reset(struct fifo_t *fifo) { fifo->in = 0; fifo->out = 0; }
#include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <poll.h> #include <signal.h> #include <sys/types.h> #include <sys/socket.h> #include <stdbool.h> #include <linux/in.h> #include <endian.h> #include "fifo.h" struct transmit_t { uint32_t server_ip; uint16_t server_port; int fd; pthread_t th_transmit; struct fifo_t *fifo_recv; struct fifo_t *fifo_send; }; static struct transmit_t transmit = { .fd = -1 }; int transmit_init() { transmit.fd = -1; transmit.fifo_recv = fifo_create(512*1024); transmit.fifo_send = fifo_create(512*1024); return pthread_create(transmit.th_transmit, NULL, thread_transmit, NULL); } static bool __do_connect() { transmit.fd = socket(AF_INET, SOCK_STREAM, SOCK_NONBLOCK); struct sockaddr_in server_addr = { .sin_family = AF_INET, .sin_port = htobe16(transmit.server_port), .sin_addr.s_addr = inet_addr("127.0.0.1");//htobe32(transmit.server_ip) }; socklen_t len = (socklen_t)sizeof(server_addr); int ret = connect(transmit.fd, &server_addr, len); if(ret < 0) { fprintf(stderr,"[%s][%d]connect failed:%s\n",__FUNCTION__,__LINE__, strerror(errno)); close(transmit.fd); transmit.fd = -1; return false; } return true; } static bool __do_transmit() { struct pollfd pollfd_recv = { .fd = transmit.fd, .events = POLL_IN }; // 檢查是否有發送的數據 uint32_t to_send_size = 0; fifo_read_prepare(transmit.fifo_send, &to_send_size); if(to_send_size > 0) pollfd_recv.events |= POLL_OUT; // 等待socket就緒,poll/ppoll是線程取消點 //sigset_t sigset = SIGPIPE | SIGINT | SIGALRM; //int num = ppoll(&pollfd_recv, 1, NULL, &sigset); int num = poll(&pollfd_recv, 1, -1); if(num < 0) { if(errno == EINTR) return; } if(num > 0) { // 接收 if(pollfd_recv.revents & POLL_IN != 0) { void * buf = fifo_write_prepare(transmit.fifo_recv, &size); int size = recv(transmit.fd, buf, size, 0); if(size < 0) //error { fprintf(sdterr, "[%s][%d]recv failed:%s\n", __FUNCTION__,__LINE__,strerror(errno)); return false; } else if(size == 0) // connection lost { fprintf(sdterr, "[%s][%d]connection lost\n", __FUNCTION__,__LINE__); return false; } fifo_write_done(transmit.fifo_recv, size); while(decode_do() == true) ; } // 發送 if(pollfd_recv.revents & POLL_OUT != 0) { void * buf = fifo_read_prepare(transmit.fifo_recv, &size); int size = send(transmit.fd, buf, size); if(size < 0) { fprintf(sdterr, "[%s][%d]recv failed:%s\n", __FUNCTION__,__LINE__,strerror(errno)); return false; } fifo_read_done(transmit.fifo_send, size); } } return true; } void * thread_transmit() { while(1) { bool ret = __do_connect(); if(ret == false) { sleep(3); continue; } while(1) { ret = __do_transmit(); if(ret == false) break; } close(transmit.fd); } return NULL; } void transmit_send_ready() { pthread_kill(&transmit.th_transmit, SIGALRM); } void transmit_destroy() { } struct fifo_t *transmit_get_recv_fifo() { return transmit.fifo_recv; } struct fifo_t *transmit_get_send_fifo() { return transmit.fifo_send; }
#include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include "depackage.h" #include "protocal.h" enum DECODE_E { DECODE__SYNC_HEAD, DECODE__LEN, DECODE__SYNC_END, DECODE__CHECKSUM, DECODE__EXCUTE, DECODE__COUNT }; struct decode_t { enum DECODE_E state; uint16_t cmd; uint32_t msgid_in; uint32_t msgid_response; uint32_t data_len; pthread_t th_decode; }; static struct decode_t decode = { .state = DECODE__SYNC_HEAD, }; void decode_reset() { decode.state = DECODE__SYNC_HEAD; decode.cmd = 0; decode.msgid_in = 0; decode.msgid_response = 0; decode.data_len = 0; } /* * return: true: data decoded and executed,re-call again * false: not enough data to decode * */ bool decode_do() { struct fifo_t *fifo_recv = transmit_get_recv_fifo(); static uint8_t data[256*1024] = {0}; switch(decode.state) { case DECODE__SYNC_HEAD: { uint32_t sync_head = 0; uint32_t len = fifo_read_data(fifo_recv, 0, SYNC_HEAD__LEN, &sync_end); if(len < sizeof(sync_head)) return false; if(sync_head != SYNC_HEAD) fifo_read_done(fifo_recv, 1); else decode.state = DECODE__LEN; break; } case DECODE__LEN: { uint32_t len = fifo_read_data(fifo_recv, DATA_LEN__OFFSET, DATA_LEN__LEN, &decode.data_len); if(len < DATA_LEN__LEN) return false; fifo_read_data(fifo_recv, CMD__OFFSET, CMD__LEN, &decode.cmd); fifo_read_data(fifo_recv, MSGID_SEND__OFFSET, MSGID_SEND__LEN, &decode.msgid_in); fifo_read_data(fifo_recv, MSGID_RESPONSE__OFFSET, MSGID_RESPONSE__LEN, &decode.msgid_response); if(decode.data_len > DATA_LEN_MAX) { fprintf(sdterr, "[%s][%d]data_len too big:%u,max=%u,msgid_send=%u\n", __FUNCTION__, __LINE__, decode.data_len, DATA_LEN_MAX, decode.msgid_in); fifo_read_done(fifo_recv, 1); decode.state = DECODE__SYNC_HEAD; return true; } decode.state = DECODE__SYNC_END; break; } case DECODE__SYNC_END: { uint32_t sync_end = 0; uint32_t len = fifo_read_data( fifo_recv, DATA__OFFSET + decode.data_len + CHECKSUM__LEN, SYNC_END__LEN, &sync_end); if(len < SYNC_END__LEN) return false; if(sync_end != SYNC_END) { fprintf(sdterr, "[%s][%d]SYNC_END error:%x\n", __FUNCTION__, __LINE__, sync_end); fifo_read_done(fifo_recv, 1); decode.state = DECODE__SYNC_HEAD; return true; } decode.state = DECODE__CHECKSUM; break; } case DECODE__CHECKSUM: fifo_read_data( fifo_recv, DATA__OFFSET, decode.data_len, data); // decode.state = DECODE__EXCUTE; break; case DECODE__EXCUTE: protocal_excute(decode.cmd, decode.msgid_in, data, decode.data_len); fifo_read_done(fifo_recv, DATA__OFFSET + decode.data_len + CHECKSUM__LEN + SYNC_END__LEN); break; default: decode.state = DECODE__SYNC_HEAD; return true; } return true; } void *thread_decode() { while(1) { pthread_mutex_lock(&decode.mutex); while(decode_do() == false) pthread_cond_wait(&decode.cond, &decode.mutex); pthread_mutex_unlock(&decode.mutex); usleep(1); } } int decode_init() { memset(&decode, 0, sizeof(decode)); decode.state = DECODE__SYNC_HEAD; return pthread_create(&decode.th_decode, NULL, thread_decode, NULL); } void decode_destroy() { pthread_cancel(decode.th_decode); pthread_join(decode.th_decode, NULL); }