Skip to content

code learning

xbc112233 edited this page Apr 12, 2023 · 72 revisions

操作系统相关知识

内存相关

nginx代码精髓

TCP_DEFER_ACCEPT在nginx里的作用

  1. 而当正确的设置了TCP_DEFER_ACCEPT选项之后,server端会在接收到最后一个ack之后,并不进入establish状态,而只是将这个socket标记为acked,然后丢掉这个ack。此时server端这个socket还是处于syn_recved,然后接下来就是等待client发送数据, 而由于这个socket还是处于syn_recved,因此此时就会被syn_ack定时器所控制,对syn ack进行重传
  2. 影响服务端性能的四大原因有:内存拷贝、内存分配、系统调用,进程切换。而nginx的deffered选项的作用是用来降低服务端进行epoll_ctl、epoll_wait(linux下)的次数(系统调用)和降低服务端保持的连接句柄数,从而提高服务端处理性能的设置。设置这个选项以后只有客户端有数据到达时才被epoll_wait监听到,再去accept和处理连接数据。 在ngx_http_init_connection中就将接收到的事件放到posted队列中等待线程的处理。而在未设置deferred 选项时,这个操作是要我们在将接收到的连接epoll_ctl(ADD)之后再epoll_wait探测到EPOLLIN事件时才做这个处理的,从这里看我们就减少了一次epoll_wait该事件的过程,而且等到有数据时才accept,那么和未设置defered选项时相比减少了accept到有数据之间这段事件服务器维护的一条连接。
void
ngx_http_init_connection(ngx_connection_t *c)
{
    ......
    if (rev->ready) {
        /* the deferred accept(), rtsig, aio, iocp */
        if (ngx_use_accept_mutex) {
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }
    ......
}

writev,write,pwrite

#include <unistd.h>
ssize_t write(int fildes, const void *buf, size_t nbyte);
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset);

#include <sys/uio.h>
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset);
  1. pwrite() 会直接跳转到 offset 处,然后继续读取 nbyte 个字节,不过这不会影响到原文件的偏移量。这对于多线程的读写会比较友好,此时不会相互影响读写文件时的 offset 。
  2. write() 用来写入连续数据块,而 writev 会写入分散的数据块,当然,两个函数的最终结果都是将内容写入连续的空间。
  3. 对于 write() 函数来说很简单,而 writev() 返回的是字节数,但是入参却是 iovec ,也就意味着此时需要重新计算 iovec 了,包括了写入的数据可能会在某个 iovec 的中间位置。
  4. 最为高效的方法是预先分配好连续的内存空间,然后直接通过 write()函数写入。

ngx_rtmp数据发送事件

ngx_chain_t *
ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)

ngx_os_io_t ngx_os_io = {
    ngx_unix_recv,
    ngx_readv_chain,
    ngx_udp_unix_recv,
    ngx_unix_send,
    ngx_udp_unix_send,
    ngx_writev_chain,
    0
};
void
ngx_event_accept(ngx_event_t *ev) {
...
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;
...
}

tcp_nodelay和tcp_nopush在直播数据发送的逻辑

if (c->tcp_nopush == NGX_TCP_NOPUSH_UNSET || c->tcp_nopush == NGX_TCP_NOPUSH_DISABLED) {

        if (c->tcp_nodelay == NGX_TCP_NODELAY_SET) {

            tcp_nodelay = 0;

            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *)&tcp_nodelay, 
                sizeof(int)) == -1)
            {
                err = ngx_socket_errno;
                if (err != NGX_EINTR) {
                    wev->error = 1;
                    ngx_connection_error(c, err, "setsockopt(TCP_NODELAY) failed");

                    return;
                }

            } else {
                c->tcp_nodelay = NGX_TCP_NODELAY_UNSET;

                ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "unset tcp_nodelay!");
            }
        }

        if (c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
    
            if (ngx_tcp_nopush(c->fd) == NGX_ERROR) {
                err = ngx_socket_errno;

                if (err != NGX_EINTR) {
                    wev->error = 1;
                    ngx_connection_error(c, err, ngx_tcp_nopush_n " failed");

                    return;
                }

            } else {
                c->tcp_nopush = NGX_TCP_NODELAY_SET;
                ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "tcp_nopush set!");
            }
        }
    }

openresty-lua动态配置代码框架

配置结构入口

var_location_gw.conf, 这个里面设置的都是ngx.var相关自定义参数, 在url的根路径include,配置如下,其中include的配置命令实现函数是ngx_conf_include,主要功能就是解析include对于的conf文件,把对应的遍历设置到cf里

location / {
    include /usr/local/openresty/nginx/conf/gw_normal_custom_header.conf;
    include /usr/local/openresty/nginx/conf/var_location_gw.conf;
    rewrite_by_lua_file "../luascript/location_gw/gw_main.lua";
    log_by_lua_file     "../luascript/location_gw/section_end.lua";
    access_by_lua_block {
    }

    content_by_lua_block {
        local log   = require "util.log"
        ngx.log(ngx.INFO, log.content("access", "not support uri:", ngx.var.uri))
        ngx.exit(403)
    }

    access_log logs/uni_access.log uni_main;
    #access_log logs/access.log main;
}

