C++中簡單使用HP-Socket

語言: CN / TW / HK

目錄

簡介

HP-Socket 是一套通用的高效能 TCP/UDP /HTTP 通訊 框架 ,包含服務端元件、客戶端元件和 Agent 元件,廣泛適用於各種不同應用場景的 TCP/UDP /HTTP 通訊系統,提供 C/C++ 、 C# 、 Delphi 、 E (易語言)、 Java 、 Python 等程式語言介面。

HP-Socket是一套國產的開源通訊庫,使用C++語言實現,提供多種程式語言的介面,支援 Windows 和 Linux 平臺:

HP-Socket包含30多個元件 ,可根據通訊角色Client/Server)、通訊協議TCP/UDP/HTTP)和接收模型PUSH/PULL/PACK)進行歸類,這裡只簡單介紹一下:

  • Server元件 :基於IOCP/EPOLL通訊模型 ,並結合快取池 、私有堆等技術實現高效記憶體管理,支援超大規模、高併發通訊場景。
  • Agent元件 :實質上是 Multi-Client 元件,與Server元件採用相同的技術架構,可同時建立和高效處理大規模Socket連線 。
  • Client元件 :基於Event Select/POLL通訊模型,每個元件物件建立一個通訊執行緒並管理一個Socket連線, 適用於小規模客戶端場景。
  • Thread Pool元件 :HP-Socket實現的高效易用的執行緒池元件,當成普通的第三方執行緒池庫使用即可。

HP-Socket的TCP元件支援PUSH、PULL和PACK三種接收模型:

  • PUSH模型 :元件接收到資料時會觸發監聽器物件的 OnReceive(pSender,dwConnID,pData,iLength) 事件,把資料“推”給應用程式,這種模型使用起來是最自由的。
  • PULL模型 :元件接收到資料時會觸發監聽器物件的 OnReceive(pSender,dwConnID,iTotalLength) 事件 ,告訴應用程式當前已經接收到多少資料,應用程式檢查資料的長度,如果滿足需要則呼叫元件的**Fetch(dwConnID,pData,iDataLength)方法把需
    要的資料“拉”出來。
  • PACK模型 :PACK模型系列元件是PUSH和PULL模型的結合體,應用程式不必處理分包與資料抓取,元件保證每個OnReceive事件都向應用程式提供一個完整資料包。

注: PACK模型元件會對應用程式傳送的每個資料包自動加上 4 位元組(32位的包頭) ,前10位為用於資料包校驗的包頭標識位,後22位為記錄包體長度的長度位。

使用方式

HP-Socket支援 MBCSUnicode 字符集,支援 32 位和 64 位應用程式。可以通過原始碼、 DLL或LIB方式使用HP-Socket。 HP-Socket發行包中已經提供了HPSocket DLL和HPSocket4C DLL。

HP-Socket提供了各種情況下的dll檔案,不需要我們重新編譯,dll檔案按程式設計介面分為兩大類:

  • HPSocket DLL :匯出C++程式設計介面 ,C++程式的首選方式,使用時需要把 SocketInterface.h(及其依賴檔案HPTypeDef.h)HPSocket.h 以及 DLL 對應的 *.lib 檔案加入到工程專案,用到SSL元件還需要 HPSocket-SSL.h 檔案。
  • HPSocket4C DLL :匯出C程式設計介面,提供給C語言或其它程式語言使用,使用時需要把 HPSocket4C.h 以及 DLL 對應的 *.lib 檔案加入到工程專案,用到SSL元件還需要 HPSocket4C-SSL.h 檔案。

實現簡單執行緒池

使用HP-Socket的執行緒池元件可以在程式中實現一個簡單的、公用的執行緒池,TCP通訊的斷線重連、傳送心跳都會用到執行緒池。執行緒池元件的主要函式如下:

  • Start :啟動執行緒池,具體的使用可以參考原始碼的註釋。
  • Submit :提交任務,主要使用 BOOL Submit(fnTaskProc,pvArg,dwMaxWait=INFINITE) ,另一個函式過載是使用一個特殊的資料型別(把Socket任務引數和任務函式封裝成一個數據結構)作為引數。
  • Stop :關閉執行緒池,引數dwMaxWait代表最大等待時間(毫秒,預設: INFINITE ,一直等待)。

