module libasync.posix;

version (Posix):

import libasync.types;
import std.string : toStringz;
import std.conv : to;
import std.datetime : Duration, msecs, seconds, SysTime;
import std.traits : isIntegral;
import std.typecons : Tuple, tuple;
import std.container : Array;
import std.exception;

import core.stdc.errno;
import libasync.events;
import libasync.internals.path;
import core.sys.posix.signal;
import libasync.posix2;
import libasync.internals.logging;
import core.sync.mutex;
import memutils.utils;
import memutils.hashmap;

/// File descriptors are plain ints in this backend.
alias fd_t = int;


version(linux) {
    import libasync.internals.epoll;
    extern(C) nothrow @nogc {
        int __libc_current_sigrtmin();
        int __libc_current_sigrtmax();
        version(CRuntime_Glibc) int __res_init();
    }
    /// Set by the module constructor below once blockSignals() has run.
    /// Declared without `shared`/`__gshared`, so each thread has its own copy.
    bool g_signalsBlocked;

    /// Adds SIGRTMIN to the calling thread's blocked-signal mask via
    /// pthread_sigmask(SIG_BLOCK). Any Throwable is swallowed.
    package nothrow void blockSignals() {
        try {
            // Block SIGRTMIN so it stays reserved for AsyncSignal
            // (the original note read: SIGRTMIN .. " +30).
            sigset_t mask;
            // todo: use more signals for more event loops per thread.. (is this necessary?)
            //foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) {
            //import std.stdio : writeln;
            //try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch (Throwable e) {}
            sigemptyset(&mask);
            sigaddset(&mask, cast(int) __libc_current_sigrtmin());
            pthread_sigmask(SIG_BLOCK, &mask, null);
            //}
        } catch (Throwable) {}
    }

    /// Module constructor: blocks the signal and records that it was done.
    static this() {
        blockSignals();
        g_signalsBlocked = true;
    }
}
static if (is_OSX || is_iOS) {
    import libasync.internals.kqueue;
}
version(FreeBSD) {
    import libasync.internals.kqueue;
}
version(DragonFlyBSD) {
    import libasync.internals.kqueue;
}

/// Process-wide mutex; created lazily by EventLoopImpl.init when still null.
__gshared Mutex g_mutex;

static if (!EPOLL) {
    /// Per-file record used by the directory watcher; only compiled when
    /// EPOLL is false. `folder` is used as a key into m_dwFolders by loop().
    private struct DWFileInfo {
        fd_t folder;
        Path path;
        SysTime lastModified;
        bool is_dir;
    }
}

/// Per-folder record used by the directory watcher.
private struct DWFolderInfo {
    WatchInfo wi;
    fd_t fd;
}

