-
Notifications
You must be signed in to change notification settings - Fork 128
/
Copy pathfibersocket.d
502 lines (414 loc) · 21.6 KB
/
fibersocket.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
/++
Fiber-based socket i/o built on Phobos' std.socket and Socket.select without any other dependencies.
This is meant to be a single-threaded event-driven basic network server.
---
void main() {
auto fm = new FiberManager();
// little tcp echo server
// exits when it gets "QUIT" on the socket.
Socket listener;
listener = fm.listenTcp6(6660, (Socket conn) {
while(true) {
char[128] buffer;
auto ret = conn.receive(buffer[]);
// keeps the Phobos interface so...
if(ret <= 0) // ...still need to check return values
break;
auto got = buffer[0 .. ret];
if(got.length >= 4 && got[0 .. 4] == "QUIT") {
listener.close();
break;
} else {
conn.send(got);
}
}
conn.close();
});
// simultaneously listen for and echo UDP packets
fm.makeFiber( () {
auto sock = fm.bindUdp4(9999);
char[128] buffer;
Address addr;
while(true) {
auto ret = sock.receiveFrom(buffer[], addr);
if(ret <= 0)
break;
import std.stdio;
auto got = buffer[0 .. ret];
// print it to the console
writeln("Received UDP ", got);
// send the echo
sock.sendTo(got, addr);
if(got.length > 4 && got[0 .. 4] == "QUIT") {
break; // stop processing udp when told to quit too
}
}
}).call(); // need to call it the first time ourselves to get it started
// run the events. This keeps going until there are no more registered events;
// so when all registered sockets are closed or abandoned.
//
// So this will return when both QUIT messages are received and all clients disconnect.
import std.stdio;
writeln("Entering.");
fm.run();
writeln("Exiting.");
}
---
Note that DNS address lookups here may still block the whole thread, but other methods on `Socket` are overridden in the subclass ([FiberSocket]) to `yield` appropriately, so you should be able to reuse most existing code that uses Phobos' Socket with little to no modification. However, since it keeps the same interface as the original object, remember you still need to check your return values!
There's two big differences:
$(NUMBERED_LIST
* You should not modify the `blocking` flag on the Sockets. It is already set for you and changing it will... probably not hurt, but definitely won't help.
* You shouldn't construct the Sockets yourself, nor call `connect` or `listen` on them. Instead, use the methods in the [FiberManager] class. It will ensure you get the right objects initialized in the right way with the minimum amount of blocking.
The `listen` family of functions accept a delegate that is called per each connection in a fresh fiber. The `connect` family of functions can only be used from inside an existing fiber - if you do it in a connection handler from listening, it is already set up. If it is from your main thread though, you'll get an assert error unless you make your own fiber ahead of time. [FiberManager.makeFiber] can construct one for you, or you can call `new Fiber(...)` from `import core.thread.fiber` yourself. Put all the work with the connection inside that fiber so the manager can do its work most efficiently.
)
There's several convenience functions to construct addresses for you too, or you may simply do `getAddress` or `new InternetAddress` and friends from `std.socket` yourself.
$(H2 Conceptual Overview)
A socket is a common programming object for communication over a network. Phobos has support for the basics and you can read more about that in my blog socket tutorial: http://dpldocs.info/this-week-in-d/Blog.Posted_2019_11_11.html
A lot of things describe [core.thread.fiber.Fiber|fibers] as lightweight threads, and that's not wrong, but I think that actually overcomplicates them. I prefer to think of a fiber as a function that can pause itself. You call it like a function, you write it like a function, but instead of always completing and returning, it can [core.thread.fiber.Fiber.yield|yield], which is putting itself on pause and returning to the caller. The caller then has a chance to resume the function when it chooses to simply by [core.thread.fiber.Fiber.call|calling] it again, and it picks up where it left off, or the caller can [core.thread.fiber.Fiber.reset|reset] the fiber function to the beginning and start over.
Fiber-based async i/o thus isn't as complicated as it sounds. The basic idea is you just write an ordinary function in the same style as if you were doing linear, blocking i/o calls, but instead of actually blocking, you register a callback to be woken up when the call can succeed, then yield yourself. This callback you register is simply your own fiber resume method; the event loop picks up where you left off.
With Phobos sockets (and most Unix i/o functions), you then retry the operation that would have blocked and carry on because the callback is triggered when the operation is ready. If you're using another async system, like Windows' Overlapped I/O callbacks, it is actually even easier, since that callback happens when the operation has already completed. In those cases, you register the fiber's resume function as the event callback, then yield. When you wake up, you can immediately carry on.
When a fiber is woken up, it continues executing from the last `yield` call. Just think of `yield` as being a pause button you press.
Understanding how it works means you can translate any callback-based i/o system to use fibers, since it would always follow that same pattern: register the fiber resume method, then yield. If it is a callback when the operation is ready, try it again when you wake up (so right after yield, you can loop back to the call), or if it is a callback when the operation is complete, you can immediately use the result when you wake up (so right after yield, you use it).
How does the event loop work? How do you know what fiber runs next? See, this is where the "lightweight thread" explanation complicates things. With a thread, the operating system is responsible for scheduling them and might even run several simultaneously. Fibers are much simpler: again, think of them as just being a function that can pause itself. Like with an ordinary function, just one runs at a time (in your thread anyway, of course adding threads can complicate fibers like it can complicate any other function). Like with an ordinary function, YOU choose which one you want to call and when. And when a fiber `yield`s, it is very much like an ordinary function `return`ing - it passes control back to you, the caller. The only difference is the Fiber object remembers where the function was when it yielded, so you can ask it to pick up where it left off.
The event loop therefore doesn't look all that special. If you've used `Socket.select` before, you'll recognize most of it. (`select` can be tricky to use though, `epoll` based code is actually simpler and more efficient... but this module only wanted to use Phobos' std.socket on its own. Besides, `select` still isn't that complicated, is cross-platform, and performs well enough for most tasks anyway.) It has a list of active sockets that it adds to either a read or write set, it calls the select function, then it loops back over and handles the events, if set. The only special thing is the event handler resumes the fiber instead of some other action.
I encourage you to view the source of this file and try to follow along. It isn't terribly long and can hopefully help to introduce you to a new world of possibilities. You can use Fibers in other cases too, for example, the game I'm working on uses them in enemy scripts. It sets up their action, then yields and lets the player take their turn. When it is the computer's turn again, the script fiber resumes. Same principle, simple code once you get to know it.
$(H2 Limitations)
`Socket.select` has a limit on the number of pending sockets at any time, and since you have to loop through them each iteration, it can get slow with huge numbers of concurrent connections. I'd note that you probably will not see this problem, but it certainly can happen. Similarly, there's `new` allocations for each socket and virtual calls throughout, which, again, probably will be good enough for you, but this module is not C10K+ "web scale".
It also cannot be combined with other event loops in the same thread. But, since the [FiberManager] only uses the thread you give it, you might consider running it here and other things along side in their own threads.
Credits:
vibe.d is the first time I recall even hearing of fibers and is the direct inspiration for this.
History:
Written December 26, 2020. First included in arsd-official dub release 9.1.
License:
BSL-1.0, same as Phobos
+/
module arsd.fibersocket; // previously known as "centivibe" since it provides like 1/100th the functionality of vibe.d
public import std.socket;
import core.thread.fiber;
/// just because I forget how to enable this, trivial helper function
void allowBroadcast(Socket socket) {
socket.setOption(SocketOptionLevel.SOCKET, SocketOption.BROADCAST, 1);
}
/// Convenience function to loop and send until it it all sent or an error occurs.
ptrdiff_t sendAll(Socket s, scope const(void)[] data) {
auto ol = data.length;
while(data.length) {
auto ret = s.send(data);
if(ret <= 0)
return ret;
data = data[ret .. $];
}
return ol;
}
/++
Subclass of Phobos' socket that basically works the same way, except it yields back to the [FiberManager] when it would have blocked.
You should not modify the `blocking` flag on these and generally not construct them, connect them, or listen on them yourself (let [FiberManager] do the setup for you), but otherwise they work the same as the original Phobos [std.socket.Socket] and implement the very same interface. You can call the exact same functions with original Sockets or FiberSockets.
+/
class FiberSocket : Socket {
enum PendingOperation {
none, read, write
}
protected this(FiberManager fm) pure nothrow @safe {
this.fm = fm;
super();
}
/// You should probably call the helper functions in [FiberManager] instead.
this(FiberManager fm, AddressFamily af, SocketType st, Fiber fiber) {
assert(fm !is null);
this.fm = fm;
this.fiber = fiber;
super(af, st);
this.blocking = false;
}
void callFiber() {
fiber.call();
}
private FiberManager fm;
private Fiber fiber;
private PendingOperation pendingOperation;
private void queue(PendingOperation op) @trusted nothrow {
pendingOperation = op;
fm.pendingSockets ~= this;
fiber.yield();
}
protected override Socket accepting() pure nothrow {
return new FiberSocket(fm);
}
private ptrdiff_t magic(scope ptrdiff_t delegate() @safe what, PendingOperation op) @trusted {
try_again:
auto r = what();
if(r == -1 && wouldHaveBlocked()) {
queue(op);
goto try_again;
}
return r;
}
/// Yielding override of the Phobos interface
override ptrdiff_t send(scope const(void)[] buf, SocketFlags flags) {
return magic( () { return super.send(buf, flags); }, PendingOperation.write);
}
/// ditto
override ptrdiff_t receive(scope void[] buf, SocketFlags flags) {
return magic( () { return super.receive(buf, flags); }, PendingOperation.read);
}
/// ditto
override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags, ref Address from) @trusted {
return magic( () { return super.receiveFrom(buf, flags, from); }, PendingOperation.read);
}
/// ditto
override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags) @trusted {
return magic( () { return super.receiveFrom(buf, flags); }, PendingOperation.read);
}
/// ditto
override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags, Address to) @trusted {
return magic( () { return super.sendTo(buf, flags, to); }, PendingOperation.write);
}
/// ditto
override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags) @trusted {
return magic( () { return super.sendTo(buf, flags); }, PendingOperation.write);
}
// lol overload sets
/// The Phobos overloads are still available too, they forward to the overrides in this class and thus work the same way.
alias send = typeof(super).send;
/// ditto
alias receive = typeof(super).receive;
/// ditto
alias sendTo = typeof(super).sendTo;
/// ditto
alias receiveFrom = typeof(super).receiveFrom;
}
/++
The FiberManager is responsible for running your socket event loop and dispatching events to your fibers. It is your main point of interaction with this library.
Generally, a `FiberManager` will exist in your `main` function and take over that thread when you call [run]. You construct one, set up your listeners, etc., then call `run` and let it do its thing.
+/
class FiberManager {
private FiberSocket[] pendingSockets;
private size_t defaultFiberStackSize;
/++
Params:
defaultFiberStackSize = size, in bytes, of the fiber stacks [makeFiber] returns. If 0 (the default), use the druntime default.
+/
this(size_t defaultFiberStackSize = 0) {
this.defaultFiberStackSize = defaultFiberStackSize;
}
/++
Convenience function to make a worker fiber based on the manager's configuration.
This is used internally when connections come in.
+/
public Fiber makeFiber(void delegate() fn) {
return defaultFiberStackSize ? new Fiber(fn, defaultFiberStackSize) : new Fiber(fn);
}
/++
Convenience functions for creating listening sockets. These are trivial forwarders to [listenStream], constructing the appropriate [std.socket.Address] object for you. Note the address lookup does NOT at this time use the fiber io and may thus block your thread.
You can `close` the returned socket when you want to stop listening, or just ignore it if you want to listen for the whole duration of the program.
+/
final Socket listenTcp6(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
return listenStream(new Internet6Address(port), connectionHandler, backlog);
}
/// ditto
final Socket listenTcp6(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
return listenStream(new Internet6Address(address, port), connectionHandler, backlog);
}
/// ditto
final Socket listenTcp4(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
return listenStream(new InternetAddress(port), connectionHandler, backlog);
}
/// ditto
final Socket listenTcp4(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
return listenStream(new InternetAddress(address, port), connectionHandler, backlog);
}
/// ditto
version(Posix)
final Socket listenUnix(string path, void delegate(Socket) connectionHandler, int backlog = 8) {
return listenStream(new UnixAddress(path), connectionHandler, backlog);
}
/++
Core listen function for streaming connection-oriented sockets (TCP, etc.)
It will:
$(LIST
* Create a [FiberSocket]
* Create fibers on it for each incoming connection which call your `connectionHandler`
* Bind to the given `Address`
* Call `socket.listen(backlog)`
* Start `accept`ing connections.
)
Returns: the listening socket. You shouldn't do much with this except maybe `close` it when you are done.
+/
Socket listenStream(Address addr, void delegate(Socket) connectionHandler, int backlog) {
assert(connectionHandler !is null, "null connectionHandler passed to a listenTcp function");
FiberSocket socket;
socket = new FiberSocket(this, addr.addressFamily, SocketType.STREAM, makeFiber(
delegate() {
while(socket.isAlive()) {
socket.queue(FiberSocket.PendingOperation.read); // put fiber on hold until ready to accept
auto ns = cast(FiberSocket) socket.accept();
ns.blocking = false;
ns.fiber = makeFiber(delegate() {
connectionHandler(ns);
});
// need to get the new connection started
ns.fiber.call();
}
}
));
socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
socket.bind(addr);
socket.blocking = false;
socket.listen(backlog);
socket.callFiber();
return socket;
}
/++
Convenience functions that forward to [connectStream] for the given protocol. They connect, send, and receive in an async manner, but do not create their own fibers - you must already be in one when you call this function.
Connections only work if you are already in a fiber. This is the case in a connectionHandler, but not from your main function. You'll have to make your own worker fiber. (But tbh if you only have one connection anyway, you might as well use a standard Socket.)
If you are already in a connection handler set in the listen family of functions, you're all set - those are automatically in fibers. If you are in main though, you need to make a worker fiber.
Making a worker fiber is simple enough. You can do it with `new Fiber` or with [FiberManager.makeFiber] (the latter just calls the former with a size argument set up in the FiberManager constructor).
---
auto fm = new FiberManager();
fm.makeFiber(() {
auto socket = fm.connectTcp4(...);
socket.send(...);
}).call(); // you must call it the first time yourself so it self-registers
---
OR
---
import core.thread.fiber;
auto fiber = new Fiber(() {
auto socket = fm.connectTcp4(...);
// do stuff in here
}).call(); // same deal, still need to call it the first time yourself to give it a chance to self-register
---
+/
final Socket connectTcp4(string address, ushort port) {
return connectStream(new InternetAddress(address, port));
}
/// ditto
final Socket connectTcp6(string address, ushort port) {
return connectStream(new Internet6Address(address, port));
}
/// ditto
version(Posix)
final Socket connectUnix(string path) {
return connectStream(new UnixAddress(path));
}
/++
Connects a streaming socket to the given address that will yield to this FiberManager instead of blocking.
+/
Socket connectStream(Address address) {
assert(Fiber.getThis !is null, "connect functions can only be used from inside preexisting fibers");
FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.STREAM, Fiber.getThis);
socket.connect(address);
socket.queue(FiberSocket.PendingOperation.write); // wait for it to connect
scope(failure)
socket.close();
// and ensure the connection was successful before proceeding
int result;
if(socket.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, result) < 0)
throw new Exception("get socket error failed");
if(result != 0)
throw new Exception("Connect failed");
return socket;
}
/++
These are convenience functions that forward to [bindDatagram].
UDP sockets don't connect per se, but the basically work the same as [connectStream]. See the caveat about requiring a premade Fiber from that page.
+/
Socket bindUdp4(string address, ushort port) {
return bindDatagram(new InternetAddress(address, port));
}
/// ditto
Socket bindUdp4(ushort port) {
return bindDatagram(new InternetAddress(port));
}
/// ditto
Socket bindUdp6(string address, ushort port) {
return bindDatagram(new Internet6Address(address, port));
}
/// ditto
Socket bindUdp6(ushort port) {
return bindDatagram(new Internet6Address(port));
}
/++
Only valid from inside a worker fiber, see [makeFiber].
---
fm.makeFiber(() {
auto sock = fm.bindDatagram(new InternetAddress(5555));
sock.receiveFrom(....);
}).call(); // remember to call it the first time or it will never start!
+/
Socket bindDatagram(Address address) {
assert(Fiber.getThis !is null, "bind datagram functions can only be used from inside preexisting fibers");
FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.DGRAM, Fiber.getThis);
socket.bind(address);
return socket;
}
/++
Runs the program and manages the fibers and connections for you, calling the appropriate functions when new events arrive.
Returns when no connections are left open.
+/
void run() {
auto readSet = new SocketSet;
auto writeSet = new SocketSet;
while(true) {
readSet.reset();
writeSet.reset();
int added;
for(int idx = 0; idx < pendingSockets.length; idx++) {
auto pending = pendingSockets[idx];
if(!pending.isAlive()) {
// order not important here since we haven't done any real work yet
// really it shouldn't even be on the list.
pendingSockets[idx] = pendingSockets[$-1];
pendingSockets = pendingSockets[0 .. $-1];
pendingSockets.assumeSafeAppend();
idx--;
continue;
}
final switch(pending.pendingOperation) {
case FiberSocket.PendingOperation.none:
assert(0); // why is this object on this list?!
case FiberSocket.PendingOperation.write:
writeSet.add(pending);
added++;
break;
case FiberSocket.PendingOperation.read:
readSet.add(pending);
added++;
break;
}
}
if(added == 0)
return; // no work to do, all connections closed
auto eventCount = Socket.select(readSet, writeSet, null);//, 5.seconds);
if(eventCount == -1)
continue;
for(int idx = 0; idx < pendingSockets.length && eventCount > 0; idx++) {
auto pending = pendingSockets[idx];
SocketSet toCheck;
final switch(pending.pendingOperation) {
case FiberSocket.PendingOperation.none:
break;
case FiberSocket.PendingOperation.write:
toCheck = writeSet;
break;
case FiberSocket.PendingOperation.read:
toCheck = readSet;
break;
}
if(toCheck is null)
continue;
if(toCheck.isSet(pending)) {
eventCount--;
import std.algorithm.mutation;
// the order is fairly important since previous calls can append to
// this again, and we want to be sure we process the ones in this batch
// before seeing anything from the next batch.
pendingSockets = remove!(SwapStrategy.stable)(pendingSockets, idx);
pendingSockets.assumeSafeAppend();
idx--; // the slot we used to have is now different, so it needs to be reprocessed
pending.fiber.call();
}
}
}
}
}