先實現執行緒池的CHPThreadPoolListener介面,然後構造IHPThreadPool智慧指標,後面執行緒池的操作都通過智慧指標操作,程式碼如下:

class CHPThreadPoolListenerImpl : public CHPThreadPoolListener
{
private:
	void LogInfo(string logStr)
	{
		cout <<"ThreadPool " <<logStr << endl;
	}
public:
	virtual void OnStartup(IHPThreadPool* pThreadPool) 
	{
		LogInfo("執行緒池啟動");
	}
	virtual void OnShutdown(IHPThreadPool* pThreadPool) 
	{
		LogInfo("執行緒池啟動關閉");
	}
	virtual void OnWorkerThreadStart(IHPThreadPool* pThreadPool, THR_ID dwThreadID) 
	{				
		LogInfo("[" + to_string(dwThreadID) + "] " + "工作執行緒啟動");
	}
	virtual void OnWorkerThreadEnd(IHPThreadPool* pThreadPool, THR_ID dwThreadID) 
	{
		LogInfo("[" + to_string(dwThreadID) + "] " + "工作執行緒退出");
	}
};

CHPThreadPoolListenerImpl ThreadPoolListener;
//全域性共享變數使用extern關鍵字修飾
extern CHPThreadPoolPtr ThreadPool(&ThreadPoolListener);

實現TCP客戶端

先實現一個列印函式,顯示客戶端相關的資訊,程式碼如下:

void PrintInfo(ITcpClient* pSender, CONNID dwConnID)
{
	char buffer[20];	
	TCHAR* ipAddr = buffer;
	int ipLen;
	USHORT port;

	pSender->GetLocalAddress(ipAddr, ipLen, port);	
	cout << string(ipAddr,0,ipLen) << ":" << port << " " << " [" << dwConnID << "] -> ";

	pSender->GetRemoteHost(ipAddr, ipLen, port);	
	cout << string(ipAddr, 0, ipLen) << ":" << port << " ";
}

實現CTcpClientListener監聽介面,客戶端斷線後自動重連,以換行符分割接收到的字串,程式碼如下:

bool SysExit = false;
void ReConnect(ITcpClient* pSender)
{
	while (pSender->GetState() != SS_STOPPED)
	{
		Sleep(10);
	}
	pSender->Start("127.0.0.1", 60000);
}

class CClientListenerImpl : public CTcpClientListener
{

public:	
	virtual EnHandleResult OnConnect(ITcpClient* pSender, CONNID dwConnID) 
	{ 
		PrintInfo(pSender, dwConnID);
		cout << "連線成功" << endl;
		return HR_OK;		
	}

	string resStr = "";
	string commStr="";
	virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID, const BYTE* pData, int iLength) 
	{
		
		string str((char*)pData,0, iLength);
		resStr.append(str);
		int index;
		while (true)
		{
			index = resStr.find("\r\n");
			if (index == -1)break;

			commStr = resStr.substr(0, index);
			resStr = resStr.substr(index +2, resStr.length() - (index +2));
			if (commStr!="")
			{
				PrintInfo(pSender, dwConnID);
				cout << "收到分割字串 " << commStr << endl;
			}
		}	

		PrintInfo(pSender, dwConnID);
		cout << "資料接受 " << str << endl;

		return HR_OK;
	}

	virtual EnHandleResult OnClose(ITcpClient* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
	{
		resStr = "";

		PrintInfo(pSender, dwConnID);
		cout << "連線斷開,"<< enOperation <<"操作導致錯誤,錯誤碼 " << iErrorCode<< endl;
		if (!SysExit) 
		{
			ThreadPool->Submit((Fn_TaskProc)(&ReConnect), (PVOID)pSender);
		}		
		return HR_OK;
	}	
};

迴圈輸入字串傳送服務端,程式碼如下:

