Skip to content

code learning

xbc112233 edited this page Oct 10, 2019 · 72 revisions

1 支持resueport后的代码逻辑

1.1 ngx_event_process_init是在worker进程fork出来以后,走到的epoll初始化,这里面有几个地方比较关键,代码:

/*
1. 如果不用ngx_use_accept_mutex锁、或者开了reuseport,则直接ngx_add_event,去触发accept
2. 如果不在这里ngx_add_event,则到ngx_process_events_and_timers去抢锁,谁抢到谁去accept
*/
    if (ngx_use_accept_mutex
	#if (NGX_HAVE_REUSEPORT)
        && !ls[i].reuseport
	#endif
       )
    {
        continue;
    }

    if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
        return NGX_ERROR;
    }

1.2 ngx_clone_listening 里面,如果当前是resueport模式,则按照监听的端口和worker进程数量,依次复制socket fd

ngx_int_t
ngx_clone_listening(ngx_conf_t *cf, ngx_listening_t *ls)
{

	#if (NGX_HAVE_REUSEPORT)

    ngx_int_t         n;
    ngx_core_conf_t  *ccf;
    ngx_listening_t   ols;

    if (!ls->reuseport) {
        return NGX_OK;
    }

    ols = *ls;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
                                           ngx_core_module);

    for (n = 1; n < ccf->worker_processes; n++) {

        /* create a socket for each worker process */

        ls = ngx_array_push(&cf->cycle->listening);
        if (ls == NULL) {
            return NGX_ERROR;
        }

        *ls = ols;
        ls->worker = n;
    }
	#endif
    return NGX_OK;
}

2. ngxin初始化http、rtmp套接字

根据配置的http和rtmp端口,创建好listen结构体信息,最后统一到ngx_open_listening_sockets绑定,
  监听,这些都是在父进程里进行的

3. epoll初始化相关

nginx先按照读取的配置,初始化http和rtmp的监听端口套接字。在fork了worker进程以后,
    再初始化epoll模块,然后将已经监听起来的套接字加入epoll全局句柄ep
   (每一个工作进程一个)。fork出子进程后,每个子进程都有一个全局的epoll句柄ep,
    所有事件都注册到这个ep上

4. 模块加载

1. 启动时候加载顺序应该是按照CORE_MODULES配置,生成到ngx_modules数组里依次加载,
2. main函数里面,先执行ngx_init_cycle,listen所有模块的套接字,ngx_conf_handler里,
       执行ngx_command_t 里的set函数, 如ngx_http_block和ngx_rtmp_block,block函数
       里面有各个模块的postconfiguration;
3. ngx_spawn_process里调用fork出worker进程的时候,
   ngx_worker_process_cycle(unix版本)里的
   ngx_worker_process_init 调用每个模块的init_process,这个里面初始化epoll

5. 模块执行顺序

首先是nginx给lua发HTTP的connect和PLAY消息,走到notify模块,然后发RTMP流,走到rtmp_cmd模块

6. 关于ngx_buf_t结构体里的start,pos,last,end

最近定位了一个bug,代码模型如下:
初始化:
tctx->carg[4096];
tctx->barg.start = tctx->barg.pos = tctx->barg.last = tctx->carg;
tctx->barg.end = tctx->carg + sizeof(tctx->carg);
。。。。
barg是一个buf,把他和carg这一个char数组关联起来
使用:
tctx->barg.last = tctx->barg.pos;                //此处bug,没有把pos置回carg的首地址
tctx->barg.last = ngx_snprintf(tctx->barg.last, 
    tctx->barg.end - tctx->barg.last, "Begin=%T", 
    tctx->begin);
问题就在注释处,因为没有把pos置回carg的首地址,所以在netcall模块发包后,pos每次向前移动
16字节(业务逻辑,定长的16),导致carg数组不停的追加"Begin=XX"的16字节字符串,所以每次
运行42分25秒左右,业务中断,因为每16字节的自加,把carg的4096个空间填满了
 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
ps:buf结构体里面,start和end应该是不变的,表示这个buf的定长大小,但pos和last是动态的,
比如openresty里,每次到发送模块的时候,pos会往前递增发送的大小n,而业务层每次填包n字节,
会让last往后移动n

