TeamTalk 文件服务器(file_server)分析

TeamTalk 文件服务器(file_server)分析

写在前边

​ 过去一年自己学习了很多c++后台开发的相关知识,但回头发现其实自己实际掌握的知识并不多。我觉着可能有下面几个原因吧,第一点:把看技术书籍当做完任务,看书的时候缺乏耐心,看着看着就一目十行,想着尽快把书看完,骗自己说看完其实就掌握了,其实这样书中的细节很多都没有掌握,只是知道了一些概念而已。第二点,很多时候只是看书,看源码,鲜少编码,这样就会导致你看起来你好像什么都懂,让你自己写代码你很难写出来。第三点,缺乏总结,我觉着说一个人最快进步的方式就是犯了错或者做完一件事情之后去总结,改进,而不是什么都不做,这样永远也不会进步,该掌握的知识还是没有掌握,改犯的错还是会犯。

​ 最近这一个月以来都是在看teamtalk 服务器的相关代码,但发现自己看了好几遍也说不出所以然来,也不能完成把看过的东西描述出来,所以在仔细看完file_server之后,想写一篇东西做个总结,因为自己平时很不擅长写东西,做总结,希望这篇东西能对自己这方面能力有所历练。

概述

Teamtalk文件服务器主要提供客户端文件传输的服务,分为在线传输和离线传输两种文件传输形式,本文对于文件传输的主要业务逻辑进行分析,不分析teamtalk的网络传输库。

teamtalk的后台服务整体架构图

​ file_server在文件传输时需要与msg_server 进行消息传递,现先将后台服务整体架构图列出来

img

源码分析与整体流程分析

1.file_server服务连接相关

msg_server 启动的时候的时候会去连接file_server(必须在msg_server之前启动)

void init_file_serv_conn(serv_info_t* server_list, uint32_t server_count)
{
	g_file_server_list = server_list;
	g_file_server_count = server_count;
    
serv_init<CFileServConn>(g_file_server_list, g_file_server_count);

netlib_register_timer(file_server_conn_timer_callback, NULL, 1000);
s_file_handler = CFileHandler::getInstance();
}

连接成功之后,msg_server 会发消息查询file_server 的ip地址(CID:CID_OTHER_FILE_SERVER_IP_REQ,具体消息:IMFileServerIPReq)

void CFileServConn::OnConfirm()
{
	log("connect to file server success ");
	m_bOpen = true;
	m_connect_time = get_tick_count();
	g_file_server_list[m_serv_idx].reconnect_cnt = MIN_RECONNECT_CNT / 2;
    
    //连上file_server以后,给file_server发送获取ip地址的数据包
    IM::Server::IMFileServerIPReq msg;
    CImPdu pdu;
    pdu.SetPBMsg(&msg);
    pdu.SetServiceId(SID_OTHER);
    pdu.SetCommandId(CID_OTHER_FILE_SERVER_IP_REQ);
    SendPdu(&pdu);
}

file_server 收到消息后,会将自己配置文件中的IP地址发给msg_server

void FileMsgServerConn::_HandleGetServerAddressReq(CImPdu* pPdu) {
    IM::Server::IMFileServerIPRsp msg;
    
    const std::list<IM::BaseDefine::IpAddr>& addrs = ConfigUtil::GetInstance()->GetAddressList();
    
    for (std::list<IM::BaseDefine::IpAddr>::const_iterator it=addrs.begin(); it!=addrs.end(); ++it) {
        IM::BaseDefine::IpAddr* addr = msg.add_ip_addr_list();
        *addr = *it;
        log("Upload file_client_conn addr info, ip=%s, port=%d", addr->ip().c_str(), addr->port());
    }
    
    SendMessageLite(this, SID_OTHER, CID_OTHER_FILE_SERVER_IP_RSP, pPdu->GetSeqNum(), &msg);
}

msg_server 收到Ip地址之后会把ip地址存到m_ip_list中

void CFileServConn::_HandleFileServerIPRsp(CImPdu* pPdu)
{
    IM::Server::IMFileServerIPRsp msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
    uint32_t ip_addr_cnt = msg.ip_addr_list_size();
    
    for (uint32_t i = 0; i < ip_addr_cnt ; i++)
    {
        IM::BaseDefine::IpAddr ip_addr = msg.ip_addr_list(i);
        log("_HandleFileServerIPRsp -> %s : %d ", ip_addr.ip().c_str(), ip_addr.port());
        m_ip_list.push_back(ip_addr);
    }
}

2.开始发文件msg_server 连接

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9vc4P5V4-1584002589483)(C:\Users\dykes\AppData\Roaming\Typora\typora-user-images\image-20200307084035864.png)]

pc客户端在聊天对话框点击上图的发送按之后,会弹出一个文件选择对话框,让用户选择要发送的文件,向文件模块发送 (module::getFileTransferModule()->sendFile)发送文件所在的路径,收文件用户Id,收用户如果在线发送在线文件,否则发送离线文件

     else if (msg.pSender == m_pBtnsendfile) //文件传输
        {
            module::UserInfoEntity userInfo;
            if (!module::getUserListModule()->getUserInfoBySId(m_sId, userInfo))
            {
                LOG__(ERR, _T("SendFile can't find the sid"));
                return;
            }

            CFileDialog	fileDlg(TRUE, NULL, NULL, OFN_HIDEREADONLY | OFN_FILEMUSTEXIST
                , _T("文件|*.*||"), AfxGetMainWnd());
            fileDlg.m_ofn.Flags |= OFN_NOCHANGEDIR;
            fileDlg.DoModal();

            CString sPathName;
            POSITION nPos = fileDlg.GetStartPosition();
            if (nPos != NULL)
            {
                sPathName = fileDlg.GetNextPathName(nPos);
            }
            if (sPathName.IsEmpty())
                return;

            module::getFileTransferModule()->sendFile(sPathName, m_sId, userInfo.isOnlne());
        }

然后把文件的大小,发送端用户fromId,接收端用户toId,文件大小,文件全路径,文件状态(在线/离线文件)发送给消息服务器msg_server发消息(CID_FILE_REQUEST)

BOOL FileTransferModule_Impl::sendFile(IN const CString& sFilePath, IN const std::string& sSendToSID,IN BOOL bOnlineMode)
{
	if (TransferFileEntityManager::getInstance()->checkIfIsSending(sFilePath))
	{
		return FALSE;
	}
	TransferFileEntity fileEntity;
	
	//获取文件大小
	struct __stat64 buffer;
	_wstat64(sFilePath, &buffer);
	fileEntity.nFileSize = (UInt32)buffer.st_size;
	if (0 != fileEntity.nFileSize)
	{
		CString strFileName = sFilePath;
		strFileName.Replace(_T("\\"), _T("/"));//mac上对于路径字符“\”需要做特殊处理,windows上则可以识别
		fileEntity.sFileName = util::cStringToString(strFileName);
		fileEntity.sFromID = module::getSysConfigModule()->userID();
		fileEntity.sToID = sSendToSID;
		uint32_t transMode = 0;
		transMode = bOnlineMode ? IM::BaseDefine::TransferFileType::FILE_TYPE_ONLINE : IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE;
		
		LOG__(DEBG,_T("FileTransferSevice_Impl::sendFile sTaskID = %s"), util::stringToCString(fileEntity.sTaskID));

		imcore::IMLibCoreStartOperationWithLambda(
			[=]()
		{
			IM::File::IMFileReq imFileReq;
            LOG__(APP, _T("imFileReq,name=%s,size=%d,toId=%s"),util::stringToCString(fileEntity.sFileName)
                ,fileEntity.nFileSize,util::stringToCString(fileEntity.sToID));
			imFileReq.set_from_user_id(util::stringToInt32(fileEntity.sFromID));
			imFileReq.set_to_user_id(util::stringToInt32(fileEntity.sToID));
			imFileReq.set_file_name(fileEntity.sFileName);
			imFileReq.set_file_size(fileEntity.nFileSize);
			imFileReq.set_trans_mode(static_cast<IM::BaseDefine::TransferFileType>(transMode));

			module::getTcpClientModule()->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
				, IM::BaseDefine::FileCmdID::CID_FILE_REQUEST
				, &imFileReq);
		});
		
		return TRUE;
	}
    LOG__(ERR, _T("fileEntity FileSize error,size = %d"), fileEntity.nFileSize);
	return FALSE;
}

msg_server 收到消息后

   case CID_FILE_REQUEST:
            s_file_handler->HandleClientFileRequest(this, pPdu);
            break;

会从msg_server 和file_server 中建立的连接中随机挑选一个连接,发送CID_OTHER_FILE_TRANSFER_REQ如果是离线文件,直接把消息发给发给file_server ,如果是在线文件,需要查询当前msg_server是否是否与接收端pc连接,如果没有连接,则需要和去route_server 查询状态,如果没有file_server 直接回应pc端失败