int main()
{
	//啟動執行緒池
	ThreadPool->Start();

	CClientListenerImpl listener;
	CTcpClientPtr client(&listener);

	if (!client->Start("127.0.0.1", 60000))
	{
		cout << "連線錯誤:" << client->GetLastError() << "-" << client->GetLastErrorDesc();
	}
	
	string sendMsg;
	while (!SysExit)
	{		
		cin >> sendMsg;
		if (sendMsg == "esc") 
		{
			SysExit = true;
			break;
		}

		if (client->GetState() == SS_STARTED) 
		{
			const BYTE* data = (BYTE*)(sendMsg.c_str());
			if (client->Send(data, sizeof(data)))
			{
				PrintInfo(client, client->GetConnectionID());
				cout << "傳送成功 "<<sendMsg<<endl;
			}
			else
			{
				PrintInfo(client, client->GetConnectionID());
				cout << "傳送失敗,錯誤描述 " << client->GetLastError() << "-" << client->GetLastErrorDesc() << endl;
			}
		}
		else 
		{
			PrintInfo(client, client->GetConnectionID());			
			cout << "無法傳送,當前狀態 " <<client->GetState()<< endl;
		}
	}	
	client->Stop();
	//關閉執行緒池
	ThreadPool->Stop();

	return 0;	
}

實現TCP服務端

先實現一個列印函式,基本上和客戶端的相同,只有獲取本地IP的地方不同,程式碼如下:

void PrintInfo(ITcpServer* pSender, CONNID dwConnID)
{
	char buffer[20];
	TCHAR* ipAddr = buffer;
	int ipLen;
	USHORT port;

	pSender->GetListenAddress(ipAddr, ipLen, port);
	cout << string(ipAddr, 0, ipLen) << ":" << port << " " << "<-  [" << dwConnID << "] ";

	pSender->GetRemoteAddress(dwConnID, ipAddr, ipLen, port);
	cout << string(ipAddr, 0, ipLen) << ":" << port << " ";
}

為了演示客戶端和應用資料的繫結,定義一個使用者資料型別並建立一個佇列,程式碼如下:

class UserData 
{
public:
	UserData(string name="") 
	{
		Name = name;
	}
	string Name;
};
queue<UserData*> qName;  //建立佇列物件

實現CTcpServerListener監聽介面,收到字串後加上使用者名稱再發送回去,程式碼如下:

