1 module libasync.windows; 2 3 version (Windows): 4 5 import core.atomic; 6 import core.thread : Fiber; 7 import libasync.types; 8 import libasync.internals.hashmap; 9 import libasync.internals.memory; 10 import std.container : Array; 11 import std.string : toStringz; 12 import std.conv : to; 13 import std.datetime : Duration, msecs, seconds; 14 import std.algorithm : min; 15 import libasync.internals.win32; 16 import std.traits : isIntegral; 17 import std.typecons : Tuple, tuple; 18 import std.utf : toUTFz; 19 import core.sync.mutex; 20 import libasync.events; 21 pragma(lib, "ws2_32"); 22 pragma(lib, "ole32"); 23 alias fd_t = SIZE_T; 24 alias error_t = EWIN; 25 26 //todo : see if new connections with SO_REUSEADDR are evenly distributed between threads 27 28 29 package struct EventLoopImpl { 30 pragma(msg, "Using Windows IOCP for events"); 31 32 private: 33 HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array 34 HashMap!(fd_t, TCPEventHandler) m_tcpHandlers; 35 HashMap!(fd_t, TimerHandler) m_timerHandlers; 36 HashMap!(fd_t, UDPHandler) m_udpHandlers; 37 HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too 38 HashMap!(uint, DWFolderWatcher) m_dwFolders; 39 nothrow: 40 private: 41 struct TimerCache { 42 TimerHandler cb; 43 fd_t fd; 44 } 45 TimerCache m_timer; 46 47 EventLoop m_evLoop; 48 bool m_started; 49 wstring m_window; 50 HWND m_hwnd; 51 DWORD m_threadId; 52 HANDLE[] m_waitObjects; 53 ushort m_instanceId; 54 StatusInfo m_status; 55 error_t m_error = EWIN.WSA_OK; 56 __gshared Mutex gs_mtx; 57 package: 58 @property bool started() const { 59 return m_started; 60 } 61 bool init(EventLoop evl) 62 in { assert(!m_started); } 63 body 64 { 65 try if (!gs_mtx) 66 gs_mtx = new Mutex; catch {} 67 static ushort j; 68 assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux."); 69 j += 1; 70 m_status = StatusInfo.init; 71 72 import core.thread; 73 try Thread.getThis().priority = Thread.PRIORITY_MAX; 74 catch (Exception e) { assert(false, "Could not set thread priority"); } 75 76 m_evLoop = evl; 77 shared static ushort i; 78 m_instanceId = i; 79 core.atomic.atomicOp!"+="(i, cast(ushort) 1); 80 wstring inststr; 81 try { inststr = m_instanceId.to!wstring; } 82 catch (Exception e) { 83 return false; 84 } 85 m_window = "VibeWin32MessageWindow" ~ inststr; 86 wstring classname = "VibeWin32MessageWindow" ~ inststr; 87 88 LPCWSTR wnz; 89 LPCWSTR clsn; 90 try { 91 wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*); 92 clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*); 93 } catch (Exception e) { 94 setInternalError!"toUTFz"(Status.ERROR, e.msg); 95 return false; 96 } 97 98 m_threadId = GetCurrentThreadId(); 99 WNDCLASSW wc; 100 wc.lpfnWndProc = &wndProc; 101 wc.lpszClassName = clsn; 102 RegisterClassW(&wc); 103 m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE, 104 cast(HMENU) null, null, null); 105 try log("Window registered: " ~ m_hwnd.to!string); catch{} 106 auto ptr = cast(ULONG_PTR)cast(void*)&this; 107 SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr); 108 assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this ); 109 WSADATA wd; 110 m_error = cast(error_t) WSAStartup(0x0202, &wd); 111 if (m_error == EWIN.WSA_OK) 112 m_status.code = Status.OK; 113 else { 114 m_status.code = Status.ABORT; 115 static if(LOG) log(m_status); 116 return false; 117 } 118 assert(wd.wVersion == 0x0202); 119 m_started = true; 120 return true; 121 } 122 123 // todo: find where to call this 124 void exit() { 125 cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0); 126 } 127 128 @property StatusInfo status() const { 129 return m_status; 130 } 131 132 @property string error() const { 133 string* ptr; 134 string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init; 135 return pv; 136 } 137 138 bool loop(Duration timeout = 0.seconds) 139 in { 140 assert(Fiber.getThis() is null); 141 assert(m_started); 142 } 143 body { 144 DWORD msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max); 145 /* 146 * Waits until one or all of the specified objects are in the signaled state 147 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx 148 */ 149 DWORD signal = MsgWaitForMultipleObjectsEx( 150 cast(DWORD)0, 151 null, 152 msTimeout, 153 QS_ALLEVENTS, 154 MWMO_ALERTABLE | MWMO_INPUTAVAILABLE // MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion) 155 // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting 156 ); 157 158 auto errors = 159 [ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ]; /* WAIT_FAILED: Failed to call MsgWait..() */ 160 161 if (signal == WAIT_TIMEOUT) 162 return true; 163 164 if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) { 165 log("Event Loop Exiting because of error"); 166 return false; 167 } 168 169 MSG msg; 170 while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) { 171 m_status = StatusInfo.init; 172 TranslateMessage(&msg); 173 DispatchMessageW(&msg); 174 175 if (m_status.code == Status.ERROR) { 176 log(m_status.text); 177 return false; 178 } 179 } 180 return true; 181 } 182 183 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) 184 { 185 m_status = StatusInfo.init; 186 fd_t fd = ctxt.socket; 187 bool reusing; 188 if (fd == fd_t.init) { 189 190 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 191 192 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 193 return 0; 194 195 if (!setOption(fd, TCPOption.REUSEADDR, true)) 196 return 0; 197 198 // todo: defer accept? 199 200 if (ctxt.noDelay) { 201 if (!setOption(fd, TCPOption.NODELAY, true)) 202 return 0; 203 } 204 } else reusing = true; 205 206 if (initTCPListener(fd, ctxt, reusing)) 207 { 208 try { 209 log("Running listener on socket fd#" ~ fd.to!string); 210 m_connHandlers[fd] = del; 211 ctxt.init(m_hwnd, fd); 212 } 213 catch (Exception e) { 214 setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg); 215 closeSocket(fd, false); 216 return 0; 217 } 218 } 219 else 220 { 221 return 0; 222 } 223 224 225 return fd; 226 } 227 228 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) 229 in { 230 assert(ctxt.socket == fd_t.init); 231 assert(ctxt.peer.family != AF_UNSPEC); 232 } 233 body { 234 m_status = StatusInfo.init; 235 fd_t fd = ctxt.preInitializedSocket; 236 237 if (fd == fd_t.init) 238 fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 239 log("Starting connection at: " ~ fd.to!string); 240 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 241 return 0; 242 243 try { 244 (m_tcpHandlers)[fd] = del; 245 } 246 catch (Exception e) { 247 setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg); 248 return 0; 249 } 250 251 debug { 252 TCPEventHandler evh; 253 try evh = m_tcpHandlers.get(fd); 254 catch (Exception e) { log("Failed"); return 0; } 255 assert( evh !is TCPEventHandler.init); 256 } 257 258 if (ctxt.noDelay) { 259 if (!setOption(fd, TCPOption.NODELAY, true)) 260 return 0; 261 } 262 263 if (!initTCPConnection(fd, ctxt)) { 264 try { 265 log("Remove event handler for " ~ fd.to!string); 266 m_tcpHandlers.remove(fd); 267 } 268 catch (Exception e) { 269 setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg); 270 } 271 272 closeSocket(fd, false); 273 return 0; 274 } 275 276 277 try log("Client started FD#" ~ fd.to!string); 278 catch{} 279 return fd; 280 } 281 282 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 283 m_status = StatusInfo.init; 284 fd_t fd = ctxt.preInitializedSocket; 285 286 if (fd == fd_t.init) 287 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); 288 289 if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET)) 290 return 0; 291 292 if (initUDPSocket(fd, ctxt)) 293 { 294 try { 295 (m_udpHandlers)[fd] = del; 296 } 297 catch (Exception e) { 298 setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg); 299 closeSocket(fd, false); 300 return 0; 301 } 302 } 303 304 try log("UDP Socket started FD#" ~ fd.to!string); 305 catch{} 306 307 return fd; 308 } 309 310 fd_t run(shared AsyncSignal ctxt) { 311 m_status = StatusInfo.init; 312 try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {} 313 return (cast(fd_t)m_hwnd); 314 } 315 316 fd_t run(AsyncNotifier ctxt) { 317 m_status = StatusInfo.init; 318 //try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {} 319 return cast(fd_t) m_hwnd; 320 } 321 322 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 323 if (timeout < 0.seconds) 324 timeout = 0.seconds; 325 timeout += 10.msecs(); // round up to the next 10 msecs to avoid premature timer events 326 m_status = StatusInfo.init; 327 fd_t timer_id = ctxt.id; 328 if (timer_id == fd_t.init) { 329 timer_id = createIndex(); 330 } 331 try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {} 332 333 BOOL err; 334 try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null); 335 catch(Exception e) { 336 setInternalError!"SetTimer"(Status.ERROR); 337 return 0; 338 } 339 340 if (err == 0) 341 { 342 m_error = GetLastErrorSafe(); 343 m_status.code = Status.ERROR; 344 m_status.text = "kill(AsyncTimer)"; 345 log(m_status); 346 return 0; 347 } 348 349 if (m_timer.fd == fd_t.init) 350 { 351 m_timer.fd = timer_id; 352 m_timer.cb = del; 353 } 354 else { 355 try 356 { 357 (m_timerHandlers)[timer_id] = del; 358 } 359 catch (Exception e) { 360 setInternalError!"HashMap assign"(Status.ERROR); 361 return 0; 362 } 363 } 364 365 366 return timer_id; 367 } 368 369 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) 370 { 371 static fd_t ids; 372 auto fd = ++ids; 373 374 try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 375 catch (Exception e) { 376 setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg); 377 } 378 379 return fd; 380 381 } 382 383 bool kill(AsyncDirectoryWatcher ctxt) { 384 385 try { 386 Array!DWFolderWatcher toFree; 387 foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) { 388 if (v.fd == ctxt.fd) { 389 CloseHandle(v.handle); 390 m_dwFolders.remove(k); 391 } 392 } 393 394 foreach (DWFolderWatcher obj; toFree[]) 395 FreeListObjectAlloc!DWFolderWatcher.free(obj); 396 397 // todo: close all the handlers... 398 m_dwHandlers.remove(ctxt.fd); 399 } 400 catch (Exception e) { 401 setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg); 402 return false; 403 } 404 405 return true; 406 } 407 408 bool kill(AsyncTCPConnection ctxt, bool forced = false) 409 { 410 411 m_status = StatusInfo.init; 412 fd_t fd = ctxt.socket; 413 414 log("Killing socket "~ fd.to!string); 415 try { 416 auto cb = m_tcpHandlers.get(ctxt.socket); 417 if (cb != TCPEventHandler.init){ 418 *cb.conn.connected = false; 419 *cb.conn.connecting = false; 420 return closeSocket(fd, true, forced); 421 } 422 } catch (Exception e) { 423 setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg); 424 assert(false); 425 //return false; 426 } 427 428 return true; 429 } 430 431 bool kill(AsyncTCPListener ctxt) 432 { 433 m_status = StatusInfo.init; 434 fd_t fd = ctxt.socket; 435 try { 436 if ((ctxt.socket in m_connHandlers) !is null) { 437 return closeSocket(fd, false, true); 438 } 439 } catch (Exception e) { 440 setInternalError!"in m_connHandlers"(Status.ERROR, e.msg); 441 return false; 442 } 443 444 return true; 445 } 446 447 bool kill(shared AsyncSignal ctxt) { 448 return true; 449 } 450 451 bool kill(AsyncNotifier ctxt) { 452 return true; 453 } 454 455 bool kill(AsyncTimer ctxt) { 456 m_status = StatusInfo.init; 457 458 log("Kill timer"); 459 460 BOOL err = KillTimer(m_hwnd, ctxt.id); 461 if (err == 0) 462 { 463 m_error = GetLastErrorSafe(); 464 m_status.code = Status.ERROR; 465 m_status.text = "kill(AsyncTimer)"; 466 log(m_status); 467 return false; 468 } 469 470 destroyIndex(ctxt); 471 472 if (m_timer.fd == ctxt.id) { 473 ctxt.id = 0; 474 m_timer = TimerCache.init; 475 } else { 476 try { 477 m_timerHandlers.remove(ctxt.id); 478 } 479 catch (Exception e) { 480 setInternalError!"HashMap remove"(Status.ERROR); 481 return 0; 482 } 483 } 484 485 486 return true; 487 } 488 489 bool kill(AsyncUDPSocket ctxt) { 490 m_status = StatusInfo.init; 491 492 fd_t fd = ctxt.socket; 493 INT err = closesocket(fd); 494 if (catchSocketError!"closesocket"(err)) 495 return false; 496 497 try m_udpHandlers.remove(ctxt.socket); 498 catch (Exception e) { 499 setInternalError!"HashMap remove"(Status.ERROR); 500 return 0; 501 } 502 503 return true; 504 } 505 506 bool setOption(T)(fd_t fd, TCPOption option, in T value) { 507 m_status = StatusInfo.init; 508 int err; 509 try { 510 nothrow bool errorHandler() { 511 if (catchSocketError!"setOption:"(err)) { 512 try m_status.text ~= option.to!string; 513 catch (Exception e){ assert(false, "to!string conversion failure"); } 514 return false; 515 } 516 517 return true; 518 } 519 520 521 static HashMap!(fd_t, tcp_keepalive)* kcache; 522 523 final switch (option) { 524 525 case TCPOption.NODELAY: // true/false 526 static if (!is(T == bool)) 527 assert(false, "NODELAY value type must be bool, not " ~ T.stringof); 528 else { 529 BOOL val = value?1:0; 530 socklen_t len = val.sizeof; 531 err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); 532 return errorHandler(); 533 } 534 case TCPOption.REUSEADDR: // true/false 535 static if (!is(T == bool)) 536 assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof); 537 else 538 { 539 BOOL val = value?1:0; 540 socklen_t len = val.sizeof; 541 err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); 542 return errorHandler(); 543 } 544 case TCPOption.QUICK_ACK: 545 static if (!is(T == bool)) 546 assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof); 547 else { 548 m_status.code = Status.NOT_IMPLEMENTED; 549 return false; // quick ack is not implemented 550 } 551 case TCPOption.KEEPALIVE_ENABLE: // true/false 552 static if (!is(T == bool)) 553 assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof); 554 else 555 { 556 BOOL val = value?1:0; 557 socklen_t len = val.sizeof; 558 err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); 559 return errorHandler(); 560 } 561 case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn 562 static if (!isIntegral!T) 563 assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof); 564 else { 565 m_status.code = Status.NOT_IMPLEMENTED; 566 return false; 567 } 568 case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets 569 static if (!is(T == Duration)) 570 assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof); 571 else { 572 573 if (!kcache) 574 kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator()); 575 576 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 577 tcp_keepalive sReturned; 578 DWORD dwBytes; 579 kaSettings.onoff = ULONG(1); 580 if (kaSettings.keepalivetime == ULONG.init) { 581 kaSettings.keepalivetime = 1000; 582 } 583 kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG; 584 (*kcache)[fd] = kaSettings; 585 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 586 587 return errorHandler(); 588 } 589 case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start 590 static if (!is(T == Duration)) 591 assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof); 592 else { 593 594 if (!kcache) 595 kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator()); 596 597 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 598 tcp_keepalive sReturned; 599 DWORD dwBytes; 600 kaSettings.onoff = ULONG(1); 601 if (kaSettings.keepaliveinterval == ULONG.init) { 602 kaSettings.keepaliveinterval = 75*1000; 603 } 604 kaSettings.keepalivetime = value.total!"msecs".to!ULONG; 605 606 (*kcache)[fd] = kaSettings; 607 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 608 609 return errorHandler(); 610 } 611 case TCPOption.BUFFER_RECV: // bytes 612 static if (!isIntegral!T) 613 assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof); 614 else { 615 int val = value.to!int; 616 socklen_t len = val.sizeof; 617 err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); 618 return errorHandler(); 619 } 620 case TCPOption.BUFFER_SEND: // bytes 621 static if (!isIntegral!T) 622 assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof); 623 else { 624 int val = value.to!int; 625 socklen_t len = val.sizeof; 626 err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); 627 return errorHandler(); 628 } 629 case TCPOption.TIMEOUT_RECV: 630 static if (!is(T == Duration)) 631 assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof); 632 else { 633 DWORD val = value.total!"msecs".to!DWORD; 634 socklen_t len = val.sizeof; 635 err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len); 636 return errorHandler(); 637 } 638 case TCPOption.TIMEOUT_SEND: 639 static if (!is(T == Duration)) 640 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 641 else { 642 DWORD val = value.total!"msecs".to!DWORD; 643 socklen_t len = val.sizeof; 644 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len); 645 return errorHandler(); 646 } 647 case TCPOption.TIMEOUT_HALFOPEN: 648 static if (!is(T == Duration)) 649 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 650 else { 651 m_status.code = Status.NOT_IMPLEMENTED; 652 return false; 653 } 654 case TCPOption.LINGER: // bool onOff, int seconds 655 static if (!is(T == Tuple!(bool, int))) 656 assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof); 657 else { 658 linger l = linger(val[0]?1:0, val[1].to!USHORT); 659 socklen_t llen = l.sizeof; 660 err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen); 661 return errorHandler(); 662 } 663 case TCPOption.CONGESTION: 664 static if (!isIntegral!T) 665 assert(false, "CONGESTION value type must be integral, not " ~ T.stringof); 666 else { 667 m_status.code = Status.NOT_IMPLEMENTED; 668 return false; 669 } 670 case TCPOption.CORK: 671 static if (!isIntegral!T) 672 assert(false, "CORK value type must be int, not " ~ T.stringof); 673 else { 674 m_status.code = Status.NOT_IMPLEMENTED; 675 return false; 676 } 677 case TCPOption.DEFER_ACCEPT: // seconds 678 static if (!isIntegral!T) 679 assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof); 680 else { 681 int val = value.to!int; 682 socklen_t len = val.sizeof; 683 err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len); 684 return errorHandler(); 685 } 686 } 687 688 } 689 catch (Exception e) { 690 return false; 691 } 692 693 } 694 695 uint read(in fd_t fd, ref ubyte[] data) 696 { 697 return 0; 698 } 699 700 uint write(in fd_t fd, in ubyte[] data) 701 { 702 return 0; 703 } 704 705 uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) { 706 size_t i; 707 Array!DWChangeInfo* changes; 708 try { 709 changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer); 710 if ((*changes).empty) 711 return 0; 712 713 import std.algorithm : min; 714 size_t cnt = min(dst.length, changes.length); 715 foreach (DWChangeInfo change; (*changes)[0 .. cnt]) { 716 try log("reading change: " ~ change.path); catch {} 717 dst[i] = (*changes)[i]; 718 i++; 719 } 720 changes.linearRemove((*changes)[0 .. cnt]); 721 } 722 catch (Exception e) { 723 setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); 724 return 0; 725 } 726 try log("Changes returning with: " ~ i.to!string); catch {} 727 return cast(uint) i; 728 } 729 730 uint watch(in fd_t fd, in WatchInfo info) { 731 m_status = StatusInfo.init; 732 uint wd; 733 try { 734 HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()), 735 FILE_LIST_DIRECTORY, 736 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, 737 null, 738 OPEN_EXISTING, 739 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 740 null); 741 wd = cast(uint) hndl; 742 DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init); 743 assert(handler !is null); 744 log("Watching: " ~ info.path.toNativeString()); 745 (m_dwFolders)[wd] = FreeListObjectAlloc!DWFolderWatcher.alloc(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive); 746 } catch (Exception e) { 747 setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg); 748 return 0; 749 } 750 return wd; 751 } 752 753 bool unwatch(in fd_t fd, in fd_t _wd) { 754 uint wd = cast(uint) _wd; 755 m_status = StatusInfo.init; 756 try { 757 DWFolderWatcher fw = m_dwFolders.get(wd, null); 758 assert(fw !is null); 759 m_dwFolders.remove(wd); 760 fw.close(); 761 FreeListObjectAlloc!DWFolderWatcher.free(fw); 762 } catch (Exception e) { 763 setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg); 764 return false; 765 } 766 return true; 767 } 768 769 bool notify(T)(in fd_t fd, in T payload) 770 if (is(T == shared AsyncSignal) || is(T == AsyncNotifier)) 771 { 772 m_status = StatusInfo.init; 773 import std.conv; 774 775 auto payloadPtr = cast(ubyte*)payload; 776 auto payloadAddr = cast(ulong)payloadPtr; 777 778 WPARAM wparam = payloadAddr & 0xffffffff; 779 LPARAM lparam = cast(uint) (payloadAddr >> 32); 780 781 BOOL err; 782 static if (is(T == AsyncNotifier)) 783 err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam); 784 else 785 err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam); 786 try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {} 787 if (err == 0) 788 { 789 m_error = GetLastErrorSafe(); 790 m_status.code = Status.ERROR; 791 m_status.text = "notify"; 792 log(m_status); 793 return false; 794 } 795 return true; 796 } 797 798 uint recv(in fd_t fd, ref ubyte[] data) 799 { 800 m_status = StatusInfo.init; 801 int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 802 803 //try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {} 804 if (catchSocketError!".recv"(ret)) { // ret == -1 805 if (m_error == error_t.WSAEWOULDBLOCK) 806 m_status.code = Status.ASYNC; 807 return 0; // TODO: handle some errors more specifically 808 } 809 m_status.code = Status.OK; 810 811 return cast(uint) ret; 812 } 813 814 uint send(in fd_t fd, in ubyte[] data) 815 { 816 m_status = StatusInfo.init; 817 //try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string); 818 //catch{} 819 int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0); 820 821 if (catchSocketError!"send"(ret)) { 822 if (m_error == error_t.WSAEWOULDBLOCK) 823 m_status.code = Status.ASYNC; 824 return 0; // TODO: handle some errors more specifically 825 } 826 m_status.code = Status.ASYNC; 827 return cast(uint) ret; 828 } 829 830 bool broadcast(in fd_t fd, bool b) { 831 int val = b?1:0; 832 socklen_t len = val.sizeof; 833 int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len); 834 if (catchSocketError!"setsockopt"(err)) 835 return false; 836 837 return true; 838 839 } 840 841 uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr) 842 { 843 m_status = StatusInfo.init; 844 socklen_t addrLen; 845 addr.family = AF_INET; 846 int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen); 847 848 if (addrLen > addr.sockAddrLen) { 849 addr.family = AF_INET6; 850 } 851 852 try log("RECVFROM " ~ ret.to!string ~ "B"); catch {} 853 if (catchSocketError!".recvfrom"(ret)) { // ret == -1 854 if (m_error == WSAEWOULDBLOCK) 855 m_status.code = Status.ASYNC; 856 return 0; // TODO: handle some errors more specifically 857 } 858 m_status.code = Status.OK; 859 860 return cast(uint) ret; 861 } 862 863 uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr) 864 { 865 m_status = StatusInfo.init; 866 try log("SENDTO " ~ data.length.to!string ~ "B"); catch{} 867 int ret; 868 if (addr != NetworkAddress.init) 869 ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen); 870 else 871 ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 872 873 if (catchSocketError!".sendTo"(ret)) { // ret == -1 874 if (m_error == WSAEWOULDBLOCK) 875 m_status.code = Status.ASYNC; 876 return 0; // TODO: handle some errors more specifically 877 } 878 879 m_status.code = Status.OK; 880 return cast(uint) ret; 881 } 882 883 NetworkAddress localAddr(in fd_t fd, bool ipv6) { 884 NetworkAddress ret; 885 import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr; 886 if (ipv6) 887 ret.family = AF_INET6; 888 else 889 ret.family = AF_INET; 890 socklen_t len = ret.sockAddrLen; 891 int err = getsockname(fd, ret.sockAddr, &len); 892 if (catchSocketError!"getsockname"(err)) 893 return NetworkAddress.init; 894 if (len > ret.sockAddrLen) 895 ret.family = AF_INET6; 896 return ret; 897 } 898 899 void noDelay(in fd_t fd, bool b) { 900 m_status = StatusInfo.init; 901 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof); 902 } 903 904 private bool closeRemoteSocket(fd_t fd, bool forced) { 905 906 INT err; 907 908 try log("Shutdown FD#" ~ fd.to!string); 909 catch{} 910 if (forced) { 911 err = shutdown(fd, SD_BOTH); 912 closesocket(fd); 913 } 914 else 915 err = shutdown(fd, SD_SEND); 916 917 try { 918 TCPEventHandler* evh = fd in m_tcpHandlers; 919 if (evh && evh.conn.inbound) { 920 try FreeListObjectAlloc!AsyncTCPConnection.free(evh.conn); 921 catch(Exception e) { assert(false, "Failed to free resources"); } 922 evh.conn = null; 923 //log("Remove event handler for " ~ fd.to!string); 924 m_tcpHandlers.remove(fd); 925 } 926 } 927 catch (Exception e) { 928 setInternalError!"m_tcpHandlers.remove"(Status.ERROR); 929 return false; 930 } 931 if (catchSocketError!"shutdown"(err)) 932 return false; 933 return true; 934 } 935 936 // for connected sockets 937 bool closeSocket(fd_t fd, bool connected, bool forced = false) 938 { 939 m_status = StatusInfo.init; 940 if (!connected && forced) { 941 try { 942 if (fd in m_connHandlers) { 943 log("Removing connection handler for: " ~ fd.to!string); 944 m_connHandlers.remove(fd); 945 } 946 } 947 catch (Exception e) { 948 setInternalError!"m_connHandlers.remove"(Status.ERROR); 949 return false; 950 } 951 } 952 else if (connected) 953 closeRemoteSocket(fd, forced); 954 955 if (!connected || forced) { 956 // todo: flush the socket here? 957 958 INT err = closesocket(fd); 959 if (catchSocketError!"closesocket"(err)) 960 return false; 961 962 } 963 return true; 964 } 965 966 bool closeConnection(fd_t fd) { 967 return closeSocket(fd, true); 968 } 969 970 NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 971 in { 972 debug import libasync.internals.validator : validateIPv4, validateIPv6; 973 debug assert( validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address"); 974 } 975 body { 976 m_status = StatusInfo.init; 977 978 NetworkAddress addr; 979 WSAPROTOCOL_INFOW hints; 980 import std.conv : to; 981 if (ipv6) { 982 addr.family = AF_INET6; 983 } 984 else { 985 addr.family = AF_INET; 986 } 987 988 INT addrlen = addr.sockAddrLen; 989 990 LPWSTR str; 991 try { 992 str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr); 993 } catch (Exception e) { 994 setInternalError!"toStringz"(Status.ERROR, e.msg); 995 return NetworkAddress.init; 996 } 997 998 INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 999 if (port != 0) addr.port = port; 1000 try log(addr.toString()); 1001 catch {} 1002 if( catchSocketError!"getAddressFromIP"(err) ) 1003 return NetworkAddress.init; 1004 else assert(addrlen == addr.sockAddrLen); 1005 return addr; 1006 } 1007 1008 NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true) 1009 /*in { 1010 debug import libasync.internals.validator : validateHost; 1011 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 1012 } 1013 body */{ 1014 m_status = StatusInfo.init; 1015 import std.conv : to; 1016 NetworkAddress addr; 1017 ADDRINFOW hints; 1018 ADDRINFOW* infos; 1019 LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*); 1020 if (ipv6) { 1021 hints.ai_family = AF_INET6; 1022 addr.family = AF_INET6; 1023 } 1024 else { 1025 hints.ai_family = AF_INET; 1026 addr.family = AF_INET; 1027 } 1028 1029 if (tcp) { 1030 hints.ai_protocol = IPPROTO_TCP; 1031 hints.ai_socktype = SOCK_STREAM; 1032 } 1033 else { 1034 hints.ai_protocol = IPPROTO_UDP; 1035 hints.ai_socktype = SOCK_DGRAM; 1036 } 1037 if (port != 0) addr.port = port; 1038 1039 LPCWSTR str; 1040 1041 try { 1042 str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host); 1043 } catch (Exception e) { 1044 setInternalError!"toUTFz"(Status.ERROR, e.msg); 1045 return NetworkAddress.init; 1046 } 1047 1048 error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos); 1049 if (err != EWIN.WSA_OK) { 1050 setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err); 1051 return NetworkAddress.init; 1052 } 1053 1054 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 1055 ubyte* data = cast(ubyte*) addr.sockAddr; 1056 data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy 1057 FreeAddrInfoW(infos); 1058 try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); 1059 catch (Exception e){} 1060 return addr; 1061 } 1062 1063 void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED) 1064 { 1065 if (details.length > 0) 1066 m_status.text = TRACE ~ ": " ~ details; 1067 else 1068 m_status.text = TRACE; 1069 m_error = error; 1070 m_status.code = s; 1071 static if(LOG) log(m_status); 1072 } 1073 private: 1074 bool onMessage(MSG msg) 1075 { 1076 m_status = StatusInfo.init; 1077 switch (msg.message) { 1078 case WM_TCP_SOCKET: 1079 auto evt = LOWORD(msg.lParam); 1080 auto err = HIWORD(msg.lParam); 1081 if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) { 1082 1083 if (evt == FD_ACCEPT) 1084 setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 1085 else { 1086 try { 1087 TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam); 1088 cb(TCPEvent.ERROR); 1089 } 1090 catch (Exception e) { 1091 // An Error callback should never fail... 1092 setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 1093 // assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 1094 } 1095 } 1096 } 1097 break; 1098 case WM_UDP_SOCKET: 1099 auto evt = LOWORD(msg.lParam); 1100 auto err = HIWORD(msg.lParam); 1101 if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) { 1102 try { 1103 UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam); 1104 cb(UDPEvent.ERROR); 1105 } 1106 catch (Exception e) { 1107 // An Error callback should never fail... 1108 setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 1109 } 1110 } 1111 break; 1112 case WM_TIMER: 1113 log("Timer callback"); 1114 TimerHandler cb; 1115 bool cached = (m_timer.fd == cast(fd_t)msg.wParam); 1116 try { 1117 if (cached) 1118 cb = m_timer.cb; 1119 else 1120 cb = m_timerHandlers.get(cast(fd_t)msg.wParam); 1121 1122 cb.ctxt.rearmed = false; 1123 1124 if (cb.ctxt.oneShot) 1125 kill(cb.ctxt); 1126 1127 cb(); 1128 1129 if (cb.ctxt.oneShot && cb.ctxt.rearmed) 1130 run(cb.ctxt, cb, cb.ctxt.timeout); 1131 1132 } 1133 catch (Exception e) { 1134 // An Error callback should never fail... 1135 setInternalError!"del@TimerHandler"(Status.ERROR, e.msg); 1136 } 1137 1138 break; 1139 case WM_USER_EVENT: 1140 log("User event"); 1141 1142 ulong uwParam = cast(ulong)msg.wParam; 1143 ulong ulParam = cast(ulong)msg.lParam; 1144 1145 ulong payloadAddr = (ulParam << 32) | uwParam; 1146 void* payloadPtr = cast(void*) payloadAddr; 1147 shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr; 1148 1149 try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {} 1150 try { 1151 assert(ctxt.id != 0); 1152 ctxt.handler(); 1153 } 1154 catch (Exception e) { 1155 setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 1156 } 1157 break; 1158 case WM_USER_SIGNAL: 1159 log("User signal"); 1160 1161 ulong uwParam = cast(ulong)msg.wParam; 1162 ulong ulParam = cast(ulong)msg.lParam; 1163 1164 ulong payloadAddr = (ulParam << 32) | uwParam; 1165 void* payloadPtr = cast(void*) payloadAddr; 1166 AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr; 1167 1168 try { 1169 ctxt.handler(); 1170 } 1171 catch (Exception e) { 1172 setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 1173 } 1174 break; 1175 default: return false; // not handled, sends to wndProc 1176 } 1177 return true; 1178 } 1179 1180 bool onUDPEvent(WORD evt, WORD err, fd_t sock) { 1181 m_status = StatusInfo.init; 1182 try{ 1183 if (m_udpHandlers.get(sock) == UDPHandler.init) 1184 return false; 1185 } catch {} 1186 if (sock == 0) { // highly unlikely... 1187 setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined"); 1188 return false; 1189 } 1190 if (err) { 1191 setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err); 1192 try { 1193 //log("CLOSE FD#" ~ sock.to!string); 1194 (m_udpHandlers)[sock](UDPEvent.ERROR); 1195 } catch { // can't do anything about this... 1196 } 1197 return false; 1198 } 1199 1200 UDPHandler cb; 1201 switch(evt) { 1202 default: break; 1203 case FD_READ: 1204 try { 1205 log("READ FD#" ~ sock.to!string); 1206 cb = m_udpHandlers.get(sock); 1207 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1208 cb(UDPEvent.READ); 1209 } 1210 catch (Exception e) { 1211 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1212 return false; 1213 } 1214 break; 1215 case FD_WRITE: 1216 try { 1217 log("WRITE FD#" ~ sock.to!string); 1218 cb = m_udpHandlers.get(sock); 1219 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1220 cb(UDPEvent.WRITE); 1221 } 1222 catch (Exception e) { 1223 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1224 return false; 1225 } 1226 break; 1227 } 1228 return true; 1229 } 1230 1231 bool onTCPEvent(WORD evt, WORD err, fd_t sock) { 1232 m_status = StatusInfo.init; 1233 try{ 1234 if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init) 1235 assert( false ); 1236 } catch {} 1237 if (sock == 0) { // highly unlikely... 1238 setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined"); 1239 assert(false); 1240 } 1241 if (err) { 1242 setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err); 1243 try { 1244 //log("CLOSE FD#" ~ sock.to!string); 1245 (m_tcpHandlers)[sock](TCPEvent.ERROR); 1246 } catch { // can't do anything about this... 1247 } 1248 return false; 1249 } 1250 1251 TCPEventHandler cb; 1252 switch(evt) { 1253 default: break; 1254 case FD_ACCEPT: 1255 gs_mtx.lock_nothrow(); 1256 1257 log("TCP Handlers: " ~ m_tcpHandlers.length.to!string); 1258 log("Accepting connection"); 1259 /// Let another listener take the next connection 1260 TCPAcceptHandler list; 1261 try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); } 1262 scope(exit) { 1263 /// The connection rotation mechanism is handled by the TCPListenerDistMixins 1264 /// when registering the same AsyncTCPListener object on multiple event loops. 1265 /// This allows to even out the CPU usage on a server instance. 1266 HWND hwnd = list.ctxt.next(m_hwnd); 1267 if (hwnd !is HWND.init) { 1268 int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1269 if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) { 1270 error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1271 if (catchSocketError!"WSAAsyncSelect"(error)) 1272 assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 1273 } 1274 } 1275 else log("Returned init!!"); 1276 1277 gs_mtx.unlock_nothrow(); 1278 } 1279 1280 NetworkAddress addr; 1281 addr.family = AF_INET; 1282 int addrlen = addr.sockAddrLen; 1283 fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1284 1285 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) { 1286 if (m_error == WSAEFAULT) { // not enough space for sockaddr 1287 addr.family = AF_INET6; 1288 addrlen = addr.sockAddrLen; 1289 csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1290 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) 1291 return false; 1292 } 1293 else return false; 1294 } 1295 1296 int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 1297 if ( catchSocketError!"WSAAsyncSelect"(ok) ) 1298 return false; 1299 1300 log("Connection accepted: " ~ csock.to!string); 1301 1302 AsyncTCPConnection conn; 1303 try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop); 1304 catch (Exception e) { assert(false, "Failed allocation"); } 1305 conn.peer = addr; 1306 conn.socket = csock; 1307 conn.inbound = true; 1308 1309 try { 1310 // Do the callback to get a handler 1311 cb = list(conn); 1312 } 1313 catch(Exception e) { 1314 setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 1315 return false; 1316 } 1317 1318 try { 1319 m_tcpHandlers[csock] = cb; // keep the handler to setup the connection 1320 log("ACCEPT&CONNECT FD#" ~ csock.to!string); 1321 *conn.connecting = true; 1322 //cb(TCPEvent.CONNECT); 1323 } 1324 catch (Exception e) { 1325 setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 1326 return false; 1327 } 1328 break; 1329 case FD_CONNECT: 1330 try { 1331 log("CONNECT FD#" ~ sock.to!string); 1332 cb = m_tcpHandlers.get(sock); 1333 if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1334 *cb.conn.connecting = true; 1335 } 1336 catch(Exception e) { 1337 setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT); 1338 return false; 1339 } 1340 break; 1341 case FD_READ: 1342 try { 1343 log("READ FD#" ~ sock.to!string); 1344 cb = m_tcpHandlers.get(sock); 1345 if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1346 if (!cb.conn) break; 1347 if (*cb.conn.connected == false && *cb.conn.connecting) { 1348 *cb.conn.connecting = false; 1349 *cb.conn.connected = true; 1350 cb(TCPEvent.CONNECT); 1351 } 1352 else 1353 cb(TCPEvent.READ); 1354 } 1355 catch (Exception e) { 1356 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1357 return false; 1358 } 1359 break; 1360 case FD_WRITE: 1361 // todo: don't send the first write for consistency with epoll? 1362 1363 try { 1364 //import std.stdio; 1365 log("WRITE FD#" ~ sock.to!string); 1366 cb = m_tcpHandlers.get(sock); 1367 if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1368 if (!cb.conn) break; 1369 if (*cb.conn.connected == false && *cb.conn.connecting) { 1370 *cb.conn.connecting = false; 1371 *cb.conn.connected = true; 1372 cb(TCPEvent.CONNECT); 1373 } 1374 else { 1375 cb(TCPEvent.WRITE); 1376 } 1377 } 1378 catch (Exception e) { 1379 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1380 return false; 1381 } 1382 break; 1383 case FD_CLOSE: 1384 // called after shutdown() 1385 INT ret; 1386 bool connected = true; 1387 try { 1388 log("CLOSE FD#" ~ sock.to!string); 1389 if (sock in m_tcpHandlers) { 1390 cb = m_tcpHandlers.get(sock); 1391 if (*cb.conn.connected) { 1392 cb(TCPEvent.CLOSE); 1393 *cb.conn.connecting = false; 1394 *cb.conn.connected = false; 1395 } else 1396 connected = false; 1397 } 1398 else 1399 connected = false; 1400 } 1401 catch (Exception e) { 1402 if (m_status.code == Status.OK) 1403 setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 1404 return false; 1405 } 1406 1407 closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket 1408 1409 break; 1410 } 1411 return true; 1412 } 1413 1414 bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt) 1415 { 1416 INT err; 1417 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1418 if (catchSocketError!"bind"(err)) { 1419 closesocket(fd); 1420 return false; 1421 } 1422 err = listen(fd, 128); 1423 if (catchSocketError!"listen"(err)) { 1424 closesocket(fd); 1425 return false; 1426 } 1427 err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE); 1428 if (catchSocketError!"WSAAsyncSelect"(err)) { 1429 closesocket(fd); 1430 return false; 1431 } 1432 1433 return true; 1434 } 1435 1436 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false) 1437 in { 1438 assert(m_threadId == GetCurrentThreadId()); 1439 assert(ctxt.local !is NetworkAddress.init); 1440 } 1441 body { 1442 INT err; 1443 if (!reusing) { 1444 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1445 if (catchSocketError!"bind"(err)) { 1446 closesocket(fd); 1447 return false; 1448 } 1449 1450 err = listen(fd, 128); 1451 if (catchSocketError!"listen"(err)) { 1452 closesocket(fd); 1453 return false; 1454 } 1455 1456 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1457 if (catchSocketError!"WSAAsyncSelect"(err)) { 1458 closesocket(fd); 1459 return false; 1460 } 1461 } 1462 1463 return true; 1464 } 1465 1466 bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt) 1467 in { 1468 assert(ctxt.peer !is NetworkAddress.init); 1469 assert(ctxt.peer.port != 0, "Connecting to an invalid port"); 1470 } 1471 body { 1472 INT err; 1473 NetworkAddress bind_addr; 1474 bind_addr.family = ctxt.peer.family; 1475 1476 if (ctxt.peer.family == AF_INET) 1477 bind_addr.sockAddrInet4.sin_addr.s_addr = 0; 1478 else if (ctxt.peer.family == AF_INET6) 1479 bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0; 1480 else assert(false, "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string); 1481 1482 err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen); 1483 if ( catchSocketError!"bind"(err) ) 1484 return false; 1485 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 1486 if ( catchSocketError!"WSAAsyncSelect"(err) ) 1487 return false; 1488 err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen); 1489 1490 auto errors = [ tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ]; 1491 1492 if (catchSocketErrorsEq!"connectEQ"(err, errors)) 1493 return true; 1494 else if (catchSocketError!"connect"(err)) 1495 return false; 1496 1497 return true; 1498 } 1499 1500 bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 1501 if (isIntegral!T) 1502 { 1503 foreach (validator ; cmp) { 1504 if (val == validator[0]) { 1505 m_status.text = TRACE; 1506 m_status.code = validator[1]; 1507 if (m_status.code == Status.EVLOOP_TIMEOUT) { 1508 log(m_status); 1509 break; 1510 } 1511 m_error = GetLastErrorSafe(); 1512 static if(LOG) log(m_status); 1513 return true; 1514 } 1515 } 1516 return false; 1517 } 1518 1519 bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 1520 if (isIntegral!T) 1521 { 1522 foreach (validator ; cmp) { 1523 if (val == validator[0]) { 1524 m_status.text = TRACE; 1525 m_error = WSAGetLastErrorSafe(); 1526 m_status.status = validator[1]; 1527 static if(LOG) log(m_status); 1528 return true; 1529 } 1530 } 1531 return false; 1532 } 1533 1534 bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...) 1535 if (isIntegral!T) 1536 { 1537 error_t err; 1538 foreach (validator ; cmp) { 1539 if (val == validator[0]) { 1540 if (err is EWIN.init) err = WSAGetLastErrorSafe(); 1541 if (err == validator[1]) { 1542 m_status.text = TRACE; 1543 m_error = WSAGetLastErrorSafe(); 1544 m_status.code = validator[2]; 1545 static if(LOG) log(m_status); 1546 return true; 1547 } 1548 } 1549 } 1550 return false; 1551 } 1552 1553 1554 bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 1555 if (isIntegral!T) 1556 { 1557 if (val == cmp) { 1558 m_status.text = TRACE; 1559 m_error = WSAGetLastErrorSafe(); 1560 m_status.code = Status.ABORT; 1561 static if(LOG) log(m_status); 1562 return true; 1563 } 1564 return false; 1565 } 1566 1567 error_t WSAGetLastErrorSafe() { 1568 try { 1569 return cast(error_t) WSAGetLastError(); 1570 } catch(Exception e) { 1571 return EWIN.ERROR_ACCESS_DENIED; 1572 } 1573 } 1574 1575 error_t GetLastErrorSafe() { 1576 try { 1577 return cast(error_t) GetLastError(); 1578 } catch(Exception e) { 1579 return EWIN.ERROR_ACCESS_DENIED; 1580 } 1581 } 1582 1583 void log(StatusInfo val) 1584 { 1585 static if (LOG) { 1586 import std.stdio; 1587 try { 1588 writeln("Backtrace: ", m_status.text); 1589 writeln(" | Status: ", m_status.code); 1590 writeln(" | Error: " , m_error); 1591 if ((m_error in EWSAMessages) !is null) 1592 writeln(" | Message: ", EWSAMessages[m_error]); 1593 } catch(Exception e) { 1594 return; 1595 } 1596 } 1597 } 1598 1599 void log(T)(T val) 1600 { 1601 static if (LOG) { 1602 import std.stdio; 1603 try { 1604 writeln(val); 1605 } catch(Exception e) { 1606 return; 1607 } 1608 } 1609 } 1610 1611 } 1612 1613 mixin template TCPConnectionMixins() { 1614 1615 private CleanupData m_impl; 1616 1617 struct CleanupData { 1618 bool connected; 1619 bool connecting; 1620 } 1621 1622 @property bool* connecting() { 1623 return &m_impl.connecting; 1624 } 1625 1626 @property bool* connected() { 1627 return &m_impl.connected; 1628 } 1629 1630 } 1631 1632 mixin template TCPListenerDistMixins() 1633 { 1634 import std.c.windows.windows : HWND; 1635 import libasync.internals.hashmap : HashMap; 1636 import core.sync.mutex; 1637 private { 1638 bool m_dist; 1639 1640 Tuple!(WinReference, bool*) m_handles; 1641 __gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist; 1642 __gshared Mutex gs_mutex; 1643 } 1644 1645 /// The TCP Listener schedules distributed connection handlers based on 1646 /// the event loops that are using the same AsyncTCPListener object. 1647 /// This is done by using WSAAsyncSelect on a different window after each 1648 /// accept TCPEvent. 1649 class WinReference { 1650 private { 1651 struct Item { 1652 HWND handle; 1653 bool active; 1654 } 1655 1656 Item[] m_items; 1657 } 1658 1659 this(HWND hndl, bool b) { 1660 append(hndl, b); 1661 } 1662 1663 void append(HWND hndl, bool b) { 1664 m_items ~= Item(hndl, b); 1665 } 1666 1667 HWND next(HWND me) { 1668 Item[] items; 1669 synchronized(gs_mutex) 1670 items = m_items; 1671 if (items.length == 1) 1672 return me; 1673 foreach (i, item; items) { 1674 if (item.active == true) { 1675 m_items[i].active = false; // remove responsibility 1676 if (m_items.length <= i + 1) { 1677 m_items[0].active = true; // set responsibility 1678 auto ret = m_items[0].handle; 1679 return ret; 1680 } 1681 else { 1682 m_items[i + 1].active = true; 1683 auto ret = m_items[i + 1].handle; 1684 return ret; 1685 } 1686 } 1687 1688 } 1689 assert(false); 1690 } 1691 1692 } 1693 1694 void init(HWND hndl, fd_t sock) { 1695 try { 1696 if (!gs_mutex) { 1697 gs_mutex = new Mutex; 1698 } 1699 synchronized(gs_mutex) { 1700 m_handles = gs_dist.get(sock); 1701 if (m_handles == typeof(m_handles).init) { 1702 gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist); 1703 m_handles = gs_dist.get(sock); 1704 assert(m_handles != typeof(m_handles).init); 1705 } 1706 else { 1707 m_handles[0].append(hndl, false); 1708 *m_handles[1] = true; // set first thread to dist 1709 m_dist = true; // set this thread to dist 1710 } 1711 } 1712 } catch (Exception e) { 1713 assert(false, e.toString()); 1714 } 1715 1716 } 1717 1718 HWND next(HWND me) { 1719 try { 1720 if (!m_dist) 1721 return HWND.init; 1722 return m_handles[0].next(me); 1723 } 1724 catch (Exception e) { 1725 assert(false, e.toString()); 1726 } 1727 } 1728 1729 } 1730 private class DWHandlerInfo { 1731 DWHandler handler; 1732 Array!DWChangeInfo buffer; 1733 1734 this(DWHandler cb) { 1735 handler = cb; 1736 } 1737 } 1738 1739 private final class DWFolderWatcher { 1740 import libasync.internals.path; 1741 private: 1742 EventLoop m_evLoop; 1743 fd_t m_fd; 1744 bool m_recursive; 1745 HANDLE m_handle; 1746 Path m_path; 1747 DWFileEvent m_events; 1748 DWHandlerInfo m_handler; // contains buffer 1749 shared AsyncSignal m_signal; 1750 ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer; 1751 DWORD m_bytesTransferred; 1752 public: 1753 this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) { 1754 m_fd = fd; 1755 m_recursive = recursive; 1756 m_handle = cast(HANDLE)hndl; 1757 m_evLoop = evl; 1758 m_path = path; 1759 m_handler = handler; 1760 1761 m_signal = new shared AsyncSignal(m_evLoop); 1762 m_signal.run(&onChanged); 1763 triggerWatch(); 1764 } 1765 package: 1766 void close() { 1767 CloseHandle(m_handle); 1768 m_signal.kill(); 1769 } 1770 1771 void triggerChanged() { 1772 m_signal.trigger(); 1773 } 1774 1775 void onChanged() { 1776 ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred]; 1777 do { 1778 assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); 1779 auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr; 1780 DWFileEvent kind; 1781 switch( fni.Action ){ 1782 default: kind = DWFileEvent.MODIFIED; break; 1783 case 0x1: kind = DWFileEvent.CREATED; break; 1784 case 0x2: kind = DWFileEvent.DELETED; break; 1785 case 0x3: kind = DWFileEvent.MODIFIED; break; 1786 case 0x4: kind = DWFileEvent.MOVED_FROM; break; 1787 case 0x5: kind = DWFileEvent.MOVED_TO; break; 1788 } 1789 string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[] 1790 m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename))); 1791 if( fni.NextEntryOffset == 0 ) break; 1792 result = result[fni.NextEntryOffset .. $]; 1793 } while(result.length > 0); 1794 1795 triggerWatch(); 1796 1797 m_handler.handler(); 1798 } 1799 1800 void triggerWatch() { 1801 1802 static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME| 1803 FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE; 1804 1805 OVERLAPPED* overlapped = FreeListObjectAlloc!OVERLAPPED.alloc(); 1806 overlapped.Internal = 0; 1807 overlapped.InternalHigh = 0; 1808 overlapped.Offset = 0; 1809 overlapped.OffsetHigh = 0; 1810 overlapped.Pointer = cast(void*)this; 1811 import std.stdio; 1812 DWORD bytesReturned; 1813 BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted); 1814 1815 import std.stdio; 1816 if (!success) 1817 writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]); 1818 1819 } 1820 1821 @property fd_t fd() const { 1822 return m_fd; 1823 } 1824 1825 @property HANDLE handle() const { 1826 return cast(HANDLE) m_handle; 1827 } 1828 1829 static nothrow extern(System) 1830 { 1831 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 1832 { 1833 import std.stdio; 1834 DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer); 1835 watcher.m_bytesTransferred = cbTransferred; 1836 try FreeListObjectAlloc!OVERLAPPED.free(overlapped); catch {} 1837 if (dwError != 0) 1838 try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{} 1839 try watcher.triggerChanged(); 1840 catch (Exception e) { 1841 try writeln("Failed to trigger change"); catch {} 1842 } 1843 } 1844 } 1845 1846 } 1847 1848 /** 1849 Represents a network/socket address. (taken from vibe.core.net) 1850 */ 1851 public struct NetworkAddress { 1852 import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6; 1853 private union { 1854 sockaddr addr; 1855 sockaddr_in addr_ip4; 1856 sockaddr_in6 addr_ip6; 1857 } 1858 1859 @property bool ipv6() const pure nothrow { return this.family == AF_INET6; } 1860 1861 /** Family (AF_) of the socket address. 1862 */ 1863 @property ushort family() const pure nothrow { return addr.sa_family; } 1864 /// ditto 1865 @property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; } 1866 1867 /** The port in host byte order. 1868 */ 1869 @property ushort port() 1870 const pure nothrow { 1871 switch (this.family) { 1872 default: assert(false, "port() called for invalid address family."); 1873 case AF_INET: return ntoh(addr_ip4.sin_port); 1874 case AF_INET6: return ntoh(addr_ip6.sin6_port); 1875 } 1876 } 1877 /// ditto 1878 @property void port(ushort val) 1879 pure nothrow { 1880 switch (this.family) { 1881 default: assert(false, "port() called for invalid address family."); 1882 case AF_INET: addr_ip4.sin_port = hton(val); break; 1883 case AF_INET6: addr_ip6.sin6_port = hton(val); break; 1884 } 1885 } 1886 1887 /** A pointer to a sockaddr struct suitable for passing to socket functions. 1888 */ 1889 @property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; } 1890 1891 /** Size of the sockaddr struct that is returned by sockAddr(). 1892 */ 1893 @property uint sockAddrLen() 1894 const pure nothrow { 1895 switch (this.family) { 1896 default: assert(false, "sockAddrLen() called for invalid address family."); 1897 case AF_INET: return addr_ip4.sizeof; 1898 case AF_INET6: return addr_ip6.sizeof; 1899 } 1900 } 1901 1902 @property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow 1903 in { assert (family == AF_INET); } 1904 body { return &addr_ip4; } 1905 1906 @property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow 1907 in { assert (family == AF_INET6); } 1908 body { return &addr_ip6; } 1909 1910 /** Returns a string representation of the IP address 1911 */ 1912 string toAddressString() 1913 const { 1914 import std.array : appender; 1915 import std.string : format; 1916 import std.format : formattedWrite; 1917 1918 switch (this.family) { 1919 default: assert(false, "toAddressString() called for invalid address family."); 1920 case AF_INET: 1921 ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; 1922 return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]); 1923 case AF_INET6: 1924 ubyte[16] ip = addr_ip6.sin6_addr.s6_addr; 1925 auto ret = appender!string(); 1926 ret.reserve(40); 1927 foreach (i; 0 .. 8) { 1928 if (i > 0) ret.put(':'); 1929 ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2])); 1930 } 1931 return ret.data; 1932 } 1933 } 1934 1935 /** Returns a full string representation of the address, including the port number. 1936 */ 1937 string toString() 1938 const { 1939 1940 import std.string : format; 1941 1942 auto ret = toAddressString(); 1943 switch (this.family) { 1944 default: assert(false, "toString() called for invalid address family."); 1945 case AF_INET: return ret ~ format(":%s", port); 1946 case AF_INET6: return format("[%s]:%s", ret, port); 1947 } 1948 } 1949 1950 } 1951 1952 private pure nothrow { 1953 import std.bitmanip; 1954 1955 ushort ntoh(ushort val) 1956 { 1957 version (LittleEndian) return swapEndian(val); 1958 else version (BigEndian) return val; 1959 else static assert(false, "Unknown endianness."); 1960 } 1961 1962 ushort hton(ushort val) 1963 { 1964 version (LittleEndian) return swapEndian(val); 1965 else version (BigEndian) return val; 1966 else static assert(false, "Unknown endianness."); 1967 } 1968 } 1969 enum WM_TCP_SOCKET = WM_USER+102; 1970 enum WM_UDP_SOCKET = WM_USER+103; 1971 enum WM_USER_EVENT = WM_USER+104; 1972 enum WM_USER_SIGNAL = WM_USER+105; 1973 1974 nothrow: 1975 size_t g_idxCapacity = 8; 1976 Array!size_t g_idxAvailable; 1977 1978 // called on run 1979 size_t createIndex() { 1980 size_t idx; 1981 import std.algorithm : max; 1982 import std.range : iota; 1983 try { 1984 1985 size_t getIdx() { 1986 1987 if (!g_idxAvailable.empty) { 1988 immutable size_t ret = g_idxAvailable.back; 1989 g_idxAvailable.removeBack(); 1990 return ret; 1991 } 1992 return 0; 1993 } 1994 1995 idx = getIdx(); 1996 if (idx == 0) { 1997 import std.range : iota; 1998 // todo: not sure about this 1999 g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) ); 2000 g_idxCapacity = max(32, g_idxCapacity * 2); 2001 idx = getIdx(); 2002 } 2003 2004 } catch {} 2005 2006 return idx; 2007 } 2008 2009 void destroyIndex(AsyncTimer ctxt) { 2010 try { 2011 g_idxAvailable.insert(ctxt.id); 2012 } 2013 catch (Exception e) { 2014 assert(false, "Error destroying index: " ~ e.msg); 2015 } 2016 } 2017 2018 2019 nothrow extern(System) { 2020 LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam) 2021 { 2022 auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); 2023 if (ptr is null) 2024 return DefWindowProcA(wnd, msg, wparam, lparam); 2025 auto appl = cast(EventLoopImpl*)ptr; 2026 MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init); 2027 if (appl.onMessage(obj)) { 2028 static if (DEBUG) { 2029 if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) { 2030 import std.stdio : writeln; 2031 try { writeln(appl.error, ": ", appl.m_status.text); } catch {} 2032 } 2033 } 2034 return 0; 2035 } 2036 else return DefWindowProcA(wnd, msg, wparam, lparam); 2037 } 2038 2039 BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam); 2040 2041 }