1 loadmodule和processmodule基建

--主函数
--识别uri,并设置domain,app,stream,tartet,suffix等信息,配置项是
section_identify.process(dconf) 
local ctx = init_ctx(conf)

--鉴权
section_auth.process(conf, ctx)

--分发处理
section_dispatch.process(conf, ctx)

--选择调用链运行
function _M.chain_process(dirpath, conf, ctx)
    for i=1, #(loadmodule.module_list[dirpath]) do  
        local module_name = loadmodule.module_list[dirpath][i];
        local mdl  = require (dirpath .. "/" .. module_name);      
        mdl.process(conf, ctx);                  
    end
end
--分发模块调用链入口
function _M.process(conf, ctx)
    local name = "location_gw/section_dispatch"
    return processmodule.chain_process(name, conf, ctx)
end

对于section_dispatch或者section_auth,每个文件夹下按顺序写入对于的lua脚本即可顺序执行 针对所有请求,location到根目录,然后走gw_main, 构造子请求到各个location,子请求的api有ngx.exec和ngx.location.capture,具体区别如下:

  1. ngx.exec不构造HTTP协议,代码里匹配到对应的location_name直接走ngx_http_core_run_phases

  2. capture要构造http协议,但因为是本地disk_file配置,所以没有进程间通信,一定是本worker命中,另外支持并发多个子请求,子请求和父请求直接的ngx.var也是共享的

2 location_api设计

所有lua的module用如下代码串起来,即文件夹location_api/下编写N个子模块的入口,每个子模块的入口再新建文件夹,以此类推

local op = ngx.var.uri --string.match(ngx.var.uri, "/([^/]+)")
local name = api_list[op]
if name == nil then
    ngx.log(ngx.ERR, "err uri, lock process module op: ", op)
    ngx.exit(503)
else
    local base = "location_api."
    ngx.log(ngx.DEBUG, "process module:", base .. name)
    local module = require(base .. name)
    module.process()
end

TCP/UDP相关

1 MTU和MSS的语义

MTU: Maximum Transmit Unit,最大传输单元,数据链路层控制,即物理接口(数据链路层)提供给其上层(通常是IP层)最大一次传输数据的大小;以普遍使用的以太网接口为例,缺省MTU=1500 Byte,这是以太网接口对IP层的约束,如果IP层有<=1500 byte 需要发送,只需要一个IP包就可以完成发送任务;如果IP层有> 1500 byte 数据需要发送,需要分片才能完成发送,这些分片有一个共同点,即IP Header ID相同。

MSS:Maximum Segment Size ,传输层设置,TCP提交给IP层最大分段大小,不包含TCP Header和 TCP Option,只包含TCP Payload ,MSS是TCP用来限制application层最大的发送字节数。如果底层物理接口MTU= 1500 byte,则 MSS = 1500- 20(IP Header) -20 (TCP Header) = 1460 byte,如果application 有2000 byte发送,需要两个segment才可以完成发送,第一个TCP segment = 1460,第二个TCP segment = 540。

MTU = MSS + IP_HEADER + TCP_HEADER

2 TCP四次分手状态

客户端:

a. 调用close,进入FIN_WAIT_1, 发送FIN给server
b. 收到服务器的ack后进入FIN_WAIT_2
c. 接受服务器调用close后发送的FIN,进入TIME_WAIT,然后发送此FIN的ack

服务端:

a. 收到客户端发送的FIN后进入CLOSE_WAIT,发送一个ack给客户端,此时代码并未调用close
b. 本机调用close后,发送一个FIN给客户端,状态进入LAST_ACK,即等待接受客户端的TIME_WAIT状态下发出的ack
c. 收到TIME_WAIT客户端发出的ack,进入CLOSED

3 TCP同时打开和关闭

同时打开

对于同时打开,他们仅仅建立一条连接,状态转移和一般的三次握手略有不同,逻辑如下

第一步:两边同时发SYN给对方,A和B都进入SYN_SENT状态

A --SYN_A--> B, B--SYN_B--> A

第二步:B用SYN_B和ACK_A+1回应A, A用SYN_A和ACK_B+1回应B,同时进入ESTB阶段

B -->SYN_B, ACK_A+1 --> A, A --SYN_A,ACK_B+1-->B

综上,同时打开是没有第三次ack的,TCP协议设计此处边界的时候,保留了同时连接的能力,用各自的SYN回应对方的SYN,4次信令

同时关闭

第一步:两边同时调用close函数发FIN给对方,A和B都进入FIN_WAIT1状态

A --FIN_A--> B, B--FIN_B--> A

第二步:A和B都收到对方法的FIN,同时进入CLOSING状态 第三步:A和B都收到对方发的ack,都进入TIME_WAIT