class CTcpServerListenerImpl : public CTcpServerListener
{
public:	
	virtual EnHandleResult OnAccept(ITcpServer* pSender, CONNID dwConnID, UINT_PTR soClient)
	{ 

		pSender->SetConnectionExtra(dwConnID,qName.front());
		qName.pop();
		PrintInfo(pSender, dwConnID);
		cout << "連線成功" << endl;
		return HR_OK;
	}
	virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
	{ 
		string str((char*)pData, 0, iLength);
		PrintInfo(pSender, dwConnID);
		cout << "資料接受 " << str<<endl;

		PVOID pInfo = nullptr;		
		pSender->GetConnectionExtra(dwConnID, &pInfo);
		str = "reply-" + ((UserData*)pInfo)->Name + str;

		const BYTE* data = (BYTE*)(str.c_str());		
		pSender->Send(dwConnID, data,str.size());
		return HR_OK;
	}	
	virtual EnHandleResult OnClose(ITcpServer* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
	{
		PVOID pInfo = nullptr;
		pSender->GetConnectionExtra(dwConnID, &pInfo);
		qName.push((UserData*)pInfo);
		PrintInfo(pSender, dwConnID);
		cout << "斷開連線"<< endl;

		pSender->SetConnectionExtra(dwConnID, NULL);		
		return HR_OK;
	}
};

迴圈輸入字串傳送到客戶端,自動回覆客戶端傳送的訊息,程式碼如下:

bool SysExit = false;
int main()
{	
	UserData user1("NO1-User");
	UserData user2("NO2-User");
	UserData user3("NO3-User");
	UserData user4("NO4-User");

	qName.push(&user1);
	qName.push(&user2);
	qName.push(&user3);
	qName.push(&user4);

	CTcpServerListenerImpl listener;
	CTcpServerPtr server(&listener);
		
	if (!server->Start("127.0.0.1", 60000))
	{
		cout << "啟動錯誤:" << server->GetLastError() << "-" << server->GetLastErrorDesc();
	}
	
	string sendMsg;
	while (!SysExit)
	{
		cin >> sendMsg;
		if (sendMsg == "esc")
		{
			SysExit = true;
			break;
		}

		//如果陣列長度小於當前連線數量,則獲取失敗
		DWORD count= 1000;			
		CONNID pIDs[1000]; 
		ZeroMemory(pIDs, 1000);;

		if (server->GetAllConnectionIDs(pIDs, count)&& count >0)
		{
			for (size_t i = 0; i < count; i++)
			{
				const BYTE* data = (BYTE*)(sendMsg.c_str());
				if (server->Send(*(pIDs+i),data, sendMsg.size()))
				{
					PrintInfo(server, pIDs[i]);
					cout << "傳送成功 " << sendMsg << endl;
				}
				else
				{
					PrintInfo(server, pIDs[i]);
					cout << "傳送失敗,錯誤描述 " << server->GetLastError() << "-" << server->GetLastErrorDesc() << endl;
				}
			}		
		}
		else
		{			
			cout << "無法傳送,當前連線數 " << count << endl;
		}
	}
	server->Stop();
}

注: 獲取連線時指標陣列的長度一定要大於當前連線數量 ,否則會失敗。

實現Http客戶端

HP-Socket的Http客戶端有同步、非同步兩種,同步客戶端不需要繫結監聽器,這裡使用同步客戶端演示。

Sync Client: 同步HTTP客戶端元件(CHttpSyncClient和CHttpsSyncClient) 內部會處理所有事件,因此,它們不需要繫結監聽器(構造方法的監聽器引數傳入null); 如果綁定了監聽器則可以跟蹤元件的通訊過程。

測試客戶端可以使用 實時天氣介面 上面的測試示例,當前的測試示例為:

http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json

直接開始測試,程式碼如下:

int main()
{
    CHttpSyncClientPtr SyncClient;
    THeader type;
    type.name = "Content-Type";
    type.value = "text/html;charset=UTF-8";
    
    if (SyncClient->OpenUrl("GET", "http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json",&type))
    {
        LPCBYTE pData = nullptr;
        int iLength = 0;
        SyncClient->GetResponseBody(&pData, &iLength);        
        string body((char*)pData, iLength);
        //返回的有中文,需要轉化編碼格式
        cout << body << endl;        
        cout << endl;
        cout << StringToUtf(body) << endl;
        cout << endl;
        cout << UtfToString(StringToUtf(body)) << endl;
    }
    else
    {
        cout << "開啟失敗:"<<SyncClient->GetLastError()<<"-"<< SyncClient->GetLastErrorDesc()<<endl;
    }   
}

上面的 StringToUtfUtfToString 函式是轉載至C++ 中文亂碼的問題,該函式實現UTF-8和ANSI編碼格式的轉化,程式碼如下:

string UtfToString(string strValue)
{
    int nwLen = ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), -1, NULL, 0);
    wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0'
    ZeroMemory(pwBuf, nwLen * 2 + 2);
    ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen);
    int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
    char* pBuf = new char[nLen + 1];
    ZeroMemory(pBuf, nLen + 1);
    ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
    std::string retStr(pBuf);
    delete[]pwBuf;
    delete[]pBuf;
    pwBuf = NULL;
    pBuf = NULL;
    return retStr;
}

string StringToUtf(string strValue)
{
    int nwLen = MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), -1, NULL, 0);
    wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0'
    memset(pwBuf, 0, nwLen * 2 + 2);
    MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen);
    int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
    char* pBuf = new char[nLen + 1];
    memset(pBuf, 0, nLen + 1);
    WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
    std::string retStr = pBuf;
    delete[]pBuf;
    delete[]pwBuf;
    return retStr;
}

注:函式實現需放在main函式之前。

附件