加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

进程池的实现过程详解

发布时间:2020-12-14 04:32:30 所属栏目:大数据 来源:网络整理
导读:进程池 父进程 1、定义数据结构pChild,申请子进程数目的结构体空间 2、通过循环,socketpair创建全双工管道,创建子进程,将子进程pid,管道对端,是否忙碌等信息存储 3、socket,bind,listen,对应的端口处于监听状态 netstat 4、epoll_create创建epfd,监

进程池

父进程

1、定义数据结构pChild,申请子进程数目的结构体空间
2、通过循环,socketpair创建全双工管道,创建子进程,将子进程pid,管道对端,是否忙碌等信息存储
3、socket,bind,listen,对应的端口处于监听状态 netstat
4、epoll_create创建epfd,监控socketFd和所有子进程的管道对端
5、while(1)循环 epoll_wait等待客户端的请求及子进程是否有通知
如果socketFd可读,说明是客户端有连接请求,accept对应连接请求,得到new_fd,循环遍历,找到非忙碌的子进程,将new_fd发送给对应子进程,将对应子进程标识为忙碌,然后父进程关闭new_fd。

判断就绪的描述符 是哪个子进程的管道对端,就将对应子进程标识为非忙碌,同时读出管道内数据。

子进程的流程

while(1)
{
1、接收任务,得到newFd
2、通过newFd给客户端发送文件
3、关闭newFd
4、通过写管道,通知父进程完成文件下载任务
}

1、 send_recv_syn 同步机制

设置发送端1000,接收端1500,但接收端因为网络原因,不能保证每个包都是接收到1500字节。

tcp_client.c

#include <func.h>
#define N 1048576
int main(int argc,char* argv[])
{
    ARGS_CHECK(argc,3);
    int socketFd;
    socketFd=socket(AF_INET,SOCK_STREAM,0);
    ERROR_CHECK(socketFd,-1,"socket");
    struct sockaddr_in ser;
    bzero(&ser,sizeof(ser));
    ser.sin_family=AF_INET;
    ser.sin_port=htons(atoi(argv[2]));
    ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    int ret;
    ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser));
    ERROR_CHECK(ret,"connect");
    printf("connect successn");
    int i;
    int total=0;
    char buf[1000]={0};
    for(i=0;i<N;i++)
    {
        ret=send(socketFd,buf,sizeof(buf),MSG_DONTWAIT);
        if(-1==ret)
        {
            //printf("errno=%dn",errno);
            printf("total=%dn",total);
            return -1;
        }
        total += ret;
        printf("ret = %dn",ret);
    }
    printf("total send = %dn",total);
    close(socketFd);
}

tcp_server.c

#include <func.h>

int main(int argc,sizeof(ser));
    ser.sin_family=AF_INET;
    ser.sin_port=htons(atoi(argv[2]));
    ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    int ret;
    ret=bind(socketFd,"bind");
    listen(socketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
    int new_fd;
    struct sockaddr_in client;
    bzero(&client,sizeof(client));
    int addrlen=sizeof(client);
    new_fd=accept(socketFd,(struct sockaddr*)&client,&addrlen);
    ERROR_CHECK(new_fd,"accept");
    printf("client ip=%s,port=%dn",inet_ntoa(client.sin_addr),ntohs(client.sin_port));
    char buf[1500]={0};
    int total = 0;
    while(1){
        ret = recv(new_fd,0);
        total = total + ret;
        printf("ret = %dn",ret);
    };
    printf("total recv = %dn",total);
    close(new_fd);
    close(socketFd);
}

2、自定义设置函数recvCycle改进问题

#include "function.h"

int recvCycle(int newFd,void* p,int len){
    int total = 0;
    int ret;
    char *pStart = (char*)p;   
    while(total < len){
        ret = recv(newFd,pStart + total,len - total,0);
        total = total + ret;
    }
    return 0;
}

3、客户端中显示下载进度 time/slice两种方法

time型

#include "function.h"