7.一个连接从创建到进入http模块的逻辑

(1) 首先,父进程会读取配置,当读到http时,调用ngx_http_commands的ngx_http_block, 在ngx_http_block函数里,把要监听的端口socket创建好,每个创建好的句柄放到ngx_listening_t *ls里,注册回调函数ngx_http_init_connection, 代码如下:

  static ngx_listening_t *
  ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
 {
ngx_listening_t           *ls;
ngx_http_core_loc_conf_t  *clcf;
ngx_http_core_srv_conf_t  *cscf;

//ls是每个要监听的端口的句柄,如80等
ls = ngx_create_listening(cf, &addr->opt.sockaddr.sockaddr,
                          addr->opt.socklen);
if (ls == NULL) {
    return NULL;
}

ls->addr_ntop = 1;

ls->handler = ngx_http_init_connection;//此处注册回调给ls
。。。
}

2) nginx启动后,fork出N个worker进程,每个进程会初始化epoll,调用 ngx_event_process_init,把每个监听的socket注册上accept回调ngx_event_accept 代码:

 for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
    //如果开启reuseport,则只有本worker的ls才往后走(每个ls在clone里都有N个worker)
    if (ls[i].reuseport && ls[i].worker != ngx_worker) {
        continue;
    }
  #endif
  。。。
do {
    i--;

    c[i].data = next;
    c[i].read = &cycle->read_events[i];
    c[i].write = &cycle->write_events[i];
    c[i].fd = (ngx_socket_t) -1;

    next = &c[i];
} while (i);
  rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                          : ngx_event_recvmsg;
    。。。
}

(3)当accept触发后,进入ngx_event_accept,从epoll的data指针获取ls:

 void
  ngx_event_accept(ngx_event_t *ev)
 {
 。。。。。。。
lc = ev->data;
ls = lc->listening;
ev->ready = 0;

ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
               "accept on %V, ready: %d", &ls->addr_text, ev->available);

do {
    socklen = sizeof(ngx_sockaddr_t);

  #if (NGX_HAVE_ACCEPT4)
    if (use_accept4) {
        s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
    } else {
        s = accept(lc->fd, &sa.sockaddr, &socklen);
    }
     #else
    s = accept(lc->fd, &sa.sockaddr, &socklen);
 #endif
 。。。。
    ls->handler(c);//调用到ngx_http_init_connection
}

RTMP模块道理也一样的,ngx_rtmp_add_listening里注册ngx_rtmp_init_connection

8.

A. timewait状态的意义 1) 为什么TIME_WAIT状态需要经过2MSL(最大报文段生存时间,一般是30*2)才能返回到CLOSE状态?

   虽然按道理,四个报文都发送完毕,我们可以直接进入CLOSE状态了,但是我们必须假象网络是不可靠的,有可以最后一个ACK丢失。所以
 TIME_WAIT状态就是用来重发可能丢失的ACK报文,判断报文是否丢失主要是根据被动关闭段是否重发fin,如果重发fin,说明ack丢失了。如果发
 RST,说明报文迟缓到达。

 2) 如果上次链接server发的fin报文客户端没有收到,有可能在客户端当前端口复用后再收到这个fin报文,导致连接关闭。为了解决这个问题,
 一个是在timewait状态端口不能拉起,既不用reuseaddr;还有一个是内核校验新连接的syn的seq,如果大于上一个连接的fin的seq,则丢弃fin

B. RST异常终止连接,参见第十部分

C. 检测半连接

9. recv返回值

n = c->recv(c, b->last, size);
//EAGAIN,加入定时器重新接受
if (n == NGX_AGAIN) {

    if (!rev->timer_set) {
        ngx_add_timer(rev, c->listening->post_accept_timeout);
        ngx_reusable_connection(c, 1);
    }

    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }

    /*
     * We are trying to not hold c->buffer's memory for an idle connection.
     */

    if (ngx_pfree(c->pool, b->start) == NGX_OK) {
        b->start = NULL;
    }

    return;
}
//吃屎了,退出
if (n == NGX_ERROR) {
    ngx_http_close_connection(c);
    return;
}
//返回0,断链
if (n == 0) {
    ngx_log_error(NGX_LOG_INFO, c->log, 0,
                  "client closed connection");
    ngx_http_close_connection(c);
    return;
}