/// POSIX event loop implementation; selects epoll or kqueue code paths at
/// compile time through the EPOLL flag.
package struct EventLoopImpl {
    static if (EPOLL) {
        pragma(msg, "Using Linux EPOLL for events");
    }
    else /* if KQUEUE */
    {
        pragma(msg, "Using FreeBSD KQueue for events");
    }

package:
    alias error_t = EPosix;

nothrow:
private:

    /// members
    EventLoop m_evLoop;
    ushort m_instanceId;            // assigned in init() from a shared counter
    bool m_started;
    StatusInfo m_status;            // status of the most recent operation
    error_t m_error = EPosix.EOK;   // looked up in EPosixMessages by error()
    EventInfo* m_evSignal;          // EventInfo registered for signal delivery in init()
    static if (EPOLL){
        fd_t m_epollfd;
        HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
    }
    else /* if KQUEUE */
    {
        fd_t m_kqueuefd;
        HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
        HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
        HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
        HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher

    }

    // Queues of finished socket requests; drained by loop().
    AsyncAcceptRequest.Queue m_completedSocketAccepts;
    AsyncReceiveRequest.Queue m_completedSocketReceives;
    AsyncSendRequest.Queue
m_completedSocketSends;

package:

    /// workaround for IDE indent bug on too big files
    mixin RunKill!();

    /// Returns the m_started flag.
    @property bool started() const {
        return m_started;
    }

    /**
     * Creates the OS event queue for this loop and registers the signal
     * source used for AsyncSignal delivery.
     *
     * EPOLL build: creates the epoll fd, blocks SIGRTMIN, opens a
     * non-blocking signalfd for it and adds that fd to the epoll set.
     * KQUEUE build: lazily initialises the shared signal queues, creates the
     * kqueue, blocks SIGXCPU and registers an EVFILT_SIGNAL filter for it.
     *
     * Params:
     *   evl = the owning EventLoop, stored in m_evLoop.
     * Returns: true on success; false when a system call fails (descriptors
     *   opened so far are closed before returning).
     */
    bool init(EventLoop evl)
    in { assert(!m_started); }
    body
    {

        import core.atomic;
        import core.sys.posix.unistd : close;
        shared static ushort i;

        // Claim the id with one atomic read-modify-write. Reading `i` and
        // incrementing it in separate steps lets two loops that initialise
        // concurrently observe the same value. atomicOp returns the updated
        // counter, so the value held before the increment is `result - 1`.
        m_instanceId = cast(ushort)(core.atomic.atomicOp!"+="(i, cast(ushort) 1) - 1);
        static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);

        m_evLoop = evl;

        import core.thread;
        //try Thread.getThis().priority = Thread.PRIORITY_MAX;
        //catch (Exception e) { assert(false, "Could not set thread priority"); }

        try
            if (!g_mutex)
                g_mutex = new Mutex;
        catch (Throwable) {}

        static if (EPOLL)
        {

            if (!g_signalsBlocked)
                blockSignals();
            assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");
            m_epollfd = epoll_create1(EPOLL_CLOEXEC);

            if (catchError!"epoll_create1"(m_epollfd))
                return false;

            import core.sys.linux.sys.signalfd;
            import core.thread : getpid;

            fd_t err;
            fd_t sfd;

            sigset_t mask;

            try {
                sigemptyset(&mask);
                sigaddset(&mask, __libc_current_sigrtmin());
                err = pthread_sigmask(SIG_BLOCK, &mask, null);
                if (catchError!"sigprocmask"(err))
                {
                    m_status.code = Status.EVLOOP_FAILURE;
                    close(m_epollfd);
                    return false;
                }
            } catch (Throwable) { }



            sfd = signalfd(-1, &mask, SFD_NONBLOCK);
            // signalfd reports failure with -1. Check at run time, the same
            // way epoll_create1 is checked above, because an assert is not a
            // reliable guard in builds that compile assertions out.
            if (catchError!"signalfd"(sfd))
            {
                close(m_epollfd);
                return false;
            }

            EventType evtype;

            epoll_event _event;
            _event.events = EPOLLIN;
            evtype = EventType.Signal;
            try
                m_evSignal = ThreadMem.alloc!EventInfo(sfd, evtype, EventObject.init, m_instanceId);
            catch (Exception e){
                assert(false, "Allocation error");
            }
            _event.data.ptr = cast(void*) m_evSignal;

            err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
            if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
            {
                // Do not leak the descriptors of a loop that failed to start.
                close(sfd);
                close(m_epollfd);
                return false;
            }

        }
        else /* if KQUEUE */
        {
            try {
                if (!gs_queueMutex) {
                    gs_queueMutex = ThreadMem.alloc!ReadWriteMutex();
                    gs_signalQueue = Array!(Array!AsyncSignal)();
                    gs_idxQueue = Array!(Array!size_t)();
                }
                if (g_evIdxAvailable.empty) {
                    g_evIdxAvailable.reserve(32);

                    foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
                        g_evIdxAvailable.insertBack(k + 1);
                    }
                    g_evIdxCapacity = 32;
                    g_idxCapacity = 32;
                }
            } catch (Throwable) { assert(false, "Initialization failed"); }
            m_kqueuefd = kqueue();
            // kqueue() returns -1 on failure; previously this went unchecked.
            if (catchError!"kqueue"(m_kqueuefd))
                return false;
            int err;
            try {
                sigset_t mask;
                sigemptyset(&mask);
                sigaddset(&mask, SIGXCPU);

                err = sigprocmask(SIG_BLOCK, &mask, null);
            } catch (Throwable) {}

            EventType evtype = EventType.Signal;

            // use GC because ThreadMem fails at emplace for shared objects
            try
                m_evSignal = ThreadMem.alloc!EventInfo(SIGXCPU, evtype, EventObject.init, m_instanceId);
            catch (Exception e) {
                assert(false, "Failed to allocate resources");
            }

            if (catchError!"sigprocmask"(err))
            {
                close(m_kqueuefd);
                return false;
            }

            kevent_t _event;
            EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
            err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
            if (catchError!"kevent_add(SIGXCPU)"(err))
                assert(false, "Add SIGXCPU failed at kevent call");
        }

        static if (LOG) try log("init in thread " ~ Thread.getThis().name); catch (Throwable) {}

        return true;
    }

    /// Closes the epoll / kqueue descriptor owned by this loop.
    void exit() {
        import core.sys.posix.unistd : close;
        static if (EPOLL) {
            close(m_epollfd); // not necessary?
// not necessary:
            //try ThreadMem.free(m_evSignal);
            //catch (Exception e) { assert(false, "Failed to free resources"); }

        }
        else
            close(m_kqueuefd);
    }

    /// Status recorded by the most recent operation.
    @property const(StatusInfo) status() const {
        return m_status;
    }

    /// Message registered in EPosixMessages for m_error, or an empty string
    /// when there is no entry for it.
    @property string error() const {
        string* ptr;
        return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init;
    }

    /**
     * Runs one iteration of the event loop: waits for events, dispatches each
     * one to its handler by EventType, then drains the queues of completed
     * socket requests.
     *
     * Params:
     *   timeout = how long to wait for events. 0.seconds polls and
     *             -1.seconds waits indefinitely. When completed socket
     *             requests are already queued the wait is always a poll.
     * Returns: with empty request queues, the result of event dispatch (the
     *   `success` flag of the last event handled, or false when polling
     *   failed). Otherwise true after the queues were drained, or false when
     *   dispatch failed or an accepted peer could not be run.
     *
     * NOTE(review): `success` is reset to false for every event and the
     * Notifier, DirectoryWatcher, Timer and Signal cases never set it, so an
     * iteration that ends on one of those events returns false. Confirm the
     * callers expect that.
     */
    bool loop(Duration timeout = 0.seconds)
    //in { assert(Fiber.getThis() is null); }
    {
        import libasync.internals.memory;

        int num = void;

        // The event buffer is a function-level static: allocated on first use
        // and reused by later calls.
        static if (EPOLL) {
            static align(1) epoll_event[] events;
            if (events is null)
            {
                try events = new epoll_event[128];
                catch (Exception e) {
                    assert(false, "Could not allocate events array: " ~ e.msg);
                }
            }
        } else /* if KQUEUE */ {
            import core.sys.posix.time : time_t;
            import core.sys.posix.config : c_long;

            static kevent_t[] events;
            if (events.length == 0) {
                try events = allocArray!kevent_t(manualAllocator(), 128);
                catch (Exception e) { assert(false, "Could not allocate events array"); }
            }
        }

        // Blocks in epoll_wait / kevent and returns that call's result.
        auto waitForEvents(Duration timeout)
        {
            static if (EPOLL) {
                int timeout_ms;
                if (timeout == 0.seconds) // return immediately
                    timeout_ms = 0;
                else if (timeout == -1.seconds) // wait indefinitely
                    timeout_ms = -1;
                else timeout_ms = cast(int) timeout.total!"msecs";
                /// Retrieve pending events
                scope (exit) assert(events !is null && events.length <= 128);
                return epoll_wait(m_epollfd, cast(epoll_event*) &events[0], 128, timeout_ms);
            } else /* if KQUEUE */ {
                if (timeout != -1.seconds) {
                    time_t secs = timeout.split!("seconds", "nsecs")().seconds;
                    c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
                    auto tspec = libasync.internals.kqueue.timespec(secs, ns);

                    return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
                } else {
                    return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, null);
                }
            }
        }

        // Dispatches the `num` events currently stored in `events`.
        auto handleEvents()
        {
            bool success = true;

            static Tuple!(int, Status)[] errors = [ tuple(EINTR, Status.EVLOOP_TIMEOUT) ];

            if (catchEvLoopErrors!"event_poll'ing"(num, errors))
                return false;

            if (num > 0)
                static if (LOG) log("Got " ~ num.to!string ~ " event(s)");

            foreach(i; 0 .. num) {
                success = false;
                m_status = StatusInfo.init;
                static if (EPOLL)
                {
                    epoll_event _event = events[i];
                    static if (LOG) try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch (Throwable e) {}
                    EventInfo* info = cast(EventInfo*) _event.data.ptr;
                    int event_flags = cast(int) _event.events;
                }
                else /* if KQUEUE */
                {
                    kevent_t _event = events[i];
                    EventInfo* info = cast(EventInfo*) _event.udata;
                    //log("Got info");
                    // filter goes in the upper 16 bits, flags in the lower 16
                    int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
                    //log("event flags");
                }

                //if (info.owner != m_instanceId)
                //	static if (LOG) try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
                //	catch{}
                //log("owner");
                switch (info.evType) {
                    case EventType.Event:
                        if (info.fd == 0)
                            break;

                        import core.sys.posix.unistd : close;
                        success = onEvent(info.fd, info.evObj.eventHandler, event_flags);

                        if (!success) {
                            close(info.fd);
                            assumeWontThrow(ThreadMem.free(info));
                        }
                        break;
                    case EventType.Socket:
                        auto socket = info.evObj.socket;
                        if (socket.passive) {
                            success = onCOPSocketEvent(socket, event_flags);
                        } else if (socket.connectionOriented) {
                            success = onCOASocketEvent(socket, event_flags);
                        } else {
                            success = onCLSocketEvent(socket, event_flags);
                        }
                        break;
                    case EventType.TCPAccept:
                        if (info.fd == 0)
                            break;
                        success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
                        break;

                    case EventType.Notifier:

                        static if (LOG) log("Got notifier!");
                        try info.evObj.notifierHandler();
                        catch (Exception e) {
                            setInternalError!"notifierHandler"(Status.ERROR);
                        }
                        break;

                    case EventType.DirectoryWatcher:
                        static if (LOG) log("Got DirectoryWatcher event!");
                        static if (!EPOLL) {
                            // in KQUEUE all events will be consumed here, because they must be pre-processed
                            try {
                                DWFileEvent fevent;
                                // NOTE(review): NOTE_WRITE is tested in the first
                                // branch, so the MODIFIED branch cannot be reached
                                // through it, and the second NOTE_RENAME branch
                                // (MOVED_TO) is unreachable. Left unchanged because
                                // the intended mapping is not visible here.
                                if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
                                    fevent = DWFileEvent.CREATED;
                                else if (_event.fflags & NOTE_DELETE)
                                    fevent = DWFileEvent.DELETED;
                                else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
                                    fevent = DWFileEvent.MODIFIED;
                                else if (_event.fflags & NOTE_RENAME)
                                    fevent = DWFileEvent.MOVED_FROM;
                                else if (_event.fflags & NOTE_RENAME)
                                    fevent = DWFileEvent.MOVED_TO;
                                else
                                    assert(false, "No event found?");

                                DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);

                                // Not a watched folder: look the ident up as a
                                // file and resolve its parent folder.
                                if (fi == DWFolderInfo.init) {
                                    DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
                                    assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
                                    fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
                                    assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
                                }

                                // all recursive events will be generated here
                                if (!compareFolderFiles(fi, fevent)) {
                                    continue;
                                }

                            } catch (Exception e) {
                                static if (LOG) log("Could not process DirectoryWatcher event: " ~ e.msg);
                                break;
                            }

                        }

                        try info.evObj.dwHandler();
                        catch (Exception e) {
                            setInternalError!"dwHandler"(Status.ERROR);
                        }
                        break;

                    case EventType.Timer:
                        static if (LOG) try log("Got timer! " ~ info.fd.to!string); catch (Throwable e) {}
                        static if (EPOLL) {
                            static long val;
                            import core.sys.posix.unistd : read;
                            // Reads 8 bytes from the timer's descriptor before
                            // calling the handler; the result is not checked.
                            read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
                        } else {
                        }
                        try info.evObj.timerHandler();
                        catch (Exception e) {
                            setInternalError!"timerHandler"(Status.ERROR);
                        }
                        static if (!EPOLL) {
                            auto ctxt = info.evObj.timerHandler.ctxt;
                            // A one-shot timer that the handler did not re-arm is
                            // removed from the kqueue.
                            if (ctxt && ctxt.oneShot && !ctxt.rearmed) {
                                kevent_t __event;
                                EV_SET(&__event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
                                int err = kevent(m_kqueuefd, &__event, 1, null, 0, null);
                                if (catchError!"kevent_del(timer)"(err))
                                    return false;
                            }
                        }
                        break;

                    case EventType.Signal:
                        static if (LOG) try log("Got signal!"); catch (Throwable e) {}

                        static if (EPOLL) {

                            static if (LOG) try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch (Throwable e) {}
                            import core.sys.linux.sys.signalfd : signalfd_siginfo;
                            import core.sys.posix.unistd : read;
                            signalfd_siginfo fdsi;
                            fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
                            // The signalfd is opened non-blocking in init(), so the
                            // read can fail (e.g. EAGAIN) and leave fdsi untouched.
                            // Only a full record carries a usable payload.
                            if (err != cast(fd_t) fdsi.sizeof)
                                break;
                            shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;
                            // A signal that was not queued with an AsyncSignal
                            // payload has a null pointer here; calling handler()
                            // on it would dereference null.
                            if (sig is null)
                                break;

                            try sig.handler();
                            catch (Exception e) {
                                setInternalError!"signal handler"(Status.ERROR);
                            }


                        }
                        else /* if KQUEUE */
                        {
                            static AsyncSignal[] sigarr;

                            if (sigarr.length == 0) {
                                try sigarr = new AsyncSignal[32];
                                catch (Exception e) { assert(false, "Could not allocate signals array"); }
                            }

                            // NOTE(review): `more` is never read; confirm whether
                            // signals beyond sigarr.length need another pass.
                            bool more = popSignals(sigarr);
                            foreach (AsyncSignal sig; sigarr)
                            {
                                shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
                                if (ptr is null)
                                    break;
                                try (cast(shared AsyncSignal)sig).handler();
                                catch (Exception e) {
                                    setInternalError!"signal handler"(Status.ERROR);
                                }
                            }
                        }
                        break;

                    case EventType.UDPSocket:
                        import core.sys.posix.unistd : close;
                        success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);

                        // Closes the socket, reports UDPEvent.ERROR to the handler
                        // and frees the EventInfo. `graceful` is not used.
                        nothrow void abortHandler(bool graceful) {

                            close(info.fd);
                            info.evObj.udpHandler.conn.socket = 0;
                            try info.evObj.udpHandler(UDPEvent.ERROR);
                            catch (Exception e) { }
                            try ThreadMem.free(info);
                            catch (Exception e){ assert(false, "Error freeing resources"); }
                        }

                        if (!success && m_status.code == Status.ABORT) {
                            abortHandler(true);

                        }
                        else if (!success && m_status.code == Status.ERROR) {
                            abortHandler(false);
                        }
                        break;
                    case EventType.TCPTraffic:
                        assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");

                        success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);

                        // Notifies the handler (CLOSE when graceful, ERROR
                        // otherwise), closes the socket and frees the connection
                        // (inbound only) and the EventInfo.
                        nothrow void abortTCPHandler(bool graceful) {

                            nothrow void closeAll() {
                                static if (LOG) try log("closeAll()"); catch (Throwable e) {}
                                if (info.evObj.tcpEvHandler.conn.connected)
                                    closeSocket(info.fd, true, true);

                                info.evObj.tcpEvHandler.conn.socket = 0;
                            }

                            /// Close the connection after an unexpected socket error
                            if (graceful) {
                                try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
                                catch (Exception e) { static if(LOG) log("Close failure"); }
                                closeAll();
                            }

                            /// Kill the connection after an internal error
                            else {
                                try info.evObj.tcpEvHandler(TCPEvent.ERROR);
                                catch (Exception e) { static if(LOG) log("Error failure"); }
                                closeAll();
                            }

                            if (info.evObj.tcpEvHandler.conn.inbound) {
                                static if (LOG) log("Freeing inbound connection FD#" ~ info.fd.to!string);
                                try ThreadMem.free(info.evObj.tcpEvHandler.conn);
                                catch (Exception e){ assert(false, "Error freeing resources"); }
                            }
                            try ThreadMem.free(info);
                            catch (Exception e){ assert(false, "Error freeing resources"); }
                        }

                        if (!success && m_status.code == Status.ABORT) {
                            abortTCPHandler(true);
                        }
                        else if (!success && m_status.code == Status.ERROR) {
                            abortTCPHandler(false);
                        }
                        break;
                    default:
                        break;
                }

            }

            return success;
        }

        if (m_completedSocketAccepts.empty && m_completedSocketReceives.empty && m_completedSocketSends.empty) {
            num = waitForEvents(timeout);
            return handleEvents();
        } else {
            // Completed requests are pending: poll without blocking, then
            // run their completion callbacks.
            num = waitForEvents(0.seconds);
            if (num != 0 && !handleEvents()) return false;

            foreach (request; m_completedSocketAccepts) {
                m_completedSocketAccepts.removeFront();
                auto socket = request.socket;
                auto peer = request.onComplete(request.peer, request.family, socket.info.type, socket.info.protocol);
                assumeWontThrow(AsyncAcceptRequest.free(request));
                if (!peer.run) {
                    m_status.code = Status.ABORT;
                    peer.kill();
                    peer.handleError();
                    return false;
                }
            }

            foreach (request; m_completedSocketReceives) {
                if (request.socket.receiveContinuously) {
                    m_completedSocketReceives.removeFront();
                    assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
                    // Continuous receive: reuse the same request while the
                    // socket is still alive, otherwise release it.
                    if (request.socket.receiveContinuously && request.socket.alive) {
                        request.message.count = 0;
                        submitRequest(request);
                    } else {
                        assumeWontThrow(NetworkMessage.free(request.message));
                        assumeWontThrow(AsyncReceiveRequest.free(request));
                    }
                } else {
                    m_completedSocketReceives.removeFront();
                    if (request.message) {
                        assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
                        assumeWontThrow(NetworkMessage.free(request.message));
                    } else {
                        assumeWontThrow(request.onComplete.get!1)();
                    }
                    assumeWontThrow(AsyncReceiveRequest.free(request));
                }
            }

            foreach (request; m_completedSocketSends) {
                m_completedSocketSends.removeFront();
                request.onComplete();
                assumeWontThrow(NetworkMessage.free(request.message));
                assumeWontThrow(AsyncSendRequest.free(request));
            }

            return true;
        }
    }

    bool setOption(T)(fd_t fd, TCPOption option, in T value) {
        m_status
= StatusInfo.init;
        // setOption applies one TCPOption to `fd` through setsockopt.
        //
        // Each option requires a specific `T` (bool, an integral type,
        // Duration or Tuple!(bool, int)); instantiating with another type
        // hits the assert(false) of that case. Returns true when the option
        // was applied; on failure errorHandler() appends the option name to
        // m_status.text and false is returned.
        import std.traits : isIntegral;

        import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
        int err;
        // Turns the last setsockopt result into the function's return value.
        nothrow bool errorHandler() {
            if (catchError!"setOption:"(err)) {
                try m_status.text ~= option.to!string;
                catch (Exception e){ assert(false, "to!string conversion failure"); }
                return false;
            }

            return true;
        }
        final switch (option) {
            case TCPOption.NODELAY: // true/false
                static if (!is(T == bool))
                    assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
                else {
                    int val = value?1:0;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
                    return errorHandler();
                }
            case TCPOption.REUSEADDR: // true/false
                static if (!is(T == bool))
                    assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
                else {
                    int val = value?1:0;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
                    if (!errorHandler())
                        return false;
                    version (Posix) {
                        version (linux) {
                            return true;
                        } else {
                            // BSD systems have SO_REUSEPORT
                            import libasync.internals.socket_compat : SO_REUSEPORT;
                            err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);
                            return errorHandler();
                        }
                    }
                }
            case TCPOption.REUSEPORT: // true/false
                // use a standalone REUSEPORT option to handle SO_REUSEPORT on linux
                version (linux) {
                    static if (!is(T == bool))
                        assert(false, "REUSEPORT value type must be bool, not " ~ T.stringof);
                    else {
                        // BSD systems have SO_REUSEPORT
                        import libasync.internals.socket_compat : SO_REUSEPORT;
                        int val = value?1:0;
                        err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, val.sizeof);

                        // Not all linux kernels support SO_REUSEPORT
                        // ignore invalid and not supported errors on linux.
                        // errno is only meaningful when the call failed, so it
                        // is consulted only in that case.
                        if (err != 0 && (errno == EINVAL || errno == ENOPROTOOPT)) {
                            return true;
                        }

                        return errorHandler();
                    }
                } else return true;
            case TCPOption.QUICK_ACK:
                static if (!is(T == bool))
                    assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
                else {
                    static if (EPOLL) {
                        int val = value?1:0;
                        socklen_t len = val.sizeof;
                        err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len);
                        return errorHandler();
                    }
                    else /* not linux */ {
                        return false;
                    }
                }
            case TCPOption.KEEPALIVE_ENABLE: // true/false
                static if (!is(T == bool))
                    assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
                else
                {
                    int val = value?1:0;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
                    return errorHandler();
                }
            case TCPOption.KEEPALIVE_COUNT: // ##
                static if (!isIntegral!T)
                    assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
                else {
                    // The value is a plain probe count. It was previously
                    // converted with Duration.total!"msecs", which does not
                    // exist on the integral type required here.
                    int val = value.to!int;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len);
                    return errorHandler();
                }
            case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
                static if (!is(T == Duration))
                    assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
                else {
                    int val;
                    try val = value.total!"seconds".to!uint; catch (Throwable e) { return false; }
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len);
                    return errorHandler();
                }
            case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
                static if (!is(T == Duration))
                    assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
                else {
                    int val;
                    try val = value.total!"seconds".to!uint; catch (Throwable e) { return false; }
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len);
                    return errorHandler();
                }
            case TCPOption.BUFFER_RECV: // bytes
                static if (!isIntegral!T)
                    assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
                else {
                    int val = value.to!int;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
                    return errorHandler();
                }
            case TCPOption.BUFFER_SEND: // bytes
                static if (!isIntegral!T)
                    assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
                else {
                    int val = value.to!int;
                    socklen_t len = val.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
                    return errorHandler();
                }
            case TCPOption.TIMEOUT_RECV:
                static if (!is(T == Duration))
                    assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
                else {
                    import core.sys.posix.sys.time : timeval;
                    time_t secs = cast(time_t) value.split!("seconds", "usecs")().seconds;
                    suseconds_t us;
                    try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch (Throwable e) {}
                    timeval t = timeval(secs, us);
                    socklen_t len = t.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len);
                    return errorHandler();
                }
            case TCPOption.TIMEOUT_SEND:
                static if (!is(T == Duration))
                    assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
                else {
                    import core.sys.posix.sys.time : timeval;
                    auto timeout = value.split!("seconds", "usecs")();
                    timeval t;
                    try t = timeval(timeout.seconds.to!time_t, timeout.usecs.to!suseconds_t);
                    catch (Exception) { return false; }
                    socklen_t len = t.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len);
                    return errorHandler();
                }
            case TCPOption.TIMEOUT_HALFOPEN:
                static if (!is(T == Duration))
                    assert(false, "TIMEOUT_HALFOPEN value type must be Duration, not " ~ T.stringof);
                else {
                    uint val;
                    try val = value.total!"msecs".to!uint; catch (Throwable e) {
                        return false;
                    }
                    socklen_t len = val.sizeof;
                    // NOTE(review): SO_SNDTIMEO takes a struct timeval (see
                    // TIMEOUT_SEND above), but a uint of milliseconds is passed
                    // here. That layout matches Linux's TCP_USER_TIMEOUT, which
                    // may be what was intended -- confirm before changing.
                    err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
                    return errorHandler();
                }
            case TCPOption.LINGER: // bool onOff, int seconds
                static if (!is(T == Tuple!(bool, int)))
                    assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
                else {
                    import core.sys.posix.sys.socket : linger;
                    // Read the tuple from the `value` parameter; `val` was an
                    // undefined name in this branch.
                    linger l = linger(value[0]?1:0, value[1]);
                    socklen_t llen = l.sizeof;
                    err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
                    return errorHandler();
                }
            case TCPOption.CONGESTION:
                static if (!isIntegral!T)
                    assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
                else {
                    int val = value.to!int;
                    // `len` was assigned without ever being declared here.
                    socklen_t len = val.sizeof;
                    // NOTE(review): Linux tcp(7) documents the TCP_CONGESTION
                    // argument as a string (algorithm name); verify that an
                    // integer is accepted on the targeted platforms.
                    err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len);
                    return errorHandler();
                }
            case TCPOption.CORK:
                static if (!isIntegral!T)
                    assert(false, "CORK value type must be int, not " ~ T.stringof);
                else {
                    static if (EPOLL) {
                        int val = value.to!int;
                        socklen_t len = val.sizeof;
                        err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len);
                        return errorHandler();
                    }
                    else /* if KQUEUE */ {
                        int val = value.to!int;
                        socklen_t len = val.sizeof;
                        err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len);
                        return errorHandler();

                    }
                }
            case TCPOption.DEFER_ACCEPT: // seconds
                static if (!isIntegral!T)
                    assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
                else {
                    static if (EPOLL) {
                        int val = value.to!int;
                        socklen_t len = val.sizeof;
                        err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
                        return errorHandler();
                    }
                    else /* if KQUEUE */ {
                        // todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
                        /*int val = value.to!int;
                         socklen_t len = val.sizeof;
                         err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
                         return errorHandler();
                         */
                        assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
                    }
                }
        }

    }

    /**
     * Receives up to data.length bytes from `fd` into `data`.
     *
     * Returns: the number of bytes received, or 0 on error. On error
     *   m_status.code is set to ASYNC for EWOULDBLOCK/EAGAIN and to RETRY
     *   for EINTR.
     */
    pragma(inline, true)
    uint recv(in fd_t fd, ref ubyte[] data)
    {
        static if (LOG) try log("Recv from FD: " ~ fd.to!string); catch (Throwable e) {}
        m_status = StatusInfo.init;
        import libasync.internals.socket_compat : recv;
        int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0);

        static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string);
        if (catchError!".recv"(ret)){
            if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
                m_status.code = Status.ASYNC;
            else if (m_error == EPosix.EINTR)
                m_status.code = Status.RETRY;

            return 0;
        }

        // Compare the signed value: `cast(uint) ret < 0` applies the cast
        // first and is therefore never true.
        return ret < 0 ? 0 : cast(uint) ret;
    }

    /**
     * Sends `data` on `fd`.
     *
     * Returns: the number of bytes sent, or 0 on error. On error
     *   m_status.code is set to ASYNC for EWOULDBLOCK/EAGAIN and to RETRY
     *   for EINTR.
     */
    pragma(inline, true)
    uint send(in fd_t fd, in ubyte[] data)
    {
        static if (LOG) try log("Send to FD: " ~ fd.to!string); catch (Throwable e) {}
        m_status = StatusInfo.init;
        import libasync.internals.socket_compat : send;
        int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0);
        static if (LOG) try log("Sent: " ~ ret.to!string); catch (Throwable e) {}
        if (catchError!"send"(ret)) { // ret == -1
            if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
                m_status.code = Status.ASYNC;
            else if (m_error == EPosix.EINTR)
                m_status.code = Status.RETRY;
            return 0;

        }
        // Test the signed value; a cast to uint ahead of `<` made the guard
        // always false.
        return ret < 0 ?
