1 module libasync.socket;
2 
3 import std.variant;
4 import std.exception : assumeWontThrow, ifThrown;
5 
6 import libasync.events;
7 import libasync.internals.logging;
8 import libasync.internals.socket_compat;
9 import libasync.internals.freelist;
10 import libasync.internals.queue;
11 
12 public import std.socket : SocketType, SocketOSException;
13 public import libasync.internals.socket_compat :
14 	SOCK_STREAM, SOCK_SEQPACKET, SOCK_DGRAM, SOCK_RAW, SOCK_RDM,
15 	AF_INET, AF_INET6,
16 	SOL_SOCKET,
17 	SO_REUSEADDR;
18 version (Posix) public import libasync.internals.socket_compat :
19 	AF_UNIX,
20 	SO_REUSEPORT;
21 
22 /// Returns `true` if the given type of socket is connection-oriented.
23 /// Standards: Conforms to IEEE Std 1003.1, 2013 Edition
24 bool isConnectionOriented(SocketType type) @safe pure nothrow @nogc
25 {
26 	final switch (type) with (SocketType) {
27 		case STREAM: return true;
28 		case SEQPACKET: return true;
29 
30 		case DGRAM: return false;
31 
32 		// Socket types not covered by POSIX.1-2013
33 		// are assumed to be connectionless.
34 		case RAW: return false;
35 		case RDM: return false;
36 	}
37 }
38 
39 /// Returns `true` if the given type of socket is datagram-oriented.
40 /// Standards: Conforms to IEEE Std 1003.1, 2013 Edition
41 bool isDatagramOriented(SocketType type) @safe pure nothrow @nogc
42 {
43 	final switch (type) with (SocketType) {
44 		case STREAM: return false;
45 
46 		case SEQPACKET: return true;
47 		case DGRAM: return true;
48 
49 		// Socket types not covered by POSIX.1-2013
50 		// are assumed to be datagram-oriented.
51 		case RAW: return true;
52 		case RDM: return true;
53 	}
54 }
55 
56 /**
57  * Represents a single message to be transferred over the network by $(D AsyncSocket).
58  * Authors: Moritz Maxeiner, moritz@ucworks.org
59  * Date: 2016
60  */
61 struct NetworkMessage
62 {
63 version (Posix) {
64 	import core.sys.posix.sys.socket : msghdr, iovec;
65 
66 	alias Header        = msghdr;
67 	alias Content       = iovec;
68 
69 	@property ubyte* contentStart() @trusted pure @nogc nothrow { return cast (ubyte*) m_content.iov_base; }
70 	@property void contentStart(ubyte* contentStart) @safe pure @nogc nothrow { m_content.iov_base = contentStart; }
71 
72 	@property size_t contentLength() @trusted pure @nogc nothrow { return m_content.iov_len; }
73 	@property void contentLength(size_t contentLength) @safe pure @nogc nothrow { m_content.iov_len = contentLength; }
74 } else version (Windows) {
75 	import libasync.internals.win32 : WSABUF, DWORD;
76 
77 	struct Header
78 	{
79 		sockaddr* msg_name;
80 		socklen_t msg_namelen;
81 		WSABUF*   msg_iov;
82 		size_t    msg_iovlen;
83 		DWORD     msg_flags;
84 	}
85 
86 	alias Content     = WSABUF;
87 
88 	@property ubyte* contentStart() @trusted pure @nogc nothrow { return m_content.buf; }
89 	@property void contentStart(ubyte* contentStart) @safe pure @nogc nothrow { m_content.buf = contentStart; }
90 
91 	@property size_t contentLength() @trusted pure @nogc nothrow { return m_content.len; }
92 	@property void contentLength(size_t contentLength) @safe pure @nogc nothrow { m_content.len = contentLength; }
93 } else { static assert(false, "Platform unsupported"); }
94 
95 	@property sockaddr* name() @trusted pure @nogc nothrow { return cast(sockaddr*) m_header.msg_name; }
96 	@property void name(sockaddr* name) @safe pure @nogc nothrow { m_header.msg_name = name; }
97 
98 	@property socklen_t nameLength() @trusted pure @nogc nothrow { return m_header.msg_namelen; }
99 	@property void nameLength(socklen_t nameLength) @safe pure @nogc nothrow { m_header.msg_namelen = nameLength; }
100 
101 	@property Content* buffers() @trusted pure @nogc nothrow { return m_header.msg_iov; }
102 	@property void buffers(Content* buffers) @safe pure @nogc nothrow { m_header.msg_iov = buffers; }
103 
104 	@property typeof(m_header.msg_iovlen) bufferCount() @trusted pure @nogc nothrow { return m_header.msg_iovlen; }
105 	@property void bufferCount(typeof(m_header.msg_iovlen) bufferCount) @safe pure @nogc nothrow { m_header.msg_iovlen = bufferCount; }
106 
107 	@property int flags() @trusted pure @nogc nothrow { return m_header.msg_flags; }
108 	@property void flags(int flags) @safe pure @nogc nothrow { m_header.msg_flags = flags; }
109 
110 private:
111 	Header m_header;
112 	Content m_content;
113 
114 	ubyte[] m_buffer;
115 	size_t m_count = 0;
116 
117 package:
118 	@property Header* header() const @trusted pure @nogc nothrow
119 	{ return cast(Header*) &m_header; }
120 
121 public:
122 	this(ubyte[] content, inout NetworkAddress* addr = null) @safe pure @nogc nothrow
123 	{
124 		if (addr is null) {
125 			name = null;
126 			nameLength = 0;
127 		} else {
128 			delegate () @trusted { name = cast(sockaddr*) addr.sockAddr; } ();
129 			nameLength = addr.sockAddrLen;
130 		}
131 
132 		buffers = &m_content;
133 		bufferCount = 1;
134 
135 		version (Posix) {
136 			m_header.msg_control = null;
137 			m_header.msg_controllen = 0;
138 		}
139 
140 		flags = 0;
141 
142 		m_buffer      = content;
143 		contentStart  = &content[0];
144 		contentLength = content.length;
145 	}
146 
147 	this(const ref NetworkMessage other) nothrow
148 	{
149 		m_header = cast(Header) other.m_header;
150 		m_content = cast(Content) other.m_content;
151 		buffers = &m_content;
152 		bufferCount = 1;
153 		m_buffer = contentStart[0..contentLength];
154 	}
155 
156 	this(this) @safe pure @nogc nothrow
157 	{ buffers = &m_content; }
158 
159 	@property size_t count() @safe pure @nogc nothrow
160 	{ return m_count; }
161 
162 	@property void count(size_t count) @safe pure @nogc nothrow
163 	{
164 		m_count = count;
165 		auto content = m_buffer[count .. $];
166 		contentStart = &content[0];
167 		contentLength = content.length;
168 	}
169 
170 	@property bool hasAddress() @safe pure @nogc nothrow
171 	{ return name !is null; }
172 
173 	@property bool receivedAny() @safe pure @nogc nothrow
174 	{ return m_count > 0; }
175 
176 	@property bool receivedAll() @safe pure @nogc nothrow
177 	{ return m_count == m_buffer.length; }
178 
179 	@property bool sent() @safe pure @nogc nothrow
180 	{ return m_count == m_buffer.length; }
181 
182 	@property ubyte[] transferred() @safe pure @nogc nothrow
183 	{ return m_buffer[0 .. m_count]; }
184 
185 	invariant
186 	{
187 		assert(m_count <= m_buffer.length, "Count of transferred bytes must not exceed the message buffer's length");
188 	}
189 
190 	mixin FreeList!(1_000);
191 }
192 
193 /**
194  * Represents a single request to asynchronously accept an incoming connection.
195  * Authors: Moritz Maxeiner, moritz@ucworks.org
196  * Date: 2016
197  */
198 struct AsyncAcceptRequest
199 {
200 	/// Passive socket to accept a peer's connection on
201 	AsyncSocket socket;
202 	/**
203 	 * Posix:   Active accepted peer socket
204 	 * Windows: Creater peer socket for AcceptEx
205 	 */
206 	fd_t peer;
207 	/// Called once the request completed successfully
208 	OnComplete onComplete;
209 	/// Peer socket family
210 	version (Posix) int family;
211 
212 	/**
213 	 * Must instantiate and return a new $(D AsyncSocket) for the connected peer,
214 	 * calling $(D AsyncSocket)'s constructor for existing OS handles in the process
215 	 * - the provided arguments are safe to call it with.
216 	 */
217 	alias OnComplete = AsyncSocket delegate(fd_t peer, int domain, SocketType type, int protocol) nothrow;
218 
219 	// These are used internally be the Windows event loop, do NOT modify them.
220 	version (Windows)
221 	{
222 		/// Outbut buffer where AcceptEx places local and remote address
223 		ubyte[2 * (16 + sockaddr_storage.sizeof)] buffer;
224 	}
225 
226 	mixin FreeList!1_000;
227 	mixin Queue;
228 }
229 
230 /**
231  * Represents a single request to asynchronously receive data.
232  * Authors: Moritz Maxeiner, moritz@ucworks.org
233  * Date: 2016
234  */
235 struct AsyncReceiveRequest
236 {
237 	AsyncSocket socket;       /// Socket to receive the message on
238 	NetworkMessage* message;  /// Storage to receive the message into
239 	OnComplete onComplete;    /// Called once the request completed successfully
240 	bool exact;               /// Whether the message's buffer should be filled completely
241 
242 	alias OnDataReceived = void delegate(ubyte[] data) nothrow;
243 	alias OnDataAvailable = void delegate() nothrow;
244 	alias OnComplete = Algebraic!(OnDataReceived, OnDataAvailable);
245 
246 	mixin FreeList!1_000;
247 	mixin Queue;
248 }
249 
250 /**
251  * Represents a single request to asynchronously send data.
252  * Authors: Moritz Maxeiner, moritz@ucworks.org
253  * Date: 2016
254  */
255 struct AsyncSendRequest
256 {
257 	AsyncSocket socket;      /// Socket to send the message on
258 	NetworkMessage* message; /// The message to be sent
259 	OnComplete onComplete;   /// Called once the request completed successfully
260 
261 	alias OnComplete = void delegate() nothrow;
262 
263 	mixin FreeList!1_000;
264 	mixin Queue;
265 }
266 
267 /**
268  * Proactor-model inspired asynchronous socket implementation.
269  * In contrast to POSIX.1-2013 readiness I/O - which essentially
270  * describes synchronous socket I/O operations with asynchronous
271  * notification of future blocking behaviour for said operations -
272  * this provides an API for asynchronous socket I/O operations:
273  * The three major socket operations accept, receive, and send
274  * modeled by this API will submit a request for asynchronous
275  * completion; towards that end, each call to these must be provided
276  * with a callback that will be called to notify you of said competion.
277  * It is therefore not recommended to keep calling any of these three
278  * methods in rapid succession, as they will normally not fail
279  * (bugs, memory exhaustion, or the operating system not supporting
280  * further pending requests excluded) to notify you that you should try
281  * again later. They will, however, notify you via the callbacks
282  * you provide once a request has been completed, or once there
283  * has been a socket error (refer to $(D OnError)). It follows
284  * that you should usually have only a small number of requests
285  * pending on a socket at the same time (preferably at most only a 
286  * single receive and a single send - respectively a single accept)
287  * and submit the next request only once the previous one
288  * (of the same type) notifies you of its completion.
289  * For connection-oriented, active sockets, connection completion and
290  * disconnect (by the remote peer) are handled by $(D OnConnect)
291  * and $(D OnClose) respectively; disconnecting from the remote peer
292  * can be initiated with $(D kill) and will not trigger $(D OnClose).
293  * Authors: Moritz Maxeiner, moritz@ucworks.org
294  * Date: 2016
295  */
296 final class AsyncSocket
297 {
298 	invariant
299 	{
300 		// There are
301 		//  - connection-oriented, datagram-oriented sockets,
302 		//  - connection-oriented, not datagram-oriented (stream) sockets,
303 		//  - connectionless, datagram-oriented sockets
304 		// There are no connectionless, not datagram-oriented sockets
305 		assert(m_connectionOriented || m_datagramOriented);
306 	}
307 
308 private:
309 	fd_t m_preInitializedSocket;    /// If constructing from an existing socket, this holds it until initialization.
310 
311 	fd_t m_socket = INVALID_SOCKET; /// The socket used internally.
312 	SocketInfo m_info;              /// Additional information about the socket.
313 	bool m_connectionOriented;      /// Whether this socket is connection-oriented.
314 	bool m_datagramOriented;        /// Whether this socket is datagram-oriented.
315 
316 	/**
317 	 * Whether this socket has been put into passive mode.
318 	 * See_Also: listen
319 	 */
320 	bool m_passive;
321 
322 	OnConnect m_onConnect; /// See_Also: onConnect
323 	OnClose m_onClose;   /// See_Also: onClose
324 	OnError m_onError;   /// See_Also: onError
325 
326 	/**
327 	 * If disabled: Every call to $(D receiveMessage) will be processed only once.
328 	 * After enabling: The first call to $(D receiveMessage) will be processed repeatedly.
329 	 *                 Any further calls to $(D receiveMessage) are forbidden (while enabled).
330 	 */
331 	bool m_receiveContinuously;
332 
333 	version (Posix) {
334 		package AsyncAcceptRequest.Queue m_pendingAccepts;   /// Queue of calls to $(D accept).
335 		package AsyncReceiveRequest.Queue m_pendingReceives; /// Queue of calls to $(D receiveMessage).
336 		package AsyncSendRequest.Queue m_pendingSends;       /// Queue of requests initiated by $(D sendMessage).
337 	}
338 
339 package:
340 	EventLoop m_evLoop; /// Event loop of the thread this socket was created on.
341 
342 public:
343 
344 	///
345 	@property NetworkAddress localAddress() const @trusted
346 	{
347 		import libasync.internals.socket_compat : getsockname;
348 
349 		NetworkAddress addr;
350 		auto addrLen = NetworkAddress.sockAddrMaxLen();
351 		if (SOCKET_ERROR == getsockname(m_socket, addr.sockAddr, &addrLen)) {
352 			throw new SocketOSException("Unable to obtain local socket address");
353 		}
354 		assert(addrLen <= addr.sockAddrLen,
355 			   "POSIX.1-2013 requires sockaddr_storage be able to store any socket address");
356 		assert(addr.family == m_info.domain, "Inconsistent address family");
357 		return addr;
358 	}
359 
360 	///
361 	@property NetworkAddress remoteAddress() const @trusted
362 	{
363 		import libasync.internals.socket_compat : getpeername;
364 
365 		NetworkAddress addr;
366 		auto addrLen = NetworkAddress.sockAddrMaxLen();
367 		if (SOCKET_ERROR == getpeername(m_socket, addr.sockAddr, &addrLen)) {
368 			throw new SocketOSException("Unable to obtain local socket address");
369 		}
370 		assert(addrLen <= addr.sockAddrLen,
371 			   "POSIX.1-2013 requires sockaddr_storage be able to store any socket address");
372 		assert(addr.family == m_info.domain, "Inconsistent address family");
373 		return addr;
374 	}
375 
376 	/// Get a socket option (taken from std.socket).
377 	/// Returns: The number of bytes written to $(D result).
378 	int getOption(int level, int option, void[] result) @trusted const
379 	{
380 		import libasync.internals.socket_compat : getsockopt;
381 
382 		socklen_t len = cast(socklen_t) result.length;
383 		if (SOCKET_ERROR == getsockopt(m_socket, level, option, result.ptr, &len)) {
384 			throw new SocketOSException("Unable to get socket option");
385 		}
386 		return len;
387 	}
388 
389 	/// Common case of getting integer and boolean options (taken from std.socket).
390 	int getOption(int level, int option, out int result) @trusted const
391 	{ return getOption(level, option, (&result)[0 .. 1]); }
392 
393 	/// Set a socket option (taken from std.socket).
394 	void setOption(int level, int option, void[] value) @trusted const
395 	{
396 		import libasync.internals.socket_compat : setsockopt;
397 
398 		if (SOCKET_ERROR == setsockopt(m_socket, level, option, value.ptr, cast(uint) value.length)) {
399 			throw new SocketOSException("Unable to set socket option");
400 		}
401 	}
402 
403 	/// Common case for setting integer and boolean options (taken from std.socket).
404 	void setOption(int level, int option, int value) @trusted const
405 	{ setOption(level, option, (&value)[0 .. 1]); }
406 
407 nothrow:
408 
409 package:
410 	mixin COSocketMixins;
411 
412 	/// Reset internal OS socket handle to $(D INVALID_SOCKET) and return its previous value
413 	fd_t resetHandle()
414 	{
415 		scope (exit) m_socket = INVALID_SOCKET;
416 		return m_socket;
417 	}
418 
419 	void handleError()
420 	{ if (m_onError !is null) m_onError(); }
421 
422 	void handleConnect()
423 	{ if (m_onConnect !is null) m_onConnect(); }
424 
425 	void handleClose()
426 	{ if (m_onClose !is null) m_onClose(); }
427 
428 	///
429 	@property SocketInfo info() const @safe pure @nogc
430 	{ return m_info; }
431 
432 	@property fd_t preInitializedHandle() @safe pure @nogc
433 	{ return m_preInitializedSocket; }
434 
435 	///
436 	@property void connectionOriented(bool connectionOriented) @safe pure @nogc
437 	{ m_connectionOriented = connectionOriented; }
438 
439 	/// Retrieves and clears the most recent error on this socket
440 	@property auto lastError() const
441 	{
442 		import libasync.internals.socket_compat : SOL_SOCKET, SO_ERROR;
443 		int code;
444 		assumeWontThrow(getOption(SOL_SOCKET, SO_ERROR, code));
445 		return code;
446 	}
447 
448 	/**
449 	 * Submits an asynchronous request on this socket to receive a $(D message).
450 	 * Upon successful reception $(D onReceive) will be called with the received data.
451 	 * $(D exact) indicates whether successful reception requires the entire buffer
452 	 * provided within $(D message) to have been filled. If a socket error occurs,
453 	 * but some data has already been received, then $(D onReceive) will be called
454 	 * with that partial data regardless of $(D exact).
455 	 * The $(D message) must have been allocated using $(D NetworkMessage.alloc) and
456 	 * will be freed with $(D NetworkMessage.free) after the completion callback returns,
457 	 * or once an error occurs that prevents said callback from being called.
458 	 */
459 	void receiveMessage(NetworkMessage* message, AsyncReceiveRequest.OnComplete onReceive, bool exact)
460 	in {
461 		assert(alive, "Cannot receive on an unrun / killed socket");
462 		assert(!m_passive, "Passive sockets cannot receive");
463 		assert(!m_connectionOriented || connected, "Established connection required");
464 		assert(!m_connectionOriented || !message || !message.hasAddress, "Connected peer is already known through .remoteAddress");
465 		version (Posix) assert(!m_receiveContinuously || m_pendingReceives.empty, "Cannot receive message manually while receiving continuously");
466 		assert(m_connectionOriented || !exact, "Connectionless datagram sockets must receive one datagram at a time");
467 		assert(!message || message.m_buffer.length > 0, "Only zero byte receives may refrain from providing a non-empty message buffer");
468 	} body {
469 		auto request = assumeWontThrow(AsyncReceiveRequest.alloc(this, message, onReceive, exact));
470 		m_evLoop.submitRequest(request);
471 	}
472 
473 	/**
474 	 * Submits an asynchronous request on this socket to send a $(D message).
475 	 * Upon successful transmission $(D onSend) will be called.
476 	 * The $(D message) must have been allocated using $(D NetworkMessage.alloc) and
477 	 * will be freed with $(D NetworkMessage.free) after the completion callback returns,
478 	 * or once an error occurs that prevents said callback from being called.
479 	 */
480 	void sendMessage(NetworkMessage* message, AsyncSendRequest.OnComplete onSend)
481 	in {
482 		assert(alive, "Cannot send on an unrun / killed socket");
483 		assert(!m_passive, "Passive sockets cannot receive");
484 		assert(!m_connectionOriented || connected, "Established connection required");
485 		assert(!m_connectionOriented || !message.hasAddress, "Connected peer is already known through .remoteAddress");
486 		assert(m_connectionOriented || message.hasAddress || assumeWontThrow({ remoteAddress; return true; }().ifThrown(false)), "Remote address required");
487 		assert(onSend !is null, "Completion callback required");
488 	} body {
489 		auto request = AsyncSendRequest.alloc(this, message, onSend);
490 		m_evLoop.submitRequest(request);
491 	}
492 
493 public:
494 
495 	/**
496 	 * Create a new asynchronous socket within $(D domain) of $(D type) using $(D protocol) from an
497 	 * existing OS $(D handle). It is your responsibility to ensure that $(D handle) - in addition
498 	 * to being a valid socket descriptor - fulfills all requirements to be used by $(D AsyncSocket):
499 	 *   POSIX: Must be non-blocking (keyword $(D O_NONBLOCK))
500 	 *   Windows: Must be overlapped (keyword $(D WSA_FLAG_OVERLAPPED))
501 	 */
502 	this(EventLoop evLoop, int domain, SocketType type, int protocol, fd_t handle) @safe @nogc
503 	in {
504 		assert(evLoop !is EventLoop.init);
505 		if (handle != INVALID_SOCKET) assert(handle.isSocket);
506 	} body {
507 		m_evLoop = evLoop;
508 		m_preInitializedSocket = handle;
509 		m_info = SocketInfo(domain, type, protocol);
510 		m_connectionOriented = type.isConnectionOriented;
511 		m_datagramOriented = type.isDatagramOriented;
512 
513 		version (Posix) {
514 			readBlocked = true;
515 			writeBlocked = true;
516 		}
517 	}
518 
519 	/**
520 	 * Create a new asynchronous socket within $(D domain) of $(D type) using $(D protocol).
521 	 * See_Also:
522 	 *     http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html
523 	 */
524 	this(EventLoop evLoop, int domain, SocketType type, int protocol) @safe @nogc
525 	{ this(evLoop, domain, type, protocol, INVALID_SOCKET); }
526 
527 	/**
528 	 *  Convenience constructor for when there is only one protocol
529 	 *  supporting both $(D domain) and $(D type).
530 	 */
531 	this(EventLoop eventLoop, int domain, SocketType type) @safe @nogc
532 	{ this(eventLoop, domain, type, 0); }
533 
534 	/**
535 	 *  Convenience constructor if avoiding $(D SocketType) is preferred.
536 	 *  Supports only
537 	 *    $(D SOCK_STREAM),
538 	 *    $(D SOCK_SEQPACKET),
539 	 *    $(D SOCK_DGRAM),
540 	 *    $(D SOCK_RAW), and
541 	 *    $(D SOCK_RDM).
542 	 */
543 	this(EventLoop evLoop, int domain, int type, int protocol) @safe @nogc
544 	{
545 		auto socketType = { switch(type) {
546 			case SOCK_STREAM:    return SocketType.STREAM;
547 			case SOCK_SEQPACKET: return SocketType.SEQPACKET;
548 			case SOCK_DGRAM:     return SocketType.DGRAM;
549 			case SOCK_RAW:       return SocketType.RAW;
550 			case SOCK_RDM:       return SocketType.RDM;
551 			default:             assert(false, "Unsupported socket type");
552 		}}();
553 		this(evLoop, domain, socketType, protocol);
554 	}
555 
556 	/**
557 	 *  Convenience constructor for when there is only one protocol
558 	 *  supporting both $(D domain) and $(D type).
559 	 */
560 	this(EventLoop evLoop, int domain, int type) @safe @nogc
561 	{ this(evLoop, domain, type, 0); }
562 
563 	~this() { if (alive) kill(); }
564 
565 	/// The underlying OS socket descriptor
566 	@property fd_t handle() @safe pure @nogc
567 	{ return m_socket; }
568 
569 	/// Whether this socket establishes a (stateful) connection to a remote peer.
570 	/// See_Also: isConnectionOriented
571 	@property bool connectionOriented() @safe pure @nogc
572 	{ return m_connectionOriented; }
573 
574 	/// Whether this socket transceives datagrams.
575 	/// See_Also: isDatagramOriented
576 	@property bool datagramOriented() const @safe pure @nogc
577 	{ return m_datagramOriented; }
578 
579 	/// Whether this socket has been put into passive mode.
580 	/// See_Also: listen
581 	@property bool passive() const @safe pure @nogc
582 	{ return m_passive; }
583 
584 	/// Type of callback triggered when a connection-oriented socket completes connecting
585 	alias OnConnect = void delegate();
586 
587 	/// Sets this socket's $(D OnConnect) callback.
588 	@property void onConnect(OnConnect onConnect) @safe pure @nogc
589 	in { assert(m_connectionOriented); }
590 	body { m_onConnect = onConnect; }
591 
592 	/**
593 	 * Type of callback triggered when a connection-oriented, active socket completes disconnects.
594 	 * The socket will have been $(D kill)ed before the call.
595 	 */
596 	alias OnClose = void delegate();
597 
598 	/// Sets this socket's $(D OnClose) callback.
599 	@property void onClose(OnClose onClose) @safe pure @nogc
600 	in { assert(m_connectionOriented); }
601 	body { m_onClose = onClose; }
602 
603 	/**
604 	 * Type of callback triggered when a socker error occured.
605 	 * The socket will have been $(D kill)ed before the call.
606 	 */
607 	alias OnError = void delegate();
608 
609 	/// Sets callback for when a socket error has occurred.
610 	@property void onError(OnError onError) @safe pure @nogc
611 	{ m_onError = onError; }
612 	
613 	/// Creates the underlying OS socket - if necessary - and
614 	/// registers the event handler in the underlying OS event loop.
615 	bool run()
616 	in { assert(m_socket == INVALID_SOCKET); }
617 	body {
618 		m_socket = m_evLoop.run(this);
619 		return m_socket != INVALID_SOCKET;
620 	}
621 
622 	/**
623 	 * Assigns the network address pointed to by $(D addr),
624 	 * with $(D addrlen) specifying the size, in bytes, of
625 	 * this address, as the local name of this socket.
626 	 * Returns: $(D true) if the binding was successful.
627 	 * See_Also:
628 	 *     localAddress, http://pubs.opengroup.org/onlinepubs/9699919799/functions/bind.html
629 	 */
630 	bool bind(sockaddr* addr, socklen_t addrlen)
631 	{ return m_evLoop.bind(this, addr, addrlen); }
632 
633 	/// Convenience wrapper.
634 	bool bind(const ref NetworkAddress addr)
635 	{ return bind(cast(sockaddr*) addr.sockAddr, addr.sockAddrLen); }
636 
637 	/**
638 	 * Assigns the network address pointed to by $(D addr),
639 	 * with $(D addrlen) specifying the size, n bytes, of
640 	 * this address, as the name of the remote socket.
641 	 * For connection-oriented sockets, also start establishing a
642 	 * connection with that socket and call $(D onConnect) once it has.
643 	 * Returns: $(D true) if the name was successfully assigned and
644 	 *          - for connection-oriented sockets - if the connection is
645 	 *          now being established.
646 	 * See_Also:
647 	 *     remoteAddress, onConnect, http://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
648 	 */
649 	bool connect(sockaddr* addr, socklen_t addrlen)
650 	{ return m_evLoop.connect(this, addr, addrlen); }
651 
652 	/// Convenience wrapper.
653 	bool connect(const ref NetworkAddress to)
654 	{ return connect(cast(sockaddr*) to.sockAddr, to.sockAddrLen); }
655 
656 	/**
657 	 * Marks the socket as passive and enables acceptance of incoming connections
658 	 * into instances of $(D AsyncSocket). Only after calling this successfully
659 	 * may accept request be submitted via $(D accept).
660 	 */
661 	bool listen(int backlog = SOMAXCONN)
662 	{
663 		m_passive = true;
664 		return m_evLoop.listen(this, backlog);
665 	}
666 
667 	/**
668 	 * Submits an asynchronous request on this socket to accept an incoming
669 	 * connection. Upon successful acceptance of such a connection $(D onAccept)
670 	 * will be called with a new $(D AsyncSocket) representing the peer.
671 	 * See_Also: listen
672 	 */
673 	void accept(AsyncAcceptRequest.OnComplete onAccept)
674 	in {
675 		assert(alive, "Cannot accept on an unrun / killed socket");
676 		assert(m_connectionOriented && m_passive, "Can only accept on connection-oriented, passive sockets");
677 	}
678 	body {
679 		auto request = AsyncAcceptRequest.alloc(this, INVALID_SOCKET, onAccept);
680 		m_evLoop.submitRequest(request);
681 	}
682 
683 	/// Whether the socket is automatically resubmitting the current receive request
684 	/// upon its successful completion.
685 	@property bool receiveContinuously() const @safe pure @nogc
686 	{ return m_receiveContinuously; }
687 
688 	/// Toggles automatic resubmission of the current receive request upon its successful completion.
689 	/// Enabling this primes the socket so that the next $(D receiveMessage) will exhibit the behaviour.
690 	/// Any further calls to $(D receiveMessage) while active are forbidden; may only be disabled again
691 	/// in the completion callback provided with the $(D receiveMessage) that started it.
692 	/// After disabling, may not be reenabled in the same callback.
693 	@property void receiveContinuously(bool toggle) @safe pure
694 	in {
695 		version (Posix) assert(m_pendingReceives.empty, "Cannot start/stop receiving continuously when there are still pending receive requests");
696 	} body {
697 		if (m_receiveContinuously == toggle) return;
698 		m_receiveContinuously = toggle;
699 	}
700 
701 	/**
702 	 * Submits an asynchronous request on this socket to receive a $(D message).
703 	 * Upon successful reception $(D onReceive) will be called with the received data.
704 	 * $(D exact) indicates whether successful reception requires the entire buffer
705 	 * provided within $(D message) to have been filled. If a socket error occurs,
706 	 * but some data has already been received, then $(D onReceive) will be called
707 	 * with that partial data regardless of $(D exact).
708 	 */
709 	void receiveMessage(ref NetworkMessage message, AsyncReceiveRequest.OnDataReceived onReceive, bool exact = false)
710 	{
711 		receiveMessage(assumeWontThrow(NetworkMessage.alloc(message)),
712 		               AsyncReceiveRequest.OnComplete(onReceive),
713 		               exact);
714 	}
715 
716 	/**
717 	 * Submits an asynchronous request on this socket to receive $(D data).
718 	 * Upon successful reception of at most $(D data.length) bytes $(D onReceive)
719 	 * will be called with the received bytes as a slice of $(D data).
720 	 * See_Also: receiveExactly, receiveFrom
721 	 */
722 	void receive(ref ubyte[] data, AsyncReceiveRequest.OnDataReceived onReceive)
723 	{
724 		receiveMessage(NetworkMessage.alloc(data),
725 		               AsyncReceiveRequest.OnComplete(onReceive),
726 		               false);
727 	}
728 
729 	/**
730 	 * Submits a special asynchronous request on this socket to receive nothing.
731 	 * Also known as a "zero byte receive" $(D onReceive) will be called once
732 	 * there is new data on the socket that can be received immediately.
733 	 * Additionally, $(D onReceive) may also be called on connection-oriented sockets
734 	 * where the remote peer has disconnected gracefully with no further data being
735 	 * available for reception.
736 	 */
737 	void receive(AsyncReceiveRequest.OnDataAvailable onReceive)
738 	in {
739 		assert(!m_receiveContinuously, "Continuous receiving and zero byte receives may not be mixed");
740 	} body {
741 		receiveMessage(null,
742 		               AsyncReceiveRequest.OnComplete(onReceive),
743 		               false);
744 	}
745 
746 	/**
747 	 * Submits an asynchronous request on this socket to receive $(D data).
748 	 * Upon successful reception of exactly $(D data.lengt) bytes $(D onReceive)
749 	 * will be called with $(D data).
750 	 * See_Also: receive, receiveFrom
751 	 */
752 	void receiveExactly(ref ubyte[] data, AsyncReceiveRequest.OnDataReceived onReceive)
753 	{
754 		receiveMessage(NetworkMessage.alloc(data),
755 		               AsyncReceiveRequest.OnComplete(onReceive),
756 		               true);
757 	}
758 
759 	/**
760 	 * Submits an asynchronous request on this socket to receive $(D data) $(D from)
761 	 * an unknown sender, whose address will also be received.
762 	 * Upon successful reception of at most $(D data.length) bytes $(D onReceive)
763 	 * will be called with the received bytes as a slice of $(D data) and $(D from)
764 	 * will have been set to the sender's address.
765 	 * This method may only be called on connectionless sockets, to retrieve the
766 	 * remote address on connection-oriented sockets, refer to $(D remoteAddress).
767 	 * See_Also: receive, receiveExactly, remoteAddress
768 	 */
769 	void receiveFrom(ref ubyte[] data, ref NetworkAddress from, AsyncReceiveRequest.OnDataReceived onReceive)
770 	{
771 		receiveMessage(NetworkMessage.alloc(data, &from),
772 		               AsyncReceiveRequest.OnComplete(onReceive),
773 		               false);
774 	}
775 
776 	/**
777 	 * Submits an asynchronous request on this socket to send a $(D message).
778 	 * Upon successful transmission $(D onSend) will be called.
779 	 */
780 	void sendMessage(const ref NetworkMessage message, AsyncSendRequest.OnComplete onSend)
781 	{
782 		sendMessage(NetworkMessage.alloc(message), onSend);
783 	}
784 
785 	/**
786 	 * Submits an asynchronous request on this socket to send $(D data).
787 	 * Upon successful transmission $(D onSend) will be called.
788 	 */
789 	void send(in ubyte[] data, AsyncSendRequest.OnComplete onSend)
790 	{
791 		sendMessage(NetworkMessage.alloc(cast(ubyte[]) data), onSend);
792 	}
793 
794 	/**
795 	 * Submits an asynchronous request on this socket to send $(D data) $(D to)
796 	 * a specific recipient. Upon successful transmission $(D onSend) will be called.
797 	 */
798 	void sendTo(in ubyte[] data, const ref NetworkAddress to, AsyncSendRequest.OnComplete onSend)
799 	{
800 		sendMessage(NetworkMessage.alloc(cast(ubyte[]) data, &to), onSend);
801 	}
802 
803 	/**
804 	 * Removes the socket from the event loop, shutting it down if necessary,
805 	 * and cleans up the underlying resources. Only after this method has been
806 	 * called may the socket instance be deallocated.
807 	 */
808 	bool kill(bool forced = false)
809 	{
810 		m_receiveContinuously = false;
811 		return m_evLoop.kill(this, forced);
812 	}
813 
814 	/// Returns whether the socket has not yet been killed.
815 	@property bool alive() @safe @nogc {
816 		return m_socket != INVALID_SOCKET;
817 	}
818 
819 	/// Provides access to event loop information
820 	mixin DefStatus;
821 }
822 
823 
824 /// Holds additional information about a socket.
825 struct SocketInfo
826 {
827 	int domain;
828 	SocketType type;
829 	int protocol;
830 }
831 
832 /**
833  * Represents a network/socket address. (adapted from vibe.core.net)
834  */
835 struct NetworkAddress
836 {
837 	import std.bitmanip: nativeToBigEndian, bigEndianToNative;
838 
839 	import libasync.internals.socket_compat :
840 		sockaddr, sockaddr_storage,
841 		sockaddr_in, AF_INET,
842 		sockaddr_in6, AF_INET6;
843 	version (Posix) import libasync.internals.socket_compat :
844 		sockaddr_un, AF_UNIX;
845 
846 	package union {
847 		sockaddr addr = { AF_UNSPEC };
848 		sockaddr_storage addr_storage = void;
849 		sockaddr_in addr_ip4 = void;
850 		sockaddr_in6 addr_ip6 = void;
851 		version (Posix) sockaddr_un addr_un = void;
852 	}
853 
854 	this(sockaddr* addr, socklen_t addrlen) @trusted pure nothrow @nogc
855 	in {
856 		assert(addrlen <= sockaddr_storage.sizeof,
857 			   "POSIX.1-2013 requires sockaddr_storage be able to store any socket address");
858 	} body {
859 		import std.algorithm : copy;
860 		copy((cast(ubyte*) addr)[0 .. addrlen],
861 			 (cast(ubyte*) &addr_storage)[0 .. addrlen]);
862 	}
863 
864 	import std.socket : PhobosAddress = Address;
865 	this(PhobosAddress address) @safe pure nothrow @nogc
866 	{ this(address.name, address.nameLen); }
867 
868 	@property bool ipv6() const @safe pure nothrow @nogc
869 	{ return this.family == AF_INET6; }
870 
871 	/** Family (AF_) of the socket address.
872 	 */
873 	@property ushort family() const @safe pure nothrow @nogc
874 	{ return addr.sa_family; }
875 	/// ditto
876 	@property void family(ushort val) pure @safe nothrow @nogc
877 	{ addr.sa_family = cast(ubyte) val; }
878 
879 	/** The port in host byte order.
880 	 */
881 	@property ushort port()
882 	const @trusted @nogc pure nothrow {
883 		switch (this.family) {
884 			default: assert(false, "port() called for invalid address family.");
885 			case AF_INET: return bigEndianToNative!ushort((cast(ubyte*) &addr_ip4.sin_port)[0..2]);
886 			case AF_INET6: return bigEndianToNative!ushort((cast(ubyte*) &addr_ip6.sin6_port)[0..2]);
887 		}
888 	}
889 	/// ditto
890 	@property void port(ushort val)
891 	@trusted @nogc pure nothrow {
892 		switch (this.family) {
893 			default: assert(false, "port() called for invalid address family.");
894 			case AF_INET: addr_ip4.sin_port =  *cast(ushort*) nativeToBigEndian(val).ptr; break;
895 			case AF_INET6: addr_ip6.sin6_port = *cast(ushort*) nativeToBigEndian(val).ptr; break;
896 		}
897 	}
898 
899 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
900 	 */
901 	@property inout(sockaddr)* sockAddr() inout pure @safe @nogc nothrow { return &addr; }
902 
903 	/** Size of the sockaddr struct that is returned by sockAddr().
904 	 */
905 	@property socklen_t sockAddrLen()
906 	const @safe @nogc pure nothrow {
907 		switch (this.family) {
908 			default: assert(false, "Unsupported address family");
909 			case AF_UNSPEC: return addr_storage.sizeof;
910 			case AF_INET: return addr_ip4.sizeof;
911 			case AF_INET6: return addr_ip6.sizeof;
912 			version (Posix) case AF_UNIX: return addr_un.sizeof;
913 		}
914 	}
915 
916 	/++
917 	 + Maximum size of any sockaddr struct, regardless of address family.
918 	 +/
919 	static @property socklen_t sockAddrMaxLen()
920 	pure nothrow { return sockaddr_storage.sizeof; }
921 
922 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
923 	in { assert (family == AF_INET); }
924 	body { return &addr_ip4; }
925 
926 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
927 	in { assert (family == AF_INET6); }
928 	body { return &addr_ip6; }
929 
930 	version (Posix)
931 	@property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow
932 	in { assert (family == AF_UNIX); }
933 	body { return &addr_un; }
934 
935 	/** Returns a string representation of the IP address
936 	*/
937 	string toAddressString()
938 	const {
939 		import std.array : appender;
940 		auto ret = appender!string();
941 		ret.reserve(40);
942 		toAddressString(str => ret.put(str));
943 		return ret.data;
944 	}
945 	/// ditto
946 	void toAddressString(scope void delegate(const(char)[]) @safe sink)
947 	const {
948 		import std.array : appender;
949 		import std.format : formattedWrite;
950 		import std..string : fromStringz;
951 
952 		ubyte[2] _dummy = void; // Workaround for DMD regression in master
953 
954 		switch (this.family) {
955 			default: assert(false, "toAddressString() called for invalid address family.");
956 			case AF_INET: {
957 				ubyte[4] ip = () @trusted { return (cast(ubyte*) &addr_ip4.sin_addr.s_addr)[0 .. 4]; } ();
958 				sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
959 				break;
960 			}
961 			case AF_INET6: {
962 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
963 				foreach (i; 0 .. 8) {
964 					if (i > 0) sink(":");
965 					_dummy[] = ip[i*2 .. i*2+2];
966 					sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
967 				}
968 				break;
969 			}
970 			version (Posix) {
971 			case AF_UNIX:
972 				sink.formattedWrite("%s", fromStringz(cast(char*) addr_un.sun_path));
973 				break;
974 			}
975 		}
976 	}
977 
978 	/** Returns a full string representation of the address, including the port number.
979 	*/
980 	string toString()
981 	const {
982 		import std.array : appender;
983 		auto ret = appender!string();
984 		toString(str => ret.put(str));
985 		return ret.data;
986 	}
987 	/// ditto
988 	void toString(scope void delegate(const(char)[]) @safe sink)
989 	const {
990 		import std.format : formattedWrite;
991 		switch (this.family) {
992 			default: assert(false, "toString() called for invalid address family.");
993 			case AF_INET:
994 				toAddressString(sink);
995 				sink.formattedWrite(":%s", port);
996 				break;
997 			case AF_INET6:
998 				sink("[");
999 				toAddressString(sink);
1000 				sink.formattedWrite("]:%s", port);
1001 				break;
1002 			version (Posix) {
1003 			case AF_UNIX:
1004 				toAddressString(sink);
1005 				break;
1006 			}
1007 		}
1008 	}
1009 }
1010 
1011 version (Posix)
1012 {
1013 	enum SOCKET_ERROR = -1;
1014 	enum INVALID_SOCKET = -1;
1015 } else version (Windows) {
1016 	import core.sys.windows.winsock2 : SOCKET_ERROR, INVALID_SOCKET;
1017 }
1018 
1019 /// Checks whether the given file descriptor refers to a valid socket.
1020 bool isSocket(fd_t fd) @trusted @nogc nothrow
1021 {
1022 	import libasync.internals.socket_compat : getsockopt, SOL_SOCKET, SO_TYPE;
1023 
1024 	int type;
1025 	socklen_t typesize = cast(socklen_t) type.sizeof;
1026 	return SOCKET_ERROR != getsockopt(fd, SOL_SOCKET, SO_TYPE, cast(char*) &type, &typesize);
1027 }