10. SO_LINGER选项、TCP的半关闭

SO_LINGER选项用来改变此缺省设置。使用如下结构:

    struct linger {
       int l_onoff; /* 0 = off, nozero = on */
        int l_linger; /* linger time */
   };

有下列三种情况:

1)设置 l_onoff为0,则该选项关闭,l_linger的值被忽略,等于内核缺省情况,close调用会立即返回给调用者,如果可能将会传输任何未发送的数据;

2)设置 l_onoff为非0,l_linger为0,则套接口关闭时TCP夭折连接,TCP将丢弃保留在套接口发送缓冲区中的任何数据并发送一个RST给对方,而不是通常的四分组终止序列,这避免了TIME_WAIT状态;

3)设置 l_onoff 为非0,l_linger为非0,当套接口关闭时内核将拖延一段时间(由l_linger决定)。如果套接口缓冲区中仍残留数据,进程将处于睡眠状态,直 到(a)所有数据发送完且被对方确认,之后进行正常的终止序列(描述字访问计数为0)或(b)延迟时间到。此种情况下,应用程序检查close的返回值是非常重要的,如果在数据发送完并被确认前时间到,close将返回EWOULDBLOCK错误且套接口发送缓冲区中的任何数据都丢失。close的成功返回仅告诉我们发送的数据(和FIN)已由对方TCP确认,它并不能告诉我们对方应用进程是否已读了数据。如果套接口设为非阻塞的,它将不等待close完成。

半关闭: 调用shutdown,并且启用SHUT_WR宏,发送FIN给对端,表明本端程序只收不发,对端依然可发送数据,这个就是半关闭,TCP变成了单向收发; 对于SHUT_RD,可以在HTTP短连接的时候,服务器收到客户端的GET请求后,直接调用,这个不妨碍服务器给客户端发数据,客户端在发给服务器后可以直接SHUT_WR,客户端只收不发。

RST有三种条件可以产生:

1)SYN到达某端口,但端口没有监听

2)TCP想取消一个连接

3)TCP接收了一个根本不存在的连接上的分节

可以用SO_LINGER宏模拟accept之前客户端发RST给服务器的这种边界情况,这种情况下,服务器对于NGX_ECONNABORTED错误,应该直接忽略,继续accpet,而不是退出

11. TCP的ack确认

数据包有一个seq_num,还有数据len,next_seq=seq_num+len,所以接收方会根据收到的每一个TCP包, 知道下一个要收包的序号(relative seq number),如果发现来包不是预计的序号,就触发乱序和重传

12. linux的pipe

调用fork后,子进程会复制父进程的进程信息,如文件描述符,这样fd[0], fd[1]在子进程中有同样的一个拷贝,他们的引用都为2,也就是两个进程在使用他们。而实际上父进程只使用fd[1],子进程只使用fd[0],这样如果父进程不想使用fd[1]了,调用close()来关闭fd[1],这是不成功的,因为这样只是将fd[1]的引用减少到1,fd[1]没有被系统回收,仍然在子进程中有效,所以必须父进程close(fd[0]);子进程close(fd[1])

13.nginx的reload原理

reload信号的定义 { ngx_signal_value(NGX_RECONFIGURE_SIGNAL), "SIG" ngx_value(NGX_RECONFIGURE_SIGNAL), "reload", ngx_signal_handler }, a. 首先接受到一个NGX_RECONFIGURE_SIGNAL信号,在ngx_signal_handler处理 case ngx_signal_value(NGX_RECONFIGURE_SIGNAL): ngx_reconfigure = 1; //主要是父进程或者对应的reload进程设置这个 action = ", reconfiguring"; break;

