FastDFS源代碼分析之tracker協議分析

本篇博客主要解說fastdfs中tracker協議的解說。服務器

fastdfs主要是存儲文件。直接把整個文件存儲到磁盤上,因此。簡單直接。但是也有很是大的侷限性。app

所以,fastdfs對文件的文件夾設置和存儲是最爲核心的。dom


爲何這麼忽然的解說這些。因爲我已經看了一段時間的fastdfs,主要結構都已經搞的比較清晰了。所以,這篇文章。我就主要一tracker這一部分的協議來分析。函數


其它詳細介紹tracker的請百度。我就不介紹了,我就直接從fetch

int tracker_deal_task(struct fast_task_info *pTask)

這種方法開始對每個case分析。


一、storage心跳協議spa

case TRACKER_PROTO_CMD_STORAGE_BEAT:
			TRACKER_CHECK_LOGINED(pTask)
			result = tracker_deal_storage_beat(pTask);
			break;

天然。該協議是從storage層發送給tracker層的數據包,

#define TRACKER_PROTO_CMD_STORAGE_BEAT              83  //storage heart beat

那麼,storage主要是作了什麼:

storage在啓動的時候,會開啓一個線程,該線程爲線程

static void *tracker_report_thread_entrance(void *arg)

該函數主要是作了依據配置鏈接對應的它的組的tacker。作一些事情,這裏有個while循環,代碼例如如下

current_time = g_current_time;
			if (current_time - last_beat_time >= \
					g_heart_beat_interval)
			{
				if (tracker_heart_beat(pTrackerServer, \
					&stat_chg_sync_count, \
					&bServerPortChanged) != 0)
				{
					break;
				}


也就是至少30秒鐘來一次心跳。心跳包的主要數據是包頭和當前storage的狀態信息,指針

char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];

/* struct for network transfering */
typedef struct
{
	char sz_total_upload_count[8];
	char sz_success_upload_count[8];
	char sz_total_append_count[8];
	char sz_success_append_count[8];
	char sz_total_modify_count[8];
	char sz_success_modify_count[8];
	char sz_total_truncate_count[8];
	char sz_success_truncate_count[8];
	char sz_total_set_meta_count[8];
	char sz_success_set_meta_count[8];
	char sz_total_delete_count[8];
	char sz_success_delete_count[8];
	char sz_total_download_count[8];
	char sz_success_download_count[8];
	char sz_total_get_meta_count[8];
	char sz_success_get_meta_count[8];
	char sz_total_create_link_count[8];
	char sz_success_create_link_count[8];
	char sz_total_delete_link_count[8];
	char sz_success_delete_link_count[8];
	char sz_total_upload_bytes[8];
	char sz_success_upload_bytes[8];
	char sz_total_append_bytes[8];
	char sz_success_append_bytes[8];
	char sz_total_modify_bytes[8];
	char sz_success_modify_bytes[8];
	char sz_total_download_bytes[8];
	char sz_success_download_bytes[8];
	char sz_total_sync_in_bytes[8];
	char sz_success_sync_in_bytes[8];
	char sz_total_sync_out_bytes[8];
	char sz_success_sync_out_bytes[8];
	char sz_total_file_open_count[8];
	char sz_success_file_open_count[8];
	char sz_total_file_read_count[8];
	char sz_success_file_read_count[8];
	char sz_total_file_write_count[8];
	char sz_success_file_write_count[8];
	char sz_last_source_update[8];
	char sz_last_sync_update[8];
	char sz_last_synced_timestamp[8];
	char sz_last_heart_beat_time[8];
} FDFSStorageStatBuff;


tracker主要是作了什麼呢?code

對其進行解包,而後對這個保存在本地的storage的信息進行保存到文件裏,調用server

	status = tracker_save_storages();

調用

	tracker_mem_active_store_server(pClientInfo->pGroup, \
				pClientInfo->pStorage);
將這個存儲服務器假設沒有,就插入到group中。




最後調用

static int tracker_check_and_sync(struct fast_task_info *pTask, \
			const int status)

檢查對應的改變狀態。並將其同步等。

(需要再具體看看)


二、報告對應同步時間

#define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT	    89  //report src last synced time as dest server


相同在storage的report線程運行

