Skip to content

Commit

Permalink
Merge epoll feature branch to master (xapi-project#6005)
Browse files Browse the repository at this point in the history
Pending some final testing by QA, these commits have already been
reviewed on the feature branch.

This switches `select` calls to `Unixext.select` (which is implemented
using `epoll`), and implements the few performance sensitive parts using
epoll directly.
It also enables some more tests with >1024 fds.
  • Loading branch information
edwintorok authored Sep 19, 2024
2 parents bc10ca0 + 8cf7ab2 commit f51e7e7
Show file tree
Hide file tree
Showing 27 changed files with 183 additions and 26 deletions.
1 change: 1 addition & 0 deletions ezxenstore.opam
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ depends: [
"cmdliner" {with-test & >= "1.1.0"}
"logs"
"uuidm"
"xapi-stdext-unix"
"xenctrl"
"xenstore"
"xenstore_transport"
Expand Down
1 change: 1 addition & 0 deletions ezxenstore.opam.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ depends: [
"cmdliner" {with-test & >= "1.1.0"}
"logs"
"uuidm"
"xapi-stdext-unix"
"xenctrl"
"xenstore"
"xenstore_transport"
Expand Down
2 changes: 1 addition & 1 deletion ocaml/database/block_device_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ let accept_conn s latest_response_time =
let now = Unix.gettimeofday () in
let timeout = latest_response_time -. now in
(* Await an incoming connection... *)
let ready_to_read, _, _ = Unix.select [s] [] [] timeout in
let ready_to_read, _, _ = Xapi_stdext_unix.Unixext.select [s] [] [] timeout in
R.info "Finished selecting" ;
if List.mem s ready_to_read then
(* We've received a connection. Accept it and return the socket. *)
Expand Down
6 changes: 5 additions & 1 deletion ocaml/database/master_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ let open_secure_connection () =
~write_to_log:(fun x -> debug "stunnel: %s\n" x)
~verify_cert host port
@@ fun st_proc ->
let fd_closed = Thread.wait_timed_read Unixfd.(!(st_proc.Stunnel.fd)) 5. in
let fd_closed =
Xapi_stdext_threads.Threadext.wait_timed_read
Unixfd.(!(st_proc.Stunnel.fd))
5.
in
let proc_quit =
try
Unix.kill (Stunnel.getpid st_proc.Stunnel.pid) 0 ;
Expand Down
8 changes: 6 additions & 2 deletions ocaml/forkexecd/src/child.ml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ let handle_comms_sock comms_sock state =

let handle_comms_no_fd_sock2 comms_sock fd_sock state =
debug "Selecting in handle_comms_no_fd_sock2" ;
let ready, _, _ = Unix.select [comms_sock; fd_sock] [] [] (-1.0) in
let ready, _, _ =
Xapi_stdext_unix.Unixext.select [comms_sock; fd_sock] [] [] (-1.0)
in
debug "Done" ;
if List.mem fd_sock ready then (
debug "fd sock" ;
Expand All @@ -74,7 +76,9 @@ let handle_comms_no_fd_sock2 comms_sock fd_sock state =

let handle_comms_with_fd_sock2 comms_sock _fd_sock fd_sock2 state =
debug "Selecting in handle_comms_with_fd_sock2" ;
let ready, _, _ = Unix.select [comms_sock; fd_sock2] [] [] (-1.0) in
let ready, _, _ =
Xapi_stdext_unix.Unixext.select [comms_sock; fd_sock2] [] [] (-1.0)
in
debug "Done" ;
if List.mem fd_sock2 ready then (
debug "fd sock2" ;
Expand Down
1 change: 1 addition & 0 deletions ocaml/libs/ezxenstore/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
(re_export xenstore)
(re_export xenstore_transport)
threads.posix
xapi-stdext-unix
(re_export xenstore.unix))
)
2 changes: 1 addition & 1 deletion ocaml/libs/ezxenstore/core/watch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ let wait_for ~xs ?(timeout = 300.) (x : 'a t) =
let thread =
Thread.create
(fun () ->
let r, _, _ = Unix.select [p1] [] [] timeout in
let r, _, _ = Xapi_stdext_unix.Unixext.select [p1] [] [] timeout in
if r <> [] then
()
else
Expand Down
16 changes: 10 additions & 6 deletions ocaml/libs/http-lib/buf_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ let is_full ic = ic.cur = 0 && ic.max = Bytes.length ic.buf
let fill_buf ~buffered ic timeout =
let buf_size = Bytes.length ic.buf in
let fill_no_exc timeout len =
let l, _, _ = Unix.select [ic.fd] [] [] timeout in
if l <> [] then (
Xapi_stdext_unix.Unixext.with_socket_timeout ic.fd timeout @@ fun () ->
try
let n = Unix.read ic.fd ic.buf ic.max len in
ic.max <- n + ic.max ;
if n = 0 && len <> 0 then raise Eof ;
n
) else
-1
with Unix.Unix_error (Unix.(EAGAIN | EWOULDBLOCK), _, _) -> -1
in
(* If there's no space to read, shift *)
if ic.max = buf_size then shift ic ;
let space_left = buf_size - ic.max in
(* Read byte one by one just do make sure we don't buffer too many chars *)
let n =
fill_no_exc timeout (if buffered then space_left else min space_left 1)
fill_no_exc (Some timeout)
(if buffered then space_left else min space_left 1)
in
(* Select returned nothing to read *)
if n = -1 then raise Timeout ;
Expand All @@ -97,7 +97,11 @@ let fill_buf ~buffered ic timeout =
let tofillsz =
if buffered then buf_size - ic.max else min (buf_size - ic.max) 1
in
ignore (fill_no_exc 0.0 tofillsz)
(* cannot use 0 here, for select that'd mean timeout immediately, for
setsockopt it would mean no timeout.
So use a very short timeout instead
*)
ignore (fill_no_exc (Some 1e-6) tofillsz)
)

(** Input one line terminated by \n *)
Expand Down
9 changes: 1 addition & 8 deletions ocaml/libs/http-lib/http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,8 @@ let read_frame_header buf =
let prefix = Bytes.sub_string buf 0 frame_header_length in
try Scanf.sscanf prefix "FRAME %012d" (fun x -> Some x) with _ -> None

let set_socket_timeout fd t =
try Unix.(setsockopt_float fd SO_RCVTIMEO t)
with Unix.Unix_error (Unix.ENOTSOCK, _, _) ->
(* In the unit tests, the fd comes from a pipe... ignore *)
()

let read_http_request_header ~read_timeout ~total_timeout ~max_length fd =
Option.iter (fun t -> set_socket_timeout fd t) read_timeout ;
Unixext.with_socket_timeout fd read_timeout @@ fun () ->
let buf = Bytes.create (Option.value ~default:1024 max_length) in
let deadline =
Option.map
Expand Down Expand Up @@ -372,7 +366,6 @@ let read_http_request_header ~read_timeout ~total_timeout ~max_length fd =
check_timeout_and_read 0 length ;
(true, length)
in
set_socket_timeout fd 0. ;
(frame, Bytes.sub_string buf 0 headers_length, proxy)

let read_http_response_header buf fd =
Expand Down
15 changes: 15 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1062,3 +1062,18 @@ module Daemon = struct
true
with Unix.Unix_error _ -> false
end
let set_socket_timeout fd t =
try Unix.(setsockopt_float fd SO_RCVTIMEO t)
with Unix.Unix_error (Unix.ENOTSOCK, _, _) ->
(* In the unit tests, the fd comes from a pipe... ignore *)
()
let with_socket_timeout fd timeout_opt f =
match timeout_opt with
| Some t ->
if t < 1e-6 then invalid_arg (Printf.sprintf "Timeout too short: %g" t) ;
let finally () = set_socket_timeout fd 0. in
set_socket_timeout fd t ; Fun.protect ~finally f
| None ->
f ()
4 changes: 3 additions & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ val try_read_string : ?limit:int -> Unix.file_descr -> string

exception Timeout

val with_socket_timeout : Unix.file_descr -> float option -> (unit -> 'a) -> 'a

val time_limited_write : Unix.file_descr -> int -> bytes -> float -> unit

val time_limited_write_substring :
Expand Down Expand Up @@ -257,7 +259,7 @@ val domain_of_addr : string -> Unix.socket_domain option

val test_open : int -> unit
(** [test_open n] opens n file descriptors. This is useful for testing that the application makes no calls
to [Unix.select] that use file descriptors, because such calls will then immediately fail.
to [Xapi_stdext_unix.Unixext.select] that use file descriptors, because such calls will then immediately fail.
This assumes that [ulimit -n] has been suitably increased in the test environment.
Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
rpclib.json
threads.posix
xapi-stdext-threads
xapi-stdext-unix
)
(preprocess (per_module ((pps ppx_deriving_rpc) Protocol_unix_scheduler)))
)
Expand Down
4 changes: 2 additions & 2 deletions ocaml/networkd/lib/jsonrpc_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ let timeout_read fd timeout =
in
let rec inner max_time max_bytes =
let ready_to_read, _, _ =
try Unix.select [fd] [] [] (to_s max_time)
try Xapi_stdext_unix.Unixext.select [fd] [] [] (to_s max_time)
with
(* in case the unix.select call fails in situation like interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
Expand Down Expand Up @@ -96,7 +96,7 @@ let timeout_write filedesc total_length data response_time =
in
let rec inner_write offset max_time =
let _, ready_to_write, _ =
try Unix.select [] [filedesc] [] (to_s max_time)
try Xapi_stdext_unix.Unixext.select [] [filedesc] [] (to_s max_time)
with
(* in case the unix.select call fails in situation like interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
Expand Down
2 changes: 2 additions & 0 deletions ocaml/tests/common/suite_init.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
let harness_init () =
(* before any calls to XAPI code, to catch early uses of Unix.select *)
Xapi_stdext_unix.Unixext.test_open 1024 ;
Xapi_stdext_unix.Unixext.mkdir_safe Test_common.working_area 0o755 ;
(* Alcotest hides the standard output of successful tests,
so we will probably not exceed the 4MB limit in Travis *)
Expand Down
38 changes: 38 additions & 0 deletions ocaml/tests/dune
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,42 @@

(env (_ (env-vars (XAPI_TEST 1))))

; disassemble, but without sources
; (source lookup doesn't work for all dependencies, and is very slow on a large binary)
; To make debugging easier the disassembly is saved to a file instead of piping
(rule
(deps ../xapi/xapi_main.exe)
(target xapi.disasm)
(package xapi)
(action
(with-stdout-to %{target}
(run objdump %{deps} --wide -d --no-show-raw-insn)
)
)
)

(rule
(deps ../xenopsd/xc/xenops_xc_main.exe)
(target xenops_xc_main.disasm)
(package xapi-xenopsd-xc)
(action
(with-stdout-to %{target}
(run objdump %{deps} --wide -d --no-show-raw-insn)
)
)
)

(rule
(alias runtest)
(package xapi)
(deps (:script ./unix_select.gawk) (:disasm xapi.disasm))
(action (run gawk -f ./%{script} %{disasm}))
)
(rule
(alias runtest)
(package xapi-xenopsd-xc)
(deps (:script ./unix_select.gawk) (:disasm xenops_xc_main.disasm))
(action (run gawk -f ./%{script} %{disasm}))
)

(data_only_dirs test_data tests)
80 changes: 80 additions & 0 deletions ocaml/tests/unix_select.gawk
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
BEGIN { n = 0; }
# A function definition and its address
# Remember its address and update current symbol
# 0000000000850330 <unix_select>:
match($0, /^0*([0-9a-fA-F]+) <([^>]+)>/, symdef) {
SYMBOL = symdef[2];
ADDR = symdef[1];

SYMADDR[ADDR] = SYMBOL;

if (ADDR in LOADED) {
for (idx in LOADED[ADDR]) {
caller = LOADED[ADDR][idx]
CALLS[symdef[2]][n++] = caller
}
}
}

# Indirect calls (mostly used for C stubs)
# mov $0x850330,%rax
# call 872834 <caml_c_call>
match($0, /mov.*0x([0-9a-fA-F]*),/, addr) {
# this will have gaps, but the indexes will be unique
LOADED[addr[1]][n++] = SYMBOL
}

match($0, /call.*<([^>]+)>/, call) {
CALLS[call[1]][n++] = SYMBOL
}

END {
SYM = "unix_select"
had_calls = 0
if (SYM in CALLS) {
for (idx in CALLS[SYM]) {
caller = CALLS[SYM][idx];
print "--"
if (caller ~ /caml(Thread|Unix__fun_).*/) {
# direct calls from these functions to unix_select are expected
print caller "[expected]"
} else {
print caller "[bad]"
had_calls++
}
if (caller in CALLS) {
for (idx2 in CALLS[caller]) {
caller2 = CALLS[caller][idx2];
if (caller2 ~ /caml(Thread).*/) {
print caller2 "[expected]"
} else {
print caller2 "[bad]"
had_calls++
}
if (caller2 in CALLS) {
for (idx3 in CALLS[caller2]) {
caller3 = CALLS[caller2][idx3];
# but we don't expect anyone calling these functions from OCaml code,
# reject that
had_calls++
print caller3 "[bad]"
if (caller3 in CALLS) {
for (idx4 in CALLS[caller3]) {
caller4 = CALLS[caller3][idx4];
print caller4 "[bad]"
for (idx5 in CALLS[caller4]) {
caller5 = CALLS[caller4][idx5];
print caller5 "[bad]"
}
}
}
}
}
}
}
}
}
if (had_calls > 0) {
exit 2
}
}
1 change: 1 addition & 0 deletions ocaml/xapi-idl/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
xapi-open-uri
xapi-stdext-pervasives
xapi-stdext-threads
xapi-stdext-unix
xapi-inventory
xmlm
)
Expand Down
2 changes: 2 additions & 0 deletions ocaml/xapi-idl/lib_test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
xapi-idl.xen
xapi-idl.xen.interface
xapi-log
xapi-stdext-unix
xapi-stdext-threads
)
(preprocess (per_module ((pps ppx_deriving_rpc) Task_server_test Updates_test))))

Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi-idl/lib_test/scheduler_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) ?(time_max = 60.) f =
()
in
f callback ;
let ready = Thread.wait_timed_read rd time_max in
let ready = Xapi_stdext_threads.Threadext.wait_timed_read rd time_max in
match (ready, !after) with
| true, None ->
Alcotest.fail "pipe ready to read, but after is not set"
Expand Down
3 changes: 2 additions & 1 deletion ocaml/xe-cli/newcli.ml
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,8 @@ let main_loop ifd ofd permitted_filenames =
finished := true
else
let r, _, _ =
Unix.select [Unix.stdin; fd] [] [] heartbeat_interval
Xapi_stdext_unix.Unixext.select [Unix.stdin; fd] [] []
heartbeat_interval
in
let now = Unix.time () in
if now -. !last_heartbeat >= heartbeat_interval then (
Expand Down
1 change: 1 addition & 0 deletions ocaml/xenopsd/cli/dune
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
xapi-idl.xen.interface
xapi-idl.xen.interface.types
xapi-stdext-pervasives
xapi-stdext-unix
yojson
)
(preprocess (per_module ((pps ppx_deriving_rpc) Common Xn_cfg_types)))
Expand Down
4 changes: 3 additions & 1 deletion ocaml/xenopsd/cli/xn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,9 @@ let raw_console_proxy sockaddr =
) else if !final then
finished := true
else
let r, _, _ = Unix.select [Unix.stdin; fd] [] [] (-1.) in
let r, _, _ =
Xapi_stdext_unix.Unixext.select [Unix.stdin; fd] [] [] (-1.)
in
if List.mem Unix.stdin r then (
let b =
Unix.read Unix.stdin buf_remote !buf_remote_end
Expand Down
1 change: 1 addition & 0 deletions ocaml/xsh/dune
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
safe-resources
xapi-consts
xapi-log
xapi-stdext-unix
)
)

Loading

0 comments on commit f51e7e7

Please sign in to comment.