1 module libasync.windows; 2 3 version (Windows): 4 5 import core.atomic; 6 import core.thread : Fiber; 7 import libasync.types; 8 import std.container : Array; 9 import std.string : toStringz; 10 import std.conv : to; 11 import std.datetime : Duration, msecs, seconds; 12 import std.algorithm : min; 13 import libasync.internals.win32; 14 import std.traits : isIntegral; 15 import std.typecons : Tuple, tuple; 16 import std.utf : toUTFz; 17 import core.sync.mutex; 18 import libasync.events; 19 import memutils.utils; 20 import memutils.hashmap; 21 import memutils.vector; 22 pragma(lib, "ws2_32"); 23 pragma(lib, "ole32"); 24 alias fd_t = SIZE_T; 25 alias error_t = EWIN; 26 27 //todo : see if new connections with SO_REUSEADDR are evenly distributed between threads 28 29 30 package struct EventLoopImpl { 31 pragma(msg, "Using Windows IOCP for events"); 32 33 private: 34 HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array 35 HashMap!(fd_t, TCPEventHandler) m_tcpHandlers; 36 HashMap!(fd_t, TimerHandler) m_timerHandlers; 37 HashMap!(fd_t, UDPHandler) m_udpHandlers; 38 HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too 39 HashMap!(uint, DWFolderWatcher) m_dwFolders; 40 HashMap!(fd_t, tcp_keepalive)* kcache; 41 ~this() { kcache.destroy(); } 42 nothrow: 43 private: 44 struct TimerCache { 45 TimerHandler cb; 46 fd_t fd; 47 } 48 TimerCache m_timer; 49 50 EventLoop m_evLoop; 51 bool m_started; 52 wstring m_window; 53 HWND m_hwnd; 54 DWORD m_threadId; 55 HANDLE[] m_waitObjects; 56 ushort m_instanceId; 57 StatusInfo m_status; 58 error_t m_error = EWIN.WSA_OK; 59 __gshared Mutex gs_mtx; 60 package: 61 @property bool started() const { 62 return m_started; 63 } 64 bool init(EventLoop evl) 65 in { assert(!m_started); } 66 body 67 { 68 try if (!gs_mtx) 69 gs_mtx = new Mutex; catch {} 70 static ushort j; 71 assert (j == 0, "Current implementation is only tested with 1 event loop per thread. 
There are known issues with signals on linux."); 72 j += 1; 73 m_status = StatusInfo.init; 74 75 import core.thread; 76 //try Thread.getThis().priority = Thread.PRIORITY_MAX; 77 //catch (Exception e) { assert(false, "Could not set thread priority"); } 78 SetThreadPriority(GetCurrentThread(), 31); 79 m_evLoop = evl; 80 shared static ushort i; 81 m_instanceId = i; 82 core.atomic.atomicOp!"+="(i, cast(ushort) 1); 83 wstring inststr; 84 import std.conv : to; 85 try { inststr = m_instanceId.to!wstring; } 86 catch (Exception e) { 87 return false; 88 } 89 m_window = "VibeWin32MessageWindow" ~ inststr; 90 wstring classname = "VibeWin32MessageWindow" ~ inststr; 91 92 LPCWSTR wnz; 93 LPCWSTR clsn; 94 try { 95 wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*); 96 clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*); 97 } catch (Exception e) { 98 setInternalError!"toUTFz"(Status.ERROR, e.msg); 99 return false; 100 } 101 102 m_threadId = GetCurrentThreadId(); 103 WNDCLASSW wc; 104 wc.lpfnWndProc = &wndProc; 105 wc.lpszClassName = clsn; 106 RegisterClassW(&wc); 107 m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE, 108 cast(HMENU) null, null, null); 109 try log("Window registered: " ~ m_hwnd.to!string); catch{} 110 auto ptr = cast(ULONG_PTR)cast(void*)&this; 111 SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr); 112 assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this ); 113 WSADATA wd; 114 m_error = cast(error_t) WSAStartup(0x0202, &wd); 115 if (m_error == EWIN.WSA_OK) 116 m_status.code = Status.OK; 117 else { 118 m_status.code = Status.ABORT; 119 static if(LOG) log(m_status); 120 return false; 121 } 122 assert(wd.wVersion == 0x0202); 123 m_started = true; 124 return true; 125 } 126 127 // todo: find where to call this 128 void exit() { 129 cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0); 130 } 131 132 @property StatusInfo status() const { 133 return m_status; 134 } 135 136 @property string error() const { 137 
string* ptr; 138 string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init; 139 return pv; 140 } 141 142 bool loop(Duration timeout = 0.seconds) 143 in { 144 assert(Fiber.getThis() is null); 145 assert(m_started); 146 } 147 body { 148 DWORD msTimeout; 149 150 if (timeout == -1.seconds) 151 msTimeout = DWORD.max; 152 else msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max); 153 154 /* 155 * Waits until one or all of the specified objects are in the signaled state 156 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx 157 */ 158 DWORD signal = MsgWaitForMultipleObjectsEx( 159 cast(DWORD)0, 160 null, 161 msTimeout, 162 QS_ALLEVENTS, 163 MWMO_ALERTABLE | MWMO_INPUTAVAILABLE // MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion) 164 // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting 165 ); 166 167 auto errors = 168 [ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ]; /* WAIT_FAILED: Failed to call MsgWait..() */ 169 170 if (signal == WAIT_TIMEOUT) 171 return true; 172 173 if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) { 174 log("Event Loop Exiting because of error"); 175 return false; 176 } 177 178 MSG msg; 179 while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) { 180 m_status = StatusInfo.init; 181 TranslateMessage(&msg); 182 DispatchMessageW(&msg); 183 184 if (m_status.code == Status.ERROR) { 185 log(m_status.text); 186 return false; 187 } 188 } 189 return true; 190 } 191 192 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) 193 { 194 m_status = StatusInfo.init; 195 fd_t fd = ctxt.socket; 196 bool reusing; 197 if (fd == fd_t.init) { 198 199 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 200 201 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 202 return 0; 203 204 if (!setOption(fd, TCPOption.REUSEADDR, true)) 205 return 0; 206 207 // todo: defer accept? 
208 209 if (ctxt.noDelay) { 210 if (!setOption(fd, TCPOption.NODELAY, true)) 211 return 0; 212 } 213 } else reusing = true; 214 215 if (initTCPListener(fd, ctxt, reusing)) 216 { 217 try { 218 log("Running listener on socket fd#" ~ fd.to!string); 219 m_connHandlers[fd] = del; 220 version(Distributed)ctxt.init(m_hwnd, fd); 221 } 222 catch (Exception e) { 223 setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg); 224 closeSocket(fd, false); 225 return 0; 226 } 227 } 228 else 229 { 230 return 0; 231 } 232 233 234 return fd; 235 } 236 237 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) 238 in { 239 assert(ctxt.socket == fd_t.init); 240 assert(ctxt.peer.family != AF_UNSPEC); 241 } 242 body { 243 m_status = StatusInfo.init; 244 fd_t fd = ctxt.preInitializedSocket; 245 246 if (fd == fd_t.init) 247 fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 248 log("Starting connection at: " ~ fd.to!string); 249 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 250 return 0; 251 252 try { 253 (m_tcpHandlers)[fd] = del; 254 } 255 catch (Exception e) { 256 setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg); 257 return 0; 258 } 259 260 if (ctxt.noDelay) { 261 if (!setOption(fd, TCPOption.NODELAY, true)) 262 return 0; 263 } 264 265 if (!initTCPConnection(fd, ctxt)) { 266 try { 267 log("Remove event handler for " ~ fd.to!string); 268 m_tcpHandlers.remove(fd); 269 } 270 catch (Exception e) { 271 setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg); 272 } 273 274 closeSocket(fd, false); 275 return 0; 276 } 277 278 279 try log("Client started FD#" ~ fd.to!string); 280 catch{} 281 return fd; 282 } 283 284 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 285 m_status = StatusInfo.init; 286 fd_t fd = ctxt.preInitializedSocket; 287 288 if (fd == fd_t.init) 289 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); 290 291 if (catchSocketError!("run 
AsyncUDPSocket")(fd, INVALID_SOCKET)) 292 return 0; 293 294 if (initUDPSocket(fd, ctxt)) 295 { 296 try { 297 (m_udpHandlers)[fd] = del; 298 } 299 catch (Exception e) { 300 setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg); 301 closeSocket(fd, false); 302 return 0; 303 } 304 } 305 306 try log("UDP Socket started FD#" ~ fd.to!string); 307 catch{} 308 309 return fd; 310 } 311 312 fd_t run(shared AsyncSignal ctxt) { 313 m_status = StatusInfo.init; 314 try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {} 315 return (cast(fd_t)m_hwnd); 316 } 317 318 fd_t run(AsyncNotifier ctxt) { 319 m_status = StatusInfo.init; 320 //try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {} 321 return cast(fd_t) m_hwnd; 322 } 323 324 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 325 if (timeout < 0.seconds) 326 timeout = 0.seconds; 327 m_status = StatusInfo.init; 328 fd_t timer_id = ctxt.id; 329 if (timer_id == fd_t.init) { 330 timer_id = createIndex(); 331 } 332 try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {} 333 334 BOOL err; 335 try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint+30, null); 336 catch(Exception e) { 337 setInternalError!"SetTimer"(Status.ERROR); 338 return 0; 339 } 340 341 if (err == 0) 342 { 343 m_error = GetLastErrorSafe(); 344 m_status.code = Status.ERROR; 345 m_status.text = "kill(AsyncTimer)"; 346 log(m_status); 347 return 0; 348 } 349 350 if (m_timer.fd == fd_t.init) 351 { 352 m_timer.fd = timer_id; 353 m_timer.cb = del; 354 } 355 else { 356 try 357 { 358 (m_timerHandlers)[timer_id] = del; 359 } 360 catch (Exception e) { 361 setInternalError!"HashMap assign"(Status.ERROR); 362 return 0; 363 } 364 } 365 366 367 return timer_id; 368 } 369 370 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) 371 { 372 static fd_t ids; 373 auto fd = ++ids; 374 375 try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 376 catch 
(Exception e) {
		setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
	}

	return fd;

}

/// Stops watching every folder registered for this watcher, releases the
/// per-folder watcher objects, and forgets the watcher's handler.
/// Returns: false if cleanup failed (status is set with the details).
bool kill(AsyncDirectoryWatcher ctxt) {

	try {
		// BUGFIX: the old code declared a `toFree` array but never inserted
		// into it, so the DWFolderWatcher objects were leaked (only the OS
		// handle was closed). It also removed entries from m_dwFolders while
		// iterating it. Collect the matching keys first, then close, remove
		// and free each watcher.
		Array!uint toRemove;
		foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
			if (v.fd == ctxt.fd)
				toRemove.insertBack(k);
		}

		foreach (uint k; toRemove[]) {
			DWFolderWatcher fw = m_dwFolders.get(k, null);
			if (fw is null) continue;
			CloseHandle(fw.handle);
			m_dwFolders.remove(k);
			ThreadMem.free(fw);
		}

		// todo: close all the handlers...
		m_dwHandlers.remove(ctxt.fd);
	}
	catch (Exception e) {
		setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
		return false;
	}

	return true;
}

