本篇博客主要解說fastdfs中tracker協議。
fastdfs主要是存儲文件,直接把整個文件存儲到磁盤上,因此簡單直接,但是也有非常大的侷限性。
所以,fastdfs對文件的文件夾設置和存儲是最爲核心的。
爲何這麼忽然地解說這些?因爲我已經看了一段時間的fastdfs,主要結構都已經搞得比較清晰了。所以這篇文章我就主要以tracker這一部分的協議來分析。
其它詳細介紹tracker的請自行搜索,我就不介紹了,我直接從下面這個函數講起:
int tracker_deal_task(struct fast_task_info *pTask)
一、storage心跳協議
case TRACKER_PROTO_CMD_STORAGE_BEAT: TRACKER_CHECK_LOGINED(pTask) result = tracker_deal_storage_beat(pTask); break;
#define TRACKER_PROTO_CMD_STORAGE_BEAT 83 //storage heart beat
storage在啓動的時候,會開啓一個線程,該線程爲:
static void *tracker_report_thread_entrance(void *arg)
current_time = g_current_time; if (current_time - last_beat_time >= \ g_heart_beat_interval) { if (tracker_heart_beat(pTrackerServer, \ &stat_chg_sync_count, \ &bServerPortChanged) != 0) { break; }
也就是至少30秒鐘來一次心跳。心跳包的主要數據是包頭和當前storage的狀態信息:
char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];
/*
 * On-the-wire statistics block a storage server sends in its heartbeat.
 * Every field is a counter serialized into a fixed 8-byte buffer
 * (packed via long2buff / unpacked via buff2long elsewhere in the
 * codebase — presumably network byte order; confirm against long2buff).
 * Keeping each slot exactly 8 chars makes the struct layout the wire
 * format itself, so it is copied directly into the packet body.
 */