int main(int argc,"connect");
    printf("connect successn");
    int dataLen;
    char buf[1000]={0};
    //接收文件名
    recvCycle(socketFd,&dataLen,4);
    recvCycle(socketFd,dataLen);
    int fd;
    fd=open(buf,O_CREAT|O_WRONLY,0666);
    ERROR_CHECK(fd,"open");
    //接文件大小
    off_t fileSize = 0;//文件大小off_t 长整型
    recvCycle(socketFd,&fileSize,dataLen);
    time_t start,now;
    //接受文件内容
    start = now = time(NULL);
    int downLoadSize = 0;
    while(1)
    {
        recvCycle(socketFd,4);
        if(dataLen>0)
        {
            recvCycle(socketFd,dataLen);
            write(fd,dataLen);
            downLoadSize+=dataLen;
            time(&now);
            if(now-start>=1){
                printf("r%5.2f%%",(float)downLoadSize / fileSize * 100);
                fflush(stdout);
                start = now;
            }
        }else{
            printf("r100%%         n");
            break;
        }
    }
    close(fd);
    close(socketFd);
    return 0;
}

slice型

#include "function.h"

int main(int argc,"open");
    //接文件大小
    off_t fileSize = 0,oldSize = 0,sliceSize;//文件大小off_t 长整型
    off_t downLoadSize = 0;
    recvCycle(socketFd,dataLen);
    //接受文件内容
    sliceSize = fileSize / 10000;
    while(1)
    {
        recvCycle(socketFd,dataLen);
            downLoadSize+=dataLen;
            if(downLoadSize - oldSize > sliceSize){
                printf("r%5.2f%%",(float)downLoadSize / fileSize * 100);
                fflush(stdout);
                oldSize = downLoadSize;
            }
        }else{
            printf("r100%%         n");
            break;
        }
    }
    close(fd);
    close(socketFd);
    return 0;
}

4、设置异常情况

(1)客户端在下载中突然断开,原先的服务端会一直死循环打印

父进程处于S和R状态来回切换,因为子进程断开后处于Z状态(僵尸状态),而fd未关闭会一直使epoll_wait的返回值为1。

原因:

(a)epoll_wait是每次监控到僵尸子进程为空闲。

(b)父进程和子进程分别掌握管道的一端,当子进程(客户端)关闭后,管道会被标记为一直可读,于是会main函数会一直在while(1)中不退出,read返回值为0,并一直打印 "child is not busy"。

方法:

修改tran_n.c,设置当send返回值为-1时及时退出。

tran_n.c

#include "function.h"

int tranFile(int newFd){
    Train_t train;
    int ret;
    //发送文件名
    train.dataLen = strlen(FILENAME);
    strcpy(train.buf,FILENAME);
    int fd = open(FILENAME,O_RDONLY);
    ret = send(newFd,&train,4 + train.dataLen,0);
    ERROR_CHECK(ret,"send");
    //发文件大小
    struct stat buf;
    fstat(fd,&buf);
    train.dataLen = sizeof(buf.st_size);
    memcpy(train.buf,&buf.st_size,train.dataLen);
    ret = send(newFd,"send");
    //发文件内容 
    while((train.dataLen = read(fd,train.buf,sizeof(train.buf)))){
        ret = send(newFd,0);
        ERROR_CHECK(ret,"send");
    }
    //发送结束标志
    ret = send(newFd,4,"send");
    return 0;
}

修改后:

(2)服务器突然断开,客户端全部死循环

原因:recv返回值一直为0,total的值一直小于len。

方法:需要在函数后判断返回值是否为0,如果为0直接break返回-1

修改后:

(3)在改进(2)问题后,服务器断开后,再次执行./process_pool_server 192.168.3.160 2000 5会出现如下异常:

原因:TCP断开连接后,恢复同一个端口需要一定时间,此时如果直接再重连同个端口就会出现上面的错误

方法在tcpInit中修改代码,设置setsockopt参数reuse即可。

#include "function.h"