/// Closes a TCP connection, clearing its connected/connecting flags first.
/// Params:
///   ctxt   = the connection to tear down
///   forced = when true the socket is shut down in both directions and closed
bool kill(AsyncTCPConnection ctxt, bool forced = false)
{

	m_status = StatusInfo.init;
	fd_t fd = ctxt.socket;

	log("Killing socket "~ fd.to!string);
	try {
		auto cb = m_tcpHandlers.get(ctxt.socket);
		if (cb != TCPEventHandler.init){
			*cb.conn.connected = false;
			*cb.conn.connecting = false;
			return closeSocket(fd, true, forced);
		}
	} catch (Exception e) {
		setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
		// NOTE(review): a handler-lookup failure halts the process here
		// instead of returning false — confirm this hard stop is intended.
		assert(false);
		//return false;
	}

	return true;
}

/// Closes a listening TCP socket if a connection handler is registered for it.
bool kill(AsyncTCPListener ctxt)
{
	m_status = StatusInfo.init;
	fd_t fd = ctxt.socket;
	try {
		if ((ctxt.socket in m_connHandlers) !is null) {
			return closeSocket(fd, false, true);
		}
	} catch (Exception e) {
		setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
		return false;
	}

	return true;
}

/// Signals hold no per-loop resources on Windows (run() just returns the
/// window handle), so there is nothing to release.
bool kill(shared AsyncSignal ctxt) {
	return true;
}

/// Notifiers hold no per-loop resources on Windows; nothing to release.
bool kill(AsyncNotifier ctxt) {
	return true;
}