B -->ACK_A+1 --> A, A --ACK_B+1-->B

TCP的QUICKACK和NODELAY

1. NODELAY和NAGEL算法:

关闭NAGEL算法,在发送端取消小包发送限制,NAGEL算法主要逻辑如下:

1)如果包长度达到MSS,则允许发送;
2)如果该包含有FIN,则允许发送;
3)设置了TCP_NODELAY选项,则允许发送;
4)未设置TCP_CORK选项时,若所有发出去的包均被确认,或所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送。

对于规则4),就是说要求一个TCP连接上最多只能有一个未被确认的小数据包,在该分组的确认到达之前,不能发送其他的小数据包。如果某个小分组的确认被延迟了(案例中的40ms),那么后续小分组的发送就会相应的延迟。也就是说延迟确认影响的并不是被延迟确认的那个数据包,而是后续的应答包。

NAGEL的优劣:

优点:避免网络中充斥着许多小数据块,降低网络负载,减少网络拥塞,提高网络吞吐
缺点:客户端的延迟会增加,实时性降低,不适合延时要求尽量小的场景;且对于大文件传输这种场景,会降低传输速度

2. 延迟ACK和QUICKACK

TCP在接收到对端的报文后,并不会立即发送ack,而是等待一段时间发送ack,以便将ack和要发送的数据一块发送。当然ack不能无限延长,否则对端会认为包超时而造成报文重传。linux采用动态调节算法来确定延时的时间。

TCP在何时发送ACK的时候有如下规定:

1. 当有响应数据发送的时候,ACK会随着数据一块发送
2. 如果没有响应数据,ACK就会有一个延迟,以等待是否有响应数据一块发送,但是这个延迟一般在40ms~500ms之间,一般情况下在40ms左右,如果在40ms内有数据发送,那么ACK会随着数据一块发送,对于这个延迟的需要注意一下,这个延迟并不是指的是收到数据到发送ACK的时间延迟,而是内核会启动一个定时器,每隔200ms就会检查一次,比如定时器在0ms启动,200ms到期,180ms的时候data来到,那么200ms的时候没有响应数据,ACK仍然会被发送,这个时候延迟了20ms.
3. 如果在等待发送ACK期间,第二个数据又到了,这时候就要立即发送ACK

可以通过TCP_QUICKACK这个选项来启动快速ACK:

1. 如果在快速的ACK模式下,ACK被立即发送
2. 这个flag并不是永久的,系统会判定是交互数据流,仍然会启动delay ACK,所以这个flag在recv之后需要重新设置

TCP的管理连接的四种定时器

  1. 重传定时器,RTO,管理重传的间隔

  2. 坚持定时器,定期保持窗口大小信息的不断更新流动,即使接收端关闭了接受窗口

  3. 保活定时器,SO_KEEPALIVE

  4. 2MSL定时器,控制TIME_WAIT状态持续的时间

TCP的流量控制

1. 超时重传

发送端迟迟未收到某个数据包对应的ACK,触发了RTO定时器,,这种机制下,每个数据包都有相应的计时器,一旦超过 RTO 而没有收到 ACK,就重发该数据包。没收到 ACK 的数据包都会存在重传缓冲区里,等到 ACK 后,就从缓冲区里删除。 首先明确一点,对 TCP 来说,超时重传是相当重要的事件(RTO 往往大于两倍的 RTT,超时往往意味着拥塞),一旦发生这种情况,TCP 不仅会重传对应数据段,还会降低当前的数据发送速率,因为TCP 会认为当前网络发生了拥塞。

2. 快速重传

对于完全的丢包,只能用超时重传,但是对于乱序的丢包,也就是后发的包先到了接收端,但之前某个包接收端未收到,则可以用快速重传的办法提升重传效率,即服务器如果收到乱序的包,也给客户端回复 ACK,只不过是重复的 ACK。假设发送端发送了5,6,7,8,9这几个包,但接收端只收到乱序的包 6,7,8,9 时,这时候接受端全都会发DUP-ACK = 5。这样,发送端就知道 5 发生了空缺。一般来说,如果客户端连续三次收到重复的 ACK,就会重传对应包,而不需要等到计时器超时。从抓包表现看,发送端收到接受端发来的3次DUP-ACK,就会重传对应的数据

3. 慢启动门限(ssthresh), 拥塞窗口(cwnd)与慢启动和拥塞避免算法的关系

sshresh的最小值一定是2个报文段,因为cwnd的最小值是1个报文段

慢启动:cwnd < sshresh时执行的一种指数递增算法,此算法的效果是发送数据的速率指数递增,一直递增到cwnd > sshtresh或者遇到拥塞(超时RTO或者DUP-ACK)为止,此处要强调一点。慢启动算法可出现在发送开始阶段以外,还可以在触发了超时的拥塞后,因为触发了超时的拥塞,cwnd设置成了1
拥塞避免:当cwnd > sshresh时,执行拥塞避免,即达到慢启动门限以后,每收到一个数据包,则讲cwnd += 1/cwnd,cwnd成线性增加,不在指数增加

