1 module libasync.windows; 2 3 version (Windows): 4 5 import core.atomic; 6 import core.thread : Fiber; 7 import libasync.types; 8 import libasync.internals.hashmap; 9 import libasync.internals.memory; 10 import std.container : Array; 11 import std.string : toStringz; 12 import std.conv : to; 13 import std.datetime : Duration, msecs, seconds; 14 import std.algorithm : min; 15 import libasync.internals.win32; 16 import std.traits : isIntegral; 17 import std.typecons : Tuple, tuple; 18 import std.utf : toUTFz; 19 import core.sync.mutex; 20 import libasync.events; 21 pragma(lib, "ws2_32"); 22 pragma(lib, "ole32"); 23 alias fd_t = SIZE_T; 24 alias error_t = EWIN; 25 26 //todo : see if new connections with SO_REUSEADDR are evenly distributed between threads 27 28 29 package struct EventLoopImpl { 30 pragma(msg, "Using Windows IOCP for events"); 31 32 private: 33 HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array 34 HashMap!(fd_t, TCPEventHandler) m_tcpHandlers; 35 HashMap!(fd_t, TimerHandler) m_timerHandlers; 36 HashMap!(fd_t, UDPHandler) m_udpHandlers; 37 HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too 38 HashMap!(uint, DWFolderWatcher) m_dwFolders; 39 nothrow: 40 private: 41 struct TimerCache { 42 TimerHandler cb; 43 fd_t fd; 44 } 45 TimerCache m_timer; 46 47 EventLoop m_evLoop; 48 bool m_started; 49 wstring m_window; 50 HWND m_hwnd; 51 DWORD m_threadId; 52 HANDLE[] m_waitObjects; 53 ushort m_instanceId; 54 StatusInfo m_status; 55 error_t m_error = EWIN.WSA_OK; 56 __gshared Mutex gs_mtx; 57 package: 58 @property bool started() const { 59 return m_started; 60 } 61 bool init(EventLoop evl) 62 in { assert(!m_started); } 63 body 64 { 65 try if (!gs_mtx) 66 gs_mtx = new Mutex; catch {} 67 static ushort j; 68 assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux."); 69 j += 1; 70 m_status = StatusInfo.init; 71 72 import core.thread; 73 try Thread.getThis().priority = Thread.PRIORITY_MAX; 74 catch (Exception e) { assert(false, "Could not set thread priority"); } 75 76 m_evLoop = evl; 77 shared static ushort i; 78 m_instanceId = i; 79 core.atomic.atomicOp!"+="(i, cast(ushort) 1); 80 wstring inststr; 81 try { inststr = m_instanceId.to!wstring; } 82 catch (Exception e) { 83 return false; 84 } 85 m_window = "VibeWin32MessageWindow" ~ inststr; 86 wstring classname = "VibeWin32MessageWindow" ~ inststr; 87 88 LPCWSTR wnz; 89 LPCWSTR clsn; 90 try { 91 wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*); 92 clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*); 93 } catch (Exception e) { 94 setInternalError!"toUTFz"(Status.ERROR, e.msg); 95 return false; 96 } 97 98 m_threadId = GetCurrentThreadId(); 99 WNDCLASSW wc; 100 wc.lpfnWndProc = &wndProc; 101 wc.lpszClassName = clsn; 102 RegisterClassW(&wc); 103 m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE, 104 cast(HMENU) null, null, null); 105 try log("Window registered: " ~ m_hwnd.to!string); catch{} 106 auto ptr = cast(ULONG_PTR)cast(void*)&this; 107 SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr); 108 assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this ); 109 WSADATA wd; 110 m_error = cast(error_t) WSAStartup(0x0202, &wd); 111 if (m_error == EWIN.WSA_OK) 112 m_status.code = Status.OK; 113 else { 114 m_status.code = Status.ABORT; 115 static if(LOG) log(m_status); 116 return false; 117 } 118 assert(wd.wVersion == 0x0202); 119 m_started = true; 120 return true; 121 } 122 123 // todo: find where to call this 124 void exit() { 125 cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0); 126 } 127 128 @property StatusInfo status() const { 129 return m_status; 130 } 131 132 @property string error() const { 133 string* ptr; 134 string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init; 135 return pv; 136 } 137 138 bool loop(Duration timeout = 0.seconds) 139 in { 140 assert(Fiber.getThis() is null); 141 assert(m_started); 142 } 143 body { 144 DWORD msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max); 145 /* 146 * Waits until one or all of the specified objects are in the signaled state 147 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx 148 */ 149 DWORD signal = MsgWaitForMultipleObjectsEx( 150 cast(DWORD)0, 151 null, 152 msTimeout, 153 QS_ALLEVENTS, 154 MWMO_ALERTABLE | MWMO_INPUTAVAILABLE // MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion) 155 // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting 156 ); 157 158 auto errors = 159 [ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ]; /* WAIT_FAILED: Failed to call MsgWait..() */ 160 161 if (signal == WAIT_TIMEOUT) 162 return true; 163 164 if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) { 165 log("Event Loop Exiting because of error"); 166 return false; 167 } 168 169 MSG msg; 170 while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) { 171 m_status = StatusInfo.init; 172 TranslateMessage(&msg); 173 DispatchMessageW(&msg); 174 175 if (m_status.code == Status.ERROR) { 176 log(m_status.text); 177 return false; 178 } 179 } 180 return true; 181 } 182 183 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) 184 { 185 m_status = StatusInfo.init; 186 fd_t fd = ctxt.socket; 187 bool reusing; 188 if (fd == fd_t.init) { 189 190 fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 191 192 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 193 return 0; 194 195 if (!setOption(fd, TCPOption.REUSEADDR, true)) 196 return 0; 197 198 // todo: defer accept? 199 200 if (ctxt.noDelay) { 201 if (!setOption(fd, TCPOption.NODELAY, true)) 202 return 0; 203 } 204 } else reusing = true; 205 206 if (initTCPListener(fd, ctxt, reusing)) 207 { 208 try { 209 log("Running listener on socket fd#" ~ fd.to!string); 210 m_connHandlers[fd] = del; 211 ctxt.init(m_hwnd, fd); 212 } 213 catch (Exception e) { 214 setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg); 215 closeSocket(fd, false); 216 return 0; 217 } 218 } 219 else 220 { 221 return 0; 222 } 223 224 225 return fd; 226 } 227 228 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) 229 in { 230 assert(ctxt.socket == fd_t.init); 231 assert(ctxt.peer.family != AF_UNSPEC); 232 } 233 body { 234 m_status = StatusInfo.init; 235 fd_t fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 236 log("Starting connection at: " ~ fd.to!string); 237 if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) 238 return 0; 239 240 try { 241 (m_tcpHandlers)[fd] = del; 242 } 243 catch (Exception e) { 244 setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg); 245 return 0; 246 } 247 248 debug { 249 TCPEventHandler evh; 250 try evh = m_tcpHandlers.get(fd); 251 catch (Exception e) { log("Failed"); return 0; } 252 assert( evh !is TCPEventHandler.init); 253 } 254 255 if (ctxt.noDelay) { 256 if (!setOption(fd, TCPOption.NODELAY, true)) 257 return 0; 258 } 259 260 if (!initTCPConnection(fd, ctxt)) { 261 try { 262 log("Remove event handler for " ~ fd.to!string); 263 m_tcpHandlers.remove(fd); 264 } 265 catch (Exception e) { 266 setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg); 267 } 268 269 closeSocket(fd, false); 270 return 0; 271 } 272 273 274 try log("Client started FD#" ~ fd.to!string); 275 catch{} 276 return fd; 277 } 278 279 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 280 m_status = StatusInfo.init; 281 fd_t fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); 282 283 if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET)) 284 return 0; 285 286 if (initUDPSocket(fd, ctxt)) 287 { 288 try { 289 (m_udpHandlers)[fd] = del; 290 } 291 catch (Exception e) { 292 setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg); 293 closeSocket(fd, false); 294 return 0; 295 } 296 } 297 298 try log("UDP Socket started FD#" ~ fd.to!string); 299 catch{} 300 301 return fd; 302 } 303 304 fd_t run(shared AsyncSignal ctxt) { 305 m_status = StatusInfo.init; 306 try log("Signal subscribed to: " ~ m_hwnd.to!string); catch {} 307 return (cast(fd_t)m_hwnd); 308 } 309 310 fd_t run(AsyncNotifier ctxt) { 311 m_status = StatusInfo.init; 312 //try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch {} 313 return cast(fd_t) m_hwnd; 314 } 315 316 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 317 if (timeout < 0.seconds) 318 timeout = 0.seconds; 319 timeout += 10.msecs(); // round up to the next 10 msecs to avoid premature timer events 320 m_status = StatusInfo.init; 321 fd_t timer_id = ctxt.id; 322 if (timer_id == fd_t.init) { 323 timer_id = createIndex(); 324 } 325 try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch {} 326 327 BOOL err; 328 try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null); 329 catch(Exception e) { 330 setInternalError!"SetTimer"(Status.ERROR); 331 return 0; 332 } 333 334 if (err == 0) 335 { 336 m_error = GetLastErrorSafe(); 337 m_status.code = Status.ERROR; 338 m_status.text = "kill(AsyncTimer)"; 339 log(m_status); 340 return 0; 341 } 342 343 if (m_timer.fd == fd_t.init) 344 { 345 m_timer.fd = timer_id; 346 m_timer.cb = del; 347 } 348 else { 349 try 350 { 351 (m_timerHandlers)[timer_id] = del; 352 } 353 catch (Exception e) { 354 setInternalError!"HashMap assign"(Status.ERROR); 355 return 0; 356 } 357 } 358 359 360 return timer_id; 361 } 362 363 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) 364 { 365 static fd_t ids; 366 auto fd = ++ids; 367 368 try (m_dwHandlers)[fd] = new DWHandlerInfo(del); 369 catch (Exception e) { 370 setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg); 371 } 372 373 return fd; 374 375 } 376 377 bool kill(AsyncDirectoryWatcher ctxt) { 378 379 try { 380 Array!DWFolderWatcher toFree; 381 foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) { 382 if (v.fd == ctxt.fd) { 383 CloseHandle(v.handle); 384 m_dwFolders.remove(k); 385 } 386 } 387 388 foreach (DWFolderWatcher obj; toFree[]) 389 FreeListObjectAlloc!DWFolderWatcher.free(obj); 390 391 // todo: close all the handlers... 392 m_dwHandlers.remove(ctxt.fd); 393 } 394 catch (Exception e) { 395 setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg); 396 return false; 397 } 398 399 return true; 400 } 401 402 bool kill(AsyncTCPConnection ctxt, bool forced = false) 403 { 404 405 m_status = StatusInfo.init; 406 fd_t fd = ctxt.socket; 407 408 log("Killing socket "~ fd.to!string); 409 try { 410 auto cb = m_tcpHandlers.get(ctxt.socket); 411 if (cb != TCPEventHandler.init){ 412 *cb.conn.connected = false; 413 *cb.conn.connecting = false; 414 return closeSocket(fd, true, forced); 415 } 416 } catch (Exception e) { 417 setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg); 418 assert(false); 419 //return false; 420 } 421 422 return true; 423 } 424 425 bool kill(AsyncTCPListener ctxt) 426 { 427 m_status = StatusInfo.init; 428 fd_t fd = ctxt.socket; 429 try { 430 if ((ctxt.socket in m_connHandlers) !is null) { 431 return closeSocket(fd, false, true); 432 } 433 } catch (Exception e) { 434 setInternalError!"in m_connHandlers"(Status.ERROR, e.msg); 435 return false; 436 } 437 438 return true; 439 } 440 441 bool kill(shared AsyncSignal ctxt) { 442 return true; 443 } 444 445 bool kill(AsyncNotifier ctxt) { 446 return true; 447 } 448 449 bool kill(AsyncTimer ctxt) { 450 m_status = StatusInfo.init; 451 452 log("Kill timer"); 453 454 BOOL err = KillTimer(m_hwnd, ctxt.id); 455 if (err == 0) 456 { 457 m_error = GetLastErrorSafe(); 458 m_status.code = Status.ERROR; 459 m_status.text = "kill(AsyncTimer)"; 460 log(m_status); 461 return false; 462 } 463 464 destroyIndex(ctxt); 465 466 if (m_timer.fd == ctxt.id) { 467 ctxt.id = 0; 468 m_timer = TimerCache.init; 469 } else { 470 try { 471 m_timerHandlers.remove(ctxt.id); 472 } 473 catch (Exception e) { 474 setInternalError!"HashMap remove"(Status.ERROR); 475 return 0; 476 } 477 } 478 479 480 return true; 481 } 482 483 bool kill(AsyncUDPSocket ctxt) { 484 m_status = StatusInfo.init; 485 486 fd_t fd = ctxt.socket; 487 INT err = closesocket(fd); 488 if (catchSocketError!"closesocket"(err)) 489 return false; 490 491 try m_udpHandlers.remove(ctxt.socket); 492 catch (Exception e) { 493 setInternalError!"HashMap remove"(Status.ERROR); 494 return 0; 495 } 496 497 return true; 498 } 499 500 bool setOption(T)(fd_t fd, TCPOption option, in T value) { 501 m_status = StatusInfo.init; 502 int err; 503 try { 504 nothrow bool errorHandler() { 505 if (catchSocketError!"setOption:"(err)) { 506 try m_status.text ~= option.to!string; 507 catch (Exception e){ assert(false, "to!string conversion failure"); } 508 return false; 509 } 510 511 return true; 512 } 513 514 515 static HashMap!(fd_t, tcp_keepalive)* kcache; 516 517 final switch (option) { 518 519 case TCPOption.NODELAY: // true/false 520 static if (!is(T == bool)) 521 assert(false, "NODELAY value type must be bool, not " ~ T.stringof); 522 else { 523 BOOL val = value?1:0; 524 socklen_t len = val.sizeof; 525 err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); 526 return errorHandler(); 527 } 528 case TCPOption.REUSEADDR: // true/false 529 static if (!is(T == bool)) 530 assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof); 531 else 532 { 533 BOOL val = value?1:0; 534 socklen_t len = val.sizeof; 535 err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); 536 return errorHandler(); 537 } 538 case TCPOption.QUICK_ACK: 539 static if (!is(T == bool)) 540 assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof); 541 else { 542 m_status.code = Status.NOT_IMPLEMENTED; 543 return false; // quick ack is not implemented 544 } 545 case TCPOption.KEEPALIVE_ENABLE: // true/false 546 static if (!is(T == bool)) 547 assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof); 548 else 549 { 550 BOOL val = value?1:0; 551 socklen_t len = val.sizeof; 552 err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); 553 return errorHandler(); 554 } 555 case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn 556 static if (!isIntegral!T) 557 assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof); 558 else { 559 m_status.code = Status.NOT_IMPLEMENTED; 560 return false; 561 } 562 case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets 563 static if (!is(T == Duration)) 564 assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof); 565 else { 566 567 if (!kcache) 568 kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator()); 569 570 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 571 tcp_keepalive sReturned; 572 DWORD dwBytes; 573 kaSettings.onoff = ULONG(1); 574 if (kaSettings.keepalivetime == ULONG.init) { 575 kaSettings.keepalivetime = 1000; 576 } 577 kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG; 578 (*kcache)[fd] = kaSettings; 579 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 580 581 return errorHandler(); 582 } 583 case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start 584 static if (!is(T == Duration)) 585 assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof); 586 else { 587 588 if (!kcache) 589 kcache = new HashMap!(fd_t, tcp_keepalive)(defaultAllocator()); 590 591 tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init); 592 tcp_keepalive sReturned; 593 DWORD dwBytes; 594 kaSettings.onoff = ULONG(1); 595 if (kaSettings.keepaliveinterval == ULONG.init) { 596 kaSettings.keepaliveinterval = 75*1000; 597 } 598 kaSettings.keepalivetime = value.total!"msecs".to!ULONG; 599 600 (*kcache)[fd] = kaSettings; 601 err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null); 602 603 return errorHandler(); 604 } 605 case TCPOption.BUFFER_RECV: // bytes 606 static if (!isIntegral!T) 607 assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof); 608 else { 609 int val = value.to!int; 610 socklen_t len = val.sizeof; 611 err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); 612 return errorHandler(); 613 } 614 case TCPOption.BUFFER_SEND: // bytes 615 static if (!isIntegral!T) 616 assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof); 617 else { 618 int val = value.to!int; 619 socklen_t len = val.sizeof; 620 err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); 621 return errorHandler(); 622 } 623 case TCPOption.TIMEOUT_RECV: 624 static if (!is(T == Duration)) 625 assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof); 626 else { 627 DWORD val = value.total!"msecs".to!DWORD; 628 socklen_t len = val.sizeof; 629 err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len); 630 return errorHandler(); 631 } 632 case TCPOption.TIMEOUT_SEND: 633 static if (!is(T == Duration)) 634 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 635 else { 636 DWORD val = value.total!"msecs".to!DWORD; 637 socklen_t len = val.sizeof; 638 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len); 639 return errorHandler(); 640 } 641 case TCPOption.TIMEOUT_HALFOPEN: 642 static if (!is(T == Duration)) 643 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 644 else { 645 m_status.code = Status.NOT_IMPLEMENTED; 646 return false; 647 } 648 case TCPOption.LINGER: // bool onOff, int seconds 649 static if (!is(T == Tuple!(bool, int))) 650 assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof); 651 else { 652 linger l = linger(val[0]?1:0, val[1].to!USHORT); 653 socklen_t llen = l.sizeof; 654 err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen); 655 return errorHandler(); 656 } 657 case TCPOption.CONGESTION: 658 static if (!isIntegral!T) 659 assert(false, "CONGESTION value type must be integral, not " ~ T.stringof); 660 else { 661 m_status.code = Status.NOT_IMPLEMENTED; 662 return false; 663 } 664 case TCPOption.CORK: 665 static if (!isIntegral!T) 666 assert(false, "CORK value type must be int, not " ~ T.stringof); 667 else { 668 m_status.code = Status.NOT_IMPLEMENTED; 669 return false; 670 } 671 case TCPOption.DEFER_ACCEPT: // seconds 672 static if (!isIntegral!T) 673 assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof); 674 else { 675 int val = value.to!int; 676 socklen_t len = val.sizeof; 677 err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len); 678 return errorHandler(); 679 } 680 } 681 682 } 683 catch (Exception e) { 684 return false; 685 } 686 687 } 688 689 uint read(in fd_t fd, ref ubyte[] data) 690 { 691 return 0; 692 } 693 694 uint write(in fd_t fd, in ubyte[] data) 695 { 696 return 0; 697 } 698 699 uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) { 700 size_t i; 701 Array!DWChangeInfo* changes; 702 try { 703 changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer); 704 if ((*changes).empty) 705 return 0; 706 707 import std.algorithm : min; 708 size_t cnt = min(dst.length, changes.length); 709 foreach (DWChangeInfo change; (*changes)[0 .. cnt]) { 710 try log("reading change: " ~ change.path); catch {} 711 dst[i] = (*changes)[i]; 712 i++; 713 } 714 changes.linearRemove((*changes)[0 .. cnt]); 715 } 716 catch (Exception e) { 717 setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); 718 return 0; 719 } 720 try log("Changes returning with: " ~ i.to!string); catch {} 721 return cast(uint) i; 722 } 723 724 uint watch(in fd_t fd, in WatchInfo info) { 725 m_status = StatusInfo.init; 726 uint wd; 727 try { 728 HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()), 729 FILE_LIST_DIRECTORY, 730 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, 731 null, 732 OPEN_EXISTING, 733 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 734 null); 735 wd = cast(uint) hndl; 736 DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init); 737 assert(handler !is null); 738 log("Watching: " ~ info.path.toNativeString()); 739 (m_dwFolders)[wd] = FreeListObjectAlloc!DWFolderWatcher.alloc(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive); 740 } catch (Exception e) { 741 setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg); 742 return 0; 743 } 744 return wd; 745 } 746 747 bool unwatch(in fd_t fd, in fd_t _wd) { 748 uint wd = cast(uint) _wd; 749 m_status = StatusInfo.init; 750 try { 751 DWFolderWatcher fw = m_dwFolders.get(wd, null); 752 assert(fw !is null); 753 m_dwFolders.remove(wd); 754 fw.close(); 755 FreeListObjectAlloc!DWFolderWatcher.free(fw); 756 } catch (Exception e) { 757 setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg); 758 return false; 759 } 760 return true; 761 } 762 763 bool notify(T)(in fd_t fd, in T payload) 764 if (is(T == shared AsyncSignal) || is(T == AsyncNotifier)) 765 { 766 m_status = StatusInfo.init; 767 import std.conv; 768 769 auto payloadPtr = cast(ubyte*)payload; 770 auto payloadAddr = cast(ulong)payloadPtr; 771 772 WPARAM wparam = payloadAddr & 0xffffffff; 773 LPARAM lparam = cast(uint) (payloadAddr >> 32); 774 775 BOOL err; 776 static if (is(T == AsyncNotifier)) 777 err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam); 778 else 779 err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam); 780 try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch {} 781 if (err == 0) 782 { 783 m_error = GetLastErrorSafe(); 784 m_status.code = Status.ERROR; 785 m_status.text = "notify"; 786 log(m_status); 787 return false; 788 } 789 return true; 790 } 791 792 uint recv(in fd_t fd, ref ubyte[] data) 793 { 794 m_status = StatusInfo.init; 795 int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 796 797 //try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch {} 798 if (catchSocketError!".recv"(ret)) { // ret == -1 799 if (m_error == error_t.WSAEWOULDBLOCK) 800 m_status.code = Status.ASYNC; 801 return 0; // TODO: handle some errors more specifically 802 } 803 m_status.code = Status.OK; 804 805 return cast(uint) ret; 806 } 807 808 uint send(in fd_t fd, in ubyte[] data) 809 { 810 m_status = StatusInfo.init; 811 //try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string); 812 //catch{} 813 int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0); 814 815 if (catchSocketError!"send"(ret)) { 816 if (m_error == error_t.WSAEWOULDBLOCK) 817 m_status.code = Status.ASYNC; 818 return 0; // TODO: handle some errors more specifically 819 } 820 m_status.code = Status.ASYNC; 821 return cast(uint) ret; 822 } 823 824 bool broadcast(in fd_t fd, bool b) { 825 int val = b?1:0; 826 socklen_t len = val.sizeof; 827 int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len); 828 if (catchSocketError!"setsockopt"(err)) 829 return false; 830 831 return true; 832 833 } 834 835 uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr) 836 { 837 m_status = StatusInfo.init; 838 socklen_t addrLen; 839 addr.family = AF_INET; 840 int ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen); 841 842 if (addrLen > addr.sockAddrLen) { 843 addr.family = AF_INET6; 844 } 845 846 try log("RECVFROM " ~ ret.to!string ~ "B"); catch {} 847 if (catchSocketError!".recvfrom"(ret)) { // ret == -1 848 if (m_error == WSAEWOULDBLOCK) 849 m_status.code = Status.ASYNC; 850 return 0; // TODO: handle some errors more specifically 851 } 852 m_status.code = Status.OK; 853 854 return cast(uint) ret; 855 } 856 857 uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr) 858 { 859 m_status = StatusInfo.init; 860 try log("SENDTO " ~ data.length.to!string ~ "B"); catch{} 861 int ret; 862 if (addr != NetworkAddress.init) 863 ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen); 864 else 865 ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0); 866 867 if (catchSocketError!".sendTo"(ret)) { // ret == -1 868 if (m_error == WSAEWOULDBLOCK) 869 m_status.code = Status.ASYNC; 870 return 0; // TODO: handle some errors more specifically 871 } 872 873 m_status.code = Status.OK; 874 return cast(uint) ret; 875 } 876 877 NetworkAddress localAddr(in fd_t fd, bool ipv6) { 878 NetworkAddress ret; 879 import libasync.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr; 880 if (ipv6) 881 ret.family = AF_INET6; 882 else 883 ret.family = AF_INET; 884 socklen_t len = ret.sockAddrLen; 885 int err = getsockname(fd, ret.sockAddr, &len); 886 if (catchSocketError!"getsockname"(err)) 887 return NetworkAddress.init; 888 if (len > ret.sockAddrLen) 889 ret.family = AF_INET6; 890 return ret; 891 } 892 893 void noDelay(in fd_t fd, bool b) { 894 m_status = StatusInfo.init; 895 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof); 896 } 897 898 private bool closeRemoteSocket(fd_t fd, bool forced) { 899 900 INT err; 901 902 try log("Shutdown FD#" ~ fd.to!string); 903 catch{} 904 if (forced) { 905 err = shutdown(fd, SD_BOTH); 906 closesocket(fd); 907 } 908 else 909 err = shutdown(fd, SD_SEND); 910 911 try { 912 TCPEventHandler* evh = fd in m_tcpHandlers; 913 if (evh && evh.conn.inbound) { 914 try FreeListObjectAlloc!AsyncTCPConnection.free(evh.conn); 915 catch(Exception e) { assert(false, "Failed to free resources"); } 916 evh.conn = null; 917 //log("Remove event handler for " ~ fd.to!string); 918 m_tcpHandlers.remove(fd); 919 } 920 } 921 catch (Exception e) { 922 setInternalError!"m_tcpHandlers.remove"(Status.ERROR); 923 return false; 924 } 925 if (catchSocketError!"shutdown"(err)) 926 return false; 927 return true; 928 } 929 930 // for connected sockets 931 bool closeSocket(fd_t fd, bool connected, bool forced = false) 932 { 933 m_status = StatusInfo.init; 934 if (!connected && forced) { 935 try { 936 if (fd in m_connHandlers) { 937 log("Removing connection handler for: " ~ fd.to!string); 938 m_connHandlers.remove(fd); 939 } 940 } 941 catch (Exception e) { 942 setInternalError!"m_connHandlers.remove"(Status.ERROR); 943 return false; 944 } 945 } 946 else if (connected) 947 closeRemoteSocket(fd, forced); 948 949 if (!connected || forced) { 950 // todo: flush the socket here? 951 952 INT err = closesocket(fd); 953 if (catchSocketError!"closesocket"(err)) 954 return false; 955 956 } 957 return true; 958 } 959 960 bool closeConnection(fd_t fd) { 961 return closeSocket(fd, true); 962 } 963 964 NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 965 in { 966 debug import libasync.internals.validator : validateIPv4, validateIPv6; 967 debug assert( validateIPv4(ipAddr) || validateIPv6(ipAddr), "Trying to connect to an invalid IP address"); 968 } 969 body { 970 m_status = StatusInfo.init; 971 972 NetworkAddress addr; 973 WSAPROTOCOL_INFOW hints; 974 import std.conv : to; 975 if (ipv6) { 976 addr.family = AF_INET6; 977 } 978 else { 979 addr.family = AF_INET; 980 } 981 982 INT addrlen = addr.sockAddrLen; 983 984 LPWSTR str; 985 try { 986 str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr); 987 } catch (Exception e) { 988 setInternalError!"toStringz"(Status.ERROR, e.msg); 989 return NetworkAddress.init; 990 } 991 992 INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); 993 if (port != 0) addr.port = port; 994 try log(addr.toString()); 995 catch {} 996 if( catchSocketError!"getAddressFromIP"(err) ) 997 return NetworkAddress.init; 998 else assert(addrlen == addr.sockAddrLen); 999 return addr; 1000 } 1001 1002 NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true) 1003 /*in { 1004 debug import libasync.internals.validator : validateHost; 1005 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 1006 } 1007 body */{ 1008 m_status = StatusInfo.init; 1009 import std.conv : to; 1010 NetworkAddress addr; 1011 ADDRINFOW hints; 1012 ADDRINFOW* infos; 1013 LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*); 1014 if (ipv6) { 1015 hints.ai_family = AF_INET6; 1016 addr.family = AF_INET6; 1017 } 1018 else { 1019 hints.ai_family = AF_INET; 1020 addr.family = AF_INET; 1021 } 1022 1023 if (tcp) { 1024 hints.ai_protocol = IPPROTO_TCP; 1025 hints.ai_socktype = SOCK_STREAM; 1026 } 1027 else { 1028 hints.ai_protocol = IPPROTO_UDP; 1029 hints.ai_socktype = SOCK_DGRAM; 1030 } 1031 if (port != 0) addr.port = port; 1032 1033 LPCWSTR str; 1034 1035 try { 1036 str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host); 1037 } catch (Exception e) { 1038 setInternalError!"toUTFz"(Status.ERROR, e.msg); 1039 return NetworkAddress.init; 1040 } 1041 1042 error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos); 1043 if (err != EWIN.WSA_OK) { 1044 setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err); 1045 return NetworkAddress.init; 1046 } 1047 1048 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 1049 ubyte* data = cast(ubyte*) addr.sockAddr; 1050 data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy 1051 FreeAddrInfoW(infos); 1052 try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); 1053 catch (Exception e){} 1054 return addr; 1055 } 1056 1057 void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED) 1058 { 1059 if (details.length > 0) 1060 m_status.text = TRACE ~ ": " ~ details; 1061 else 1062 m_status.text = TRACE; 1063 m_error = error; 1064 m_status.code = s; 1065 static if(LOG) log(m_status); 1066 } 1067 private: 1068 bool onMessage(MSG msg) 1069 { 1070 m_status = StatusInfo.init; 1071 switch (msg.message) { 1072 case WM_TCP_SOCKET: 1073 auto evt = LOWORD(msg.lParam); 1074 auto err = HIWORD(msg.lParam); 1075 if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) { 1076 1077 if (evt == FD_ACCEPT) 1078 setInternalError!"del@TCPAccept.ERROR"(Status.ERROR); 1079 else { 1080 try { 1081 TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam); 1082 cb(TCPEvent.ERROR); 1083 } 1084 catch (Exception e) { 1085 // An Error callback should never fail... 1086 setInternalError!"del@TCPEvent.ERROR"(Status.ERROR); 1087 // assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string); 1088 } 1089 } 1090 } 1091 break; 1092 case WM_UDP_SOCKET: 1093 auto evt = LOWORD(msg.lParam); 1094 auto err = HIWORD(msg.lParam); 1095 if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) { 1096 try { 1097 UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam); 1098 cb(UDPEvent.ERROR); 1099 } 1100 catch (Exception e) { 1101 // An Error callback should never fail... 1102 setInternalError!"del@UDPEvent.ERROR"(Status.ERROR); 1103 } 1104 } 1105 break; 1106 case WM_TIMER: 1107 log("Timer callback"); 1108 TimerHandler cb; 1109 bool cached = (m_timer.fd == cast(fd_t)msg.wParam); 1110 try { 1111 if (cached) 1112 cb = m_timer.cb; 1113 else 1114 cb = m_timerHandlers.get(cast(fd_t)msg.wParam); 1115 1116 cb.ctxt.rearmed = false; 1117 1118 if (cb.ctxt.oneShot) 1119 kill(cb.ctxt); 1120 1121 cb(); 1122 1123 if (cb.ctxt.oneShot && cb.ctxt.rearmed) 1124 run(cb.ctxt, cb, cb.ctxt.timeout); 1125 1126 } 1127 catch (Exception e) { 1128 // An Error callback should never fail... 1129 setInternalError!"del@TimerHandler"(Status.ERROR, e.msg); 1130 } 1131 1132 break; 1133 case WM_USER_EVENT: 1134 log("User event"); 1135 1136 ulong uwParam = cast(ulong)msg.wParam; 1137 ulong ulParam = cast(ulong)msg.lParam; 1138 1139 ulong payloadAddr = (ulParam << 32) | uwParam; 1140 void* payloadPtr = cast(void*) payloadAddr; 1141 shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr; 1142 1143 try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch {} 1144 try { 1145 assert(ctxt.id != 0); 1146 ctxt.handler(); 1147 } 1148 catch (Exception e) { 1149 setInternalError!"WM_USER_EVENT@handler"(Status.ERROR); 1150 } 1151 break; 1152 case WM_USER_SIGNAL: 1153 log("User signal"); 1154 1155 ulong uwParam = cast(ulong)msg.wParam; 1156 ulong ulParam = cast(ulong)msg.lParam; 1157 1158 ulong payloadAddr = (ulParam << 32) | uwParam; 1159 void* payloadPtr = cast(void*) payloadAddr; 1160 AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr; 1161 1162 try { 1163 ctxt.handler(); 1164 } 1165 catch (Exception e) { 1166 setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR); 1167 } 1168 break; 1169 default: return false; // not handled, sends to wndProc 1170 } 1171 return true; 1172 } 1173 1174 bool onUDPEvent(WORD evt, WORD err, fd_t sock) { 1175 m_status = StatusInfo.init; 1176 try{ 1177 if (m_udpHandlers.get(sock) == UDPHandler.init) 1178 return false; 1179 } catch {} 1180 if (sock == 0) { // highly unlikely... 1181 setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined"); 1182 return false; 1183 } 1184 if (err) { 1185 setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err); 1186 try { 1187 //log("CLOSE FD#" ~ sock.to!string); 1188 (m_udpHandlers)[sock](UDPEvent.ERROR); 1189 } catch { // can't do anything about this... 1190 } 1191 return false; 1192 } 1193 1194 UDPHandler cb; 1195 switch(evt) { 1196 default: break; 1197 case FD_READ: 1198 try { 1199 log("READ FD#" ~ sock.to!string); 1200 cb = m_udpHandlers.get(sock); 1201 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1202 cb(UDPEvent.READ); 1203 } 1204 catch (Exception e) { 1205 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1206 return false; 1207 } 1208 break; 1209 case FD_WRITE: 1210 try { 1211 log("WRITE FD#" ~ sock.to!string); 1212 cb = m_udpHandlers.get(sock); 1213 assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1214 cb(UDPEvent.WRITE); 1215 } 1216 catch (Exception e) { 1217 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1218 return false; 1219 } 1220 break; 1221 } 1222 return true; 1223 } 1224 1225 bool onTCPEvent(WORD evt, WORD err, fd_t sock) { 1226 m_status = StatusInfo.init; 1227 try{ 1228 if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init) 1229 assert( false ); 1230 } catch {} 1231 if (sock == 0) { // highly unlikely... 1232 setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined"); 1233 assert(false); 1234 } 1235 if (err) { 1236 setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err); 1237 try { 1238 //log("CLOSE FD#" ~ sock.to!string); 1239 (m_tcpHandlers)[sock](TCPEvent.ERROR); 1240 } catch { // can't do anything about this... 1241 } 1242 return false; 1243 } 1244 1245 TCPEventHandler cb; 1246 switch(evt) { 1247 default: break; 1248 case FD_ACCEPT: 1249 gs_mtx.lock_nothrow(); 1250 1251 log("TCP Handlers: " ~ m_tcpHandlers.length.to!string); 1252 log("Accepting connection"); 1253 /// Let another listener take the next connection 1254 TCPAcceptHandler list; 1255 try list = m_connHandlers[sock]; catch { assert(false, "Listening on an invalid socket..."); } 1256 scope(exit) { 1257 /// The connection rotation mechanism is handled by the TCPListenerDistMixins 1258 /// when registering the same AsyncTCPListener object on multiple event loops. 1259 /// This allows to even out the CPU usage on a server instance. 1260 HWND hwnd = list.ctxt.next(m_hwnd); 1261 if (hwnd !is HWND.init) { 1262 int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1263 if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) { 1264 error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1265 if (catchSocketError!"WSAAsyncSelect"(error)) 1266 assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); 1267 } 1268 } 1269 else log("Returned init!!"); 1270 1271 gs_mtx.unlock_nothrow(); 1272 } 1273 1274 NetworkAddress addr; 1275 addr.family = AF_INET; 1276 int addrlen = addr.sockAddrLen; 1277 fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1278 1279 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) { 1280 if (m_error == WSAEFAULT) { // not enough space for sockaddr 1281 addr.family = AF_INET6; 1282 addrlen = addr.sockAddrLen; 1283 csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1284 if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) 1285 return false; 1286 } 1287 else return false; 1288 } 1289 1290 int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 1291 if ( catchSocketError!"WSAAsyncSelect"(ok) ) 1292 return false; 1293 1294 log("Connection accepted: " ~ csock.to!string); 1295 1296 AsyncTCPConnection conn; 1297 try conn = FreeListObjectAlloc!AsyncTCPConnection.alloc(m_evLoop); 1298 catch (Exception e) { assert(false, "Failed allocation"); } 1299 conn.peer = addr; 1300 conn.socket = csock; 1301 conn.inbound = true; 1302 1303 try { 1304 // Do the callback to get a handler 1305 cb = list(conn); 1306 } 1307 catch(Exception e) { 1308 setInternalError!"onConnected"(Status.EVLOOP_FAILURE); 1309 return false; 1310 } 1311 1312 try { 1313 m_tcpHandlers[csock] = cb; // keep the handler to setup the connection 1314 log("ACCEPT&CONNECT FD#" ~ csock.to!string); 1315 *conn.connecting = true; 1316 //cb(TCPEvent.CONNECT); 1317 } 1318 catch (Exception e) { 1319 setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT); 1320 return false; 1321 } 1322 break; 1323 case FD_CONNECT: 1324 try { 1325 log("CONNECT FD#" ~ sock.to!string); 1326 cb = m_tcpHandlers.get(sock); 1327 if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1328 *cb.conn.connecting = true; 1329 } 1330 catch(Exception e) { 1331 setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT); 1332 return false; 1333 } 1334 break; 1335 case FD_READ: 1336 try { 1337 log("READ FD#" ~ sock.to!string); 1338 cb = m_tcpHandlers.get(sock); 1339 if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1340 if (!cb.conn) break; 1341 if (*cb.conn.connected == false && *cb.conn.connecting) { 1342 *cb.conn.connecting = false; 1343 *cb.conn.connected = true; 1344 cb(TCPEvent.CONNECT); 1345 } 1346 else 1347 cb(TCPEvent.READ); 1348 } 1349 catch (Exception e) { 1350 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1351 return false; 1352 } 1353 break; 1354 case FD_WRITE: 1355 // todo: don't send the first write for consistency with epoll? 1356 1357 try { 1358 //import std.stdio; 1359 log("WRITE FD#" ~ sock.to!string); 1360 cb = m_tcpHandlers.get(sock); 1361 if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); 1362 if (!cb.conn) break; 1363 if (*cb.conn.connected == false && *cb.conn.connecting) { 1364 *cb.conn.connecting = false; 1365 *cb.conn.connected = true; 1366 cb(TCPEvent.CONNECT); 1367 } 1368 else { 1369 cb(TCPEvent.WRITE); 1370 } 1371 } 1372 catch (Exception e) { 1373 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1374 return false; 1375 } 1376 break; 1377 case FD_CLOSE: 1378 // called after shutdown() 1379 INT ret; 1380 bool connected = true; 1381 try { 1382 log("CLOSE FD#" ~ sock.to!string); 1383 if (sock in m_tcpHandlers) { 1384 cb = m_tcpHandlers.get(sock); 1385 if (*cb.conn.connected) { 1386 cb(TCPEvent.CLOSE); 1387 *cb.conn.connecting = false; 1388 *cb.conn.connected = false; 1389 } else 1390 connected = false; 1391 } 1392 else 1393 connected = false; 1394 } 1395 catch (Exception e) { 1396 if (m_status.code == Status.OK) 1397 setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 1398 return false; 1399 } 1400 1401 closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket 1402 1403 break; 1404 } 1405 return true; 1406 } 1407 1408 bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt) 1409 { 1410 INT err; 1411 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1412 if (catchSocketError!"bind"(err)) { 1413 closesocket(fd); 1414 return false; 1415 } 1416 err = listen(fd, 128); 1417 if (catchSocketError!"listen"(err)) { 1418 closesocket(fd); 1419 return false; 1420 } 1421 err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE); 1422 if (catchSocketError!"WSAAsyncSelect"(err)) { 1423 closesocket(fd); 1424 return false; 1425 } 1426 1427 return true; 1428 } 1429 1430 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false) 1431 in { 1432 assert(m_threadId == GetCurrentThreadId()); 1433 assert(ctxt.local !is NetworkAddress.init); 1434 } 1435 body { 1436 INT err; 1437 if (!reusing) { 1438 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1439 if (catchSocketError!"bind"(err)) { 1440 closesocket(fd); 1441 return false; 1442 } 1443 1444 err = listen(fd, 128); 1445 if (catchSocketError!"listen"(err)) { 1446 closesocket(fd); 1447 return false; 1448 } 1449 1450 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT); 1451 if (catchSocketError!"WSAAsyncSelect"(err)) { 1452 closesocket(fd); 1453 return false; 1454 } 1455 } 1456 1457 return true; 1458 } 1459 1460 bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt) 1461 in { 1462 assert(ctxt.peer !is NetworkAddress.init); 1463 assert(ctxt.peer.port != 0, "Connecting to an invalid port"); 1464 } 1465 body { 1466 INT err; 1467 NetworkAddress bind_addr; 1468 bind_addr.family = ctxt.peer.family; 1469 1470 if (ctxt.peer.family == AF_INET) 1471 bind_addr.sockAddrInet4.sin_addr.s_addr = 0; 1472 else if (ctxt.peer.family == AF_INET6) 1473 bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0; 1474 else assert(false, "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string); 1475 1476 err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen); 1477 if ( catchSocketError!"bind"(err) ) 1478 return false; 1479 err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE); 1480 if ( catchSocketError!"WSAAsyncSelect"(err) ) 1481 return false; 1482 err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen); 1483 1484 auto errors = [ tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ]; 1485 1486 if (catchSocketErrorsEq!"connectEQ"(err, errors)) 1487 return true; 1488 else if (catchSocketError!"connect"(err)) 1489 return false; 1490 1491 return true; 1492 } 1493 1494 bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 1495 if (isIntegral!T) 1496 { 1497 foreach (validator ; cmp) { 1498 if (val == validator[0]) { 1499 m_status.text = TRACE; 1500 m_status.code = validator[1]; 1501 if (m_status.code == Status.EVLOOP_TIMEOUT) { 1502 log(m_status); 1503 break; 1504 } 1505 m_error = GetLastErrorSafe(); 1506 static if(LOG) log(m_status); 1507 return true; 1508 } 1509 } 1510 return false; 1511 } 1512 1513 bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 1514 if (isIntegral!T) 1515 { 1516 foreach (validator ; cmp) { 1517 if (val == validator[0]) { 1518 m_status.text = TRACE; 1519 m_error = WSAGetLastErrorSafe(); 1520 m_status.status = validator[1]; 1521 static if(LOG) log(m_status); 1522 return true; 1523 } 1524 } 1525 return false; 1526 } 1527 1528 bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...) 1529 if (isIntegral!T) 1530 { 1531 error_t err; 1532 foreach (validator ; cmp) { 1533 if (val == validator[0]) { 1534 if (err is EWIN.init) err = WSAGetLastErrorSafe(); 1535 if (err == validator[1]) { 1536 m_status.text = TRACE; 1537 m_error = WSAGetLastErrorSafe(); 1538 m_status.code = validator[2]; 1539 static if(LOG) log(m_status); 1540 return true; 1541 } 1542 } 1543 } 1544 return false; 1545 } 1546 1547 1548 bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 1549 if (isIntegral!T) 1550 { 1551 if (val == cmp) { 1552 m_status.text = TRACE; 1553 m_error = WSAGetLastErrorSafe(); 1554 m_status.code = Status.ABORT; 1555 static if(LOG) log(m_status); 1556 return true; 1557 } 1558 return false; 1559 } 1560 1561 error_t WSAGetLastErrorSafe() { 1562 try { 1563 return cast(error_t) WSAGetLastError(); 1564 } catch(Exception e) { 1565 return EWIN.ERROR_ACCESS_DENIED; 1566 } 1567 } 1568 1569 error_t GetLastErrorSafe() { 1570 try { 1571 return cast(error_t) GetLastError(); 1572 } catch(Exception e) { 1573 return EWIN.ERROR_ACCESS_DENIED; 1574 } 1575 } 1576 1577 void log(StatusInfo val) 1578 { 1579 static if (LOG) { 1580 import std.stdio; 1581 try { 1582 writeln("Backtrace: ", m_status.text); 1583 writeln(" | Status: ", m_status.code); 1584 writeln(" | Error: " , m_error); 1585 if ((m_error in EWSAMessages) !is null) 1586 writeln(" | Message: ", EWSAMessages[m_error]); 1587 } catch(Exception e) { 1588 return; 1589 } 1590 } 1591 } 1592 1593 void log(T)(T val) 1594 { 1595 static if (LOG) { 1596 import std.stdio; 1597 try { 1598 writeln(val); 1599 } catch(Exception e) { 1600 return; 1601 } 1602 } 1603 } 1604 1605 } 1606 1607 mixin template TCPConnectionMixins() { 1608 1609 private CleanupData m_impl; 1610 1611 struct CleanupData { 1612 bool connected; 1613 bool connecting; 1614 } 1615 1616 @property bool* connecting() { 1617 return &m_impl.connecting; 1618 } 1619 1620 @property bool* connected() { 1621 return &m_impl.connected; 1622 } 1623 1624 } 1625 1626 mixin template TCPListenerDistMixins() 1627 { 1628 import std.c.windows.windows : HWND; 1629 import libasync.internals.hashmap : HashMap; 1630 import core.sync.mutex; 1631 private { 1632 bool m_dist; 1633 1634 Tuple!(WinReference, bool*) m_handles; 1635 __gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist; 1636 __gshared Mutex gs_mutex; 1637 } 1638 1639 /// The TCP Listener schedules distributed connection handlers based on 1640 /// the event loops that are using the same AsyncTCPListener object. 1641 /// This is done by using WSAAsyncSelect on a different window after each 1642 /// accept TCPEvent. 1643 class WinReference { 1644 private { 1645 struct Item { 1646 HWND handle; 1647 bool active; 1648 } 1649 1650 Item[] m_items; 1651 } 1652 1653 this(HWND hndl, bool b) { 1654 append(hndl, b); 1655 } 1656 1657 void append(HWND hndl, bool b) { 1658 m_items ~= Item(hndl, b); 1659 } 1660 1661 HWND next(HWND me) { 1662 Item[] items; 1663 synchronized(gs_mutex) 1664 items = m_items; 1665 if (items.length == 1) 1666 return me; 1667 foreach (i, item; items) { 1668 if (item.active == true) { 1669 m_items[i].active = false; // remove responsibility 1670 if (m_items.length <= i + 1) { 1671 m_items[0].active = true; // set responsibility 1672 auto ret = m_items[0].handle; 1673 return ret; 1674 } 1675 else { 1676 m_items[i + 1].active = true; 1677 auto ret = m_items[i + 1].handle; 1678 return ret; 1679 } 1680 } 1681 1682 } 1683 assert(false); 1684 } 1685 1686 } 1687 1688 void init(HWND hndl, fd_t sock) { 1689 try { 1690 if (!gs_mutex) { 1691 gs_mutex = new Mutex; 1692 } 1693 synchronized(gs_mutex) { 1694 m_handles = gs_dist.get(sock); 1695 if (m_handles == typeof(m_handles).init) { 1696 gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist); 1697 m_handles = gs_dist.get(sock); 1698 assert(m_handles != typeof(m_handles).init); 1699 } 1700 else { 1701 m_handles[0].append(hndl, false); 1702 *m_handles[1] = true; // set first thread to dist 1703 m_dist = true; // set this thread to dist 1704 } 1705 } 1706 } catch (Exception e) { 1707 assert(false, e.toString()); 1708 } 1709 1710 } 1711 1712 HWND next(HWND me) { 1713 try { 1714 if (!m_dist) 1715 return HWND.init; 1716 return m_handles[0].next(me); 1717 } 1718 catch (Exception e) { 1719 assert(false, e.toString()); 1720 } 1721 } 1722 1723 } 1724 private class DWHandlerInfo { 1725 DWHandler handler; 1726 Array!DWChangeInfo buffer; 1727 1728 this(DWHandler cb) { 1729 handler = cb; 1730 } 1731 } 1732 1733 private final class DWFolderWatcher { 1734 import libasync.internals.path; 1735 private: 1736 EventLoop m_evLoop; 1737 fd_t m_fd; 1738 bool m_recursive; 1739 HANDLE m_handle; 1740 Path m_path; 1741 DWFileEvent m_events; 1742 DWHandlerInfo m_handler; // contains buffer 1743 shared AsyncSignal m_signal; 1744 ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer; 1745 DWORD m_bytesTransferred; 1746 public: 1747 this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) { 1748 import std.stdio : writeln; 1749 try writeln("Creating directory watcher for path: " ~ path.toNativeString()); catch {} 1750 m_fd = fd; 1751 m_recursive = recursive; 1752 m_handle = cast(HANDLE)hndl; 1753 m_evLoop = evl; 1754 m_path = path; 1755 m_handler = handler; 1756 1757 m_signal = new shared AsyncSignal(m_evLoop); 1758 m_signal.run(&onChanged); 1759 triggerWatch(); 1760 } 1761 package: 1762 void close() { 1763 CloseHandle(m_handle); 1764 m_signal.kill(); 1765 } 1766 1767 void triggerChanged() { 1768 m_signal.trigger(); 1769 } 1770 1771 void onChanged() { 1772 ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred]; 1773 do { 1774 assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); 1775 auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr; 1776 DWFileEvent kind; 1777 switch( fni.Action ){ 1778 default: kind = DWFileEvent.MODIFIED; break; 1779 case 0x1: kind = DWFileEvent.CREATED; break; 1780 case 0x2: kind = DWFileEvent.DELETED; break; 1781 case 0x3: kind = DWFileEvent.MODIFIED; break; 1782 case 0x4: kind = DWFileEvent.MOVED_FROM; break; 1783 case 0x5: kind = DWFileEvent.MOVED_TO; break; 1784 } 1785 string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[] 1786 m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename))); 1787 if( fni.NextEntryOffset == 0 ) break; 1788 result = result[fni.NextEntryOffset .. $]; 1789 } while(result.length > 0); 1790 1791 triggerWatch(); 1792 1793 m_handler.handler(); 1794 } 1795 1796 void triggerWatch() { 1797 1798 static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME| 1799 FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE; 1800 1801 OVERLAPPED* overlapped = FreeListObjectAlloc!OVERLAPPED.alloc(); 1802 overlapped.Internal = 0; 1803 overlapped.InternalHigh = 0; 1804 overlapped.Offset = 0; 1805 overlapped.OffsetHigh = 0; 1806 overlapped.Pointer = cast(void*)this; 1807 import std.stdio; 1808 DWORD bytesReturned; 1809 BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted); 1810 1811 import std.stdio; 1812 if (!success) 1813 writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]); 1814 1815 } 1816 1817 @property fd_t fd() const { 1818 return m_fd; 1819 } 1820 1821 @property HANDLE handle() const { 1822 return cast(HANDLE) m_handle; 1823 } 1824 1825 static nothrow extern(System) 1826 { 1827 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 1828 { 1829 import std.stdio; 1830 DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer); 1831 watcher.m_bytesTransferred = cbTransferred; 1832 try FreeListObjectAlloc!OVERLAPPED.free(overlapped); catch {} 1833 if (dwError != 0) 1834 try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch{} 1835 try watcher.triggerChanged(); 1836 catch (Exception e) { 1837 try writeln("Failed to trigger change"); catch {} 1838 } 1839 } 1840 } 1841 1842 } 1843 1844 /** 1845 Represents a network/socket address. (taken from vibe.core.net) 1846 */ 1847 public struct NetworkAddress { 1848 import std.c.windows.winsock : sockaddr, sockaddr_in, sockaddr_in6; 1849 private union { 1850 sockaddr addr; 1851 sockaddr_in addr_ip4; 1852 sockaddr_in6 addr_ip6; 1853 } 1854 1855 @property bool ipv6() const pure nothrow { return this.family == AF_INET6; } 1856 1857 /** Family (AF_) of the socket address. 1858 */ 1859 @property ushort family() const pure nothrow { return addr.sa_family; } 1860 /// ditto 1861 @property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; } 1862 1863 /** The port in host byte order. 1864 */ 1865 @property ushort port() 1866 const pure nothrow { 1867 switch (this.family) { 1868 default: assert(false, "port() called for invalid address family."); 1869 case AF_INET: return ntoh(addr_ip4.sin_port); 1870 case AF_INET6: return ntoh(addr_ip6.sin6_port); 1871 } 1872 } 1873 /// ditto 1874 @property void port(ushort val) 1875 pure nothrow { 1876 switch (this.family) { 1877 default: assert(false, "port() called for invalid address family."); 1878 case AF_INET: addr_ip4.sin_port = hton(val); break; 1879 case AF_INET6: addr_ip6.sin6_port = hton(val); break; 1880 } 1881 } 1882 1883 /** A pointer to a sockaddr struct suitable for passing to socket functions. 1884 */ 1885 @property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; } 1886 1887 /** Size of the sockaddr struct that is returned by sockAddr(). 1888 */ 1889 @property int sockAddrLen() 1890 const pure nothrow { 1891 switch (this.family) { 1892 default: assert(false, "sockAddrLen() called for invalid address family."); 1893 case AF_INET: return addr_ip4.sizeof; 1894 case AF_INET6: return addr_ip6.sizeof; 1895 } 1896 } 1897 1898 @property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow 1899 in { assert (family == AF_INET); } 1900 body { return &addr_ip4; } 1901 1902 @property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow 1903 in { assert (family == AF_INET6); } 1904 body { return &addr_ip6; } 1905 1906 /** Returns a string representation of the IP address 1907 */ 1908 string toAddressString() 1909 const { 1910 import std.array : appender; 1911 import std.string : format; 1912 import std.format : formattedWrite; 1913 1914 switch (this.family) { 1915 default: assert(false, "toAddressString() called for invalid address family."); 1916 case AF_INET: 1917 ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; 1918 return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]); 1919 case AF_INET6: 1920 ubyte[16] ip = addr_ip6.sin6_addr.s6_addr; 1921 auto ret = appender!string(); 1922 ret.reserve(40); 1923 foreach (i; 0 .. 8) { 1924 if (i > 0) ret.put(':'); 1925 ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2])); 1926 } 1927 return ret.data; 1928 } 1929 } 1930 1931 /** Returns a full string representation of the address, including the port number. 1932 */ 1933 string toString() 1934 const { 1935 1936 import std.string : format; 1937 1938 auto ret = toAddressString(); 1939 switch (this.family) { 1940 default: assert(false, "toString() called for invalid address family."); 1941 case AF_INET: return ret ~ format(":%s", port); 1942 case AF_INET6: return format("[%s]:%s", ret, port); 1943 } 1944 } 1945 1946 } 1947 1948 private pure nothrow { 1949 import std.bitmanip; 1950 1951 ushort ntoh(ushort val) 1952 { 1953 version (LittleEndian) return swapEndian(val); 1954 else version (BigEndian) return val; 1955 else static assert(false, "Unknown endianness."); 1956 } 1957 1958 ushort hton(ushort val) 1959 { 1960 version (LittleEndian) return swapEndian(val); 1961 else version (BigEndian) return val; 1962 else static assert(false, "Unknown endianness."); 1963 } 1964 } 1965 enum WM_TCP_SOCKET = WM_USER+102; 1966 enum WM_UDP_SOCKET = WM_USER+103; 1967 enum WM_USER_EVENT = WM_USER+104; 1968 enum WM_USER_SIGNAL = WM_USER+105; 1969 1970 nothrow: 1971 size_t g_idxCapacity = 8; 1972 Array!size_t g_idxAvailable; 1973 1974 // called on run 1975 size_t createIndex() { 1976 size_t idx; 1977 import std.algorithm : max; 1978 import std.range : iota; 1979 try { 1980 1981 size_t getIdx() { 1982 1983 if (!g_idxAvailable.empty) { 1984 immutable size_t ret = g_idxAvailable.back; 1985 g_idxAvailable.removeBack(); 1986 return ret; 1987 } 1988 return 0; 1989 } 1990 1991 idx = getIdx(); 1992 if (idx == 0) { 1993 import std.range : iota; 1994 // todo: not sure about this 1995 g_idxAvailable.insert( iota(g_idxCapacity, max(32, g_idxCapacity * 2), 1) ); 1996 g_idxCapacity = max(32, g_idxCapacity * 2); 1997 idx = getIdx(); 1998 } 1999 2000 } catch {} 2001 2002 return idx; 2003 } 2004 2005 void destroyIndex(AsyncTimer ctxt) { 2006 try { 2007 g_idxAvailable.insert(ctxt.id); 2008 } 2009 catch (Exception e) { 2010 assert(false, "Error destroying index: " ~ e.msg); 2011 } 2012 } 2013 2014 2015 nothrow extern(System) { 2016 LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam) 2017 { 2018 auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); 2019 if (ptr is null) 2020 return DefWindowProcA(wnd, msg, wparam, lparam); 2021 auto appl = cast(EventLoopImpl*)ptr; 2022 MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init); 2023 if (appl.onMessage(obj)) { 2024 static if (DEBUG) { 2025 if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) { 2026 import std.stdio : writeln; 2027 try { writeln(appl.error, ": ", appl.m_status.text); } catch {} 2028 } 2029 } 2030 return 0; 2031 } 2032 else return DefWindowProcA(wnd, msg, wparam, lparam); 2033 } 2034 2035 BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam); 2036 2037 }