/// Cancels a timer previously armed with SetTimer and drops its handler.
bool kill(AsyncTimer ctxt) {
	m_status = StatusInfo.init;

	log("Kill timer");

	BOOL err = KillTimer(m_hwnd, ctxt.id);
	if (err == 0)
	{
		m_error = GetLastErrorSafe();
		m_status.code = 
Status.ERROR; 466 m_status.text = "kill(AsyncTimer)"; 467 log(m_status); 468 return false; 469 } 470 471 destroyIndex(ctxt); 472 473 if (m_timer.fd == ctxt.id) { 474 ctxt.id = 0; 475 m_timer = TimerCache.init; 476 } else { 477 try { 478 m_timerHandlers.remove(ctxt.id); 479 } 480 catch (Exception e) { 481 setInternalError!"HashMap remove"(Status.ERROR); 482 return 0; 483 } 484 } 485 486 487 return true; 488 } 489 490 bool kill(AsyncUDPSocket ctxt) { 491 m_status = StatusInfo.init; 492 493 fd_t fd = ctxt.socket; 494 INT err = closesocket(fd); 495 if (catchSocketError!"closesocket"(err)) 496 return false; 497 498 try m_udpHandlers.remove(ctxt.socket); 499 catch (Exception e) { 500 setInternalError!"HashMap remove"(Status.ERROR); 501 return 0; 502 } 503 504 return true; 505 } 506 507 bool setOption(T)(fd_t fd, TCPOption option, in T value) { 508 m_status = StatusInfo.init; 509 int err; 510 try { 511 nothrow bool errorHandler() { 512 if (catchSocketError!"setOption:"(err)) { 513 try m_status.text ~= option.to!string; 514 catch (Exception e){ assert(false, "to!string conversion failure"); } 515 return false; 516 } 517 518 return true; 519 } 520 521 522 523 final switch (option) { 524 525 case TCPOption.NODELAY: // true/false 526 static if (!is(T == bool)) 527 assert(false, "NODELAY value type must be bool, not " ~ T.stringof); 528 else { 529 BOOL val = value?1:0; 530 socklen_t len = val.sizeof; 531 err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); 532 return errorHandler(); 533 } 534 case TCPOption.REUSEADDR: // true/false 535 static if (!is(T == bool)) 536 assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof); 537 else 538 { 539 BOOL val = value?1:0; 540 socklen_t len = val.sizeof; 541 err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); 542 return errorHandler(); 543 } 544 case TCPOption.QUICK_ACK: 545 static if (!is(T == bool)) 546 assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof); 547 else { 548 m_status.code = 
Status.NOT_IMPLEMENTED; 549 return false; // quick ack is not implemented 550 } 551 case TCPOption.KEEPALIVE_ENABLE: // true/false 552 static if (!is(T == bool)) 553 assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof); 554 else 555 { 556 BOOL val = value?1:0; 557 socklen_t len = val.sizeof; 558 err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); 559 return errorHandler(); 560 } 561 case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn 562 static if (!isIntegral!T) 563 assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof); 564 else { 565 m_status.code = Status.NOT_IMPLEMENTED; 566 return false; 567 } 568 case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets 569 static if (!is(T == Duration)) 570 assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof); 571 else { 572 573 if (!kcache) 574 kcache = new HashMap!(fd_t, tcp_keepalive)(); 575 576 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 577 tcp_keepalive sReturned; 578 DWORD dwBytes; 579 kaSettings.onoff = ULONG(1); 580 if (kaSettings.keepalivetime == ULONG.init) { 581 kaSettings.keepalivetime = 1000; 582 } 583 kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG; 584 (*kcache)[fd] = kaSettings; 585 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 586 587 return errorHandler(); 588 } 589 case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start 590 static if (!is(T == Duration)) 591 assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof); 592 else { 593 594 if (!kcache) 595 kcache = new HashMap!(fd_t, tcp_keepalive)(); 596 597 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 598 tcp_keepalive sReturned; 599 DWORD dwBytes; 600 kaSettings.onoff = ULONG(1); 601 if (kaSettings.keepaliveinterval == ULONG.init) { 602 
kaSettings.keepaliveinterval = 75*1000; 603 } 604 kaSettings.keepalivetime = value.total!"msecs".to!ULONG; 605 606 (*kcache)[fd] = kaSettings; 607 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 608 609 return errorHandler(); 610 } 611 case TCPOption.BUFFER_RECV: // bytes 612 static if (!isIntegral!T) 613 assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof); 614 else { 615 int val = value.to!int; 616 socklen_t len = val.sizeof; 617 err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); 618 return errorHandler(); 619 } 620 case TCPOption.BUFFER_SEND: // bytes 621 static if (!isIntegral!T) 622 assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof); 623 else { 624 int val = value.to!int; 625 socklen_t len = val.sizeof; 626 err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); 627 return errorHandler(); 628 } 629 case TCPOption.TIMEOUT_RECV: 630 static if (!is(T == Duration)) 631 assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof); 632 else { 633 DWORD val = value.total!"msecs".to!DWORD; 634 socklen_t len = val.sizeof; 635 err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len); 636 return errorHandler(); 637 } 638 case TCPOption.TIMEOUT_SEND: 639 static if (!is(T == Duration)) 640 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 641 else { 642 DWORD val = value.total!"msecs".to!DWORD; 643 socklen_t len = val.sizeof; 644 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len); 645 return errorHandler(); 646 } 647 case TCPOption.TIMEOUT_HALFOPEN: 648 static if (!is(T == Duration)) 649 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 650 else { 651 m_status.code = Status.NOT_IMPLEMENTED; 652 return false; 653 } 654 case TCPOption.LINGER: // bool onOff, int seconds 655 static if (!is(T == Tuple!(bool, int))) 656 assert(false, "LINGER value type must be 
Tuple!(bool, int), not " ~ T.stringof);
				else {
					// BUGFIX: the parameter is named `value`, not `val`; the old
					// `val[0]`/`val[1]` did not exist in this scope and broke
					// compilation of setOption!(Tuple!(bool, int)).
					linger l = linger(value[0]?1:0, value[1].to!USHORT);
					socklen_t llen = l.sizeof;
					err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
					return errorHandler();
				}
			case TCPOption.CONGESTION:
				static if (!isIntegral!T)
					assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
				else {
					m_status.code = Status.NOT_IMPLEMENTED;
					return false;
				}
			case TCPOption.CORK:
				static if (!isIntegral!T)
					assert(false, "CORK value type must be int, not " ~ T.stringof);
				else {
					m_status.code = Status.NOT_IMPLEMENTED;
					return false;
				}
			case TCPOption.DEFER_ACCEPT: // seconds
				static if (!isIntegral!T)
					assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
				else {
					int val = value.to!int;
					socklen_t len = val.sizeof;
					err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
					return errorHandler();
				}
			}

		}
		catch (Exception e) {
			return false;
		}

	}

	/// Unused on Windows (reads are driven through window messages); always 0.
	uint read(in fd_t fd, ref ubyte[] data)
	{
		return 0;
	}

	/// Unused on Windows; always 0.
	uint write(in fd_t fd, in ubyte[] data)
	{
		return 0;
	}

	/// Drains up to dst.length buffered directory-change records for `fd`
	/// into dst and removes the copied records from the internal buffer.
	/// Returns: the number of records copied (0 on error or empty buffer).
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
		size_t i;
		Array!DWChangeInfo* changes;
		try {
			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
			if ((*changes).empty)
				return 0;

			import std.algorithm : min;
			size_t cnt = min(dst.length, changes.length);
			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
				try log("reading change: " ~ change.path); catch {}
				dst[i] = (*changes)[i];
				i++;
			}
			changes.linearRemove((*changes)[0 .. 
cnt]); 721 } 722 catch (Exception e) { 723 setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); 724 return 0; 725 } 726 try log("Changes returning with: " ~ i.to!string); catch {} 727 return cast(uint) i; 728 } 729 730 uint watch(in fd_t fd, in WatchInfo info) { 731 m_status = StatusInfo.init; 732 uint wd; 733 try { 734 HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()), 735 FILE_LIST_DIRECTORY, 736 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, 737 null, 738 OPEN_EXISTING, 739 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 740 null); 741 wd = cast(uint) hndl; 742 DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init); 743 assert(handler !is null); 744 log("Watching: " ~ info.path.toNativeString()); 745 (m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive); 746 } catch (Exception e) { 747 setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg); 748 return 0; 749 } 750 return wd; 751 } 752 753 bool unwatch(in fd_t fd, in fd_t _wd) { 754 uint wd = cast(uint) _wd; 755 m_status = StatusInfo.init; 756 try { 757 DWFolderWatcher fw = m_dwFolders.get(wd, null); 758 assert(fw !is null); 759 m_dwFolders.remove(wd); 760 fw.close(); 761 ThreadMem.free(fw); 762 } catch (Exception e) { 763 setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg); 764 return false; 765 } 766 return true; 767 } 768 769 bool notify(T)(in fd_t fd, in T payload) 770 if (is(T == shared AsyncSignal) || is(T == AsyncNotifier)) 771 { 772 m_status = StatusInfo.init; 773 import std.conv; 774 775 auto payloadPtr = cast(ubyte*)payload; 776 auto payloadAddr = cast(ulong)payloadPtr; 777 778 WPARAM wparam = payloadAddr & 0xffffffff; 779 LPARAM lparam = cast(uint) (payloadAddr >> 32); 780 781 BOOL err; 782 static if (is(T == AsyncNotifier)) 783 err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, 
wparam, lparam);
		else
			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
		try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {}
		if (err == 0)
		{
			m_error = GetLastErrorSafe();
			m_status.code = Status.ERROR;
			m_status.text = "notify";
			log(m_status);
			return false;
		}
		return true;
	}

	/// Non-blocking recv on a connected socket.
	/// Returns: bytes read; 0 on error or would-block (status set accordingly).
	uint recv(in fd_t fd, ref ubyte[] data)
	{
		m_status = StatusInfo.init;
		int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);

		//try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {}
		if (catchSocketError!".recv"(ret)) { // ret == -1
			if (m_error == error_t.WSAEWOULDBLOCK)
				m_status.code = Status.ASYNC;
			return 0; // TODO: handle some errors more specifically
		}
		m_status.code = Status.OK;

		return cast(uint) ret;
	}

	/// Non-blocking send on a connected socket.
	/// Returns: bytes sent; 0 on error or would-block (status set accordingly).
	uint send(in fd_t fd, in ubyte[] data)
	{
		m_status = StatusInfo.init;
		//try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
		//catch{}
		int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);

		if (catchSocketError!"send"(ret)) {
			if (m_error == error_t.WSAEWOULDBLOCK)
				m_status.code = Status.ASYNC;
			return 0; // TODO: handle some errors more specifically
		}
		// NOTE(review): the success path reports Status.ASYNC while recv()
		// reports Status.OK — confirm this asymmetry is intentional.
		m_status.code = Status.ASYNC;
		return cast(uint) ret;
	}

	/// Toggles SO_BROADCAST on a UDP socket.
	bool broadcast(in fd_t fd, bool b) {
		int val = b?1:0;
		socklen_t len = val.sizeof;
		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
		if (catchSocketError!"setsockopt"(err))
			return false;

		return true;

	}

	/// Receives a datagram, reporting the sender's address through `addr`.
	/// Returns: bytes read; 0 on error or would-block (status set accordingly).
	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
	{
		m_status = StatusInfo.init;
		addr.family = AF_INET;
		// BUGFIX: recvfrom() requires *fromlen to hold the size of the address
		// buffer on input; it was previously left uninitialized, which makes
		// WinSock fail with WSAEFAULT.
		socklen_t addrLen = addr.sockAddrLen;
		int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);

		if (addrLen > addr.sockAddrLen) {
			addr.family = AF_INET6;
		}

		try log("RECVFROM " ~ ret.to!string ~ "B"); catch {}
		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
			if (m_error == WSAEWOULDBLOCK)
				m_status.code = Status.ASYNC;
			return 0; // TODO: handle some errors more specifically
		}
		m_status.code = Status.OK;

		return cast(uint) ret;
	}

	/// Sends a datagram to `addr`, or plain send() on the connected socket
	/// when addr is NetworkAddress.init.
	/// Returns: bytes sent; 0 on error or would-block (status set accordingly).
	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
	{
		m_status = StatusInfo.init;
		try log("SENDTO " ~ data.length.to!string ~ "B"); catch{}
		int ret;
		if (addr != NetworkAddress.init)
			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
		else
			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);

		if (catchSocketError!".sendTo"(ret)) { // ret == -1
			if (m_error == WSAEWOULDBLOCK)
				m_status.code = Status.ASYNC;
			return 0; // TODO: handle some errors more specifically
		}

		m_status.code = Status.OK;
		return cast(uint) ret;
	}

	/// Returns the locally bound address of `fd` (family chosen by `ipv6`,
	/// upgraded to AF_INET6 if the reported length exceeds the IPv4 size).
	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
		NetworkAddress ret;
		import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
		if (ipv6)
			ret.family = AF_INET6;
		else
			ret.family = AF_INET;
		socklen_t len = ret.sockAddrLen;
		int err = getsockname(fd, ret.sockAddr, &len);
		if (catchSocketError!"getsockname"(err))
			return NetworkAddress.init;
		if (len > ret.sockAddrLen)
			ret.family = AF_INET6;
		return ret;
	}

	/// Toggles TCP_NODELAY (Nagle) on `fd`; errors are deliberately ignored.
	void noDelay(in fd_t fd, bool b) {
		m_status = StatusInfo.init;
		// BUGFIX: WinSock expects a 4-byte BOOL option value; the old code
		// passed the address of a 1-byte D bool with optlen = 1. This now
		// mirrors the TCPOption.NODELAY branch of setOption().
		BOOL val = b ? 1 : 0;
		socklen_t len = val.sizeof;
		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
	}

	/// Shuts down (and, when forced, closes) the remote half of a connected
	/// socket and releases any registered TCP event handler for it.
	private bool closeRemoteSocket(fd_t fd, bool forced) {

		INT err;

		try log("Shutdown FD#" ~ fd.to!string);
		catch{}
		if (forced) {
			err = shutdown(fd, SD_BOTH);
			closesocket(fd);
		}
		else
			err = shutdown(fd, SD_SEND);

		try {
			TCPEventHandler* evh = fd in m_tcpHandlers;
			if (evh) {
				if (evh.conn.inbound) {
					try ThreadMem.free(evh.conn);
catch(Exception e) { assert(false, "Failed to free resources"); } 923 } 924 925 evh.conn = null; 926 //log("Remove event handler for " ~ fd.to!string); 927 m_tcpHandlers.remove(fd); 928 } 929 } 930 catch (Exception e) { 931 setInternalError!"m_tcpHandlers.remove"(Status.ERROR); 932 return false; 933 } 934 if (catchSocketError!"shutdown"(err)) 935 return false; 936 return true; 937 } 938 939 // for connected sockets 940 bool closeSocket(fd_t fd, bool connected, bool forced = false) 941 { 942 m_status = StatusInfo.init; 943 if (!connected && forced) { 944 try { 945 if (fd in m_connHandlers) { 946 log("Removing connection handler for: " ~ fd.to!string); 947 m_connHandlers.remove(fd); 948 } 949 } 950 catch (Exception e) { 951 setInternalError!"m_connHandlers.remove"(Status.ERROR); 952 return false; 953 } 954 } 955 else if (connected) 956 closeRemoteSocket(fd, forced); 957 958 if (!connected || forced) { 959 // todo: flush the socket here? 960 961 INT err = closesocket(fd); 962 if (catchSocketError!"closesocket"(err)) 963 return false; 964 965 } 966 return true; 967 } 968 969 bool closeConnection(fd_t fd) { 970 return closeSocket(fd, true); 971 } 972 973 NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 974 { 975 m_status = StatusInfo.init; 976 977 NetworkAddress addr; 978 WSAPROTOCOL_INFOW hints; 979 import std.conv : to; 980 if (ipv6) { 981 addr.family = AF_INET6; 982 } 983 else { 984 addr.family = AF_INET; 985 } 986 987 INT addrlen = addr.sockAddrLen; 988 989 LPWSTR str; 990 try { 991 str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr); 992 } catch (Exception e) { 993 setInternalError!"toStringz"(Status.ERROR, e.msg); 994 return NetworkAddress.init; 995 } 996 997 INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 998 if (port != 0) addr.port = port; 999 try log(addr.toString()); 1000 catch {} 1001 if( catchSocketError!"getAddressFromIP"(err) ) 1002 return NetworkAddress.init; 
1003 else assert(addrlen == addr.sockAddrLen); 1004 return addr; 1005 } 1006 1007 NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true) 1008 /*in { 1009 debug import libasync.internals.validator : validateHost; 1010 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 1011 } 1012 body */{ 1013 m_status = StatusInfo.init; 1014 import std.conv : to; 1015 NetworkAddress addr; 1016 ADDRINFOW hints; 1017 ADDRINFOW* infos; 1018 LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*); 1019 if (ipv6) { 1020 hints.ai_family = AF_INET6; 1021 addr.family = AF_INET6; 1022 } 1023 else { 1024 hints.ai_family = AF_INET; 1025 addr.family = AF_INET; 1026 } 1027 1028 if (tcp) { 1029 hints.ai_protocol = IPPROTO_TCP; 1030 hints.ai_socktype = SOCK_STREAM; 1031 } 1032 else { 1033 hints.ai_protocol = IPPROTO_UDP; 1034 hints.ai_socktype = SOCK_DGRAM; 1035 } 1036 if (port != 0) addr.port = port; 1037 1038 LPCWSTR str; 1039 1040 try { 1041 str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host); 1042 } catch (Exception e) { 1043 setInternalError!"toUTFz"(Status.ERROR, e.msg); 1044 return NetworkAddress.init; 1045 } 1046 1047 error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos); 1048 scope(exit) FreeAddrInfoW(infos); 1049 if (err != EWIN.WSA_OK) { 1050 setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err); 1051 return NetworkAddress.init; 1052 } 1053 1054 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 1055 ubyte* data = cast(ubyte*) addr.sockAddr; 1056 data[0 .. infos.ai_addrlen] = pAddr[0 .. 
infos.ai_addrlen]; // perform bit copy 1057 try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); 1058 catch (Exception e){} 1059 return addr; 1060 } 1061 1062 void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED) 1063 { 1064 if (details.length > 0) 1065 m_status.text = TRACE ~ ": " ~ details; 1066 else 1067 m_status.text = TRACE; 1068 m_error = error; 1069 m_status.code = s; 1070 static if(LOG) log(m_status); 1071 } 1072 private: 1073 bool onMessage(MSG msg) 1074 { 1075 m_status = StatusInfo.init; 1076 switch (msg.message) { 1077 case WM_TCP_SOCKET: 1078 auto evt = LOWORD(msg.lParam); 1079 auto err = HIWORD(msg.lParam); 1080 if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) { 1081 1082 if (evt == FD_ACCEPT) 1083 setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 1084 else { 1085 try { 1086 TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam); 1087 cb(TCPEvent.ERROR); 1088 } 1089 catch (Exception e) { 1090 // An Error callback should never fail... 1091 setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 1092 // assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 1093 } 1094 } 1095 } 1096 break; 1097 case WM_UDP_SOCKET: 1098 auto evt = LOWORD(msg.lParam); 1099 auto err = HIWORD(msg.lParam); 1100 if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) { 1101 try { 1102 UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam); 1103 cb(UDPEvent.ERROR); 1104 } 1105 catch (Exception e) { 1106 // An Error callback should never fail... 
1107 setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 1108 } 1109 } 1110 break; 1111 case WM_TIMER: 1112 log("Timer callback"); 1113 TimerHandler cb; 1114 bool cached = (m_timer.fd == cast(fd_t)msg.wParam); 1115 try { 1116 if (cached) 1117 cb = m_timer.cb; 1118 else 1119 cb = m_timerHandlers.get(cast(fd_t)msg.wParam); 1120 1121 cb.ctxt.rearmed = false; 1122 1123 if (cb.ctxt.oneShot) 1124 kill(cb.ctxt); 1125 1126 cb(); 1127 1128 if (cb.ctxt.oneShot && cb.ctxt.rearmed) 1129 run(cb.ctxt, cb, cb.ctxt.timeout); 1130 1131 } 1132 catch (Exception e) { 1133 // An Error callback should never fail... 1134 setInternalError!"del@TimerHandler"(Status.ERROR, e.msg); 1135 } 1136 1137 break; 1138 case WM_USER_EVENT: 1139 log("User event"); 1140 1141 ulong uwParam = cast(ulong)msg.wParam; 1142 ulong ulParam = cast(ulong)msg.lParam; 1143 1144 ulong payloadAddr = (ulParam << 32) | uwParam; 1145 void* payloadPtr = cast(void*) payloadAddr; 1146 shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr; 1147 1148 try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {} 1149 try { 1150 assert(ctxt.id != 0); 1151 ctxt.handler(); 1152 } 1153 catch (Exception e) { 1154 setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 1155 } 1156 break; 1157 case WM_USER_SIGNAL: 1158 log("User signal"); 1159 1160 ulong uwParam = cast(ulong)msg.wParam; 1161 ulong ulParam = cast(ulong)msg.lParam; 1162 1163 ulong payloadAddr = (ulParam << 32) | uwParam; 1164 void* payloadPtr = cast(void*) payloadAddr; 1165 AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr; 1166 1167 try { 1168 ctxt.handler(); 1169 } 1170 catch (Exception e) { 1171 setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 1172 } 1173 break; 1174 default: return false; // not handled, sends to wndProc 1175 } 1176 return true; 1177 } 1178 1179 bool onUDPEvent(WORD evt, WORD err, fd_t sock) { 1180 m_status = StatusInfo.init; 1181 try{ 1182 if (m_udpHandlers.get(sock) == UDPHandler.init) 
1183 return false; 1184 } catch {} 1185 if (sock == 0) { // highly unlikely... 1186 setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined"); 1187 return false; 1188 } 1189 if (err) { 1190 setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err); 1191 try { 1192 //log("CLOSE FD#" ~ sock.to!string); 1193 (m_udpHandlers)[sock](UDPEvent.ERROR); 1194 } catch { // can't do anything about this... 1195 } 1196 return false; 1197 } 1198 1199 UDPHandler cb; 1200 switch(evt) { 1201 default: break; 1202 case FD_READ: 1203 try { 1204 log("READ FD#" ~ sock.to!string); 1205 cb = m_udpHandlers.get(sock); 1206 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1207 cb(UDPEvent.READ); 1208 } 1209 catch (Exception e) { 1210 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1211 return false; 1212 } 1213 break; 1214 case FD_WRITE: 1215 try { 1216 log("WRITE FD#" ~ sock.to!string); 1217 cb = m_udpHandlers.get(sock); 1218 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1219 cb(UDPEvent.WRITE); 1220 } 1221 catch (Exception e) { 1222 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1223 return false; 1224 } 1225 break; 1226 } 1227 return true; 1228 } 1229 1230 bool onTCPEvent(WORD evt, WORD err, fd_t sock) { 1231 m_status = StatusInfo.init; 1232 try{ 1233 if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init) 1234 return false; 1235 } catch {} 1236 if (sock == 0) { // highly unlikely... 1237 setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined"); 1238 return false; 1239 } 1240 if (err) { 1241 setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err); 1242 try { 1243 //log("CLOSE FD#" ~ sock.to!string); 1244 (m_tcpHandlers)[sock](TCPEvent.ERROR); 1245 } catch { // can't do anything about this... 
		}
		return false;
	}

	TCPEventHandler cb;
	switch(evt) {
		default: break;
		case FD_ACCEPT:
			version(Distributed) gs_mtx.lock_nothrow();

			log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
			log("Accepting connection");
			/// Let another listener take the next connection
			TCPAcceptHandler list;
			try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); }
			scope(exit) {
				/// The connection rotation mechanism is handled by the TCPListenerDistMixins
				/// when registering the same AsyncTCPListener object on multiple event loops.
				/// This allows to even out the CPU usage on a server instance.
				version(Distributed)
				{
					HWND hwnd = list.ctxt.next(m_hwnd);
					if (hwnd !is HWND.init) {
						int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
						if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
							error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
							if (catchSocketError!"WSAAsyncSelect"(error))
								assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string);
						}
					}
					else log("Returned init!!");
					gs_mtx.unlock_nothrow();
				}
			}

			// Try an IPv4-sized sockaddr first; on WSAEFAULT (buffer too
			// small for the peer address) retry with an IPv6-sized one.
			NetworkAddress addr;
			addr.family = AF_INET;
			int addrlen = addr.sockAddrLen;
			fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);

			if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
				if (m_error == WSAEFAULT) { // not enough space for sockaddr
					addr.family = AF_INET6;
					addrlen = addr.sockAddrLen;
					csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
					if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
						return false;
				}
				else return false;
			}

			// Register the accepted socket for async notifications on this
			// thread's message window.
			int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
			if ( catchSocketError!"WSAAsyncSelect"(ok) )
				return false;

			log("Connection accepted: " ~ csock.to!string);

			AsyncTCPConnection conn;
			try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
			catch (Exception e) { assert(false, "Failed allocation"); }
			conn.peer = addr;
			conn.socket = csock;
			conn.inbound = true;

			try {
				// Do the callback to get a handler
				cb = list(conn);
			}
			catch(Exception e) {
				setInternalError!"onConnected"(Status.EVLOOP_FAILURE);
				return false;
			}

			try {
				m_tcpHandlers[csock] = cb; // keep the handler to setup the connection
				log("ACCEPT&CONNECT FD#" ~ csock.to!string);
				*conn.connected = true;
				cb(TCPEvent.CONNECT);
			}
			catch (Exception e) {
				setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT);
				return false;
			}
			break;
		case FD_CONNECT:
			try {
				log("CONNECT FD#" ~ sock.to!string);
				cb = m_tcpHandlers.get(sock);
				if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback");
				// Only mark the connect as in-flight here; TCPEvent.CONNECT is
				// delivered on the first FD_READ/FD_WRITE below.
				*cb.conn.connecting = true;
			}
			catch(Exception e) {
				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				return false;
			}
			break;
		case FD_READ:
			try {
				log("READ FD#" ~ sock.to!string);
				cb = m_tcpHandlers.get(sock);
				if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback");
				if (!cb.conn) break;
				// First readiness after FD_CONNECT completes the handshake.
				if (*cb.conn.connected == false && *cb.conn.connecting) {
					*cb.conn.connecting = false;
					*cb.conn.connected = true;
					cb(TCPEvent.CONNECT);
				}
				else
					cb(TCPEvent.READ);
			}
			catch (Exception e) {
				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
				return false;
			}
			break;
		case FD_WRITE:
			// todo: don't send the first write for consistency with epoll?