那么思考一下,sshresh如果在遇到几次拥塞后降低到一个比较低的水平,后面能否再恢复到一个高值?(当然可以,cwnd在拥塞避免阶段是线性增加,当不丢包的收发足够多时,cwnd可以增长到之前sshresh的两倍以上,那么当再次遇到丢包时,sshresh降低为cwnd的一半,反而比之前增大了)

快速恢复:在没有触发超时重传,只有DUP-ACK的情况下,不把cwnd降低为1走慢启动(因为这样会把数据流量突然减少),只把sshresh设置为当前cwnd的一半,cwnd被设置为sshresh + 重复ACK的个数(一定是3)*报文段大小,所谓的快速恢复,其实就是遇到拥塞以后,将cwnd设置为约等于之前的一半走拥塞避免,保证数据流不会突然降低到1。

nginx源码

openresty的变量注册机制

struct ngx_http_variable_s {
    ngx_str_t                     name;   /* must be first to build the hash */
    ngx_http_set_variable_pt      set_handler;
    ngx_http_get_variable_pt      get_handler;
    uintptr_t                     data;
    ngx_uint_t                    flags;
    ngx_uint_t                    index;
};

以http的limit_rate为例,假设在lua代码里用ngx.var.limit_rate变量赋值,则对应C代码里的set_handler和get_handler如下变量注册代码

    { ngx_string("limit_rate"), ngx_http_variable_request_set_size,
      ngx_http_variable_request_get_size,
      offsetof(ngx_http_request_t, limit_rate),
      NGX_HTTP_VAR_CHANGEABLE|NGX_HTTP_VAR_NOCACHEABLE, 0 },

所以当lua用ngx.var.limit_rate获取或者设置时,调用的都是对于的set_handler和get_handler

reload后相关操作

共享内存相关

  1. 如果共享内存有锁,则父进程通过signal注册的函数,如果是SIGCHILD信号,则进入ngx_process_get_status,调用ngx_shmtx_force_unlock强制解锁
  2. 如果共享内存配置项有更改,如名称或者大小,则释放原来的共享内存,新建一个,详细代码参见ngx_init_cycle, 操作old_cycle

1 支持resueport后的代码逻辑

1.1 ngx_event_process_init是在worker进程fork出来以后,走到的epoll初始化,这里面有几个地方比较关键,代码:

/*
1. 如果不用ngx_use_accept_mutex锁、或者开了reuseport,则直接ngx_add_event,去触发accept
2. 如果不在这里ngx_add_event,则到ngx_process_events_and_timers去抢锁,谁抢到谁去accept
*/
    if (ngx_use_accept_mutex
	#if (NGX_HAVE_REUSEPORT)
        && !ls[i].reuseport
	#endif
       )
    {
        continue;
    }

    if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
        return NGX_ERROR;
    }

1.2 ngx_clone_listening 里面,如果当前是resueport模式,则按照监听的端口和worker进程数量,依次复制socket fd

ngx_int_t
ngx_clone_listening(ngx_conf_t *cf, ngx_listening_t *ls)
{

	#if (NGX_HAVE_REUSEPORT)

    ngx_int_t         n;
    ngx_core_conf_t  *ccf;
    ngx_listening_t   ols;

    if (!ls->reuseport) {
        return NGX_OK;
    }

    ols = *ls;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
                                           ngx_core_module);

    for (n = 1; n < ccf->worker_processes; n++) {

        /* create a socket for each worker process */

        ls = ngx_array_push(&cf->cycle->listening);
        if (ls == NULL) {
            return NGX_ERROR;
        }

        *ls = ols;
        ls->worker = n;
    }
	#endif
    return NGX_OK;
}

2. ngxin初始化http、rtmp套接字

根据配置的http和rtmp端口,创建好listen结构体信息,最后统一到ngx_open_listening_sockets绑定,
  监听,这些都是在父进程里进行的

3. epoll初始化相关

nginx先按照读取的配置,初始化http和rtmp的监听端口套接字。在fork了worker进程以后,
    再初始化epoll模块,然后将已经监听起来的套接字加入epoll全局句柄ep
   (每一个工作进程一个)。fork出子进程后,每个子进程都有一个全局的epoll句柄ep,
    所有事件都注册到这个ep上

4. 模块加载

1. 启动时候加载顺序应该是按照CORE_MODULES配置,生成到ngx_modules数组里依次加载,
2. main函数里面,先执行ngx_init_cycle,listen所有模块的套接字,ngx_conf_handler里,
       执行ngx_command_t 里的set函数, 如ngx_http_block和ngx_rtmp_block,block函数
       里面有各个模块的postconfiguration;
3. ngx_spawn_process里调用fork出worker进程的时候,
   ngx_worker_process_cycle(unix版本)里的
   ngx_worker_process_init 调用每个模块的init_process,这个里面初始化epoll

5. 模块执行顺序

