您当前的位置: 首页 >  phymat.nico

高并发的epoll+线程池,epoll在线程池内

phymat.nico 发布时间:2017-12-26 01:18:06 ,浏览量:4

epoll是linux下高并发服务器的完美方案,因为是基于事件触发的,所以比select快的不只是一个数量级。
单线程epoll,触发量可达到15000,但是加上业务后,因为大多数业务都与数据库打交道,所以就会存在阻塞的情况,这个时候就必须用多线程来提速。
 
epoll在线程池内,测试结果2000个/s
增加了网络断线后的无效socket检测。
 
测试工具:stressmark
因为加了适用与ab的代码,所以也可以适用ab进行压力测试。
char buf[1000] = {0};
sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
send(socketfd,buf, strlen(buf),0);
 
 
 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


//stl head

#include //包含hash_map 的头文件

//#include
//stl的map

using namespace std ; //std 命名空间

using namespace __gnu_cxx ; //而hash_map是在__gnu_cxx的命名空间里的



int init_thread_pool ( int threadNum ) ;
void *epoll_loop ( void * para ) ;
void *check_connect_timeout ( void * para ) ;


struct sockStruct
{
     time_t time ;

     unsigned int * recvBuf ;
} ;

//hash-map

//hash_map        sock_map;

hash_map         sock_map ;

 
# define MAXRECVBUF 4096
# define MAXBUF MAXRECVBUF +10

int fd_Setnonblocking ( int fd )
{
     int op ;
 
    op =fcntl (fd ,F_GETFL ,0 ) ;
    fcntl (fd ,F_SETFL ,op |O_NONBLOCK ) ;
 
     return op ;
}
 
void on_sigint ( int signal )
{
     exit (0 ) ;
}
 