0 : ret;
    }

    /**
     * Receives one message on `fd` with recvmsg, using msg.header.
     *
     * The call is retried while it fails with EINTR.
     *
     * Returns: the number of bytes received (m_status.code = OK), or 0 when
     *   the call would block (m_status.code = ASYNC) or fails with an error
     *   reported through catchError. EBADF, EFAULT, EINVAL, ENOTCONN and
     *   ENOTSOCK are treated as fatal and end in assert(false).
     */
    size_t recvMsg(in fd_t fd, NetworkMessage* msg)
    {
        import libasync.internals.socket_compat : recvmsg, msghdr, iovec, sockaddr_storage;

        while (true) {
            auto err = recvmsg(fd, msg.header, 0);

            .tracef("recvmsg system call on FD %d returned %d", fd, err);
            if (err == SOCKET_ERROR) {
                m_error = lastError();

                if (m_error == EPosix.EINTR) {
                    // interrupted before anything was transferred: try again
                    .tracef("recvmsg system call on FD %d was interrupted before any transfer occured", fd);
                    continue;
                } else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
                    .tracef("recvmsg system call on FD %d would have blocked", fd);
                    m_status.code = Status.ASYNC;
                    return 0;
                } else if (m_error == EBADF ||
                           m_error == EFAULT ||
                           m_error == EINVAL ||
                           m_error == ENOTCONN ||
                           m_error == ENOTSOCK) {
                    .errorf("recvmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
                    assert(false);
                } else if (catchError!"Receive message"(err)) {
                    .errorf("recvmsg system call on FD %d encountered socket error: %s", fd, this.error);
                    return 0;
                }
            } else {
                .tracef("Received %d bytes on FD %d", err, fd);
                m_status.code = Status.OK;
                return err;
            }
        }
    }

    /**
     * Sends one message on `fd` with sendmsg, using msg.header.
     *
     * The call is retried while it fails with EINTR. m_status is reset
     * before the first attempt.
     *
     * Returns: the number of bytes sent (m_status.code = OK), or 0 when the
     *   call would block (m_status.code = ASYNC) or fails.
     */
    size_t sendMsg(in fd_t fd, NetworkMessage* msg) {
        import libasync.internals.socket_compat : sendmsg;

        .tracef("Send message on FD %d with size %d", fd, msg.header.msg_iov.iov_len);
        m_status = StatusInfo.init;

        while (true) {
            auto err = sendmsg(fd, msg.header, 0);

            .tracef("sendmsg system call on FD %d returned %d", fd, err);
            if (err == SOCKET_ERROR) {
                m_error = lastError();

                if (m_error == EPosix.EINTR) {
                    // interrupted before anything was transferred: try again
                    .tracef("sendmsg system call on FD %d was interrupted before any transfer occured", fd);
                    continue;
                } else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
                    .tracef("sendmsg system call on FD %d would have blocked", fd);
                    m_status.code = Status.ASYNC;
return 0;
                } else if (m_error == ECONNRESET ||
                           m_error == EPIPE) {
                    // peer reset / broken pipe: report 0 bytes; m_status is
                    // not modified on this path
                    return 0;
                } else if (m_error == EBADF ||
                           m_error == EDESTADDRREQ ||
                           m_error == EFAULT ||
                           m_error == EINVAL ||
                           m_error == EISCONN ||
                           m_error == ENOTSOCK ||
                           m_error == EOPNOTSUPP) {
                    // these errors are treated as fatal
                    .errorf("sendmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
                    assert(false);
                // ENOTCONN, EMSGSIZE
                } else if (catchError!"Send message"(err)) {
                    .errorf("sendmsg system call on FD %d encountered socket error: %s", fd, this.error);
                    return 0;
                }
            } else {
                .tracef("Sent %d bytes on FD %d", err, fd);
                m_status.code = Status.OK;
                return err;
            }
        }
    }

    /**
     * Receives a datagram from `fd` into `data` and stores the sender's
     * address in `addr`.
     *
     * `addr.family` is set to AF_INET6 before the call and changed to
     * AF_INET when the address length written back by recvfrom is smaller
     * than the AF_INET6 length.
     *
     * Returns: the number of bytes received (m_status.code = OK), or 0 on
     *   error (m_status.code = ASYNC for EWOULDBLOCK/EAGAIN).
     */
    uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
    {
        import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;

        m_status = StatusInfo.init;

        addr.family = AF_INET6;
        socklen_t addrLen = addr.sockAddrLen;
        long ret = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen);

        if (addrLen < addr.sockAddrLen) {
            addr.family = AF_INET;
        }

        static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable e) {}
        if (catchError!".recvfrom"(ret)) { // ret == -1
            if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
                m_status.code = Status.ASYNC;
            return 0;
        }

        m_status.code = Status.OK;

        return cast(uint) ret;
    }

    /**
     * Sends `data` as a datagram from `fd` to `addr`.
     *
     * Returns: the number of bytes sent (m_status.code = OK), or 0 on error
     *   (m_status.code = ASYNC for EWOULDBLOCK/EAGAIN).
     */
    uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
    {
        import libasync.internals.socket_compat : sendto;

        m_status = StatusInfo.init;

        static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B");
        catch (Throwable e) {}
        long ret = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);

        if (catchError!".sendto"(ret)) { // ret == -1
            if (m_error == EPosix.EWOULDBLOCK
|| m_error == EPosix.EAGAIN)
				m_status.code = Status.ASYNC;
			return 0;
		}

		m_status.code = Status.OK;
		return cast(uint) ret;
	}

	/**
	 * Queries the local address bound to `fd` with `getsockname`.
	 *
	 * Params:
	 *  fd = socket descriptor
	 *  ipv6 = selects the initial address family (AF_INET6 or AF_INET)
	 *
	 * Returns: the address, or `NetworkAddress.init` when `catchError` flags the call.
	 * The family is switched to AF_INET6 when the reported length exceeds `sockAddrLen`.
	 */
	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
		NetworkAddress ret;
		import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;

		if (ipv6)
			ret.family = AF_INET6;
		else
			ret.family = AF_INET;

		socklen_t len = ret.sockAddrLen;
		int err = getsockname(fd, ret.sockAddr, &len);
		if (catchError!"getsockname"(err))
			return NetworkAddress.init;
		if (len > ret.sockAddrLen)
			ret.family = AF_INET6;

		return ret;
	}

	/**
	 * Triggers the notifier identified by `fd`.
	 *
	 * With EPOLL a `long` of value 1 is written to `fd`; with kqueue an
	 * EVFILT_USER event carrying NOTE_TRIGGER is submitted to `m_kqueuefd`.
	 *
	 * Returns: `false` when `catchError` flags the system call, `true` otherwise.
	 */
	bool notify(in fd_t fd, AsyncNotifier ctxt)
	{
		static if (EPOLL)
		{
			import core.sys.posix.unistd : write;

			long val = 1;
			fd_t err = cast(fd_t) write(fd, &val, long.sizeof);

			if (catchError!"write(notify)"(err)) {
				return false;
			}
			return true;
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_notify"(err)) {
				return false;
			}
			return true;
		}
	}

	/**
	 * Raises the signal that wakes up the event loop owning `ctxt`.
	 *
	 * With EPOLL the signal number `fd` is queued to `ctxt.pthreadId` with `ctxt`
	 * as payload; with kqueue `ctxt` is registered through `addSignal` and SIGXCPU
	 * is sent to the current process.
	 *
	 * Returns: `true` when the signal was raised, `false` when raising it failed.
	 */
	bool notify(in fd_t fd, shared AsyncSignal ctxt)
	{
		static if (EPOLL)
		{

			sigval sigvl;
			fd_t err;
			sigvl.sival_ptr = cast(void*) ctxt;
			// NOTE(review): pthread_sigqueue reports failures as a positive error number,
			// not as -1/errno - confirm that catchError recognises that convention.
			try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch (Throwable) {}
			if (catchError!"sigqueue"(err)) {
				return false;
			}
		}
		else /* if KQUEUE */
		{

			import core.thread : getpid;

			addSignal(ctxt);

			// A failure is reported through the return value. Asserting inside this try
			// block would be swallowed by the catch-all below and reported as success.
			bool raised;
			try {
				static if (LOG) log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string);
				int err = core.sys.posix.signal.kill(getpid(), SIGXCPU);
				raised = !catchError!"notify(signal)"(err);
			} catch (Throwable) {}

			if (!raised)
				return false;
		}

		return true;
	}

	/// Placeholder: resets the status and returns 0 without reading anything.
	// no known uses
	uint read(in fd_t fd, ref ubyte[] data)
	{
		m_status = StatusInfo.init;
		return 0;
	}

	/// Placeholder: resets the status and returns 0 without writing anything.
	// no known uses
	uint write(in fd_t fd, in ubyte[] data)
	{
		m_status = StatusInfo.init;
		return 0;
	}

	uint watch(in fd_t fd, in WatchInfo info) {
		// note: info.wd is still 0 at this point.
		m_status = StatusInfo.init;
		import core.sys.linux.sys.inotify;
		import std.file : dirEntries, isDir, SpanMode;

		static if (EPOLL) {
			// Manually handle recursivity... All events show up under the same inotify
			uint events = info.events; // values for this API were pulled from inotify
			if (events & IN_DELETE)
				events |= IN_DELETE_SELF;
			if (events & IN_MOVED_FROM)
				events |= IN_MOVE_SELF;

			nothrow fd_t addFolderRecursive(Path path) {
				fd_t ret;
				try {
					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
					if (catchError!"inotify_add_watch"(ret))
						return fd_t.init;
					static if (LOG) try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch (Throwable) {}
					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
				} catch (Exception e) {
					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
					return 0;
				}

				if (info.recursive) {
					try {
						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
						{
							Path de_path = Path(de.name);
							if
(!de_path.absolute)
								de_path = path ~ Path(de.name);
							if (isDir(de_path.toNativeString()))
								if (addFolderRecursive(de_path) == 0)
									continue;
						}
					} catch (Exception e) {
						try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add sub-directories of " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
					}
				}

				return ret;
			}

			return addFolderRecursive(info.path);

		} else /* if KQUEUE */ {
			/// Manually handle recursivity & file tracking. Each folder is an event!
			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
			import core.sys.posix.fcntl;
			import libasync.internals.kqueue;

			// Translate the requested DWFileEvent flags into vnode filter flags
			uint events;
			if (info.events & DWFileEvent.CREATED)
				events |= NOTE_LINK | NOTE_WRITE;
			if (info.events & DWFileEvent.DELETED)
				events |= NOTE_DELETE;
			if (info.events & DWFileEvent.MODIFIED)
				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
			if (info.events & DWFileEvent.MOVED_FROM)
				events |= NOTE_RENAME;
			if (info.events & DWFileEvent.MOVED_TO)
				events |= NOTE_RENAME;

			EventInfo* evinfo;
			try {
				evinfo = m_watchers[fd];
			} catch (Throwable) {
				assert(false, "Could not retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher.");
			}

			/// we need a file descriptor for the containers, so we open files but we don't monitor them
			/// todo: track indexes internally?
			/// Opens `path`, registers a vnode event for it and, for directories, does the same
			/// for every entry (descending into sub-directories when info.recursive is set).
			/// Returns the opened descriptor, or 0 on failure.
			nothrow fd_t addRecursive(Path path, bool is_dir) {
				import core.sys.posix.unistd : close;
				int ret;
				try {
					static if (LOG) log("Adding path: " ~ path.toNativeString());

					ret = open(path.toNativeString().toStringz, O_EVTONLY);
					if (catchError!"open(watch)"(ret))
						return 0;

					if (is_dir)
						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);

					kevent_t _event;

					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);

					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

					if (catchError!"kevent_timer_add"(err)) {
						// Roll back: without this the descriptor opened above and the
						// folder entry registered for it would be leaked.
						if (is_dir)
							m_dwFolders.remove(ret);
						close(ret);
						return 0;
					}


					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
						Path filePath = Path(de.name);
						if (!filePath.absolute)
							filePath = path ~ filePath;
						fd_t fwd;
						if (info.recursive && isDir(filePath.toNativeString()))
							fwd = addRecursive(filePath, true);
						else {
							fwd = addRecursive(filePath, false); // gets an ID but will not scan
							// 0 means the entry could not be opened/registered:
							// do not record a bogus file under key 0
							if (fwd == 0)
								continue;
							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
						}

					}

				} catch (Exception e) {
					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg);
					catch (Throwable) {}
					return 0;
				}
				return ret;
			}

			fd_t wd;

			try {
				wd = addRecursive(info.path, isDir(info.path.toNativeString()));

				if (wd == 0)
					return 0;

			}
			catch (Exception e) {
				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg);
			}

			return cast(uint) wd;
		}
	}

	bool unwatch(in fd_t fd, in uint wd) {
		// the wd can be used with m_dwFolders to find the DWFolderInfo
		// and unwatch everything recursively.
m_status = StatusInfo.init;
		static if (EPOLL) {
			/// If recursive, all subfolders must also be unwatched recursively by removing them
			/// from containers and from inotify
			import core.sys.linux.sys.inotify;

			nothrow bool removeAll(DWFolderInfo fi) {
				int err;
				try {

					bool inotify_unwatch(uint wd) {
						err = inotify_rm_watch(fd, wd);

						if (catchError!"inotify_rm_watch"(err))
							return false;
						return true;
					}

					if (!inotify_unwatch(fi.wi.wd))
						return false;

					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
					{
						if (file.folder == fi.wi.wd) {
							inotify_unwatch(id);
							m_dwFiles.remove(id);
						}
					}*/
					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd));

					if (fi.wi.recursive) {
						// find all subdirectories by comparing the path
						Array!(Tuple!(fd_t, uint)) remove_list;
						foreach (ref const key, ref const DWFolderInfo folder; m_dwFolders) {
							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {

								if (!inotify_unwatch(folder.wi.wd))
									return false;

								remove_list.insertBack(key);
							}
						}
						// removal is deferred so the map is not modified while iterating it
						foreach (rm_wd; remove_list[])
							m_dwFolders.remove(rm_wd);

					}
					return true;
				} catch (Exception e) {
					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString());
					catch (Throwable) {}
					return false;
				}
			}

			DWFolderInfo info;

			try {
				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
				if (info == DWFolderInfo.init) {
					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
					return false;
				}
			} catch (Throwable) { }

			return removeAll(info);
		}
		else /* if KQUEUE */ {

			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.

			nothrow bool removeAll(DWFolderInfo fi) {
				import core.sys.posix.unistd : close;


				/// Deletes the vnode event registered for descriptor `id`.
				bool event_unset(uint id) {
					kevent_t _event;
					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
					if (catchError!"kevent_unwatch"(err))
						return false;
					return true;
				}

				/// Disables the vnode event of ONE folder descriptor, forgets it and closes it.
				/// Operates on its argument, so it works for sub-folders as well as the root.
				bool removeFolder(uint folder_wd) {
					if (!event_unset(folder_wd))
						return false;
					m_dwFolders.remove(folder_wd);
					int err = close(folder_wd);
					if (catchError!"close dir"(err))
						return false;
					return true;
				}

				try {
					import std.container.array;
					Array!fd_t remove_list; // folders to unwatch: the root first, then its sub-folders
					Array!fd_t remove_file_list;

					remove_list.insertBack(fi.wi.wd);

					if (fi.wi.recursive) {
						// search for subfolders of the same watcher by comparing the path
						foreach (ref const DWFolderInfo folder; m_dwFolders) {
							if (folder.wi.wd != fi.wi.wd && folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path))
								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
						}
					}

					// search for the files tracked in any of those folders (root included) and close their descriptors
					foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
						foreach (folder_wd; remove_list[]) {
							if (file.folder == folder_wd) {
								close(fwd);
								remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
								break;
							}
						}
					}

					foreach (fwd; remove_file_list[])
						m_dwFiles.remove(fwd);

					foreach (rm_wd; remove_list[])
						if (!removeFolder(rm_wd))
							return false;

				} catch (Exception e) {
					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString());
					catch (Throwable) {}
					return false;
				}

				return true;
			}

			DWFolderInfo info;
			bool known;
			try {
				info = m_dwFolders.get(wd, DWFolderInfo.init);
				known = info != DWFolderInfo.init;
				// An unknown wd must not reach removeAll: its wd field is 0 and
				// descriptor 0 would be unregistered and closed.
				if (!known)
					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
			} catch (Throwable) {}

			if (!known)
				return false;

			if (!removeAll(info))
				return false;
			return true;
		}
	}

	// returns the amount of changes
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
		m_status = StatusInfo.init;

		static if (EPOLL) {
			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
			import core.sys.linux.sys.inotify;
			import core.sys.posix.unistd : read;
			import core.stdc.stdio : FILENAME_MAX;
			import core.stdc.string : strlen;
			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
			if (catchError!"read()"(nread))
			{
				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
					m_status.code = Status.ASYNC;
				return 0;
			}
			assert(nread > 0);


			/// starts (recursively) watching all newly created folders in a recursive entry,
			/// creates events for additional files/folders founds, and unwatches all deleted folders
			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
				import std.file : dirEntries, SpanMode, isDir;
				assert(fi.wi.recursive);
				// get a list of stuff in the created/moved folder
				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
						Path entryPath = Path(de.name);
						if (!entryPath.absolute)
							entryPath = fi.wi.path ~ entryPath;

						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {

							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
							void genEvents(Path subpath) {
								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
									auto subsubpath = Path(de.name);
									if (!subsubpath.absolute)
										subsubpath =
subpath ~ subsubpath;
									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
									if (isDir(subsubpath.toNativeString()))
										genEvents(subsubpath);
								}
							}

							genEvents(entryPath);

						}
					}
				}
			}

			// i counts the entries of dst filled so far
			size_t i;
			do
			{
				// walk the variable-sized inotify_event records stored in buf
				for (auto p = buf.ptr; p < buf.ptr + nread; )
				{
					inotify_event* ev = cast(inotify_event*)p;
					p += inotify_event.sizeof + ev.len;

					// map the inotify mask to a DWFileEvent; later checks override earlier ones
					DWFileEvent evtype;
					evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_CREATE)
						evtype = DWFileEvent.CREATED;
					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
						evtype = DWFileEvent.DELETED;
					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
						evtype = DWFileEvent.MOVED_FROM;
					if (ev.mask & (IN_MOVED_TO))
						evtype = DWFileEvent.MOVED_TO;
					if (ev.mask & IN_MODIFY)
						evtype = DWFileEvent.MODIFIED;

					import std.path : buildPath;
					import core.stdc.string : strlen;
					// An event carries a name only when ev.len > 0. Measuring the name of a
					// nameless event (e.g. one about the watched folder itself) would read
					// past the record, into the next event or into uninitialized buffer bytes.
					string name;
					if (ev.len > 0)
						name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
					DWFolderInfo fi;
					Path path;
					try {
						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
						if (fi == DWFolderInfo.init) {
							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string);
							continue;
						}
						path = fi.wi.path ~ Path(name);
					}
					catch (Exception e) {
						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders");
						return 0;
					}

					dst[i] = DWChangeInfo(evtype, path);
					import std.file : isDir;
					bool is_dir;
					// isDir throws for paths that no longer exist; is_dir then stays false
					try is_dir = isDir(path.toNativeString()); catch (Throwable) {}
					if (fi.wi.recursive && is_dir) {

						try {
							Array!DWChangeInfo changes;
							recurseInto(fi, evtype, changes);
							// stop watching if the folder was deleted
							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
								unwatch(fi.fd, fi.wi.wd);
							}
							// append the generated changes, growing dst when it is too short
							foreach (change; changes[]) {
								i++;
								if (dst.length <= i)
									dst ~= change;
								else dst[i] = change;
							}
						}
						catch (Exception e) {
							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively.");
							return 0;
						}

					}


					i++;
					// dst is full: report what has been gathered so far
					if (i >= dst.length)
						return cast(uint) i;
				}
				static if (LOG) foreach (j; 0 ..
i) {
					static if (LOG) try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch (Throwable e) {}
				}
				nread = read(fd, buf.ptr, buf.sizeof);
				if (catchError!"read()"(nread)) {
					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
						m_status.code = Status.ASYNC;
					return cast(uint) i;
				}
			} while (nread > 0);

			return cast(uint) i;
		}
		else /* if KQUEUE */ {
			Array!(DWChangeInfo)* changes;
			size_t i;
			try {
				changes = m_changes[fd];
				import std.algorithm : min;
				// hand over as many queued changes as fit into dst, then drop them from the queue
				size_t cnt = min(dst.length, changes.length);
				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
					dst[i] = (*changes)[i];
					i++;
				}
				changes.linearRemove((*changes)[0 .. cnt]);
			}
			catch (Exception e) {
				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
				return 0;
			}
			return cast(uint) i;
		}
	}

	/// Queues `request` on its socket's pending accepts and processes that queue.
	void submitRequest(AsyncAcceptRequest* request)
	{
		request.socket.m_pendingAccepts.insertBack(request);
		processPendingAccepts(request.socket);
	}

	/// Queues `request` on its socket's pending receives and processes that queue.
	void submitRequest(AsyncReceiveRequest* request)
	{
		request.socket.m_pendingReceives.insertBack(request);
		processPendingReceives(request.socket);
	}

	/// Queues `request` on its socket's pending sends and processes that queue.
	void submitRequest(AsyncSendRequest* request)
	{
		request.socket.m_pendingSends.insertBack(request);
		processPendingSends(request.socket);
	}

	/// Sets (`b` == true) or clears the SO_BROADCAST option on `fd`.
	/// Returns: `false` when `catchError` flags the `setsockopt` call.
	bool broadcast(in fd_t fd, bool b) {
		m_status = StatusInfo.init;

		import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;

		int val = b?1:0;
		socklen_t len = val.sizeof;
		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
		if (catchError!"setsockopt"(err))
			return false;

		return true;
	}

	/**
	 * Shuts down `fd`: both directions when `forced`, the write side otherwise.
	 * Without EPOLL the read/write kqueue events of `fd` are deleted as well.
	 *
	 * Returns: `true` on success or when the socket was not connected any more
	 * (ENOTCONN), `false` when `catchError` flags the `shutdown` call.
	 */
	private bool closeRemoteSocket(fd_t fd, bool forced) {

		int err;
		static if (LOG) log("shutdown");
		import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
		if (forced)
			err = shutdown(fd, SHUT_RDWR);
		else
			err = shutdown(fd, SHUT_WR);

		// Capture shutdown()'s errno right away: the logging and kevent() calls
		// below may overwrite it before it is inspected.
		const int shutdownErrno = errno;

		static if (!EPOLL) {
			kevent_t[2] events;
			static if (LOG) try log("!!DISC delete events"); catch (Throwable e) {}
			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
		}

		if (err == SOCKET_ERROR && shutdownErrno == ENOTCONN) {
			// The socket has already been shut down, we can recover from that
		} else {
			// Restore the value so an errno-based lookup inside catchError (not visible
			// here) reports the shutdown() failure rather than a later call's.
			errno = shutdownErrno;
			if (catchError!"shutdown"(err))
				return false;
		}

		return true;
	}

	/**
	 * Closes a socket. A connected socket is shut down first; the descriptor itself
	 * is only closed when the socket is not connected or `forced` is set.
	 *
	 * Returns: `false` when an unforced shutdown fails or `close` is flagged by `catchError`.
	 */
	// for connected sockets
	bool closeSocket(fd_t fd, bool connected, bool forced = false)
	{
		static if (LOG) log("closeSocket");
		if (connected && !closeRemoteSocket(fd, forced) && !forced)
			return false;

		if (!connected || forced) {
			// todo: flush the socket here?

			import core.sys.posix.unistd : close;
			static if (LOG) log("close");
			int err = close(fd);
			if (catchError!"closesocket"(err))
				return false;
		}
		return true;
	}


	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
	{
		import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
		addrinfo hints;
		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
	}


	/// Resolves `host` through `getAddressInfo` using default-initialized hints.
	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
	/*in {
		debug import libasync.internals.validator : validateHost;
		debug assert(validateHost(host), "Trying to connect to an invalid domain");
	}
	body */{
		import libasync.internals.socket_compat : addrinfo;
		addrinfo defaultHints;
		return getAddressInfo(host, port, ipv6, tcp, defaultHints);
	}

	/// Records an error: the status text becomes `TRACE` (followed by ": " and
	/// `details` when given), and `error` and `s` are stored as error and status code.
	void setInternalError(string TRACE)(in Status s, string details = "", error_t error = cast(EPosix) errno())
	{
		m_status.text = details.length > 0 ? TRACE ~ ": " ~ details : TRACE;
		m_error = error;
		m_status.code = s;
		static if(LOG) log(m_status);
	}