void CFileHandler::HandleClientFileRequest(CMsgConn* pMsgConn, CImPdu* pPdu)
{
    IM::File::IMFileReq msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
    
    uint32_t from_id = pMsgConn->GetUserId();
    uint32_t to_id = msg.to_user_id();
    string file_name = msg.file_name();
    uint32_t file_size = msg.file_size();
    uint32_t trans_mode = msg.trans_mode();
    log("HandleClientFileRequest, %u->%u, fileName: %s, trans_mode: %u.", from_id, to_id, file_name.c_str(), trans_mode);
    
    CDbAttachData attach(ATTACH_TYPE_HANDLE, pMsgConn->GetHandle());
    CFileServConn* pFileConn = get_random_file_serv_conn();
    if (pFileConn)
    {
        IM::Server::IMFileTransferReq msg2;
        msg2.set_from_user_id(from_id);
        msg2.set_to_user_id(to_id);
        msg2.set_file_name(file_name);
        msg2.set_file_size(file_size);
        msg2.set_trans_mode((IM::BaseDefine::TransferFileType)trans_mode);
        msg2.set_attach_data(attach.GetBuffer(), attach.GetLength());
        CImPdu pdu;
        pdu.SetPBMsg(&msg2);
        pdu.SetServiceId(SID_OTHER);
        pdu.SetCommandId(CID_OTHER_FILE_TRANSFER_REQ);
        pdu.SetSeqNum(pPdu->GetSeqNum());
        
        if (IM::BaseDefine::FILE_TYPE_OFFLINE == trans_mode)
        {
            pFileConn->SendPdu(&pdu);
        }
        else //IM::BaseDefine::FILE_TYPE_ONLINE
        {
            CImUser* pUser = CImUserManager::GetInstance()->GetImUserById(to_id);
            if (pUser && pUser->GetPCLoginStatus())//已有对应的账号pc登录状态
            {
                pFileConn->SendPdu(&pdu);
            }
            else//无对应用户的pc登录状态,向route_server查询状态
            {
                //no pc_client in this msg_server, check it from route_server
                CPduAttachData attach_data(ATTACH_TYPE_HANDLE_AND_PDU_FOR_FILE, pMsgConn->GetHandle(), pdu.GetBodyLength(), pdu.GetBodyData());
                IM::Buddy::IMUsersStatReq msg3;
                msg3.set_user_id(from_id);
                msg3.add_user_id_list(to_id);
                msg3.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());
                CImPdu pdu2;
                pdu2.SetPBMsg(&msg3);
                pdu2.SetServiceId(SID_BUDDY_LIST);
                pdu2.SetCommandId(CID_BUDDY_LIST_USERS_STATUS_REQUEST);
                pdu2.SetSeqNum(pPdu->GetSeqNum());
                CRouteServConn* route_conn = get_route_serv_conn();
                if (route_conn)
                {
                    route_conn->SendPdu(&pdu2);
                }
            }
        }
    }
    else
    {
        log("HandleClientFileRequest, no file server.   ");
        IM::File::IMFileRsp msg2;
        msg2.set_result_code(1);
        msg2.set_from_user_id(from_id);
        msg2.set_to_user_id(to_id);
        msg2.set_file_name(file_name);
        msg2.set_task_id("");
        msg2.set_trans_mode((IM::BaseDefine::TransferFileType)trans_mode);
        CImPdu pdu;
        pdu.SetPBMsg(&msg2);
        pdu.SetServiceId(SID_FILE);
        pdu.SetCommandId(CID_FILE_RESPONSE);
        pdu.SetSeqNum(pPdu->GetSeqNum());
        pMsgConn->SendPdu(&pdu);
    }
}

fil_server收到消息后,会新建一个guid作为一个文件任务标识符,然后新建一个文件任务插入队列,接下来把这个任务状态 加上taskId回给msg_server,

void FileMsgServerConn::_HandleMsgFileTransferReq(CImPdu* pdu) {
    IM::Server::IMFileTransferReq transfer_req;
    CHECK_PB_PARSE_MSG(transfer_req.ParseFromArray(pdu->GetBodyData(), pdu->GetBodyLength()));
    
    
    uint32_t from_id = transfer_req.from_user_id();
    uint32_t to_id = transfer_req.to_user_id();
    
    IM::Server::IMFileTransferRsp transfer_rsp;
    transfer_rsp.set_result_code(1);
    transfer_rsp.set_from_user_id(from_id);
    transfer_rsp.set_to_user_id(to_id);
    transfer_rsp.set_file_name(transfer_req.file_name());
    transfer_rsp.set_file_size(transfer_req.file_size());
    transfer_rsp.set_task_id("");
    transfer_rsp.set_trans_mode(transfer_req.trans_mode());
    transfer_rsp.set_attach_data(transfer_req.attach_data());

    
    bool rv = false;
    do {
        std::string task_id = GenerateUUID();
        if (task_id.empty()) {
            log("Create task id failed");
            break;
        }
        log("trams_mode=%d, task_id=%s, from_id=%d, to_id=%d, file_name=%s, file_size=%d", transfer_req.trans_mode(), task_id.c_str(), from_id, to_id, transfer_req.file_name().c_str(), transfer_req.file_size());
        
        BaseTransferTask* transfer_task = TransferTaskManager::GetInstance()->NewTransferTask(
                                                                                              transfer_req.trans_mode(),
                                                                                              task_id,
                                                                                              from_id,
                                                                                              to_id,
                                                                                              transfer_req.file_name(),
                                                                                              transfer_req.file_size());
        
        if (transfer_task == NULL) {
            // 创建未成功
            // close connection with msg svr
            // need_close = true;
            log("Create task failed");
            break;
        }
        
        transfer_rsp.set_result_code(0);
        transfer_rsp.set_task_id(task_id);
        rv = true;
        // need_seq_no = false;
        
        log("Create task succeed, task id %s, task type %d, from user %d, to user %d", task_id.c_str(), transfer_req.trans_mode(), from_id, to_id);
    } while (0);
    
    ::SendMessageLite(this, SID_OTHER, CID_OTHER_FILE_TRANSFER_RSP, pdu->GetSeqNum(), &transfer_rsp);
    
    if (!rv) {
        // 未创建成功,关闭连接
        Close();
    }
}

msg_server接收到这个回应后,会把上述建立的文件传输任务相关信息还有之前缓存的IP地址包装成IMFileRsp通过cid CID_FILE_RESPONSE发送给客户端发送端,如果前面建立任务成功(result=0)则通知所有接收端的连接(CID_FILE_NOTIFY) 包括任务ID,文件服务器ip地址等,并且发送给route_server ,然后route_server 通知所有的服务端

3,客户端与file_server 相关

void FileTransferModule_Impl::onPacket(imcore::TTPBHeader& header, std::string& pbBody)
{
	switch (header.getCommandId())
	{
	case IM::BaseDefine::FileCmdID::CID_FILE_RESPONSE://发送“文件请求/离线文件”-返回
		_sendfileResponse(pbBody);
		break;
	case IM::BaseDefine::FileCmdID::CID_FILE_NOTIFY://收到“发送文件请求”
		_fileNotify(pbBody);
		break;
	case IM::BaseDefine::FileCmdID::CID_FILE_HAS_OFFLINE_RES:
		_hasOfflineRes(pbBody);
		break;
	default:
		return;
	}

enum ClientFileRole {
CLIENT_REALTIME_SENDER = 1, 在线发送端
CLIENT_REALTIME_RECVER = 2,在线接收端
CLIENT_OFFLINE_UPLOAD = 3,离线发送端
CLIENT_OFFLINE_DOWNLOAD = 4 离线接收端
};

(1)在线文件传输

发送端CID_FILE_RESPONSE:发送端收到CID_FILE_RESPONSE后,会新建一个任务相关信息到任务map,如果是在线文件置clientrole为CLIENT_REALTIME_SENDER ,接下来去根据接收到ip地址来连接文件服务器

void FileTransferModule_Impl::_sendfileResponse(IN std::string& body)
{
	IM::File::IMFileRsp imFileRsp;
	if (!imFileRsp.ParseFromString(body))
	{
		LOG__(ERR, _T("parse failed,body:%s"), util::stringToCString(body));
		return;
	}

	UInt32 nResult = imFileRsp.result_code();
	if (nResult != 0)
	{
		LOG__(ERR, _T("_sendfileResponse result != 0"));
		module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILESEVER_UPLOAD_FAILED);
	}

	TransferFileEntity fileEntity;
	fileEntity.sTaskID = imFileRsp.task_id();
    assert(!fileEntity.sTaskID.empty());
	fileEntity.sFromID = util::uint32ToString(imFileRsp.from_user_id());
	fileEntity.sToID = util::uint32ToString(imFileRsp.to_user_id());
	fileEntity.sFileName = imFileRsp.file_name();
	fileEntity.setSaveFilePath(util::stringToCString(fileEntity.sFileName));//发送方文件地址,就是保存地址
	fileEntity.time = static_cast<UInt32>(time(0));
	uint32_t transMode = imFileRsp.trans_mode();
	if (IM::BaseDefine::TransferFileType::FILE_TYPE_ONLINE == transMode)
	{
		fileEntity.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_REALTIME_SENDER;
	}
	else if (IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE == transMode)
	{
		fileEntity.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_UPLOAD;
	}
	fileEntity.pFileObject = new TransferFile(util::stringToCString(fileEntity.sFileName),FALSE);
	if (fileEntity.pFileObject)
	{
		fileEntity.nFileSize = fileEntity.pFileObject->length();
	}
	
	UINT32 nIPCount = imFileRsp.ip_addr_list_size();
	if (nIPCount <= 0)
	{
		return;
	}
	const IM::BaseDefine::IpAddr& ipAdd = imFileRsp.ip_addr_list(0);
	fileEntity.sIP = ipAdd.ip();
	fileEntity.nPort = ipAdd.port();

	if (!TransferFileEntityManager::getInstance()->pushTransferFileEntity(fileEntity))
		TransferFileEntityManager::getInstance()->updateFileInfoBysTaskID(fileEntity);

	LOG__(DEBG, _T("FileTransferSevice_Impl::准备连接文件服务器 sTaskId = %s"), util::stringToCString(fileEntity.sTaskID));
	//连接文件服务器
	TransferFileEntityManager::getInstance()->openFileSocketByTaskId(fileEntity.sTaskID);
}

接收端:CID_FILE_NOTIFY:接收端收到消息,同样也会把任务加入任务map,如果状态是在线文件ClientFileRole=CLIENT_REALTIME_RECVER ,离线文件CLIENT_OFFLINE_DOWNLOAD 接下来会连接文件服务器