/*
handle_message - 处理每个 socket 上的消息收发
*/
int handle_message ( int new_fd )
{
     char buf [MAXBUF + 1 ] ;
     char sendbuf [MAXBUF +1 ] ;
     int len ;
     /* 开始处理每个新连接上的数据收发 */
    bzero (buf , MAXBUF + 1 ) ;
     /* 接收客户端的消息 */
     //len = recv(new_fd, buf, MAXBUF, 0);



     int nRecvBuf = MAXRECVBUF ; //设置为32K

     setsockopt (new_fd , SOL_SOCKET , SO_RCVBUF , ( const char * ) &nRecvBuf , sizeof ( int ) ) ;
    len = recv (new_fd , &buf , MAXBUF ,0 ) ;

     //--------------------------------------------------------------------------------------------

     //这块为了使用ab测试

     char bufSend [1000 ] = {0 } ;
     sprintf (bufSend , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
     send (new_fd ,bufSend , strlen (buf ) ,0 ) ;

     //--------------------------------------------------------------------------------------------


     if (len > 0 ) {

         //printf ("%d接收消息成功:'%s',共%d个字节的数据\n", new_fd, buf, len);


         //hash-map

        
        hash_map : : iterator it_find ;
        it_find = sock_map . find (new_fd ) ;
         if (it_find = = sock_map .end ( ) ) {
             //新的网络连接,申请新的接收缓冲区,并放入map中

             //printf("new socket %d\n", new_fd);


            sockStruct newSockStruct ;
            newSockStruct . time = time ( ( time_t * )0 ) ;
            newSockStruct .recvBuf = ( unsigned int * ) malloc (1000 ) ;
             memset (newSockStruct .recvBuf , 0 , 1000 ) ;
             strcat ( ( char * )newSockStruct .recvBuf , buf ) ;
            sock_map .insert ( pair (new_fd , newSockStruct ) ) ;
         } else {
             //网络连接已经存在,找到对应的数据缓冲区,将接收到的数据拼接到数据缓冲区中

             //printf("socket %d exist!\n", it_find->first);


             (it_find - >second ) . time = time ( ( time_t * )0 ) ;                 //时间更改

             char * bufSockMap = ( char * ) (it_find - >second ) .recvBuf ;     //数据存储


             strcat (bufSockMap , buf ) ;
             //printf("bufSockMap:%s\n", bufSockMap);

         }


     }
     else {
         if (len second ) .recvBuf ) ;
    sock_map .erase (it_find ) ;

     close (new_fd ) ;
     return len ;
}


     int listenfd ;
     int sock_op =1 ;
     struct sockaddr_in address ;
     struct epoll_event event ;
     struct epoll_event events [1024 ] ;
     int epfd ;
     int n ;
     int i ;
     char buf [512 ] ;
     int off ;
     int result ;
     char *p ;

int main ( int argc , char * argv [ ] )
{

    init_thread_pool (1 ) ;

     signal (SIGPIPE , SIG_IGN ) ;
     signal (SIGCHLD , SIG_IGN ) ;
     signal ( SIGINT , &on_sigint ) ;
    listenfd = socket ( AF_INET , SOCK_STREAM ,0 ) ;
     setsockopt (listenfd ,SOL_SOCKET ,SO_REUSEADDR , &sock_op , sizeof (sock_op ) ) ;
 
     memset ( &address ,0 , sizeof (address ) ) ;
    address .sin_addr .s_addr = htonl ( INADDR_ANY ) ;
    address .sin_port = htons (8006 ) ;
     bind (listenfd , ( struct sockaddr * ) &address , sizeof (address ) ) ;
     listen (listenfd ,1024 ) ;
    fd_Setnonblocking (listenfd ) ;
 
    epfd =epoll_create (65535 ) ;
     memset ( &event ,0 , sizeof (event ) ) ;
    event .data .fd =listenfd ;
    event .events =EPOLLIN |EPOLLET ;
    epoll_ctl (epfd ,EPOLL_CTL_ADD ,listenfd , &event ) ;

     while (1 ) {
         sleep (1000 ) ;
     }
     return 0 ;
}

/*************************************************
* Function: * init_thread_pool
* Description: * 初始化线程
* Input: * threadNum:用于处理epoll的线程数
* Output: *
* Others: * 此函数为静态static函数,
*************************************************/
int init_thread_pool ( int threadNum )
{
     int i ,ret ;

     pthread_t threadId ;

     //初始化epoll线程池

     for ( i = 0 ; i 0 )
         {
             for (i =0 ;i 0 )
                         {
                            fd_Setnonblocking (event .data .fd ) ;
                            event .events =EPOLLIN |EPOLLET ;
                            epoll_ctl (epfd ,EPOLL_CTL_ADD ,event .data .fd , &event ) ;
                         }
                         else
                         {
                             if ( errno = =EAGAIN )
                             break ;
                         }
                     }
                 }
                 else
                 {
                     if (events [i ] .events &EPOLLIN )
                     {
                         //handle_message(events[i].data.fd);


                         char recvBuf [1024 ] = {0 } ;

                         int ret = 999 ;

                         int rs = 1 ;


                         while (rs )
                         {
                            ret = recv (events [n ] .data .fd ,recvBuf ,1024 ,0 ) ; // 接受客户端消息

                             if (ret 0 ) {

                            count111 + + ;



                             struct tm *today ;
                             time_t ltime ;
                             time ( &nowtime ) ;

                             if (nowtime ! = oldtime ) {
                                 printf ( "%d\n" , count111 ) ;
                                oldtime = nowtime ;
                                count111 = 0 ;
                             }


                             char buf [1000 ] = {0 } ;
                             sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
                             send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;


                             //    CGelsServer Gelsserver;

                             //    Gelsserver.handle_message(events[i].data.fd);

                         }


                        epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;
                         close (events [i ] .data .fd ) ;

                     }
                     else if (events [i ] .events &EPOLLOUT )
                     {
                         sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
                         send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;
                         /*
                        if(p!=NULL)
                        {
                            free(p);
                            p=NULL;
                        }
                        */
                         close (events [i ] .data .fd ) ;
                     }
                     else
                     {
                         close (events [i ] .data .fd ) ;
                     }
                 }
             }
         }
     }

}
/*************************************************
* Function: * check_connect_timeout
* Description: * 检测长时间没反应的网络连接,并关闭删除
* Input: *
* Output: *
* Others: *
*************************************************/
void *check_connect_timeout ( void * para )
{
    hash_map : : iterator it_find ;
     for (it_find = sock_map .begin ( ) ; it_find!=sock_map .end ( ) ; + +it_find ) {
         if ( time ( ( time_t * )0 ) - (it_find - >second ) . time > 120 ) {                 //时间更改


             free ( (it_find - >second ) .recvBuf ) ;
            sock_map .erase (it_find ) ;

             close (it_find - >first ) ;
         }
     }

}
关注
打赏
查看更多评论

phymat.nico

暂无认证

  • 4浏览

    0关注

    1946博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录