首先是nginx给lua发HTTP的connect和PLAY消息,走到notify模块,然后发RTMP流,走到rtmp_cmd模块

6. 关于ngx_buf_t结构体里的start,pos,last,end

最近定位了一个bug,代码模型如下:
初始化:
tctx->carg[4096];
tctx->barg.start = tctx->barg.pos = tctx->barg.last = tctx->carg;
tctx->barg.end = tctx->carg + sizeof(tctx->carg);
。。。。
barg是一个buf,把他和carg这一个char数组关联起来
使用:
tctx->barg.last = tctx->barg.pos;                //此处bug,没有把pos置回carg的首地址
tctx->barg.last = ngx_snprintf(tctx->barg.last, 
    tctx->barg.end - tctx->barg.last, "Begin=%T", 
    tctx->begin);
问题就在注释处,因为没有把pos置回carg的首地址,所以在netcall模块发包后,pos每次向前移动
16字节(业务逻辑,定长的16),导致carg数组不停的追加"Begin=XX"的16字节字符串,所以每次
运行42分25秒左右,业务中断,因为每16字节的自加,把carg的4096个空间填满了
 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
ps:buf结构体里面,start和end应该是不变的,表示这个buf的定长大小,但pos和last是动态的,
比如openresty里,每次到发送模块的时候,pos会往前递增发送的大小n,而业务层每次填包n字节,
会让last往后移动n

7.一个连接从创建到进入http模块的逻辑

(1) 首先,父进程会读取配置,当读到http时,调用ngx_http_commands的ngx_http_block, 在ngx_http_block函数里,把要监听的端口socket创建好,每个创建好的句柄放到ngx_listening_t *ls里,注册回调函数ngx_http_init_connection, 代码如下:

  static ngx_listening_t *
  ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
 {
ngx_listening_t           *ls;
ngx_http_core_loc_conf_t  *clcf;
ngx_http_core_srv_conf_t  *cscf;

//ls是每个要监听的端口的句柄,如80等
ls = ngx_create_listening(cf, &addr->opt.sockaddr.sockaddr,
                          addr->opt.socklen);
if (ls == NULL) {
    return NULL;
}

ls->addr_ntop = 1;

ls->handler = ngx_http_init_connection;//此处注册回调给ls
。。。
}

2) nginx启动后,fork出N个worker进程,每个进程会初始化epoll,调用 ngx_event_process_init,把每个监听的socket注册上accept回调ngx_event_accept 代码:

 for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
    //如果开启reuseport,则只有本worker的ls才往后走(每个ls在clone里都有N个worker)
    if (ls[i].reuseport && ls[i].worker != ngx_worker) {
        continue;
    }
  #endif
  。。。
do {
    i--;

    c[i].data = next;
    c[i].read = &cycle->read_events[i];
    c[i].write = &cycle->write_events[i];
    c[i].fd = (ngx_socket_t) -1;

    next = &c[i];
} while (i);
  rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                          : ngx_event_recvmsg;
    。。。
}

(3)当accept触发后,进入ngx_event_accept,从epoll的data指针获取ls:

 void
  ngx_event_accept(ngx_event_t *ev)
 {
 。。。。。。。
lc = ev->data;
ls = lc->listening;
ev->ready = 0;

ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
               "accept on %V, ready: %d", &ls->addr_text, ev->available);

do {
    socklen = sizeof(ngx_sockaddr_t);

  #if (NGX_HAVE_ACCEPT4)
    if (use_accept4) {
        s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
    } else {
        s = accept(lc->fd, &sa.sockaddr, &socklen);
    }
     #else
    s = accept(lc->fd, &sa.sockaddr, &socklen);
 #endif
 。。。。
    ls->handler(c);//调用到ngx_http_init_connection
}

RTMP模块道理也一样的,ngx_rtmp_add_listening里注册ngx_rtmp_init_connection

8.

A. timewait状态的意义 1) 为什么TIME_WAIT状态需要经过2MSL(最大报文段生存时间,一般是30*2)才能返回到CLOSE状态?

   虽然按道理,四个报文都发送完毕,我们可以直接进入CLOSE状态了,但是我们必须假象网络是不可靠的,有可以最后一个ACK丢失。所以
 TIME_WAIT状态就是用来重发可能丢失的ACK报文,判断报文是否丢失主要是根据被动关闭段是否重发fin,如果重发fin,说明ack丢失了。如果发
 RST,说明报文迟缓到达。

 2) 如果上次链接server发的fin报文客户端没有收到,有可能在客户端当前端口复用后再收到这个fin报文,导致连接关闭。为了解决这个问题,
 一个是在timewait状态端口不能拉起,既不用reuseaddr;还有一个是内核校验新连接的syn的seq,如果大于上一个连接的fin的seq,则丢弃fin

B. RST异常终止连接,参见第十部分

C. 检测半连接

9. recv返回值