private:

	/// Serves queued accept requests of `socket` until one cannot be completed.
	/// A failure that did not merely block reads kills the socket and reports the error.
	void processPendingAccepts(AsyncSocket socket)
	{
		if (socket.readBlocked) return;
		foreach (request; socket.m_pendingAccepts) {
			// Try to accept a single connection on the socket
			auto accepted = attemptConnectionAcceptance(socket);
			request.peer = accepted[0];
			request.family = accepted[1];

			if (status.code != Status.OK && !socket.readBlocked) {
				socket.kill();
				socket.handleError();
				return;
			}

			// No connection available: leave the request queued
			if (request.peer == INVALID_SOCKET)
				break;

			socket.m_pendingAccepts.removeFront();
			m_completedSocketAccepts.insertBack(request);
		}
	}

	/// Serves queued receive requests of `socket` until one cannot be completed.
	/// A failure that did not merely block reads kills the socket and reports the error.
	void processPendingReceives(AsyncSocket socket)
	{
		if (socket.readBlocked) return;
		foreach (request; socket.m_pendingReceives) {
			// Try to fit all bytes available in the OS receive buffer
			// into the current request's message's buffer, or try a
			// a zero byte receive, should there be no such message.
			const bool received = request.message
				? attemptMessageReception(socket, request.message)
				: attemptZeroByteReceive(socket);

			if (status.code != Status.OK && !socket.readBlocked) {
				if (received) m_completedSocketReceives.insertBack(request);
				socket.kill();
				socket.handleError();
				return;
			}

			bool completed;
			if (request.exact) {
				// An exact request completes only once the whole message arrived
				completed = request.message.receivedAll;
			} else {
				// New bytes or zero-sized connectionless datagram
				completed = received || !socket.connectionOriented && !socket.readBlocked;
			}

			if (!completed)
				break;

			socket.m_pendingReceives.removeFront();
			m_completedSocketReceives.insertBack(request);
		}
	}

	/// Serves queued send requests of `socket` until one cannot be completed.
	/// A failure that did not merely block writes kills the socket and reports the error.
	void processPendingSends(AsyncSocket socket)
	{
		if (socket.writeBlocked) return;
		foreach (request; socket.m_pendingSends) {
			// Try to fit all bytes of the current request's buffer
			// into the OS send buffer.
			const bool sent = attemptMessageTransmission(socket, request.message);

			if (status.code != Status.OK && !socket.writeBlocked) {
				socket.kill();
				socket.handleError();
				return;
			}

			// Message only partially sent: keep the request queued
			if (!sent)
				break;

			socket.m_pendingSends.removeFront();
			m_completedSocketSends.insertBack(request);
		}
	}

	/**
	 * Tries to accept one connection on `socket`.
	 *
	 * Returns: a tuple of the peer descriptor and its address family, or
	 * `INVALID_SOCKET` with a default family when nothing could be accepted.
	 * `socket.readBlocked` is set when the call would have blocked.
	 */
	auto attemptConnectionAcceptance(AsyncSocket socket)
	{
		import core.sys.posix.fcntl : O_NONBLOCK;
		import libasync.internals.socket_compat : accept, accept4, sockaddr_storage, socklen_t;

		fd_t peer = void;
		sockaddr_storage remote = void;
		socklen_t remoteLength = remote.sizeof;

		// Error handling shared by both accept variants; mixed in right after the call.
		// NOTE(review): EINTR is in the fatal list here, whereas the recv/send paths retry on it.
		enum common = q{
			if (peer == SOCKET_ERROR) {
				m_error = lastError();

				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
					m_status.code = Status.ASYNC;
					socket.readBlocked = true;
					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
				} else if (m_error == EBADF ||
				           m_error == EINTR ||
				           m_error == EINVAL ||
				           m_error == ENOTSOCK ||
				           m_error == EOPNOTSUPP ||
				           m_error == EFAULT) {
					assert(false, "accept{4} system call on FD " ~ socket.handle.to!string ~ " encountered fatal socket error: " ~ this.error);
				} else if (catchError!"accept"(peer)) {
					.errorf("accept{4} system call on FD %d encountered socket error: %s", socket.handle, this.error);
					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
				}
			}
		};

		version (linux) {
			// accept4 hands back a peer that is already non-blocking
			peer = accept4(socket.handle, cast(sockaddr*) &remote, &remoteLength, O_NONBLOCK);
			mixin(common);
		} else {
			peer = accept(socket.handle, cast(sockaddr*) &remote, &remoteLength);
			mixin(common);
			if (!setNonBlock(peer)) {
				.error("Failed to set accepted peer socket non-blocking");
				return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
			}
		}

		return tuple(peer, remote.ss_family);
	}

	/// Peeks a single byte on `socket` without consuming it.
	/// Returns: `true` if a byte is available. Sets `readBlocked` when the call would
	/// block, or when a connection-oriented socket reports zero bytes.
	bool