void FileTransferModule_Impl::_fileNotify(IN std::string& body)
{
	IM::File::IMFileNotify imFileNotify;
	if (!imFileNotify.ParseFromString(body))
	{
		LOG__(ERR, _T("parse failed,body:%s"), util::stringToCString(body));
		return;
	}
	TransferFileEntity file;
	file.sFileName = imFileNotify.file_name();
	file.sFromID = util::uint32ToString(imFileNotify.from_user_id());
	file.sToID = util::uint32ToString(imFileNotify.to_user_id());
	file.sTaskID = imFileNotify.task_id();
	file.nFileSize = imFileNotify.file_size();

	UINT32 nIPCount = imFileNotify.ip_addr_list_size();
	if (nIPCount <= 0)
	{
		return;
	}
	const IM::BaseDefine::IpAddr& ipAdd = imFileNotify.ip_addr_list(0);
	file.sIP = ipAdd.ip();
	file.nPort = ipAdd.port();

	uint32_t transMode = imFileNotify.trans_mode();
	if (IM::BaseDefine::TransferFileType::FILE_TYPE_ONLINE == transMode)
	{
		file.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_REALTIME_RECVER;
	}
	else if (IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE == transMode)
	{
		file.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD;
	}
	file.time = static_cast<UInt32>(time(0));

	TransferFileEntityManager::getInstance()->pushTransferFileEntity(file);
	LOG__(DEBG, _T("FileTransferSevice_Impl::给你发文件 sFileID = %s"), util::stringToCString(file.sTaskID));

	if (1 == imFileNotify.offline_ready())
	{
		//TODO离线文件传输结束
	}

	//连接服务器
	TransferFileEntityManager::getInstance()->openFileSocketByTaskId(file.sTaskID);
}

接受端与发送端都连接 file_server连接成功后,会发送CID_FILE_LOGIN_REQ给文件服务器会携带当前clientrole,taskid,当前的userid


void FileTransferSocket::onConnectDone()
{
	LOG__(APP, _T("FileTransferSocket::onConnected()"));
	startHeartbeat();

	TransferFileEntity info;
    if (!TransferFileEntityManager::getInstance()->getFileInfoByTaskId(m_sTaskId, info))
    {
        LOG__(APP, _T("Can't get the file info,task id:%s"),util::stringToCString(m_sTaskId));
        return;
    }
		
	//拉模式文件传输,传输taskid、token、client_mode
	IM::File::IMFileLoginReq imFileLoginReq;
	imFileLoginReq.set_user_id(module::getSysConfigModule()->userId());
	imFileLoginReq.set_task_id(info.sTaskID);
	imFileLoginReq.set_file_role(static_cast<IM::BaseDefine::ClientFileRole>(info.nClientMode));

    LOG__(APP, _T("IMFileLoginReq,sTaskID:%s,nClientMode:%d"), util::stringToCString(info.sTaskID), info.nClientMode);
	//send packet
    LOG__(APP, _T("IMFileLoginReq,taskId:%s"), util::stringToCString(info.sTaskID));
    sendPacket(IM::BaseDefine::ServiceID::SID_FILE, IM::BaseDefine::FileCmdID::CID_FILE_LOGIN_REQ, &imFileLoginReq);

	//CImPduClientFileLoginReq pduFileLoginReq(module::getSysConfigModule()->userID().c_str()
	//	, "", info.sTaskID.c_str(), );
	//sendPacket(&pduFileLoginReq);
}

文件服务器收到这个消息后,会查询当前taskid是否在任务map中存在,则会进行状态机转化

// 状态机
enum TransferTaskState {
    kTransferTaskStateInvalid = 0,              // 非法状态
    
    kTransferTaskStateReady = 1,                // 已经准备好了
    
    kTransferTaskStateWaitingSender = 2,        // 等待发起者
    kTransferTaskStateWaitingReceiver = 3,      // 等待接收者
    kTransferTaskStateWaitingTransfer = 4,   // 全部已经准备好了
    kTransferTaskStateTransfering = 5,          // 正在传输中
    kTransferTaskStateTransferDone = 6,          // 正在传输中
    
    kTransferTaskStateWaitingUpload = 7,        // 等待上传
    kTransferTaskStateUploading = 8,            // 正在上传中
    kTransferTaskStateUploadEnd = 9,            // 正在上传中
    
    kTransferTaskStateWaitingDownload = 10,      // 等待下载
    kTransferTaskStateDownloading = 11,          // 正在下载中
    kTransferTaskStateDownloadEnd = 12,          // 正在下载中

    kTransferTaskStateError = 13,               // 传输失败
};
void FileClientConn::_HandleClientFileLoginReq(CImPdu* pdu) {
    IM::File::IMFileLoginReq login_req;
    CHECK_PB_PARSE_MSG(login_req.ParseFromArray(pdu->GetBodyData(), pdu->GetBodyLength()));
    
    uint32_t user_id = login_req.user_id();
    string task_id = login_req.task_id();
    IM::BaseDefine::ClientFileRole mode = login_req.file_role();
    
    log("Client login, user_id=%d, task_id=%s, file_role=%d", user_id, task_id.c_str(), mode);
    
    BaseTransferTask* transfer_task = NULL;
    
    bool rv = false;
    do {
        // 查找任务是否存在
        transfer_task = TransferTaskManager::GetInstance()->FindByTaskID(task_id);
        
        if (transfer_task == NULL) {
            if (mode == CLIENT_OFFLINE_DOWNLOAD) {
                // 文件不存在,检查是否是离线下载,有可能是文件服务器重启
                // 尝试从磁盘加载
                transfer_task = TransferTaskManager::GetInstance()->NewTransferTask(task_id, user_id);
                // 需要再次判断是否加载成功
                if (transfer_task == NULL) {
                    log("Find task id failed, user_id=%u, taks_id=%s, mode=%d", user_id, task_id.c_str(), mode);
                    break;
                }
            } else {
                log("Can't find task_id, user_id=%u, taks_id=%s, mode=%d", user_id, task_id.c_str(), mode);
                break;
            }
        }
        
        // 状态转换
        rv = transfer_task->ChangePullState(user_id, mode);
        ...省略

状态转化:

先检查当前任务的fromid与toID是否正确,(建议任务的初始状态机为kTransferTaskStateReady),如果发送端数据先来,则状态为kTransferTaskStateWaitingReceiver,若接受端先来为kTransferTaskStateWaitingSender

注意:kTransferTaskStateWaitingReceiver和kTransferTaskStateWaitingSender这两个状态只会存在一个,

当第二次发送端或者接收端来的时候,然后状态转为kTransferTaskStateWaitingTransfer

//文件传输的下一个状态
bool OnlineTransferTask::ChangePullState(uint32_t user_id, int file_role) {
    // 在线文件传输,初始状态:kTransferTaskStateReady
    //  状态转换流程 kTransferTaskStateReady
    //        --> kTransferTaskStateWaitingSender或kTransferTaskStateWaitingReceiver
    //        --> kTransferTaskStateWaitingTransfer
    //
    
    bool rv = false;
    
    do {
        rv = CheckByUserIDAndFileRole(user_id, file_role);
        if (!rv) {
            //
            log("Check error! user_id=%d, file_role=%d", user_id, file_role);
            break;
        }
        
        if (state_ != kTransferTaskStateReady && state_ != kTransferTaskStateWaitingSender && state_ != kTransferTaskStateWaitingReceiver) {
            log("Invalid state, valid state is kTransferTaskStateReady or kTransferTaskStateWaitingSender or kTransferTaskStateWaitingReceiver, but state is %d", state_);
            break;
        }
        
        //state 是全局变量,接收端连接经历啊的时候就会转向 kTransferTaskStateWaitingTransfer
        if (state_ == kTransferTaskStateReady) 
        {
            // 第一个用户进来
            // 如果是sender,则-->kTransferTaskStateWaitingReceiver
            // 如果是receiver,则-->kTransferTaskStateWaitingSender
            if (file_role == CLIENT_REALTIME_SENDER)
            {
                state_ = kTransferTaskStateWaitingReceiver;
            } else //为啥recv要等待sender?
            {
                state_ = kTransferTaskStateWaitingSender;
            }
        } else 
        {
            if (state_ == kTransferTaskStateWaitingReceiver) 
            {
                // 此时必须是receiver
                // 需要检查是否是receiver
                if (file_role != CLIENT_REALTIME_RECVER) {
                    log("Invalid user, user_id = %d, but to_user_id_ = %d", user_id, to_user_id_);
                    break;
                }
            } else if (state_ == kTransferTaskStateWaitingSender) {
                // 此时必须是sender
                // 需要检查是否是sender
                if (file_role != CLIENT_REALTIME_SENDER) {
                    log("Invalid user, user_id = %d, but to_user_id_ = %d", user_id, to_user_id_);
                    break;
                }
            }
             //如果 两方都处于等待状态,则下个状态是传输
            state_ = kTransferTaskStateWaitingTransfer;
            
        }
        
        SetLastUpdateTime();
        rv = true;
    } while (0);
    
    return rv;
}

状态转化完成之后,则分别给发送和接受端回复CID_FILE_LOGIN_RES,发送接受这两个消息后,分别会打开自己的文件传输对话框,如果上方的发送端和接受端都进行了上方的状态转化,还要向接受端发送状态准备就绪_StatesNotify(CLIENT_FILE_PEER_READY, task_id, transfer_task_->from_user_id(), conn);

   if (!rv) {
            // log();
            break;
            //
        }
   // Ok
    auth_ = true;
    transfer_task_ = transfer_task;
    user_id_ = user_id;
    // 设置conn
    transfer_task->SetConnByUserID(user_id, this);
    rv = true;
    
} while (0);

IM::File::IMFileLoginRsp login_rsp;
login_rsp.set_result_code(rv?0:1);
login_rsp.set_task_id(task_id);

//让发送端打开发送文件对话框,接收端打开接收文件对话框
::SendMessageLite(this, SID_FILE, CID_FILE_LOGIN_RES, pdu->GetSeqNum(), &login_rsp);

if (rv) {
    if (transfer_task->GetTransMode() == FILE_TYPE_ONLINE) 
    {
        if (transfer_task->state() == kTransferTaskStateWaitingTransfer) 
        {
            CImConn* conn = transfer_task_->GetToConn();
            if (conn) 
            {     //发送当前状态 // 1.准备就绪,取消
                _StatesNotify(CLIENT_FILE_PEER_READY, task_id, transfer_task_->from_user_id(), conn);
            } else 
            {
                log("to_conn is close, close me!!!");
                Close();
            }
            // _StatesNotify(CLIENT_FILE_PEER_READY, task_id, user_id, this);
            // transfer_task->StatesNotify(CLIENT_FILE_PEER_READY, task_id, user_id_);
        }
    } 
    else 
    {
        if (transfer_task->state() == kTransferTaskStateWaitingUpload) {
            
            OfflineTransferTask* offline = reinterpret_cast<OfflineTransferTask*>(transfer_task);
            
            IM::File::IMFilePullDataReq pull_data_req;
            pull_data_req.set_task_id(task_id);
            pull_data_req.set_user_id(user_id);
            pull_data_req.set_trans_mode(FILE_TYPE_OFFLINE);
            pull_data_req.set_offset(0);
            pull_data_req.set_data_size(offline->GetNextSegmentBlockSize());
            
            ::SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_REQ, &pull_data_req);

            log("Pull Data Req");
        }
    }
} else {
    Close();
}

