非阻塞socket网络编程之数据收发完整代码示例

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 非阻塞socket网络编程之数据收发完整代码示例

背景

公司业务需要,读取yuv个数的数据,发送到服务端。刚开始使用的阻塞的套接字(注意:创建的套接字默认是阻塞的),想着用非阻塞的模式试一试,经过一番摸索,将整个过程记录一下。

因为一笔yuv数据是12M,所以在非阻塞模式下,send或recv的时候会报错Resource temporarily unavailable,这是因为对方的接收缓冲满了或者己方的接收缓冲区没有数据。

引言

对于套接字来说,阻塞和非阻塞的只会影响以下几个网络编程api

1.客户端的 connect 函数
2.服务端的 accept 函数
3.发送数据 send 函数
4.接收数据 recv 函数

为了行文方便,先做以下说明:

  1. connfd: 客户端创建的套接字
  2. listenfd: 服务端创建的监听套接字
  3. clientfd: 客户端连接时,服务端accept函数返回的套接字

如果有对网络编程、IO模型和IO复用还不熟悉的小伙伴可以先看下我之前的文章:Posix API与网络协议栈实现原理Linux五种IO模型

一、客户端

对于客户端 connfd 来说:

  • 阻塞: connect 函数会一直阻塞直到连接成功或者超时或者出错
  • 非阻塞:无论连接是否建立成功,connect函数都会立即返回。 如果返回值为0,表示连接成功;如果小于0,不代表一定失败,需根据实际的错误码errno来执行响应的操作。此时错误码errno可能为:EINTR 或 EINPROGRESS,如果是EINTR,表示连接被信号打断,可以继续尝试连接;如果是 EINPROGRESS 表示连接还在进行中,后面需要通过select或poll来判断connfd是否可写,如果可写表示连接成功。

二、 服务端

对于服务端 listenfd 来说

  • 阻塞: accept函数会一直阻塞,直到pending连接队列中有连接要处理
  • 非阻塞:accepte函数会立即返回,如果pending连接队列中有连接,则返回clientfd;如果没有要处理的连接,返回值小于0。错误码errno为:EWOULDBLOCK 或 EAGAIN,对应的错误为:Resource temporarily unavailable。当出现EWOULDBLOCK 或 EAGAIN 最好的办法是重试,重试一定次数后还不成功就退出操作。不能无限重试,导致线程卡死

三.、send 和 recv 系统调用

3.1 send函数

如果是sock是阻塞的,对于发送数据send函数来说,当对方的接收窗口太小,会一直卡在send函数,从现象看就是程序卡死了;如果是非阻塞的,send函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。

3.2 recv函数

如果是sock是阻塞的,对于接收数据recv函数来说,如果接收缓冲区中没有数据,会一直卡在recv函数,从现象看就是程序卡死了;如果是非阻塞的,recv函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。

3.3 send 和 recv 函数返回值分析
  1. ret > 0 send或recv成功的字节数
  2. ret == 0 对端关闭连接
  3. ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断

四、错误码

#define EAGAIN 11 /* Try again */ 
# EAGAIN表示:资源短暂不可用,这个操作可能等下重试后可用。EAGAIN的另一个名字叫做EWOULDAGAIN,这两个宏定义在GNU的c库中永远是同一个值。
#define EINTR 4 /* Interrupted system call */
 #define EWOULDBLOCK EAGAIN /* Operation would block */

IO复用epoll + 非阻塞sokcet

服务端代码:server_epoll_noblock.cpp