attemptZeroByteReceive(AsyncSocket socket)
	{
		import libasync.internals.socket_compat : recv, MSG_PEEK;

		ubyte probe = void;
		auto fd = socket.handle;

		for (;;) {
			auto rc = recv(fd, &probe, 1, MSG_PEEK);

			.tracef("recv system call on FD %d returned %d", fd, rc);

			if (rc != SOCKET_ERROR) {
				.tracef("Received %d bytes on FD %d", rc, fd);
				m_status.code = Status.OK;
				if (socket.connectionOriented && !rc) {
					socket.readBlocked = true;
				}
				return rc > 0;
			}

			m_error = lastError();

			if (m_error == EPosix.EINTR) {
				.tracef("recv system call on FD %d was interrupted before any transfer occured", fd);
				continue;
			}

			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
				.tracef("recv system call on FD %d would have blocked", fd);
				m_status.code = Status.ASYNC;
				socket.readBlocked = true;
				return false;
			}

			if (m_error == EBADF ||
			    m_error == EFAULT ||
			    m_error == EINVAL ||
			    m_error == ENOTCONN ||
			    m_error == ENOTSOCK) {
				.errorf("recv system call on FD %d encountered fatal socket error: %s", fd, this.error);
				assert(false);
			}

			if (catchError!"Receive message"(rc)) {
				.errorf("recv system call on FD %d encountered socket error: %s", fd, this.error);
				return false;
			}
		}
	}

	/**
	 * Appends as much of the bytes currently available in the OS receive
	 * buffer to the given message's transferred bytes as the message's
	 * buffer's remaining free bytes and the state of the OS receive buffer
	 * allow for, advancing the message's count of transferred bytes in the process.
	 * Sets $(D readBlocked) on indication by the OS that there were
	 * not enough bytes available in the OS receive buffer.
	 * Returns: $(D true) if any bytes were transferred.
*/
	bool attemptMessageReception(AsyncSocket socket, NetworkMessage* msg)
	in {
		assert(socket.connectionOriented && !msg.receivedAll || !msg.receivedAny, "Message already received");
	} body {
		// Datagram sockets get exactly one receive attempt; all other sockets keep
		// receiving until nothing more arrives or the message is complete.
		const bool singleAttempt = socket.datagramOriented;

		bool gotBytes;
		size_t chunk = void;

		do {
			chunk = recvMsg(socket.handle, msg);
			msg.count = msg.count + chunk;
			if (chunk > 0)
				gotBytes = true;
		} while (!singleAttempt && chunk > 0 && !msg.receivedAll);

		// Either more bytes may yet become available in the future (ASYNC), or the
		// connection was shutdown in an orderly fashion by the remote peer (OK with zero bytes).
		if (status.code == Status.ASYNC
		    || (socket.connectionOriented && status.code == Status.OK && !chunk)) {
			socket.readBlocked = true;
		}

		return gotBytes;
	}

	/**
	 * Transfers as much of the given message's untransferred bytes
	 * into the OS send buffer as the latter's state allows for,
	 * advancing the message's count of transferred bytes in the process.
	 * Sets $(D writeBlocked) on indication by the OS that
	 * there was not enough space available in the OS send buffer.
	 * Returns: $(D true) if all of the message's bytes
	 * have been transferred.
*/
	bool attemptMessageTransmission(AsyncSocket socket, NetworkMessage* msg)
	in { assert(!msg.sent, "Message already sent"); }
	body {
		size_t sentCount = void;

		// keep sending until nothing more is accepted or the message is complete
		do {
			sentCount = sendMsg(socket.handle, msg);
			msg.count = msg.count + sentCount;
		} while (sentCount > 0 && !msg.sent);

		if (status.code == Status.ASYNC) {
			socket.writeBlocked = true;
		}

		return msg.sent;
	}

	/// For DirectoryWatcher
	/// In kqueue/vnode, all we get is the folder in which changes occured.
	/// We have to figure out what changed exactly and put the results in a container
	/// for the readChanges call.
	/// Returns: true when at least one change is queued for the watcher, false when
	/// nothing changed or an error occurred.
	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
		import std.file;
		import std.path : buildPath;
		try {
			Array!Path currFiles;
			auto wd = fi.wi.wd;
			auto path = fi.wi.path;
			auto fd = fi.fd;
			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
			assert(changes !is null, "Invalid wd, could not find changes array.");
			//import std.stdio : writeln;
			//writeln("Scanning path: ", path.toNativeString());
			//writeln("m_dwFiles length: ", m_dwFiles.length);

			// get a list of the folder
			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
				//writeln(de.name);
				Path entryPath = Path(de.name);
				if (!entryPath.absolute)
					entryPath = path ~ entryPath;
				bool found;

				if (!de.isDir()) {
					// compare it to the cached list fixme: make it faster using another container?
					foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
						if (file.folder != wd) continue; // this file isn't in the evented folder
						if (file.path == entryPath) {
							found = true;
							static if (LOG) log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
							// Check if it was modified
							if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
							{
								DWFileInfo dwf = file;
								dwf.lastModified = de.timeLastModified;
								m_dwFiles[id] = dwf;
								changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
							}
							// the loop is left right after the update above
							break;
						}
					}
				} else {
					foreach (ref const DWFolderInfo folder; m_dwFolders) {
						if (folder.wi.path == entryPath) {
							found = true;
							break;
						}
					}
				}

				// This file/folder is new in the folder
				if (!found) {
					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));

					if (fi.wi.recursive && de.isDir()) {
						/// This is the complicated part. The folder needs to be watched, and all the events
						/// generated for every file/folder found recursively inside it,
						/// Useful e.g. when mkdir -p is used.
						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
						void genEvents(Path subpath) {
							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
								auto subsubpath = Path(de.name);
								if (!subsubpath.absolute())
									subsubpath = subpath ~ subsubpath;
								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
								if (isDir(subsubpath.toNativeString()))
									genEvents(subsubpath);
							}
						}

						genEvents(entryPath);

					}
					else {
						EventInfo* evinfo;
						try evinfo = m_watchers[fd]; catch(Throwable) { assert(false, "Could not retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }

						static if (LOG) log("Adding path: " ~ path.toNativeString());

						import core.sys.posix.fcntl : open;
						import core.sys.posix.unistd : close;
						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
						if (catchError!"open(watch)"(fwd))
							return false;

						kevent_t _event;

						// NOTE(review): fi.wi.events holds DWFileEvent flags, whereas watch() translates
						// them to NOTE_* vnode flags before calling EV_SET - confirm which is intended here.
						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);

						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

						if (catchError!"kevent_timer_add"(err)) {
							// the descriptor opened above is not tracked anywhere yet: release it
							close(fwd);
							return false;
						}

						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);

					}
				}

				// This file/folder is now current. This avoids a deletion event.
				currFiles.insert(entryPath);
			}

			/// Now search for files/folders that were deleted in this directory (no recursivity needed).
			/// Unwatch this directory and generate delete event only for the root dir
			// Map updates are collected here and applied after the loop, so that
			// m_dwFiles is not modified while it is being iterated.
			Array!fd_t removed_files;
			Array!fd_t unwatch_list;
			bool failed;
			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
				if (file.folder != wd) continue; // skip those files in another folder than the evented one
				bool found;
				foreach (Path curr; currFiles) {
					if (file.path == curr){
						found = true;
						break;
					}
				}
				// this file/folder was in the folder but it's not there anymore
				if (!found) {
					// writeln("Deleting: ", file.path.toNativeString());
					kevent_t _event;
					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
					if (catchError!"kevent_unwatch"(err)) {
						failed = true;
						break;
					}
					import core.sys.posix.unistd : close;
					err = close(id);
					if (catchError!"close(dwFile)"(err)) {
						failed = true;
						break;
					}
					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));

					if (fi.wi.recursive && file.is_dir)
						unwatch_list.insertBack(id);

					removed_files.insertBack(id);

				}

			}

			foreach (gone; unwatch_list[])
				unwatch(fd, gone);

			foreach (gone; removed_files[])
				m_dwFiles.remove(gone);

			if (failed)
				return false;

			if(changes.empty)
				return false; // unhandled event, skip the callback

			// fixme: how to implement moved_from moved_to for rename?
}
		catch (Exception e)
		{
			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch(Throwable) {}
			return false;
		}
		return true;
	}

	/**
	 * Switches `fd` to non-blocking mode by OR-ing O_NONBLOCK into its file
	 * status flags. The socket must not be connected.
	 *
	 * Returns: true on success. If either fcntl call fails, the error is
	 * recorded through catchError, closeSocket(fd, false) is called and
	 * false is returned.
	 */
	bool setNonBlock(fd_t fd) {
		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
		int flags = fcntl(fd, F_GETFL);
		// F_GETFL yields -1 on failure; OR-ing into that would hand F_SETFL a
		// garbage flag set, so bail out the same way as for an F_SETFL failure.
		if (catchError!"F_GETFL"(flags)) {
			closeSocket(fd, false);
			return false;
		}
		flags |= O_NONBLOCK;
		int err = fcntl(fd, F_SETFL, flags);
		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
			closeSocket(fd, false);
			return false;
		}
		return true;
	}

	/**
	 * Handles readiness events on a listening TCP socket.
	 *
	 * When the event mask signals incoming data, connections are accepted in a
	 * loop until accept fails. Each accepted socket is wrapped in an
	 * AsyncTCPConnection, passed to `del` to obtain its TCPEventHandler,
	 * registered with initTCPConnection and then notified with
	 * TCPEvent.CONNECT. When the mask signals an error, SO_ERROR is stored in
	 * m_error, the status is set to ABORT and `del(null)` is called.
	 *
	 * Params:
	 *   fd = the listening socket
	 *   del = callback producing the per-connection event handler
	 *   events = raw event bits (epoll mask, or kqueue filter in the high
	 *            16 bits and flags in the low 16 bits)
	 *
	 * Returns: true on every path.
	 */
	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
	{
		import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
		// Use the platform's definition instead of a hard-coded 0x800.
		import core.sys.posix.fcntl : O_NONBLOCK;

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		}
		else
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
		}

		if (incoming) { // accept incoming connection
			do {
				NetworkAddress addr;
				addr.family = AF_INET;
				socklen_t addrlen = addr.sockAddrLen;

				static if (EPOLL) {
					/// Accept the connection and create a client socket
					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);

					if (catchError!".accept"(csock)) {
						return true;// this way we know there's nothing left to accept
					}
				} else /* if KQUEUE */ {
					fd_t csock = accept(fd, addr.sockAddr, &addrlen);

					if (catchError!".accept"(csock)) {
						return true;
					}

					// Make non-blocking so subsequent calls to recv/send return immediately
					if (!setNonBlock(csock)) {
						continue;
					}
				}

				// Set client address family based on address length
				if (addrlen > addr.sockAddrLen)
					addr.family = AF_INET6;
				if (addrlen == socklen_t.init) {
					setInternalError!"addrlen"(Status.ABORT);
					import core.sys.posix.unistd : close;
					close(csock);
					continue;
				}

				// Allocate a new connection handler object
				AsyncTCPConnection conn;
				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
				catch (Exception e){ assert(false, "Allocation failure"); }
				conn.peer = addr;
				conn.socket = csock;
				conn.inbound = true;

				// Releases the connection object and closes the client socket.
				nothrow void closeClient() {
					try ThreadMem.free(conn);
					catch (Exception e){ assert(false, "Free failure"); }
					closeSocket(csock, true, true);
				}

				// Get the connection handler from the callback
				TCPEventHandler evh;
				try {
					evh = del(conn);
					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
						static if (LOG) try log("Failed to connect"); catch (Throwable e) {}
						closeClient();
						continue;
					}
					static if (LOG) try log("Connection Started with " ~ csock.to!string); catch (Throwable e) {}
				}
				catch (Exception e) {
					static if (LOG) log("Close socket");
					closeClient();
					continue;
				}

				// Announce connection state to the connection handler
				try {
					static if (LOG) log("Connected to: " ~ addr.toString());
					evh.conn.connected = true;
					evh(TCPEvent.CONNECT);
				}
				catch (Exception e) {
					closeClient();
					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				}
				/*if (m_status.code == Status.ABORT)
				{
					try evh(TCPEvent.ERROR);
					catch (Throwable e) {}
				}*/
			} while(true);

		}

		if (error) { // socket failure
			m_status.text = "listen socket error";
			int err;
			import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
			socklen_t len = int.sizeof;
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
			m_error = cast(error_t) err;
			m_status.code = Status.ABORT;
			static if(LOG) log(m_status);

			// call with null to announce a failure
			try del(null);
			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }

			/// close the listener?
			// closeSocket(fd, false);
		}
		return true;
	}

	/**
	 * Dispatches readiness events of a UDP socket to `del`.
	 *
	 * READ and WRITE are forwarded as UDPEvent.READ / UDPEvent.WRITE. On an
	 * error event, SO_ERROR is fetched and recorded through setInternalError,
	 * `fd` is closed and false is returned.
	 *
	 * Returns: false if a handler threw or the socket reported an error,
	 * true otherwise.
	 */
	bool onUDPTraffic(fd_t fd, UDPHandler del, int events)
	{
		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		}
		else
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
		}

		if (read) {
			try {
				del(UDPEvent.READ);
			}
			catch (Exception e) {
				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
				return false;
			}
		}

		if (write) {

			try {
				del(UDPEvent.WRITE);
			}
			catch (Exception e) {
				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
				return false;
			}
		}

		if (error) // socket failure
		{

			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
			import core.sys.posix.unistd : close;
			int err;
			socklen_t errlen = err.sizeof;
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
			close(fd);
			return false;
		}

		return true;
	}

	bool