int tcpInit(int *pSocketFd,char *ip,char *port){
    int socketFd;
    socketFd = socket(AF_INET,"socket");
    int ret,reuse = 1;
    ret = setsockopt(socketFd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(int));
    ERROR_CHECK(ret,"setsockopt");
    struct sockaddr_in ser;
    bzero(&ser,sizeof(ser));
    ser.sin_family = AF_INET;
    ser.sin_port = htons(atoi(port));
    ser.sin_addr.s_addr = inet_addr(ip);//点分十进制转为32位的网络字节序
    ret = bind(socketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
    *pSocketFd = socketFd;
    return 0;
}

5、服务器要升级,通知客户端有序退出

方法:异步拉起同步方法,设置一个管道,在同一进程中既可读又可写,当信号产生时写管道exitFds[1],并让exitFds[0]加入epoll监控events的集合,监控是否有管道中的读描述符是否可读(exitFds[0])。

(a)如果业务不重要,直接暴力kill

main.c

#include "function.h"

int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写

void sigFunc(int sigNum){
    write(exitFds[1],&sigNum,1);//写一个字节
}

int main(int argc,char* argv[]){
    ARGS_CHECK(argc,4);
    pipe(exitFds);
    signal(SIGUSR1,sigFunc);
    int ret;
    int childNum = atoi(argv[3]);
    Process_Data *pChild = (Process_Data*)calloc(childNum,sizeof(Process_Data));
    makeChild(pChild,childNum);//创建子进程
    int socketFd;
    tcpInit(&socketFd,argv[1],argv[2]);//建立TCP连接
    int epfd;
    epfd = epoll_create(1);//创建一个句柄,占用一个文件描述符,参数表示需要监控的数目
    struct epoll_event event,*evs;
    evs = (struct epoll_event*)calloc(childNum + 2,sizeof(struct epoll_event));
    event.events = EPOLLIN;
    event.data.fd = socketFd;
    ret = epoll_ctl(epfd,EPOLL_CTL_ADD,socketFd,&event);//监听socketFd
    ERROR_CHECK(ret,"epoll_ctl");
    event.data.fd = exitFds[0];
    ret = epoll_ctl(epfd,exitFds[0],&event);
    ERROR_CHECK(ret,"epoll_ctl");
    int i,j;
    for(i = 0; i < childNum ; i++){
        event.data.fd = pChild[i].fd;
        ret = epoll_ctl(epfd,pChild[i].fd,&event);//将要监控的子进程的fd加入
        ERROR_CHECK(ret,"epoll_ctl");
    }
    int readyFdNum; 
    int newFd;
    //int count = 0;
    while(1){
        readyFdNum = epoll_wait(epfd,evs,childNum + 2,-1);
        //printf("count = %d,readyFdNum = %dn",count++,readyFdNum);
        //epoll_wait等待事件的产生,参数evs用来从内核得到事件的集合
        //用childNum + 1告知内核这个events有多大
        for(i = 0; i < readyFdNum; i++){
            //有客户端连入
            if(evs[i].events == EPOLLIN && evs[i].data.fd == socketFd){
                //断开后不会进入该循环
                newFd = accept(socketFd,NULL,NULL);//不保存远程主机信息
                for(j = 0; j < childNum; j++){
                    if(!pChild[j].busy){ //找到非忙碌的子进程,发任务(文件描述符)
                        sendFd(pChild[j].fd,newFd);
                        pChild[j].busy = 1;
                        printf("%d child is busyn",pChild[j].pid);
                        break;
                    }
                }   
                close(newFd);   //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据
            }
            if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){
                printf("start exitn");
                close(socketFd);
                //两种方式:1、暴力Kill 2、同步退出机制
                //暴力kill
                for(j = 0;j < childNum; j++){
                    kill(pChild[j].pid,9);
                }
                for(j = 0;j < childNum; j++){
                    wait(NULL);
                }
                return 0;
            }
            for(j = 0; j < childNum; j++){
                if(evs[i].data.fd == pChild[j].fd){  //遍历所有子进程的fd
                    //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。
                    //printf("%d %dn",evs[i].data.fd,pChild[j].fd);
                    read(pChild[j].fd,&ret,1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态
             //       if(0 == ret2){
             //           return -1;
             //       }
                    pChild[j].busy = 0;
                    //printf("ret2 = %dn",ret2);
                    printf("%d child is not busyn",pChild[j].pid);
                }
            }
           // printf("%dn",count++);
           // sleep(3);
        }
    }
    return 0;
}

用10号信号Kill父进程及父进程状况:

子进程状况:

(b)如果业务重要,需要退出

方法一:sigprocmask屏蔽信号加保护 ..........................sigprocmask解除保护

方法二:同步退出机制,设置一个exitFlag,某个子进程完成当前任务后下次接收一个指定fd和exitFlag,若exitFlag为1则继续执行任务,如果exitFlag为0则有序退出,最大退出时间是所有子进程执行完当前任务的剩余最大时长。

效果如下:

具体代码见最后完整版。

6、服务端因其他因素挂掉后自动重启的设计方法

方法:父进程监控子进程,在子进程非正常退出后父进程重新启用子进程

这里用三个名字来代表各个亲缘关系:爷进程、父进程、5个子进程

while(fork()) //爷进程进入循环
{
   int status;
   wait(&status); //获取退出码
   if(WIFEXITED(status)) //如果父进程是正常退出
   {
      printf("child exit normaln");
      exit(0);//爷进程退出
   }
   //父进程非正常退出,重新回while循环创建父进程
}

效果如下:

当process_pool_server执行后,会出现1+1+5个进程,这时候给父进程发送信号通知其退出(不是给爷进程发信号)

function.h

#include <errno.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <setjmp.h>
#include <signal.h>
#include <sys/msg.h>
#include <strings.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <syslog.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <pwd.h>
#include <grp.h>
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/types.h>
#include <dirent.h>
#include <fcntl.h>
#define ARGS_CHECK(argc,val) {if(argc!=val)     {printf("error argsn");return -1;}}
#define ERROR_CHECK(ret,retval,funcName) {if(ret==retval)     {printf("LINE %d Function ERROR ",__LINE__);fflush(stdout);perror(funcName);return -1;}}
#define THREAD_ERROR_CHECK(ret,funcName) {if(ret!=0)     {printf("%s:%sn",funcName,strerror(ret));return -1;}}
//管理每一个子进程的数据结构
typedef struct{
    pid_t pid;//子进程的pid
    int fd;//子进程的管道对端
    short busy;//子进程是否忙碌,0代表非忙碌,1代表忙碌
}Process_Data;

typedef struct{
    int dataLen;
    char buf[1000];
}Train_t;
#define FILENAME "file"
int makeChild(Process_Data*,int);
int tcpInit(int*,char*,char*);
int childHandle(int);
int sendFd(int,int,int);
int recvFd(int,int*,int*);
int tranFile(int);

main.c

#include "function.h"

int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写

void sigFunc(int sigNum){
    write(exitFds[1],char* argv[]){
    while(fork()) //爷进程进入循环
    {
        int status;
        wait(&status); //获取退出码
        if(WIFEXITED(status)) //如果父进程是正常退出
        {
            printf("child exit normaln");
            exit(0);//爷进程退出

        }
        //父进程非正常退出,重新回while循环创建父进程
    }
    ARGS_CHECK(argc,newFd,1);
                        pChild[j].busy = 1;
                        printf("%d child is busyn",pChild[j].pid);
                        break;
                    }
                }   
                close(newFd);   //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据
            }
            if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){
                printf("start exitn");
                close(socketFd);
                //两种方式:1、暴力Kill 2、同步退出机制
                //暴力kill
                for(j = 0;j < childNum; j++){
                    //kill(pChild[j].pid,9);
                    sendFd(pChild[j].fd,0);//给所有的子进程发送0号描述符(没啥用),exitFlag为0
                }
                for(j = 0;j < childNum; j++){
                    wait(NULL);
                }
                return 0;
            }
            for(j = 0; j < childNum; j++){
                if(evs[i].data.fd == pChild[j].fd){  //遍历所有子进程的fd
                    //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。
                    //printf("%d %dn",1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态
                    //       if(0 == ret2){
                    //           return -1;
                    //       }
                    pChild[j].busy = 0;
                    //printf("ret2 = %dn",pChild[j].pid);
                }
            }
            // printf("%dn",count++);
            // sleep(3);
        }
    }
    return 0;
}