typedef struct
{
	/* file operation counts: total attempts / successful completions */
	char sz_total_upload_count[8];
	char sz_success_upload_count[8];
	char sz_total_append_count[8];
	char sz_success_append_count[8];
	char sz_total_modify_count[8];
	char sz_success_modify_count[8];
	char sz_total_truncate_count[8];
	char sz_success_truncate_count[8];
	char sz_total_set_meta_count[8];
	char sz_success_set_meta_count[8];
	char sz_total_delete_count[8];
	char sz_success_delete_count[8];
	char sz_total_download_count[8];
	char sz_success_download_count[8];
	char sz_total_get_meta_count[8];
	char sz_success_get_meta_count[8];
	char sz_total_create_link_count[8];
	char sz_success_create_link_count[8];
	char sz_total_delete_link_count[8];
	char sz_success_delete_link_count[8];

	/* traffic volumes in bytes: total attempted / successfully moved */
	char sz_total_upload_bytes[8];
	char sz_success_upload_bytes[8];
	char sz_total_append_bytes[8];
	char sz_success_append_bytes[8];
	char sz_total_modify_bytes[8];
	char sz_success_modify_bytes[8];
	char sz_total_download_bytes[8];
	char sz_success_download_bytes[8];
	char sz_total_sync_in_bytes[8];
	char sz_success_sync_in_bytes[8];
	char sz_total_sync_out_bytes[8];
	char sz_success_sync_out_bytes[8];

	/* low-level file I/O counters */
	char sz_total_file_open_count[8];
	char sz_success_file_open_count[8];
	char sz_total_file_read_count[8];
	char sz_success_file_read_count[8];
	char sz_total_file_write_count[8];
	char sz_success_file_write_count[8];

	/* timestamps of the most recent events */
	char sz_last_source_update[8];
	char sz_last_sync_update[8];
	char sz_last_synced_timestamp[8];
	char sz_last_heart_beat_time[8];
} FDFSStorageStatBuff;
tracker主要做了什麼呢?
對其進行解包,而後把這個storage的信息保存到本地文件裏,調用:
status = tracker_save_storages();
調用
tracker_mem_active_store_server(pClientInfo->pGroup, \ pClientInfo->pStorage);
假設group中還沒有這個存儲服務器,就將其插入到group中。
最後調用
static int tracker_check_and_sync(struct fast_task_info *pTask, \ const int status)
(需要再具體看看)
二、報告對應同步時間
#define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT 89 //report src last synced time as dest server
if (sync_time_chg_count != g_sync_change_count && \ current_time - last_sync_report_time >= \ g_heart_beat_interval) { if (tracker_report_sync_timestamp( \ pTrackerServer, &bServerPortChanged)!=0) { break; } sync_time_chg_count = g_sync_change_count; last_sync_report_time = current_time; }
pEnd = g_storage_servers + g_storage_count; for (pServer=g_storage_servers; pServer<pEnd; pServer++) { memcpy(p, pServer->server.id, FDFS_STORAGE_ID_MAX_SIZE); p += FDFS_STORAGE_ID_MAX_SIZE; int2buff(pServer->last_sync_src_timestamp, p); p += 4; }
而後tracker的server存儲結構爲
pClientInfo->pGroup->last_sync_timestamps \ [src_index][dest_index] = sync_timestamp;
dest_index 值爲當前鏈接所在組的索引值
dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup, pClientInfo->pStorage); if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count) { status = 0; break; }
首先經過id(ip地址)找到詳細的storage,而後再經過指針找到索引位置。
最後。調用
if (++g_storage_sync_time_chg_count % \ TRACKER_SYNC_TO_FILE_FREQ == 0) { status = tracker_save_sync_timestamps(); } else { status = 0; } } while (0); return tracker_check_and_sync(pTask, status);
三、上報磁盤狀況
#define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84 //report disk usage
相同線程定時調用。
if (current_time - last_df_report_time >= \ g_stat_report_interval) { if (tracker_report_df_stat(pTrackerServer, \ &bServerPortChanged) != 0) { break; } last_df_report_time = current_time; }
for (i=0; i<g_fdfs_store_paths.count; i++) { if (statvfs(g_fdfs_store_paths.paths[i], &sbuf) != 0) { logError("file: "__FILE__", line: %d, " \ "call statfs fail, errno: %d, error info: %s.",\ __LINE__, errno, STRERROR(errno)); if (pBuff != out_buff) { free(pBuff); } return errno != 0 ? errno : EACCES; } g_path_space_list[i].total_mb = ((int64_t)(sbuf.f_blocks) * \ sbuf.f_frsize) / FDFS_ONE_MB; g_path_space_list[i].free_mb = ((int64_t)(sbuf.f_bavail) * \ sbuf.f_frsize) / FDFS_ONE_MB; long2buff(g_path_space_list[i].total_mb, pStatBuff->sz_total_mb); long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb); pStatBuff++; }
int64_t *path_total_mbs; //total disk storage in MB
int64_t *path_free_mbs; //free disk storage in MB
這裏
path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb); path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb); pClientInfo->pStorage->total_mb += path_total_mbs[i]; pClientInfo->pStorage->free_mb += path_free_mbs[i];
四、storage服務器加入到tracker
#define TRACKER_PROTO_CMD_STORAGE_JOIN 81
if (tracker_report_join(pTrackerServer, tracker_index, \ sync_old_done) != 0) { sleep(g_heart_beat_interval); continue; }
typedef struct { char group_name[FDFS_GROUP_NAME_MAX_LEN+1]; char storage_port[FDFS_PROTO_PKG_LEN_SIZE]; char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE]; char store_path_count[FDFS_PROTO_PKG_LEN_SIZE]; char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE]; char upload_priority[FDFS_PROTO_PKG_LEN_SIZE]; char join_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage join timestamp char up_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage service started timestamp char version[FDFS_VERSION_SIZE]; //storage version char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE]; char init_flag; signed char status; char tracker_count[FDFS_PROTO_PKG_LEN_SIZE]; //all tracker server count } TrackerStorageJoinBody;
p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody); pServerEnd = g_tracker_group.servers + g_tracker_group.server_count; for (pServer=g_tracker_group.servers; pServer<pServerEnd; pServer++) { /* if (strcmp(pServer->ip_addr, pTrackerServer->ip_addr) == 0 && \ pServer->port == pTrackerServer->port) { continue; } tracker_count++; */ sprintf(p, "%s:%d", pServer->ip_addr, pServer->port); p += FDFS_PROTO_IP_PORT_SIZE; }
tracker server接收
case TRACKER_PROTO_CMD_STORAGE_JOIN: result = tracker_deal_storage_join(pTask); break;
typedef struct { int storage_port; int storage_http_port; int store_path_count; int subdir_count_per_path; int upload_priority; int join_time; //storage join timestamp (create timestamp) int up_time; //storage service started timestamp char version[FDFS_VERSION_SIZE]; //storage version char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE]; char init_flag; signed char status; int tracker_count; ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS]; } FDFSStorageJoinBody;
同時插入本地內存
result = tracker_mem_add_group_and_storage(pClientInfo, \
pTask->client_ip, &joinBody, true);
同時在響應包裏把同步源storage的id傳回去
pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + \ sizeof(TrackerHeader)); memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp)); if (pClientInfo->pStorage->psync_src_server != NULL) { strcpy(pJoinBodyResp->src_id, \ pClientInfo->pStorage->psync_src_server->id); }
五、報告存儲狀態
#define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS 76 //report specified storage server status
int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \ FDFSStorageBrief *briefServer)
strcpy(out_buff + sizeof(TrackerHeader), g_group_name);
memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, \ briefServer, sizeof(FDFSStorageBrief));
/*
 * Compact wire-format summary of one storage server, used when the
 * tracker and storages exchange server lists. The port travels as a
 * 4-byte buffer (packed/unpacked with int2buff/buff2int elsewhere),
 * keeping the struct directly copyable into a packet.
 */