这里客户端收到CID_FILE_LOGIN_RES通过这个消息KEY_FILETRANSFER_SENDFILE通知发送端和接受端打开文件传输对话框

void FileTransferSocket::_fileLoginResponse(IN std::string& body)
{
   IM::File::IMFileLoginRsp imFileLoginRsp;
   if (!imFileLoginRsp.ParseFromString(body))
   {
       LOG__(ERR, _T("parse failed,body:%s"), util::stringToCString(body));
       return;
   }
   if (imFileLoginRsp.result_code() != 0)
   {
   	LOG__(ERR, _T("file server login failed! "));
   	return;
   }
   //打开文件
   std::string taskId = imFileLoginRsp.task_id();
   TransferFileEntity fileEntity;
   if (!TransferFileEntityManager::getInstance()->getFileInfoByTaskId(taskId, fileEntity))
   {
   	LOG__(ERR, _T("file server login:can't find the fileInfo "));
   	return;
   }

   LOG__(APP, _T("IMFileLoginRsp, file server login succeed"));
   //提示界面,界面上插入该项
   if (IM::BaseDefine::ClientFileRole::CLIENT_REALTIME_SENDER == fileEntity.nClientMode
   	|| IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_UPLOAD == fileEntity.nClientMode)
   {
   	module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILETRANSFER_SENDFILE, fileEntity.sTaskID);
   }
   else if (IM::BaseDefine::ClientFileRole::CLIENT_REALTIME_RECVER == fileEntity.nClientMode
   	|| IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD == fileEntity.nClientMode)
   {
   	module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILETRANSFER_RE如QUEST, fileEntity.sTaskID);
   }
}

如果点击了接受文件,则接收端会给文件服务器发送CID_FILE_PULL_DATA_REQ消息,设置文件传输单元为

FILE_TRANSFER_BLOCK_SIZE(34*1024)初始文件偏移量为0

BOOL FileTransferUIThread::acceptFileTransfer(const std::string& taskId)
{
	FileTransferSocket* pFileSocket = _findFileSocketByTaskId(taskId);
	if (pFileSocket)
	{
		imcore::IMLibCoreStartOperationWithLambda(
			[=]()
		{
			TransferFileEntity fileEntity;
			if (TransferFileEntityManager::getInstance()->getFileInfoByTaskId(taskId, fileEntity))
			{
				int mode = fileEntity.nClientMode == IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD ? IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE : IM::BaseDefine::TransferFileType::FILE_TYPE_ONLINE;
                LOG__(APP, _T("IMFilePullDataReq,taskId:%s"),util::stringToCString(taskId));
                IM::File::IMFilePullDataReq imFilePullDataReq;
				imFilePullDataReq.set_task_id(taskId);
				imFilePullDataReq.set_user_id(util::stringToInt32(fileEntity.sToID));
				imFilePullDataReq.set_trans_mode(static_cast<IM::BaseDefine::TransferFileType>(mode));
				imFilePullDataReq.set_offset(0);
                fileEntity.nFileSize > FILE_TRANSFER_BLOCK_SIZE ? imFilePullDataReq.set_data_size(FILE_TRANSFER_BLOCK_SIZE) : imFilePullDataReq.set_data_size(fileEntity.nFileSize);
				//发包
                pFileSocket->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
                    , IM::BaseDefine::FileCmdID::CID_FILE_PULL_DATA_REQ, &imFilePullDataReq);
				//CImPduClientFilePullDataReq pduPullDataReq(taskId.c_str(), fileEntity.sToID.c_str()
				//	, mode, 0, FILE_TRANSFER_BLOCK_SIZE);
				//pFileSocket->sendPacket(&pduPullDataReq);
			}
		});
	}

	return FALSE;
}

文件服务器收到消息,会把状态机变为从kTransferTaskStateWaitingTransfer改变为kTransferTaskStateTransfering

,如果是在线文件 直接把这个消息转送给发送端

void FileClientConn::_HandleClientFilePullFileReq(CImPdu *pdu) {
    if (!auth_ || !transfer_task_) {
        log("Recv a client_file_state, but auth is false");
        return;
    }
    
    IM::File::IMFilePullDataReq pull_data_req;
    CHECK_PB_PARSE_MSG(pull_data_req.ParseFromArray(pdu->GetBodyData(), pdu->GetBodyLength()));
    
    uint32_t user_id = pull_data_req.user_id();
    string task_id = pull_data_req.task_id();
    uint32_t mode = pull_data_req.trans_mode();
    uint32_t offset = pull_data_req.offset();
    uint32_t datasize = pull_data_req.data_size();

    log("Recv FilePullFileReq, user_id=%d, task_id=%s, file_role=%d, offset=%d, datasize=%d", user_id, task_id.c_str(), mode, offset, datasize);

    // rsp
    IM::File::IMFilePullDataRsp pull_data_rsp;
    pull_data_rsp.set_result_code(1);
    pull_data_rsp.set_task_id(task_id);
    pull_data_rsp.set_user_id(user_id);
    pull_data_rsp.set_offset(offset);
    pull_data_rsp.set_file_data("");

    // BaseTransferTask* transfer_task = NULL;
    int rv = -1;
    
    do {
        // 检查user_id
        if (user_id != user_id_) {
            log("Received user_id valid, recv_user_id = %d, transfer_task.user_id = %d, user_id_ = %d", user_id, transfer_task_->from_user_id(), user_id_);
            break;
        }

        // 检查task_id
        if (transfer_task_->task_id() != task_id) {
            log("Received task_id valid, recv_task_id = %s, this_task_id = %s", task_id.c_str(), transfer_task_->task_id().c_str());
            break;
        }
        
        // 离线传输,需要下载文件
        // 在线传输,从发送者拉数据
        // user_id为transfer_task.to_user_id
        if (!transfer_task_->CheckToUserID(user_id)) {
            log("user_id equal transfer_task.to_user_id, but user_id=%d, transfer_task.to_user_id=%d", user_id, transfer_task_->to_user_id());
            break;
        }
        
        rv =  transfer_task_->DoPullFileRequest(user_id, offset, datasize, pull_data_rsp.mutable_file_data());
        
        if (rv == -1) {
            break;
        }
        
        pull_data_rsp.set_result_code(0);

        if (transfer_task_->GetTransMode() == FILE_TYPE_ONLINE) {
            OnlineTransferTask* online = reinterpret_cast<OnlineTransferTask*>(transfer_task_);
            online->SetSeqNum(pdu->GetSeqNum());
            CImConn* conn = transfer_task_->GetOpponentConn(user_id);
            if (conn) {
                conn->SendPdu(pdu);
                // SendMessageLite(conn, SID_FILE, CID_FILE_PULL_DATA_RSP, pdu->GetSeqNum(), &pull_data_rsp);
            }
            // SendPdu(&pdu);
        } else {
            SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_RSP, pdu->GetSeqNum(), &pull_data_rsp);
            if (rv == 1) {
                _StatesNotify(CLIENT_FILE_DONE, task_id, transfer_task_->from_user_id(), this);
            }
        }
        
        // rv = 0;
    } while (0);


    if (rv!=0) {
        Close();
    }

}

//客户端收到消息后,就会开始读取文件,然后把文件信息放到IMFilePullDataRsp中发送给文件服务器,发出的文件会更新进度条