child.c

#include "function.h"
//服务端
int makeChild(Process_Data *pChild,int childNum)
{
    int i;
    pid_t pid;
    int fds[2];
    int ret;
    for(i=0;i < childNum;i++)
    {
        //初始化socketpair类型描述符,与pipe不同,每一端既可读又可写
        ret = socketpair(AF_LOCAL,fds);
        ERROR_CHECK(ret,"socketpair");
        pid = fork();
        if(0 == pid)  //子进程
        {
            close(fds[1]); 
            ret = childHandle(fds[0]);
            if(-1 == ret){
                return -1;
            }
        }
        close(fds[0]); //父进程
        pChild[i].pid = pid;
        pChild[i].fd = fds[1];
        pChild[i].busy = 0;
    }
    return 0;
}

int childHandle(int fd)
{
    int ret;
    int newFd;
    int exitFlag;
    while(1){
        //开5个子进程,newFd为10,因内核控制信息,父子进程共享同一块文件描述符
        recvFd(fd,&newFd,&exitFlag);//接收到任务
#ifdef DEBUG
        printf("newFd = %dn",newFd);
#endif
        if(exitFlag){
            ret = tranFile(newFd);//给客户端发送文件
            printf("I get task %dn",newFd);
            if(-1 == ret){
                printf("tranFile not finish!n");
                continue;
            }
            //newFd的值为10,socketFd为3,有五个子进程管道,为4-8,epfd也占1
            printf("finish send filen");
            close(newFd);
        }
        else{
            exit(0);//不能用break
        }
 //       if(0 == newFd){
 //           printf("conncect failed!n");
 //           return -1;
 //       }
        write(fd,1); //通知父进程非忙碌,写一个字节即可
    }
}