1363 1364 try { 1365 //import std.stdio; 1366 log("WRITE FD#" ~ sock.to!string); 1367 cb = m_tcpHandlers.get(sock); 1368 if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1369 if (!cb.conn) break; 1370 if (*cb.conn.connected == false && *cb.conn.connecting) { 1371 *cb.conn.connecting = false; 1372 *cb.conn.connected = true; 1373 cb(TCPEvent.CONNECT); 1374 } 1375 else { 1376 cb(TCPEvent.WRITE); 1377 } 1378 } 1379 catch (Exception e) { 1380 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1381 return false; 1382 } 1383 break; 1384 case FD_CLOSE: 1385 // called after shutdown() 1386 INT ret; 1387 bool connected = true; 1388 try { 1389 log("CLOSE FD#" ~ sock.to!string); 1390 if (sock in m_tcpHandlers) { 1391 cb = m_tcpHandlers.get(sock); 1392 if (*cb.conn.connected) { 1393 cb(TCPEvent.CLOSE); 1394 *cb.conn.connecting = false; 1395 *cb.conn.connected = false; 1396 } else 1397 connected = false; 1398 } 1399 else 1400 connected = false; 1401 } 1402 catch (Exception e) { 1403 if (m_status.code == Status.OK) 1404 setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 1405 return false; 1406 } 1407 1408 closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket 1409 1410 break; 1411 } 1412 return true; 1413 } 1414 1415 bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt) 1416 { 1417 INT err; 1418 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1419 if (catchSocketError!"bind"(err)) { 1420 closesocket(fd); 1421 return false; 1422 } 1423 err = listen(fd, 128); 1424 if (catchSocketError!"listen"(err)) { 1425 closesocket(fd); 1426 return false; 1427 } 1428 err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE); 1429 if (catchSocketError!"WSAAsyncSelect"(err)) { 1430 closesocket(fd); 1431 return false; 1432 } 1433 1434 return true; 1435 } 1436 1437 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false) 
in {
	assert(m_threadId == GetCurrentThreadId());
	assert(ctxt.local !is NetworkAddress.init);
}
body {
	INT err;
	if (!reusing) {
		// Bind + listen only for a fresh socket; a reused (SO_REUSEADDR)
		// listener was already set up by another event loop.
		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
		if (catchSocketError!"bind"(err)) {
			closesocket(fd);
			return false;
		}

		err = listen(fd, 128);
		if (catchSocketError!"listen"(err)) {
			closesocket(fd);
			return false;
		}

		// Deliver FD_ACCEPT notifications to this thread's message window.
		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
		if (catchSocketError!"WSAAsyncSelect"(err)) {
			closesocket(fd);
			return false;
		}
	}

	return true;
}

/// Binds fd to a wildcard local address of the peer's family, registers it
/// for async notifications and starts a non-blocking connect to ctxt.peer.
/// Returns true when the connect succeeded or is pending (WSAEWOULDBLOCK).
bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
in {
	assert(ctxt.peer !is NetworkAddress.init);
	assert(ctxt.peer.port != 0, "Connecting to an invalid port");
}
body {
	INT err;
	NetworkAddress bind_addr;
	bind_addr.family = ctxt.peer.family;

	// Wildcard (ANY) local address: let the stack pick interface and port.
	if (ctxt.peer.family == AF_INET)
		bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
	else if (ctxt.peer.family == AF_INET6)
		bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
	else {
		// NOTE(review): `status` returns StatusInfo by value, so these two
		// writes appear to land on a temporary rather than m_status — verify
		// whether setInternalError was intended here.
		status.code = Status.ERROR;
		status.text = "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string;
		return false;
	}

	err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
	if ( catchSocketError!"bind"(err) )
		return false;
	err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
	if ( catchSocketError!"WSAAsyncSelect"(err) )
		return false;
	err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);

	// SOCKET_ERROR with WSAEWOULDBLOCK means the connect is in progress.
	auto errors = [ tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];

	if (catchSocketErrorsEq!"connectEQ"(err, errors))
		return true;
	else if (catchSocketError!"connect"(err))
		return false;

	return true;
}