if (sync_time_chg_count != g_sync_change_count && \
				current_time - last_sync_report_time >= \
					g_heart_beat_interval)
			{
				if (tracker_report_sync_timestamp( \
					pTrackerServer, &bServerPortChanged)!=0)
				{
					break;
				}

				sync_time_chg_count = g_sync_change_count;
				last_sync_report_time = current_time;
			}


詳細的數據包爲

pEnd = g_storage_servers + g_storage_count;
	for (pServer=g_storage_servers; pServer<pEnd; pServer++)
	{
		memcpy(p, pServer->server.id, FDFS_STORAGE_ID_MAX_SIZE);
		p += FDFS_STORAGE_ID_MAX_SIZE;
		int2buff(pServer->last_sync_src_timestamp, p);
		p += 4;
	}

也就是遍歷當前進程的本組所有storageserver,和上次同步的時間戳。給trackerserver。

而後tracker的server存儲結構爲

pClientInfo->pGroup->last_sync_timestamps \
				[src_index][dest_index] = sync_timestamp;


dest_index 值爲當前鏈接所在組的索引值

dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup,
			pClientInfo->pStorage);
	if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count)
	{
		status = 0;
		break;
	}

因爲 本連接的storage是固定不變的,而src_index就是爲本組的其它storage的id索引,

首相經過id。(ip地址)找到詳細的storage。而後在經過指針找到索引位置,


最後。調用

	if (++g_storage_sync_time_chg_count % \
			TRACKER_SYNC_TO_FILE_FREQ == 0)
	{
		status = tracker_save_sync_timestamps();
	}
	else
	{
		status = 0;
	}
	} while (0);

	return tracker_check_and_sync(pTask, status);

定時保存文件和檢查等

三、上報磁盤狀況

#define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84  //report disk usage

相同線程定時調用。

	if (current_time - last_df_report_time >= \
					g_stat_report_interval)
			{
				if (tracker_report_df_stat(pTrackerServer, \
						&bServerPortChanged) != 0)
				{
					break;
				}

				last_df_report_time = current_time;
			}


相同上報這些數據

for (i=0; i<g_fdfs_store_paths.count; i++)
	{
		if (statvfs(g_fdfs_store_paths.paths[i], &sbuf) != 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"call statfs fail, errno: %d, error info: %s.",\
				__LINE__, errno, STRERROR(errno));

			if (pBuff != out_buff)
			{
				free(pBuff);
			}
			return errno != 0 ? errno : EACCES;
		}

		g_path_space_list[i].total_mb = ((int64_t)(sbuf.f_blocks) * \
					sbuf.f_frsize) / FDFS_ONE_MB;
		g_path_space_list[i].free_mb = ((int64_t)(sbuf.f_bavail) * \
					sbuf.f_frsize) / FDFS_ONE_MB;
		long2buff(g_path_space_list[i].total_mb, pStatBuff->sz_total_mb);
		long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb);

		pStatBuff++;
	}


tracker這邊存儲在

int64_t *path_total_mbs; //total disk storage in MB
int64_t *path_free_mbs;  //free disk storage in MB

這裏

path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb);
		path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb);

		pClientInfo->pStorage->total_mb += path_total_mbs[i];
		pClientInfo->pStorage->free_mb += path_free_mbs[i];



四、storage服增長到tracker

#define TRACKER_PROTO_CMD_STORAGE_JOIN              81


storage線程相同在該處調用

if (tracker_report_join(pTrackerServer, tracker_index, \
					sync_old_done) != 0)
		{
			sleep(g_heart_beat_interval);
			continue;
		}


發送的包體數據包爲:

typedef struct
{
	char group_name[FDFS_GROUP_NAME_MAX_LEN+1];
	char storage_port[FDFS_PROTO_PKG_LEN_SIZE];
	char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE];
	char store_path_count[FDFS_PROTO_PKG_LEN_SIZE];
	char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE];
	char upload_priority[FDFS_PROTO_PKG_LEN_SIZE];
	char join_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage join timestamp
	char up_time[FDFS_PROTO_PKG_LEN_SIZE];   //storage service started timestamp
	char version[FDFS_VERSION_SIZE];   //storage version
	char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
	char init_flag;
	signed char status;
	char tracker_count[FDFS_PROTO_PKG_LEN_SIZE];  //all tracker server count
} TrackerStorageJoinBody;