n = c->recv(c, b->last, size);
//EAGAIN,加入定时器重新接受
if (n == NGX_AGAIN) {

    if (!rev->timer_set) {
        ngx_add_timer(rev, c->listening->post_accept_timeout);
        ngx_reusable_connection(c, 1);
    }

    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }

    /*
     * We are trying to not hold c->buffer's memory for an idle connection.
     */

    if (ngx_pfree(c->pool, b->start) == NGX_OK) {
        b->start = NULL;
    }

    return;
}
//吃屎了,退出
if (n == NGX_ERROR) {
    ngx_http_close_connection(c);
    return;
}
//返回0,断链
if (n == 0) {
    ngx_log_error(NGX_LOG_INFO, c->log, 0,
                  "client closed connection");
    ngx_http_close_connection(c);
    return;
}

10. SO_LINGER选项、TCP的半关闭

SO_LINGER选项用来改变此缺省设置。使用如下结构:

    struct linger {
       int l_onoff; /* 0 = off, nozero = on */
        int l_linger; /* linger time */
   };

有下列三种情况:

1)设置 l_onoff为0,则该选项关闭,l_linger的值被忽略,等于内核缺省情况,close调用会立即返回给调用者,如果可能将会传输任何未发送的数据;

2)设置 l_onoff为非0,l_linger为0,则套接口关闭时TCP夭折连接,TCP将丢弃保留在套接口发送缓冲区中的任何数据并发送一个RST给对方,而不是通常的四分组终止序列,这避免了TIME_WAIT状态;

3)设置 l_onoff 为非0,l_linger为非0,当套接口关闭时内核将拖延一段时间(由l_linger决定)。如果套接口缓冲区中仍残留数据,进程将处于睡眠状态,直 到(a)所有数据发送完且被对方确认,之后进行正常的终止序列(描述字访问计数为0)或(b)延迟时间到。此种情况下,应用程序检查close的返回值是非常重要的,如果在数据发送完并被确认前时间到,close将返回EWOULDBLOCK错误且套接口发送缓冲区中的任何数据都丢失。close的成功返回仅告诉我们发送的数据(和FIN)已由对方TCP确认,它并不能告诉我们对方应用进程是否已读了数据。如果套接口设为非阻塞的,它将不等待close完成。

半关闭: 调用shutdown,并且启用SHUT_WR宏,发送FIN给对端,表明本端程序只收不发,对端依然可发送数据,这个就是半关闭,TCP变成了单向收发; 对于SHUT_RD,可以在HTTP短连接的时候,服务器收到客户端的GET请求后,直接调用,这个不妨碍服务器给客户端发数据,客户端在发给服务器后可以直接SHUT_WR,客户端只收不发。

RST有三种条件可以产生:

1)SYN到达某端口,但端口没有监听

2)TCP想取消一个连接

3)TCP接收了一个根本不存在的连接上的分节

可以用SO_LINGER宏模拟accept之前客户端发RST给服务器的这种边界情况,这种情况下,服务器对于NGX_ECONNABORTED错误,应该直接忽略,继续accpet,而不是退出

11. TCP的ack确认

数据包有一个seq_num,还有数据len,next_seq=seq_num+len,所以接收方会根据收到的每一个TCP包, 知道下一个要收包的序号(relative seq number),如果发现来包不是预计的序号,就触发乱序和重传

12. linux的pipe

调用fork后,子进程会复制父进程的进程信息,如文件描述符,这样fd[0], fd[1]在子进程中有同样的一个拷贝,他们的引用都为2,也就是两个进程在使用他们。而实际上父进程只使用fd[1],子进程只使用fd[0],这样如果父进程不想使用fd[1]了,调用close()来关闭fd[1],这是不成功的,因为这样只是将fd[1]的引用减少到1,fd[1]没有被系统回收,仍然在子进程中有效,所以必须父进程close(fd[0]);子进程close(fd[1])

13.nginx的reload原理

//reload信号的定义
{ ngx_signal_value(NGX_RECONFIGURE_SIGNAL),
  "SIG" ngx_value(NGX_RECONFIGURE_SIGNAL),
  "reload",
  ngx_signal_handler },
//a. 首先接受到一个NGX_RECONFIGURE_SIGNAL信号,在ngx_signal_handler处理
    case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
        ngx_reconfigure = 1; //主要是父进程或者对应的reload进程设置这个
        action = ", reconfiguring";
        break;


//b. 父进程ngx_master_process_cycle发现ngx_reconfigure变为1,则启动N个worker进程,并用ngx_signal_worker_processes给当前的worker发NGX_SHUTDOWN_SIGNAL

