分享自己平時使用的socket多客戶端通訊的程式碼技術點和軟體使用

語言: CN / TW / HK

前言

說到linux下多程序通訊,有好幾種,之前也在喵哥的公眾號回覆過,這裡再拿出來,重新寫一遍:多程序通訊有管道,而管道分為匿名和命名管道 ,後者比前者優勢在於可以進行無親緣程序通訊;此外訊號也是程序通訊的一種,比如我們最常用的就是設定ctrl+c的kill訊號傳送給程序;其次訊號量一般來說是一種同步機制但是也可以認為是通訊,需要注意的是訊號量、共享記憶體、訊息佇列在使用時候也有posix和system v的區別;還有我們今天的主角套接字( socket ) :套接字也是一種程序間通訊機制。

執行緒間的通訊的話,共享變數,此外在unpipc書描述的話,同步也屬於通訊機制,那麼就要補充上執行緒中我們最多用的互斥量、條件變數、讀寫鎖、記錄鎖和執行緒中的訊號量使用。

今天想分享一些socket程式設計的例子,socket嵌入式。linux開發很常用,用於程序間通訊很方便,也有很多介紹,今天我也也來做自己的介紹分享。和別人不一樣的地方,我主要想分享socket 服務端在linux寫的程式碼,使用vscode除錯執行,並且同時分享自己使用tcp監控軟體去判斷socket通訊正確性。

作者:良知猶存

轉載授權以及圍觀:歡迎關注微信公眾號: 羽林君

或者新增作者個人微信: become_me

socket通訊基本函式介紹

在這裡有一個簡單demo演示以及函式的介紹,大家開啟這個 連結 就可以看到哈:

socket重要函式

socket通訊有些固定的函式,這裡先給大家做簡單的分享:

  • int socket(int domain, int type, int protocol);

該函式用於建立一個socket描述符,它唯一標識一個socket,這個socket描述字跟檔案描述字一樣,後續的操作都有用到它,把它作為引數,通過它來進行一些讀寫操作。建立socket的時候,也可以指定不同的引數建立不同的socket描述符,socket函式的三個引數分別為:

1.domain:引數domain表示該套接字使用的協議族,在Linux系統中支援多種協議族,對於TCP/IP協議來說,選擇AF_INET就足以,當然如果你的IP協議的版本支援IPv6,那麼可以選擇AF_INET6,可選的協議族具體見:

- AF_UNIX, AF_LOCAL: 本地通訊-AF_INET : IPv4
- AF_INET6 : IPv6
- AF_IPX : IPX - Novell 協議
- AF_NETLINK : 核心使用者介面裝置
- AF_X25 : ITU-T X.25 / ISO-8208 協議
- AF_AX25 : 業餘無線電 AX.25 協議
- AF_ATMPVC : 訪問原始ATM PVC
- AF_APPLETALK : AppleTalk
- AF_PACKET : 底層資料包介面
- AF_ALG : 核心加密API的AF_ALG介面

2.type:引數type指定了套接字使用的服務型別,可能的型別有以下幾種:

- SOCK_STREAM:提供可靠的(即能保證資料正確傳送到對方)面向連線的Socket服務,多用於資料(如檔案)傳輸,如TCP協議。
- SOCK_DGRAM:是提供無保障的面向訊息的Socket 服務,主要用於在網路上發廣播資訊,如UDP協議,提供無連線不可靠的資料報交付服務。
- SOCK_SEQPACKET:為固定最大長度的資料報提供有序的,可靠的,基於雙向連線的資料傳輸路徑。
- SOCK_RAW:表示原始套接字,它允許應用程式訪問網路層的原始資料包,這個套接字用得比較少,暫時不用理會它。
- SOCK_RDM:提供不保證排序的可靠資料報層。

3.protocol:引數protocol指定了套接字使用的協議,在IPv4中,只有TCP協議提供SOCK_STREAM這種可靠的服務,只有UDP協議提供SOCK_DGRAM服務,對於這兩種協議,protocol的值均為0,因為當protocol為0時,會自動選擇type型別對應的預設協議。

  • int bind(int sockfd, struct sockaddr *my_addr, socklen_t addrlen);

在進行網路通訊的時候,必須把一個套接字與一個IP地址或埠號相關聯,這個bind就是繫結的過程。

  • int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

這個connect()函式用於客戶端中,將sockfd與遠端IP地址、埠號進行繫結,在TCP客戶端中呼叫這個函式將發生握手過程(會發送一個TCP連線請求),並最終建立一個TCP連線,而對於UDP協議來說,呼叫這個函式只是在sockfd中記錄遠端IP地址與埠號,而不傳送任何資料,引數資訊與bind()函式是一樣的。

  • int listen(int s, int backlog);

listen()函式只能在TCP伺服器程序中使用,讓伺服器程序進入監聽狀態,等待客戶端的連線請求,listen()函式在一般在bind()函式之後呼叫,在accept()函式之前呼叫,它的函式原型是:

  • int accept(int s, struct sockaddr *addr, socklen_t *addrlen);

accept()函式就是用於處理連線請求的,accept()函式用於TCP伺服器中,等待著遠端主機的連線請求,並且建立一個新的TCP連線,在呼叫這個函式之前需要通過呼叫listen()函式讓伺服器進入監聽狀態,如果佇列中沒有未完成連線套接字,並且套接字沒有標記為非阻塞模式,accept()函式的呼叫會阻塞應用程式直至與遠端主機建立TCP連線;如果一個套接字被標記為非阻塞式而佇列中沒有未完成連線套接字, 呼叫accept()函式將立即返回EAGAIN。

  • ssize_t read(int fd, void *buf, size_t count);