onEvent(fd_t fd, EventHandler del, int events)
	{
		// Dispatches readiness events of a generic AsyncEvent (`del.ev`) to its
		// handler as EventCode.WRITE / READ / CLOSE / ERROR / CONNECT.
		// CONNECT and CLOSE are only derived for stateful events.
		// Returns false if a handler threw or the descriptor reported an error.

		// Initialised (instead of `= void`) so they are well-defined for
		// non-stateful events as well.
		bool connect = false, close = false;
		auto conn = del.ev;

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			if (conn.stateful) {
				connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
				close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
			}
		}
		else
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			if (conn.stateful) {
				connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
				close = cast(bool) (kqueue_flags & EV_EOF);
			}
		}

		if (write && (!conn.stateful || conn.connected && !conn.disconnecting && conn.writeBlocked)) {
			if (conn.stateful) conn.writeBlocked = false;
			static if (LOG) try log("!write"); catch (Throwable e) {}
			try {
				del(EventCode.WRITE);
			}
			catch (Exception e) {
				setInternalError!"del@Event.WRITE"(Status.ABORT);
				return false;
			}
		}

		if (read && (!conn.stateful || conn.connected && !conn.disconnecting)) {
			static if (LOG) try log("!read"); catch (Throwable e) {}
			try {
				del(EventCode.READ);
			}
			catch (Exception e) {
				setInternalError!"del@Event.READ"(Status.ABORT);
				return false;
			}
		}

		if (conn.stateful && close && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!close"); catch (Throwable e) {}
			// The former "hack" early-return (`!conn.connected && conn.disconnecting`)
			// could never fire here: this branch already requires conn.connected.

			try del(EventCode.CLOSE);
			catch (Exception e) {
				setInternalError!"del@Event.CLOSE"(Status.ABORT);
				return false;
			}

			// Careful here, the delegate might have closed the connection already
			if (conn.connected) {
				closeSocket(fd, !conn.disconnecting, conn.connected);

				m_status.code = Status.ABORT;
				conn.disconnecting = true;
				conn.connected = false;
				conn.writeBlocked = true;
				conn.id = 0;

				try ThreadMem.free(conn.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }
			}
			return true;
		}

		if (error) // failure
		{
			setInternalError!"EPOLLERR"(Status.ABORT, null);
			try {
				del(EventCode.ERROR);
			}
			catch (Exception e)
			{
				setInternalError!"del@Event.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}

		if (conn.stateful && connect) {
			static if (LOG) try log("!connect"); catch (Throwable e) {}
			conn.connected = true;
			try del(EventCode.CONNECT);
			catch (Exception e) {
				setInternalError!"del@Event.CONNECT"(Status.ABORT);
				return false;
			}
			return true;
		}

		return true;
	}

	/**
	 * Handle an event for a connectionless socket.
	 *
	 * Read/write readiness clears the matching "blocked" flag and drains the
	 * pending receive/send requests. An error event records the socket's last
	 * error, kills the socket and invokes its error handler.
	 *
	 * Returns: false if the socket reported an error, true otherwise.
	 */
	bool onCLSocketEvent(AsyncSocket socket, int events)
	{
		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		}
		else
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
		}

		if (read) {
			tracef("Read on FD %d", socket.handle);

			socket.readBlocked = false;
			processPendingReceives(socket);
		}

		if (write) {
			tracef("Write on FD %d", socket.handle);

			socket.writeBlocked = false;
			processPendingSends(socket);
		}

		if (error) {
			tracef("Error on FD %d", socket.handle);

			auto err = cast(error_t) socket.lastError;
			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, err);
			socket.kill();
			socket.handleError();
			return false;
		}

		return true;
	}

	/**
	 * Handle an event for a connection-oriented, active socket.
	 *
	 * Order of processing: error, connect, write, read, close. ECONNRESET and
	 * EPIPE are treated as a regular close (returns true); any other error is
	 * recorded and reported through the socket's error handler (returns
	 * false). A connect event suppresses read/write handling for this pass.
	 */
	bool onCOASocketEvent(AsyncSocket socket, int events)
	{
		static if (EPOLL) {
			const uint epoll_events = cast(uint) events;
			bool read = cast(bool) (epoll_events & EPOLLIN);
			bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !socket.disconnecting && !socket.connected;
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		} else {
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			bool read = cast(bool) (kqueue_events & EVFILT_READ);
			bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !socket.disconnecting && !socket.connected);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		tracef("AsyncSocket events: (read: %s, write: %s, error: %s, connect: %s, close: %s)", read, write, error, connect, close);

		if (error) {
			tracef("Error on FD %d", socket.handle);

			auto err = cast(error_t) socket.lastError;
			if (err == ECONNRESET ||
				err == EPIPE) {
				socket.kill();
				socket.handleClose();
				return true;
			}

			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, err);
			socket.kill();
			socket.handleError();
			return false;
		}

		if (connect) {
			tracef("Connect on FD %d", socket.handle);

			socket.connected = true;
			socket.readBlocked = false;
			socket.writeBlocked = false;
			socket.handleConnect();
			read = false;
			write = false;
		}

		if ((/+read ||+/ write) && socket.connected && !socket.disconnecting && socket.writeBlocked) {
			tracef("Write on FD %d", socket.handle);

			socket.writeBlocked = false;
			processPendingSends(socket);
		}/+ else {
			read = true;
		}+/

		if (read && socket.connected && !socket.disconnecting && socket.readBlocked) {
			tracef("Read on FD %d", socket.handle);

			socket.readBlocked = false;
			processPendingReceives(socket);
		}

		if (close && socket.connected && !socket.disconnecting)
		{
			tracef("Close on FD %d", socket.handle);
			socket.kill();
			socket.handleClose();
			return true;
		}

		return true;
	}

	/**
	 * Handle an event for a connection-oriented, passive socket.
	 *
	 * Incoming readiness clears readBlocked and processes the pending accept
	 * requests. An error event records the socket's last error, kills the
	 * socket and invokes its error handler.
	 *
	 * Returns: false if the socket reported an error, true otherwise.
	 */
	bool onCOPSocketEvent(AsyncSocket socket, int events)
	{
		import core.sys.posix.fcntl : O_NONBLOCK;
		import libasync.internals.socket_compat : accept, accept4, sockaddr, socklen_t;

		static if (EPOLL) {
			const uint epoll_events = cast(uint) events;
			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
		} else {
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool incoming = cast(bool) (kqueue_events & EVFILT_READ);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
		}

		tracef("AsyncSocket events: (incoming: %s, error: %s)", incoming, error);

		if (incoming) {
			tracef("Incoming on FD %d", socket.handle);

			socket.readBlocked = false;
			processPendingAccepts(socket);
		}

		if (error) {
			tracef("Error on FD %d", socket.handle);

			auto err = cast(error_t) socket.lastError;
			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, err);
			socket.kill();
			socket.handleError();
			return false;
		}

		return true;
	}

	/**
	 * Dispatches readiness events of an established or connecting TCP
	 * connection to `del` as TCPEvent.ERROR / CONNECT / WRITE / READ / CLOSE.
	 *
	 * An error event reports SO_ERROR and returns false. A connect event marks
	 * the connection connected and returns right after notifying the handler.
	 * On close, if the handler left the connection open, the socket is closed
	 * and the event info (plus the connection object for inbound connections)
	 * is freed.
	 *
	 * Returns: false if a handler threw or the socket reported an error,
	 * true otherwise.
	 */
	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn)
	{
		//log("TCP Traffic at FD#" ~ fd.to!string);

		static if (EPOLL)
		{
			const uint epoll_events = cast(uint) events;
			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
			bool read = cast(bool) (epoll_events & EPOLLIN);
			const bool write = cast(bool) (epoll_events & EPOLLOUT);
			const bool error = cast(bool) (epoll_events & EPOLLERR);
			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
		}
		else /* if KQUEUE */
		{
			const short kqueue_events = cast(short) (events >> 16);
			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
			const bool close = cast(bool) (kqueue_flags & EV_EOF);
		}

		if (error)
		{
			import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
			int err;
			static if (LOG) try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch (Throwable e) {}
			socklen_t errlen = err.sizeof;
			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
			try
				del(TCPEvent.ERROR);
			catch (Exception e)
			{
				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
				// ignore failure...
			}
			return false;
		}


		if (connect)
		{
			static if (LOG) try log("!connect"); catch (Throwable e) {}
			conn.connected = true;
			try del(TCPEvent.CONNECT);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
				return false;
			}
			return true;
		}


		if ((read || write) && conn.connected && !conn.disconnecting && conn.writeBlocked)
		{
			conn.writeBlocked = false;
			static if (LOG) try log("!write"); catch (Throwable e) {}
			try del(TCPEvent.WRITE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
				return false;
			}
		}
		else {
			// No write notification was delivered: always give the handler a read pass.
			read = true;
		}

		if (read && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!read"); catch (Throwable e) {}
			try del(TCPEvent.READ);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
				return false;
			}
		}

		if (close && conn.connected && !conn.disconnecting)
		{
			static if (LOG) try log("!close"); catch (Throwable e) {}
			// The former "hack" early-return (`!conn.connected && conn.disconnecting`)
			// could never fire here: this branch already requires conn.connected.

			try del(TCPEvent.CLOSE);
			catch (Exception e) {
				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
				return false;
			}

			// Careful here, the delegate might have closed the connection already
			if (conn.connected) {
				closeSocket(fd, !conn.disconnecting, conn.connected);

				m_status.code = Status.ABORT;
				conn.disconnecting = true;
				conn.connected = false;
				conn.writeBlocked = true;
				del.conn.socket = 0;

				// NOTE(review): evInfo is left dangling after this free, whereas the
				// init* cleanup helpers null it out — confirm no later code reads it.
				try ThreadMem.free(del.conn.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }

				if (del.conn.inbound) {
					static if (LOG) log("Freeing inbound connection");
					try ThreadMem.free(del.conn);
					catch (Exception e){ assert(false, "Error freeing resources"); }
				}
			}
		}
		return true;
	}

	/**
	 * Registers a UDP socket with the event loop and binds it to
	 * `ctxt.local`.
	 *
	 * An EventInfo carrying `del` is allocated and stored in `ctxt.evInfo`,
	 * the descriptor is added to epoll/kqueue for edge-triggered read and
	 * write readiness, and finally the socket is bound. On any failure the
	 * EventInfo is freed, `ctxt.evInfo` is reset to null and false is
	 * returned; the caller is expected to close the socket.
	 */
	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
	{
		import libasync.internals.socket_compat : bind;
		import core.sys.posix.unistd;

		fd_t err;

		EventObject eo;
		eo.udpHandler = del;
		EventInfo* ev;
		try ev = ThreadMem.alloc!EventInfo(fd, EventType.UDPSocket, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		ctxt.evInfo = ev;
		// Frees the EventInfo and detaches it from the context; always yields false.
		nothrow bool closeAll() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
			// socket will be closed by caller if return false
			return false;
		}

		static if (EPOLL)
		{
			epoll_event _event;
			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchError!"epoll_ctl"(err)) {
				return closeAll();
			}
			nothrow void deregisterEvent()
			{
				epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &_event);
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] _event;
			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
			if (catchError!"kevent_add_udp"(err))
				return closeAll();

			nothrow void deregisterEvent() {
				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null);
			}

		}

		/// Start accepting packets
		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
		if (catchError!"bind"(err)) {
			deregisterEvent();
			return closeAll();
		}