信号: //reload,重新读配置 if (ngx_reconfigure) { ngx_reconfigure = 0;

        //换版本咯
        if (ngx_new_binary) {
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 0);
            ngx_noaccepting = 0;

            continue;
        }

        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

        cycle = ngx_init_cycle(cycle);
        if (cycle == NULL) {
            cycle = (ngx_cycle_t *) ngx_cycle;
            continue;
        }

        ngx_cycle = cycle;
        ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                               ngx_core_module);
		//启动NGX_PROCESS_JUST_RESPAWN类型的子worker
		//NGX_PROCESS_JUST_RESPAWN表示子进程退出时,父进程要重新拉起:
		/*
		    if (signo == SIGCHLD) {
				ngx_process_get_status();
			}
		*/
        ngx_start_worker_processes(cycle, ccf->worker_processes,
                                   NGX_PROCESS_JUST_RESPAWN);/
        ngx_start_cache_manager_processes(cycle, 1);

        /* allow new processes to start */
        ngx_msleep(100);

        live = 1;
		//发送NGX_SHUTDOWN_SIGNAL给worker
        ngx_signal_worker_processes(cycle,
                                    ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
    }
	
worker在ngx_channel_handler	将ngx_quit置1,然后ngx_worker_process_cycle里如下操作:
    if (ngx_quit) {
        ngx_quit = 0;
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                      "gracefully shutting down");
        ngx_setproctitle("worker process is shutting down");

        if (!ngx_exiting) {
            ngx_exiting = 1;
			//将当前还有用户带来连接从epoll摘除,不再触发,等用户退出
            ngx_close_listening_sockets(cycle);
			//关闭空闲的连接池,如果有事件先处理了
            ngx_close_idle_connections(cycle);
        }
    }	

c. worker进程在刚拉起的时候,ngx_worker_process_init里面就把每个channel的fd,加入epoll,监听是否有父进程发来的信号 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } ngx_channel_handler里面,一直调用ngx_read_channel来监听收到的信号,然后改变对应全局变量的值,作用到ngx_worker_process_cycle的死循环里。

14.nginx的内存池

ngx_palloc相对ngx_pnalloc,其会将申请的内存大小向上扩增到NGX_ALIGNMENT的倍数,以方便内存对齐,减少内存访问次数

15.TCP_NODELAY和TCP_QUICKACK

如果一个 TCP 连接的一端启用了 Nagle‘s Algorithm(未开启TCP_NODELAY),而另一端启用了 TCP Delayed Ack(未开启TCP_QUICKACK),而发送的数据包又比较小,则可能会出现这样的情况: 发送端在等待接收端对上一个packet 的 Ack 才发送当前的 packet,而接收端则正好延迟了此 Ack 的发送,那么这个正要被发送的 packet 就会同样被延迟。 nginx里面,长连接时间的是NODELAY,但是短连接没有

16.nginx的子进程core以后如何自动拉起

//(1) ngx_signal_handler里注册了SIGCHLD信号,子进程挂了父进程拿到SIGCHLD信号,把ngx_reap搞成1
    case SIGCHLD:
        ngx_reap = 1;
        break;


//(2) 父进程的大循环里面如下
    if (ngx_reap) {
        ngx_reap = 0;
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
        //ngx_reap_children才是真正拉起新的子进程的函数
        live = ngx_reap_children(cycle);
    }

17.关于TCP的粘包问题

(1) 粘包的原因: a. 发送端需要等缓冲区满才发送出去,造成粘包 b. 接收方不及时接收缓冲区的包,造成多个包接收 具体点:

(1)发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据。若连续几次发送的数据都很少,通常TCP会根据优化算法把这些数据合成一包后一次发送出去,这样接收方就收到了粘包数据。

(2)接收方引起的粘包是由于接收方用户进程不及时接收数据,从而导致粘包现象。这是因为接收方先把收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据,若下一包数据到达时前一包数据尚未被用户进程取走,则下一包数据放到系统接收缓冲区时就接到前一包数据之后,而用户进程根据预先设定的缓冲区大小从系统接收缓冲区取数据,这样就一次取到了多包数据。

UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)

(2) 粘包的解决方案: 为了避免粘包现象,可采取以下几种措施:

a. 对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;

b. 对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;

c. 由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。