當賦值完畢後。在氣候變增長

p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody);
	pServerEnd = g_tracker_group.servers + g_tracker_group.server_count;
	for (pServer=g_tracker_group.servers; pServer<pServerEnd; pServer++)
	{
		/*
		if (strcmp(pServer->ip_addr, pTrackerServer->ip_addr) == 0 && \
			pServer->port == pTrackerServer->port)
		{
			continue;
		}
		tracker_count++;
		*/

		sprintf(p, "%s:%d", pServer->ip_addr, pServer->port);
		p += FDFS_PROTO_IP_PORT_SIZE;
	}

增長所有tracker的server信息格式爲ip:port


tracker server接收

	case TRACKER_PROTO_CMD_STORAGE_JOIN:
			result = tracker_deal_storage_join(pTask);
			break;


獲取到的相關信息存儲到

typedef struct
{
	int storage_port;
	int storage_http_port;
	int store_path_count;
	int subdir_count_per_path;
	int upload_priority;
	int join_time; //storage join timestamp (create timestamp)
	int up_time;   //storage service started timestamp
        char version[FDFS_VERSION_SIZE];   //storage version
	char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
        char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
        char init_flag;
	signed char status;
	int tracker_count;
	ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS];
} FDFSStorageJoinBody;

這些結構體內

同一時候插入本地內存

result = tracker_mem_add_group_and_storage(pClientInfo, \
pTask->client_ip, &joinBody, true);


同一時候把發消息報的id傳過來

	pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + \
				sizeof(TrackerHeader));
	memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp));

	if (pClientInfo->pStorage->psync_src_server != NULL)
	{
		strcpy(pJoinBodyResp->src_id, \
			pClientInfo->pStorage->psync_src_server->id);
	}


五、報告存儲狀態


#define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS     76  //report specified storage server status

storageserver調用

int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \
		FDFSStorageBrief *briefServer)

內容主要是組名字

strcpy(out_buff + sizeof(TrackerHeader), g_group_name);

和簡要信息

	memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, \
			briefServer, sizeof(FDFSStorageBrief));

其結構體例如如下

typedef struct
{
	char status;
	char port[4];
	char id[FDFS_STORAGE_ID_MAX_SIZE];
	char ip_addr[IP_ADDRESS_SIZE];
} FDFSStorageBrief;


六、從tracker獲取storage狀態。

#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS	    71  //get storage status from tracker

該協議是由client發起

調用流程例如如下:


int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \
		const char *group_name, const char *ip_addr, \
		FDFSStorageBrief *pDestBuff)
int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, \
		const char *group_name, const char *ip_addr, \
		char *storage_id, int *status)	
int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \
		const char *group_name, const char *ip_addr, \
		FDFSStorageBrief *pDestBuff)

獲取本身的狀態,

包體格式   組名 ip的字符串


tracker經過獲取了對應的數據。查找到storage的信息

結構體爲:

typedef struct
{
	char status;
	char port[4];
	char id[FDFS_STORAGE_ID_MAX_SIZE];
	char ip_addr[IP_ADDRESS_SIZE];
} FDFSStorageBrief;


賦值後。返回


七、經過tracker獲取storageid

#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID    70  //get storage server id from tracker

和上以協議請求同樣 groupname+ip 組成。


tracker處理方法

static int tracker_deal_get_storage_id(struct fast_task_info *pTask)

tracker最後經過

FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, \
		const char *pIpAddr)
{
	FDFSStorageIdInfo target;
	memset(&target, 0, sizeof(FDFSStorageIdInfo));
	snprintf(target.group_name, sizeof(target.group_name), "%s", group_name);
	snprintf(target.ip_addr, sizeof(target.ip_addr), "%s", pIpAddr);
	return (FDFSStorageIdInfo *)bsearch(&target, g_storage_ids_by_ip, \
		g_storage_id_count, sizeof(FDFSStorageIdInfo), \
		fdfs_cmp_group_name_and_ip);
}

該方法獲取了了
FDFSStorageIdInfo

信息。而後賦值,返回。