2720 return true; 2721 } 2722 2723 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false) 2724 in { 2725 assert(ctxt.local !is NetworkAddress.init); 2726 } 2727 body { 2728 import libasync.internals.socket_compat : bind, listen, SOMAXCONN; 2729 fd_t err; 2730 2731 /// Create callback object 2732 EventObject eo; 2733 eo.tcpAcceptHandler = del; 2734 EventInfo* ev; 2735 2736 try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPAccept, eo, m_instanceId); 2737 catch (Exception e){ assert(false, "Allocation error"); } 2738 ctxt.evInfo = ev; 2739 nothrow bool closeAll() { 2740 try ThreadMem.free(ev); 2741 catch(Exception e){ assert(false, "Failed free"); } 2742 ctxt.evInfo = null; 2743 // Socket is closed by run() 2744 //closeSocket(fd, false); 2745 return false; 2746 } 2747 2748 /// Add socket to event loop 2749 static if (EPOLL) 2750 { 2751 epoll_event _event; 2752 _event.data.ptr = ev; 2753 _event.events = EPOLLIN | EPOLLET; 2754 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 2755 if (catchError!"epoll_ctl_add"(err)) 2756 return closeAll(); 2757 2758 nothrow void deregisterEvent() { 2759 // epoll cleans itself when closing the socket 2760 } 2761 } 2762 else /* if KQUEUE */ 2763 { 2764 kevent_t _event; 2765 EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev); 2766 err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 2767 if (catchError!"kevent_add_listener"(err)) 2768 return closeAll(); 2769 2770 nothrow void deregisterEvent() { 2771 EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null); 2772 kevent(m_kqueuefd, &_event, 1, null, 0, null); 2773 // wouldn't know how to deal with errors here... 
2774 } 2775 } 2776 2777 /// Bind and listen to socket 2778 if (!reusing) { 2779 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 2780 if (catchError!"bind"(err)) { 2781 deregisterEvent(); 2782 return closeAll(); 2783 } 2784 2785 err = listen(fd, SOMAXCONN); 2786 if (catchError!"listen"(err)) { 2787 deregisterEvent(); 2788 return closeAll(); 2789 } 2790 2791 } 2792 return true; 2793 } 2794 2795 bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false) 2796 in { 2797 assert(ctxt.peer.port != 0, "Connecting to an invalid port"); 2798 } 2799 body { 2800 2801 fd_t err; 2802 2803 /// Create callback object 2804 import libasync.internals.socket_compat : connect; 2805 EventObject eo; 2806 eo.tcpEvHandler = del; 2807 EventInfo* ev; 2808 2809 try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPTraffic, eo, m_instanceId); 2810 catch (Exception e){ assert(false, "Allocation error"); } 2811 assert(ev !is null); 2812 ctxt.evInfo = ev; 2813 nothrow bool destroyEvInfo() { 2814 try ThreadMem.free(ev); 2815 catch(Exception e){ assert(false, "Failed to free resources"); } 2816 ctxt.evInfo = null; 2817 2818 // Socket will be closed by run() 2819 // closeSocket(fd, false); 2820 return false; 2821 } 2822 2823 /// Add socket and callback object to event loop 2824 static if (EPOLL) 2825 { 2826 epoll_event _event = void; 2827 _event.data.ptr = ev; 2828 _event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET; 2829 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 2830 static if (LOG) log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string); 2831 if (catchError!"epoll_ctl_add"(err)) 2832 return destroyEvInfo(); 2833 2834 nothrow void deregisterEvent() { 2835 // will be handled automatically when socket is closed 2836 } 2837 } 2838 else /* if KQUEUE */ 2839 { 2840 kevent_t[2] events = void; 2841 static if (LOG) try log("Register event ptr " ~ ev.to!string); catch (Throwable e) {} 2842 
assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection"); 2843 EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev); 2844 EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev); 2845 assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev); 2846 assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId); 2847 err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 2848 if (catchError!"kevent_add_tcp"(err)) 2849 return destroyEvInfo(); 2850 2851 // todo: verify if this allocates on the GC? 2852 nothrow void deregisterEvent() { 2853 EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null); 2854 EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null); 2855 kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 2856 // wouldn't know how to deal with errors here... 2857 } 2858 } 2859 2860 // Inbound objects are already connected 2861 if (inbound) return true; 2862 2863 // Connect is blocking, but this makes the socket non-blocking for send/recv 2864 if (!setNonBlock(fd)) { 2865 deregisterEvent(); 2866 return destroyEvInfo(); 2867 } 2868 2869 /// Start the connection 2870 err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen); 2871 if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ])) 2872 return true; 2873 if (catchError!"connect"(err)) { 2874 deregisterEvent(); 2875 return destroyEvInfo(); 2876 } 2877 2878 return true; 2879 } 2880 2881 pragma(inline, true) 2882 bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 2883 if (isIntegral!T) 2884 { 2885 if (val == cmp) { 2886 m_status.text = TRACE; 2887 m_error = lastError(); 2888 m_status.code = Status.ABORT; 2889 static if(LOG) log(m_status); 2890 return true; 2891 } 2892 return false; 2893 } 2894 2895 pragma(inline, true) 2896 bool 
catchSocketError(string TRACE)(fd_t fd)
	{
		// Reads SO_ERROR from `fd` into m_error. m_status.text is always set to
		// TRACE; the status code becomes ABORT (and true is returned) only when
		// the pending socket error is not EOK.
		m_status.text = TRACE;
		int err;
		import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
		socklen_t len = int.sizeof;
		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
		m_error = cast(error_t) err;
		if (m_error != EPosix.EOK) {
			m_status.code = Status.ABORT;
			static if(LOG) log(m_status);
			return true;
		}

		return false;
	}

	/**
	 * Classifies a failed event-loop system call.
	 *
	 * If `val` equals SOCKET_ERROR, errno is compared against the first
	 * element of each tuple in `cmp`; on the first match the status code is
	 * set to that tuple's second element. If nothing matches, the status code
	 * becomes Status.EVLOOP_FAILURE. In both cases m_status.text is set to
	 * TRACE and m_error to the errno value.
	 *
	 * Returns: true if `val` was SOCKET_ERROR, false otherwise.
	 */
	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
		if (isIntegral!T)
	{
		if (val == SOCKET_ERROR) {
			// Snapshot errno once so the comparison and the stored error are
			// guaranteed to be the same value.
			immutable int err = errno;
			foreach (validator ; cmp) {
				if (err == validator[0]) {
					m_status.text = TRACE;
					m_error = cast(error_t) err;
					m_status.code = validator[1];
					static if(LOG) log(m_status);
					return true;
				}
			}

			m_status.text = TRACE;
			m_status.code = Status.EVLOOP_FAILURE;
			m_error = cast(error_t) err;
			static if (LOG) log(m_status);
			return true;
		}
		return false;
	}

	/**
	 * If the value at val matches the tuple first argument T, get the last error,
	 * and if the last error matches tuple second argument error_t, set the Status as
	 * tuple third argument Status.
	 *
	 * Repeats for each comparison tuple until a match in which case returns true.
	 * A match whose status is Status.EVLOOP_TIMEOUT stops the search and
	 * returns false without touching m_error.
	 */
	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
		if (isIntegral!T)
	{
		error_t err;
		foreach (validator ; cmp) {
			if (val == validator[0]) {
				// errno is fetched lazily, the first time a value matches
				if (err is EPosix.init) err = lastError();
				if (err == validator[1]) {
					m_status.text = TRACE;
					m_status.code = validator[2];
					if (m_status.code == Status.EVLOOP_TIMEOUT) {
						static if (LOG) log(m_status);
						break;
					}
					// store the value that was actually matched instead of re-reading errno
					m_error = err;
					static if(LOG) log(m_status);
					return true;
				}
			}
		}
		return false;
	}

	/// Returns the calling thread's current errno as an error_t.
	pragma(inline, true)
	error_t lastError() {
		try {
			return cast(error_t) errno;
		} catch(Exception e) {
			return EPosix.EACCES;
		}

	}

	/// When LOG is enabled, prints m_status and m_error to stdout. The `val`
	/// argument is not read; the member state is what gets printed.
	void log(StatusInfo val)
	{
		static if (LOG) {
			import std.stdio;
			try {
				writeln("Backtrace: ", m_status.text);
				writeln(" | Status:  ", m_status.code);
				writeln(" | Error: " , m_error);
				if ((m_error in EPosixMessages) !is null)
					writeln(" | Message: ", EPosixMessages[m_error]);
			} catch(Exception e) {
				return;
			}
		}
	}

	/// When LOG is enabled, prints `val` to stdout; exceptions are swallowed.
	void log(T)(T val)
	{
		static if (LOG) {
			import std.stdio;
			try {
				writeln(val);
			} catch(Exception e) {
				return;
			}
		}
	}

	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints)
	{
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;

		NetworkAddress addr;
		addrinfo* infos;
		error_t err;
		if (ipv6) {
			addr.family = AF_INET6;
			hints.ai_family = AF_INET6;
		}
		else {
			addr.family = AF_INET;
			hints.ai_family = AF_INET;
		}
		if (tcp) {
			hints.ai_socktype = SOCK_STREAM;
			hints.ai_protocol = IPPROTO_TCP;
		}
		else {
			hints.ai_socktype = SOCK_DGRAM;
			hints.ai_protocol = IPPROTO_UDP;
		}

		static if (LOG) {
log("Resolving " ~ host ~ ":" ~ port.to!string); 3032 } 3033 3034 auto chost = host.toStringz(); 3035 3036 if (port != 0) { 3037 addr.port = port; 3038 const(char)* cPort = cast(const(char)*) port.to!string.toStringz; 3039 err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos); 3040 } 3041 else { 3042 err = cast(error_t) getaddrinfo(chost, null, &hints, &infos); 3043 } 3044 3045 if (err != EPosix.EOK) { 3046 3047 /// Unfortunately, glibc < 2.26 has a bug that the DNS resolver caches the contents 3048 /// of /etc/resolve.conf. (See https://sourceware.org/bugzilla/show_bug.cgi?id=984) 3049 /// An issue of Pidgen bug tracker(https://developer.pidgin.im/ticket/2825) shows 3050 /// that calling res_init to refresh the nameserver list. 3051 version (CRuntime_Glibc) 3052 { 3053 version (linux) 3054 { 3055 __res_init(); 3056 } 3057 /// At least res_init isn't thread-safe on OSX/iOS, so nothing to do. 3058 } 3059 version(iOS) { 3060 // ios uses a different error reporting for getaddrinfo 3061 import libasync.internals.socket_compat : gai_strerror; 3062 import std.string : fromStringz; 3063 setInternalError!"getAddressInfo"(Status.ERROR, gai_strerror(cast(int) err).fromStringz.to!string); 3064 } 3065 else setInternalError!"getAddressInfo"(Status.ERROR, string.init, err); 3066 return NetworkAddress.init; 3067 } 3068 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 3069 ubyte* data = cast(ubyte*) addr.sockAddr; 3070 data[0 .. infos.ai_addrlen] = pAddr[0 .. 
infos.ai_addrlen]; // perform bit copy 3071 freeaddrinfo(infos); 3072 return addr; 3073 } 3074 3075 3076 3077 } 3078 3079 3080 static if (!EPOLL) 3081 { 3082 import std.container : Array; 3083 import core.sync.mutex : Mutex; 3084 import core.sync.rwmutex : ReadWriteMutex; 3085 size_t g_evIdxCapacity; 3086 Array!size_t g_evIdxAvailable; 3087 3088 // called on run 3089 nothrow size_t createIndex() { 3090 size_t idx; 3091 import std.algorithm : max; 3092 try { 3093 3094 size_t getIdx() { 3095 3096 if (!g_evIdxAvailable.empty) { 3097 immutable size_t ret = g_evIdxAvailable.back; 3098 g_evIdxAvailable.removeBack(); 3099 return ret; 3100 } 3101 return 0; 3102 } 3103 3104 idx = getIdx(); 3105 if (idx == 0) { 3106 import std.range : iota; 3107 g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) ); 3108 g_evIdxCapacity = max(32, g_evIdxCapacity * 2); 3109 idx = getIdx(); 3110 } 3111 3112 } catch (Throwable e) { 3113 static if (DEBUG) { 3114 import std.stdio : writeln; 3115 try writeln(e.toString()); catch (Throwable e) {} 3116 } 3117 3118 } 3119 return idx; 3120 } 3121 3122 nothrow void destroyIndex(AsyncNotifier ctxt) { 3123 try { 3124 g_evIdxAvailable.insert(ctxt.id); 3125 } 3126 catch (Exception e) { 3127 assert(false, "Error destroying index: " ~ e.msg); 3128 } 3129 } 3130 3131 nothrow void destroyIndex(AsyncTimer ctxt) { 3132 try { 3133 g_evIdxAvailable.insert(ctxt.id); 3134 } 3135 catch (Exception e) { 3136 assert(false, "Error destroying index: " ~ e.msg); 3137 } 3138 } 3139 3140 size_t* g_threadId; 3141 size_t g_idxCapacity; 3142 Array!size_t g_idxAvailable; 3143 3144 __gshared ReadWriteMutex gs_queueMutex; 3145 __gshared Array!(Array!AsyncSignal) gs_signalQueue; 3146 __gshared Array!(Array!size_t) gs_idxQueue; // signals notified 3147 3148 3149 // loop 3150 nothrow bool popSignals(ref AsyncSignal[] sigarr) { 3151 bool more; 3152 try { 3153 foreach (ref AsyncSignal sig; sigarr) { 3154 if (!sig) 3155 break; 3156 sig = null; 3157 } 3158 
size_t len; 3159 synchronized(gs_queueMutex.reader) { 3160 3161 if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty) 3162 return false; 3163 3164 len = gs_idxQueue[*g_threadId].length; 3165 import std.stdio; 3166 if (sigarr.length < len) { 3167 more = true; 3168 len = sigarr.length; 3169 } 3170 3171 size_t i; 3172 foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){ 3173 sigarr[i] = gs_signalQueue[*g_threadId][idx]; 3174 i++; 3175 } 3176 } 3177 3178 synchronized (gs_queueMutex.writer) { 3179 gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]); 3180 } 3181 } 3182 catch (Exception e) { 3183 assert(false, "Could not get pending signals: " ~ e.msg); 3184 } 3185 return more; 3186 } 3187 3188 // notify 3189 nothrow void addSignal(shared AsyncSignal ctxt) { 3190 try { 3191 size_t thread_id = ctxt.threadId; 3192 bool must_resize; 3193 import std.stdio; 3194 synchronized (gs_queueMutex.writer) { 3195 if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) { 3196 gs_idxQueue.reserve(thread_id + 1); 3197 foreach (i; gs_idxQueue.length .. 
gs_idxQueue.capacity) { 3198 gs_idxQueue.insertBack(Array!size_t.init); 3199 } 3200 } 3201 if (gs_idxQueue[thread_id].empty) 3202 { 3203 gs_idxQueue[thread_id].reserve(32); 3204 } 3205 3206 gs_idxQueue[thread_id].insertBack(ctxt.id); 3207 3208 } 3209 3210 } 3211 catch (Exception e) { 3212 assert(false, "Array error: " ~ e.msg); 3213 } 3214 } 3215 3216 // called on run 3217 nothrow size_t createIndex(shared AsyncSignal ctxt) { 3218 size_t idx; 3219 import std.algorithm : max; 3220 try { 3221 bool must_resize; 3222 3223 synchronized (gs_queueMutex.reader) { 3224 if (gs_signalQueue.length < *g_threadId) 3225 must_resize = true; 3226 } 3227 3228 /// make sure the signal queue is big enough for this thread ID 3229 if (must_resize) { 3230 synchronized (gs_queueMutex.writer) { 3231 while (gs_signalQueue.length <= *g_threadId) 3232 gs_signalQueue.insertBack(Array!AsyncSignal.init); 3233 } 3234 } 3235 3236 size_t getIdx() { 3237 3238 if (!g_idxAvailable.empty) { 3239 immutable size_t ret = g_idxAvailable.back; 3240 g_idxAvailable.removeBack(); 3241 return ret; 3242 } 3243 return 0; 3244 } 3245 3246 idx = getIdx(); 3247 if (idx == 0) { 3248 import std.range : iota; 3249 g_idxAvailable.insert( iota(g_idxCapacity + 1, max(32, g_idxCapacity * 2), 1) ); 3250 g_idxCapacity = g_idxAvailable[$-1]; 3251 idx = getIdx(); 3252 } 3253 3254 synchronized (gs_queueMutex.writer) { 3255 if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) { 3256 3257 gs_signalQueue.reserve(*g_threadId + 1); 3258 foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) { 3259 gs_signalQueue.insertBack(Array!AsyncSignal.init); 3260 } 3261 3262 } 3263 3264 if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) { 3265 3266 gs_signalQueue[*g_threadId].reserve(idx + 1); 3267 foreach (i; gs_signalQueue[*g_threadId].length .. 
gs_signalQueue[*g_threadId].capacity) { 3268 gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null); 3269 } 3270 3271 } 3272 3273 gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt; 3274 } 3275 } catch(Throwable) {} 3276 3277 return idx; 3278 } 3279 3280 // called on kill 3281 nothrow void destroyIndex(shared AsyncSignal ctxt) { 3282 try { 3283 g_idxAvailable.insert(ctxt.id); 3284 synchronized (gs_queueMutex.writer) { 3285 gs_signalQueue[*g_threadId][ctxt.id] = null; 3286 } 3287 } 3288 catch (Exception e) { 3289 assert(false, "Error destroying index: " ~ e.msg); 3290 } 3291 } 3292 } 3293 3294 mixin template COSocketMixins() { 3295 3296 private CleanupData m_impl; 3297 3298 struct CleanupData { 3299 EventInfo* evInfo; 3300 bool connected; 3301 bool disconnecting; 3302 bool writeBlocked; 3303 bool readBlocked; 3304 } 3305 3306 @property bool disconnecting() const @safe pure @nogc { 3307 return m_impl.disconnecting; 3308 } 3309 3310 @property void disconnecting(bool b) @safe pure @nogc { 3311 m_impl.disconnecting = b; 3312 } 3313 3314 @property bool connected() const @safe pure @nogc { 3315 return m_impl.connected; 3316 } 3317 3318 @property void connected(bool b) @safe pure @nogc { 3319 m_impl.connected = b; 3320 } 3321 3322 @property bool writeBlocked() const @safe pure @nogc { 3323 return m_impl.writeBlocked; 3324 } 3325 3326 @property void writeBlocked(bool b) @safe pure @nogc { 3327 m_impl.writeBlocked = b; 3328 } 3329 3330 @property bool readBlocked() const @safe pure @nogc { 3331 return m_impl.readBlocked; 3332 } 3333 3334 @property void readBlocked(bool b) @safe pure @nogc { 3335 m_impl.readBlocked = b; 3336 } 3337 3338 @property EventInfo* evInfo() @safe pure @nogc { 3339 return m_impl.evInfo; 3340 } 3341 3342 @property void evInfo(EventInfo* info) @safe pure @nogc { 3343 m_impl.evInfo = info; 3344 } 3345 3346 } 3347 3348 mixin template EvInfoMixinsShared() { 3349 3350 private CleanupData m_impl; 3351 3352 shared struct CleanupData { 3353 
EventInfo* evInfo; 3354 } 3355 3356 static if (EPOLL) { 3357 import core.sys.posix.pthread : pthread_t; 3358 private pthread_t m_pthreadId; 3359 synchronized @property pthread_t pthreadId() { 3360 return cast(pthread_t) m_pthreadId; 3361 } 3362 /* todo: support multiple event loops per thread? 3363 private ushort m_sigId; 3364 synchronized @property ushort sigId() { 3365 return cast(ushort)m_loopId; 3366 } 3367 synchronized @property void sigId(ushort id) { 3368 m_loopId = cast(shared)id; 3369 } 3370 */ 3371 } 3372 else /* if KQUEUE */ 3373 { 3374 private shared(size_t)* m_owner_id; 3375 synchronized @property size_t threadId() { 3376 return cast(size_t) *m_owner_id; 3377 } 3378 } 3379 3380 @property shared(EventInfo*) evInfo() { 3381 return m_impl.evInfo; 3382 } 3383 3384 @property void evInfo(shared(EventInfo*) info) { 3385 m_impl.evInfo = info; 3386 } 3387 3388 } 3389 3390 mixin template EvInfoMixins() { 3391 3392 private CleanupData m_impl; 3393 3394 struct CleanupData { 3395 EventInfo* evInfo; 3396 } 3397 3398 @property EventInfo* evInfo() { 3399 return m_impl.evInfo; 3400 } 3401 3402 @property void evInfo(EventInfo* info) { 3403 m_impl.evInfo = info; 3404 } 3405 } 3406 3407 union EventObject { 3408 TCPAcceptHandler tcpAcceptHandler; 3409 TCPEventHandler tcpEvHandler; 3410 AsyncSocket socket; 3411 TimerHandler timerHandler; 3412 DWHandler dwHandler; 3413 UDPHandler udpHandler; 3414 NotifierHandler notifierHandler; 3415 EventHandler eventHandler; 3416 } 3417 3418 enum EventType : char { 3419 TCPAccept, 3420 TCPTraffic, 3421 UDPSocket, 3422 Socket, 3423 Notifier, 3424 Signal, 3425 Timer, 3426 DirectoryWatcher, 3427 Event // custom 3428 } 3429 3430 struct EventInfo { 3431 fd_t fd; 3432 EventType evType; 3433 EventObject evObj; 3434 ushort owner; 3435 }