tran_file.c

#include "function.h"

int tranFile(int newFd){
    Train_t train;
    int ret;
    //发送文件名
    train.dataLen = strlen(FILENAME);
    strcpy(train.buf,"send");
    return 0;
}

tcpInit.c

#include "function.h"

int tcpInit(int *pSocketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
    *pSocketFd = socketFd;
    return 0;
}

send_fd.c

#include "function.h"
int sendFd(int sfd,int fd,int exitFlag)
{
    struct msghdr msg;
    memset(&msg,sizeof(msg));
    struct iovec iov[2];
    char buf2[10]="world";
    iov[0].iov_base= &exitFlag;
    iov[0].iov_len=5;
    iov[1].iov_base=buf2;
    iov[1].iov_len=5;
    msg.msg_iov=iov;
    msg.msg_iovlen=2;
    struct cmsghdr *cmsg;
    int len=CMSG_LEN(sizeof(int));
    cmsg=(struct cmsghdr *)calloc(1,len);
    cmsg->cmsg_len=len;
    cmsg->cmsg_level=SOL_SOCKET;
    cmsg->cmsg_type=SCM_RIGHTS;
    *(int*)CMSG_DATA(cmsg)=fd;
    msg.msg_control=cmsg;
    msg.msg_controllen=len;
    int ret;
    ret=sendmsg(sfd,&msg,"sendmsg");
    return 0;
}
int recvFd(int sfd,int *fd,int *exitFlag)
{
    struct msghdr msg;
    memset(&msg,sizeof(msg));
    struct iovec iov[2];
    char buf2[10];
    iov[0].iov_base=exitFlag;
    iov[0].iov_len=5;
    iov[1].iov_base=buf2;
    iov[1].iov_len=5;
    msg.msg_iov=iov;
    msg.msg_iovlen=2;
    struct cmsghdr *cmsg;
    int len=CMSG_LEN(sizeof(int));
    cmsg=(struct cmsghdr *)calloc(1,len);
    cmsg->cmsg_len=len;
    cmsg->cmsg_level=SOL_SOCKET;
    cmsg->cmsg_type=SCM_RIGHTS;
    msg.msg_control=cmsg;
    msg.msg_controllen=len;
    int ret;
    ret=recvmsg(sfd,"sendmsg");
    *fd=*(int*)CMSG_DATA(cmsg);
    return 0;
}

客户端

client.c

#include "function.h"

int main(int argc,dataLen);
    //接受文件内容
    sliceSize = fileSize / 10000;
    while(1)
    {
        ret = recvCycle(socketFd,4);
        if(-1 == ret){
            printf("n");
            printf("server is update!n");
            break;
        }
        if(dataLen > 0)
        {
            ret = recvCycle(socketFd,dataLen);
            if(-1 == ret){
                printf("n");
                printf("server is update!n");
                break;
            }
            write(fd,dataLen);
            downLoadSize += dataLen;
            if(downLoadSize - oldSize > sliceSize){
                printf("r%5.2f%%",(float)downLoadSize / fileSize * 100);
                fflush(stdout);
                oldSize = downLoadSize;
            }
        }
        else{
            printf("r100%%         n");
            break;
        }
    }
    close(fd);
    close(socketFd);
    return 0;
}

recvCycle.c

#include "function.h"

int recvCycle(int newFd,0);
        //printf("recv ret = %dn",ret);
        if(0 == ret){
            return -1;
        }
        total = total + ret;
    }
    
    return 0;
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读