// 编译方法:
// g++ -o server_epoll_noblock server_epoll_noblock.cpp
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <iostream>
#define ANET_OK  0
#define ANET_ERR -1
int anetSetBlock(int fd, int non_block) {
    int flags;
    if ((flags = fcntl(fd, F_GETFL)) == -1) {
        return ANET_ERR;
    }
    /* Check if this flag has been set or unset, if so, 
     * then there is no need to call fcntl to set/unset it again. */
    if (!!(flags & O_NONBLOCK) == !!non_block)
        return ANET_OK;
    if (non_block)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        return ANET_ERR;
    }
    return ANET_OK;
}
#define CLIENTCOUNT 2048
#define MAX_EVENTS 2048
#define MAX_EPOLL_EVENTS 1024
#define MAX_ACCEPT_PER_CALL 1000
#define NOBLOCK_ACCEPT_OPTIMAL 0
int main(int argc, char **argv)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd < 0)
    {
        perror("socket");
        return -1;
    }
    anetSetBlock(listenfd, true);
    unsigned short sport = 3000;
    if(argc == 2)
    {
        sport = atoi(argv[1]);
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    printf("port = %d\n", sport);
    addr.sin_port = htons(sport);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if(bind(listenfd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
    {
        perror("bind");
        return -2;
    }
    if(listen(listenfd, 20) < 0)
    {
        perror("listen");
        return -3;
    }
    struct sockaddr_in connaddr;
    socklen_t len = sizeof(connaddr);
    int i = 0, ret = 0;
    int epollfd = epoll_create(MAX_EVENTS);
    // 将listenfd加入到epoll集合
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = listenfd;
    if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0)
    {
        perror("epoll_ctl");
        return -2;
    }
    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int count = 0;
    int nready = 0;
    char buf[1024] = {0};
    int clientfd  = 0;
    while(1)
    {
        nready = epoll_wait(epollfd, events, MAX_EPOLL_EVENTS, -1);
        if(nready == -1)
        {
            perror("epoll_wait");
                        return -3;
        }
        if(nready == 0) // 肯定不会走到这里,因为上面没设置超时时间
        {
            continue;
        }
        for(i = 0; i < nready; i++)
        {
            std::cout << "current ready fd is: " << events[i].data.fd << std::endl;
            if(events[i].data.fd == listenfd)
            {
                // listenfd 是非阻塞,我们就可以一直在这里死循环处理pending连接队列中的客户端,直到pending队列为null
                // 也存在一个问题,当连接的客户端数量较多是,这里的处理会耗时
                // Redis 在这里做了优化,每次最多处理1000条连接
#if NOBLOCK_ACCEPT_OPTIMAL
                int maxAccept = MAX_ACCEPT_PER_CALL;  //Redis 优化使用
                while (maxAccept--)  //Redis 优化
#else
                while (true)
#endif
                {
                    clientfd = accept(listenfd, (struct sockaddr*)&connaddr, &len);
                    if (clientfd == ANET_ERR){
                        // 已经没有连接要处理了,退出accept的while循环
                        if (errno == EWOULDBLOCK || errno == EAGAIN){
                            std::cout << "no client link need to handler" << std::endl;
                            break;
                        }else if(errno == EINTR){
                            continue;
                        }else{
                            // error happend
                        }
                    }else {
                        char strip[64] = {0};
                        char *ip = inet_ntoa(connaddr.sin_addr);
                        strcpy(strip, ip);
                        printf("client connect, clientfd:%d,ip:%s, port:%d, count:%d\n", clientfd, strip,ntohs(connaddr.sin_port), ++count);
                        // 设置clientfd为非阻塞
                        anetSetBlock(clientfd, true);
                        // 将clientfd加入到epoll集合,并监听它的读事件
                        event.data.fd = clientfd;// 这样在epoll_wait返回时就可以直接用了
                        event.events = EPOLLIN;  // 水平触发
                        // event.events = EPOLLIN|EPOLLET; // 边沿触发
                        epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &event);
                    }
                }
            }
            else if(events[i].events & EPOLLIN)
            {
                std::cout << "clientfd : " << clientfd << " on message" << std::endl;
                clientfd = events[i].data.fd;
                if(clientfd < 0)
                    continue;
                char *buffer = new char[15*1024*1024];
                if (!buffer){
                    std::cout << "malloc buffer failed!" << std::endl;
                    return -1;
                }
                std::cout << "malloc buffer success。。。" << std::endl;
                memset(buffer, 0, 15*1024*1024);
                int total = 12441600;
                while (1)
                {
                    // int ret;
                    memset(buffer, 0, 15*1024*1024);
                    int sendBytes = 0, recvBytes = 0, ret, offset = 0;
                    // 接收客户端的请求报文。
                    while (recvBytes < total)
                    {
                        if ( (ret = recv(clientfd, buffer+offset, total-recvBytes, 0)) == 0) // 接收服务端的回应报文。
                        {
                            printf("ret = %d , client disconected!!!\n", ret); 
                            return -1;
                        }
                        else if (ret < 0){
                            if (errno != EWOULDBLOCK || errno != EAGAIN){
                                std::cout << "read error" << std::endl;
                            }
                            // return -1;
                        }
                        else
                        {
                            offset += ret;
                            recvBytes += ret;
                            // printf("curent recv :%d bytes\n", ret);
                            // break;
                        }
                    }
                    printf("从客户端总共接收:%d 字节的数据\n", recvBytes);
                    ret = 0;
                    offset = 0;
                    while (sendBytes < total)
                    {
                        ret = send(clientfd, buffer + offset, total-sendBytes, 0);
                        if (ret == 0) // 向服务端发送请求报文。
                        { 
                            perror("send"); 
                            break; 
                        }else if (ret == ANET_ERR){
                            if (errno == EWOULDBLOCK || errno == EAGAIN)
                            {
                                // std::cout << "send kernal buffer full, retrying..." << std::endl;
                                continue;
                            } else {
                                // 连接出错,关闭connfd
                                close(clientfd);
                                return -1;
                            }
                        }
                        else
                        {
                            sendBytes += ret;
                            offset += ret;
                            // printf("已发送:%d 字节的数据\n", ret);
                        }
                    }
                    printf("回复客户端总共:%d 字节的数据\n", sendBytes);  
                }
                delete buffer;
            }
        }    
    }
    close(listenfd);
    return 0;
}