b. 父进程ngx_master_process_cycle发现ngx_reconfigure变为1,则启动N个worker进程,并用ngx_signal_worker_processes给当前的worker发NGX_SHUTDOWN_SIGNAL 信号: //reload,重新读配置 if (ngx_reconfigure) { ngx_reconfigure = 0;

        //换版本咯
        if (ngx_new_binary) {
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 0);
            ngx_noaccepting = 0;

            continue;
        }

        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

        cycle = ngx_init_cycle(cycle);
        if (cycle == NULL) {
            cycle = (ngx_cycle_t *) ngx_cycle;
            continue;
        }

        ngx_cycle = cycle;
        ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                               ngx_core_module);
		//启动NGX_PROCESS_JUST_RESPAWN类型的子worker
		//NGX_PROCESS_JUST_RESPAWN表示子进程退出时,父进程要重新拉起:
		/*
		    if (signo == SIGCHLD) {
				ngx_process_get_status();
			}
		*/
        ngx_start_worker_processes(cycle, ccf->worker_processes,
                                   NGX_PROCESS_JUST_RESPAWN);/
        ngx_start_cache_manager_processes(cycle, 1);

        /* allow new processes to start */
        ngx_msleep(100);

        live = 1;
		//发送NGX_SHUTDOWN_SIGNAL给worker
        ngx_signal_worker_processes(cycle,
                                    ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
    }
	
worker在ngx_channel_handler	将ngx_quit置1,然后ngx_worker_process_cycle里如下操作:
    if (ngx_quit) {
        ngx_quit = 0;
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                      "gracefully shutting down");
        ngx_setproctitle("worker process is shutting down");

        if (!ngx_exiting) {
            ngx_exiting = 1;
			//将当前还有用户带来连接从epoll摘除,不再触发,等用户退出
            ngx_close_listening_sockets(cycle);
			//关闭空闲的连接池,如果有事件先处理了
            ngx_close_idle_connections(cycle);
        }
    }	

c. worker进程在刚拉起的时候,ngx_worker_process_init里面就把每个channel的fd,加入epoll,监听是否有父进程发来的信号 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } ngx_channel_handler里面,一直调用ngx_read_channel来监听收到的信号,然后改变对应全局变量的值,作用到ngx_worker_process_cycle的死循环里。

14.nginx的内存池

ngx_palloc相对ngx_pnalloc,其会将申请的内存大小向上扩增到NGX_ALIGNMENT的倍数,以方便内存对齐,减少内存访问次数

15.TCP_NODELAY和TCP_QUICKACK

如果一个 TCP 连接的一端启用了 Nagle‘s Algorithm(未开启TCP_NODELAY),而另一端启用了 TCP Delayed Ack(未开启TCP_QUICKACK),而发送的数据包又比较小,则可能会出现这样的情况: 发送端在等待接收端对上一个packet 的 Ack 才发送当前的 packet,而接收端则正好延迟了此 Ack 的发送,那么这个正要被发送的 packet 就会同样被延迟。 nginx里面,长连接时间的是NODELAY,但是短连接没有

16.nginx的子进程core以后如何自动拉起

(1) ngx_signal_handler里注册了SIGCHLD信号,子进程挂了父进程拿到SIGCHLD信号,把ngx_reap搞成1 case SIGCHLD: ngx_reap = 1; break;

(2) 父进程的大循环里面如下 if (ngx_reap) { ngx_reap = 0; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children"); //ngx_reap_children才是真正拉起新的子进程的函数 live = ngx_reap_children(cycle); }

17.关于TCP的粘包问题

(1) 粘包的原因: a. 发送端需要等缓冲区满才发送出去,造成粘包 b. 接收方不及时接收缓冲区的包,造成多个包接收 具体点:

(1)发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据。若连续几次发送的数据都很少,通常TCP会根据优化算法把这些数据合成一包后一次发送出去,这样接收方就收到了粘包数据。

(2)接收方引起的粘包是由于接收方用户进程不及时接收数据,从而导致粘包现象。这是因为接收方先把收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据,若下一包数据到达时前一包数据尚未被用户进程取走,则下一包数据放到系统接收缓冲区时就接到前一包数据之后,而用户进程根据预先设定的缓冲区大小从系统接收缓冲区取数据,这样就一次取到了多包数据。

UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)

(2) 粘包的解决方案: 为了避免粘包现象,可采取以下几种措施:

a. 对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;

b. 对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;

c. 由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。