八、經過tracker獲取所有storageserver

#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69  //get all storage ids from tracker

	for (i=0; i<5; i++)
	{
		for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++)
		{
			memcpy(pTServer, pGServer, sizeof(ConnectionInfo));
			pTServer->sock = -1;
			result = fdfs_get_storage_ids_from_tracker_server(pTServer);
			if (result == 0)
			{
				return result;
			}
		}

		if (pServerStart != pTrackerGroup->servers)
		{
			pServerStart = pTrackerGroup->servers;
		}
		sleep(1);
	}

調用順序

int storage_func_init(const char *filename, char *bind_addr, const int addr_size)
int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)
int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)


tracker函數。每秒鐘中調用。遍歷所有的trackersserver


trackerserver獲取

case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS:
			result = tracker_deal_fetch_storage_ids(pTask);
			break;

而後經過這樣的協議格式

</pre></p><p>返回的數據</p><p></p><pre name="code" class="cpp">pIdsStart = g_storage_ids_by_ip + start_index;
	pIdsEnd = g_storage_ids_by_ip + g_storage_id_count;
	for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++)
	{
		if ((int)(p - pTask->data) > pTask->size - 64)
		{
			break;
		}

		p += sprintf(p, "%s %s %s\n", pIdInfo->id, \
			pIdInfo->group_name, pIdInfo->ip_addr);
	}



 返回給請求者。


九、回覆給新的storage

#define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG       85  //repl new storage servers


storageserver調用流程:



剩下的協議


static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \
		FDFSStorageBrief *briefServers, const int server_count)

case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE:
			result = tracker_deal_service_query_fetch_update( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE:
			result = tracker_deal_service_query_fetch_update( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL:
			result = tracker_deal_service_query_fetch_update( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE:
			result = tracker_deal_service_query_storage( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE:
			result = tracker_deal_service_query_storage( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL:
			result = tracker_deal_service_query_storage( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL:
			result = tracker_deal_service_query_storage( \
					pTask, pHeader->cmd);
			break;
		case TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP:
			result = tracker_deal_server_list_one_group(pTask);
			break;
		case TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS:
			result = tracker_deal_server_list_all_groups(pTask);
			break;
		case TRACKER_PROTO_CMD_SERVER_LIST_STORAGE:
			result = tracker_deal_server_list_group_storages(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ:
			result = tracker_deal_storage_sync_src_req(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ:
			TRACKER_CHECK_LOGINED(pTask)
			result = tracker_deal_storage_sync_dest_req(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY:
			result = tracker_deal_storage_sync_notify(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY:
			result = tracker_deal_storage_sync_dest_query(pTask);
			break;
		case TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE:
			result = tracker_deal_server_delete_storage(pTask);
			break;
		case TRACKER_PROTO_CMD_SERVER_SET_TRUNK_SERVER:
			result = tracker_deal_server_set_trunk_server(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED:
			result = tracker_deal_storage_report_ip_changed(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ:
			result = tracker_deal_changelog_req(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ:
			result = tracker_deal_parameter_req(pTask);
			break;
		case FDFS_PROTO_CMD_QUIT:
			close(pTask->ev_read.ev_fd);
			task_finish_clean_up(pTask);
			return 0;
		case FDFS_PROTO_CMD_ACTIVE_TEST:
			result = tracker_deal_active_test(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_GET_STATUS:
			result = tracker_deal_get_tracker_status(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START:
			result = tracker_deal_get_sys_files_start(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE:
			result = tracker_deal_get_one_sys_file(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END:
			result = tracker_deal_get_sys_files_end(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID:
			TRACKER_CHECK_LOGINED(pTask)
			result = tracker_deal_report_trunk_fid(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID:
			TRACKER_CHECK_LOGINED(pTask)
			result = tracker_deal_get_trunk_fid(pTask);
			break;
		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE:
			TRACKER_CHECK_LOGINED(pTask)
			result = tracker_deal_report_trunk_free_space(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_PING_LEADER:
			result = tracker_deal_ping_leader(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER:
			result = tracker_deal_notify_next_leader(pTask);
			break;
		case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:
			result = tracker_deal_commit_next_leader(pTask);
			break;
相關文章
相關標籤/搜索