typedef struct
{
	char status;	/* storage status code */
	char port[4];	/* storage port, serialized into 4 bytes */
	char id[FDFS_STORAGE_ID_MAX_SIZE];	/* storage server id */
	char ip_addr[IP_ADDRESS_SIZE];	/* storage ip address string */
} FDFSStorageBrief;
六、從tracker獲取storage狀態。
#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS 71 //get storage status from tracker
調用流程如下:
int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \ const char *group_name, const char *ip_addr, \ FDFSStorageBrief *pDestBuff) int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, \ const char *group_name, const char *ip_addr, \ char *storage_id, int *status) int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \ const char *group_name, const char *ip_addr, \ FDFSStorageBrief *pDestBuff)
獲取本身的狀態,
包體格式 組名 ip的字符串
tracker經過獲取了對應的數據。查找到storage的信息
結構體爲:
/*
 * Brief per-storage record returned by the tracker for status queries:
 * status byte, serialized 4-byte port, server id, and ip address.
 * Same wire layout as the FDFSStorageBrief used in server-list exchange.
 */
typedef struct
{
	char status;	/* storage status code */
	char port[4];	/* storage port, serialized into 4 bytes */
	char id[FDFS_STORAGE_ID_MAX_SIZE];	/* storage server id */
	char ip_addr[IP_ADDRESS_SIZE];	/* storage ip address string */
} FDFSStorageBrief;
賦值後。返回
七、經過tracker獲取storageid
#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID 70 //get storage server id from tracker
和上一協議的請求同樣,由 groupname+ip 組成。
tracker處理方法
static int tracker_deal_get_storage_id(struct fast_task_info *pTask)
tracker最後經過
/*
 * Look up a storage server's id record by (group name, ip address).
 *
 * Builds a zeroed search key from the two strings and binary-searches
 * the global g_storage_ids_by_ip table (g_storage_id_count entries,
 * compared with fdfs_cmp_group_name_and_ip — the table is presumably
 * kept sorted by that same comparator; confirm at the load site).
 *
 * Returns the matching FDFSStorageIdInfo, or NULL when no entry for
 * this group/ip pair exists.
 */
FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, \
		const char *pIpAddr)
{
	FDFSStorageIdInfo key;

	memset(&key, 0, sizeof(key));
	snprintf(key.group_name, sizeof(key.group_name), "%s", group_name);
	snprintf(key.ip_addr, sizeof(key.ip_addr), "%s", pIpAddr);

	return (FDFSStorageIdInfo *)bsearch(&key, g_storage_ids_by_ip, \
			g_storage_id_count, sizeof(FDFSStorageIdInfo), \
			fdfs_cmp_group_name_and_ip);
}
FDFSStorageIdInfo
信息。而後賦值,返回。
八、經過tracker獲取所有storageserver
#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69 //get all storage ids from tracker
for (i=0; i<5; i++) { for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++) { memcpy(pTServer, pGServer, sizeof(ConnectionInfo)); pTServer->sock = -1; result = fdfs_get_storage_ids_from_tracker_server(pTServer); if (result == 0) { return result; } } if (pServerStart != pTrackerGroup->servers) { pServerStart = pTrackerGroup->servers; } sleep(1); }
調用順序
int storage_func_init(const char *filename, char *bind_addr, const int addr_size) int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup) int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)
該函數每秒鐘調用一次,遍歷所有的tracker server。
trackerserver獲取
case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS: result = tracker_deal_fetch_storage_ids(pTask); break;
返回的數據:
pIdsStart = g_storage_ids_by_ip + start_index; pIdsEnd = g_storage_ids_by_ip + g_storage_id_count; for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++) { if ((int)(p - pTask->data) > pTask->size - 64) { break; } p += sprintf(p, "%s %s %s\n", pIdInfo->id, \ pIdInfo->group_name, pIdInfo->ip_addr); }
九、回覆給新的storage
#define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG 85 //repl new storage servers
storageserver調用流程:
剩下的協議
static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \ FDFSStorageBrief *briefServers, const int server_count)
case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE: result = tracker_deal_service_query_fetch_update( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE: result = tracker_deal_service_query_fetch_update( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL: result = tracker_deal_service_query_fetch_update( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE: result = tracker_deal_service_query_storage( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE: result = tracker_deal_service_query_storage( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL: result = tracker_deal_service_query_storage( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL: result = tracker_deal_service_query_storage( \ pTask, pHeader->cmd); break; case TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP: result = tracker_deal_server_list_one_group(pTask); break; case TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS: result = tracker_deal_server_list_all_groups(pTask); break; case TRACKER_PROTO_CMD_SERVER_LIST_STORAGE: result = tracker_deal_server_list_group_storages(pTask); break; case TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ: result = tracker_deal_storage_sync_src_req(pTask); break; case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ: TRACKER_CHECK_LOGINED(pTask) result = tracker_deal_storage_sync_dest_req(pTask); break; case TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY: result = tracker_deal_storage_sync_notify(pTask); break; case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY: result = tracker_deal_storage_sync_dest_query(pTask); break; case TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE: result = tracker_deal_server_delete_storage(pTask); break; case TRACKER_PROTO_CMD_SERVER_SET_TRUNK_SERVER: result = tracker_deal_server_set_trunk_server(pTask); break; case TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED: result = 
tracker_deal_storage_report_ip_changed(pTask); break; case TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ: result = tracker_deal_changelog_req(pTask); break; case TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ: result = tracker_deal_parameter_req(pTask); break; case FDFS_PROTO_CMD_QUIT: close(pTask->ev_read.ev_fd); task_finish_clean_up(pTask); return 0; case FDFS_PROTO_CMD_ACTIVE_TEST: result = tracker_deal_active_test(pTask); break; case TRACKER_PROTO_CMD_TRACKER_GET_STATUS: result = tracker_deal_get_tracker_status(pTask); break; case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START: result = tracker_deal_get_sys_files_start(pTask); break; case TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE: result = tracker_deal_get_one_sys_file(pTask); break; case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END: result = tracker_deal_get_sys_files_end(pTask); break; case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID: TRACKER_CHECK_LOGINED(pTask) result = tracker_deal_report_trunk_fid(pTask); break; case TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID: TRACKER_CHECK_LOGINED(pTask) result = tracker_deal_get_trunk_fid(pTask); break; case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE: TRACKER_CHECK_LOGINED(pTask) result = tracker_deal_report_trunk_free_space(pTask); break; case TRACKER_PROTO_CMD_TRACKER_PING_LEADER: result = tracker_deal_ping_leader(pTask); break; case TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER: result = tracker_deal_notify_next_leader(pTask); break; case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER: result = tracker_deal_commit_next_leader(pTask); break;