17.关于epoll边缘触发的实例

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <errno.h>  
#include <sys/socket.h>  
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>  
#include <netdb.h>
#include <sys/epoll.h>  
#include <string.h>  
#define MAXEVENTS 64
int create_and_bind (int port) {
    int sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if(sfd == -1) {
        return -1;
    }
    struct sockaddr_in sa;
    bzero(&sa, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port   = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(sfd, (struct sockaddr*)&sa, sizeof(struct sockaddr)) == -1) {
        return -1;
    }
    return sfd;
}
int make_socket_non_blocking (int sfd) {
    int flags = fcntl (sfd, F_GETFL, 0);
    if (flags == -1) {
        return -1;
    }
    if(fcntl (sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        return -1;
    }
    return 0;
}
/* 此函数用于读取参数或者错误提示 */
int read_param(int argc, char *argv[]) {
    if (argc != 2) {
        fprintf (stderr, "Usage: %s [port]\n", argv[0]);
        exit (EXIT_FAILURE);
    }
    return atoi(argv[1]);
}
int main (int argc, char *argv[]) {
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;
    int port = read_param(argc, argv);
    /* 创建并绑定socket */
    sfd = create_and_bind (port);
    if (sfd == -1) {
        perror("create_and_bind");
        abort ();
    }
    /* 设置sfd为非阻塞 */
    s = make_socket_non_blocking (sfd);
    if (s == -1) {
        perror("make_socket_non_blocking");
        abort ();
    }
    /* SOMAXCONN 为系统默认的backlog */
    s = listen (sfd, SOMAXCONN);
    if (s == -1) {
        perror ("listen");
        abort ();
    }
    efd = epoll_create1 (0);
    if (efd == -1) {
        perror ("epoll_create");
        abort ();
    }
    event.data.fd = sfd;
    /* 设置ET模式 */
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1) {
        perror ("epoll_ctl");
        abort ();
    }
    /* 创建事件数组并清零 */
    events = calloc (MAXEVENTS, sizeof event);
    /* 开始事件循环 */
    while (1) {
        int n, i;
        n = epoll_wait (efd, events, MAXEVENTS, -1);
        for (i = 0; i < n; i++) {
            if (events[i].events & (EPOLLERR | EPOLLHUP)) {
                /* 监控到错误或者挂起 */
                fprintf (stderr, "epoll error\n");
                close (events[i].data.fd);
                continue;
            } 
            if(events[i].events & EPOLLIN) {
                if (sfd == events[i].data.fd) {
                    /* 处理新接入的socket */
                    while (1) {
                        struct sockaddr_in sa;
                        socklen_t len = sizeof(sa);
                        char hbuf[INET_ADDRSTRLEN];
                        int infd = accept (sfd, (struct sockaddr*)&sa, &len);
                        if (infd == -1) {
                            if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                                /* 资源暂时不可读,再来一遍 */
                                break;
                            } else {
                                perror ("accept");
                                break;
                            }
                        }
                        inet_ntop(AF_INET, &sa.sin_addr, hbuf, sizeof(hbuf));
                        printf("Accepted connection on descriptor %d "
                                    "(host=%s, port=%d)\n", infd, hbuf, sa.sin_port);
                        /* 设置接入的socket为非阻塞 */
                        s = make_socket_non_blocking (infd);
                        if (s == -1) abort ();
                        /* 为新接入的socket注册事件 */
                        event.data.fd = infd;
                        event.events = EPOLLIN | EPOLLET;
                        s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
                        if (s == -1) {
                            perror ("epoll_ctl");
                            abort ();
                        }
                    }
                    //continue;
                } else {
                    /* 接入的socket有数据可读 */
                    while (1) {
                        ssize_t count;
                        char buf[512];
                        count = read (events[i].data.fd, buf, sizeof buf);
                        if (count == -1) {
                            if (errno != EAGAIN) {
                                perror ("read");
                                close(events[i].data.fd);
                            }
                            break;
                        } else if (count == 0) {
                            /* 数据读取完毕,结束 */
                            close(events[i].data.fd);
                            printf ("Closed connection on descriptor %d\n", events[i].data.fd);
                            break;
                        }
                        /* 输出到stdout */
                        s = write (1, buf, count);
                        if (s == -1) {
                            perror ("write");
                            abort ();
                        }
                        event.events = EPOLLOUT | EPOLLET;
                        epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
                    }
                }
            } else if((events[i].events & EPOLLOUT) && (events[i].data.fd != sfd)) {
                /* 接入的socket有数据可写 */
                write(events[i].data.fd, "it's echo man\n", 14);
                event.events = EPOLLET | EPOLLIN;
                epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
            }
        }
    }
    free (events);
    close (sfd);
    return EXIT_SUCCESS;
}

18.关于epoll的in事件和out事件

https://cloud.tencent.com/developer/article/1481046

19.关于derived-class-template特性

#include <iostream>
using namespace std;

class Message
{
public:
	virtual void SetMsg() = 0;
};

class QueryMessage : public Message
{
public:
	virtual void SetMsg()
	{
		
	}
	int num = 4;
};

class AnwserMessage : public Message
{
public:
	virtual void SetMsg()
	{}
	int num = 111;
};

class IMessageHandler
{
public:
	virtual void HandleMsg(Message *pMsg) = 0;
};

template<typename T> 
class MessageHandler : public IMessageHandler
{
public:
	virtual void HandleMsg(Message *pMsg)
	{
		T *p = dynamic_cast<T*>(pMsg);
		_ASSERT(p != NULL);
		MsgCallback(p);
	}
	void MsgCallback(T *pMsg)
	{
		cout << pMsg->num << endl;
	}
};
    
    主函数调用
    MessageHandler<AnwserMessage> ansMsgHandler;
MessageHandler<QueryMessage> queMsgHandler;
AnwserMessage *pAnsMsg = new AnwserMessage();
QueryMessage *pQueMsg = new QueryMessage();
ansMsgHandler.HandleMsg(pAnsMsg);
queMsgHandler.HandleMsg(pQueMsg);
Clone this wiki locally