read 從描述符 fd 中讀取 count 位元組的資料並放入從 buf 開始的緩衝區中.

  • ssize_t recv(int sockfd, void *buf, size_t len, int flags);

不論是客戶還是伺服器應用程式都可以用recv()函式從TCP連線的另一端接收資料,它與read()函式的功能是差不多的。

  • ssize_t write(int fd, const void *buf, size_t count);

write()函式一般用於處於穩定的TCP連線中傳輸資料,當然也能用於UDP協議中,它向套接字描述符 fd 中寫入 count 位元組的資料,資料起始地址由 buf 指定,函式呼叫成功返回寫的位元組數,失敗返回-1,並設定errno變數。

  • int send(int s, const void *msg, size_t len, int flags);

無論是客戶端還是伺服器應用程式都可以用send()函式來向TCP連線的另一端傳送資料。

  • int sendto(int s, const void *msg, size_t len, int flags, const struct sockaddr *to, socklen_t tolen);

sendto()函式與send函式非常像,但是它會通過 struct sockaddr 指向的 to 結構體指定要傳送給哪個遠端主機,在to引數中需要指定遠端主機的IP地址、埠號等,而tolen引數則是指定to 結構體的位元組長度。

  • int close(int fd);

close()函式是用於關閉一個指定的套接字,在關閉套接字後,將無法使用對應的套接字描述符

TCP客戶端一般流程

  • 呼叫socket()函式建立一個套接字描述符。
  • 呼叫connect()函式連線到指定伺服器中,埠號為伺服器監聽的埠號。
  • 呼叫write()函式傳送資料。
  • 呼叫close()函式終止連線。
 // 建立套接字描述符
