一種點(diǎn)對(duì)點(diǎn)文件斷點(diǎn)續(xù)傳的多線程實(shí)現(xiàn)方法

  1 概述

  文件斷點(diǎn)續(xù)傳就是在主機(jī)與主機(jī)間傳輸文件時(shí),可以將文件分多次傳輸,與非斷點(diǎn)續(xù)傳不同的是,每次傳輸不必傳輸整個(gè)文件。斷點(diǎn)續(xù)傳是大型文件數(shù)據(jù)傳輸?shù)暮诵。本文將以多線程技術(shù)和Socket技術(shù)為依托,介紹大型文件斷點(diǎn)續(xù)傳的實(shí)現(xiàn)方法。

  2 基本實(shí)現(xiàn)思想

  多線程斷點(diǎn)續(xù)傳實(shí)現(xiàn)的基本思想就是在發(fā)送端(也稱客戶端)將要傳輸?shù)奈募指顬榇笮∠喈?dāng)?shù)亩鄩K,用多個(gè)線程,將這些塊同時(shí)向目標(biāo)服務(wù)器端發(fā)送;在服務(wù)器端的服務(wù)程序監(jiān)聽數(shù)據(jù)傳輸請(qǐng)求,每當(dāng)接到新的請(qǐng)求,則創(chuàng)建一個(gè)新的線程,與客戶端的發(fā)送線程對(duì)應(yīng),接收數(shù)據(jù),記錄數(shù)據(jù)傳輸進(jìn)程。

  圖1是點(diǎn)對(duì)點(diǎn)文件斷點(diǎn)續(xù)傳第N塊傳輸過程示意圖。在傳輸發(fā)起端(客戶端),將大型文件事先分割為大小相當(dāng)?shù)腘塊,同時(shí)創(chuàng)建N個(gè)傳輸線程,連接目標(biāo)端服務(wù)器。當(dāng)服務(wù)器端接收到每一個(gè)連接請(qǐng)求后,告知客戶端可以傳輸文件。當(dāng)客戶端接收到可以傳輸文件的消息時(shí),首先向服務(wù)器發(fā)送數(shù)據(jù)傳輸信息塊(包括第幾塊、在塊中的起始位置)請(qǐng)求,當(dāng)服務(wù)器端接收到該請(qǐng)求后,向客戶端發(fā)送數(shù)據(jù)傳輸信息,客戶端然后傳輸數(shù)據(jù)傳輸信息塊指定的數(shù)據(jù)給服務(wù)器端,服務(wù)器端更新數(shù)據(jù)傳輸信息塊。

  圖1 點(diǎn)對(duì)點(diǎn)文件斷點(diǎn)續(xù)傳過程示意圖

  3 具體實(shí)現(xiàn)

  在實(shí)現(xiàn)過程中我使用了MFC的多線程和Windows的Socket,分客戶端和服務(wù)器端實(shí)現(xiàn)。因?yàn)閿?shù)據(jù)傳輸往往是對(duì)等的,所以需要將客戶端和服務(wù)器端集成在一起,在客戶端和服務(wù)器端分別實(shí)現(xiàn)后,這是件非常簡(jiǎn)單的工作,只需將它們集成在一起即可。下面將分別介紹客戶端和服務(wù)器端的實(shí)現(xiàn)。

  3.1 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)

  文件信息數(shù)據(jù)結(jié)構(gòu) 用于在服務(wù)器端和客戶端之間傳遞文件第N塊的屬性信息,詳細(xì)定義如下:

  structfileinfo

  {

  int fileno; //文件號(hào)

  int type; //消息類別

  long len; //文件(塊)長(zhǎng)度,在客戶端向服務(wù)器端發(fā)送數(shù)據(jù)時(shí),是文件長(zhǎng)度;

  //在服務(wù)器端向客戶端發(fā)送已傳部分的信息時(shí),是應(yīng)該續(xù)傳部分的長(zhǎng)度;

  long seek; //開始位置,標(biāo)識(shí)要傳輸數(shù)據(jù)在原文件中的起始位置

  char name[MAX_PATH_LEN];//文件名

  };

  發(fā)送進(jìn)度記錄結(jié)構(gòu) 用戶記錄文件傳輸進(jìn)程,詳細(xì)定義如下:

  structSentInfo

  {

  long totle; //已經(jīng)成功發(fā)送數(shù)據(jù)的長(zhǎng)度;

  int block; //塊標(biāo)識(shí);

  long filelen; //文件總長(zhǎng)度;

  int threadno; //負(fù)責(zé)傳輸?shù)贜塊數(shù)據(jù)的線程標(biāo)識(shí);

  CString name; //文件名稱

  };

  客戶端類 客戶端文件發(fā)送實(shí)例封裝,用戶記錄客戶端文件發(fā)送過程中的屬性信息、已發(fā)送大小、發(fā)送線程句柄、發(fā)送線程狀態(tài)、發(fā)送統(tǒng)計(jì)信息等,具體定義是:

  classCClient:publicCObject

  {

  protected:

  

  //Attributes

  public:

  CClient(CString ip);

  ~CClient();

  SentInfo doinfo;

  long m_index; //塊索引

  BOOL sendOk[BLOCK]; //塊發(fā)送結(jié)束狀態(tài)

  CString SendFileName;

  CString DestIp; //目標(biāo)IP地址

  THREADSTATUS SendStatus;

  int GetBlockIndex(); //獲取當(dāng)前要傳輸文件塊的序號(hào),例如0,1,2…

  CCriticalSection m_gCS;

  

  //Sending File Block Thread Handles

  HANDLE m_thread[BLOCK]; //Block Sending Thread Handles Array

  HANDLE m_ExitThread; //WaitToExitThread Handle

  HANDLE m_ProcessTrackThread;

  HANDLE m_hEventKill; //User canceled Event

  long m_IdxInListCtrl; //Index in ListView

  long m_lBeginTimeTick;

  long m_lBytesSent;

  // Operations

  public:

  };

  3.2 客戶端

  發(fā)送線程 用于發(fā)送由lpparam參數(shù)指定的Client對(duì)象標(biāo)識(shí)的文件塊數(shù)據(jù)。具體實(shí)現(xiàn)是:

  //發(fā)送線程主函數(shù)

  //并行進(jìn)行的線程,針對(duì)每個(gè)要發(fā)送的文件有BLOCK個(gè)該線程

  DWORDWINAPISendThread(LPVOIDlpparam)

  {

  CClient* pClient=(CClient*)lpparam;

  int block_index = pClient->GetBlockIndex();

  //發(fā)送文件的第m_index塊

  //發(fā)送線程

  sockaddr_in local;

  SOCKET m_socket;

  int rc=0;

  

  local.sin_family=AF_INET;

  local.sin_port=htons(pMainFrm->m_FilePort);

  local.sin_addr.S_un.S_addr=inet_addr(pClient->DestIp);

  m_socket=socket(AF_INET,SOCK_STREAM,0);

  int ret;

  char m_buf[MAX_BUFSIZE];

  fileinfo file_info;

  //Connect to Server

  ret = connect(m_socket,(LPSOCKADDR)&local,sizeof(local));

  while((ret == SOCKET_ERROR)&&(WaitForSingleObject(pClient->m_hEventKill, 0)

  ==WAIT_TIMEOUT))

  {

  closesocket (m_socket);

  }

  file_info.len = pClient->doinfo.filelen ;

  if (file_info.len doinfo.name);

  //發(fā)送第index塊已發(fā)送大小請(qǐng)求

  sendn(m_socket,(char*)&file_info,PROTOCOL_HEADSIZE);

  long nRead = readn(m_socket,m_buf,PROTOCOL_HEADSIZE);

  if(nRead len == 0)

  {

  closesocket (m_socket);

  //block_index塊是續(xù)傳,且發(fā)送完畢

  pClient->good[block_index] = TRUE;

  return 1;

  }

  CFile SourceFile;

  SourceFile.Open(pClient->SendFileName,

  CFile::modeRead|CFile::typeBinary| CFile::shareDenyNone);

  //Seek right position of sending file

  long apos=SourceFile.Seek(pfile_info->seek,CFile::begin);

  int len1;

  len1=pfile_info->len;

  //取得已經(jīng)發(fā)送的長(zhǎng)度

  if (block_index doinfo.totle += pClient->doinfo.filelen / BLOCK - pfile_info->len;

  else

  pClient->doinfo.totle += pClient->doinfo.filelen –

  pClient->doinfo.filelen/BLOCK* (BLOCK - 1) - pfile_info->len;

  int nBegin = GetTickCount();

  int SendTotal = 0;

  while((len1 > 0) && (WaitForSingleObject(pClient->m_hEventKill, 0) == WAIT_TIMEOUT))

  {

  nRead = SourceFile.Read(m_buf, (len1 m_CurrentBlockSize) ?

  len1:pMainFrm->m_CurrentBlockSize);

  int send_count=sendn(m_socket,m_buf,nRead);

  float nLimit;

  if(send_countdoinfo.totle += send_count;

  SendTotal += send_count;

  }

  SourceFile.Close();

  shutdown(m_socket,2);

  closesocket (m_socket);

  if(len1 good[block_index] = TRUE;

  return 1;

  }

  創(chuàng)建發(fā)送線程 對(duì)應(yīng)要發(fā)送的文件,創(chuàng)建BLOCK個(gè)線程,負(fù)責(zé)發(fā)送各自的數(shù)據(jù)塊。

  for(inti=0;i

  {

  //創(chuàng)建線程,狀態(tài)為停止?fàn)顟B(tài)

  pClient->m_thread[i] = ::CreateThread(NULL,0,SendThread,(LPVOID)pClient,

  CREATE_SUSPENDED,&dwthread);

  

  ::SetThreadPriority(pClient->m_thread[i],THREAD_PRIORITY_HIGHEST);

  }

  3.3 服務(wù)器端

  監(jiān)聽函數(shù) 負(fù)責(zé)監(jiān)聽文件傳輸請(qǐng)求,在接受請(qǐng)求后,創(chuàng)建數(shù)據(jù)塊接收線程,具體實(shí)現(xiàn)如下:

  //監(jiān)聽函數(shù)

  DWORDWINAPIlistenthread(LPVOIDlpparam)

  {

  SOCKET pSocket=(SOCKET)lpparam;

  

  //最多監(jiān)聽MAX_LISTENCOUNT個(gè)客戶端連接

  int rc=listen(pSocket,MAX_LISTENCOUNT);

  if (rcm_DestPath + "\\IP" + PeerName + "-" + DestFileName,

  CFile::modeWrite|CFile::typeBinary| CFile::shareDenyNone))

  {

  DWORD error = GetLastError();

  return;

  };

  

  long apos = DestFile.Seek(fileseek,CFile::begin);

  char m_buf[MAX_BUFSIZE];

  CString m_temp;

  m_temp.Format("%s\\Temp\\IP%s-%s.%d",pWnd->m_appPath,PeerName,

  DestFileName,block_index);

  CFile BlockFile;

  BlockFile.Open(m_temp, CFile::modeWrite | CFile::typeBinary);

  while((filelen>0) && (WaitForSingleObject(pWnd->m_hEventKill, 0) == WAIT_TIMEOUT))

  {

  int nRead = readn(so, m_buf, (filelen > MAX_BUFSIZE) ? MAX_BUFSIZE : filelen);

  

  //Exited

  if (nRead == 0)

  {

  CString msg;

  msg.Format("[SOCKET %d] Receiving file %s break(Recv 0).",so,DestFileName);

  pWnd->SendMessage(WM_NEWMSG,(LPARAM)msg.GetBuffer(0),3);

  msg.ReleaseBuffer();

  break;

  }

  if(nReadSendMessage(WM_NEWMSG,(LPARAM)msg.GetBuffer(0),3);

  msg.ReleaseBuffer();

  break;

  }

  DestFile.Write(m_buf,nRead);

  filelen -= nRead;

  fileseek += nRead;

  BlockFile.Seek(0,CFile::begin);

  BlockFile.Write(&fileseek,sizeof(long));

  BlockFile.Write(&filelen,sizeof(long));

  pfileinfo->m_seek = fileseek;

  pfileinfo->m_len = filelen;

  }

  closesocket (so);

  //Current Block Received OK

  BlockFile.Close();

  DestFile.Close();

  if (filelen canDel = true;

  }

  初始化文件數(shù)據(jù)塊信息 負(fù)責(zé)初始化文件傳輸時(shí)數(shù)據(jù)塊的大小、開始位置等信息。具體實(shí)現(xiàn)是:

  //初始化文件數(shù)據(jù)塊信息

  CFileInfoRecving*GetRecvedFileBlock(CStringPeerName,CStringFileName,long& filelen,

  long&fileseek,intblock_index)

  {

  CString m_temp;

  CMainFrame* pWnd = (CMainFrame*) AfxGetMainWnd();

  m_temp.Format("%s\\temp\\IP%s-%s.%d",pWnd->m_appPath,PeerName,FileName,block_index);

  FILE* fp=NULL;

  CFile Blockfile;

  CFile RecvFile;

  long seek = fileseek;

  long flen = filelen;

  CString DestFile = pWnd->m_DestPath + "\\IP" + PeerName + "-" + FileName;

  

  bool DestFileNoExist = false;

  if((fp=fopen(DestFile,"r"))==NULL)

  {

  RecvFile.Open(DestFile,

  CFile::modeCreate|CFile::modeWrite|CFile::typeBinary| CFile::shareDenyNone);

  RecvFile.Close();

  DestFileNoExist = true;

  }

  if (fp) fclose(fp);

  CFileInfoRecving* pfileinfo = NULL;

  //如果目標(biāo)文件不存在或者長(zhǎng)度記錄文件不存在,重新傳

  if( ((fp=fopen(m_temp,"r"))==NULL) || (DestFileNoExist))

  {

  //第一次接收

  fileseek = block_index*(flen/BLOCK);

  filelen = flen/BLOCK;

  if (block_index == (BLOCK - 1))

  filelen = flen - fileseek;

  

  //Add To Recving File List

  pfileinfo = new CFileInfoRecving();

  pfileinfo->m_sFileName = FileName;

  pfileinfo->m_sPeerName = PeerName;

  pfileinfo->m_nBlockIndex = block_index;

  

  pfileinfo->m_sFileSize.Format("%d",flen);

  

  seek = fileseek;

  flen = filelen;

  pfileinfo->m_seek = seek;

  pfileinfo->m_len = flen;

  pWnd->m_RecvingFileList.AddTail(pfileinfo);

  Blockfile.Open(m_temp,

  CFile::modeCreate|CFile::modeWrite| CFile::typeBinary | CFile::shareDenyNone);

  Blockfile.Write(&seek,sizeof(long));

  Blockfile.Write(&flen,sizeof(long));

  Blockfile.Close();

  }

  else

  {

  POSITION pos = pWnd->m_RecvingFileList.GetHeadPosition();

  while (pos)

  {

  pfileinfo = pWnd->m_RecvingFileList.GetAt(pos);

  if (pfileinfo)

  {

  if ((pfileinfo->m_sFileName == FileName) &&

  (pfileinfo->m_sPeerName == PeerName) &&

  (pfileinfo->m_nBlockIndex == block_index))

  break;

  }

  pWnd->m_RecvingFileList.GetNext(pos);

  }

  

  Blockfile.Open(m_temp,CFile::modeRead | CFile::typeBinary | CFile::shareDenyNone);

  Blockfile.Read(&fileseek,sizeof(long));

  Blockfile.Read(&filelen,sizeof(long));

  if ((pfileinfo == NULL) || (!pos))

  {

  pfileinfo = new CFileInfoRecving();

  pfileinfo->m_sFileName = FileName;

  pfileinfo->m_sPeerName = PeerName;

  pfileinfo->m_nBlockIndex = block_index;

  pfileinfo->m_sFileSize.Format("%d",flen);

  //Add To Recving File List

  //pWnd->m_RecvingFileListCS.Lock();

  pWnd->m_RecvingFileList.AddTail(pfileinfo);

  //pWnd->m_RecvingFileListCS.Unlock();

  }

  Blockfile.Close();

  pfileinfo->m_seek = fileseek;

  pfileinfo->m_len = filelen;

  }

  if (fp) fclose(fp);

  return pfileinfo;

  }

  數(shù)據(jù)接收線程 負(fù)責(zé)從客戶端接收數(shù)據(jù)信息:

  //數(shù)據(jù)接收線程

  DWORDWINAPIServerthread(LPVOIDlpparam)

  {

  fileinfo* pFileInfo;

  

  //

  char* m_buf = new char[PROTOCOL_HEADSIZE];

  SOCKET pthis=(SOCKET)lpparam;

  int nRead = readn(pthis,m_buf,PROTOCOL_HEADSIZE);

  if( nRead type)

  {

  case 1:

  //獲取文件塊的已接收大小

  pfileinfo =

  GetRecvedFileBlock(FileID,pFileInfo->name,pFileInfo->len,pFileInfo->seek,pFileInfo->fileno);

  

  nRead = sendn(pthis,(char*)pFileInfo,PROTOCOL_HEADSIZE);

  if(nRead name,pFileInfo->seek,

  pFileInfo->len,pFileInfo->fileno,pfileinfo);

  closesocket(pthis);

  break;

  default:

  break;

  }

  delete [] m_buf;

  return 0;

  }

  4 小結(jié)

  本文簡(jiǎn)要介紹了點(diǎn)對(duì)點(diǎn)文件斷點(diǎn)續(xù)傳的多線程實(shí)現(xiàn)方法,有一定的使用性。限于水平,文中不當(dāng)之處,敬請(qǐng)批評(píng)指正。

  作者簡(jiǎn)介:馮常青(1974-),男,北京,碩士,工程師,主要研究方向:計(jì)算機(jī)網(wǎng)絡(luò)

       師明珠(1976-),男,北京,碩士,工程師,主要研究方向:計(jì)算機(jī)應(yīng)用技術(shù);

----《通信世界》


微信掃描分享本文到朋友圈
掃碼關(guān)注5G通信官方公眾號(hào),免費(fèi)領(lǐng)取以下5G精品資料
  • 1、回復(fù)“YD5GAI”免費(fèi)領(lǐng)取《中國(guó)移動(dòng):5G網(wǎng)絡(luò)AI應(yīng)用典型場(chǎng)景技術(shù)解決方案白皮書
  • 2、回復(fù)“5G6G”免費(fèi)領(lǐng)取《5G_6G毫米波測(cè)試技術(shù)白皮書-2022_03-21
  • 3、回復(fù)“YD6G”免費(fèi)領(lǐng)取《中國(guó)移動(dòng):6G至簡(jiǎn)無線接入網(wǎng)白皮書
  • 4、回復(fù)“LTBPS”免費(fèi)領(lǐng)取《《中國(guó)聯(lián)通5G終端白皮書》
  • 5、回復(fù)“ZGDX”免費(fèi)領(lǐng)取《中國(guó)電信5GNTN技術(shù)白皮書
  • 6、回復(fù)“TXSB”免費(fèi)領(lǐng)取《通信設(shè)備安裝工程施工工藝圖解
  • 7、回復(fù)“YDSL”免費(fèi)領(lǐng)取《中國(guó)移動(dòng)算力并網(wǎng)白皮書
  • 8、回復(fù)“5GX3”免費(fèi)領(lǐng)取《R1623501-g605G的系統(tǒng)架構(gòu)1
  • 本周熱點(diǎn)本月熱點(diǎn)

     

      最熱通信招聘

    業(yè)界最新資訊


      最新招聘信息