void FileTransferSocket::filePullDataReqResponse(IN std::string& body)//发
{
IM::File::IMFilePullDataReq imFilePullDataReq;
if (!imFilePullDataReq.ParseFromString(body))
{
LOG
_(ERR, _T(“parse failed,body:%s”), util::stringToCString(body));
return;
}
UInt32 fileSize = imFilePullDataReq.data_size();
UInt32 fileOffset = imFilePullDataReq.offset();
std::string taskId = imFilePullDataReq.task_id();

TransferFileEntity fileEntity;
if (!TransferFileEntityManager::getInstance()->getFileInfoByTaskId(taskId, fileEntity))
{
	LOG__(ERR, _T("PullDataReqResponse: can't find the fileInfo"));
	return;
}
LOG__(DEBG, _T("send:taskId=%s,filesize=%d,name=%s,BolckSize=%d")
	,util::stringToCString(fileEntity.sTaskID)
	,fileEntity.nFileSize
	,fileEntity.getRealFileName()
    ,fileSize);
std::string buff;
if (nullptr == fileEntity.pFileObject)
{
	LOG__(ERR, _T("PullDataReqResponse: file boject Destoryed!"));
	return;
}
fileEntity.pFileObject->readBlock(fileOffset, fileSize, buff);//读取本地文件的数据块
IM::File::IMFilePullDataRsp imFilePullDataRsp;//todo check
imFilePullDataRsp.set_result_code(0);
imFilePullDataRsp.set_task_id(taskId);
imFilePullDataRsp.set_user_id(util::stringToInt32(fileEntity.sFromID));
imFilePullDataRsp.set_offset(fileOffset);
imFilePullDataRsp.set_file_data((void*)buff.data(), fileSize);

//send packet
sendPacket(IM::BaseDefine::ServiceID::SID_FILE, IM::BaseDefine::FileCmdID::CID_FILE_PULL_DATA_RSP
    , &imFilePullDataRsp);

fileEntity.nProgress = fileOffset + fileSize;
if (fileEntity.nProgress < fileEntity.nFileSize)
{
	//更新进度条
	TransferFileEntityManager::getInstance()->updateFileInfoBysTaskID(fileEntity);//保存当前进度
	module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILESEVER_UPDATA_PROGRESSBAR
        , fileEntity.sTaskID);
}
else//传输完成
{
	if (fileEntity.pFileObject)
	{
		delete fileEntity.pFileObject;
		fileEntity.pFileObject = nullptr;
	}
	module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILESEVER_PROGRESSBAR_FINISHED
        , fileEntity.sTaskID);
}
TransferFileEntityManager::getInstance()->updateFileInfoBysTaskID(fileEntity);

文件服务器如果收到这个消息,会直接转发给接收端

   if (transfer_task_->GetTransMode() == FILE_TYPE_ONLINE) {
            // 对于在线,直接转发
            OnlineTransferTask* online = reinterpret_cast<OnlineTransferTask*>(transfer_task_);
            pdu->SetSeqNum(online->GetSeqNum());
            // online->SetSeqNum(pdu->GetSeqNum());

            CImConn* conn = transfer_task_->GetToConn();
            if (conn) {
                conn->SendPdu(pdu);
            }

接收端收到这个消息CID_FILE_PULL_DATA_RSP后,会把文件保存到本地,更新文件传输进度,如果文件 没传送完,会把当前文件进度设为偏移量,继续向文件服务器发送CID_FILE_PULL_DATA_REQ ,文件如果传输完成

接受端会发送CID_FILE_STATE状态为IM::BaseDefine::ClientFileState::CLIENT_FILE_DONE,文件服务器会转发这个状态到发送端,文件服务器关闭连接,并且将状态变为kTransferTaskStateInvalid,然后发送端子在心跳中检测是否无效,就会关闭与发送端的连接

void FileClientConn::_HandleClientFileStates(CImPdu* pdu) {
    if (!auth_ || !transfer_task_) {
        log("Recv a client_file_state, but auth is false");
        return;
    }
    
    IM::File::IMFileState file_state;
    CHECK_PB_PARSE_MSG(file_state.ParseFromArray(pdu->GetBodyData(), pdu->GetBodyLength()));
    
    string task_id = file_state.task_id();
    uint32_t user_id = file_state.user_id();
    uint32_t file_stat = file_state.state();
    
    log("Recv FileState, user_id=%d, task_id=%s, file_stat=%d", user_id, task_id.c_str(),file_stat);

    // FilePullFileRsp
    bool rv = false;
    do {
        // 检查user_id
        if (user_id != user_id_) {
            log("Received user_id valid, recv_user_id = %d, transfer_task.user_id = %d, user_id_ = %d", user_id, transfer_task_->from_user_id(), user_id_);
            break;
        }
        
        // 检查task_id
        if (transfer_task_->task_id() != task_id) {
            log("Received task_id valid, recv_task_id = %s, this_task_id = %s", task_id.c_str(), transfer_task_->task_id().c_str());
            break;
        }

        switch (file_stat) {
            case CLIENT_FILE_CANCEL:
            case CLIENT_FILE_DONE:
            case CLIENT_FILE_REFUSE:
            {
                CImConn* im_conn = transfer_task_->GetOpponentConn(user_id);
                if (im_conn) {
                    im_conn->SendPdu(pdu);
                    log("Task %s %d by user_id %d notify %d, erased", task_id.c_str(), file_stat, user_id, transfer_task_->GetOpponent(user_id));
                }
                // notify other client
                // CFileConn* pConn = (CFileConn*)t->GetOpponentConn(user_id);
                // pConn->SendPdu(pPdu);
                
                // TransferTaskManager::GetInstance()->DeleteTransferTask(task_id);
                
                rv = true;
                break;
            }
                
            default:
                log("Recv valid file_stat: file_state = %d, user_id=%d, task_id=%s", file_stat, user_id_, task_id.c_str());
                break;
        }
        
        // rv = true;
    } while (0);
    //断掉连接
    // if (!rv) {
    Close();

发送端 收到CLIENT_FILE_DONE之后会删除文件在内存中,然后把文件信息插入本地数据库,插入已完成文件列表

module::getDatabaseModule()->sqlInsertFileTransferHistory(FileInfo);
		TransferFileEntityManager::getInstance()->kickMapFileItemToVecFile(sFileId);
	}

以上就是在线文件发送的全部流程

(2)离线文件传输

一、发送端

离线文件传输与在线文件传输基本共用一些代码,有一些状态的差别和插入服务器数据库的操作

1,发送文件这里ClientFileRole::CLIENT_OFFLINE_UPLOAD

else if (IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE == transMode)
	{
		fileEntity.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_UPLOAD;
	}

2,发送端连接FIle_server成功后,如果是发送端,状态机会变为kTransferTaskStateWaitingUpload

bool OfflineTransferTask::ChangePullState(uint32_t user_id, int file_role) {
    // 离线文件传输
    // 1. 如果是发送者,状态转换 kTransferTaskStateReady-->kTransferTaskStateWaitingUpload
    // 2. 如果是接收者,状态转换 kTransferTaskStateUploadEnd --> kTransferTaskStateWaitingDownload

//    if (CheckFromUserID(user_id)) {
//        // 如果是发送者
//        // 当前状态必须为kTransferTaskStateReady
//        if () {
//
//        }
//    } else {
//        // 如果是接收者
//    }
    
bool rv = false;

do {
    rv = CheckByUserIDAndFileRole(user_id, file_role);
    if (!rv) {
        //
        log("Check error! user_id=%d, file_role=%d", user_id, file_role);
        break;
    }
    
    if (state_ != kTransferTaskStateReady &&
            state_ != kTransferTaskStateUploadEnd &&
            state_ != kTransferTaskStateWaitingDownload) {
        
        log("Invalid state, valid state is kTransferTaskStateReady or kTransferTaskStateUploadEnd, but state is %d", state_);
        break;
    }
    
    if (state_ == kTransferTaskStateReady) 
    {
        // 第一个用户进来,必须是CLIENT_OFFLINE_UPLOAD
        // 必须是kTransferTaskStateReady,则-->kTransferTaskStateWaitingUpload
        if (CLIENT_OFFLINE_UPLOAD == file_role) 
        {
            state_ = kTransferTaskStateWaitingUpload;
        } else 
        {
            log("Offline upload: file_role is CLIENT_OFFLINE_UPLOAD but file_role = %d", file_role);
            break;
            // state_ = kTransferTaskStateWaitingSender;
        }
    } else 
    {
        if (file_role == CLIENT_OFFLINE_DOWNLOAD) 
        {
            state_ = kTransferTaskStateWaitingDownload;
        } else 
        {
            log("Offline upload: file_role is CLIENT_OFFLINE_DOWNLOAD but file_role = %d", file_role);
            break;
        }
    }
    
    SetLastUpdateTime();
    rv = true;
} while (0);

return rv;

和在线文件类似,也会通知发送端打开文件进度传输

和在线文件区别(在线文件会等待接收端操作)对于离线文件,会直接发送CID_FILE_PULL_DATA_REQ,让发送端发送文件

{
            if (transfer_task->state() == kTransferTaskStateWaitingUpload) {
                
                OfflineTransferTask* offline = reinterpret_cast<OfflineTransferTask*>(transfer_task);
                
                IM::File::IMFilePullDataReq pull_data_req;
                pull_data_req.set_task_id(task_id);
                pull_data_req.set_user_id(user_id);
                pull_data_req.set_trans_mode(FILE_TYPE_OFFLINE);
                pull_data_req.set_offset(0);
                pull_data_req.set_data_size(offline->GetNextSegmentBlockSize());
                
                ::SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_REQ, &pull_data_req);

                log("Pull Data Req");
            }
        }

发送端收到文件请求时,会通过CID_FILE_PULL_DATA_RSP发送数据(和在线文件代码相同,就不列出具体代码了)

file_server服务器收到CID_FILE_PULL_DATA_RSP,会开始收取文件

收取文件过程:1.检查用户是否合法,2.判断当前偏移量是否正确3,第一次收到文件数据的时候,会创建一个可以写的文件,会把写文件头,包含创建时间,接收端ID,发送端ID,文件名,文件大小,写入文件,状态机变为kTransferTaskStateUploading

4.把收到的数据写入文件,更新文件偏移量,如果文件写入完毕,会关闭文件句柄,返回值为1

        
        //如果是准备状态,转化准备态到正在传输态
        rv = transfer_task_->DoRecvData(user_id, offset, data, data_size);
        if (rv == -1) {
            break;
        }

int OfflineTransferTask::DoRecvData(uint32_t user_id, uint32_t offset, const char* data, uint32_t data_size) {
    // 离线文件上传
    
    int rv = -1;
    
    do {
        // 检查是否发送者
        if (!CheckFromUserID(user_id)) {
            log("rsp user_id=%d, but sender_id is %d", user_id, from_user_id_);
            break;
        }
        
        // 检查状态
        if (state_ != kTransferTaskStateWaitingUpload && state_ != kTransferTaskStateUploading) {
            log("state=%d error, need kTransferTaskStateWaitingUpload or kTransferTaskStateUploading", state_);
            break;
        }
        
        // 检查offset是否有效
        if (offset != transfered_idx_*SEGMENT_SIZE) {
            break;
        }
        
        //if (data_size != GetNextSegmentBlockSize()) {
        //    break;
        //}
        // todo
        // 检查文件大小
        
        data_size = GetNextSegmentBlockSize();
        log("Ready recv data, offset=%d, data_size=%d, segment_size=%d", offset, data_size, sengment_size_);
        
        if (state_ == kTransferTaskStateWaitingUpload) 
        {
            if (fp_ == NULL) 
            {
                fp_ = OpenByWrite(task_id_, to_user_id_);
                if (fp_ == NULL)
                {
                    break;
                }
            }

            // 写文件头
            OfflineFileHeader file_header;
            memset(&file_header, 0, sizeof(file_header));
            file_header.set_create_time(time(NULL));
            file_header.set_task_id(task_id_);
            file_header.set_from_user_id(from_user_id_);
            file_header.set_to_user_id(to_user_id_);
            file_header.set_file_name("");
            file_header.set_file_size(file_size_);
            fwrite(&file_header, 1, sizeof(file_header), fp_);
            fflush(fp_);

            state_ = kTransferTaskStateUploading;
        }
        
        // 存储
        if (fp_ == NULL) {
            //
            break;
        }
        
        fwrite(data, 1, data_size, fp_);
        fflush(fp_);

        ++transfered_idx_;
        SetLastUpdateTime();

        if (transfered_idx_ == sengment_size_) {
            state_ = kTransferTaskStateUploadEnd;
            fclose(fp_);
            fp_ = NULL;
            rv = 1;
        } else {
            rv = 0;
        }
    } while (0);
    
    return rv;
}

对于离线文件,接着向发送端发送请求发送文件的指令CID_FILE_PULL_DATA_REQ,往次循环,直至传输完成,上面代码会返回rv=1,完成后向发送端通知文件状态为CLIENT_FILE_DONE

    } else {
            // 离线
            // all packages recved
            if (rv == 1) {
                _StatesNotify(CLIENT_FILE_DONE, task_id, user_id, this);
                // Close();
            } else {
                OfflineTransferTask* offline = reinterpret_cast<OfflineTransferTask*>(transfer_task_);
                
                IM::File::IMFilePullDataReq pull_data_req;
                pull_data_req.set_task_id(task_id);
                pull_data_req.set_user_id(user_id);
                pull_data_req.set_trans_mode(static_cast<IM::BaseDefine::TransferFileType>(offline->GetTransMode()));
                pull_data_req.set_offset(offline->GetNextOffset());
                pull_data_req.set_data_size(offline->GetNextSegmentBlockSize());
                
                ::SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_REQ, &pull_data_req);
                // log("size not match");
            }
        }