17.关于epoll边缘触发的实例

17.1 代码:

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <netdb.h> #include <sys/epoll.h> #include <string.h> #define MAXEVENTS 64 int create_and_bind (int port) { int sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(sfd == -1) { return -1; } struct sockaddr_in sa; bzero(&sa, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); if(bind(sfd, (struct sockaddr*)&sa, sizeof(struct sockaddr)) == -1) { return -1; } return sfd; } int make_socket_non_blocking (int sfd) { int flags = fcntl (sfd, F_GETFL, 0); if (flags == -1) { return -1; } if(fcntl (sfd, F_SETFL, flags | O_NONBLOCK) == -1) { return -1; } return 0; } /* 此函数用于读取参数或者错误提示 */ int read_param(int argc, char *argv[]) { if (argc != 2) { fprintf (stderr, "Usage: %s [port]\n", argv[0]); exit (EXIT_FAILURE); } return atoi(argv[1]); } int main (int argc, char *argv[]) { int sfd, s; int efd; struct epoll_event event; struct epoll_event *events; int port = read_param(argc, argv); /* 创建并绑定socket */ sfd = create_and_bind (port); if (sfd == -1) { perror("create_and_bind"); abort (); } /* 设置sfd为非阻塞 */ s = make_socket_non_blocking (sfd); if (s == -1) { perror("make_socket_non_blocking"); abort (); } /* SOMAXCONN 为系统默认的backlog */ s = listen (sfd, SOMAXCONN); if (s == -1) { perror ("listen"); abort (); } efd = epoll_create1 (0); if (efd == -1) { perror ("epoll_create"); abort (); } event.data.fd = sfd; /* 设置ET模式 */ event.events = EPOLLIN | EPOLLET; s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event); if (s == -1) { perror ("epoll_ctl"); abort (); } /* 创建事件数组并清零 */ events = calloc (MAXEVENTS, sizeof event); /* 开始事件循环 */ while (1) { int n, i; n = epoll_wait (efd, events, MAXEVENTS, -1); for (i = 0; i < n; i++) { if (events[i].events & (EPOLLERR | EPOLLHUP)) { /* 监控到错误或者挂起 */ fprintf (stderr, "epoll error\n"); close (events[i].data.fd); continue; } if(events[i].events & EPOLLIN) { if (sfd == events[i].data.fd) { /* 处理新接入的socket */ while (1) { struct sockaddr_in sa; socklen_t len = sizeof(sa); char hbuf[INET_ADDRSTRLEN]; int infd = accept (sfd, (struct sockaddr*)&sa, &len); if (infd == -1) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /* 资源暂时不可读,再来一遍 */ break; } else { perror ("accept"); break; } } inet_ntop(AF_INET, &sa.sin_addr, hbuf, sizeof(hbuf)); printf("Accepted connection on descriptor %d " "(host=%s, port=%d)\n", infd, hbuf, sa.sin_port); /* 设置接入的socket为非阻塞 */ s = make_socket_non_blocking (infd); if (s == -1) abort (); /* 为新接入的socket注册事件 */ event.data.fd = infd; event.events = EPOLLIN | EPOLLET; s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); if (s == -1) { perror ("epoll_ctl"); abort (); } } //continue; } else { /* 接入的socket有数据可读 */ while (1) { ssize_t count; char buf[512]; count = read (events[i].data.fd, buf, sizeof buf); if (count == -1) { if (errno != EAGAIN) { perror ("read"); close(events[i].data.fd); } break; } else if (count == 0) { /* 数据读取完毕,结束 */ close(events[i].data.fd); printf ("Closed connection on descriptor %d\n", events[i].data.fd); break; } /* 输出到stdout */ s = write (1, buf, count); if (s == -1) { perror ("write"); abort (); } event.events = EPOLLOUT | EPOLLET; epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event); } } } else if((events[i].events & EPOLLOUT) && (events[i].data.fd != sfd)) { /* 接入的socket有数据可写 */ write(events[i].data.fd, "it's echo man\n", 14); event.events = EPOLLET | EPOLLIN; epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event); } } } free (events); close (sfd); return EXIT_SUCCESS; }

Clone this wiki locally