/// Matches `val` against the given (value, Status) pairs; on a match,
/// records the status together with GetLastError() and returns true.
bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
1506 if (isIntegral!T) 1507 { 1508 foreach (validator ; cmp) { 1509 if (val == validator[0]) { 1510 m_status.text = TRACE; 1511 m_status.code = validator[1]; 1512 if (m_status.code == Status.EVLOOP_TIMEOUT) { 1513 log(m_status); 1514 break; 1515 } 1516 m_error = GetLastErrorSafe(); 1517 static if(LOG) log(m_status); 1518 return true; 1519 } 1520 } 1521 return false; 1522 } 1523 1524 bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 1525 if (isIntegral!T) 1526 { 1527 foreach (validator ; cmp) { 1528 if (val == validator[0]) { 1529 m_status.text = TRACE; 1530 m_error = WSAGetLastErrorSafe(); 1531 m_status.status = validator[1]; 1532 static if(LOG) log(m_status); 1533 return true; 1534 } 1535 } 1536 return false; 1537 } 1538 1539 bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...) 1540 if (isIntegral!T) 1541 { 1542 error_t err; 1543 foreach (validator ; cmp) { 1544 if (val == validator[0]) { 1545 if (err is EWIN.init) err = WSAGetLastErrorSafe(); 1546 if (err == validator[1]) { 1547 m_status.text = TRACE; 1548 m_error = WSAGetLastErrorSafe(); 1549 m_status.code = validator[2]; 1550 static if(LOG) log(m_status); 1551 return true; 1552 } 1553 } 1554 } 1555 return false; 1556 } 1557 1558 1559 bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 1560 if (isIntegral!T) 1561 { 1562 if (val == cmp) { 1563 m_status.text = TRACE; 1564 m_error = WSAGetLastErrorSafe(); 1565 m_status.code = Status.ABORT; 1566 static if(LOG) log(m_status); 1567 return true; 1568 } 1569 return false; 1570 } 1571 1572 error_t WSAGetLastErrorSafe() { 1573 try { 1574 return cast(error_t) WSAGetLastError(); 1575 } catch(Exception e) { 1576 return EWIN.ERROR_ACCESS_DENIED; 1577 } 1578 } 1579 1580 error_t GetLastErrorSafe() { 1581 try { 1582 return cast(error_t) GetLastError(); 1583 } catch(Exception e) { 1584 return EWIN.ERROR_ACCESS_DENIED; 1585 } 1586 } 1587 1588 void log(StatusInfo val) 1589 { 1590 static if (LOG) { 1591 
			import std.stdio;
			try {
				writeln("Backtrace: ", m_status.text);
				writeln(" | Status: ", m_status.code);
				writeln(" | Error: " , m_error);
				// Append the human-readable WSA message when one is known.
				if ((m_error in EWSAMessages) !is null)
					writeln(" | Message: ", EWSAMessages[m_error]);
			} catch(Exception e) {
				return;
			}
		}
	}

	/// Logs an arbitrary lazily-evaluated value; compiled out unless LOG.
	void log(T)(lazy T val)
	{
		static if (LOG) {
			import std.stdio;
			try {
				writeln(val);
			} catch(Exception e) {
				return;
			}
		}
	}

}