接收端收到文件传输完成的状态后,会关闭文件句柄然后发出通知KEY_FILESEVER_PROGRESSBAR_FINISHED

	case IM::BaseDefine::ClientFileState::CLIENT_FILE_DONE:
		LOG__(APP, _T("fileState--CLIENT_FILE_DONE "));
		if (fileEntity.pFileObject)
		{
			delete fileEntity.pFileObject;
			fileEntity.pFileObject = nullptr;
		}
		TransferFileEntityManager::getInstance()->updateFileInfoBysTaskID(fileEntity);
		module::getFileTransferModule()->asynNotifyObserver(module::KEY_FILESEVER_PROGRESSBAR_FINISHED, fileEntity.sTaskID);
		break;

文件模块收到文件传输完成的指令后,会向msg_server 发送添加文件的CID_FILE_ADD_OFFLINE_REQ

,包含任务ID,发送端,接收端ID,文件名,文件大小

else if (IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_UPLOAD == FileInfo.nClientMode)
		{
			IM::File::IMFileAddOfflineReq imFileAddOfflineReq;
			imFileAddOfflineReq.set_from_user_id(util::stringToInt32(FileInfo.sFromID));
			imFileAddOfflineReq.set_to_user_id(util::stringToInt32(FileInfo.sToID));
			imFileAddOfflineReq.set_task_id(FileInfo.sTaskID);
			imFileAddOfflineReq.set_file_name(util::cStringToString(FileInfo.getRealFileName()));
			imFileAddOfflineReq.set_file_size(FileInfo.nFileSize);
			module::getTcpClientModule()->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
				, IM::BaseDefine::FileCmdID::CID_FILE_ADD_OFFLINE_REQ
				, &imFileAddOfflineReq);
		}

msg_server 收到这个消息后,会把消息转发给DB_server ,将离线文件数据插入数据库,如果接收端现在在线,通知接收端连接文件服务器

void CFileHandler::HandleClientFileAddOfflineReq(CMsgConn* pMsgConn, CImPdu* pPdu)
{
    IM::File::IMFileAddOfflineReq msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
uint32_t from_id = pMsgConn->GetUserId();
uint32_t to_id = msg.to_user_id();
string task_id = msg.task_id();
string file_name = msg.file_name();
uint32_t file_size = msg.file_size();
log("HandleClientFileAddOfflineReq, %u->%u, task_id: %s, file_name: %s, size: %u  ",
    from_id, to_id, task_id.c_str(), file_name.c_str(), file_size);

CDBServConn* pDbConn = get_db_serv_conn();
if (pDbConn) {
    msg.set_from_user_id(from_id);
    pPdu->SetPBMsg(&msg);
    pDbConn->SendPdu(pPdu);
}

CFileServConn* pFileConn = get_random_file_serv_conn();
if (pFileConn)
{
    const list<IM::BaseDefine::IpAddr>* file_addr_list = pFileConn->GetFileServerIPList();
    
    IM::File::IMFileNotify msg2;
    msg2.set_from_user_id(from_id);
    msg2.set_to_user_id(to_id);
    msg2.set_file_name(file_name);
    msg2.set_file_size(file_size);
    msg2.set_task_id(task_id);
    msg2.set_trans_mode(IM::BaseDefine::FILE_TYPE_OFFLINE);
    msg2.set_offline_ready(1);
    for (list<IM::BaseDefine::IpAddr>::const_iterator it = file_addr_list->begin(); it != file_addr_list->end(); it++)
    {
        IM::BaseDefine::IpAddr ip_addr_tmp = *it;
        IM::BaseDefine::IpAddr* ip_addr = msg2.add_ip_addr_list();
        ip_addr->set_ip(ip_addr_tmp.ip());
        ip_addr->set_port(ip_addr_tmp.port());
    }
    CImPdu pdu;
    pdu.SetPBMsg(&msg2);
    pdu.SetServiceId(SID_FILE);
    pdu.SetCommandId(CID_FILE_NOTIFY);
    
    CImUser* pUser = CImUserManager::GetInstance()->GetImUserById(to_id);
    if (pUser)
    {
        //to user is online, notify the offline file has been ready
        pUser->BroadcastPduWithOutMobile(&pdu);
    }
    CRouteServConn* pConn = get_route_serv_conn();
    if (pConn) {
        pConn->SendPdu(&pdu);
    }
}
}

db_server收到数据后,插入数据库

void addOfflineFile(CImPdu* pPdu, uint32_t conn_uuid)
    {
        IM::File::IMFileAddOfflineReq msg;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            uint32_t nUserId = msg.from_user_id();
            uint32_t nToId = msg.to_user_id();
            string strTaskId = msg.task_id();
            string strFileName = msg.file_name();
            uint32_t nFileSize = msg.file_size();
            CFileModel* pModel = CFileModel::getInstance();
            pModel->addOfflineFile(nUserId, nToId, strTaskId, strFileName, nFileSize);
            log("fromId=%u, toId=%u, taskId=%s, fileName=%s, fileSize=%u", nUserId, nToId, strTaskId.c_str(), strFileName.c_str(), nFileSize);
        }
void CFileModel::addOfflineFile(uint32_t fromId, uint32_t toId, string& taskId, string& fileName, uint32_t fileSize)
{
    CDBManager* pDBManager = CDBManager::getInstance();
    CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");
    if (pDBConn)
    {
        string strSql = "insert into IMTransmitFile (`fromId`,`toId`,`fileName`,`size`,`taskId`,`status`,`created`,`updated`) values(?,?,?,?,?,?,?,?)";
        
        // 必须在释放连接前delete CPrepareStatement对象,否则有可能多个线程操作mysql对象,会crash
        CPrepareStatement* pStmt = new CPrepareStatement();
        if (pStmt->Init(pDBConn->GetMysql(), strSql))
        {
            uint32_t status = 0;
            uint32_t nCreated = (uint32_t)time(NULL);
            
            uint32_t index = 0;
            pStmt->SetParam(index++, fromId);
            pStmt->SetParam(index++, toId);
            pStmt->SetParam(index++, fileName);
            pStmt->SetParam(index++, fileSize);
            pStmt->SetParam(index++, taskId);
            pStmt->SetParam(index++, status);
            pStmt->SetParam(index++, nCreated);
            pStmt->SetParam(index++, nCreated);
            
            bool bRet = pStmt->ExecuteUpdate();
            
            if (!bRet)
            {
                log("insert message failed: %s", strSql.c_str());
            }
        }
        delete pStmt;
        pDBManager->RelDBConn(pDBConn);
    }
    else
    {
        log("no db connection for teamtalk_master");
    }
}
二、接收端

