-
Notifications
You must be signed in to change notification settings - Fork 0
code learning
- 而当正确的设置了TCP_DEFER_ACCEPT选项之后,server端会在接收到最后一个ack之后,并不进入establish状态,而只是将这个socket标记为acked,然后丢掉这个ack。此时server端这个socket还是处于syn_recved,然后接下来就是等待client发送数据, 而由于这个socket还是处于syn_recved,因此此时就会被syn_ack定时器所控制,对syn ack进行重传
- 影响服务端性能的四大原因有:内存拷贝、内存分配、系统调用,进程切换。而nginx的deffered选项的作用是用来降低服务端进行epoll_ctl、epoll_wait(linux下)的次数(系统调用)和降低服务端保持的连接句柄数,从而提高服务端处理性能的设置。设置这个选项以后只有客户端有数据到达时才被epoll_wait监听到,再去accept和处理连接数据。 在ngx_http_init_connection中就将接收到的事件放到posted队列中等待线程的处理。而在未设置deferred 选项时,这个操作是要我们在将接收到的连接epoll_ctl(ADD)之后再epoll_wait探测到EPOLLIN事件时才做这个处理的,从这里看我们就减少了一次epoll_wait该事件的过程,而且等到有数据时才accept,那么和未设置defered选项时相比减少了accept到有数据之间这段事件服务器维护的一条连接。
void
ngx_http_init_connection(ngx_connection_t *c)
{
......
if (rev->ready) {
/* the deferred accept(), rtsig, aio, iocp */
if (ngx_use_accept_mutex) {
ngx_post_event(rev, &ngx_posted_events);
return;
}
......
}
#include <unistd.h>
ssize_t write(int fildes, const void *buf, size_t nbyte);
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset);
#include <sys/uio.h>
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset);
- pwrite() 会直接跳转到 offset 处,然后继续读取 nbyte 个字节,不过这不会影响到原文件的偏移量。这对于多线程的读写会比较友好,此时不会相互影响读写文件时的 offset 。
- write() 用来写入连续数据块,而 writev 会写入分散的数据块,当然,两个函数的最终结果都是将内容写入连续的空间。
- 对于 write() 函数来说很简单,而 writev() 返回的是字节数,但是入参却是 iovec ,也就意味着此时需要重新计算 iovec 了,包括了写入的数据可能会在某个 iovec 的中间位置。
- 最为高效的方法是预先分配好连续的内存空间,然后直接通过 write()函数写入。
ngx_chain_t *
ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
ngx_os_io_t ngx_os_io = {
ngx_unix_recv,
ngx_readv_chain,
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
ngx_writev_chain,
0
};
void
ngx_event_accept(ngx_event_t *ev) {
...
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
...
}
if (c->tcp_nopush == NGX_TCP_NOPUSH_UNSET || c->tcp_nopush == NGX_TCP_NOPUSH_DISABLED) {
if (c->tcp_nodelay == NGX_TCP_NODELAY_SET) {
tcp_nodelay = 0;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *)&tcp_nodelay,
sizeof(int)) == -1)
{
err = ngx_socket_errno;
if (err != NGX_EINTR) {
wev->error = 1;
ngx_connection_error(c, err, "setsockopt(TCP_NODELAY) failed");
return;
}
} else {
c->tcp_nodelay = NGX_TCP_NODELAY_UNSET;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "unset tcp_nodelay!");
}
}
if (c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
if (ngx_tcp_nopush(c->fd) == NGX_ERROR) {
err = ngx_socket_errno;
if (err != NGX_EINTR) {
wev->error = 1;
ngx_connection_error(c, err, ngx_tcp_nopush_n " failed");
return;
}
} else {
c->tcp_nopush = NGX_TCP_NODELAY_SET;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "tcp_nopush set!");
}
}
}
var_location_gw.conf, 这个里面设置的都是ngx.var相关自定义参数, 在url的根路径include,配置如下,其中include的配置命令实现函数是ngx_conf_include,主要功能就是解析include对于的conf文件,把对应的遍历设置到cf里
location / {
include /usr/local/openresty/nginx/conf/gw_normal_custom_header.conf;
include /usr/local/openresty/nginx/conf/var_location_gw.conf;
rewrite_by_lua_file "../luascript/location_gw/gw_main.lua";
log_by_lua_file "../luascript/location_gw/section_end.lua";
access_by_lua_block {
}
content_by_lua_block {
local log = require "util.log"
ngx.log(ngx.INFO, log.content("access", "not support uri:", ngx.var.uri))
ngx.exit(403)
}
access_log logs/uni_access.log uni_main;
#access_log logs/access.log main;
}
--主函数
--识别uri,并设置domain,app,stream,tartet,suffix等信息,配置项是
section_identify.process(dconf)
local ctx = init_ctx(conf)
--鉴权
section_auth.process(conf, ctx)
--分发处理
section_dispatch.process(conf, ctx)
--选择调用链运行
function _M.chain_process(dirpath, conf, ctx)
for i=1, #(loadmodule.module_list[dirpath]) do
local module_name = loadmodule.module_list[dirpath][i];
local mdl = require (dirpath .. "/" .. module_name);
mdl.process(conf, ctx);
end
end
--分发模块调用链入口
function _M.process(conf, ctx)
local name = "location_gw/section_dispatch"
return processmodule.chain_process(name, conf, ctx)
end
对于section_dispatch或者section_auth,每个文件夹下按顺序写入对于的lua脚本即可顺序执行 针对所有请求,location到根目录,然后走gw_main, 构造子请求到各个location,子请求的api有ngx.exec和ngx.location.capture,具体区别如下:
-
ngx.exec不构造HTTP协议,代码里匹配到对应的location_name直接走ngx_http_core_run_phases
-
capture要构造http协议,但因为是本地disk_file配置,所以没有进程间通信,一定是本worker命中,另外支持并发多个子请求,子请求和父请求直接的ngx.var也是共享的
所有lua的module用如下代码串起来,即文件夹location_api/下编写N个子模块的入口,每个子模块的入口再新建文件夹,以此类推
local op = ngx.var.uri --string.match(ngx.var.uri, "/([^/]+)")
local name = api_list[op]
if name == nil then
ngx.log(ngx.ERR, "err uri, lock process module op: ", op)
ngx.exit(503)
else
local base = "location_api."
ngx.log(ngx.DEBUG, "process module:", base .. name)
local module = require(base .. name)
module.process()
end
MTU: Maximum Transmit Unit,最大传输单元,数据链路层控制,即物理接口(数据链路层)提供给其上层(通常是IP层)最大一次传输数据的大小;以普遍使用的以太网接口为例,缺省MTU=1500 Byte,这是以太网接口对IP层的约束,如果IP层有<=1500 byte 需要发送,只需要一个IP包就可以完成发送任务;如果IP层有> 1500 byte 数据需要发送,需要分片才能完成发送,这些分片有一个共同点,即IP Header ID相同。
MSS:Maximum Segment Size ,传输层设置,TCP提交给IP层最大分段大小,不包含TCP Header和 TCP Option,只包含TCP Payload ,MSS是TCP用来限制application层最大的发送字节数。如果底层物理接口MTU= 1500 byte,则 MSS = 1500- 20(IP Header) -20 (TCP Header) = 1460 byte,如果application 有2000 byte发送,需要两个segment才可以完成发送,第一个TCP segment = 1460,第二个TCP segment = 540。
MTU = MSS + IP_HEADER + TCP_HEADER
客户端:
服务端:
对于同时打开,他们仅仅建立一条连接,状态转移和一般的三次握手略有不同,逻辑如下
A --SYN_A--> B, B--SYN_B--> A
B -->SYN_B, ACK_A+1 --> A, A --SYN_A,ACK_B+1-->B
综上,同时打开是没有第三次ack的,TCP协议设计此处边界的时候,保留了同时连接的能力,用各自的SYN回应对方的SYN,4次信令
第一步:两边同时调用close函数发FIN给对方,A和B都进入FIN_WAIT1状态
第二步:A和B都收到对方法的FIN,同时进入CLOSING状态 第三步:A和B都收到对方发的ack,都进入TIME_WAIT
关闭NAGEL算法,在发送端取消小包发送限制,NAGEL算法主要逻辑如下:
对于规则4),就是说要求一个TCP连接上最多只能有一个未被确认的小数据包,在该分组的确认到达之前,不能发送其他的小数据包。如果某个小分组的确认被延迟了(案例中的40ms),那么后续小分组的发送就会相应的延迟。也就是说延迟确认影响的并不是被延迟确认的那个数据包,而是后续的应答包。
NAGEL的优劣:
TCP在接收到对端的报文后,并不会立即发送ack,而是等待一段时间发送ack,以便将ack和要发送的数据一块发送。当然ack不能无限延长,否则对端会认为包超时而造成报文重传。linux采用动态调节算法来确定延时的时间。
TCP在何时发送ACK的时候有如下规定:
2. 如果没有响应数据,ACK就会有一个延迟,以等待是否有响应数据一块发送,但是这个延迟一般在40ms~500ms之间,一般情况下在40ms左右,如果在40ms内有数据发送,那么ACK会随着数据一块发送,对于这个延迟的需要注意一下,这个延迟并不是指的是收到数据到发送ACK的时间延迟,而是内核会启动一个定时器,每隔200ms就会检查一次,比如定时器在0ms启动,200ms到期,180ms的时候data来到,那么200ms的时候没有响应数据,ACK仍然会被发送,这个时候延迟了20ms.
可以通过TCP_QUICKACK这个选项来启动快速ACK:
-
重传定时器,RTO,管理重传的间隔
-
坚持定时器,定期保持窗口大小信息的不断更新流动,即使接收端关闭了接受窗口
-
保活定时器,SO_KEEPALIVE
-
2MSL定时器,控制TIME_WAIT状态持续的时间
发送端迟迟未收到某个数据包对应的ACK,触发了RTO定时器,,这种机制下,每个数据包都有相应的计时器,一旦超过 RTO 而没有收到 ACK,就重发该数据包。没收到 ACK 的数据包都会存在重传缓冲区里,等到 ACK 后,就从缓冲区里删除。 首先明确一点,对 TCP 来说,超时重传是相当重要的事件(RTO 往往大于两倍的 RTT,超时往往意味着拥塞),一旦发生这种情况,TCP 不仅会重传对应数据段,还会降低当前的数据发送速率,因为TCP 会认为当前网络发生了拥塞。
对于完全的丢包,只能用超时重传,但是对于乱序的丢包,也就是后发的包先到了接收端,但之前某个包接收端未收到,则可以用快速重传的办法提升重传效率,即服务器如果收到乱序的包,也给客户端回复 ACK,只不过是重复的 ACK。假设发送端发送了5,6,7,8,9这几个包,但接收端只收到乱序的包 6,7,8,9 时,这时候接受端全都会发DUP-ACK = 5。这样,发送端就知道 5 发生了空缺。一般来说,如果客户端连续三次收到重复的 ACK,就会重传对应包,而不需要等到计时器超时。从抓包表现看,发送端收到接受端发来的3次DUP-ACK,就会重传对应的数据
sshresh的最小值一定是2个报文段,因为cwnd的最小值是1个报文段
慢启动:cwnd < sshresh时执行的一种指数递增算法,此算法的效果是发送数据的速率指数递增,一直递增到cwnd > sshtresh或者遇到拥塞(超时RTO或者DUP-ACK)为止,此处要强调一点。慢启动算法可出现在发送开始阶段以外,还可以在触发了超时的拥塞后,因为触发了超时的拥塞,cwnd设置成了1
那么思考一下,sshresh如果在遇到几次拥塞后降低到一个比较低的水平,后面能否再恢复到一个高值?(当然可以,cwnd在拥塞避免阶段是线性增加,当不丢包的收发足够多时,cwnd可以增长到之前sshresh的两倍以上,那么当再次遇到丢包时,sshresh降低为cwnd的一半,反而比之前增大了)
快速恢复:在没有触发超时重传,只有DUP-ACK的情况下,不把cwnd降低为1走慢启动(因为这样会把数据流量突然减少),只把sshresh设置为当前cwnd的一半,cwnd被设置为sshresh + 重复ACK的个数(一定是3)*报文段大小,所谓的快速恢复,其实就是遇到拥塞以后,将cwnd设置为约等于之前的一半走拥塞避免,保证数据流不会突然降低到1。
struct ngx_http_variable_s {
ngx_str_t name; /* must be first to build the hash */
ngx_http_set_variable_pt set_handler;
ngx_http_get_variable_pt get_handler;
uintptr_t data;
ngx_uint_t flags;
ngx_uint_t index;
};
以http的limit_rate为例,假设在lua代码里用ngx.var.limit_rate变量赋值,则对应C代码里的set_handler和get_handler如下变量注册代码
{ ngx_string("limit_rate"), ngx_http_variable_request_set_size,
ngx_http_variable_request_get_size,
offsetof(ngx_http_request_t, limit_rate),
NGX_HTTP_VAR_CHANGEABLE|NGX_HTTP_VAR_NOCACHEABLE, 0 },
所以当lua用ngx.var.limit_rate获取或者设置时,调用的都是对于的set_handler和get_handler
- 如果共享内存有锁,则父进程通过signal注册的函数,如果是SIGCHILD信号,则进入ngx_process_get_status,调用ngx_shmtx_force_unlock强制解锁
- 如果共享内存配置项有更改,如名称或者大小,则释放原来的共享内存,新建一个,详细代码参见ngx_init_cycle, 操作old_cycle
/*
1. 如果不用ngx_use_accept_mutex锁、或者开了reuseport,则直接ngx_add_event,去触发accept
2. 如果不在这里ngx_add_event,则到ngx_process_events_and_timers去抢锁,谁抢到谁去accept
*/
if (ngx_use_accept_mutex
#if (NGX_HAVE_REUSEPORT)
&& !ls[i].reuseport
#endif
)
{
continue;
}
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_int_t
ngx_clone_listening(ngx_conf_t *cf, ngx_listening_t *ls)
{
#if (NGX_HAVE_REUSEPORT)
ngx_int_t n;
ngx_core_conf_t *ccf;
ngx_listening_t ols;
if (!ls->reuseport) {
return NGX_OK;
}
ols = *ls;
ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
ngx_core_module);
for (n = 1; n < ccf->worker_processes; n++) {
/* create a socket for each worker process */
ls = ngx_array_push(&cf->cycle->listening);
if (ls == NULL) {
return NGX_ERROR;
}
*ls = ols;
ls->worker = n;
}
#endif
return NGX_OK;
}
根据配置的http和rtmp端口,创建好listen结构体信息,最后统一到ngx_open_listening_sockets绑定,
监听,这些都是在父进程里进行的
nginx先按照读取的配置,初始化http和rtmp的监听端口套接字。在fork了worker进程以后,
再初始化epoll模块,然后将已经监听起来的套接字加入epoll全局句柄ep
(每一个工作进程一个)。fork出子进程后,每个子进程都有一个全局的epoll句柄ep,
所有事件都注册到这个ep上
1. 启动时候加载顺序应该是按照CORE_MODULES配置,生成到ngx_modules数组里依次加载,
2. main函数里面,先执行ngx_init_cycle,listen所有模块的套接字,ngx_conf_handler里,
执行ngx_command_t 里的set函数, 如ngx_http_block和ngx_rtmp_block,block函数
里面有各个模块的postconfiguration;
3. ngx_spawn_process里调用fork出worker进程的时候,
ngx_worker_process_cycle(unix版本)里的
ngx_worker_process_init 调用每个模块的init_process,这个里面初始化epoll
首先是nginx给lua发HTTP的connect和PLAY消息,走到notify模块,然后发RTMP流,走到rtmp_cmd模块
最近定位了一个bug,代码模型如下:
初始化:
tctx->carg[4096];
tctx->barg.start = tctx->barg.pos = tctx->barg.last = tctx->carg;
tctx->barg.end = tctx->carg + sizeof(tctx->carg);
。。。。
barg是一个buf,把他和carg这一个char数组关联起来
使用:
tctx->barg.last = tctx->barg.pos; //此处bug,没有把pos置回carg的首地址
tctx->barg.last = ngx_snprintf(tctx->barg.last,
tctx->barg.end - tctx->barg.last, "Begin=%T",
tctx->begin);
问题就在注释处,因为没有把pos置回carg的首地址,所以在netcall模块发包后,pos每次向前移动
16字节(业务逻辑,定长的16),导致carg数组不停的追加"Begin=XX"的16字节字符串,所以每次
运行42分25秒左右,业务中断,因为每16字节的自加,把carg的4096个空间填满了
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
ps:buf结构体里面,start和end应该是不变的,表示这个buf的定长大小,但pos和last是动态的,
比如openresty里,每次到发送模块的时候,pos会往前递增发送的大小n,而业务层每次填包n字节,
会让last往后移动n
(1) 首先,父进程会读取配置,当读到http时,调用ngx_http_commands的ngx_http_block, 在ngx_http_block函数里,把要监听的端口socket创建好,每个创建好的句柄放到ngx_listening_t *ls里,注册回调函数ngx_http_init_connection, 代码如下:
static ngx_listening_t *
ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
{
ngx_listening_t *ls;
ngx_http_core_loc_conf_t *clcf;
ngx_http_core_srv_conf_t *cscf;
//ls是每个要监听的端口的句柄,如80等
ls = ngx_create_listening(cf, &addr->opt.sockaddr.sockaddr,
addr->opt.socklen);
if (ls == NULL) {
return NULL;
}
ls->addr_ntop = 1;
ls->handler = ngx_http_init_connection;//此处注册回调给ls
。。。
}
2) nginx启动后,fork出N个worker进程,每个进程会初始化epoll,调用 ngx_event_process_init,把每个监听的socket注册上accept回调ngx_event_accept 代码:
for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
//如果开启reuseport,则只有本worker的ls才往后走(每个ls在clone里都有N个worker)
if (ls[i].reuseport && ls[i].worker != ngx_worker) {
continue;
}
#endif
。。。
do {
i--;
c[i].data = next;
c[i].read = &cycle->read_events[i];
c[i].write = &cycle->write_events[i];
c[i].fd = (ngx_socket_t) -1;
next = &c[i];
} while (i);
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
。。。
}
(3)当accept触发后,进入ngx_event_accept,从epoll的data指针获取ls:
void
ngx_event_accept(ngx_event_t *ev)
{
。。。。。。。
lc = ev->data;
ls = lc->listening;
ev->ready = 0;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"accept on %V, ready: %d", &ls->addr_text, ev->available);
do {
socklen = sizeof(ngx_sockaddr_t);
#if (NGX_HAVE_ACCEPT4)
if (use_accept4) {
s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
} else {
s = accept(lc->fd, &sa.sockaddr, &socklen);
}
#else
s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif
。。。。
ls->handler(c);//调用到ngx_http_init_connection
}
RTMP模块道理也一样的,ngx_rtmp_add_listening里注册ngx_rtmp_init_connection
A. timewait状态的意义 1) 为什么TIME_WAIT状态需要经过2MSL(最大报文段生存时间,一般是30*2)才能返回到CLOSE状态?
虽然按道理,四个报文都发送完毕,我们可以直接进入CLOSE状态了,但是我们必须假象网络是不可靠的,有可以最后一个ACK丢失。所以
TIME_WAIT状态就是用来重发可能丢失的ACK报文,判断报文是否丢失主要是根据被动关闭段是否重发fin,如果重发fin,说明ack丢失了。如果发
RST,说明报文迟缓到达。
2) 如果上次链接server发的fin报文客户端没有收到,有可能在客户端当前端口复用后再收到这个fin报文,导致连接关闭。为了解决这个问题,
一个是在timewait状态端口不能拉起,既不用reuseaddr;还有一个是内核校验新连接的syn的seq,如果大于上一个连接的fin的seq,则丢弃fin
B. RST异常终止连接,参见第十部分
C. 检测半连接
n = c->recv(c, b->last, size);
//EAGAIN,加入定时器重新接受
if (n == NGX_AGAIN) {
if (!rev->timer_set) {
ngx_add_timer(rev, c->listening->post_accept_timeout);
ngx_reusable_connection(c, 1);
}
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_close_connection(c);
return;
}
/*
* We are trying to not hold c->buffer's memory for an idle connection.
*/
if (ngx_pfree(c->pool, b->start) == NGX_OK) {
b->start = NULL;
}
return;
}
//吃屎了,退出
if (n == NGX_ERROR) {
ngx_http_close_connection(c);
return;
}
//返回0,断链
if (n == 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"client closed connection");
ngx_http_close_connection(c);
return;
}
SO_LINGER选项用来改变此缺省设置。使用如下结构:
struct linger {
int l_onoff; /* 0 = off, nozero = on */
int l_linger; /* linger time */
};
有下列三种情况:
1)设置 l_onoff为0,则该选项关闭,l_linger的值被忽略,等于内核缺省情况,close调用会立即返回给调用者,如果可能将会传输任何未发送的数据;
2)设置 l_onoff为非0,l_linger为0,则套接口关闭时TCP夭折连接,TCP将丢弃保留在套接口发送缓冲区中的任何数据并发送一个RST给对方,而不是通常的四分组终止序列,这避免了TIME_WAIT状态;
3)设置 l_onoff 为非0,l_linger为非0,当套接口关闭时内核将拖延一段时间(由l_linger决定)。如果套接口缓冲区中仍残留数据,进程将处于睡眠状态,直 到(a)所有数据发送完且被对方确认,之后进行正常的终止序列(描述字访问计数为0)或(b)延迟时间到。此种情况下,应用程序检查close的返回值是非常重要的,如果在数据发送完并被确认前时间到,close将返回EWOULDBLOCK错误且套接口发送缓冲区中的任何数据都丢失。close的成功返回仅告诉我们发送的数据(和FIN)已由对方TCP确认,它并不能告诉我们对方应用进程是否已读了数据。如果套接口设为非阻塞的,它将不等待close完成。
半关闭: 调用shutdown,并且启用SHUT_WR宏,发送FIN给对端,表明本端程序只收不发,对端依然可发送数据,这个就是半关闭,TCP变成了单向收发; 对于SHUT_RD,可以在HTTP短连接的时候,服务器收到客户端的GET请求后,直接调用,这个不妨碍服务器给客户端发数据,客户端在发给服务器后可以直接SHUT_WR,客户端只收不发。
RST有三种条件可以产生:
1)SYN到达某端口,但端口没有监听
2)TCP想取消一个连接
3)TCP接收了一个根本不存在的连接上的分节
可以用SO_LINGER宏模拟accept之前客户端发RST给服务器的这种边界情况,这种情况下,服务器对于NGX_ECONNABORTED错误,应该直接忽略,继续accpet,而不是退出
数据包有一个seq_num,还有数据len,next_seq=seq_num+len,所以接收方会根据收到的每一个TCP包, 知道下一个要收包的序号(relative seq number),如果发现来包不是预计的序号,就触发乱序和重传
调用fork后,子进程会复制父进程的进程信息,如文件描述符,这样fd[0], fd[1]在子进程中有同样的一个拷贝,他们的引用都为2,也就是两个进程在使用他们。而实际上父进程只使用fd[1],子进程只使用fd[0],这样如果父进程不想使用fd[1]了,调用close()来关闭fd[1],这是不成功的,因为这样只是将fd[1]的引用减少到1,fd[1]没有被系统回收,仍然在子进程中有效,所以必须父进程close(fd[0]);子进程close(fd[1])
//reload信号的定义
{ ngx_signal_value(NGX_RECONFIGURE_SIGNAL),
"SIG" ngx_value(NGX_RECONFIGURE_SIGNAL),
"reload",
ngx_signal_handler },
//a. 首先接受到一个NGX_RECONFIGURE_SIGNAL信号,在ngx_signal_handler处理
case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
ngx_reconfigure = 1; //主要是父进程或者对应的reload进程设置这个
action = ", reconfiguring";
break;
//b. 父进程ngx_master_process_cycle发现ngx_reconfigure变为1,则启动N个worker进程,并用ngx_signal_worker_processes给当前的worker发NGX_SHUTDOWN_SIGNAL
信号: //reload,重新读配置 if (ngx_reconfigure) { ngx_reconfigure = 0;
//换版本咯
if (ngx_new_binary) {
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
ngx_start_cache_manager_processes(cycle, 0);
ngx_noaccepting = 0;
continue;
}
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");
cycle = ngx_init_cycle(cycle);
if (cycle == NULL) {
cycle = (ngx_cycle_t *) ngx_cycle;
continue;
}
ngx_cycle = cycle;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_core_module);
//启动NGX_PROCESS_JUST_RESPAWN类型的子worker
//NGX_PROCESS_JUST_RESPAWN表示子进程退出时,父进程要重新拉起:
/*
if (signo == SIGCHLD) {
ngx_process_get_status();
}
*/
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_JUST_RESPAWN);/
ngx_start_cache_manager_processes(cycle, 1);
/* allow new processes to start */
ngx_msleep(100);
live = 1;
//发送NGX_SHUTDOWN_SIGNAL给worker
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
}
worker在ngx_channel_handler 将ngx_quit置1,然后ngx_worker_process_cycle里如下操作:
if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"gracefully shutting down");
ngx_setproctitle("worker process is shutting down");
if (!ngx_exiting) {
ngx_exiting = 1;
//将当前还有用户带来连接从epoll摘除,不再触发,等用户退出
ngx_close_listening_sockets(cycle);
//关闭空闲的连接池,如果有事件先处理了
ngx_close_idle_connections(cycle);
}
}
c. worker进程在刚拉起的时候,ngx_worker_process_init里面就把每个channel的fd,加入epoll,监听是否有父进程发来的信号 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } ngx_channel_handler里面,一直调用ngx_read_channel来监听收到的信号,然后改变对应全局变量的值,作用到ngx_worker_process_cycle的死循环里。
ngx_palloc相对ngx_pnalloc,其会将申请的内存大小向上扩增到NGX_ALIGNMENT的倍数,以方便内存对齐,减少内存访问次数
如果一个 TCP 连接的一端启用了 Nagle‘s Algorithm(未开启TCP_NODELAY),而另一端启用了 TCP Delayed Ack(未开启TCP_QUICKACK),而发送的数据包又比较小,则可能会出现这样的情况: 发送端在等待接收端对上一个packet 的 Ack 才发送当前的 packet,而接收端则正好延迟了此 Ack 的发送,那么这个正要被发送的 packet 就会同样被延迟。 nginx里面,长连接时间的是NODELAY,但是短连接没有
//(1) ngx_signal_handler里注册了SIGCHLD信号,子进程挂了父进程拿到SIGCHLD信号,把ngx_reap搞成1
case SIGCHLD:
ngx_reap = 1;
break;
//(2) 父进程的大循环里面如下
if (ngx_reap) {
ngx_reap = 0;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
//ngx_reap_children才是真正拉起新的子进程的函数
live = ngx_reap_children(cycle);
}
(1) 粘包的原因: a. 发送端需要等缓冲区满才发送出去,造成粘包 b. 接收方不及时接收缓冲区的包,造成多个包接收 具体点:
(1)发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据。若连续几次发送的数据都很少,通常TCP会根据优化算法把这些数据合成一包后一次发送出去,这样接收方就收到了粘包数据。
(2)接收方引起的粘包是由于接收方用户进程不及时接收数据,从而导致粘包现象。这是因为接收方先把收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据,若下一包数据到达时前一包数据尚未被用户进程取走,则下一包数据放到系统接收缓冲区时就接到前一包数据之后,而用户进程根据预先设定的缓冲区大小从系统接收缓冲区取数据,这样就一次取到了多包数据。
UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)
(2) 粘包的解决方案: 为了避免粘包现象,可采取以下几种措施:
a. 对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;
b. 对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;
c. 由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <string.h>
#define MAXEVENTS 64
int create_and_bind (int port) {
int sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(sfd == -1) {
return -1;
}
struct sockaddr_in sa;
bzero(&sa, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sfd, (struct sockaddr*)&sa, sizeof(struct sockaddr)) == -1) {
return -1;
}
return sfd;
}
int make_socket_non_blocking (int sfd) {
int flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1) {
return -1;
}
if(fcntl (sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
return -1;
}
return 0;
}
/* 此函数用于读取参数或者错误提示 */
int read_param(int argc, char *argv[]) {
if (argc != 2) {
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}
return atoi(argv[1]);
}
int main (int argc, char *argv[]) {
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;
int port = read_param(argc, argv);
/* 创建并绑定socket */
sfd = create_and_bind (port);
if (sfd == -1) {
perror("create_and_bind");
abort ();
}
/* 设置sfd为非阻塞 */
s = make_socket_non_blocking (sfd);
if (s == -1) {
perror("make_socket_non_blocking");
abort ();
}
/* SOMAXCONN 为系统默认的backlog */
s = listen (sfd, SOMAXCONN);
if (s == -1) {
perror ("listen");
abort ();
}
efd = epoll_create1 (0);
if (efd == -1) {
perror ("epoll_create");
abort ();
}
event.data.fd = sfd;
/* 设置ET模式 */
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}
/* 创建事件数组并清零 */
events = calloc (MAXEVENTS, sizeof event);
/* 开始事件循环 */
while (1) {
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++) {
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
/* 监控到错误或者挂起 */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}
if(events[i].events & EPOLLIN) {
if (sfd == events[i].data.fd) {
/* 处理新接入的socket */
while (1) {
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
char hbuf[INET_ADDRSTRLEN];
int infd = accept (sfd, (struct sockaddr*)&sa, &len);
if (infd == -1) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
/* 资源暂时不可读,再来一遍 */
break;
} else {
perror ("accept");
break;
}
}
inet_ntop(AF_INET, &sa.sin_addr, hbuf, sizeof(hbuf));
printf("Accepted connection on descriptor %d "
"(host=%s, port=%d)\n", infd, hbuf, sa.sin_port);
/* 设置接入的socket为非阻塞 */
s = make_socket_non_blocking (infd);
if (s == -1) abort ();
/* 为新接入的socket注册事件 */
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}
}
//continue;
} else {
/* 接入的socket有数据可读 */
while (1) {
ssize_t count;
char buf[512];
count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1) {
if (errno != EAGAIN) {
perror ("read");
close(events[i].data.fd);
}
break;
} else if (count == 0) {
/* 数据读取完毕,结束 */
close(events[i].data.fd);
printf ("Closed connection on descriptor %d\n", events[i].data.fd);
break;
}
/* 输出到stdout */
s = write (1, buf, count);
if (s == -1) {
perror ("write");
abort ();
}
event.events = EPOLLOUT | EPOLLET;
epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
}
}
} else if((events[i].events & EPOLLOUT) && (events[i].data.fd != sfd)) {
/* 接入的socket有数据可写 */
write(events[i].data.fd, "it's echo man\n", 14);
event.events = EPOLLET | EPOLLIN;
epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
}
}
}
free (events);
close (sfd);
return EXIT_SUCCESS;
}
https://cloud.tencent.com/developer/article/1481046
#include <iostream>
using namespace std;
class Message
{
public:
virtual void SetMsg() = 0;
};
class QueryMessage : public Message
{
public:
virtual void SetMsg()
{
}
int num = 4;
};
class AnwserMessage : public Message
{
public:
virtual void SetMsg()
{}
int num = 111;
};
class IMessageHandler
{
public:
virtual void HandleMsg(Message *pMsg) = 0;
};
template<typename T>
class MessageHandler : public IMessageHandler
{
public:
virtual void HandleMsg(Message *pMsg)
{
T *p = dynamic_cast<T*>(pMsg);
_ASSERT(p != NULL);
MsgCallback(p);
}
void MsgCallback(T *pMsg)
{
cout << pMsg->num << endl;
}
};
主函数调用
MessageHandler<AnwserMessage> ansMsgHandler;
MessageHandler<QueryMessage> queMsgHandler;
AnwserMessage *pAnsMsg = new AnwserMessage();
QueryMessage *pQueMsg = new QueryMessage();
ansMsgHandler.HandleMsg(pAnsMsg);
queMsgHandler.HandleMsg(pQueMsg);