/// Mixed into AsyncTCPConnection to expose the two connection-state flags
/// the event loop flips while a connect is in flight (see onTCPEvent).
mixin template TCPConnectionMixins() {

	private CleanupData m_impl;

	struct CleanupData {
		// set once the socket is fully connected
		bool connected;
		// set between FD_CONNECT and the first FD_READ/FD_WRITE
		bool connecting;
	}

	@property bool* connecting() {
		return &m_impl.connecting;
	}

	@property bool* connected() {
		return &m_impl.connected;
	}

}
/*
mixin template TCPListenerDistMixins()
{
	import std.c.windows.windows : HWND;
	import libasync.internals.hashmap : HashMap;
	import core.sync.mutex;
	private {
		bool m_dist;

		Tuple!(WinReference, bool*) m_handles;
		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
		__gshared Mutex gs_mutex;
	}

	/// The TCP Listener schedules distributed connection handlers based on
	/// the event loops that are using the same AsyncTCPListener object.
	/// This is done by using WSAAsyncSelect on a different window after each
	/// accept TCPEvent.
	class WinReference {
		private {
			struct Item {
				HWND handle;
				bool active;
			}

			Item[] m_items;
		}

		this(HWND hndl, bool b) {
			append(hndl, b);
		}

		void append(HWND hndl, bool b) {
			m_items ~= Item(hndl, b);
		}

		HWND next(HWND me) {
			Item[] items;
			synchronized(gs_mutex)
				items = m_items;
			if (items.length == 1)
				return me;
			foreach (i, item; items) {
				if (item.active == true) {
					m_items[i].active = false; // remove responsibility
					if (m_items.length <= i + 1) {
						m_items[0].active = true; // set responsibility
						auto ret = m_items[0].handle;
						return ret;
					}
					else {
						m_items[i + 1].active = true;
						auto ret = m_items[i + 1].handle;
						return ret;
					}
				}

			}
			assert(false);
		}

	}

	void init(HWND hndl, fd_t sock) {
		try {
			if (!gs_mutex) {
				gs_mutex = new Mutex;
			}
			synchronized(gs_mutex) {
				m_handles = gs_dist.get(sock);
				if (m_handles == typeof(m_handles).init) {
					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
					m_handles = gs_dist.get(sock);
					assert(m_handles != typeof(m_handles).init);
				}
				else {
					m_handles[0].append(hndl, false);
					*m_handles[1] = true; // set first thread to dist
					m_dist = true; // set this thread to dist
				}
			}
		} catch (Exception e) {
			assert(false, e.toString());
		}

	}

	HWND next(HWND me) {
		try {
			if (!m_dist)
				return HWND.init;
			return m_handles[0].next(me);
		}
		catch (Exception e) {
			assert(false, e.toString());
		}
	}

}*/
/// Pairs a directory-watcher callback with the queue of change records that
/// accumulate until the handler drains them.
private class DWHandlerInfo {
	DWHandler handler;
	Array!DWChangeInfo buffer;