用户登录的时候,获取最近联系人信息的时候,会向msg_server发送CID_FILE_HAS_OFFLINE_REQ查询离线文件相关的信息,msg_server 直接会把信息发给DB_server,从数据库查询相关信息,查询到数据后,会将数据包装,然后发给msg_server

    void hasOfflineFile(CImPdu* pPdu, uint32_t conn_uuid)
    {
        IM::File::IMFileHasOfflineReq msg;
        IM::File::IMFileHasOfflineRsp msgResp;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            CImPdu* pPduRes = new CImPdu;
            
            uint32_t nUserId = msg.user_id();
            CFileModel* pModel = CFileModel::getInstance();
            list<IM::BaseDefine::OfflineFileInfo> lsOffline;
            pModel->getOfflineFile(nUserId, lsOffline);
            msgResp.set_user_id(nUserId);
            for (list<IM::BaseDefine::OfflineFileInfo>::iterator it=lsOffline.begin();
                 it != lsOffline.end(); ++it) {
                IM::BaseDefine::OfflineFileInfo* pInfo = msgResp.add_offline_file_list();
    //            *pInfo = *it;
                pInfo->set_from_user_id(it->from_user_id());
                pInfo->set_task_id(it->task_id());
                pInfo->set_file_name(it->file_name());
                pInfo->set_file_size(it->file_size());
            }
            
            log("userId=%u, count=%u", nUserId, msgResp.offline_file_list_size());
            
            msgResp.set_attach_data(msg.attach_data());
            pPduRes->SetPBMsg(&msgResp);
            pPduRes->SetSeqNum(pPdu->GetSeqNum());
            pPduRes->SetServiceId(IM::BaseDefine::SID_FILE);
            pPduRes->SetCommandId(IM::BaseDefine::CID_FILE_HAS_OFFLINE_RES);
            CProxyConn::AddResponsePdu(conn_uuid, pPduRes);
        }
        else
        {
            log("parse pb failed");
        }
    }
 void hasOfflineFile(CImPdu* pPdu, uint32_t conn_uuid)
    {
        IM::File::IMFileHasOfflineReq msg;
        IM::File::IMFileHasOfflineRsp msgResp;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            CImPdu* pPduRes = new CImPdu;
            
            uint32_t nUserId = msg.user_id();
            CFileModel* pModel = CFileModel::getInstance();
            list<IM::BaseDefine::OfflineFileInfo> lsOffline;
            pModel->getOfflineFile(nUserId, lsOffline);
            msgResp.set_user_id(nUserId);
            for (list<IM::BaseDefine::OfflineFileInfo>::iterator it=lsOffline.begin();
                 it != lsOffline.end(); ++it) {
                IM::BaseDefine::OfflineFileInfo* pInfo = msgResp.add_offline_file_list();
    //            *pInfo = *it;
                pInfo->set_from_user_id(it->from_user_id());
                pInfo->set_task_id(it->task_id());
                pInfo->set_file_name(it->file_name());
                pInfo->set_file_size(it->file_size());
            }
            
            log("userId=%u, count=%u", nUserId, msgResp.offline_file_list_size());
            
            msgResp.set_attach_data(msg.attach_data());
            pPduRes->SetPBMsg(&msgResp);
            pPduRes->SetSeqNum(pPdu->GetSeqNum());
            pPduRes->SetServiceId(IM::BaseDefine::SID_FILE);
            pPduRes->SetCommandId(IM::BaseDefine::CID_FILE_HAS_OFFLINE_RES);
            CProxyConn::AddResponsePdu(conn_uuid, pPduRes);
        }
        else
        {
            log("parse pb failed");
        }
    }
void CFileModel::getOfflineFile(uint32_t userId, list<IM::BaseDefine::OfflineFileInfo>& lsOffline)
{
    CDBManager* pDBManager = CDBManager::getInstance();
    CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");
    if (pDBConn)
    {
        string strSql = "select * from IMTransmitFile where toId="+int2string(userId) + " and status=0 order by created";
        CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());
        if(pResultSet)
        {
            while (pResultSet->Next())
            {
                IM::BaseDefine::OfflineFileInfo offlineFile;
                offlineFile.set_from_user_id(pResultSet->GetInt("fromId"));
                offlineFile.set_task_id(pResultSet->GetString("taskId"));
                offlineFile.set_file_name(pResultSet->GetString("fileName"));
                offlineFile.set_file_size(pResultSet->GetInt("size"));
                lsOffline.push_back(offlineFile);
            }
            delete pResultSet;
        }
        else
        {
            log("no result for:%s", strSql.c_str());
        }
        pDBManager->RelDBConn(pDBConn);
    }
    else
    {
        log("no db connection for teamtalk_slave");
    }
}

msg_server收到信息,会随机选择一个file_server ,将ip地址包装到数据报,然后转发给客户端,

