1 module libasync.socket; 2 3 import std.variant; 4 import std.exception : assumeWontThrow, ifThrown; 5 6 import libasync.events; 7 import libasync.internals.logging; 8 import libasync.internals.socket_compat; 9 import libasync.internals.freelist; 10 import libasync.internals.queue; 11 12 public import std.socket : SocketType, SocketOSException; 13 public import libasync.internals.socket_compat : 14 SOCK_STREAM, SOCK_SEQPACKET, SOCK_DGRAM, SOCK_RAW, SOCK_RDM, 15 AF_INET, AF_INET6, 16 SOL_SOCKET, 17 SO_REUSEADDR; 18 version (Posix) public import libasync.internals.socket_compat : 19 AF_UNIX, 20 SO_REUSEPORT; 21 22 /// Returns `true` if the given type of socket is connection-oriented. 23 /// Standards: Conforms to IEEE Std 1003.1, 2013 Edition 24 bool isConnectionOriented(SocketType type) @safe pure nothrow @nogc 25 { 26 final switch (type) with (SocketType) { 27 case STREAM: return true; 28 case SEQPACKET: return true; 29 30 case DGRAM: return false; 31 32 // Socket types not covered by POSIX.1-2013 33 // are assumed to be connectionless. 34 case RAW: return false; 35 case RDM: return false; 36 } 37 } 38 39 /// Returns `true` if the given type of socket is datagram-oriented. 40 /// Standards: Conforms to IEEE Std 1003.1, 2013 Edition 41 bool isDatagramOriented(SocketType type) @safe pure nothrow @nogc 42 { 43 final switch (type) with (SocketType) { 44 case STREAM: return false; 45 46 case SEQPACKET: return true; 47 case DGRAM: return true; 48 49 // Socket types not covered by POSIX.1-2013 50 // are assumed to be datagram-oriented. 51 case RAW: return true; 52 case RDM: return true; 53 } 54 } 55 56 /** 57 * Represents a single message to be transferred over the network by $(D AsyncSocket). 58 * Authors: Moritz Maxeiner, moritz@ucworks.org 59 * Date: 2016 60 */ 61 struct NetworkMessage 62 { 63 version (Posix) { 64 import core.sys.posix.sys.socket : msghdr, iovec; 65 66 alias Header = msghdr; 67 alias Content = iovec; 68 69 @property ubyte* contentStart() @trusted pure @nogc nothrow { return cast (ubyte*) m_content.iov_base; } 70 @property void contentStart(ubyte* contentStart) @safe pure @nogc nothrow { m_content.iov_base = contentStart; } 71 72 @property size_t contentLength() @trusted pure @nogc nothrow { return m_content.iov_len; } 73 @property void contentLength(size_t contentLength) @safe pure @nogc nothrow { m_content.iov_len = contentLength; } 74 } else version (Windows) { 75 import libasync.internals.win32 : WSABUF, DWORD; 76 77 struct Header 78 { 79 sockaddr* msg_name; 80 socklen_t msg_namelen; 81 WSABUF* msg_iov; 82 size_t msg_iovlen; 83 DWORD msg_flags; 84 } 85 86 alias Content = WSABUF; 87 88 @property ubyte* contentStart() @trusted pure @nogc nothrow { return m_content.buf; } 89 @property void contentStart(ubyte* contentStart) @safe pure @nogc nothrow { m_content.buf = contentStart; } 90 91 @property size_t contentLength() @trusted pure @nogc nothrow { return m_content.len; } 92 @property void contentLength(size_t contentLength) @safe pure @nogc nothrow { m_content.len = contentLength; } 93 } else { static assert(false, "Platform unsupported"); } 94 95 @property sockaddr* name() @trusted pure @nogc nothrow { return cast(sockaddr*) m_header.msg_name; } 96 @property void name(sockaddr* name) @safe pure @nogc nothrow { m_header.msg_name = name; } 97 98 @property socklen_t nameLength() @trusted pure @nogc nothrow { return m_header.msg_namelen; } 99 @property void nameLength(socklen_t nameLength) @safe pure @nogc nothrow { m_header.msg_namelen = nameLength; } 100 101 @property Content* buffers() @trusted pure @nogc nothrow { return m_header.msg_iov; } 102 @property void buffers(Content* buffers) @safe pure @nogc nothrow { m_header.msg_iov = buffers; } 103 104 @property typeof(m_header.msg_iovlen) bufferCount() @trusted pure @nogc nothrow { return m_header.msg_iovlen; } 105 @property void bufferCount(typeof(m_header.msg_iovlen) bufferCount) @safe pure @nogc nothrow { m_header.msg_iovlen = bufferCount; } 106 107 @property int flags() @trusted pure @nogc nothrow { return m_header.msg_flags; } 108 @property void flags(int flags) @safe pure @nogc nothrow { m_header.msg_flags = flags; } 109 110 private: 111 Header m_header; 112 Content m_content; 113 114 ubyte[] m_buffer; 115 size_t m_count = 0; 116 117 package: 118 @property Header* header() const @trusted pure @nogc nothrow 119 { return cast(Header*) &m_header; } 120 121 public: 122 this(ubyte[] content, inout NetworkAddress* addr = null) @safe pure @nogc nothrow 123 { 124 if (addr is null) { 125 name = null; 126 nameLength = 0; 127 } else { 128 delegate () @trusted { name = cast(sockaddr*) addr.sockAddr; } (); 129 nameLength = addr.sockAddrLen; 130 } 131 132 buffers = &m_content; 133 bufferCount = 1; 134 135 version (Posix) { 136 m_header.msg_control = null; 137 m_header.msg_controllen = 0; 138 } 139 140 flags = 0; 141 142 m_buffer = content; 143 contentStart = &content[0]; 144 contentLength = content.length; 145 } 146 147 this(const ref NetworkMessage other) nothrow 148 { 149 m_header = cast(Header) other.m_header; 150 m_content = cast(Content) other.m_content; 151 buffers = &m_content; 152 bufferCount = 1; 153 m_buffer = contentStart[0..contentLength]; 154 } 155 156 this(this) @safe pure @nogc nothrow 157 { buffers = &m_content; } 158 159 @property size_t count() @safe pure @nogc nothrow 160 { return m_count; } 161 162 @property void count(size_t count) @safe pure @nogc nothrow 163 { 164 m_count = count; 165 auto content = m_buffer[count .. $]; 166 contentStart = &content[0]; 167 contentLength = content.length; 168 } 169 170 @property bool hasAddress() @safe pure @nogc nothrow 171 { return name !is null; } 172 173 @property bool receivedAny() @safe pure @nogc nothrow 174 { return m_count > 0; } 175 176 @property bool receivedAll() @safe pure @nogc nothrow 177 { return m_count == m_buffer.length; } 178 179 @property bool sent() @safe pure @nogc nothrow 180 { return m_count == m_buffer.length; } 181 182 @property ubyte[] transferred() @safe pure @nogc nothrow 183 { return m_buffer[0 .. m_count]; } 184 185 invariant 186 { 187 assert(m_count <= m_buffer.length, "Count of transferred bytes must not exceed the message buffer's length"); 188 } 189 190 mixin FreeList!(1_000); 191 } 192 193 /** 194 * Represents a single request to asynchronously accept an incoming connection. 195 * Authors: Moritz Maxeiner, moritz@ucworks.org 196 * Date: 2016 197 */ 198 struct AsyncAcceptRequest 199 { 200 /// Passive socket to accept a peer's connection on 201 AsyncSocket socket; 202 /** 203 * Posix: Active accepted peer socket 204 * Windows: Creater peer socket for AcceptEx 205 */ 206 fd_t peer; 207 /// Called once the request completed successfully 208 OnComplete onComplete; 209 /// Peer socket family 210 version (Posix) int family; 211 212 /** 213 * Must instantiate and return a new $(D AsyncSocket) for the connected peer, 214 * calling $(D AsyncSocket)'s constructor for existing OS handles in the process 215 * - the provided arguments are safe to call it with. 216 */ 217 alias OnComplete = AsyncSocket delegate(fd_t peer, int domain, SocketType type, int protocol) nothrow; 218 219 // These are used internally be the Windows event loop, do NOT modify them. 220 version (Windows) 221 { 222 /// Outbut buffer where AcceptEx places local and remote address 223 ubyte[2 * (16 + sockaddr_storage.sizeof)] buffer; 224 } 225 226 mixin FreeList!1_000; 227 mixin Queue; 228 } 229 230 /** 231 * Represents a single request to asynchronously receive data. 232 * Authors: Moritz Maxeiner, moritz@ucworks.org 233 * Date: 2016 234 */ 235 struct AsyncReceiveRequest 236 { 237 AsyncSocket socket; /// Socket to receive the message on 238 NetworkMessage* message; /// Storage to receive the message into 239 OnComplete onComplete; /// Called once the request completed successfully 240 bool exact; /// Whether the message's buffer should be filled completely 241 242 alias OnDataReceived = void delegate(ubyte[] data) nothrow; 243 alias OnDataAvailable = void delegate() nothrow; 244 alias OnComplete = Algebraic!(OnDataReceived, OnDataAvailable); 245 246 mixin FreeList!1_000; 247 mixin Queue; 248 } 249 250 /** 251 * Represents a single request to asynchronously send data. 252 * Authors: Moritz Maxeiner, moritz@ucworks.org 253 * Date: 2016 254 */ 255 struct AsyncSendRequest 256 { 257 AsyncSocket socket; /// Socket to send the message on 258 NetworkMessage* message; /// The message to be sent 259 OnComplete onComplete; /// Called once the request completed successfully 260 261 alias OnComplete = void delegate() nothrow; 262 263 mixin FreeList!1_000; 264 mixin Queue; 265 } 266 267 /** 268 * Proactor-model inspired asynchronous socket implementation. 269 * In contrast to POSIX.1-2013 readiness I/O - which essentially 270 * describes synchronous socket I/O operations with asynchronous 271 * notification of future blocking behaviour for said operations - 272 * this provides an API for asynchronous socket I/O operations: 273 * The three major socket operations accept, receive, and send 274 * modeled by this API will submit a request for asynchronous 275 * completion; towards that end, each call to these must be provided 276 * with a callback that will be called to notify you of said competion. 277 * It is therefore not recommended to keep calling any of these three 278 * methods in rapid succession, as they will normally not fail 279 * (bugs, memory exhaustion, or the operating system not supporting 280 * further pending requests excluded) to notify you that you should try 281 * again later. They will, however, notify you via the callbacks 282 * you provide once a request has been completed, or once there 283 * has been a socket error (refer to $(D OnError)). It follows 284 * that you should usually have only a small number of requests 285 * pending on a socket at the same time (preferably at most only a 286 * single receive and a single send - respectively a single accept) 287 * and submit the next request only once the previous one 288 * (of the same type) notifies you of its completion. 289 * For connection-oriented, active sockets, connection completion and 290 * disconnect (by the remote peer) are handled by $(D OnConnect) 291 * and $(D OnClose) respectively; disconnecting from the remote peer 292 * can be initiated with $(D kill) and will not trigger $(D OnClose). 293 * Authors: Moritz Maxeiner, moritz@ucworks.org 294 * Date: 2016 295 */ 296 final class AsyncSocket 297 { 298 invariant 299 { 300 // There are 301 // - connection-oriented, datagram-oriented sockets, 302 // - connection-oriented, not datagram-oriented (stream) sockets, 303 // - connectionless, datagram-oriented sockets 304 // There are no connectionless, not datagram-oriented sockets 305 assert(m_connectionOriented || m_datagramOriented); 306 } 307 308 private: 309 fd_t m_preInitializedSocket; /// If constructing from an existing socket, this holds it until initialization. 310 311 fd_t m_socket = INVALID_SOCKET; /// The socket used internally. 312 SocketInfo m_info; /// Additional information about the socket. 313 bool m_connectionOriented; /// Whether this socket is connection-oriented. 314 bool m_datagramOriented; /// Whether this socket is datagram-oriented. 315 316 /** 317 * Whether this socket has been put into passive mode. 318 * See_Also: listen 319 */ 320 bool m_passive; 321 322 OnConnect m_onConnect; /// See_Also: onConnect 323 OnClose m_onClose; /// See_Also: onClose 324 OnError m_onError; /// See_Also: onError 325 326 /** 327 * If disabled: Every call to $(D receiveMessage) will be processed only once. 328 * After enabling: The first call to $(D receiveMessage) will be processed repeatedly. 329 * Any further calls to $(D receiveMessage) are forbidden (while enabled). 330 */ 331 bool m_receiveContinuously; 332 333 version (Posix) { 334 package AsyncAcceptRequest.Queue m_pendingAccepts; /// Queue of calls to $(D accept). 335 package AsyncReceiveRequest.Queue m_pendingReceives; /// Queue of calls to $(D receiveMessage). 336 package AsyncSendRequest.Queue m_pendingSends; /// Queue of requests initiated by $(D sendMessage). 337 } 338 339 package: 340 EventLoop m_evLoop; /// Event loop of the thread this socket was created on. 341 342 public: 343 344 /// 345 @property NetworkAddress localAddress() const @trusted 346 { 347 import libasync.internals.socket_compat : getsockname; 348 349 NetworkAddress addr; 350 auto addrLen = NetworkAddress.sockAddrMaxLen(); 351 if (SOCKET_ERROR == getsockname(m_socket, addr.sockAddr, &addrLen)) { 352 throw new SocketOSException("Unable to obtain local socket address"); 353 } 354 assert(addrLen <= addr.sockAddrLen, 355 "POSIX.1-2013 requires sockaddr_storage be able to store any socket address"); 356 assert(addr.family == m_info.domain, "Inconsistent address family"); 357 return addr; 358 } 359 360 /// 361 @property NetworkAddress remoteAddress() const @trusted 362 { 363 import libasync.internals.socket_compat : getpeername; 364 365 NetworkAddress addr; 366 auto addrLen = NetworkAddress.sockAddrMaxLen(); 367 if (SOCKET_ERROR == getpeername(m_socket, addr.sockAddr, &addrLen)) { 368 throw new SocketOSException("Unable to obtain local socket address"); 369 } 370 assert(addrLen <= addr.sockAddrLen, 371 "POSIX.1-2013 requires sockaddr_storage be able to store any socket address"); 372 assert(addr.family == m_info.domain, "Inconsistent address family"); 373 return addr; 374 } 375 376 /// Get a socket option (taken from std.socket). 377 /// Returns: The number of bytes written to $(D result). 378 int getOption(int level, int option, void[] result) @trusted const 379 { 380 import libasync.internals.socket_compat : getsockopt; 381 382 socklen_t len = cast(socklen_t) result.length; 383 if (SOCKET_ERROR == getsockopt(m_socket, level, option, result.ptr, &len)) { 384 throw new SocketOSException("Unable to get socket option"); 385 } 386 return len; 387 } 388 389 /// Common case of getting integer and boolean options (taken from std.socket). 390 int getOption(int level, int option, out int result) @trusted const 391 { return getOption(level, option, (&result)[0 .. 1]); } 392 393 /// Set a socket option (taken from std.socket). 394 void setOption(int level, int option, void[] value) @trusted const 395 { 396 import libasync.internals.socket_compat : setsockopt; 397 398 if (SOCKET_ERROR == setsockopt(m_socket, level, option, value.ptr, cast(uint) value.length)) { 399 throw new SocketOSException("Unable to set socket option"); 400 } 401 } 402 403 /// Common case for setting integer and boolean options (taken from std.socket). 404 void setOption(int level, int option, int value) @trusted const 405 { setOption(level, option, (&value)[0 .. 1]); } 406 407 nothrow: 408 409 package: 410 mixin COSocketMixins; 411 412 /// Reset internal OS socket handle to $(D INVALID_SOCKET) and return its previous value 413 fd_t resetHandle() 414 { 415 scope (exit) m_socket = INVALID_SOCKET; 416 return m_socket; 417 } 418 419 void handleError() 420 { if (m_onError !is null) m_onError(); } 421 422 void handleConnect() 423 { if (m_onConnect !is null) m_onConnect(); } 424 425 void handleClose() 426 { if (m_onClose !is null) m_onClose(); } 427 428 /// 429 @property SocketInfo info() const @safe pure @nogc 430 { return m_info; } 431 432 @property fd_t preInitializedHandle() @safe pure @nogc 433 { return m_preInitializedSocket; } 434 435 /// 436 @property void connectionOriented(bool connectionOriented) @safe pure @nogc 437 { m_connectionOriented = connectionOriented; } 438 439 /// Retrieves and clears the most recent error on this socket 440 @property auto lastError() const 441 { 442 import libasync.internals.socket_compat : SOL_SOCKET, SO_ERROR; 443 int code; 444 assumeWontThrow(getOption(SOL_SOCKET, SO_ERROR, code)); 445 return code; 446 } 447 448 /** 449 * Submits an asynchronous request on this socket to receive a $(D message). 450 * Upon successful reception $(D onReceive) will be called with the received data. 451 * $(D exact) indicates whether successful reception requires the entire buffer 452 * provided within $(D message) to have been filled. If a socket error occurs, 453 * but some data has already been received, then $(D onReceive) will be called 454 * with that partial data regardless of $(D exact). 455 * The $(D message) must have been allocated using $(D NetworkMessage.alloc) and 456 * will be freed with $(D NetworkMessage.free) after the completion callback returns, 457 * or once an error occurs that prevents said callback from being called. 458 */ 459 void receiveMessage(NetworkMessage* message, AsyncReceiveRequest.OnComplete onReceive, bool exact) 460 in { 461 assert(alive, "Cannot receive on an unrun / killed socket"); 462 assert(!m_passive, "Passive sockets cannot receive"); 463 assert(!m_connectionOriented || connected, "Established connection required"); 464 assert(!m_connectionOriented || !message || !message.hasAddress, "Connected peer is already known through .remoteAddress"); 465 version (Posix) assert(!m_receiveContinuously || m_pendingReceives.empty, "Cannot receive message manually while receiving continuously"); 466 assert(m_connectionOriented || !exact, "Connectionless datagram sockets must receive one datagram at a time"); 467 assert(!message || message.m_buffer.length > 0, "Only zero byte receives may refrain from providing a non-empty message buffer"); 468 } body { 469 auto request = assumeWontThrow(AsyncReceiveRequest.alloc(this, message, onReceive, exact)); 470 m_evLoop.submitRequest(request); 471 } 472 473 /** 474 * Submits an asynchronous request on this socket to send a $(D message). 475 * Upon successful transmission $(D onSend) will be called. 476 * The $(D message) must have been allocated using $(D NetworkMessage.alloc) and 477 * will be freed with $(D NetworkMessage.free) after the completion callback returns, 478 * or once an error occurs that prevents said callback from being called. 479 */ 480 void sendMessage(NetworkMessage* message, AsyncSendRequest.OnComplete onSend) 481 in { 482 assert(alive, "Cannot send on an unrun / killed socket"); 483 assert(!m_passive, "Passive sockets cannot receive"); 484 assert(!m_connectionOriented || connected, "Established connection required"); 485 assert(!m_connectionOriented || !message.hasAddress, "Connected peer is already known through .remoteAddress"); 486 assert(m_connectionOriented || message.hasAddress || assumeWontThrow({ remoteAddress; return true; }().ifThrown(false)), "Remote address required"); 487 assert(onSend !is null, "Completion callback required"); 488 } body { 489 auto request = AsyncSendRequest.alloc(this, message, onSend); 490 m_evLoop.submitRequest(request); 491 } 492 493 public: 494 495 /** 496 * Create a new asynchronous socket within $(D domain) of $(D type) using $(D protocol) from an 497 * existing OS $(D handle). It is your responsibility to ensure that $(D handle) - in addition 498 * to being a valid socket descriptor - fulfills all requirements to be used by $(D AsyncSocket): 499 * POSIX: Must be non-blocking (keyword $(D O_NONBLOCK)) 500 * Windows: Must be overlapped (keyword $(D WSA_FLAG_OVERLAPPED)) 501 */ 502 this(EventLoop evLoop, int domain, SocketType type, int protocol, fd_t handle) @safe @nogc 503 in { 504 assert(evLoop !is EventLoop.init); 505 if (handle != INVALID_SOCKET) assert(handle.isSocket); 506 } body { 507 m_evLoop = evLoop; 508 m_preInitializedSocket = handle; 509 m_info = SocketInfo(domain, type, protocol); 510 m_connectionOriented = type.isConnectionOriented; 511 m_datagramOriented = type.isDatagramOriented; 512 513 version (Posix) { 514 readBlocked = true; 515 writeBlocked = true; 516 } 517 } 518 519 /** 520 * Create a new asynchronous socket within $(D domain) of $(D type) using $(D protocol). 521 * See_Also: 522 * http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html 523 */ 524 this(EventLoop evLoop, int domain, SocketType type, int protocol) @safe @nogc 525 { this(evLoop, domain, type, protocol, INVALID_SOCKET); } 526 527 /** 528 * Convenience constructor for when there is only one protocol 529 * supporting both $(D domain) and $(D type). 530 */ 531 this(EventLoop eventLoop, int domain, SocketType type) @safe @nogc 532 { this(eventLoop, domain, type, 0); } 533 534 /** 535 * Convenience constructor if avoiding $(D SocketType) is preferred. 536 * Supports only 537 * $(D SOCK_STREAM), 538 * $(D SOCK_SEQPACKET), 539 * $(D SOCK_DGRAM), 540 * $(D SOCK_RAW), and 541 * $(D SOCK_RDM). 542 */ 543 this(EventLoop evLoop, int domain, int type, int protocol) @safe @nogc 544 { 545 auto socketType = { switch(type) { 546 case SOCK_STREAM: return SocketType.STREAM; 547 case SOCK_SEQPACKET: return SocketType.SEQPACKET; 548 case SOCK_DGRAM: return SocketType.DGRAM; 549 case SOCK_RAW: return SocketType.RAW; 550 case SOCK_RDM: return SocketType.RDM; 551 default: assert(false, "Unsupported socket type"); 552 }}(); 553 this(evLoop, domain, socketType, protocol); 554 } 555 556 /** 557 * Convenience constructor for when there is only one protocol 558 * supporting both $(D domain) and $(D type). 559 */ 560 this(EventLoop evLoop, int domain, int type) @safe @nogc 561 { this(evLoop, domain, type, 0); } 562 563 ~this() { if (alive) kill(); } 564 565 /// The underlying OS socket descriptor 566 @property fd_t handle() @safe pure @nogc 567 { return m_socket; } 568 569 /// Whether this socket establishes a (stateful) connection to a remote peer. 570 /// See_Also: isConnectionOriented 571 @property bool connectionOriented() @safe pure @nogc 572 { return m_connectionOriented; } 573 574 /// Whether this socket transceives datagrams. 575 /// See_Also: isDatagramOriented 576 @property bool datagramOriented() const @safe pure @nogc 577 { return m_datagramOriented; } 578 579 /// Whether this socket has been put into passive mode. 580 /// See_Also: listen 581 @property bool passive() const @safe pure @nogc 582 { return m_passive; } 583 584 /// Type of callback triggered when a connection-oriented socket completes connecting 585 alias OnConnect = void delegate(); 586 587 /// Sets this socket's $(D OnConnect) callback. 588 @property void onConnect(OnConnect onConnect) @safe pure @nogc 589 in { assert(m_connectionOriented); } 590 body { m_onConnect = onConnect; } 591 592 /** 593 * Type of callback triggered when a connection-oriented, active socket completes disconnects. 594 * The socket will have been $(D kill)ed before the call. 595 */ 596 alias OnClose = void delegate(); 597 598 /// Sets this socket's $(D OnClose) callback. 599 @property void onClose(OnClose onClose) @safe pure @nogc 600 in { assert(m_connectionOriented); } 601 body { m_onClose = onClose; } 602 603 /** 604 * Type of callback triggered when a socker error occured. 605 * The socket will have been $(D kill)ed before the call. 606 */ 607 alias OnError = void delegate(); 608 609 /// Sets callback for when a socket error has occurred. 610 @property void onError(OnError onError) @safe pure @nogc 611 { m_onError = onError; } 612 613 /// Creates the underlying OS socket - if necessary - and 614 /// registers the event handler in the underlying OS event loop. 615 bool run() 616 in { assert(m_socket == INVALID_SOCKET); } 617 body { 618 m_socket = m_evLoop.run(this); 619 return m_socket != INVALID_SOCKET; 620 } 621 622 /** 623 * Assigns the network address pointed to by $(D addr), 624 * with $(D addrlen) specifying the size, in bytes, of 625 * this address, as the local name of this socket. 626 * Returns: $(D true) if the binding was successful. 627 * See_Also: 628 * localAddress, http://pubs.opengroup.org/onlinepubs/9699919799/functions/bind.html 629 */ 630 bool bind(sockaddr* addr, socklen_t addrlen) 631 { return m_evLoop.bind(this, addr, addrlen); } 632 633 /// Convenience wrapper. 634 bool bind(const ref NetworkAddress addr) 635 { return bind(cast(sockaddr*) addr.sockAddr, addr.sockAddrLen); } 636 637 /** 638 * Assigns the network address pointed to by $(D addr), 639 * with $(D addrlen) specifying the size, n bytes, of 640 * this address, as the name of the remote socket. 641 * For connection-oriented sockets, also start establishing a 642 * connection with that socket and call $(D onConnect) once it has. 643 * Returns: $(D true) if the name was successfully assigned and 644 * - for connection-oriented sockets - if the connection is 645 * now being established. 646 * See_Also: 647 * remoteAddress, onConnect, http://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html 648 */ 649 bool connect(sockaddr* addr, socklen_t addrlen) 650 { return m_evLoop.connect(this, addr, addrlen); } 651 652 /// Convenience wrapper. 653 bool connect(const ref NetworkAddress to) 654 { return connect(cast(sockaddr*) to.sockAddr, to.sockAddrLen); } 655 656 /** 657 * Marks the socket as passive and enables acceptance of incoming connections 658 * into instances of $(D AsyncSocket). Only after calling this successfully 659 * may accept request be submitted via $(D accept). 660 */ 661 bool listen(int backlog = SOMAXCONN) 662 { 663 m_passive = true; 664 return m_evLoop.listen(this, backlog); 665 } 666 667 /** 668 * Submits an asynchronous request on this socket to accept an incoming 669 * connection. Upon successful acceptance of such a connection $(D onAccept) 670 * will be called with a new $(D AsyncSocket) representing the peer. 671 * See_Also: listen 672 */ 673 void accept(AsyncAcceptRequest.OnComplete onAccept) 674 in { 675 assert(alive, "Cannot accept on an unrun / killed socket"); 676 assert(m_connectionOriented && m_passive, "Can only accept on connection-oriented, passive sockets"); 677 } 678 body { 679 auto request = AsyncAcceptRequest.alloc(this, INVALID_SOCKET, onAccept); 680 m_evLoop.submitRequest(request); 681 } 682 683 /// Whether the socket is automatically resubmitting the current receive request 684 /// upon its successful completion. 685 @property bool receiveContinuously() const @safe pure @nogc 686 { return m_receiveContinuously; } 687 688 /// Toggles automatic resubmission of the current receive request upon its successful completion. 689 /// Enabling this primes the socket so that the next $(D receiveMessage) will exhibit the behaviour. 690 /// Any further calls to $(D receiveMessage) while active are forbidden; may only be disabled again 691 /// in the completion callback provided with the $(D receiveMessage) that started it. 692 /// After disabling, may not be reenabled in the same callback. 693 @property void receiveContinuously(bool toggle) @safe pure 694 in { 695 version (Posix) assert(m_pendingReceives.empty, "Cannot start/stop receiving continuously when there are still pending receive requests"); 696 } body { 697 if (m_receiveContinuously == toggle) return; 698 m_receiveContinuously = toggle; 699 } 700 701 /** 702 * Submits an asynchronous request on this socket to receive a $(D message). 703 * Upon successful reception $(D onReceive) will be called with the received data. 704 * $(D exact) indicates whether successful reception requires the entire buffer 705 * provided within $(D message) to have been filled. If a socket error occurs, 706 * but some data has already been received, then $(D onReceive) will be called 707 * with that partial data regardless of $(D exact). 708 */ 709 void receiveMessage(ref NetworkMessage message, AsyncReceiveRequest.OnDataReceived onReceive, bool exact = false) 710 { 711 receiveMessage(assumeWontThrow(NetworkMessage.alloc(message)), 712 AsyncReceiveRequest.OnComplete(onReceive), 713 exact); 714 } 715 716 /** 717 * Submits an asynchronous request on this socket to receive $(D data). 718 * Upon successful reception of at most $(D data.length) bytes $(D onReceive) 719 * will be called with the received bytes as a slice of $(D data). 720 * See_Also: receiveExactly, receiveFrom 721 */ 722 void receive(ref ubyte[] data, AsyncReceiveRequest.OnDataReceived onReceive) 723 { 724 receiveMessage(NetworkMessage.alloc(data), 725 AsyncReceiveRequest.OnComplete(onReceive), 726 false); 727 } 728 729 /** 730 * Submits a special asynchronous request on this socket to receive nothing. 731 * Also known as a "zero byte receive" $(D onReceive) will be called once 732 * there is new data on the socket that can be received immediately. 733 * Additionally, $(D onReceive) may also be called on connection-oriented sockets 734 * where the remote peer has disconnected gracefully with no further data being 735 * available for reception. 736 */ 737 void receive(AsyncReceiveRequest.OnDataAvailable onReceive) 738 in { 739 assert(!m_receiveContinuously, "Continuous receiving and zero byte receives may not be mixed"); 740 } body { 741 receiveMessage(null, 742 AsyncReceiveRequest.OnComplete(onReceive), 743 false); 744 } 745 746 /** 747 * Submits an asynchronous request on this socket to receive $(D data). 748 * Upon successful reception of exactly $(D data.lengt) bytes $(D onReceive) 749 * will be called with $(D data). 750 * See_Also: receive, receiveFrom 751 */ 752 void receiveExactly(ref ubyte[] data, AsyncReceiveRequest.OnDataReceived onReceive) 753 { 754 receiveMessage(NetworkMessage.alloc(data), 755 AsyncReceiveRequest.OnComplete(onReceive), 756 true); 757 } 758 759 /** 760 * Submits an asynchronous request on this socket to receive $(D data) $(D from) 761 * an unknown sender, whose address will also be received. 762 * Upon successful reception of at most $(D data.length) bytes $(D onReceive) 763 * will be called with the received bytes as a slice of $(D data) and $(D from) 764 * will have been set to the sender's address. 765 * This method may only be called on connectionless sockets, to retrieve the 766 * remote address on connection-oriented sockets, refer to $(D remoteAddress). 767 * See_Also: receive, receiveExactly, remoteAddress 768 */ 769 void receiveFrom(ref ubyte[] data, ref NetworkAddress from, AsyncReceiveRequest.OnDataReceived onReceive) 770 { 771 receiveMessage(NetworkMessage.alloc(data, &from), 772 AsyncReceiveRequest.OnComplete(onReceive), 773 false); 774 } 775 776 /** 777 * Submits an asynchronous request on this socket to send a $(D message). 778 * Upon successful transmission $(D onSend) will be called. 779 */ 780 void sendMessage(const ref NetworkMessage message, AsyncSendRequest.OnComplete onSend) 781 { 782 sendMessage(NetworkMessage.alloc(message), onSend); 783 } 784 785 /** 786 * Submits an asynchronous request on this socket to send $(D data). 787 * Upon successful transmission $(D onSend) will be called. 788 */ 789 void send(in ubyte[] data, AsyncSendRequest.OnComplete onSend) 790 { 791 sendMessage(NetworkMessage.alloc(cast(ubyte[]) data), onSend); 792 } 793 794 /** 795 * Submits an asynchronous request on this socket to send $(D data) $(D to) 796 * a specific recipient. Upon successful transmission $(D onSend) will be called. 797 */ 798 void sendTo(in ubyte[] data, const ref NetworkAddress to, AsyncSendRequest.OnComplete onSend) 799 { 800 sendMessage(NetworkMessage.alloc(cast(ubyte[]) data, &to), onSend); 801 } 802 803 /** 804 * Removes the socket from the event loop, shutting it down if necessary, 805 * and cleans up the underlying resources. Only after this method has been 806 * called may the socket instance be deallocated. 807 */ 808 bool kill(bool forced = false) 809 { 810 m_receiveContinuously = false; 811 return m_evLoop.kill(this, forced); 812 } 813 814 /// Returns whether the socket has not yet been killed. 815 @property bool alive() @safe @nogc { 816 return m_socket != INVALID_SOCKET; 817 } 818 819 /// Provides access to event loop information 820 mixin DefStatus; 821 } 822 823 824 /// Holds additional information about a socket. 825 struct SocketInfo 826 { 827 int domain; 828 SocketType type; 829 int protocol; 830 } 831 832 /** 833 * Represents a network/socket address. (adapted from vibe.core.net) 834 */ 835 struct NetworkAddress 836 { 837 import std.bitmanip: nativeToBigEndian, bigEndianToNative; 838 839 import libasync.internals.socket_compat : 840 sockaddr, sockaddr_storage, 841 sockaddr_in, AF_INET, 842 sockaddr_in6, AF_INET6; 843 version (Posix) import libasync.internals.socket_compat : 844 sockaddr_un, AF_UNIX; 845 846 package union { 847 sockaddr addr = { AF_UNSPEC }; 848 sockaddr_storage addr_storage = void; 849 sockaddr_in addr_ip4 = void; 850 sockaddr_in6 addr_ip6 = void; 851 version (Posix) sockaddr_un addr_un = void; 852 } 853 854 this(sockaddr* addr, socklen_t addrlen) @trusted pure nothrow @nogc 855 in { 856 assert(addrlen <= sockaddr_storage.sizeof, 857 "POSIX.1-2013 requires sockaddr_storage be able to store any socket address"); 858 } body { 859 import std.algorithm : copy; 860 copy((cast(ubyte*) addr)[0 .. addrlen], 861 (cast(ubyte*) &addr_storage)[0 .. addrlen]); 862 } 863 864 import std.socket : PhobosAddress = Address; 865 this(PhobosAddress address) @safe pure nothrow @nogc 866 { this(address.name, address.nameLen); } 867 868 @property bool ipv6() const @safe pure nothrow @nogc 869 { return this.family == AF_INET6; } 870 871 /** Family (AF_) of the socket address. 872 */ 873 @property ushort family() const @safe pure nothrow @nogc 874 { return addr.sa_family; } 875 /// ditto 876 @property void family(ushort val) pure @safe nothrow @nogc 877 { addr.sa_family = cast(ubyte) val; } 878 879 /** The port in host byte order. 880 */ 881 @property ushort port() 882 const @trusted @nogc pure nothrow { 883 switch (this.family) { 884 default: assert(false, "port() called for invalid address family."); 885 case AF_INET: return bigEndianToNative!ushort((cast(ubyte*) &addr_ip4.sin_port)[0..2]); 886 case AF_INET6: return bigEndianToNative!ushort((cast(ubyte*) &addr_ip6.sin6_port)[0..2]); 887 } 888 } 889 /// ditto 890 @property void port(ushort val) 891 @trusted @nogc pure nothrow { 892 switch (this.family) { 893 default: assert(false, "port() called for invalid address family."); 894 case AF_INET: addr_ip4.sin_port = *cast(ushort*) nativeToBigEndian(val).ptr; break; 895 case AF_INET6: addr_ip6.sin6_port = *cast(ushort*) nativeToBigEndian(val).ptr; break; 896 } 897 } 898 899 /** A pointer to a sockaddr struct suitable for passing to socket functions. 900 */ 901 @property inout(sockaddr)* sockAddr() inout pure @safe @nogc nothrow { return &addr; } 902 903 /** Size of the sockaddr struct that is returned by sockAddr(). 904 */ 905 @property socklen_t sockAddrLen() 906 const @safe @nogc pure nothrow { 907 switch (this.family) { 908 default: assert(false, "Unsupported address family"); 909 case AF_UNSPEC: return addr_storage.sizeof; 910 case AF_INET: return addr_ip4.sizeof; 911 case AF_INET6: return addr_ip6.sizeof; 912 version (Posix) case AF_UNIX: return addr_un.sizeof; 913 } 914 } 915 916 /++ 917 + Maximum size of any sockaddr struct, regardless of address family. 918 +/ 919 static @property socklen_t sockAddrMaxLen() 920 pure nothrow { return sockaddr_storage.sizeof; } 921 922 @property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow 923 in { assert (family == AF_INET); } 924 body { return &addr_ip4; } 925 926 @property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow 927 in { assert (family == AF_INET6); } 928 body { return &addr_ip6; } 929 930 version (Posix) 931 @property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow 932 in { assert (family == AF_UNIX); } 933 body { return &addr_un; } 934 935 /** Returns a string representation of the IP address 936 */ 937 string toAddressString() 938 const { 939 import std.array : appender; 940 auto ret = appender!string(); 941 ret.reserve(40); 942 toAddressString(str => ret.put(str)); 943 return ret.data; 944 } 945 /// ditto 946 void toAddressString(scope void delegate(const(char)[]) @safe sink) 947 const { 948 import std.array : appender; 949 import std.format : formattedWrite; 950 import std..string : fromStringz; 951 952 ubyte[2] _dummy = void; // Workaround for DMD regression in master 953 954 switch (this.family) { 955 default: assert(false, "toAddressString() called for invalid address family."); 956 case AF_INET: { 957 ubyte[4] ip = () @trusted { return (cast(ubyte*) &addr_ip4.sin_addr.s_addr)[0 .. 4]; } (); 958 sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]); 959 break; 960 } 961 case AF_INET6: { 962 ubyte[16] ip = addr_ip6.sin6_addr.s6_addr; 963 foreach (i; 0 .. 8) { 964 if (i > 0) sink(":"); 965 _dummy[] = ip[i*2 .. i*2+2]; 966 sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy)); 967 } 968 break; 969 } 970 version (Posix) { 971 case AF_UNIX: 972 sink.formattedWrite("%s", fromStringz(cast(char*) addr_un.sun_path)); 973 break; 974 } 975 } 976 } 977 978 /** Returns a full string representation of the address, including the port number. 979 */ 980 string toString() 981 const { 982 import std.array : appender; 983 auto ret = appender!string(); 984 toString(str => ret.put(str)); 985 return ret.data; 986 } 987 /// ditto 988 void toString(scope void delegate(const(char)[]) @safe sink) 989 const { 990 import std.format : formattedWrite; 991 switch (this.family) { 992 default: assert(false, "toString() called for invalid address family."); 993 case AF_INET: 994 toAddressString(sink); 995 sink.formattedWrite(":%s", port); 996 break; 997 case AF_INET6: 998 sink("["); 999 toAddressString(sink); 1000 sink.formattedWrite("]:%s", port); 1001 break; 1002 version (Posix) { 1003 case AF_UNIX: 1004 toAddressString(sink); 1005 break; 1006 } 1007 } 1008 } 1009 } 1010 1011 version (Posix) 1012 { 1013 enum SOCKET_ERROR = -1; 1014 enum INVALID_SOCKET = -1; 1015 } else version (Windows) { 1016 import core.sys.windows.winsock2 : SOCKET_ERROR, INVALID_SOCKET; 1017 } 1018 1019 /// Checks whether the given file descriptor refers to a valid socket. 1020 bool isSocket(fd_t fd) @trusted @nogc nothrow 1021 { 1022 import libasync.internals.socket_compat : getsockopt, SOL_SOCKET, SO_TYPE; 1023 1024 int type; 1025 socklen_t typesize = cast(socklen_t) type.sizeof; 1026 return SOCKET_ERROR != getsockopt(fd, SOL_SOCKET, SO_TYPE, cast(char*) &type, &typesize); 1027 }