((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
// 建立TCP連線
(connect(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr))
write(sockfd, buffer, sizeof(buffer))
close(sockfd);

TCP伺服器一般流程

  • 伺服器的程式碼流程如下:
  • 呼叫socket()函式建立一個套接字描述符。
  • 呼叫bind()函式繫結監聽的埠號。
  • 呼叫listen()函式讓伺服器進入監聽狀態。
  • 呼叫accept()函式處理來自客戶端的連線請求。
  • 呼叫read()函式接收客戶端傳送的資料。
  • 呼叫close()函式終止連線。
// socket create and verification
sockfd = socket(AF_INET, SOCK_STREAM, 0);
// binding newly created socket to given IP and verification   
if ((bind(sockfd, (struct sockaddr*)&server, sizeof(server))) != 0) 
// now server is ready to listen and verification
if ((listen(sockfd, 5)) != 0) {
// accept the data packet from client and verification
connfd = accept(sockfd, (struct sockaddr*)&client, &len);
if (read(connfd, buff, sizeof(buff)) <= 0) {
close(connfd);
close(sockfd);

這裡也順帶分享一個socket 阻塞和非阻塞的機制 前面提到accept函式中,描述套接字沒有標記為非阻塞模式,accept()函式的呼叫會阻塞應用程式直至與遠端主機建立TCP連線;如果一個套接字被標記為非阻塞式而佇列中沒有未完成連線套接字, 呼叫accept()函式將立即返回EAGAIN。但是socket預設初始化是阻塞的,正常初始化後accept沒有收到客戶端的連結請求的話,就會一直的等待阻塞當前執行緒,直到有客戶端進行連結請求。

那麼如何才能把socket設定為非阻塞呢?用 ioctl(sockfd, FIONBIO, &mode);

//-------------------------

// Set the socket I/O mode: In this case FIONBIO

// enables or disables the blocking mode for the

// socket based on the numerical value of iMode.

// If iMode = 0, blocking is enabled;

// If iMode != 0, non-blocking mode is enabled.

u_long iMode = 1;  //non-blocking mode is enabled.

ioctlsocket(m_socket, FIONBIO, &iMode); //設定為非阻塞模式

一般大家介紹會說使用ioctlsocket,但是有些系統使用會報錯。如下:

ioctlsocket 會報錯,所以使用 ioctl 就好了,操作都是一樣的。

 #include <sys/ioctl.h>
ioctl(sockfd, FIONBIO, &mode);

這是一個簡單的圖表分析,來自下面文章連結,大家有興趣也可以自行檢視。

阻塞非阻塞的介紹 連結

程式碼例項

程式碼有 test_socket_client.cpptest_socket_server.htest_socket_server.cpp 三個檔案,互動機制以及實現功能如下:

首先test_socket_client.cpp 是客戶端程式碼,用來測試連結伺服器端互動,用select進行接收資料,並監聽執行終端是否有輸入資訊,輸入資訊立刻傳送。

test_socket_server.h是test_socket_server.cpp使用定義的類和api的標頭檔案,而在test_socket_server.cpp實現了定義了一個支援多客戶端連線的通訊介面,同時也時刻檢測執行終端輸入資訊,並廣播到全部連結的客戶端;而客戶端發過來的資訊,服務端針對的點對點收發,即接收到特定客戶端的資訊只發送到該客戶端。其中使用了 std::future + std::async 實現了通訊的非同步操作,並使用 impl模式 包裹了socket介面。在監聽執行終端資訊時候分別使用了 std::conditionstd::async 實現,大家可以通過巨集開關自行選擇測試。

還有些其他的技術使用,多執行緒的排程以及流的輸出,忽略SIGPIPE訊號用來控制客戶端連結斷開之後程式碼正常執行等,再後面我一一給大家分析介紹。

test_socket_client.cpp這個檔案就是隨便找了一個socket客戶端程式碼,這個test_socket_client程式碼來源是網路,大家也可以自己去寫或者網上自己找相關的用例,因為本次的重要部分是服務端server程式碼,所以這塊就貼一下程式碼。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>

//g++ test_socket_client.cpp -o  test_socket_client

#define BUFLEN 1024
#define PORT 8555

int main(int argc, char **argv)
{
    int sockfd;
    struct sockaddr_in s_addr;
    socklen_t len;
    unsigned int port;
    char buf[BUFLEN];
    fd_set rfds;
    struct timeval tv;
    int retval, maxfd; 
    
    /*建立socket*/
    if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
        perror("socket");
        exit(errno);
    }else
        printf("socket create success!\n");

    /*設定伺服器ip*/
    memset(&s_addr,0,sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(PORT);
    if (inet_aton("127.0.0.1", (struct in_addr *)&s_addr.sin_addr.s_addr) == 0) {
        perror("127.0.0.1");
        exit(errno);
    }
  
    /*開始連線伺服器*/ 
    while(connect(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr)) == -1){
        perror("connect");
        sleep(1);
        exit(errno);
    }

    while(1){
        FD_ZERO(&rfds);
        FD_SET(0, &rfds);
        maxfd = 0;
        FD_SET(sockfd, &rfds);
        if(maxfd < sockfd)
            maxfd = sockfd;
        tv.tv_sec = 6;
        tv.tv_usec = 0;
        retval = select(maxfd+1, &rfds, NULL, NULL, &tv);
        if(retval == -1){
            printf("select出錯,客戶端程式退出\n");
            break;
        }else if(retval == 0){
            printf("waiting...\n");
            continue;
        }else{
            /*伺服器發來了訊息*/
            if(FD_ISSET(sockfd,&rfds)){
                /******接收訊息*******/
                bzero(buf,BUFLEN);
                len = recv(sockfd,buf,BUFLEN,0);
                if(len > 0)
                    printf("伺服器發來的訊息是:%s\n",buf);
                else{
                    if(len < 0 )
                        printf("接受訊息失敗!\n");
                    else
                        printf("伺服器退出了,聊天終止!\n");
                break; 
                }
            }
            /*使用者輸入資訊了,開始處理資訊併發送*/
            if(FD_ISSET(0, &rfds)){ 
                /******傳送訊息*******/ 
                bzero(buf,BUFLEN);
                fgets(buf,BUFLEN,stdin);
               
                if(!strncasecmp(buf,"quit",4)){
                    printf("client 請求終止聊天!\n");
                    break;
                }
                    len = send(sockfd,buf,strlen(buf),0);
                if(len > 0)
                    printf("\t訊息傳送成功:%s\n",buf); 
                else{
                    printf("訊息傳送失敗!\n");
                    break; 
                } 
            }
        }
    
    }
    /*關閉連線*/
    close(sockfd);
    return 0;
}

test_socket_server.h使用的標頭檔案,定義一些外部api

#ifndef _TEST_SOCKET_H
#define _TEST_SOCKET_H

#include <functional>
#include <memory>
#include <thread>
#include <vector>

namespace linx_socket {

int Writen(int fd, const void *vptr, int n);
int Readn(int fd, void *vptr, int maxlen);
int CreatSocket(const char *ip, int port);
int StartLisen(int fd);
bool Close(int fd);

}  // namespace linx_socket

class DevSocket  {
 public:
  using CallBack  = std::function<void(int ,std::vector<uint8_t>&&)>;
  DevSocket();
  DevSocket(const CallBack& callback);
  bool Send(int fd,const std::vector<uint8_t> &data) const ;
  // std::vector<uint8_t> Recive() const ; //當建立連線後 就會線上程裡面迴圈讀取客戶端發來的資訊, 所以不需要專門寫rx函式
  ~DevSocket(){}

  private:
  class Socket;
  std::unique_ptr<Socket> SocketImpl;

};

#endif

test_socket_server.cpp

裡面包含的 #include "log.h" 這個檔案是我自己寫的log輸出檔案,列印時間和顏色等,看著比較方便,大家使用程式碼時候自行替換成自己需要printf或者std::cout或者自己的列印檔案

#include <stdio.h>
#include <algorithm>
#include <array>
#include <chrono>
#include <boost/thread/mutex.hpp>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <iterator>
#include <string>
#include <thread>
#include <vector>
#include <arpa/inet.h>
#include <errno.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>
#include <future>

#include "test_socket_server.h"
#include "log.h"
// g++ test_socket_server_optimiza_2.cpp -o  test_socket_server_optimiza -lboost_thread -lpthread
namespace linx_socket
{

    constexpr int socket_que_size = 3;

    //使用select進行寫入
    int Writen(int fd, const void *vptr, int n)
    {
        ssize_t nleft = n;
        const char *ptr = (const char *)vptr;
        fd_set write_fd_set;
        struct timeval timeout;
        while (nleft > 0)
        {
            ssize_t nwriten = 0;

            timeout.tv_sec = 1;
            timeout.tv_usec = 0;
            FD_ZERO(&write_fd_set);
            FD_SET(fd, &write_fd_set);
            int s_ret = select(FD_SETSIZE, NULL, &write_fd_set, NULL, &timeout);

            if (s_ret < 0)
            {
                EXC_ERROR("-------write_fd_set error------------");
                return -1;
            }
            else if (s_ret == 0)
            {
                usleep(100 * 1000);
                EXC_ERROR("-------write_fd_set timeout ------------");
                continue;
            }

            if ((nwriten = write(fd, ptr, nleft)) < 0)
            {
                if (nwriten < 0 && errno == EINTR)
                {
                    nwriten = 0;
                }
                else
                {
                    EXC_ERROR("-------nwriten error = %d ------------", nwriten);
                    return -1;
                }
            }
            nleft -= nwriten;
            ptr += nwriten;
        }
        return n;
    }

    //使用select進行讀取
    int Readn(int fd, void *vptr, int maxlen)
    {
        bool ret = false;
        ssize_t nread = 0;
        fd_set read_fd_set;
        struct timeval timeout;
        while (!ret)
        {
            // EXC_INFO("Readn begine.");
            timeout.tv_sec = 1;
            timeout.tv_usec = 0;
            FD_ZERO(&read_fd_set);
            FD_SET(fd, &read_fd_set);
            int s_ret = select(FD_SETSIZE, &read_fd_set, NULL, NULL, &timeout);

            if (s_ret < 0)
            {
                EXC_ERROR("-------select error------------");
                return -1;
            }
            else if (s_ret == 0)
            {
                usleep(100 * 1000);
                // EXC_ERROR("-------select timeout ------------");
                continue;
            }

            if ((nread = read(fd, vptr, maxlen)) < 0)
            {
                if (errno == EINTR)
                {
                    EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
                    nread = 0;
                }
                else
                {
                    EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
                    return -1;
                }
            }
            else
            {
                if (nread == 0)
                {
                    EXC_ERROR("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
                }
                // else
                // {
                //     EXC_INFO("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
                // }
                ret = 1;
            }
        }
        return nread;
    }
    //進行處理來自客戶端的連線請求
    int IsListened(int fd)
    {
        struct sockaddr_in c_addr;
        socklen_t c_lent = sizeof(c_addr);
        int fd_c = accept(fd, (struct sockaddr *)&c_addr, &c_lent);

        if (fd_c < 0)
        {
            if (errno == EPROTO || errno == ECONNABORTED)
            {
                return -1;
            }
        }
        EXC_INFO("accept %s: %d sucess", inet_ntoa(c_addr.sin_addr), ntohs(c_addr.sin_port));
        return fd_c;
    }

    //建立一個套接字描述符
    int CreatSocket(const char *ip, int port)
    {
        int ret = -1;
        // EXC_INFO("CreatSocket");
        int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (fd < 0)
        {
            return -1;
        }
        int reuse = 1;
        //設定套接字的一些選項 SOL_SOCKET:表示在Socket層 SO_REUSEADDR(允許重用本地地址和埠)
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
        {
            return -1;
        }

        struct sockaddr_in s_addr;
        memset(&s_addr, 0, sizeof(s_addr));
        s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        s_addr.sin_port = htons(port);
        s_addr.sin_family = AF_INET;
        if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
        {
            EXC_ERROR("bind %s: %d error", inet_ntoa(s_addr.sin_addr), ntohs(s_addr.sin_port));
            close(fd);
            return -2;
        }
        if (listen(fd, socket_que_size) < 0)
        {
            close(fd);
            return -3;
        }
        return fd;
    }

    int CreatSocket(const char *ip, int port, int socket_que_size)
    {
        int ret = -1;
        EXC_INFO("");
        int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (fd < 0)
        {
            return -1;
        }
        struct sockaddr_in s_addr;
        memset(&s_addr, 0, sizeof(s_addr));
        s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        s_addr.sin_port = htons(port);
        s_addr.sin_family = AF_INET;
        if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
        {
            close(fd);
            return -2;
        }
        if (listen(fd, socket_que_size) < 0)
        {
            close(fd);
            return -3;
        }
        return fd;
    }

    bool Close(int fd)
    {
        close(fd);
        return true;
    }
} // namespace linx_socket

class Connection
{
public:
    Connection(int fd, DevSocket::CallBack c) : call_back_f_(c), fd_(fd)
    {
        read_sta = std::async(std::launch::async, [this]()
                              { Read(); }); //迴圈讀取socket連線的資料


    };

    void Read()
    {
        while (!kill_thread_)
        {
            if (fd_ < 0)
                break;
            std::array<uint8_t, kBuffSize> buf;
            int len = linx_socket::Readn(fd_, buf.data(), kBuffSize);
            if (len > 0)
            {
                if (call_back_f_)
                {
                    data_parser_mutex_.lock();
                    call_back_f_(fd_, {buf.begin(), buf.begin() + len});
                    data_parser_mutex_.unlock();
                }
            }
            else if (len < 0)
            {
                kill_thread_ = true;
                EXC_ERROR("read error, fd= %d, rev len= %d.", fd_, len);
                break;
            }
            else if (len == 0)
            {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                EXC_ERROR("call_back_f_ = %d, fd=%d, rev len=%d.", call_back_f_, fd_, len);
            }
        }
    }

    bool Write(std::vector<uint8_t> data)
    {
        if (linx_socket::Writen(fd_, data.data(), data.size()) < 0)
        {
            kill_thread_ = true;
            EXC_ERROR("Writen error.");
            return false;
        }
        return true;
    }
    bool GetIsKillThread() { return kill_thread_; }

    ~Connection()
    {
        EXC_INFO("kill_thread_ is %d", kill_thread_);

        kill_thread_ = true;
        if (fd_ != -1)
        {
            linx_socket::Close(fd_);
            fd_ = -1;
        }
    }
    std::future<void> &GetReadSta() { return read_sta; }
    int GetFd() { return fd_; }

private:
    int fd_ = -1;
    bool kill_thread_ = false;
    DevSocket::CallBack call_back_f_ = nullptr; /**/
    boost::mutex data_parser_mutex_;
    std::future<void> read_sta;
    constexpr static int kBuffSize = 1024;
};

class DevSocket::Socket
{
public:
    Socket(){};
    Socket(std::pair<std::string, int> port, const CallBack &callback_)
        : call_back_f_(callback_)
    {
        EXC_WARN("Socket ");
        int n;
        if ((n = linx_socket::CreatSocket(port.first.c_str(), port.second)) < 0)
        {
            throw std::string("CreatSocket  error ") + std::to_string(n);
        }
        fd = n;
        auto threa_func = [this]()
        {
            while (!kill_thread_)
            {
                //迴圈std::launch::async 傳遞的可呼叫物件非同步執行 
                std::future<int> listened_status = std::async(std::launch::async, [this]()
                                                              {
                                                                  EXC_INFO("Listened .");
                                                                  return linx_socket::IsListened(fd);
                                                              });

                //lister 套接字有沒有偵聽到連線,任務沒返回,沒有偵聽到連線套接字
                while (listened_status.wait_for(std::chrono::seconds(0)) !=
                       std::future_status::ready)
                {
                    if (kill_thread_)
                        return;
                    for (auto it = connections_.begin(); it != connections_.end();)
                    {
                        //任務返回了,說明該連線結束了
                        if ((*it)->GetReadSta().wait_for(std::chrono::seconds(0)) ==
                            std::future_status::ready)
                        {
                            if ((*it)->GetReadSta().valid())
                            {
                                EXC_ERROR("connection_kill_thread is %d, socket_kill_thread_ is =%d", (*it)->GetIsKillThread(), kill_thread_);
                                (*it)->GetReadSta().get();//主動退出
                            }
                            EXC_INFO("dis connection_ fd=%d.", (*it)->GetFd());
                            boost::mutex::scoped_lock lock(connection_mutex_);
                            it = connections_.erase(it);
                            if (connections_.size() <= 0)
                            {
                                EXC_ERROR("all is dis connected");
                            }
                        }
                        if (it != connections_.end())
                        {
                            ++it;
                        }
                    }

                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                }
         //    EXC_INFO( "==================== thread id: %d" ,std::this_thread::get_id());

                //有新的連線
                int clien_fd = listened_status.get();
                if (clien_fd > 0)
                {
                    boost::mutex::scoped_lock lock(connection_mutex_);
                    connections_.push_back(
                        std::make_shared<Connection>(clien_fd, call_back_f_));
                    EXC_INFO("connection_ fd=%d.", clien_fd);
                }
            }
        };

        // EXC_INFO("before move threa_func=%d.", threa_func);

        thread_ = std::thread(std::move(threa_func)); //左值變右值傳入 減少拷貝

        // EXC_INFO("after  move threa_func=%d.", threa_func);
    }

    bool SendData(const std::vector<uint8_t> &data, std::shared_ptr<Connection> connection)
    {
        boost::mutex::scoped_lock lock(connection_mutex_);
        return connection->Write(data);
    }
    std::vector<std::shared_ptr<Connection>> GetConnections()
    {
        return connections_;
    }
    ~Socket()
    {
        kill_thread_ = true;
        if (fd != -1)
        {
            linx_socket::Close(fd);
        }
        if (thread_.joinable())
        {
            thread_.join();
        }
    }

private:
    int fd = -1;
    bool kill_thread_ = false;
    CallBack call_back_f_ = nullptr;
    std::thread thread_;
    std::vector<std::shared_ptr<Connection>> connections_;
    boost::mutex connection_mutex_;
};

#define HOST "127.0.0.1" // 根據你伺服器的IP地址修改
#define PORT 8555        // 根據你伺服器程序繫結的埠號修改

DevSocket::DevSocket()
{
    EXC_WARN("new  DevSocket");
    std::pair<std::string, int> par{HOST, PORT};
    SocketImpl = std::unique_ptr<Socket>(new Socket(par, nullptr));
}

DevSocket::DevSocket(const CallBack &callback)
{
    EXC_WARN("new  DevSocket");
    std::pair<std::string, int> par{HOST, PORT};
    SocketImpl = std::unique_ptr<Socket>(new Socket(par, callback));
}

bool DevSocket::Send(int fd, const std::vector<uint8_t> &data) const
{
    for (auto connection : SocketImpl->GetConnections())
    {
        if (nullptr == connection)
            continue;
        if (fd == connection->GetFd() || fd == 0) //fd ==0 全部發送
        {
            int ret = SocketImpl->SendData(data, connection);
            EXC_WARN("fd %d  send status :%d", connection->GetFd(), ret);
        }
    }
    return true;
}

std::ostream &operator<<(std::ostream &out, std::vector<uint8_t> &data)
{
    EXC_WARN("operator 1<<<<<<<<<<<<<<");

    out << "hex ";
    out << std::hex;
    for (auto &d : data)
    {
        out << "0x" << std::hex << (int)d << " ";
    }
    out << std::endl;
    EXC_WARN("operator 2<<<<<<<<<<<<<");
    return out;
}

#include <signal.h>
void pipesig_handler(int sig)
{
    EXC_ERROR("receive signal %d", sig);
}
#if 1 //std::async 控制傳送
int main(int argc, char *argv[])
{
    DevSocket *Device;
    // 為SIGPIPE新增訊號處理函式,處理完程式繼續執行 1
    signal(SIGPIPE, pipesig_handler);
    bool SendFlag = false;
    std::vector<uint8_t> send_data; 
    int read_fd{-1};
    try
    {
        EXC_INFO("device socket init");
        Device =
            new DevSocket([&](int fd, std::vector<uint8_t> &&d)
                          {
                              EXC_INFO("recive call fd :%d", fd);
                              send_data = d;
                              SendFlag = true;
                              std::ostringstream ss;
                              ss << "recive data:[";
                              std::for_each(send_data.begin(), send_data.end(),
                                            [&](uint8_t temp)
                                            { ss << " " << temp << ","; });
                              EXC_WARN("%s]", ss.str().c_str());
                            //   std::cout << send_data; //使用operator<< 函式
                          });
    }
    catch (const std::string s)
    {
        EXC_INFO("Device.emplace_back:%s", s.c_str());
        return EXIT_FAILURE;
    }
    const int BUFLEN = 1024;
    char buf[BUFLEN];
    std::thread input_keyboard = std::thread([&]
                                             {
                                                 while (true)
                                                 {
                                                     memset(buf, 0, sizeof(buf));
                                                     /*fgets函式:從流中讀取BUFLEN-1個字元*/
                                                     fgets(buf, BUFLEN, stdin);
                                                     EXC_INFO("from terminal:%s", buf);

                                                     if (!strncasecmp(buf, "quit", 4))
                                                     {
                                                         EXC_INFO("server quit!");
                                                         exit(0);
                                                     }
                                                     std::vector<uint8_t> send_msg;

                                                     for (int i = 0; buf[i] != '\0'; i++)
                                                     {
                                                        //  EXC_INFO("data:index[%d] :%d", i, buf[i]);
                                                         send_msg.emplace_back(buf[i]);
                                                     }
                                                     Device->Send(0, send_msg); //代表全部連結的都發送 fd =0
                                                 }
                                             });
    while (true)
    {
        // EXC_INFO(" ");

 std::future<bool> send_future = std::async(std::launch::async, [&]()
                                                              {
                                                                    while(true)
                                                                    {
                                                                        if(SendFlag)
                                                                        {
                                                                            return true;
                                                                        }
                                                                        std::this_thread::sleep_for(std::chrono::milliseconds(20));
                                                                        return false;
                                                                    }
                                                              });
        {

            if (send_future.wait_for(std::chrono::milliseconds(30)) == std::future_status::ready) //子執行緒已執行完
            {
                // EXC_INFO( "ready...");
                if(send_future.get())
                {
                    SendFlag=false;
                    std::ostringstream ss;
                    ss.clear();
                    ss << "send date :[";
                    std::for_each(send_data.begin(), send_data.end(),
                                [&](uint8_t &temp)
                                { ss << " " << temp << ","; });
                    EXC_INFO("%s]", ss.str().c_str());
                    
                    if (!send_data.empty())
                    {
                        Device->Send(read_fd, send_data);
                        send_data.clear();
                    }
                }
            }
        }

    }
    input_keyboard.join();
}
#elif 1 //std::condition_variable 選擇傳送
int main(int argc, char *argv[])
{
    std::mutex SendMutex;
    std::condition_variable SendCondition;
    bool SendFlag = false;
    DevSocket *Device;
    // 為SIGPIPE新增訊號處理函式,處理完程式繼續執行 1
    signal(SIGPIPE, pipesig_handler);

    std::vector<uint8_t> send_data; //(8, 1);
    int read_fd{-1};
    EXC_WARN("");
    {
        std::ostringstream ss;
        ss << "send_date 1:[";
        std::for_each(send_data.begin(), send_data.end(),
                      [&](uint8_t temp)
                      { ss << " " << temp << ","; });
        EXC_INFO("%s]", ss.str().c_str());
    }
    const int BUFLEN = 1024;
    char buf[BUFLEN];

    try
    {
        EXC_INFO("Device.emplace_back");
        Device =
            new DevSocket([&](int fd,std::vector<uint8_t> &&d)
                          {
                              EXC_INFO("recive call fd :%d",fd);
                              {
                                  std::lock_guard<std::mutex> m(SendMutex);
                                  send_data = d;
                                  SendFlag = true;

                                  std::ostringstream ss;
                                  ss.clear();
                                  ss << "recive 2:[";
                                  std::for_each(send_data.begin(), send_data.end(),
                                                [&](uint8_t temp)
                                                { ss << " " << temp << ","; });
                                  EXC_WARN("%s]", ss.str().c_str());
                                  std::cout << send_data;
                              }
                              SendCondition.notify_one();
                              EXC_INFO("");
                          });
    }
    catch (const std::string s)
    {
        EXC_INFO("Device.emplace_back:%s", s.c_str());
        return EXIT_FAILURE;
    }
    std::thread input_keyboard = std::thread([&]
                                             {
                                                 while (true)
                                                 {
                                                     memset(buf, 0, sizeof(buf));
                                                     /*fgets函式:從流中讀取BUFLEN-1個字元*/
                                                     fgets(buf, BUFLEN, stdin);
                                                     EXC_INFO("from terminal:%s", buf);

                                                     if (!strncasecmp(buf, "quit", 4))
                                                     {
                                                         EXC_INFO("server quit!");
                                                         exit(0);
                                                     }
                                                     std::vector<uint8_t> send_msg;

                                                     for (int i = 0; buf[i] != '\0'; i++)
                                                     {
                                                         EXC_INFO("data:index[%d] :%d", i, buf[i]);
                                                         send_msg.emplace_back(buf[i]);
                                                     }
                                                     Device->Send(0,send_msg);//代表全部連結的都發送 fd =0
                                                 }
                                             });
    while (true)
    {
        EXC_INFO(" ");

        {
            std::unique_lock<std::mutex> m(SendMutex);
            SendCondition.wait(m, [&]
                               { return SendFlag; });
            SendFlag = false;
        }

        {
            std::ostringstream ss;
            ss.clear();
            ss << "send_date 3:[";
            std::for_each(send_data.begin(), send_data.end(),
                          [&](uint8_t &temp)
                          { ss << " " << temp << ","; });
            EXC_ERROR("%s]", ss.str().c_str());
        }

        EXC_INFO("");
        if (!send_data.empty())
        {
            Device->Send(read_fd,send_data);
            send_data.clear();
        }
    }
    input_keyboard.join();
}
#endif

分析介紹服務端程式碼使用到的技術點

程式碼展示完畢,接下來給大家一點點分析裡面用的一些關鍵點:

使用的技術點:

std::future + std::async

使用程式碼: std::future<void> read_sta = std::async(std::launch::async, [this]() { Read(); });

非同步呼叫往往不知道何時返回,但是如果非同步呼叫的過程需要同步,或者說後一個非同步呼叫需要使用前一個非同步呼叫的結果。這個時候就要用到future。

首先std::future是一個類模板,其物件儲存將來的值。提供了一種訪問該值的機制,即使用get()成員函式。但是,如果此時在get()函式可用之前訪問它的未來關聯值,則get()函式將阻塞當前執行緒,直到get()函式準備好它的資料。std::future期待一個返回,從一個非同步呼叫的角度來說,future更像是執行函式的返回值,C++標準庫使用std::future為一次性事件建模,如果一個事件需要等待特定的一次性事件,那麼這執行緒可以獲取一個future物件來代表這個事件。

執行緒可以週期性的在這個future上等待一小段時間,檢查future是否已經ready,如果沒有,該執行緒可以先去做另一個任務,一旦future就緒,該future就無法復位(無法再次使用這個future等待這個事件),所以future代表的是一次性事件。

std::future物件是std::async、std::promise、std::packaged_task的底層物件,用來傳遞其他執行緒中操作的資料結果。這就是我們會有 std::future + std::asyncstd::future + std::promisestd::future + std::packaged_task 的組合使用。幾者使用的方法大同小異,std::async是函式,std::promise和std::packaged_task是類, 相信對比這篇 文章 之後大家會有更加詳細的理解用法,這裡我就不多做贅述了。

本次使用了std::async函式,以及配合使用了wait_for()函式和get()函式,使用這兩個部分原因是阻塞動作,因為std::async建立非同步任務時候建立一個執行緒去執行任務,使用以上兩個函式可以進行確認非同步執行緒的狀態,兩者的區別是使用get函式時候,要是非同步執行緒沒有執行完成,當前執行緒會原地阻塞直接非同步執行緒執行完成;而wait_for()呼叫也會在當前位置阻塞,但wait_for有阻塞時間的引數,如果引數為std::chrono::seconds(0),那麼就不會阻塞當前執行緒。

而在本次的程式碼裡面,std::future本次請求返回是void,也就是無需要具體的返回,可以理解為執行緒結束的話,get()函式就可以準備好了的。

題外話:在實際開發中,有時候某執行緒的值不止被一個執行緒所需要,而get()卻只能只用一次,std::future自身問題,它只容許一個執行緒等待結果。若我們要讓多個執行緒等待同一個目標事件,這時可以通過std::shared_future達到多次通過get()獲取值的目的。

注:get()函式只能使用一次,因為get()函式的設計是一個移動語義,相當於將future物件中的值轉移到了get()呼叫者中,所以再次get()就會報出異常。

std::condition_variable

處理一次性事件,我們std::condition_variable可以用於非同步事件的重複通知,condition_variable可以用於非同步事件的重複通知是條件變數,和條件變數pthread_cond_t類似,而std::condition_variable在Linux 下也有使用 Pthread 庫中的 pthread_cond_*() 函式提供了與條件變數相關的功能,所以兩者使用方法都是類似的,效果也是一樣的。

std::condition_variable 物件通常使用 std::unique_lock std::mutex 來等待,當 std::condition_variable 物件的某個 wait 函式被呼叫的時候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當前執行緒。當前執行緒會一直被阻塞,直到另外一個執行緒在相同的 std::condition_variable 物件上呼叫了 notification 函式來喚醒當前執行緒,使用 notify_all可以通知所有等待的執行緒,notify_one則只會喚醒一個執行緒。

signal訊號處理

訊號是程序通訊一種手段,除了通訊,很多程式碼跑飛的問題,都是核心通知訊號到程序的,所以解決bug時候我們也會這裡面為什麼要忽略這個訊號呢,後面gdb除錯可以看到詳細的資訊,這裡直接說原因,是因為本次程式碼是 一個socket服務端對應多個客戶端,而中間互動過程中,會有一些客戶端連結也有客戶端斷開,而當伺服器完整close這個連線時,若客戶端端接著發資料。根據TCP協議的規定,會收到一個RST響應,client再往這個伺服器傳送資料時,系統會發出一個SIGPIPE訊號給程序,告訴程序這個連線已經斷開了,不要再寫了。這個時候程序會game over,所以為了避免程序退出, 可以捕獲SIGPIPE訊號, 或者忽略它, 給它設定SIG_IGN訊號處理函式。

直接把這個訊號忽略掉

signal(SIGPIPE, SIG_IGN);

為SIGPIPE新增訊號處理函式,處理完程式繼續執行

void pipesig_handler(int sig)
{
  EXC_ERROR("receive signal %d", sig);
}
  // 為SIGPIPE新增訊號處理函式,處理完程式繼續執行 1
  signal(SIGPIPE, pipesig_handler);  

執行原理分析:

  • linx_socket名稱空間寫了socket通訊基本一些介面,基於select的read write讀寫函式、socket建立函式、用來處理客戶端連結的IsListened函式、close函式關閉socket,這部分程式碼用來做後面類成員函式的基本呼叫的”庫“函式; Connection類實現了每一個客戶端連結成功後,都會執行Read函式,Read是一個while迴圈,使用std::async啟動之後,,迴圈退出的條件在解構函式置位;此外還有Write函式做外部介面。

  • DevSocket類是最終使用的socket通訊的外部介面,其中使用impl模式包裝外部介面,在DevSocket類裡面定義一個Socket類,這個類最重要的就是 Socket(std::pair<std::string, int> port, const CallBack &callback_) : call_back_f_(callback_) 這個建構函式,除了使用CreatSocket函式初始化建立一個socket裝置描述符,還啟動一個lambda執行緒函式threa_func,該執行緒一直迴圈執行std::async去建立執行緒去呼叫linx_socket::IsListened(fd)處理隨時來的客戶端連結請求。

    A:做了這一步之後 listened_status.wait_for 開始無延時判斷 linx_socket::IsListened 函式的執行狀態,在沒有客戶端有連結請求的時候, listened_status.wait_for 會返回 std::future_status::timeout ,然後迴圈判斷所有 connections_ 中的 read_staRead 函式是否已經執行完成,而 Read 函式只有讀取失敗這裡才會退出迴圈,執行完函式,這時候其實對應這個連結結束。

    B:而當 istened_status.wait_for 返回 std::future_status::ready 之後 connections_.push_back(std::make_shared<Connection>(clien_fd, call_back_f_)) ;這段程式碼把新的客戶端連結新增到 connections_ 中去,然後一直迴圈執行A B動作。

    除了Socket這個建構函式,還有SendData去呼叫我們上面提到Connection的Write函式,包裝成新的外部介面。

  • &operator<< 函式過載了 << 運算子,方便輸出 std::vector<uint8_t> 型別資料,這個型別資料是 std::vector<uint8_t> send_data ; 大家可以開啟我上面的註釋程式碼測試驗證。

  • main函式,這部分有兩處,用 #if #elif 進行選擇,分別使用了 std::asyncstd::condition_variable 實現收到的資料之後喚醒主執行緒,再令主執行緒把收到的資料轉發到對應連結的客戶端;其次還啟動了 input_keyboard 這個執行緒,用來監控,終端介面輸入的字元,轉發到所有連結的客戶端。

除錯方法

這裡我分享兩種除錯方法, gdbvscode
首先程式碼gdb除錯時候,編譯記得加上 -g

g++ test_socket_server.cpp -o  test_socket_server -g -lboost_thread -lpthread

直接gdb + 編譯好的可執行檔案

gdb test_socket_server_optimiza 進入之後使用 layout src 再加 l 命令檢視原始碼除錯,很方便。

gdb除錯時候遇到這個報錯received signal SIGPIPE, Broken pipe. 需要忽略SIGPIPE訊號

vscode c++程式中新增外部動態連結庫 幫助除錯

在每個vscode開啟的工程目錄下,都有.vscode目錄,裡面會有幾個.json檔案,開啟開啟tasks.json檔案,在 "${fileDirname}/${fileBasenameNoExtension}" ,繼續增加自己連結需要庫 "-lboost_thread","-lpthread" ,如下所示:

這樣子就可以正常除錯了

通訊過程分析

下圖是我執行程式碼的log日誌輸出效果,那麼我們怎麼檢視底層的傳輸資料呢?

我這裡分享使用的兩個軟體可以互相配合使用:

tcpdump 抓包分析

tcpdump,就是:dump the traffic on a network,根據使用者的定義對網路上的資料包進行截獲的包分析工具。 tcpdump可以將網路中傳送的資料包的“頭”完全截獲下來提供分析。它支援針對網路層、協議、主機、網路或埠的過濾,並提供and、or、not等邏輯語句來幫助你去掉無用的資訊。

它有很多命令操作,連結

我直接實時顯示了資料 sudo tcpdump host 127.0.0.1 and port 8555 -i lo

這裡面詳細資訊分析,大家直接可以看這篇文章進行對比,這位仁兄寫的很詳細, 連結

但是資料有時候無法實時檢視,這個時候把資料儲存起來,然後用Wireshark進行分析 sudo tcpdump host 127.0.0.1 and port 8555 -i lo -w socket_test.pcap

然後使用 wireshark socket_test.pcap 開啟

.pcap檔案直接使用Wireshark開啟就可以看到了,這裡面的小 demo 應該可以幫到你。

Wireshark抓包分析

Wireshark 是一款自由開源的網路協議分析器,它在全球被廣泛使用。通過使用 Wireshark,你可以實時捕獲網路的傳入和傳出資料包,並將其用於網路故障排除、資料包分析、軟體和通訊協議開發等。 windows和Ubuntu都可以使用, 本次我使用場景是Ubuntu。

Wireshark 可以在 Ubuntu 的 Universe 儲存庫中找到。你可以啟用 universe 儲存庫,然後按如下方式安裝:

sudo add-apt-repository universe
sudo apt install wireshark

安裝時候有wireshark-common設定,我選擇了預設的否,裡面提示也是建議禁用它。 後期大家自己想更改的話,也可以使用 sudo dpkg-reconfigure wireshark-common 命令重新修改。

使用 sudo wireshark 開啟軟體

開啟儲存好的.pcap wireshark socket_test.pcap

實時監控 sudo wireshark

篩選欄進行設定port,我的埠是8555,所以如此設定tcp.port == 8555,就可以看到實時互動的底層資料了。

這裡只是配合自己的demo進行簡單的軟體簡單使用分享,更為詳細使用,大家可以網上自行搜尋。

結語

這就是我自己的一些socket相關的程式碼和軟體使用分享。如果大家有更好的想法和需求,也歡迎大家加我好友交流分享哈。

作者:良知猶存,白天努力工作,晚上原創公號號主。公眾號內容除了技術還有些人生感悟,一個認真輸出內容的職場老司機,也是一個技術之外豐富生活的人,攝影、音樂 and 籃球。關注我,與我一起同行。

‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧  END  ‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