	this(DWHandler cb) {
		handler = cb;
	}
}

/// Watches a single directory via overlapped ReadDirectoryChangesW and
/// forwards change notifications to the owning event loop via an AsyncSignal.
private final class DWFolderWatcher {
	import libasync.internals.path;
private:
	EventLoop m_evLoop;
	fd_t m_fd;
	bool m_recursive;            // watch subdirectories as well
	HANDLE m_handle;             // directory handle passed to ReadDirectoryChangesW
	Path m_path;
	DWFileEvent m_events;
	DWHandlerInfo m_handler;     // contains buffer
	shared AsyncSignal m_signal; // marshals completions back onto the event loop thread
	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
	DWORD m_bytesTransferred;    // written by onIOCompleted, read by onChanged
public:
	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
		m_fd = fd;
		m_recursive = recursive;
		m_handle = cast(HANDLE)hndl;
		m_evLoop = evl;
		m_path = path;
		m_handler = handler;

		// Arm the signal first, then issue the initial watch so completions
		// always have somewhere to land.
		m_signal = new shared AsyncSignal(m_evLoop);
		m_signal.run(&onChanged);
		triggerWatch();
	}
package:
	void close() {
		CloseHandle(m_handle);
		m_signal.kill();
	}

	/// Called from the IO completion callback; re-raises the notification on
	/// the event loop thread, where onChanged() will run.
	void triggerChanged() {
		m_signal.trigger();
	}

	/// Runs on the event loop thread after a completed ReadDirectoryChangesW.
	/// Decodes the packed FILE_NOTIFY_INFORMATION records, queues one
	/// DWChangeInfo per record, re-arms the watch and invokes the handler.
	void onChanged() {
		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
		do {
			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
			DWFileEvent kind;
			// Map the FILE_ACTION_* values (1..5) onto DWFileEvent.
			switch( fni.Action ){
				default: kind = DWFileEvent.MODIFIED; break;
				case 0x1: kind = DWFileEvent.CREATED; break;
				case 0x2: kind = DWFileEvent.DELETED; break;
				case 0x3: kind = DWFileEvent.MODIFIED; break;
				case 0x4: kind = DWFileEvent.MOVED_FROM; break;
				case 0x5: kind = DWFileEvent.MOVED_TO; break;
			}
			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
			// NextEntryOffset == 0 marks the last record in the batch.
			if( fni.NextEntryOffset == 0 ) break;
			result = result[fni.NextEntryOffset ..
$]; 1798 } while(result.length > 0); 1799 1800 triggerWatch(); 1801 1802 m_handler.handler(); 1803 } 1804 1805 void triggerWatch() { 1806 1807 static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME| 1808 FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE; 1809 1810 OVERLAPPED* overlapped = ThreadMem.alloc!OVERLAPPED(); 1811 overlapped.Internal = 0; 1812 overlapped.InternalHigh = 0; 1813 overlapped.Offset = 0; 1814 overlapped.OffsetHigh = 0; 1815 overlapped.Pointer = cast(void*)this; 1816 import std.stdio; 1817 DWORD bytesReturned; 1818 BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted); 1819 1820 static if (DEBUG) { 1821 import std.stdio; 1822 if (!success) 1823 writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]); 1824 } 1825 } 1826 1827 @property fd_t fd() const { 1828 return m_fd; 1829 } 1830 1831 @property HANDLE handle() const { 1832 return cast(HANDLE) m_handle; 1833 } 1834 1835 static nothrow extern(System) 1836 { 1837 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 1838 { 1839 import std.stdio; 1840 DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer); 1841 watcher.m_bytesTransferred = cbTransferred; 1842 try ThreadMem.free(overlapped); catch {} 1843 1844 static if (DEBUG) { 1845 if (dwError != 0) 1846 try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{} 1847 } 1848 try watcher.triggerChanged(); 1849 catch (Exception e) { 1850 static if (DEBUG) { 1851 try writeln("Failed to trigger change"); catch {} 1852 } 1853 } 1854 } 1855 } 1856 1857 } 1858 1859 /** 1860 Represents a network/socket address. 
	(taken from vibe.core.net)
*/
public struct NetworkAddress {
	import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6;
	// All three views alias the same storage; `family` selects the valid one.
	private union {
		sockaddr addr;
		sockaddr_in addr_ip4;
		sockaddr_in6 addr_ip6;
	}