客户端代码:client_noblock.cpp

/**
 * Linux 下使用poll实现异步的connect,linux_nonblocking_connect_poll.cpp
 * zhangyl 2019.03.16
 */
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <iostream>
#include <fstream>
#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT     3000
#define ANET_OK  0
#define ANET_ERR -1
int main(int argc, char* argv[])
{
    //1.创建一个socket
    int connfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(connfd >= 0);
    if (connfd == -1)
    {
        std::cout << "create client socket error." << std::endl;
        return -1;
    }
  //将 connfd 设置成非阻塞模式 
  int oldSocketFlag = fcntl(connfd, F_GETFL, 0);
  int newSocketFlag = oldSocketFlag | O_NONBLOCK;
  if (fcntl(connfd, F_SETFL,  newSocketFlag) == -1)
  {
    close(connfd);
    std::cout << "set socket to nonblock error." << std::endl;
    return -1;
  }
    //2.连接服务器
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
    serveraddr.sin_port = htons(SERVER_PORT);
  for (;;)
  {
        // 由于connfd是非阻塞模式,无论连接是否建立成功,connect函数都会立即返回。
        // 异步connect 或者叫 非阻塞connect
    int ret = connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    if (ret == 0)
    {
      std::cout << "connect to server successfully." << std::endl;
      // close(connfd);
      return 0;
    } 
    else if (ret == ANET_ERR) 
    {
      if (errno == EINTR)
      {
        //connect 动作被信号中断,重试connect
        std::cout << "connecting interruptted by signal, try again." << std::endl;
        continue;
      } else if (errno == EINPROGRESS)
      {
                std::cout << "connecting..." << std::endl;
        //连接正在尝试中
        break;
      } else {
        // 连接出错,关闭connfd
        close(connfd);
        return -1;
      }
    }
  }
    // 使用poll函数判断socket是否可写,因为对于客户端connfd来说,connect的动作就是写操作
  pollfd event;
  event.fd = connfd;
  event.events = POLLOUT;
  int timeout = 3000;
  if (poll(&event, 1, timeout) != 1)
  {
    close(connfd);
    std::cout << "[poll] connect to server error." << std::endl;
    return -1;
  }
  if (!(event.revents & POLLOUT))
  {
    close(connfd);
    std::cout << "[POLLOUT] connect to server error." << std::endl;
    return -1;
  }
  int ret;
    socklen_t len = static_cast<socklen_t>(sizeof ret);
    // 使用 getsockopt 函数判断此时 connfd是否有错误
    if (::getsockopt(connfd, SOL_SOCKET, SO_ERROR, &ret, &len) < 0)
        return -1;
    if (ret == 0){
        std::cout << "connect to server successfully." << std::endl;
    } else{
        std::cout << "connect to server error." << std::endl;
    }
    // 当connfd为非阻塞时,即使对方的窗口太小导致发送失败,send函数也会立即返回;即使己方的接收缓冲区没有数据
    // recv函数也会立即返回,此时的返回值都是-1,错误码errno是:EWOULDBLOCK或EAGAIN
    // ret > 0  send或recv成功的字节数
    // ret == 0 对端关闭连接
    // ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断
    char* buf = new char[15*1024*1024];
    if (!buf){
        std::cout << "malloc failed!" << std::endl;
        return -1;
    }
    memset(buf, 0, 15*1024*1024);
    std::ifstream file;
    std::string fileName = "test.yuv";
    file.open (fileName.c_str(), std::ios::binary);
    // 获取filestr对应buffer对象的指针
    std::filebuf *pbuf = file.rdbuf();
    // 获取文件大小
    size_t fileSize = pbuf->pubseekoff (0,std::ios::end,std::ios::in);
    pbuf->sgetn (buf, fileSize);
    file.close();
    int total = fileSize;
    std::cout << "============== prepare total data.size [" << total << "] =============="<< std::endl;
  // 第3步:与服务端通信,发送一个报文后等待回复,然后再发下一个报文。
  for (int i = 0; i < 1; i++)
  {
        memset(buf, 0, 15*1024*1024);
    int sendBytes = 0, recvBytes = 0, ret, offset = 0;
        while (sendBytes < total)
        {
            ret = send(connfd, buf + offset, total-sendBytes, 0);
            if (ret == 0) // 向服务端发送请求报文。
            { 
                perror("send"); 
                break; 
            }else if (ret == ANET_ERR){
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    // std::cout << "send kernal buffer full, retrying..." << std::endl;
                    continue;
                } else {
                    // 连接出错,关闭connfd
                    close(connfd);
                    return -1;
                }
            }
            else
            {
                sendBytes += ret;
                offset += ret;
                // printf("已发送:%d 字节的数据\n", ret);
            }
        }
        std::cout << "============== send total [" << sendBytes <<" ] success. =============="<< std::endl;
        memset(buf, 0, 15*1024*1024);
    ret = 0;
        offset = 0;
        while (recvBytes < total)
        {
            ret = recv(connfd, buf+offset, total-recvBytes, 0);
            if (ret == 0) // 向服务端发送请求报文。
            { 
                perror("recv"); 
                std::cout << "other end close"<< std::endl;
                break; 
            }else if (ret == ANET_ERR){
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    // std::cout << "recv kernal buffer full, retrying..." << std::endl;
                    continue;
                } else {
                    // 连接出错,关闭connfd
                    close(connfd);
                    return -1;
                }
            }
            else
            {
                recvBytes += ret;
                offset += ret;
                // printf("从服务端接收:%d 字节的数据\n", ret);
            }
        }
        std::cout << "###################### recv total [" << recvBytes <<" ] success. ######################"<< std::endl;
    // sleep(1);
  }
    delete buf;
  //5. 关闭socket
  close(connfd);
    return 0;
}


