1 module libasync.windows; 2 3 version (Windows): 4 5 import core.atomic; 6 import core.thread : Fiber; 7 import libasync.types; 8 import std.container : Array; 9 import std..string : toStringz; 10 import std.conv : to; 11 import std.datetime : Duration, msecs, seconds; 12 import std.algorithm : min; 13 import std.exception; 14 import libasync.internals.win32; 15 import libasync.internals.logging; 16 import std.traits : isIntegral; 17 import std.typecons : Tuple, tuple; 18 import std.utf : toUTFz; 19 import core.sync.mutex; 20 import libasync.events; 21 import memutils.utils; 22 import memutils.hashmap; 23 import memutils.vector; 24 pragma(lib, "ws2_32"); 25 pragma(lib, "ole32"); 26 alias fd_t = SIZE_T; 27 alias error_t = EWIN; 28 29 //todo : see if new connections with SO_REUSEADDR are evenly distributed between threads 30 31 32 package struct EventLoopImpl { 33 pragma(msg, "Using Windows message-based notifications and alertable IO for events"); 34 35 private: 36 HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array 37 HashMap!(fd_t, TCPEventHandler) m_tcpHandlers; 38 HashMap!(fd_t, TimerHandler) m_timerHandlers; 39 HashMap!(fd_t, UDPHandler) m_udpHandlers; 40 HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too 41 HashMap!(uint, DWFolderWatcher) m_dwFolders; 42 HashMap!(fd_t, tcp_keepalive)* kcache; 43 ~this() { kcache.destroy(); } 44 nothrow: 45 private: 46 struct TimerCache { 47 TimerHandler cb; 48 fd_t fd; 49 } 50 TimerCache m_timer; 51 52 EventLoop m_evLoop; 53 bool m_started; 54 wstring m_window; 55 HWND m_hwnd; 56 DWORD m_threadId; 57 ushort m_instanceId; 58 StatusInfo m_status; 59 error_t m_error = EWIN.WSA_OK; 60 __gshared Mutex gs_mtx; 61 62 HANDLE[] m_waitObjects; 63 AsyncOverlapped*[AsyncSocket] m_pendingConnects; 64 bool[AsyncOverlapped*] m_pendingAccepts; 65 66 @property HANDLE pendingConnectEvent() 67 { return m_waitObjects[0]; } 68 69 @property HANDLE pendingAcceptEvent() 70 { return m_waitObjects[1]; } 71 72 AsyncAcceptRequest.Queue m_completedSocketAccepts; 73 AsyncReceiveRequest.Queue m_completedSocketReceives; 74 AsyncSendRequest.Queue m_completedSocketSends; 75 package: 76 @property bool started() const { 77 return m_started; 78 } 79 bool init(EventLoop evl) 80 in { assert(!m_started); } 81 body 82 { 83 try if (!gs_mtx) 84 gs_mtx = new Mutex; catch (Throwable) {} 85 static ushort j; 86 assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux."); 87 j += 1; 88 m_status = StatusInfo.init; 89 90 import core.thread; 91 //try Thread.getThis().priority = Thread.PRIORITY_MAX; 92 //catch (Exception e) { assert(false, "Could not set thread priority"); } 93 SetThreadPriority(GetCurrentThread(), 31); 94 m_evLoop = evl; 95 shared static ushort i; 96 m_instanceId = i; 97 core.atomic.atomicOp!"+="(i, cast(ushort) 1); 98 wstring inststr; 99 import std.conv : to; 100 try { inststr = m_instanceId.to!wstring; } 101 catch (Exception e) { 102 return false; 103 } 104 m_window = "VibeWin32MessageWindow" ~ inststr; 105 wstring classname = "VibeWin32MessageWindow" ~ inststr; 106 107 LPCWSTR wnz; 108 LPCWSTR clsn; 109 try { 110 wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*); 111 clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*); 112 } catch (Exception e) { 113 setInternalError!"toUTFz"(Status.ERROR, e.msg); 114 return false; 115 } 116 117 m_threadId = GetCurrentThreadId(); 118 WNDCLASSW wc; 119 wc.lpfnWndProc = &wndProc; 120 wc.lpszClassName = clsn; 121 RegisterClassW(&wc); 122 m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE, 123 cast(HMENU) null, null, null); 124 static if (LOG) try log("Window registered: " ~ m_hwnd.to!string); catch (Throwable) {} 125 auto ptr = cast(ULONG_PTR)cast(void*)&this; 126 SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr); 127 assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this ); 128 WSADATA wd; 129 m_error = cast(error_t) WSAStartup(0x0202, &wd); 130 if (m_error == EWIN.WSA_OK) 131 m_status.code = Status.OK; 132 else { 133 m_status.code = Status.ABORT; 134 static if(LOG) log(m_status); 135 return false; 136 } 137 assert(wd.wVersion == 0x0202); 138 139 auto dummySocket = socket(AF_INET6, SOCK_STREAM, 0); 140 if (dummySocket == INVALID_SOCKET) return false; 141 scope (exit) closesocket(dummySocket); 142 143 DWORD bytesReturned; 144 145 if (WSAIoctl(dummySocket, 146 SIO_GET_EXTENSION_FUNCTION_POINTER, 147 &WSAID_ACCEPTEX, GUID.sizeof, 148 &AcceptEx, AcceptEx.sizeof, 149 &bytesReturned, 150 null, null) == SOCKET_ERROR) { 151 m_error = WSAGetLastErrorSafe(); 152 m_status.code = Status.ABORT; 153 return false; 154 } 155 156 if (WSAIoctl(dummySocket, 157 SIO_GET_EXTENSION_FUNCTION_POINTER, 158 &WSAID_GETACCEPTEXSOCKADDRS, GUID.sizeof, 159 &GetAcceptExSockaddrs, GetAcceptExSockaddrs.sizeof, 160 &bytesReturned, 161 null, null) == SOCKET_ERROR) { 162 m_error = WSAGetLastErrorSafe(); 163 m_status.code = Status.ABORT; 164 return false; 165 } 166 167 if (WSAIoctl(dummySocket, 168 SIO_GET_EXTENSION_FUNCTION_POINTER, 169 &WSAID_CONNECTEX, GUID.sizeof, 170 &ConnectEx, ConnectEx.sizeof, 171 &bytesReturned, 172 null, null) == SOCKET_ERROR) { 173 m_error = WSAGetLastErrorSafe(); 174 m_status.code = Status.ABORT; 175 return false; 176 } 177 178 if (WSAIoctl(dummySocket, 179 SIO_GET_EXTENSION_FUNCTION_POINTER, 180 &WSAID_DISCONNECTEX, GUID.sizeof, 181 &DisconnectEx, DisconnectEx.sizeof, 182 &bytesReturned, 183 null, null) == SOCKET_ERROR) { 184 m_error = WSAGetLastErrorSafe(); 185 m_status.code = Status.ABORT; 186 return false; 187 } 188 189 // Event for pending ConnectEx requests 190 m_waitObjects ~= CreateEvent(null, false, false, null); 191 // Event for pending AcceptEx requests 192 m_waitObjects ~= CreateEvent(null, false, false, null); 193 194 m_started = true; 195 return true; 196 } 197 198 // todo: find where to call this 199 void exit() { 200 cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0); 201 } 202 203 @property StatusInfo status() const { 204 return m_status; 205 } 206 207 @property string error() const { 208 string* ptr; 209 string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init; 210 return pv; 211 } 212 213 bool loop(Duration timeout = 0.seconds) 214 in { 215 assert(Fiber.getThis() is null); 216 assert(m_started); 217 } 218 body { 219 DWORD msTimeout; 220 221 if (timeout == -1.seconds) 222 msTimeout = DWORD.max; 223 else msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max); 224 225 /* 226 * Waits until one or all of the specified objects are in the signaled state 227 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx 228 */ 229 m_status = StatusInfo.init; 230 DWORD signal = MsgWaitForMultipleObjectsEx( 231 cast(DWORD) m_waitObjects.length, 232 m_waitObjects.ptr, 233 msTimeout, 234 QS_ALLEVENTS, 235 MWMO_ALERTABLE | MWMO_INPUTAVAILABLE // MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion) 236 // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting 237 ); 238 239 auto errors = 240 [ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ]; /* WAIT_FAILED: Failed to call MsgWait..() */ 241 242 if (signal == WAIT_TIMEOUT) { 243 return true; 244 } 245 246 if (signal == WAIT_IO_COMPLETION) { 247 if (m_status.code != Status.OK) return false; 248 249 foreach (request; m_completedSocketReceives) { 250 if (request.socket.receiveContinuously) { 251 m_completedSocketReceives.removeFront(); 252 assumeWontThrow(request.onComplete.get!0)(request.message.transferred); 253 if (request.socket.receiveContinuously && request.socket.alive) { 254 request.message.count = 0; 255 submitRequest(request); 256 } else { 257 assumeWontThrow(NetworkMessage.free(request.message)); 258 assumeWontThrow(AsyncReceiveRequest.free(request)); 259 } 260 } else { 261 m_completedSocketReceives.removeFront(); 262 if (request.message) { 263 assumeWontThrow(request.onComplete.get!0)(request.message.transferred); 264 assumeWontThrow(NetworkMessage.free(request.message)); 265 } else { 266 assumeWontThrow(request.onComplete.get!1)(); 267 } 268 assumeWontThrow(AsyncReceiveRequest.free(request)); 269 } 270 } 271 272 foreach (request; m_completedSocketSends) { 273 m_completedSocketSends.removeFront(); 274 request.onComplete(); 275 assumeWontThrow(NetworkMessage.free(request.message)); 276 assumeWontThrow(AsyncSendRequest.free(request)); 277 } 278 279 signal = MsgWaitForMultipleObjectsEx( 280 cast(DWORD) m_waitObjects.length, 281 m_waitObjects.ptr, 282 0, 283 QS_ALLEVENTS, 284 MWMO_INPUTAVAILABLE // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting 285 ); 286 if (signal == WAIT_TIMEOUT) { 287 return true; 288 } 289 } 290 291 if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) { 292 static if (LOG) log("Event Loop Exiting because of error"); 293 return false; 294 } 295 296 // Input messages 297 if (signal == WAIT_OBJECT_0 + m_waitObjects.length) { 298 MSG msg; 299 while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) { 300 m_status = StatusInfo.init; 301 TranslateMessage(&msg); 302 DispatchMessageW(&msg); 303 304 if (m_status.code == Status.ERROR) { 305 static if (LOG) log(m_status.text); 306 return false; 307 } 308 } 309 return true; 310 } 311 312 // Events 313 DWORD transferred, flags; 314 switch (signal - WAIT_OBJECT_0) { 315 // ConnectEx completion 316 case 0: 317 foreach (ref pendingConnect; m_pendingConnects.byKeyValue()) { 318 auto socket = pendingConnect.key; 319 auto overlapped = pendingConnect.value; 320 321 if (WSAGetOverlappedResult(socket.handle, 322 &overlapped.overlapped, 323 &transferred, 324 false, 325 &flags)) { 326 m_pendingConnects.remove(socket); 327 assumeWontThrow(AsyncOverlapped.free(overlapped)); 328 if (updateConnectContext(socket.handle)) { 329 socket.handleConnect(); 330 return true; 331 } else { 332 socket.kill(); 333 socket.handleError(); 334 return false; 335 } 336 } else { 337 m_error = WSAGetLastErrorSafe(); 338 if (m_error == WSA_IO_INCOMPLETE) { 339 continue; 340 } else { 341 m_status.code = Status.ABORT; 342 socket.kill(); 343 socket.handleError(); 344 return false; 345 } 346 } 347 } 348 break; 349 // AcceptEx completion 350 case 1: 351 foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) { 352 auto request = overlapped.accept; 353 auto socket = request.socket; 354 355 if (WSAGetOverlappedResult(socket.handle, 356 &overlapped.overlapped, 357 &transferred, 358 false, 359 &flags)) { 360 m_pendingAccepts.remove(overlapped); 361 assumeWontThrow(AsyncOverlapped.free(overlapped)); 362 m_completedSocketAccepts.insertBack(request); 363 } else { 364 m_error = WSAGetLastErrorSafe(); 365 if (m_error == WSA_IO_INCOMPLETE) { 366 continue; 367 } else { 368 m_status.code = Status.ABORT; 369 m_pendingAccepts.remove(overlapped); 370 assumeWontThrow(AsyncOverlapped.free(overlapped)); 371 assumeWontThrow(AsyncAcceptRequest.free(request)); 372 socket.kill(); 373 socket.handleError(); 374 return false; 375 } 376 } 377 } 378 foreach (request; m_completedSocketAccepts) { 379 sockaddr* localAddress, remoteAddress; 380 socklen_t localAddressLength, remoteAddressLength; 381 382 GetAcceptExSockaddrs(request.buffer.ptr, 383 0, 384 cast(DWORD) request.buffer.length / 2, 385 cast(DWORD) request.buffer.length / 2, 386 &localAddress, 387 &localAddressLength, 388 &remoteAddress, 389 &remoteAddressLength); 390 391 m_completedSocketAccepts.removeFront(); 392 if (!onAccept(request.socket.handle, request, remoteAddress)) { 393 return false; 394 } 395 } 396 break; 397 default: 398 .warning("Unknown event was triggered: ", signal); 399 break; 400 } 401 402 return true; 403 } 404 405 bool run(AsyncEvent ctxt, EventHandler del) 406 { 407 return true; 408 } 409 410 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) 411 { 412 m_status = StatusInfo.init; 413 fd_t fd = ctxt.socket; 414 bool reusing; 415 if (fd == fd_t.init) { 416 417 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 418 419 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 420 return 0; 421 422 if (!setOption(fd, TCPOption.REUSEADDR, true)) { 423 closeSocket(fd, false); 424 return 0; 425 } 426 // todo: defer accept? 427 428 if (ctxt.noDelay) { 429 if (!setOption(fd, TCPOption.NODELAY, true)) { 430 closeSocket(fd, false); 431 return 0; 432 } 433 } 434 } else reusing = true; 435 436 if (initTCPListener(fd, ctxt, reusing)) 437 { 438 try { 439 static if (LOG) log("Running listener on socket fd#" ~ fd.to!string); 440 m_connHandlers[fd] = del; 441 version(Distributed)ctxt.init(m_hwnd, fd); 442 } 443 catch (Exception e) { 444 setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg); 445 closeSocket(fd, false); 446 return 0; 447 } 448 } 449 else 450 { 451 return 0; 452 } 453 454 455 return fd; 456 } 457 458 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) 459 in { 460 assert(ctxt.socket == fd_t.init); 461 assert(ctxt.peer.family != AF_UNSPEC); 462 } 463 body { 464 m_status = StatusInfo.init; 465 fd_t fd = ctxt.preInitializedSocket; 466 467 if (fd == fd_t.init) 468 fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 469 static if (LOG) log("Starting connection at: " ~ fd.to!string); 470 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 471 return 0; 472 473 try { 474 (m_tcpHandlers)[fd] = del; 475 } 476 catch (Exception e) { 477 setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg); 478 closeSocket(fd, false); 479 return 0; 480 } 481 482 nothrow void closeAll() { 483 try { 484 static if (LOG) log("Remove event handler for " ~ fd.to!string); 485 m_tcpHandlers.remove(fd); 486 } 487 catch (Exception e) { 488 setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg); 489 } 490 closeSocket(fd, false); 491 } 492 493 if (ctxt.noDelay) { 494 if (!setOption(fd, TCPOption.NODELAY, true)) { 495 closeAll(); 496 return 0; 497 } 498 } 499 500 if (!initTCPConnection(fd, ctxt)) { 501 closeAll(); 502 return 0; 503 } 504 505 506 static if (LOG) try log("Client started FD#" ~ fd.to!string); 507 catch (Throwable) {} 508 return fd; 509 } 510 511 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 512 m_status = StatusInfo.init; 513 fd_t fd = ctxt.preInitializedSocket; 514 515 if (fd == fd_t.init) 516 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); 517 518 if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET)) 519 return 0; 520 521 if (initUDPSocket(fd, ctxt)) 522 { 523 try { 524 (m_udpHandlers)[fd] = del; 525 } 526 catch (Exception e) { 527 setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg); 528 closesocket(fd); 529 return 0; 530 } 531 } 532 else return 0; 533 534 static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string); 535 catch (Throwable) {} 536 537 return fd; 538 } 539 540 fd_t run(shared AsyncSignal ctxt) { 541 m_status = StatusInfo.init; 542 static if (LOG) try log("Signal subscribed to: " ~ m_hwnd.to!string); catch (Throwable) {} 543 return (cast(fd_t)m_hwnd); 544 } 545 546 fd_t run(AsyncNotifier ctxt) { 547 m_status = StatusInfo.init; 548 //static if (LOG) try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch (Throwable) {} 549 return cast(fd_t) m_hwnd; 550 } 551 552 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 553 if (timeout < 0.seconds) 554 timeout = 0.seconds; 555 m_status = StatusInfo.init; 556 fd_t timer_id = ctxt.id; 557 if (timer_id == fd_t.init) { 558 timer_id = createIndex(); 559 } 560 static if (LOG) try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch (Throwable) {} 561 562 BOOL err; 563 try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null); 564 catch(Exception e) { 565 setInternalError!"SetTimer"(Status.ERROR); 566 return 0; 567 } 568 569 if (err == 0) 570 { 571 m_error = GetLastErrorSafe(); 572 m_status.code = Status.ERROR; 573 m_status.text = "kill(AsyncTimer)"; 574 static if (LOG) log(m_status); 575 return 0; 576 } 577 578 if (m_timer.fd == fd_t.init || m_timer.fd == timer_id) 579 { 580 m_timer.fd = timer_id; 581 m_timer.cb = del; 582 } 583 else { 584 try 585 { 586 (m_timerHandlers)[timer_id] = del; 587 } 588 catch (Exception e) { 589 setInternalError!"HashMap assign"(Status.ERROR); 590 return 0; 591 } 592 } 593 594 595 return timer_id; 596 } 597 598 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) 599 { 600 static fd_t ids; 601 auto fd = ++ids; 602 603 try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 604 catch (Exception e) { 605 setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg); 606 } 607 608 return fd; 609 610 } 611 612 bool kill(AsyncDirectoryWatcher ctxt) { 613 614 try { 615 Array!DWFolderWatcher toFree; 616 foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) { 617 if (v.fd == ctxt.fd) { 618 CloseHandle(v.handle); 619 m_dwFolders.remove(k); 620 } 621 } 622 623 foreach (DWFolderWatcher obj; toFree[]) 624 ThreadMem.free(obj); 625 626 // todo: close all the handlers... 627 m_dwHandlers.remove(ctxt.fd); 628 } 629 catch (Exception e) { 630 setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg); 631 return false; 632 } 633 634 return true; 635 } 636 637 bool kill(AsyncTCPConnection ctxt, bool forced = false) 638 { 639 640 m_status = StatusInfo.init; 641 fd_t fd = ctxt.socket; 642 643 static if (LOG) log("Killing socket "~ fd.to!string); 644 try { 645 auto cb = m_tcpHandlers.get(ctxt.socket); 646 if (cb != TCPEventHandler.init){ 647 *cb.conn.connected = false; 648 *cb.conn.connecting = false; 649 return closeSocket(fd, true, forced); 650 } 651 } catch (Exception e) { 652 setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg); 653 assert(false); 654 //return false; 655 } 656 657 return true; 658 } 659 660 bool kill(AsyncTCPListener ctxt) 661 { 662 m_status = StatusInfo.init; 663 fd_t fd = ctxt.socket; 664 try { 665 if ((ctxt.socket in m_connHandlers) !is null) { 666 return closeSocket(fd, false, true); 667 } 668 } catch (Exception e) { 669 setInternalError!"in m_connHandlers"(Status.ERROR, e.msg); 670 return false; 671 } 672 673 return true; 674 } 675 676 bool kill(shared AsyncSignal ctxt) { 677 return true; 678 } 679 680 bool kill(AsyncNotifier ctxt) { 681 return true; 682 } 683 684 bool kill(AsyncTimer ctxt) { 685 m_status = StatusInfo.init; 686 687 static if (LOG) try log("Kill timer" ~ ctxt.id.to!string); catch (Throwable) {} 688 689 BOOL err = KillTimer(m_hwnd, ctxt.id); 690 if (err == 0) 691 { 692 m_error = GetLastErrorSafe(); 693 m_status.code = Status.ERROR; 694 m_status.text = "kill(AsyncTimer)"; 695 static if (LOG) log(m_status); 696 return false; 697 } 698 699 destroyIndex(ctxt); 700 scope(exit) 701 ctxt.id = fd_t.init; 702 if (m_timer.fd == ctxt.id) { 703 ctxt.id = 0; 704 m_timer = TimerCache.init; 705 } else { 706 try { 707 m_timerHandlers.remove(ctxt.id); 708 } 709 catch (Exception e) { 710 setInternalError!"HashMap remove"(Status.ERROR); 711 return 0; 712 } 713 } 714 715 716 return true; 717 } 718 719 bool kill(AsyncEvent ctxt, bool forced = false) { 720 return true; 721 } 722 723 bool kill(AsyncUDPSocket ctxt) { 724 m_status = StatusInfo.init; 725 726 fd_t fd = ctxt.socket; 727 INT err = closesocket(fd); 728 if (catchSocketError!"closesocket"(err)) 729 return false; 730 731 try m_udpHandlers.remove(ctxt.socket); 732 catch (Exception e) { 733 setInternalError!"HashMap remove"(Status.ERROR); 734 return 0; 735 } 736 737 return true; 738 } 739 740 bool setOption(T)(fd_t fd, TCPOption option, in T value) { 741 m_status = StatusInfo.init; 742 int err; 743 try { 744 nothrow bool errorHandler() { 745 if (catchSocketError!"setOption:"(err)) { 746 try m_status.text ~= option.to!string; 747 catch (Exception e){ assert(false, "to!string conversion failure"); } 748 return false; 749 } 750 751 return true; 752 } 753 754 755 756 final switch (option) { 757 758 case TCPOption.NODELAY: // true/false 759 static if (!is(T == bool)) 760 assert(false, "NODELAY value type must be bool, not " ~ T.stringof); 761 else { 762 BOOL val = value?1:0; 763 socklen_t len = val.sizeof; 764 err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); 765 return errorHandler(); 766 } 767 case TCPOption.REUSEPORT: 768 case TCPOption.REUSEADDR: // true/false 769 static if (!is(T == bool)) 770 assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof); 771 else 772 { 773 BOOL val = value?1:0; 774 socklen_t len = val.sizeof; 775 err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); 776 return errorHandler(); 777 } 778 case TCPOption.QUICK_ACK: 779 static if (!is(T == bool)) 780 assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof); 781 else { 782 m_status.code = Status.NOT_IMPLEMENTED; 783 return false; // quick ack is not implemented 784 } 785 case TCPOption.KEEPALIVE_ENABLE: // true/false 786 static if (!is(T == bool)) 787 assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof); 788 else 789 { 790 BOOL val = value?1:0; 791 socklen_t len = val.sizeof; 792 err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); 793 return errorHandler(); 794 } 795 case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn 796 static if (!isIntegral!T) 797 assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof); 798 else { 799 m_status.code = Status.NOT_IMPLEMENTED; 800 return false; 801 } 802 case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets 803 static if (!is(T == Duration)) 804 assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof); 805 else { 806 807 if (!kcache) 808 kcache = new HashMap!(fd_t, tcp_keepalive)(); 809 810 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 811 tcp_keepalive sReturned; 812 DWORD dwBytes; 813 kaSettings.onoff = ULONG(1); 814 if (kaSettings.keepalivetime == ULONG.init) { 815 kaSettings.keepalivetime = 1000; 816 } 817 kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG; 818 (*kcache)[fd] = kaSettings; 819 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 820 821 return errorHandler(); 822 } 823 case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start 824 static if (!is(T == Duration)) 825 assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof); 826 else { 827 828 if (!kcache) 829 kcache = new HashMap!(fd_t, tcp_keepalive)(); 830 831 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 832 tcp_keepalive sReturned; 833 DWORD dwBytes; 834 kaSettings.onoff = ULONG(1); 835 if (kaSettings.keepaliveinterval == ULONG.init) { 836 kaSettings.keepaliveinterval = 75*1000; 837 } 838 kaSettings.keepalivetime = value.total!"msecs".to!ULONG; 839 840 (*kcache)[fd] = kaSettings; 841 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 842 843 return errorHandler(); 844 } 845 case TCPOption.BUFFER_RECV: // bytes 846 static if (!isIntegral!T) 847 assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof); 848 else { 849 int val = value.to!int; 850 socklen_t len = val.sizeof; 851 err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); 852 return errorHandler(); 853 } 854 case TCPOption.BUFFER_SEND: // bytes 855 static if (!isIntegral!T) 856 assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof); 857 else { 858 int val = value.to!int; 859 socklen_t len = val.sizeof; 860 err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); 861 return errorHandler(); 862 } 863 case TCPOption.TIMEOUT_RECV: 864 static if (!is(T == Duration)) 865 assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof); 866 else { 867 DWORD val = value.total!"msecs".to!DWORD; 868 socklen_t len = val.sizeof; 869 err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len); 870 return errorHandler(); 871 } 872 case TCPOption.TIMEOUT_SEND: 873 static if (!is(T == Duration)) 874 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 875 else { 876 DWORD val = value.total!"msecs".to!DWORD; 877 socklen_t len = val.sizeof; 878 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len); 879 return errorHandler(); 880 } 881 case TCPOption.TIMEOUT_HALFOPEN: 882 static if (!is(T == Duration)) 883 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 884 else { 885 m_status.code = Status.NOT_IMPLEMENTED; 886 return false; 887 } 888 case TCPOption.LINGER: // bool onOff, int seconds 889 static if (!is(T == Tuple!(bool, int))) 890 assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof); 891 else { 892 linger l = linger(val[0]?1:0, val[1].to!USHORT); 893 socklen_t llen = l.sizeof; 894 err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen); 895 return errorHandler(); 896 } 897 case TCPOption.CONGESTION: 898 static if (!isIntegral!T) 899 assert(false, "CONGESTION value type must be integral, not " ~ T.stringof); 900 else { 901 m_status.code = Status.NOT_IMPLEMENTED; 902 return false; 903 } 904 case TCPOption.CORK: 905 static if (!isIntegral!T) 906 assert(false, "CORK value type must be int, not " ~ T.stringof); 907 else { 908 m_status.code = Status.NOT_IMPLEMENTED; 909 return false; 910 } 911 case TCPOption.DEFER_ACCEPT: // seconds 912 static if (!isIntegral!T) 913 assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof); 914 else { 915 int val = value.to!int; 916 socklen_t len = val.sizeof; 917 err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len); 918 return errorHandler(); 919 } 920 } 921 922 } 923 catch (Exception e) { 924 return false; 925 } 926 927 } 928 929 uint read(in fd_t fd, ref ubyte[] data) 930 { 931 return 0; 932 } 933 934 uint write(in fd_t fd, in ubyte[] data) 935 { 936 return 0; 937 } 938 939 uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) { 940 size_t i; 941 Array!DWChangeInfo* changes; 942 try { 943 changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer); 944 if ((*changes).empty) 945 return 0; 946 947 import std.algorithm : min; 948 size_t cnt = min(dst.length, changes.length); 949 foreach (DWChangeInfo change; (*changes)[0 .. cnt]) { 950 static if (LOG) try log("reading change: " ~ change.path); catch (Throwable) {} 951 dst[i] = (*changes)[i]; 952 i++; 953 } 954 changes.linearRemove((*changes)[0 .. cnt]); 955 } 956 catch (Exception e) { 957 setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); 958 return 0; 959 } 960 static if (LOG) try log("Changes returning with: " ~ i.to!string); catch (Throwable) {} 961 return cast(uint) i; 962 } 963 964 uint watch(in fd_t fd, in WatchInfo info) { 965 m_status = StatusInfo.init; 966 uint wd; 967 try { 968 HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()), 969 FILE_LIST_DIRECTORY, 970 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, 971 null, 972 OPEN_EXISTING, 973 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 974 null); 975 wd = cast(uint) hndl; 976 DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init); 977 assert(handler !is null); 978 static if (LOG) log("Watching: " ~ info.path.toNativeString()); 979 (m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive); 980 } catch (Exception e) { 981 setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg); 982 return 0; 983 } 984 return wd; 985 } 986 987 bool unwatch(in fd_t fd, in fd_t _wd) { 988 uint wd = cast(uint) _wd; 989 m_status = StatusInfo.init; 990 try { 991 DWFolderWatcher fw = m_dwFolders.get(wd, null); 992 assert(fw !is null); 993 m_dwFolders.remove(wd); 994 fw.close(); 995 ThreadMem.free(fw); 996 } catch (Exception e) { 997 setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg); 998 return false; 999 } 1000 return true; 1001 } 1002 1003 bool notify(T)(in fd_t fd, in T payload) 1004 if (is(T == shared AsyncSignal) || is(T == AsyncNotifier)) 1005 { 1006 m_status = StatusInfo.init; 1007 import std.conv; 1008 1009 auto payloadPtr = cast(ubyte*)payload; 1010 auto payloadAddr = cast(ulong)payloadPtr; 1011 1012 WPARAM wparam = payloadAddr & 0xffffffff; 1013 LPARAM lparam = cast(uint) (payloadAddr >> 32); 1014 1015 BOOL err; 1016 static if (is(T == AsyncNotifier)) 1017 err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam); 1018 else 1019 err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam); 1020 static if (LOG) try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch (Throwable) {} 1021 if (err == 0) 1022 { 1023 m_error = GetLastErrorSafe(); 1024 m_status.code = Status.ERROR; 1025 m_status.text = "notify"; 1026 static if (LOG) log(m_status); 1027 return false; 1028 } 1029 return true; 1030 } 1031 1032 fd_t run(AsyncSocket ctxt) 1033 { 1034 m_status = StatusInfo.init; 1035 1036 auto fd = ctxt.preInitializedHandle; 1037 1038 if (fd == INVALID_SOCKET) { 1039 fd = WSASocketW(ctxt.info.domain, ctxt.info.type, ctxt.info.protocol, null, 0, WSA_FLAG_OVERLAPPED); 1040 } 1041 1042 if (catchErrors!"socket"(fd)) { 1043 .error("Failed to create socket: ", error); 1044 return INVALID_SOCKET; 1045 } 1046 1047 return fd; 1048 } 1049 1050 bool kill(AsyncSocket ctxt, bool forced = false) 1051 { 1052 m_status = StatusInfo.init; 1053 1054 auto handle = ctxt.resetHandle(); 1055 1056 if (ctxt.connectionOriented && ctxt.passive) { 1057 foreach (request; m_completedSocketAccepts) if (request.socket is ctxt) { 1058 sockaddr* localAddress, remoteAddress; 1059 socklen_t localAddressLength, remoteAddressLength; 1060 1061 GetAcceptExSockaddrs(request.buffer.ptr, 1062 0, 1063 cast(DWORD) request.buffer.length / 2, 1064 cast(DWORD) request.buffer.length / 2, 1065 &localAddress, 1066 &localAddressLength, 1067 &remoteAddress, 1068 &remoteAddressLength); 1069 1070 m_completedSocketAccepts.removeFront(); 1071 if (!onAccept(handle, request, remoteAddress)) { 1072 .warning("Failed to accept incoming connection request while killing listener"); 1073 } 1074 } 1075 } 1076 1077 if (!ctxt.passive) { 1078 foreach (request; m_completedSocketReceives) if (request.socket is ctxt) { 1079 m_completedSocketReceives.removeFront(); 1080 if (request.message) { 1081 assumeWontThrow(request.onComplete.get!0)(request.message.transferred); 1082 assumeWontThrow(NetworkMessage.free(request.message)); 1083 } else { 1084 assumeWontThrow(request.onComplete.get!1)(); 1085 } 1086 assumeWontThrow(AsyncReceiveRequest.free(request)); 1087 } 1088 1089 foreach (request; m_completedSocketSends) if (request.socket is ctxt) { 1090 m_completedSocketSends.removeFront(); 1091 request.onComplete(); 1092 assumeWontThrow(NetworkMessage.free(request.message)); 1093 assumeWontThrow(AsyncSendRequest.free(request)); 1094 } 1095 1096 if(!CancelIo(cast(HANDLE) handle)) { 1097 m_status.code = Status.ABORT; 1098 m_error = GetLastErrorSafe(); 1099 .error("Failed to cancel outstanding overlapped I/O requests: ", this.error); 1100 return false; 1101 } 1102 } 1103 1104 if (ctxt.connectionOriented && ctxt.passive) { 1105 foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) { 1106 if (overlapped.accept.socket is ctxt) { 1107 m_pendingAccepts.remove(overlapped); 1108 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1109 } 1110 } 1111 } else if (ctxt.connectionOriented && !ctxt.passive && ctxt in m_pendingConnects) { 1112 auto overlapped = cast(AsyncOverlapped*) m_pendingConnects[ctxt]; 1113 m_pendingConnects.remove(ctxt); 1114 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1115 } 1116 1117 if (ctxt.connectionOriented && !ctxt.passive) { 1118 *ctxt.connected = false; 1119 } 1120 1121 INT err; 1122 if (ctxt.connectionOriented) { 1123 if (forced) { 1124 err = shutdown(handle, SD_BOTH); 1125 closesocket(ctxt.handle); 1126 } else { 1127 err = shutdown(handle, SD_SEND); 1128 } 1129 if (catchSocketError!"shutdown"(err)) { 1130 return false; 1131 } 1132 } else { 1133 closesocket(handle); 1134 } 1135 1136 return true; 1137 } 1138 1139 bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen) 1140 { 1141 import libasync.internals.socket_compat : bind; 1142 1143 auto err = bind(ctxt.handle, addr, addrlen); 1144 if (catchSocketError!"bind"(err)) { 1145 .error("Failed to bind socket: ", error); 1146 return false; 1147 } 1148 1149 return true; 1150 } 1151 1152 bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen) 1153 { 1154 m_status = StatusInfo.init; 1155 1156 // Connectionless sockets can be connected immediately, 1157 // as this only sets the default remote address. 1158 if (!ctxt.connectionOriented) { 1159 import libasync.internals.socket_compat : connect; 1160 1161 auto err = connect(ctxt.handle, addr, addrlen); 1162 if (catchSocketError!"connect"(err)) { 1163 .error("Failed to connect socket: ", error); 1164 return false; 1165 } 1166 return true; 1167 } 1168 1169 // ConnectEx requires a bound connection-oriented socket. 1170 try ctxt.localAddress; catch (SocketOSException) { 1171 NetworkAddress local; 1172 switch (ctxt.info.domain) { 1173 case AF_INET: 1174 local.addr_ip4.sin_family = AF_INET; 1175 local.addr_ip4.sin_addr.s_addr = INADDR_ANY; 1176 local.addr_ip4.sin_port = 0; 1177 break; 1178 case AF_INET6: 1179 local.addr_ip6.sin6_family = AF_INET6; 1180 local.addr_ip6.sin6_addr = IN6ADDR_ANY; 1181 local.addr_ip6.sin6_port = 0; 1182 break; 1183 default: 1184 assert(false, "Unsupported address family"); 1185 } 1186 1187 if (!bind(ctxt, local.sockAddr, local.sockAddrLen)) { 1188 return false; 1189 } 1190 } catch (Exception e) assert(false); 1191 1192 auto overlapped = assumeWontThrow(AsyncOverlapped.alloc()); 1193 overlapped.hEvent = pendingConnectEvent; 1194 if (ConnectEx(ctxt.handle, addr, addrlen, null, 0, null, &overlapped.overlapped)) { 1195 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1196 if (updateConnectContext(ctxt.handle)) { 1197 ctxt.handleConnect(); 1198 return true; 1199 } else { 1200 ctxt.kill(); 1201 ctxt.handleError(); 1202 return false; 1203 } 1204 } else { 1205 m_error = WSAGetLastErrorSafe(); 1206 if (m_error == WSA_IO_PENDING) { 1207 m_pendingConnects[ctxt] = overlapped; 1208 return true; 1209 } else { 1210 m_status.code = Status.ABORT; 1211 ctxt.kill(); 1212 ctxt.handleError(); 1213 return false; 1214 } 1215 } 1216 } 1217 1218 auto updateAcceptContext(fd_t listener, fd_t socket) 1219 { 1220 auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listener, listener.sizeof); 1221 if (catchSocketError!"accept"(err)) { 1222 .error("Failed to setup accepted socket: ", error); 1223 return false; 1224 } 1225 else return true; 1226 } 1227 1228 auto updateConnectContext(fd_t socket) 1229 { 1230 auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0); 1231 if (catchSocketError!"connect"(err)) { 1232 .error("Failed to setup connected socket: ", error); 1233 return false; 1234 } 1235 else return true; 1236 } 1237 1238 /+ 1239 bool setupConnectedCOASocket(AsyncSocket ctxt, AsyncSocket incomingOn = null) 1240 { 1241 fd_t err; 1242 1243 *ctxt.connected = true; 1244 1245 if (incomingOn) { 1246 auto listenerHandle = incomingOn.handle; 1247 err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listenerHandle, listenerHandle.sizeof); 1248 if (catchSocketError!"connect"(err)) { 1249 .error("Failed to setup connected socket: ", error); 1250 ctxt.handleError(); 1251 return false; 1252 } 1253 } else { 1254 err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0); 1255 if (catchSocketError!"connect"(err)) { 1256 .error("Failed to setup connected socket: ", error); 1257 ctxt.handleError(); 1258 return false; 1259 } 1260 } 1261 1262 return true; 1263 } 1264 +/ 1265 1266 bool listen(AsyncSocket ctxt, int backlog) 1267 { 1268 import libasync.internals.socket_compat : listen; 1269 1270 auto err = listen(ctxt.handle, backlog); 1271 if (catchSocketError!"listen"(err)) { 1272 .error("Failed to listen on socket: ", error); 1273 return false; 1274 } 1275 return true; 1276 } 1277 1278 bool onAccept(fd_t listener, AsyncAcceptRequest* request, sockaddr* remoteAddress) 1279 { 1280 auto socket = request.socket; 1281 scope (exit) assumeWontThrow(AsyncAcceptRequest.free(request)); 1282 1283 if (!updateAcceptContext(listener, request.peer)) { 1284 if (socket.alive) { 1285 m_status.code = Status.ABORT; 1286 socket.kill(); 1287 socket.handleError(); 1288 } 1289 return false; 1290 } 1291 1292 auto peer = request.onComplete(request.peer, remoteAddress.sa_family, socket.info.type, socket.info.protocol); 1293 if (peer.run()) { 1294 peer.handleConnect(); 1295 return true; 1296 } else { 1297 peer.kill(); 1298 peer.handleError(); 1299 return false; 1300 } 1301 } 1302 1303 void submitRequest(AsyncAcceptRequest* request) 1304 { 1305 auto overlapped = assumeWontThrow(AsyncOverlapped.alloc()); 1306 overlapped.accept = request; 1307 overlapped.hEvent = pendingAcceptEvent; 1308 1309 auto socket = request.socket; 1310 1311 request.peer = WSASocketW(request.socket.info.domain, 1312 request.socket.info.type, 1313 request.socket.info.protocol, 1314 null, 0, WSA_FLAG_OVERLAPPED); 1315 1316 if (request.peer == SOCKET_ERROR) { 1317 m_error = WSAGetLastErrorSafe(); 1318 1319 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1320 assumeWontThrow(AsyncAcceptRequest.free(request)); 1321 1322 .errorf("Failed to create peer socket with WSASocket: %s", error); 1323 m_status.code = Status.ABORT; 1324 socket.kill(); 1325 socket.handleError(); 1326 return; 1327 } 1328 1329 DWORD bytesReceived; 1330 retry: 1331 if (AcceptEx(socket.handle, 1332 request.peer, 1333 request.buffer.ptr, 1334 0, 1335 cast(DWORD) request.buffer.length / 2, 1336 cast(DWORD) request.buffer.length / 2, 1337 &bytesReceived, 1338 &overlapped.overlapped)) { 1339 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1340 m_completedSocketAccepts.insertBack(request); 1341 return; 1342 } else { 1343 m_error = WSAGetLastErrorSafe(); 1344 if (m_error == WSA_IO_PENDING) { 1345 m_pendingAccepts[overlapped] = true; 1346 return; 1347 // AcceptEx documentation states this error happens if "an incoming connection was indicated, 1348 // but was subsequently terminated by the remote peer prior to accepting the call". 1349 // This means there is no pending accept and we have to call AcceptEx again; this, 1350 // however, is a potential avenue for a denial-of-service attack, in which clients start 1351 // a connection to us but immediately terminate it, resulting in a (theoretically) infinite 1352 // loop here. The alternative to continuous resubmitting is closing the socket 1353 // (either immediately, or after a finite amount of tries to resubmit); that however, also opens up 1354 // a denial-of-service attack vector (a finite amount of such malicous connection attempts 1355 // can bring down any of our listening sockets). Of the two, the latter is a lot easier to exploit, 1356 // so for now we go with the first option of continuous resubmission. 1357 // TODO: Try to think of an better way to handle this. 1358 } else if (m_error == WSAECONNRESET) { 1359 goto retry; 1360 } else { 1361 m_status.code = Status.ABORT; 1362 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1363 assumeWontThrow(AsyncAcceptRequest.free(request)); 1364 socket.kill(); 1365 socket.handleError(); 1366 } 1367 } 1368 } 1369 1370 void submitRequest(AsyncReceiveRequest* request) 1371 { 1372 auto overlapped = assumeWontThrow(AsyncOverlapped.alloc()); 1373 overlapped.receive = request; 1374 auto socket = request.socket; 1375 1376 int err = void; 1377 if (!request.message) { 1378 .tracef("WSARecv on FD %s with zero byte buffer", socket.handle); 1379 WSABUF buffer; 1380 DWORD flags; 1381 err = WSARecv(socket.handle, 1382 &buffer, 1383 1, 1384 null, 1385 &flags, 1386 cast(const(WSAOVERLAPPEDX*)) overlapped, 1387 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete); 1388 } else if (request.message.name) { 1389 .tracef("WSARecvFrom on FD %s with buffer size %s", 1390 socket.handle, request.message.header.msg_iov.len); 1391 err = WSARecvFrom(socket.handle, 1392 request.message.buffers, 1393 cast(DWORD) request.message.bufferCount, 1394 null, 1395 &request.message.header.msg_flags, 1396 request.message.name, 1397 &request.message.header.msg_namelen, 1398 cast(const(WSAOVERLAPPEDX*)) overlapped, 1399 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete); 1400 } else { 1401 .tracef("WSARecv on FD %s with buffer size %s", 1402 socket.handle, request.message.header.msg_iov.len); 1403 err = WSARecv(socket.handle, 1404 request.message.buffers, 1405 cast(DWORD) request.message.bufferCount, 1406 null, 1407 &request.message.header.msg_flags, 1408 cast(const(WSAOVERLAPPEDX*)) overlapped, 1409 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete); 1410 } 1411 if (err == SOCKET_ERROR) { 1412 m_error = WSAGetLastErrorSafe(); 1413 if (m_error == WSA_IO_PENDING) return; 1414 1415 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1416 if (request.message) assumeWontThrow(NetworkMessage.free(request.message)); 1417 assumeWontThrow(AsyncReceiveRequest.free(request)); 1418 1419 // TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals 1420 // too many pending overlapped I/O requests. 1421 if (m_error == WSAECONNRESET || 1422 m_error == WSAECONNABORTED || 1423 m_error == WSAENOTSOCK) { 1424 socket.handleClose(); 1425 1426 *socket.connected = false; 1427 1428 closesocket(socket.handle); 1429 return; 1430 } 1431 1432 .errorf("WSARecv* on FD %d encountered socket error: %s", socket.handle, this.error); 1433 m_status.code = Status.ABORT; 1434 socket.kill(); 1435 socket.handleError(); 1436 } 1437 } 1438 1439 void submitRequest(AsyncSendRequest* request) 1440 { 1441 auto overlapped = assumeWontThrow(AsyncOverlapped.alloc()); 1442 overlapped.send = request; 1443 auto socket = request.socket; 1444 1445 int err = void; 1446 if (request.message.name) { 1447 .tracef("WSASendTo on FD %s for %s with buffer size %s", 1448 socket.handle, 1449 NetworkAddress(request.message.name, request.message.header.msg_namelen), 1450 request.message.header.msg_iov.len); 1451 err = WSASendTo(socket.handle, 1452 request.message.buffers, 1453 cast(DWORD) request.message.bufferCount, 1454 null, 1455 request.message.header.msg_flags, 1456 request.message.name, 1457 request.message.nameLength, 1458 cast(const(WSAOVERLAPPEDX*)) overlapped, 1459 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete); 1460 } else { 1461 .tracef("WSASend on FD %s with buffer size %s", socket.handle, request.message.header.msg_iov.len); 1462 err = WSASend(socket.handle, 1463 request.message.buffers, 1464 cast(DWORD) request.message.bufferCount, 1465 null, 1466 request.message.header.msg_flags, 1467 cast(const(WSAOVERLAPPEDX*)) overlapped, 1468 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete); 1469 } 1470 1471 if (err == SOCKET_ERROR) { 1472 m_error = WSAGetLastErrorSafe(); 1473 if (m_error == WSA_IO_PENDING) return; 1474 1475 assumeWontThrow(AsyncOverlapped.free(overlapped)); 1476 assumeWontThrow(NetworkMessage.free(request.message)); 1477 assumeWontThrow(AsyncSendRequest.free(request)); 1478 1479 // TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals 1480 // too many pending overlapped I/O requests. 1481 if (m_error == WSAECONNRESET || 1482 m_error == WSAECONNABORTED || 1483 m_error == WSAENOTSOCK) { 1484 socket.handleClose(); 1485 1486 *socket.connected = false; 1487 1488 closesocket(socket.handle); 1489 return; 1490 } 1491 1492 .errorf("WSASend* on FD %d encountered socket error: %s", socket.handle, this.error); 1493 m_status.code = Status.ABORT; 1494 socket.kill(); 1495 socket.handleError(); 1496 } 1497 } 1498 1499 pragma(inline, true) 1500 uint recv(in fd_t fd, void[] data) 1501 { 1502 m_status = StatusInfo.init; 1503 int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 1504 1505 //static if (LOG) try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {} 1506 if (catchSocketError!".recv"(ret)) { // ret == -1 1507 if (m_error == error_t.WSAEWOULDBLOCK) 1508 m_status.code = Status.ASYNC; 1509 else if (m_error == error_t.WSAEINTR) 1510 m_status.code = Status.RETRY; 1511 return 0; // TODO: handle some errors more specifically 1512 } 1513 1514 return cast(uint) ret; 1515 } 1516 1517 pragma(inline, true) 1518 uint send(in fd_t fd, in void[] data) 1519 { 1520 m_status = StatusInfo.init; 1521 static if (LOG) try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string); 1522 catch (Throwable) {} 1523 int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0); 1524 1525 if (catchSocketError!"send"(ret)) { 1526 if (m_error == error_t.WSAEWOULDBLOCK) 1527 m_status.code = Status.ASYNC; 1528 else if (m_error == error_t.WSAEWOULDBLOCK) 1529 m_status.code = Status.RETRY; 1530 return 0; // TODO: handle some errors more specifically 1531 } 1532 return cast(uint) ret; 1533 } 1534 1535 bool broadcast(in fd_t fd, bool b) { 1536 1537 int val = b?1:0; 1538 socklen_t len = val.sizeof; 1539 int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len); 1540 if (catchSocketError!"setsockopt"(err)) 1541 return false; 1542 1543 return true; 1544 1545 } 1546 1547 uint recvFrom(in fd_t fd, void[] data, ref NetworkAddress addr) 1548 { 1549 m_status = StatusInfo.init; 1550 1551 addr.family = AF_INET6; 1552 socklen_t addrLen = addr.sockAddrLen; 1553 int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen); 1554 1555 if (addrLen < addr.sockAddrLen) { 1556 addr.family = AF_INET; 1557 } 1558 1559 static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable) {} 1560 if (catchSocketError!".recvfrom"(ret)) { // ret == -1 1561 if (m_error == WSAEWOULDBLOCK) 1562 m_status.code = Status.ASYNC; 1563 else if (m_error == WSAEINTR) 1564 m_status.code = Status.RETRY; 1565 return 0; // TODO: handle some errors more specifically 1566 } 1567 m_status.code = Status.OK; 1568 1569 return cast(uint) ret; 1570 } 1571 1572 uint sendTo(in fd_t fd, in void[] data, in NetworkAddress addr) 1573 { 1574 m_status = StatusInfo.init; 1575 static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B " ~ addr.toString()); catch (Throwable) {} 1576 int ret; 1577 if (addr != NetworkAddress.init) 1578 ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen); 1579 else 1580 ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 1581 1582 if (catchSocketError!".sendTo"(ret)) { // ret == -1 1583 if (m_error == WSAEWOULDBLOCK) 1584 m_status.code = Status.ASYNC; 1585 else if (m_error == WSAEINTR) 1586 m_status.code = Status.RETRY; 1587 return 0; // TODO: handle some errors more specifically 1588 } 1589 1590 m_status.code = Status.OK; 1591 return cast(uint) ret; 1592 } 1593 1594 NetworkAddress localAddr(in fd_t fd, bool ipv6) { 1595 NetworkAddress ret; 1596 import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr; 1597 if (ipv6) 1598 ret.family = AF_INET6; 1599 else 1600 ret.family = AF_INET; 1601 socklen_t len = ret.sockAddrLen; 1602 int err = getsockname(fd, ret.sockAddr, &len); 1603 if (catchSocketError!"getsockname"(err)) 1604 return NetworkAddress.init; 1605 if (len > ret.sockAddrLen) 1606 ret.family = AF_INET6; 1607 return ret; 1608 } 1609 1610 void noDelay(in fd_t fd, bool b) { 1611 m_status = StatusInfo.init; 1612 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof); 1613 } 1614 1615 private bool closeRemoteSocket(fd_t fd, bool forced) { 1616 1617 INT err; 1618 1619 static if (LOG) try log("Shutdown FD#" ~ fd.to!string); 1620 catch (Throwable) {} 1621 if (forced) { 1622 err = shutdown(fd, SD_BOTH); 1623 closesocket(fd); 1624 } 1625 else 1626 err = shutdown(fd, SD_SEND); 1627 1628 try { 1629 TCPEventHandler* evh = fd in m_tcpHandlers; 1630 if (evh) { 1631 if (evh.conn.inbound) { 1632 try ThreadMem.free(evh.conn); 1633 catch(Exception e) { assert(false, "Failed to free resources"); } 1634 } 1635 1636 evh.conn = null; 1637 //static if (LOG) log("Remove event handler for " ~ fd.to!string); 1638 m_tcpHandlers.remove(fd); 1639 } 1640 } 1641 catch (Exception e) { 1642 setInternalError!"m_tcpHandlers.remove"(Status.ERROR); 1643 return false; 1644 } 1645 if (catchSocketError!"shutdown"(err)) 1646 return false; 1647 return true; 1648 } 1649 1650 // for connected sockets 1651 bool closeSocket(fd_t fd, bool connected, bool forced = false) 1652 { 1653 m_status = StatusInfo.init; 1654 if (!connected && forced) { 1655 try { 1656 if (fd in m_connHandlers) { 1657 static if (LOG) log("Removing connection handler for: " ~ fd.to!string); 1658 m_connHandlers.remove(fd); 1659 } 1660 } 1661 catch (Exception e) { 1662 setInternalError!"m_connHandlers.remove"(Status.ERROR); 1663 return false; 1664 } 1665 } 1666 else if (connected) 1667 closeRemoteSocket(fd, forced); 1668 1669 if (!connected || forced) { 1670 // todo: flush the socket here? 1671 1672 INT err = closesocket(fd); 1673 if (catchSocketError!"closesocket"(err)) 1674 return false; 1675 1676 } 1677 return true; 1678 } 1679 1680 bool closeConnection(fd_t fd) { 1681 return closeSocket(fd, true); 1682 } 1683 1684 NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 1685 { 1686 m_status = StatusInfo.init; 1687 1688 NetworkAddress addr; 1689 WSAPROTOCOL_INFOW hints; 1690 import std.conv : to; 1691 if (ipv6) { 1692 addr.family = AF_INET6; 1693 } 1694 else { 1695 addr.family = AF_INET; 1696 } 1697 1698 INT addrlen = addr.sockAddrLen; 1699 1700 LPWSTR str; 1701 try { 1702 str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr); 1703 } catch (Exception e) { 1704 setInternalError!"toStringz"(Status.ERROR, e.msg); 1705 return NetworkAddress.init; 1706 } 1707 1708 INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 1709 if (port != 0) addr.port = port; 1710 static if (LOG) try log(addr.toString()); 1711 catch (Throwable) {} 1712 if( catchSocketError!"getAddressFromIP"(err) ) 1713 return NetworkAddress.init; 1714 else assert(addrlen == addr.sockAddrLen); 1715 return addr; 1716 } 1717 1718 NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true) 1719 /*in { 1720 debug import libasync.internals.validator : validateHost; 1721 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 1722 } 1723 body */{ 1724 m_status = StatusInfo.init; 1725 import std.conv : to; 1726 NetworkAddress addr; 1727 ADDRINFOW hints; 1728 ADDRINFOW* infos; 1729 LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*); 1730 if (ipv6) { 1731 hints.ai_family = AF_INET6; 1732 addr.family = AF_INET6; 1733 } 1734 else { 1735 hints.ai_family = AF_INET; 1736 addr.family = AF_INET; 1737 } 1738 1739 if (tcp) { 1740 hints.ai_protocol = IPPROTO_TCP; 1741 hints.ai_socktype = SOCK_STREAM; 1742 } 1743 else { 1744 hints.ai_protocol = IPPROTO_UDP; 1745 hints.ai_socktype = SOCK_DGRAM; 1746 } 1747 if (port != 0) addr.port = port; 1748 1749 LPCWSTR str; 1750 1751 try { 1752 str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host); 1753 } catch (Exception e) { 1754 setInternalError!"toUTFz"(Status.ERROR, e.msg); 1755 return NetworkAddress.init; 1756 } 1757 1758 error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos); 1759 scope(exit) FreeAddrInfoW(infos); 1760 if (err != EWIN.WSA_OK) { 1761 setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err); 1762 return NetworkAddress.init; 1763 } 1764 1765 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 1766 ubyte* data = cast(ubyte*) addr.sockAddr; 1767 data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy 1768 static if (LOG) try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); 1769 catch (Exception e){} 1770 return addr; 1771 } 1772 1773 pragma(inline, true) 1774 void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED) 1775 { 1776 if (details.length > 0) 1777 m_status.text = TRACE ~ ": " ~ details; 1778 else 1779 m_status.text = TRACE; 1780 m_error = error; 1781 m_status.code = s; 1782 static if(LOG) log(m_status); 1783 } 1784 private: 1785 bool onMessage(MSG msg) 1786 { 1787 m_status = StatusInfo.init; 1788 switch (msg.message) { 1789 case WM_TCP_SOCKET: 1790 auto evt = LOWORD(msg.lParam); 1791 auto err = HIWORD(msg.lParam); 1792 if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) { 1793 1794 if (evt == FD_ACCEPT) 1795 setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 1796 else { 1797 try { 1798 TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam); 1799 cb(TCPEvent.ERROR); 1800 } 1801 catch (Exception e) { 1802 // An Error callback should never fail... 1803 setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 1804 // assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 1805 } 1806 } 1807 } 1808 break; 1809 case WM_UDP_SOCKET: 1810 auto evt = LOWORD(msg.lParam); 1811 auto err = HIWORD(msg.lParam); 1812 if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) { 1813 try { 1814 UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam); 1815 cb(UDPEvent.ERROR); 1816 } 1817 catch (Exception e) { 1818 // An Error callback should never fail... 1819 setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 1820 } 1821 } 1822 break; 1823 case WM_TIMER: 1824 static if (LOG) try log("Timer callback: " ~ m_timer.fd.to!string); catch (Throwable) {} 1825 TimerHandler cb; 1826 bool cached = (m_timer.fd == cast(fd_t)msg.wParam); 1827 try { 1828 if (cached) 1829 cb = m_timer.cb; 1830 else 1831 cb = m_timerHandlers.get(cast(fd_t)msg.wParam); 1832 cb.ctxt.rearmed = false; 1833 cb(); 1834 1835 if (cb.ctxt.oneShot && !cb.ctxt.rearmed) 1836 kill(cb.ctxt); 1837 1838 } 1839 catch (Exception e) { 1840 // An Error callback should never fail... 1841 setInternalError!"del@TimerHandler"(Status.ERROR, e.msg); 1842 } 1843 1844 break; 1845 case WM_USER_EVENT: 1846 static if (LOG) log("User event"); 1847 1848 ulong uwParam = cast(ulong)msg.wParam; 1849 ulong ulParam = cast(ulong)msg.lParam; 1850 1851 ulong payloadAddr = (ulParam << 32) | uwParam; 1852 void* payloadPtr = cast(void*) payloadAddr; 1853 shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr; 1854 1855 static if (LOG) try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch (Throwable) {} 1856 try { 1857 assert(ctxt.id != 0); 1858 ctxt.handler(); 1859 } 1860 catch (Exception e) { 1861 setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 1862 } 1863 break; 1864 case WM_USER_SIGNAL: 1865 static if (LOG) log("User signal"); 1866 1867 ulong uwParam = cast(ulong)msg.wParam; 1868 ulong ulParam = cast(ulong)msg.lParam; 1869 1870 ulong payloadAddr = (ulParam << 32) | uwParam; 1871 void* payloadPtr = cast(void*) payloadAddr; 1872 AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr; 1873 1874 try { 1875 ctxt.handler(); 1876 } 1877 catch (Exception e) { 1878 setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 1879 } 1880 break; 1881 default: return false; // not handled, sends to wndProc 1882 } 1883 return true; 1884 } 1885 1886 bool onUDPEvent(WORD evt, WORD err, fd_t sock) { 1887 m_status = StatusInfo.init; 1888 try{ 1889 if (m_udpHandlers.get(sock) == UDPHandler.init) 1890 return false; 1891 } catch (Throwable) {} 1892 if (sock == 0) { // highly unlikely... 1893 setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined"); 1894 return false; 1895 } 1896 if (err) { 1897 setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err); 1898 try { 1899 //log("CLOSE FD#" ~ sock.to!string); 1900 (m_udpHandlers)[sock](UDPEvent.ERROR); 1901 } catch (Throwable) { // can't do anything about this... 1902 } 1903 return false; 1904 } 1905 1906 UDPHandler cb; 1907 switch(evt) { 1908 default: break; 1909 case FD_READ: 1910 try { 1911 static if (LOG) log("READ FD#" ~ sock.to!string); 1912 cb = m_udpHandlers.get(sock); 1913 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1914 cb(UDPEvent.READ); 1915 } 1916 catch (Exception e) { 1917 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1918 return false; 1919 } 1920 break; 1921 case FD_WRITE: 1922 try { 1923 static if (LOG) log("WRITE FD#" ~ sock.to!string); 1924 cb = m_udpHandlers.get(sock); 1925 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1926 cb(UDPEvent.WRITE); 1927 } 1928 catch (Exception e) { 1929 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1930 return false; 1931 } 1932 break; 1933 } 1934 return true; 1935 } 1936 1937 bool onTCPEvent(WORD evt, WORD err, fd_t sock) { 1938 m_status = StatusInfo.init; 1939 try{ 1940 if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init) 1941 return false; 1942 } catch (Throwable) {} 1943 if (sock == 0) { // highly unlikely... 1944 setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined"); 1945 return false; 1946 } 1947 if (err) { 1948 setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err); 1949 try { 1950 //log("CLOSE FD#" ~ sock.to!string); 1951 (m_tcpHandlers)[sock](TCPEvent.ERROR); 1952 } catch (Throwable) { // can't do anything about this... 1953 } 1954 return false; 1955 } 1956 1957 TCPEventHandler cb; 1958 switch(evt) { 1959 default: break; 1960 case FD_ACCEPT: 1961 version(Distributed) gs_mtx.lock_nothrow(); 1962 1963 static if (LOG) log("TCP Handlers: " ~ m_tcpHandlers.length.to!string); 1964 static if (LOG) log("Accepting connection"); 1965 /// Let another listener take the next connection 1966 TCPAcceptHandler list; 1967 try list = m_connHandlers[sock]; catch (Throwable) { assert(false, "Listening on an invalid socket..."); } 1968 scope(exit) { 1969 /// The connection rotation mechanism is handled by the TCPListenerDistMixins 1970 /// when registering the same AsyncTCPListener object on multiple event loops. 1971 /// This allows to even out the CPU usage on a server instance. 1972 version(Distributed) 1973 { 1974 HWND hwnd = list.ctxt.next(m_hwnd); 1975 if (hwnd !is HWND.init) { 1976 int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1977 if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) { 1978 error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1979 if (catchSocketError!"WSAAsyncSelect"(error)) 1980 assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 1981 } 1982 } 1983 else static if (LOG) log("Returned init!!"); 1984 gs_mtx.unlock_nothrow(); 1985 } 1986 } 1987 1988 NetworkAddress addr; 1989 addr.family = AF_INET; 1990 int addrlen = addr.sockAddrLen; 1991 fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1992 1993 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) { 1994 if (m_error == WSAEFAULT) { // not enough space for sockaddr 1995 addr.family = AF_INET6; 1996 addrlen = addr.sockAddrLen; 1997 csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1998 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) 1999 return false; 2000 } 2001 else return false; 2002 } 2003 2004 int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 2005 if ( catchSocketError!"WSAAsyncSelect"(ok) ) 2006 return false; 2007 2008 static if (LOG) log("Connection accepted: " ~ csock.to!string); 2009 2010 AsyncTCPConnection conn; 2011 try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop); 2012 catch (Exception e) { assert(false, "Failed allocation"); } 2013 conn.peer = addr; 2014 conn.socket = csock; 2015 conn.inbound = true; 2016 2017 try { 2018 // Do the callback to get a handler 2019 cb = list(conn); 2020 } 2021 catch(Exception e) { 2022 setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 2023 return false; 2024 } 2025 2026 try { 2027 m_tcpHandlers[csock] = cb; // keep the handler to setup the connection 2028 static if (LOG) log("ACCEPT&CONNECT FD#" ~ csock.to!string); 2029 *conn.connected = true; 2030 cb(TCPEvent.CONNECT); 2031 } 2032 catch (Exception e) { 2033 setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 2034 return false; 2035 } 2036 break; 2037 case FD_CONNECT: 2038 try { 2039 static if (LOG) log("CONNECT FD#" ~ sock.to!string); 2040 cb = m_tcpHandlers.get(sock); 2041 if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback"); 2042 *cb.conn.connecting = true; 2043 } 2044 catch(Exception e) { 2045 setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT); 2046 return false; 2047 } 2048 break; 2049 case FD_READ: 2050 try { 2051 static if (LOG) log("READ FD#" ~ sock.to!string); 2052 cb = m_tcpHandlers.get(sock); 2053 if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback"); 2054 if (!cb.conn) break; 2055 if (*cb.conn.connected == false && *cb.conn.connecting) { 2056 static if (LOG) log("TCPEvent CONNECT FD#" ~ sock.to!string); 2057 2058 *cb.conn.connecting = false; 2059 *cb.conn.connected = true; 2060 cb(TCPEvent.CONNECT); 2061 } 2062 else { 2063 static if (LOG) log("TCPEvent READ FD#" ~ sock.to!string); 2064 cb(TCPEvent.READ); 2065 } 2066 } 2067 catch (Exception e) { 2068 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 2069 return false; 2070 } 2071 break; 2072 case FD_WRITE: 2073 // todo: don't send the first write for consistency with epoll? 2074 2075 try { 2076 //import std.stdio; 2077 static if (LOG) log("WRITE FD#" ~ sock.to!string); 2078 cb = m_tcpHandlers.get(sock); 2079 if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 2080 if (!cb.conn) break; 2081 if (*cb.conn.connected == false && *cb.conn.connecting) { 2082 *cb.conn.connecting = false; 2083 *cb.conn.connected = true; 2084 cb(TCPEvent.CONNECT); 2085 } 2086 else { 2087 cb(TCPEvent.WRITE); 2088 } 2089 } 2090 catch (Exception e) { 2091 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 2092 return false; 2093 } 2094 break; 2095 case FD_CLOSE: 2096 // called after shutdown() 2097 INT ret; 2098 bool connected = true; 2099 try { 2100 static if (LOG) log("CLOSE FD#" ~ sock.to!string); 2101 if (sock in m_tcpHandlers) { 2102 cb = m_tcpHandlers.get(sock); 2103 if (*cb.conn.connected || *cb.conn.connecting) { 2104 cb(TCPEvent.CLOSE); 2105 *cb.conn.connecting = false; 2106 *cb.conn.connected = false; 2107 } else 2108 connected = false; 2109 } 2110 else 2111 connected = false; 2112 } 2113 catch (Exception e) { 2114 if (m_status.code == Status.OK) 2115 setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 2116 return false; 2117 } 2118 2119 closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket 2120 2121 break; 2122 } 2123 return true; 2124 } 2125 2126 bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt) 2127 { 2128 INT err; 2129 static if (LOG) log("Binding to UDP " ~ ctxt.local.toString()); 2130 2131 if (!setOption(fd, TCPOption.REUSEADDR, true)) { 2132 closesocket(fd); 2133 return false; 2134 } 2135 2136 err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 2137 if (catchSocketError!"bind"(err)) { 2138 closesocket(fd); 2139 return false; 2140 } 2141 err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE); 2142 if (catchSocketError!"WSAAsyncSelect"(err)) { 2143 closesocket(fd); 2144 return false; 2145 } 2146 2147 return true; 2148 } 2149 2150 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false) 2151 in { 2152 assert(m_threadId == GetCurrentThreadId()); 2153 assert(ctxt.local !is NetworkAddress.init); 2154 } 2155 body { 2156 INT err; 2157 if (!reusing) { 2158 err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 2159 if (catchSocketError!"bind"(err)) { 2160 closesocket(fd); 2161 return false; 2162 } 2163 2164 err = .listen(fd, 128); 2165 if (catchSocketError!"listen"(err)) { 2166 closesocket(fd); 2167 return false; 2168 } 2169 2170 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 2171 if (catchSocketError!"WSAAsyncSelect"(err)) { 2172 closesocket(fd); 2173 return false; 2174 } 2175 } 2176 2177 return true; 2178 } 2179 2180 bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt) 2181 in { 2182 assert(ctxt.peer !is NetworkAddress.init); 2183 assert(ctxt.peer.port != 0, "Connecting to an invalid port"); 2184 } 2185 body { 2186 INT err; 2187 NetworkAddress bind_addr; 2188 bind_addr.family = ctxt.peer.family; 2189 2190 if (ctxt.peer.family == AF_INET) 2191 bind_addr.sockAddrInet4.sin_addr.s_addr = 0; 2192 else if (ctxt.peer.family == AF_INET6) 2193 bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0; 2194 else { 2195 status.code = Status.ERROR; 2196 status.text = "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string; 2197 return false; 2198 } 2199 2200 err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen); 2201 if ( catchSocketError!"bind"(err) ) 2202 return false; 2203 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 2204 if ( catchSocketError!"WSAAsyncSelect"(err) ) 2205 return false; 2206 err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen); 2207 2208 auto errors = [ tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ]; 2209 2210 if (catchSocketErrorsEq!"connectEQ"(err, errors)) { 2211 *ctxt.connecting = true; 2212 return true; 2213 } 2214 else if (catchSocketError!"connect"(err)) 2215 return false; 2216 2217 return true; 2218 } 2219 2220 bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 2221 if (isIntegral!T) 2222 { 2223 foreach (validator ; cmp) { 2224 if (val == validator[0]) { 2225 m_status.text = TRACE; 2226 m_status.code = validator[1]; 2227 if (m_status.code == Status.EVLOOP_TIMEOUT) { 2228 static if (LOG) log(m_status); 2229 break; 2230 } 2231 m_error = GetLastErrorSafe(); 2232 static if(LOG) log(m_status); 2233 return true; 2234 } 2235 } 2236 return false; 2237 } 2238 2239 pragma(inline, true) 2240 bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 2241 if (isIntegral!T) 2242 { 2243 foreach (validator ; cmp) { 2244 if (val == validator[0]) { 2245 m_status.text = TRACE; 2246 m_error = WSAGetLastErrorSafe(); 2247 m_status.status = validator[1]; 2248 static if(LOG) log(m_status); 2249 return true; 2250 } 2251 } 2252 return false; 2253 } 2254 2255 bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...) 2256 if (isIntegral!T) 2257 { 2258 error_t err; 2259 foreach (validator ; cmp) { 2260 if (val == validator[0]) { 2261 if (err is EWIN.init) err = WSAGetLastErrorSafe(); 2262 if (err == validator[1]) { 2263 m_status.text = TRACE; 2264 m_error = WSAGetLastErrorSafe(); 2265 m_status.code = validator[2]; 2266 static if(LOG) log(m_status); 2267 return true; 2268 } 2269 } 2270 } 2271 return false; 2272 } 2273 2274 pragma(inline, true) 2275 bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 2276 if (isIntegral!T) 2277 { 2278 if (val == cmp) { 2279 m_status.text = TRACE; 2280 m_error = WSAGetLastErrorSafe(); 2281 m_status.code = Status.ABORT; 2282 static if(LOG) log(m_status); 2283 return true; 2284 } 2285 return false; 2286 } 2287 2288 pragma(inline, true) 2289 error_t WSAGetLastErrorSafe() { 2290 try { 2291 return cast(error_t) WSAGetLastError(); 2292 } catch(Exception e) { 2293 return EWIN.ERROR_ACCESS_DENIED; 2294 } 2295 } 2296 2297 pragma(inline, true) 2298 error_t GetLastErrorSafe() { 2299 try { 2300 return cast(error_t) GetLastError(); 2301 } catch(Exception e) { 2302 return EWIN.ERROR_ACCESS_DENIED; 2303 } 2304 } 2305 2306 void log(StatusInfo val) 2307 { 2308 static if (LOG) { 2309 import std.stdio; 2310 try { 2311 writeln("Backtrace: ", m_status.text); 2312 writeln(" | Status: ", m_status.code); 2313 writeln(" | Error: " , m_error); 2314 if ((m_error in EWSAMessages) !is null) 2315 writeln(" | Message: ", EWSAMessages[m_error]); 2316 } catch(Exception e) { 2317 return; 2318 } 2319 } 2320 } 2321 2322 void log(T)(lazy T val) 2323 { 2324 static if (LOG) { 2325 import std.stdio; 2326 try { 2327 writeln(val); 2328 } catch(Exception e) { 2329 return; 2330 } 2331 } 2332 } 2333 2334 } 2335 2336 mixin template COSocketMixins() { 2337 2338 private CleanupData m_impl; 2339 2340 struct CleanupData { 2341 bool connected; 2342 bool connecting; 2343 } 2344 2345 @property bool* connecting() { 2346 return &m_impl.connecting; 2347 } 2348 2349 @property bool* connected() { 2350 return &m_impl.connected; 2351 } 2352 2353 } 2354 /* 2355 mixin template TCPListenerDistMixins() 2356 { 2357 import core.sys.windows.windows : HWND; 2358 import libasync.internals.hashmap : HashMap; 2359 import core.sync.mutex; 2360 private { 2361 bool m_dist; 2362 2363 Tuple!(WinReference, bool*) m_handles; 2364 __gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist; 2365 __gshared Mutex gs_mutex; 2366 } 2367 2368 /// The TCP Listener schedules distributed connection handlers based on 2369 /// the event loops that are using the same AsyncTCPListener object. 2370 /// This is done by using WSAAsyncSelect on a different window after each 2371 /// accept TCPEvent. 2372 class WinReference { 2373 private { 2374 struct Item { 2375 HWND handle; 2376 bool active; 2377 } 2378 2379 Item[] m_items; 2380 } 2381 2382 this(HWND hndl, bool b) { 2383 append(hndl, b); 2384 } 2385 2386 void append(HWND hndl, bool b) { 2387 m_items ~= Item(hndl, b); 2388 } 2389 2390 HWND next(HWND me) { 2391 Item[] items; 2392 synchronized(gs_mutex) 2393 items = m_items; 2394 if (items.length == 1) 2395 return me; 2396 foreach (i, item; items) { 2397 if (item.active == true) { 2398 m_items[i].active = false; // remove responsibility 2399 if (m_items.length <= i + 1) { 2400 m_items[0].active = true; // set responsibility 2401 auto ret = m_items[0].handle; 2402 return ret; 2403 } 2404 else { 2405 m_items[i + 1].active = true; 2406 auto ret = m_items[i + 1].handle; 2407 return ret; 2408 } 2409 } 2410 2411 } 2412 assert(false); 2413 } 2414 2415 } 2416 2417 void init(HWND hndl, fd_t sock) { 2418 try { 2419 if (!gs_mutex) { 2420 gs_mutex = new Mutex; 2421 } 2422 synchronized(gs_mutex) { 2423 m_handles = gs_dist.get(sock); 2424 if (m_handles == typeof(m_handles).init) { 2425 gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist); 2426 m_handles = gs_dist.get(sock); 2427 assert(m_handles != typeof(m_handles).init); 2428 } 2429 else { 2430 m_handles[0].append(hndl, false); 2431 *m_handles[1] = true; // set first thread to dist 2432 m_dist = true; // set this thread to dist 2433 } 2434 } 2435 } catch (Exception e) { 2436 assert(false, e.toString()); 2437 } 2438 2439 } 2440 2441 HWND next(HWND me) { 2442 try { 2443 if (!m_dist) 2444 return HWND.init; 2445 return m_handles[0].next(me); 2446 } 2447 catch (Exception e) { 2448 assert(false, e.toString()); 2449 } 2450 } 2451 2452 }*/ 2453 private class DWHandlerInfo { 2454 DWHandler handler; 2455 Array!DWChangeInfo buffer; 2456 2457 this(DWHandler cb) { 2458 handler = cb; 2459 } 2460 } 2461 2462 private final class DWFolderWatcher { 2463 import libasync.internals.path; 2464 private: 2465 EventLoop m_evLoop; 2466 fd_t m_fd; 2467 bool m_recursive; 2468 HANDLE m_handle; 2469 Path m_path; 2470 DWFileEvent m_events; 2471 DWHandlerInfo m_handler; // contains buffer 2472 shared AsyncSignal m_signal; 2473 ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer; 2474 DWORD m_bytesTransferred; 2475 public: 2476 this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) { 2477 m_fd = fd; 2478 m_recursive = recursive; 2479 m_handle = cast(HANDLE)hndl; 2480 m_evLoop = evl; 2481 m_path = path; 2482 m_handler = handler; 2483 2484 m_signal = new shared AsyncSignal(m_evLoop); 2485 m_signal.run(&onChanged); 2486 triggerWatch(); 2487 } 2488 package: 2489 void close() { 2490 CloseHandle(m_handle); 2491 m_signal.kill(); 2492 } 2493 2494 void triggerChanged() { 2495 m_signal.trigger(); 2496 } 2497 2498 void onChanged() { 2499 ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred]; 2500 do { 2501 assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); 2502 auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr; 2503 DWFileEvent kind; 2504 switch( fni.Action ){ 2505 default: kind = DWFileEvent.MODIFIED; break; 2506 case 0x1: kind = DWFileEvent.CREATED; break; 2507 case 0x2: kind = DWFileEvent.DELETED; break; 2508 case 0x3: kind = DWFileEvent.MODIFIED; break; 2509 case 0x4: kind = DWFileEvent.MOVED_FROM; break; 2510 case 0x5: kind = DWFileEvent.MOVED_TO; break; 2511 } 2512 string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[] 2513 m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename))); 2514 if( fni.NextEntryOffset == 0 ) break; 2515 result = result[fni.NextEntryOffset .. $]; 2516 } while(result.length > 0); 2517 2518 triggerWatch(); 2519 2520 m_handler.handler(); 2521 } 2522 2523 void triggerWatch() { 2524 2525 static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME| 2526 FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE; 2527 2528 OVERLAPPED* overlapped = ThreadMem.alloc!OVERLAPPED(); 2529 overlapped.Internal = 0; 2530 overlapped.InternalHigh = 0; 2531 overlapped.Offset = 0; 2532 overlapped.OffsetHigh = 0; 2533 overlapped.Pointer = cast(void*)this; 2534 import std.stdio; 2535 DWORD bytesReturned; 2536 BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted); 2537 2538 static if (DEBUG) { 2539 import std.stdio; 2540 if (!success) 2541 writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]); 2542 } 2543 } 2544 2545 @property fd_t fd() const { 2546 return m_fd; 2547 } 2548 2549 @property HANDLE handle() const { 2550 return cast(HANDLE) m_handle; 2551 } 2552 2553 static nothrow extern(System) 2554 { 2555 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 2556 { 2557 import std.stdio; 2558 DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer); 2559 watcher.m_bytesTransferred = cbTransferred; 2560 try ThreadMem.free(overlapped); catch (Throwable) {} 2561 2562 static if (DEBUG) { 2563 if (dwError != 0) 2564 try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch (Throwable) {} 2565 } 2566 try watcher.triggerChanged(); 2567 catch (Exception e) { 2568 static if (DEBUG) { 2569 try writeln("Failed to trigger change"); catch (Throwable) {} 2570 } 2571 } 2572 } 2573 } 2574 } 2575 2576 /// Information for a single Windows overlapped I/O request; 2577 /// uses a freelist to minimize allocations. 2578 struct AsyncOverlapped 2579 { 2580 align (1): 2581 /// Required for Windows overlapped I/O requests 2582 OVERLAPPED overlapped; 2583 align: 2584 2585 union 2586 { 2587 AsyncAcceptRequest* accept; 2588 AsyncReceiveRequest* receive; 2589 AsyncSendRequest* send; 2590 } 2591 2592 @property void hEvent(HANDLE hEvent) @safe pure @nogc nothrow 2593 { overlapped.hEvent = hEvent; } 2594 2595 import libasync.internals.freelist; 2596 mixin FreeList!1_000; 2597 } 2598 2599 nothrow extern(System) 2600 { 2601 void onOverlappedReceiveComplete(error_t error, DWORD recvCount, AsyncOverlapped* overlapped, DWORD flags) 2602 { 2603 .tracef("onOverlappedReceiveComplete: error: %s, recvCount: %s, flags: %s", error, recvCount, flags); 2604 2605 auto request = overlapped.receive; 2606 2607 if (error == EWIN.WSA_OPERATION_ABORTED) { 2608 if (request.message) assumeWontThrow(NetworkMessage.free(request.message)); 2609 assumeWontThrow(AsyncReceiveRequest.free(request)); 2610 return; 2611 } 2612 2613 auto socket = overlapped.receive.socket; 2614 auto eventLoop = &socket.m_evLoop.m_evLoop; 2615 if (eventLoop.m_status.code != Status.OK) return; 2616 2617 eventLoop.m_status = StatusInfo.init; 2618 2619 assumeWontThrow(AsyncOverlapped.free(overlapped)); 2620 if (error == 0) { 2621 if (!request.message) { 2622 eventLoop.m_completedSocketReceives.insertBack(request); 2623 return; 2624 } else if (recvCount > 0 || !socket.connectionOriented) { 2625 request.message.count = request.message.count + recvCount; 2626 if (request.exact && !request.message.receivedAll) { 2627 eventLoop.submitRequest(request); 2628 return; 2629 } else { 2630 eventLoop.m_completedSocketReceives.insertBack(request); 2631 return; 2632 } 2633 } 2634 } else if (recvCount > 0) { 2635 eventLoop.m_completedSocketReceives.insertBack(request); 2636 return; 2637 } 2638 2639 assumeWontThrow(NetworkMessage.free(request.message)); 2640 assumeWontThrow(AsyncReceiveRequest.free(request)); 2641 2642 if (error == WSAECONNRESET || error == WSAECONNABORTED || recvCount == 0) { 2643 socket.kill(); 2644 socket.handleClose(); 2645 return; 2646 } 2647 2648 eventLoop.m_status.code = Status.ABORT; 2649 socket.kill(); 2650 socket.handleError(); 2651 } 2652 2653 void onOverlappedSendComplete(error_t error, DWORD sentCount, AsyncOverlapped* overlapped, DWORD flags) 2654 { 2655 .tracef("onOverlappedSendComplete: error: %s, sentCount: %s, flags: %s", error, sentCount, flags); 2656 2657 auto request = overlapped.send; 2658 2659 if (error == EWIN.WSA_OPERATION_ABORTED) { 2660 assumeWontThrow(NetworkMessage.free(request.message)); 2661 assumeWontThrow(AsyncSendRequest.free(request)); 2662 return; 2663 } 2664 2665 auto socket = overlapped.send.socket; 2666 auto eventLoop = &socket.m_evLoop.m_evLoop; 2667 if (eventLoop.m_status.code != Status.OK) return; 2668 2669 eventLoop.m_status = StatusInfo.init; 2670 2671 assumeWontThrow(AsyncOverlapped.free(overlapped)); 2672 if (error == 0) { 2673 request.message.count = request.message.count + sentCount; 2674 assert(request.message.sent); 2675 eventLoop.m_completedSocketSends.insertBack(request); 2676 return; 2677 } 2678 2679 assumeWontThrow(NetworkMessage.free(request.message)); 2680 assumeWontThrow(AsyncSendRequest.free(request)); 2681 2682 if (error == WSAECONNRESET || error == WSAECONNABORTED) { 2683 socket.kill(); 2684 socket.handleClose(); 2685 return; 2686 } 2687 2688 eventLoop.m_status.code = Status.ABORT; 2689 socket.kill(); 2690 socket.handleError(); 2691 } 2692 } 2693 2694 enum WM_TCP_SOCKET = WM_USER+102; 2695 enum WM_UDP_SOCKET = WM_USER+103; 2696 enum WM_USER_EVENT = WM_USER+104; 2697 enum WM_USER_SIGNAL = WM_USER+105; 2698 2699 nothrow: 2700 2701 __gshared Vector!(size_t, Malloc) gs_availID; 2702 __gshared size_t gs_maxID; 2703 __gshared core.sync.mutex.Mutex gs_mutex; 2704 2705 private size_t createIndex() { 2706 size_t idx; 2707 import std.algorithm : max; 2708 try { 2709 size_t getIdx() { 2710 if (!gs_availID.empty) { 2711 immutable size_t ret = gs_availID.back; 2712 gs_availID.removeBack(); 2713 return ret; 2714 } 2715 return 0; 2716 } 2717 2718 synchronized(gs_mutex) { 2719 idx = getIdx(); 2720 if (idx == 0) { 2721 import std.range : iota; 2722 gs_availID.insert( iota(gs_maxID + 1, max(32, gs_maxID * 2 + 1), 1) ); 2723 gs_maxID = gs_availID[$-1]; 2724 idx = getIdx(); 2725 } 2726 } 2727 } catch (Exception e) { 2728 assert(false, "Failed to generate necessary ID for Manual Event waiters: " ~ e.msg); 2729 } 2730 2731 return idx; 2732 } 2733 2734 void destroyIndex(AsyncTimer ctxt) { 2735 try { 2736 synchronized(gs_mutex) gs_availID ~= ctxt.id; 2737 } 2738 catch (Exception e) { 2739 assert(false, "Error destroying index: " ~ e.msg); 2740 } 2741 } 2742 2743 shared static this() { 2744 2745 try { 2746 if (!gs_mutex) { 2747 import core.sync.mutex; 2748 gs_mutex = new core.sync.mutex.Mutex; 2749 2750 gs_availID.reserve(32); 2751 2752 foreach (i; gs_availID.length .. gs_availID.capacity) { 2753 gs_availID.insertBack(i + 1); 2754 } 2755 2756 gs_maxID = 32; 2757 } 2758 } 2759 catch (Throwable) { 2760 assert(false, "Couldn't reserve necessary space for available Manual Events"); 2761 } 2762 2763 } 2764 2765 nothrow extern(System) { 2766 LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam) 2767 { 2768 auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); 2769 if (ptr is null) 2770 return DefWindowProcA(wnd, msg, wparam, lparam); 2771 auto appl = cast(EventLoopImpl*)ptr; 2772 MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init); 2773 if (appl.onMessage(obj)) { 2774 static if (DEBUG) { 2775 if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) { 2776 import std.stdio : writeln; 2777 try { writeln(appl.error, ": ", appl.m_status.text); } catch (Throwable) {} 2778 } 2779 } 2780 return 0; 2781 } 2782 else return DefWindowProcA(wnd, msg, wparam, lparam); 2783 } 2784 2785 BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam); 2786 2787 }