C++中簡單使用HP-Socket
目錄
簡介
HP-Socket 是一套通用的高效能 TCP/UDP /HTTP 通訊 框架 ,包含服務端元件、客戶端元件和 Agent 元件,廣泛適用於各種不同應用場景的 TCP/UDP /HTTP 通訊系統,提供 C/C++ 、 C# 、 Delphi 、 E (易語言)、 Java 、 Python 等程式語言介面。
HP-Socket是一套國產的開源通訊庫,使用C++語言實現,提供多種程式語言的介面,支援 Windows 和 Linux 平臺:
HP-Socket包含30多個元件 ,可根據通訊角色Client/Server)、通訊協議TCP/UDP/HTTP)和接收模型PUSH/PULL/PACK)進行歸類,這裡只簡單介紹一下:
- Server元件 :基於IOCP/EPOLL通訊模型 ,並結合快取池 、私有堆等技術實現高效記憶體管理,支援超大規模、高併發通訊場景。
- Agent元件 :實質上是 Multi-Client 元件,與Server元件採用相同的技術架構,可同時建立和高效處理大規模Socket連線 。
- Client元件 :基於Event Select/POLL通訊模型,每個元件物件建立一個通訊執行緒並管理一個Socket連線, 適用於小規模客戶端場景。
- Thread Pool元件 :HP-Socket實現的高效易用的執行緒池元件,當成普通的第三方執行緒池庫使用即可。
HP-Socket的TCP元件支援PUSH、PULL和PACK三種接收模型:
- PUSH模型 :元件接收到資料時會觸發監聽器物件的 OnReceive(pSender,dwConnID,pData,iLength) 事件,把資料“推”給應用程式,這種模型使用起來是最自由的。
- PULL模型 :元件接收到資料時會觸發監聽器物件的 OnReceive(pSender,dwConnID,iTotalLength) 事件 ,告訴應用程式當前已經接收到多少資料,應用程式檢查資料的長度,如果滿足需要則呼叫元件的**Fetch(dwConnID,pData,iDataLength)方法把需
要的資料“拉”出來。 - PACK模型 :PACK模型系列元件是PUSH和PULL模型的結合體,應用程式不必處理分包與資料抓取,元件保證每個OnReceive事件都向應用程式提供一個完整資料包。
注: PACK模型元件會對應用程式傳送的每個資料包自動加上 4 位元組(32位的包頭) ,前10位為用於資料包校驗的包頭標識位,後22位為記錄包體長度的長度位。
使用方式
HP-Socket支援 MBCS 和 Unicode 字符集,支援 32 位和 64 位應用程式。可以通過原始碼、 DLL或LIB方式使用HP-Socket。 HP-Socket發行包中已經提供了HPSocket DLL和HPSocket4C DLL。
HP-Socket提供了各種情況下的dll檔案,不需要我們重新編譯,dll檔案按程式設計介面分為兩大類:
- HPSocket DLL :匯出C++程式設計介面 ,C++程式的首選方式,使用時需要把 SocketInterface.h(及其依賴檔案HPTypeDef.h) 、 HPSocket.h 以及 DLL 對應的 *.lib 檔案加入到工程專案,用到SSL元件還需要 HPSocket-SSL.h 檔案。
- HPSocket4C DLL :匯出C程式設計介面,提供給C語言或其它程式語言使用,使用時需要把 HPSocket4C.h 以及 DLL 對應的 *.lib 檔案加入到工程專案,用到SSL元件還需要 HPSocket4C-SSL.h 檔案。
實現簡單執行緒池
使用HP-Socket的執行緒池元件可以在程式中實現一個簡單的、公用的執行緒池,TCP通訊的斷線重連、傳送心跳都會用到執行緒池。執行緒池元件的主要函式如下:
- Start :啟動執行緒池,具體的使用可以參考原始碼的註釋。
- Submit :提交任務,主要使用 BOOL Submit(fnTaskProc,pvArg,dwMaxWait=INFINITE) ,另一個函式過載是使用一個特殊的資料型別(把Socket任務引數和任務函式封裝成一個數據結構)作為引數。
- Stop :關閉執行緒池,引數dwMaxWait代表最大等待時間(毫秒,預設: INFINITE ,一直等待)。
先實現執行緒池的CHPThreadPoolListener介面,然後構造IHPThreadPool智慧指標,後面執行緒池的操作都通過智慧指標操作,程式碼如下:
class CHPThreadPoolListenerImpl : public CHPThreadPoolListener { private: void LogInfo(string logStr) { cout <<"ThreadPool " <<logStr << endl; } public: virtual void OnStartup(IHPThreadPool* pThreadPool) { LogInfo("執行緒池啟動"); } virtual void OnShutdown(IHPThreadPool* pThreadPool) { LogInfo("執行緒池啟動關閉"); } virtual void OnWorkerThreadStart(IHPThreadPool* pThreadPool, THR_ID dwThreadID) { LogInfo("[" + to_string(dwThreadID) + "] " + "工作執行緒啟動"); } virtual void OnWorkerThreadEnd(IHPThreadPool* pThreadPool, THR_ID dwThreadID) { LogInfo("[" + to_string(dwThreadID) + "] " + "工作執行緒退出"); } }; CHPThreadPoolListenerImpl ThreadPoolListener; //全域性共享變數使用extern關鍵字修飾 extern CHPThreadPoolPtr ThreadPool(&ThreadPoolListener);
實現TCP客戶端
先實現一個列印函式,顯示客戶端相關的資訊,程式碼如下:
void PrintInfo(ITcpClient* pSender, CONNID dwConnID) { char buffer[20]; TCHAR* ipAddr = buffer; int ipLen; USHORT port; pSender->GetLocalAddress(ipAddr, ipLen, port); cout << string(ipAddr,0,ipLen) << ":" << port << " " << " [" << dwConnID << "] -> "; pSender->GetRemoteHost(ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " "; }
實現CTcpClientListener監聽介面,客戶端斷線後自動重連,以換行符分割接收到的字串,程式碼如下:
bool SysExit = false; void ReConnect(ITcpClient* pSender) { while (pSender->GetState() != SS_STOPPED) { Sleep(10); } pSender->Start("127.0.0.1", 60000); } class CClientListenerImpl : public CTcpClientListener { public: virtual EnHandleResult OnConnect(ITcpClient* pSender, CONNID dwConnID) { PrintInfo(pSender, dwConnID); cout << "連線成功" << endl; return HR_OK; } string resStr = ""; string commStr=""; virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID, const BYTE* pData, int iLength) { string str((char*)pData,0, iLength); resStr.append(str); int index; while (true) { index = resStr.find("\r\n"); if (index == -1)break; commStr = resStr.substr(0, index); resStr = resStr.substr(index +2, resStr.length() - (index +2)); if (commStr!="") { PrintInfo(pSender, dwConnID); cout << "收到分割字串 " << commStr << endl; } } PrintInfo(pSender, dwConnID); cout << "資料接受 " << str << endl; return HR_OK; } virtual EnHandleResult OnClose(ITcpClient* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode) { resStr = ""; PrintInfo(pSender, dwConnID); cout << "連線斷開,"<< enOperation <<"操作導致錯誤,錯誤碼 " << iErrorCode<< endl; if (!SysExit) { ThreadPool->Submit((Fn_TaskProc)(&ReConnect), (PVOID)pSender); } return HR_OK; } };
迴圈輸入字串傳送服務端,程式碼如下:
int main() { //啟動執行緒池 ThreadPool->Start(); CClientListenerImpl listener; CTcpClientPtr client(&listener); if (!client->Start("127.0.0.1", 60000)) { cout << "連線錯誤:" << client->GetLastError() << "-" << client->GetLastErrorDesc(); } string sendMsg; while (!SysExit) { cin >> sendMsg; if (sendMsg == "esc") { SysExit = true; break; } if (client->GetState() == SS_STARTED) { const BYTE* data = (BYTE*)(sendMsg.c_str()); if (client->Send(data, sizeof(data))) { PrintInfo(client, client->GetConnectionID()); cout << "傳送成功 "<<sendMsg<<endl; } else { PrintInfo(client, client->GetConnectionID()); cout << "傳送失敗,錯誤描述 " << client->GetLastError() << "-" << client->GetLastErrorDesc() << endl; } } else { PrintInfo(client, client->GetConnectionID()); cout << "無法傳送,當前狀態 " <<client->GetState()<< endl; } } client->Stop(); //關閉執行緒池 ThreadPool->Stop(); return 0; }
實現TCP服務端
先實現一個列印函式,基本上和客戶端的相同,只有獲取本地IP的地方不同,程式碼如下:
void PrintInfo(ITcpServer* pSender, CONNID dwConnID) { char buffer[20]; TCHAR* ipAddr = buffer; int ipLen; USHORT port; pSender->GetListenAddress(ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " " << "<- [" << dwConnID << "] "; pSender->GetRemoteAddress(dwConnID, ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " "; }
為了演示客戶端和應用資料的繫結,定義一個使用者資料型別並建立一個佇列,程式碼如下:
class UserData { public: UserData(string name="") { Name = name; } string Name; }; queue<UserData*> qName; //建立佇列物件
實現CTcpServerListener監聽介面,收到字串後加上使用者名稱再發送回去,程式碼如下:
class CTcpServerListenerImpl : public CTcpServerListener { public: virtual EnHandleResult OnAccept(ITcpServer* pSender, CONNID dwConnID, UINT_PTR soClient) { pSender->SetConnectionExtra(dwConnID,qName.front()); qName.pop(); PrintInfo(pSender, dwConnID); cout << "連線成功" << endl; return HR_OK; } virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength) { string str((char*)pData, 0, iLength); PrintInfo(pSender, dwConnID); cout << "資料接受 " << str<<endl; PVOID pInfo = nullptr; pSender->GetConnectionExtra(dwConnID, &pInfo); str = "reply-" + ((UserData*)pInfo)->Name + str; const BYTE* data = (BYTE*)(str.c_str()); pSender->Send(dwConnID, data,str.size()); return HR_OK; } virtual EnHandleResult OnClose(ITcpServer* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode) { PVOID pInfo = nullptr; pSender->GetConnectionExtra(dwConnID, &pInfo); qName.push((UserData*)pInfo); PrintInfo(pSender, dwConnID); cout << "斷開連線"<< endl; pSender->SetConnectionExtra(dwConnID, NULL); return HR_OK; } };
迴圈輸入字串傳送到客戶端,自動回覆客戶端傳送的訊息,程式碼如下:
bool SysExit = false; int main() { UserData user1("NO1-User"); UserData user2("NO2-User"); UserData user3("NO3-User"); UserData user4("NO4-User"); qName.push(&user1); qName.push(&user2); qName.push(&user3); qName.push(&user4); CTcpServerListenerImpl listener; CTcpServerPtr server(&listener); if (!server->Start("127.0.0.1", 60000)) { cout << "啟動錯誤:" << server->GetLastError() << "-" << server->GetLastErrorDesc(); } string sendMsg; while (!SysExit) { cin >> sendMsg; if (sendMsg == "esc") { SysExit = true; break; } //如果陣列長度小於當前連線數量,則獲取失敗 DWORD count= 1000; CONNID pIDs[1000]; ZeroMemory(pIDs, 1000);; if (server->GetAllConnectionIDs(pIDs, count)&& count >0) { for (size_t i = 0; i < count; i++) { const BYTE* data = (BYTE*)(sendMsg.c_str()); if (server->Send(*(pIDs+i),data, sendMsg.size())) { PrintInfo(server, pIDs[i]); cout << "傳送成功 " << sendMsg << endl; } else { PrintInfo(server, pIDs[i]); cout << "傳送失敗,錯誤描述 " << server->GetLastError() << "-" << server->GetLastErrorDesc() << endl; } } } else { cout << "無法傳送,當前連線數 " << count << endl; } } server->Stop(); }
注: 獲取連線時指標陣列的長度一定要大於當前連線數量 ,否則會失敗。
實現Http客戶端
HP-Socket的Http客戶端有同步、非同步兩種,同步客戶端不需要繫結監聽器,這裡使用同步客戶端演示。
Sync Client: 同步HTTP客戶端元件(CHttpSyncClient和CHttpsSyncClient) 內部會處理所有事件,因此,它們不需要繫結監聽器(構造方法的監聽器引數傳入null); 如果綁定了監聽器則可以跟蹤元件的通訊過程。
測試客戶端可以使用 實時天氣介面 上面的測試示例,當前的測試示例為:
http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json
直接開始測試,程式碼如下:
int main() { CHttpSyncClientPtr SyncClient; THeader type; type.name = "Content-Type"; type.value = "text/html;charset=UTF-8"; if (SyncClient->OpenUrl("GET", "http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json",&type)) { LPCBYTE pData = nullptr; int iLength = 0; SyncClient->GetResponseBody(&pData, &iLength); string body((char*)pData, iLength); //返回的有中文,需要轉化編碼格式 cout << body << endl; cout << endl; cout << StringToUtf(body) << endl; cout << endl; cout << UtfToString(StringToUtf(body)) << endl; } else { cout << "開啟失敗:"<<SyncClient->GetLastError()<<"-"<< SyncClient->GetLastErrorDesc()<<endl; } }
上面的 StringToUtf 和 UtfToString 函式是轉載至C++ 中文亂碼的問題,該函式實現UTF-8和ANSI編碼格式的轉化,程式碼如下:
string UtfToString(string strValue) { int nwLen = ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), -1, NULL, 0); wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0' ZeroMemory(pwBuf, nwLen * 2 + 2); ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen); int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char* pBuf = new char[nLen + 1]; ZeroMemory(pBuf, nLen + 1); ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr(pBuf); delete[]pwBuf; delete[]pBuf; pwBuf = NULL; pBuf = NULL; return retStr; } string StringToUtf(string strValue) { int nwLen = MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), -1, NULL, 0); wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0' memset(pwBuf, 0, nwLen * 2 + 2); MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen); int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char* pBuf = new char[nLen + 1]; memset(pBuf, 0, nLen + 1); WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr = pBuf; delete[]pBuf; delete[]pwBuf; return retStr; }
注:函式實現需放在main函式之前。
附件
- Python-多執行緒及生產者與消費者
- git檔案管理與索引,深入理解工作原理
- 【SpringCloud-Alibaba】環境搭建以及注意事項
- 小白也能看懂的Redis教學基礎篇——做一個時間窗限流就是這麼簡單
- 譜聚類原理總結
- rocketmq實現延遲佇列精確到秒級實現(總結編)
- 開發 IDEA Plugin 引入探針,基於位元組碼插樁獲取執行SQL
- Chrome外掛:提醒你正在摸魚,摸魚的時候知道自己在摸魚,減少摸魚的時間和頻率。
- HttpRunner3的用例是怎麼執行起來的
- RocketMQ 原理:訊息儲存、高可用、訊息重試、訊息冪等性
- k8s-pv-pvc
- JUC併發程式設計與高效能記憶體佇列disruptor實戰-下
- 談談最近做的一個自動化平臺(二)
- 淺談23種設計模式之單例設計模式
- Python 為什麼不設計 do-while 迴圈結構?
- 【高併發】深入理解執行緒的執行順序
- java日誌列印使用指南
- 使用Rainbond打包業務模組,實現業務積木式拼裝
- JS定時器不可靠的原因及解決方案
- 程式設計進階之路,雖無捷徑但有長短