	@property bool ipv6() const pure nothrow { return this.family == AF_INET6; }

	/** Family (AF_) of the socket address.
	*/
	@property ushort family() const pure nothrow { return addr.sa_family; }
	/// ditto
	// NOTE(review): the narrowing cast assumes AF_* values fit in a ubyte —
	// true for AF_INET/AF_INET6, but confirm no wider family is ever stored.
	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }

	/** The port in host byte order.
	*/
	@property ushort port()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "port() called for invalid address family: " ~ this.family.to!string);
			case AF_INET: return ntoh(addr_ip4.sin_port);
			case AF_INET6: return ntoh(addr_ip6.sin6_port);
		}
	}
	/// ditto
	@property void port(ushort val)
	pure nothrow {
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: addr_ip4.sin_port = hton(val); break;
			case AF_INET6: addr_ip6.sin6_port = hton(val); break;
		}
	}

	/** A pointer to a sockaddr struct suitable for passing to socket functions.
	*/
	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }

	/** Size of the sockaddr struct that is returned by sockAddr().
	*/
	@property uint sockAddrLen()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "sockAddrLen() called for invalid address family.");
			case AF_INET: return addr_ip4.sizeof;
			case AF_INET6: return addr_ip6.sizeof;
		}
	}

	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
	in { assert (family == AF_INET); }
	body { return &addr_ip4; }

	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
	in { assert (family == AF_INET6); }
	body { return &addr_ip6; }

	/** Returns a string representation of the IP address
	*/
	string toAddressString()
	const {
		import std.array : appender;
		import std.string : format;
		import std.format : formattedWrite;

		switch (this.family) {
			default: assert(false, "toAddressString() called for invalid address family.");
			case AF_INET:
				ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4];
				return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
			case AF_INET6:
				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
				auto ret = appender!string();
				ret.reserve(40);
				// Uncompressed form: eight 16-bit hex groups separated by ':'.
				foreach (i; 0 .. 8) {
					if (i > 0) ret.put(':');
					ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2]));
				}
				return ret.data;
		}
	}

	/** Returns a full string representation of the address, including the port number.
	*/
	string toString()
	const {

		import std.string : format;

		auto ret = toAddressString();
		switch (this.family) {
			default: assert(false, "toString() called for invalid address family.");
			case AF_INET: return ret ~ format(":%s", port);
			case AF_INET6: return format("[%s]:%s", ret, port);
		}
	}

}

private pure nothrow {
	import std.bitmanip;

	// Network byte order is big-endian, so both conversions are a byte swap
	// on little-endian hosts and the identity on big-endian ones.
	ushort ntoh(ushort val)
	{
		version (LittleEndian) return swapEndian(val);
		else version (BigEndian) return val;
		else static assert(false, "Unknown endianness.");
	}

	ushort hton(ushort val)
	{
		version (LittleEndian) return swapEndian(val);
		else version (BigEndian) return val;
		else static assert(false, "Unknown endianness.");
	}
}
// Private message IDs used for WSAAsyncSelect/PostMessage dispatch (see onMessage).
enum WM_TCP_SOCKET = WM_USER+102;
enum WM_UDP_SOCKET = WM_USER+103;
enum WM_USER_EVENT = WM_USER+104;
enum WM_USER_SIGNAL = WM_USER+105;

nothrow:
// Thread-local free-list of indexes handed out by createIndex().
size_t g_idxCapacity = 8;
Array!size_t g_idxAvailable;

// called on run
/// Hands out a unique (per-thread) index, refilling the free-list in
/// doubling batches when it runs dry.
size_t createIndex() {
	size_t idx;
	import std.algorithm : max;
	import std.range : iota;
	try {

		size_t getIdx() {

			if (!g_idxAvailable.empty) {
				immutable size_t ret = g_idxAvailable.back;
				g_idxAvailable.removeBack();
				return ret;
			}
			// 0 doubles as the "pool exhausted" sentinel, so index 0 is
			// never handed out.
			return 0;
		}

		idx = getIdx();
		if (idx == 0) {
			import std.range : iota;
			// todo: not sure about this
			g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) );
			g_idxCapacity = max(32, g_idxCapacity * 2);
			idx = getIdx();
		}

	} catch {}

	return idx;
}

/// Returns a timer's index to the free-list for reuse.
// NOTE(review): nothing here prevents the same id from being inserted twice;
// confirm callers invoke this at most once per timer.
void destroyIndex(AsyncTimer ctxt) {
	try {
		g_idxAvailable.insert(ctxt.id);
	}
	catch (Exception e) {
		assert(false, "Error destroying index: " ~ e.msg);
	}
}


nothrow extern(System) {
	LRESULT wndProc(HWND wnd,
		UINT msg, WPARAM wparam, LPARAM lparam)
	{
		// Recover the EventLoopImpl stored via SetWindowLongPtrA(GWLP_USERDATA)
		// in init(); messages arriving before that are passed straight through.
		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
		if (ptr is null)
			return DefWindowProcA(wnd, msg, wparam, lparam);
		auto appl = cast(EventLoopImpl*)ptr;
		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
		if (appl.onMessage(obj)) {
			static if (DEBUG) {
				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
					import std.stdio : writeln;
					try { writeln(appl.error, ": ", appl.m_status.text); } catch {}
				}
			}
			return 0;
		}
		// Unhandled messages go to the default window procedure.
		else return DefWindowProcA(wnd, msg, wparam, lparam);
	}

	// Win32 API declaration (resolved by user32 at link time).
	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);

}