推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习:

相关文章
|
2月前
|
机器学习/深度学习 算法 调度
14种智能算法优化BP神经网络(14种方法)实现数据预测分类研究(Matlab代码实现)
14种智能算法优化BP神经网络(14种方法)实现数据预测分类研究(Matlab代码实现)
267 0
|
2月前
|
编解码 网络协议 安全
Socket-TCP 上位机下位机数据交互框架
Socket-TCP 上位机下位机数据交互框架
133 0
|
3月前
|
机器学习/深度学习 数据采集 传感器
【故障诊断】基于matlab BP神经网络电机数据特征提取与故障诊断研究(Matlab代码实现)
【故障诊断】基于matlab BP神经网络电机数据特征提取与故障诊断研究(Matlab代码实现)
112 0
|
4月前
|
数据采集 存储 算法
MyEMS 开源能源管理系统:基于 4G 无线传感网络的能源数据闭环管理方案
MyEMS 是开源能源管理领域的标杆解决方案,采用 Python、Django 与 React 技术栈,具备模块化架构与跨平台兼容性。系统涵盖能源数据治理、设备管理、工单流转与智能控制四大核心功能,结合高精度 4G 无线计量仪表,实现高效数据采集与边缘计算。方案部署灵活、安全性高,助力企业实现能源数字化与碳减排目标。
116 0
|
5月前
|
Python
LBA-ECO CD-32 通量塔网络数据汇编,巴西亚马逊:1999-2006,V2
该数据集汇集了1999年至2006年间巴西亚马逊地区九座观测塔的碳和能量通量、气象、辐射等多类数据,涵盖小时至月度时间步长。作为第二版汇编,数据经过协调与质量控制,扩展了第一版内容,并新增生态系统呼吸等相关计算数据,支持综合研究与模型合成。数据以36个制表符分隔文本文件形式提供,配套PDF说明文件,适用于生态与气候研究。引用来源为Restrepo-Coupe等人(2021)。
69 1
|
18天前
|
机器学习/深度学习 人工智能 算法
【基于TTNRBO优化DBN回归预测】基于瞬态三角牛顿-拉夫逊优化算法(TTNRBO)优化深度信念网络(DBN)数据回归预测研究(Matlab代码实现)
【基于TTNRBO优化DBN回归预测】基于瞬态三角牛顿-拉夫逊优化算法(TTNRBO)优化深度信念网络(DBN)数据回归预测研究(Matlab代码实现)
|
2月前
|
机器学习/深度学习 数据采集 运维
改进的遗传算法优化的BP神经网络用于电厂数据的异常检测和故障诊断
改进的遗传算法优化的BP神经网络用于电厂数据的异常检测和故障诊断
|
4月前
|
存储 监控 算法
基于 Python 跳表算法的局域网网络监控软件动态数据索引优化策略研究
局域网网络监控软件需高效处理终端行为数据,跳表作为一种基于概率平衡的动态数据结构,具备高效的插入、删除与查询性能(平均时间复杂度为O(log n)),适用于高频数据写入和随机查询场景。本文深入解析跳表原理,探讨其在局域网监控中的适配性,并提供基于Python的完整实现方案,优化终端会话管理,提升系统响应性能。
111 4
|
5月前
|
开发者
鸿蒙仓颉语言开发教程:网络请求和数据解析
本文介绍了在仓颉开发语言中实现网络请求的方法,以购物应用的分类列表为例,详细讲解了从权限配置、发起请求到数据解析的全过程。通过示例代码,帮助开发者快速掌握如何在网络请求中处理数据并展示到页面上,减少开发中的摸索成本。
鸿蒙仓颉语言开发教程:网络请求和数据解析
|
6月前
|
Linux 数据安全/隐私保护
使用Linux命令行接入无线网络Wi-Fi的示例。
现在,你已经使用命令行成功地连接到 Wi-Fi 网络了。这两个示例涵盖了用 `nmcli` 和 `wpa_supplicant` 连接无线网络的常见场景,让你能够不依赖图形化界面来完成这个任务。在日常使用中熟练掌握这些基本操作能增强你对 Linux 系统的理解,帮助你更有效地处理各种问题。
402 12

热门文章

最新文章