void CFileHandler::HandleFileHasOfflineRes(CImPdu* pPdu)
{
    IM::File::IMFileHasOfflineRsp msg;
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    uint32_t req_user_id = msg.user_id();
    uint32_t file_cnt = msg.offline_file_list_size();
    CDbAttachData attach((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
    log("HandleFileHasOfflineRes, req_id=%u, file_cnt=%u ", req_user_id, file_cnt);
    
    CMsgConn* pConn = CImUserManager::GetInstance()->GetMsgConnByHandle(req_user_id,
                                                                        attach.GetHandle());
    CFileServConn* pFileConn = get_random_file_serv_conn();
    const list<IM::BaseDefine::IpAddr>* ip_list = NULL;
    if (pFileConn)
    {
        ip_list = pFileConn->GetFileServerIPList();
        for (list<IM::BaseDefine::IpAddr>::const_iterator it = ip_list->begin(); it != ip_list->end(); it++)
        {
            IM::BaseDefine::IpAddr ip_addr_tmp = *it;
            IM::BaseDefine::IpAddr* ip_addr = msg.add_ip_addr_list();
            ip_addr->set_ip(ip_addr_tmp.ip());
            ip_addr->set_port(ip_addr_tmp.port());
        }
    }
    else
    {
        log("HandleFileHasOfflineRes, no file server. ");
    }
    if (pConn) {
        pPdu->SetPBMsg(&msg);
        pConn->SendPdu(pPdu);
    }
}

客户端收到消息后,新建任务,将clientrole变为CLIENT_OFFLINE_DOWNLOAD,然后去连接文件服务器

void FileTransferModule_Impl::_hasOfflineRes(IN std::string& body)
{
	IM::File::IMFileHasOfflineRsp imFileHasOfflineRsp;
	if (!imFileHasOfflineRsp.ParseFromString(body))
	{
		LOG__(ERR, _T("parse failed,body:%s"), util::stringToCString(body));
		return;
	}
	UINT32 nIPCount = imFileHasOfflineRsp.ip_addr_list_size();
	if (nIPCount <= 0)
	{
        LOG__(ERR, _T("imFileHasOfflineRsp,ip_addr_list_size == 0"));
		return;
	}
	UINT32 nFileNum = imFileHasOfflineRsp.offline_file_list_size();
	for (UINT32 i = nFileNum; i > 0; --i)
	{
		const IM::BaseDefine::OfflineFileInfo& offLineInfo = imFileHasOfflineRsp.offline_file_list(i - 1);
		TransferFileEntity fileInfo;
		fileInfo.sFromID = util::uint32ToString(offLineInfo.from_user_id());
		fileInfo.sToID = module::getSysConfigModule()->userID();
		fileInfo.nFileSize = offLineInfo.file_size();
		fileInfo.sTaskID = offLineInfo.task_id();
		fileInfo.sFileName = offLineInfo.file_name();
		fileInfo.nClientMode = IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD;
		const IM::BaseDefine::IpAddr& ipAdd = imFileHasOfflineRsp.ip_addr_list(0);
		fileInfo.sIP = ipAdd.ip();
		fileInfo.nPort = ipAdd.port();
		fileInfo.time = static_cast<UInt32>(time(0));
		if (TransferFileEntityManager::getInstance()->pushTransferFileEntity(fileInfo))
		{
			LOG__(APP, _T("离线文件 sFileID = %s"), util::stringToCString(fileInfo.sTaskID));
			TransferFileEntityManager::getInstance()->openFileSocketByTaskId(fileInfo.sTaskID);
		}
	}
}

和在线文件类似,连接文件服务器成功后,会登陆文件服务器

登录文件服务器成功后,如果是CLIENT_OFFLINE_DOWNLOAD 类型,会新建一个任务,然后从磁盘加载文件数据,然后文件转就绪变为kTransferTaskStateWaitingDownload,然后给接收端发送打开文件弹窗消息

   bool rv = false;
    do {
        // 查找任务是否存在
        transfer_task = TransferTaskManager::GetInstance()->FindByTaskID(task_id);
        
        if (transfer_task == NULL) {
            if (mode == CLIENT_OFFLINE_DOWNLOAD) {
                // 文件不存在,检查是否是离线下载,有可能是文件服务器重启
                // 尝试从磁盘加载
                transfer_task = TransferTaskManager::GetInstance()->NewTransferTask(task_id, user_id);
                // 需要再次判断是否加载成功
                if (transfer_task == NULL) {
                    log("Find task id failed, user_id=%u, taks_id=%s, mode=%d", user_id, task_id.c_str(), mode);
                    break;
                }
            } else {
                log("Can't find task_id, user_id=%u, taks_id=%s, mode=%d", user_id, task_id.c_str(), mode);
                break;
            }
        }
        
        // 状态转换
        rv = transfer_task->ChangePullState(user_id, mode);

客户端点击了接收文件之后,然后向服务器发送接收文件的请求(CID_FILE_PULL_DATA_REQ)

			[=]()
		{
			TransferFileEntity fileEntity;
			if (TransferFileEntityManager::getInstance()->getFileInfoByTaskId(taskId, fileEntity))
			{
				int mode = fileEntity.nClientMode == IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD ? IM::BaseDefine::TransferFileType::FILE_TYPE_OFFLINE : IM::BaseDefine::TransferFileType::FILE_TYPE_ONLINE;
                LOG__(APP, _T("IMFilePullDataReq,taskId:%s"),util::stringToCString(taskId));
                IM::File::IMFilePullDataReq imFilePullDataReq;
				imFilePullDataReq.set_task_id(taskId);
				imFilePullDataReq.set_user_id(util::stringToInt32(fileEntity.sToID));
				imFilePullDataReq.set_trans_mode(static_cast<IM::BaseDefine::TransferFileType>(mode));
				imFilePullDataReq.set_offset(0);
                fileEntity.nFileSize > FILE_TRANSFER_BLOCK_SIZE ? imFilePullDataReq.set_data_size(FILE_TRANSFER_BLOCK_SIZE) : imFilePullDataReq.set_data_size(fileEntity.nFileSize);
				//发包
                pFileSocket->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
                    , IM::BaseDefine::FileCmdID::CID_FILE_PULL_DATA_REQ, &imFilePullDataReq);

服务器收到接收文件的请求,会开始读取文件信息,如果第一次收到,先会读取文件头信息,判断文件参数是否正确,然后接下来开始发送文件,如果发送完成,rv=1;

int OfflineTransferTask::DoPullFileRequest(uint32_t user_id, uint32_t offset, uint32_t data_size, std::string* data) {
    int rv = -1;
    
    log("Recv pull file request: user_id=%d, offset=%d, data_size=%d", user_id, offset, data_size);

    do {
        // 1. 首先检查状态,必须为kTransferTaskStateWaitingDownload或kTransferTaskStateDownloading
        if (state_ != kTransferTaskStateWaitingDownload && state_ != kTransferTaskStateDownloading) {
            log("state=%d error, need kTransferTaskStateWaitingDownload or kTransferTaskStateDownloading", state_);
            break;
        }
        
        // 2. 处理kTransferTaskStateWaitingDownload
        if(state_ == kTransferTaskStateWaitingDownload) {
            if (transfered_idx_ != 0)
                transfered_idx_ = 0;
            
            if (fp_!=NULL) {
                fclose(fp_);
                fp_ = NULL;
            }

            fp_ = OpenByRead(task_id_, user_id);
            if (fp_ == NULL) {
                break;
            }
            
            //if (file_header_ == NULL) {
            //    file_header_ = new FileHeader();
            //}
 
            OfflineFileHeader file_header;
            size_t size = fread(&file_header, 1, sizeof(file_header), fp_); // read header
            if (sizeof(file_header) != size) {
                // close to ensure next time will read again
                log("read file head failed.");
                fclose(fp_); // error to get header
                fp_ = NULL;
                break;
                
            }
 
            state_ = kTransferTaskStateDownloading;
        } else {
            // 检查文件是否打开
            if (fp_ == NULL) {
                // 不可能发生
                break;
            }
        }
        
        // 检查offset是否有效
        if (offset != transfered_idx_*SEGMENT_SIZE) {
            log("Recv offset error, offser=%d, transfered_offset=%d", offset, transfered_idx_*SEGMENT_SIZE);
            break;
        }
        
        data_size = GetNextSegmentBlockSize();
        
        log("Ready send data, offset=%d, data_size=%d", offset, data_size);
        //if (data_size != GetNextSegmentBlockSize()) {
        //    log("Recv data_size error, data_size=%d, transfered_data_size=%d", data_size, GetNextSegmentBlockSize());
        //    break;
        //}

        
        // the header won't be sent to recver, because the msg svr had already notified it.
        // if the recver needs to check it, it could be helpful
        // or sometime later, the recver needs it in some way.
        
            
        // read data and send based on offset and datasize.
        char* tmpbuf = new char[data_size];
        if (NULL == tmpbuf) {
            // alloc mem failed
            log("alloc mem failed.");
            // SendPdu(&pdu);
            //t->unlock(__LINE__);
            // return;
            break;
        }
        memset(tmpbuf, 0, data_size);
        
        size_t size = fread(tmpbuf, 1, data_size, fp_);
        if (size != data_size) {
            log("Read size error, data_size=%d, but read_size=%d", data_size, size);
            delete [] tmpbuf;
            break;
            //
        }
        
        data->append(tmpbuf, data_size);
        delete [] tmpbuf;

        transfered_idx_++;
        
        SetLastUpdateTime();
        if (transfered_idx_ == sengment_size_) {
            log("pull req end.");
            state_ = kTransferTaskStateUploadEnd;
            fclose(fp_);
            fp_ = NULL;
            rv = 1;
        } else {
            rv = 0;
        }

/*      
        msg2.set_file_data(tmpbuf, size);
        msg2.set_result_code(0);
        CImPdu pdu2;
        pdu2.SetPBMsg(&msg2);
        pdu2.SetServiceId(SID_FILE);
        pdu2.SetCommandId(CID_FILE_PULL_DATA_RSP);
        pdu2.SetSeqNum(pPdu->GetSeqNum());
        pdu2.SetSeqNum(pPdu->GetSeqNum());
        SendPdu(&pdu2);
        delete[] tmpbuf;
  */
//        t->transfered_size += size; // record transfered size for next time offset

/*
        // offset file_header_t
        int iret = fseek(fp_, sizeof(FileHeader) + offset, SEEK_SET); // read after file_header_t
        if (0 != iret) {
            log("seek offset failed.");
            // SendPdu(&pdu);
            delete[] tmpbuf;
            
            //t->unlock(__LINE__);
            //return;
            // offset failed
            break;
        }
 */
        
        
/*
            t->transfered_size += size; // record transfered size for next time offset
            if (0 == size) {
                fclose(t->fp);
                t->fp = NULL;
                
                _StatesNotify(CLIENT_FILE_DONE, task_id.c_str(), user_id, this);
                Close();
                
                t->self_destroy = true;
                t->unlock(__LINE__);
                return;
            }

 */
//        rv = 0;
 //       }
        
    } while (0);

    return rv;
}
       } else {
            SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_RSP, pdu->GetSeqNum(), &pull_data_rsp);
            if (rv == 1) {
                _StatesNotify(CLIENT_FILE_DONE, task_id, transfer_task_->from_user_id(), this);
            }
        }

上面如果文件发送完成,会给接收端发送CLIENT_FILE_DONE

客户端收到完成信息,会给msg_server发送删除记录的消息CID_FILE_DEL_OFFLINE_REQ

	if (IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD == FileInfo.nClientMode)
		{
            LOG__(APP, _T("IMFileDelOfflineReq,name=%s,taskid=%s")
                ,util::stringToCString(FileInfo.sFileName)
                ,util::stringToCString(FileInfo.sTaskID));
			IM::File::IMFileDelOfflineReq imFileDelOfflineReq;
			imFileDelOfflineReq.set_from_user_id(util::stringToInt32(FileInfo.sFromID));
			imFileDelOfflineReq.set_to_user_id(util::stringToInt32(FileInfo.sToID));
			imFileDelOfflineReq.set_task_id(FileInfo.sTaskID);
			module::getTcpClientModule()->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
				, IM::BaseDefine::FileCmdID::CID_FILE_DEL_OFFLINE_REQ
				, &imFileDelOfflineReq);
		}

服务器收到后,会删除数据库数据

void delOfflineFile(CImPdu* pPdu, uint32_t conn_uuid)
{
    IM::File::IMFileDelOfflineReq msg;
    if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
    {
        uint32_t nUserId = msg.from_user_id();
        uint32_t nToId = msg.to_user_id();
        string strTaskId = msg.task_id();
        CFileModel* pModel = CFileModel::getInstance();
        pModel->delOfflineFile(nUserId, nToId, strTaskId);
        log("fromId=%u, toId=%u, taskId=%s", nUserId, nToId, strTaskId.c_str());
    }
}

oy = true;
t->unlock(LINE);
return;
}

*/
// rv = 0;
// }

} while (0);

return rv;

}




   } else {
        SendMessageLite(this, SID_FILE, CID_FILE_PULL_DATA_RSP, pdu->GetSeqNum(), &pull_data_rsp);
        if (rv == 1) {
            _StatesNotify(CLIENT_FILE_DONE, task_id, transfer_task_->from_user_id(), this);
        }
    }

上面如果文件发送完成,会给接收端发送CLIENT_FILE_DONE



客户端收到完成信息,会给msg_server发送删除记录的消息CID_FILE_DEL_OFFLINE_REQ

if (IM::BaseDefine::ClientFileRole::CLIENT_OFFLINE_DOWNLOAD == FileInfo.nClientMode)
	{
        LOG__(APP, _T("IMFileDelOfflineReq,name=%s,taskid=%s")
            ,util::stringToCString(FileInfo.sFileName)
            ,util::stringToCString(FileInfo.sTaskID));
		IM::File::IMFileDelOfflineReq imFileDelOfflineReq;
		imFileDelOfflineReq.set_from_user_id(util::stringToInt32(FileInfo.sFromID));
		imFileDelOfflineReq.set_to_user_id(util::stringToInt32(FileInfo.sToID));
		imFileDelOfflineReq.set_task_id(FileInfo.sTaskID);
		module::getTcpClientModule()->sendPacket(IM::BaseDefine::ServiceID::SID_FILE
			, IM::BaseDefine::FileCmdID::CID_FILE_DEL_OFFLINE_REQ
			, &imFileDelOfflineReq);
	}

服务器收到后,会删除数据库数据

    void delOfflineFile(CImPdu* pPdu, uint32_t conn_uuid)
    {
        IM::File::IMFileDelOfflineReq msg;
        if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()))
        {
            uint32_t nUserId = msg.from_user_id();
            uint32_t nToId = msg.to_user_id();
            string strTaskId = msg.task_id();
            CFileModel* pModel = CFileModel::getInstance();
            pModel->delOfflineFile(nUserId, nToId, strTaskId);
            log("fromId=%u, toId=%u, taskId=%s", nUserId, nToId, strTaskId.c_str());
        }
    }

文件服务器基本上分析完了,还有取消发送和拒绝接收没有分析,这部分比较简单,就是简单通知服务器状态的改变


版权声明:本文为qq_28539161原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>