module libasync.posix2;
import libasync.types;
version (Posix):
// workaround for IDE indent bug on too big files

/**
 * Mixin providing the `run`/`kill` (and `bind`/`connect`/`listen`/`accept`)
 * overloads of the POSIX event loop implementation.
 *
 * The template relies on members supplied by the struct it is mixed into
 * (e.g. `m_status`, `m_epollfd` / `m_kqueuefd`, `m_instanceId`, `catchError`,
 * `setNonBlock`, `setOption`, `closeSocket`, ...). Code paths are selected at
 * compile time with `static if (EPOLL)`; the `else` branches are the kqueue
 * implementation.
 *
 * Convention used throughout: `run` overloads returning `fd_t` return
 * `0` / `fd_t.init` / `INVALID_SOCKET` on failure, `bool` overloads return
 * `false` on failure.
 */
mixin template RunKill()
{

	/**
	 * Creates (or adopts `ctxt.preInitializedSocket`) a non-blocking TCP
	 * socket and hands it to `initTCPConnection`.
	 *
	 * Returns: the socket descriptor, or 0 on failure (the socket is closed
	 *          on every failure path after it has been validated).
	 */
	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		fd_t fd = ctxt.preInitializedSocket;

		if (fd == fd_t.init)
			fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP);

		if (catchError!("run AsyncTCPConnection")(fd))
			return 0;

		// Make sure the socket doesn't block on recv/send
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		// Disable Nagle's algorithm (NODELAY) if specified
		if (ctxt.noDelay) {
			if (!setOption(fd, TCPOption.NODELAY, true)) {
				static if (LOG) try log("Closing connection"); catch (Throwable e) {}
				close(fd);
				return 0;
			}
		}

		// Trigger the connection setup instructions
		if (!initTCPConnection(fd, ctxt, del)) {
			close(fd);
			return 0;
		}

		return fd;

	}

	/**
	 * Creates (or adopts `ctxt.preInitializedSocket`) a Unix domain stream
	 * socket. Inbound connections are returned as-is; outbound ones are made
	 * non-blocking and connected to `ctxt.peer`.
	 *
	 * Returns: the socket descriptor, or 0 on failure.
	 */
	fd_t run(AsyncUDSConnection ctxt)
	in { assert(ctxt.socket == fd_t.init, "UDS Connection is active. Use another instance."); }
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, connect, SOCK_STREAM, AF_UNIX;
		import core.sys.posix.unistd : close;

		auto fd = ctxt.preInitializedSocket;
		if (fd == fd_t.init) {
			fd = socket(AF_UNIX, SOCK_STREAM, 0);
		}

		if (catchError!"run AsyncUDSConnection"(fd)) {
			return 0;
		}

		// Inbound sockets are already connected
		if (ctxt.inbound) return fd;

		// Make sure the socket doesn't block on recv/send
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		// Start the connection
		auto err = connect(fd, ctxt.peer.name, ctxt.peer.nameLen);
		if (catchError!"connect"(err)) {
			close(fd);
			return 0;
		}

		return fd;
	}

	/**
	 * Creates a non-blocking Unix domain listening socket bound to
	 * `ctxt.local`, optionally unlinking the socket path first when
	 * `ctxt.unlinkFirst` is set (a missing path, ENOENT, is not an error).
	 *
	 * Returns: the listening descriptor, or 0 on failure.
	 */
	fd_t run(AsyncUDSListener ctxt)
	in {
		assert(ctxt.socket == fd_t.init, "UDS Listener already bound. Please run another instance.");
		assert(ctxt.local !is UnixAddress.init, "No locally binding address specified. Use AsyncUDSListener.local = new UnixAddress(*)");
	}
	body {
		import libasync.internals.socket_compat : socket, bind, listen, SOCK_STREAM, AF_UNIX, SOMAXCONN;
		import core.sys.posix.unistd : close, unlink;
		import core.sys.posix.sys.un : sockaddr_un;

		int err;
		m_status = StatusInfo.init;

		if (ctxt.unlinkFirst) {
			import core.stdc.errno : errno, ENOENT;
			err = unlink(cast(char*) (cast(sockaddr_un*) ctxt.local.name).sun_path);
			if (err == -1 && errno != ENOENT) {
				// called only to record the error; we fail regardless of its result
				if (catchError!"unlink"(err)) {}
				return 0;
			}
		}

		auto fd = ctxt.socket;
		if (fd == fd_t.init) {
			fd = socket(AF_UNIX, SOCK_STREAM, 0);
		}

		if (catchError!"run AsyncUDSAccept"(fd)) {
			return 0;
		}

		// Make sure the socket returns instantly when calling listen()
		if (!setNonBlock(fd)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		// Bind and listen to socket
		err = bind(fd, ctxt.local.name, ctxt.local.nameLen);
		if (catchError!"bind"(err)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		err = listen(fd, SOMAXCONN);
		if (catchError!"listen"(err)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}

		return fd;
	}

	/**
	 * Creates (or adopts `ctxt.preInitializedHandle`) a non-blocking socket
	 * described by `ctxt.info` and registers it for edge-triggered read/write
	 * notifications. The allocated `EventInfo` is stored in `ctxt.evInfo`.
	 *
	 * Returns: the socket handle, or `INVALID_SOCKET` on failure.
	 */
	fd_t run(AsyncSocket ctxt)
	{
		import core.sys.posix.unistd : close;
		import libasync.internals.socket_compat : socket;

		m_status = StatusInfo.init;

		auto fd = ctxt.preInitializedHandle;

		if (fd == INVALID_SOCKET) {
			fd = socket(ctxt.info.domain, ctxt.info.type, ctxt.info.protocol);
		}

		if (catchError!"socket"(fd)) {
			.error("Failed to create socket: ", error);
			return INVALID_SOCKET;
		}

		if (!setNonBlock(fd)) {
			.error("Failed to set socket non-blocking");
			close(fd);
			return INVALID_SOCKET;
		}

		import core.sys.posix.unistd;

		EventObject eventObject;
		eventObject.socket = ctxt;
		EventInfo* eventInfo = assumeWontThrow(ThreadMem.alloc!EventInfo(fd, EventType.Socket, eventObject, m_instanceId));

		ctxt.evInfo = eventInfo;
		// Releases the event record when registration fails.
		// NOTE(review): the descriptor is intentionally left open here, but a
		// socket created inside this function is not reachable by the caller
		// once INVALID_SOCKET is returned - confirm this is not a leak.
		nothrow auto cleanup() {
			assumeWontThrow(ThreadMem.free(eventInfo));
			ctxt.evInfo = null;
			// socket must be closed by the caller
			return INVALID_SOCKET;
		}

		fd_t err;
		static if (EPOLL)
		{
			epoll_event osEvent;
			osEvent.data.ptr = eventInfo;
			osEvent.events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &osEvent);
			if (catchError!"epoll_ctl"(err)) {
				return cleanup();
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] osEvent;
			EV_SET(&(osEvent[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, eventInfo);
			EV_SET(&(osEvent[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, eventInfo);
			err = kevent(m_kqueuefd, &(osEvent[0]), 2, null, 0, null);
			if (catchError!"kevent_add_udp"(err)) {
				return cleanup();
			}
		}

		return fd;
	}

	import libasync.internals.socket_compat : sockaddr, socklen_t;

	/**
	 * Binds `ctxt.handle` to the given address.
	 *
	 * Returns: `true` on success; on failure the error is logged,
	 *          `ctxt.evInfo` is freed and reset, and `false` is returned.
	 */
	bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		import libasync.internals.socket_compat : bind;

		auto err = bind(ctxt.handle, addr, addrlen);
		if (catchError!"bind"(err)) {
			.error("Failed to bind socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}

	/**
	 * Starts connecting `ctxt.handle` to the given address. An in-progress
	 * (EINPROGRESS) result is treated as success.
	 *
	 * Returns: `true` when connected or connecting; on any other error
	 *          `ctxt.evInfo` is freed and reset, and `false` is returned.
	 */
	bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		import libasync.internals.socket_compat : connect;

		auto err = connect(ctxt.handle, addr, addrlen);
		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t) SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ])) {
			return true;
		} else if (catchError!"connect"(err)) {
			.error("Failed to connect socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}

	/**
	 * Puts `ctxt.handle` into listening mode with the given backlog.
	 *
	 * Returns: `true` on success; on failure the error is logged,
	 *          `ctxt.evInfo` is freed and reset, and `false` is returned.
	 */
	bool listen(AsyncSocket ctxt, int backlog)
	{
		import libasync.internals.socket_compat : listen;

		auto err = listen(ctxt.handle, backlog);
		// report the failure under the operation that actually failed
		if (catchError!"listen"(err)) {
			.error("Failed to listen on socket: ", error);
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
			return false;
		}

		return true;
	}

	/**
	 * Shuts an `AsyncSocket` down: deregisters it from the poller, flushes
	 * the completed-request queues that belong to it (invoking their
	 * completion callbacks), discards its pending requests and finally
	 * closes the descriptor.
	 *
	 * Params:
	 *   ctxt   = socket to kill; its handle is taken with `resetHandle()`.
	 *   forced = for connected stream sockets, drop the connection state and
	 *            event record immediately and forward the flag to
	 *            `closeSocket`.
	 *
	 * Returns: `false` if an accepted peer fails to run or closing fails.
	 */
	bool kill(AsyncSocket ctxt, bool forced = false) {
		m_status = StatusInfo.init;

		import core.sys.posix.unistd : close;

		auto fd = ctxt.resetHandle();

		static if (EPOLL)
		{
			epoll_event osEvent = void;
			epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &osEvent);
		}
		else /* if KQUEUE */
		{
			// One change entry per filter, submitted in a single call. As in
			// the epoll branch the result is deliberately ignored (best effort).
			kevent_t[2] osEvent;
			EV_SET(&(osEvent[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
			EV_SET(&(osEvent[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
			kevent(m_kqueuefd, &(osEvent[0]), 2, null, 0, null);
		}

		if (ctxt.connectionOriented && ctxt.passive) {
			foreach (request; m_completedSocketAccepts) if (request.socket is ctxt) {
				m_completedSocketAccepts.removeFront();
				auto socket = request.socket;
				auto peer = request.onComplete(request.peer, request.family, socket.info.type, socket.info.protocol);
				assumeWontThrow(AsyncAcceptRequest.free(request));
				if (!peer.run) return false;
			}
		}

		if (!ctxt.passive) {
			foreach (request; m_completedSocketReceives) if (request.socket is ctxt) {
				m_completedSocketReceives.removeFront();
				if (request.message) {
					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
					assumeWontThrow(NetworkMessage.free(request.message));
				} else {
					assumeWontThrow(request.onComplete.get!1)();
				}
				assumeWontThrow(AsyncReceiveRequest.free(request));
			}

			foreach (request; m_completedSocketSends) if (request.socket is ctxt) {
				m_completedSocketSends.removeFront();
				request.onComplete();
				assumeWontThrow(NetworkMessage.free(request.message));
				assumeWontThrow(AsyncSendRequest.free(request));
			}
		}

		// Pending (not yet completed) requests are dropped without callbacks
		foreach (request; ctxt.m_pendingReceives) {
			ctxt.m_pendingReceives.removeFront();
			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
			assumeWontThrow(AsyncReceiveRequest.free(request));
		}

		foreach (request; ctxt.m_pendingSends) {
			ctxt.m_pendingSends.removeFront();
			assumeWontThrow(NetworkMessage.free(request.message));
			assumeWontThrow(AsyncSendRequest.free(request));
		}

		if (ctxt.connectionOriented && !ctxt.passive && ctxt.connected) {
			bool has_socket = fd != INVALID_SOCKET;
			ctxt.disconnecting = true;

			if (forced) {
				ctxt.connected = false;
				ctxt.disconnecting = false;
				if (ctxt.evInfo !is null) {
					assumeWontThrow(ThreadMem.free(ctxt.evInfo));
					ctxt.evInfo = null;
				}
			}

			return has_socket ? closeSocket(fd, true, forced) : true;
		}

		fd_t err = close(fd);
		if (catchError!"socket close"(err)) {
			return false;
		}

		if (ctxt.evInfo !is null) {
			assumeWontThrow(ThreadMem.free(ctxt.evInfo));
			ctxt.evInfo = null;
		}

		return true;
	}

	/**
	 * Registers the descriptor `ctxt.id` for edge-triggered read/write
	 * notifications delivered to `del`. The allocated `EventInfo` is stored
	 * in `ctxt.evInfo` and released again if registration fails.
	 *
	 * Returns: `true` on success, `false` on failure (the descriptor itself
	 *          is left open for the caller to close).
	 */
	bool run(AsyncEvent ctxt, EventHandler del)
	{
		fd_t fd = ctxt.id;
		import libasync.internals.socket_compat : bind;
		import core.sys.posix.unistd;

		fd_t err;

		EventObject eo;
		eo.eventHandler = del;
		EventInfo* ev;
		try ev = ThreadMem.alloc!EventInfo(fd, EventType.Event, eo, m_instanceId);
		catch (Exception e){ assert(false, "Allocation error"); }
		ctxt.evInfo = ev;
		nothrow bool closeAll() {
			try ThreadMem.free(ev);
			catch(Exception e){ assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
			// fd must be closed by the caller if return false
			return false;
		}

		static if (EPOLL)
		{
			epoll_event _event;
			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchError!"epoll_ctl"(err)) {
				return closeAll();
			}
		}
		else /* if KQUEUE */
		{
			kevent_t[2] _event;
			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
			if (catchError!"kevent_add_udp"(err))
				return closeAll();

		}
		return true;
	}

	/**
	 * Starts a TCP listener. When `ctxt.socket` is 0 a new listening socket
	 * is created under `g_mutex` with REUSEADDR/REUSEPORT, made non-blocking
	 * and optionally given NODELAY; otherwise the existing socket is reused.
	 * The socket is then handed to `initTCPListener`.
	 *
	 * Returns: the listening descriptor, or 0 on failure.
	 */
	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
	in {
		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance.");
		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*");
	}
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		import core.sync.mutex;
		fd_t fd = ctxt.socket;
		bool reusing = true;
		try if (fd == 0) {
			reusing = false;
			// Create the listening socket
			synchronized(g_mutex) {
				fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
				if (catchError!("run AsyncTCPAccept")(fd))
					return 0;
				// Allow multiple threads to listen on this address
				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
					static if (LOG) log("Close socket");
					close(fd);
					return 0;
				}
				if (!setOption(fd, TCPOption.REUSEPORT, true)) {
					static if (LOG) log("Close socket");
					close(fd);
					return 0;
				}
			}

			// Make sure the socket returns instantly when calling listen()
			if (!setNonBlock(fd)) {
				static if (LOG) log("Close socket");
				close(fd);
				return 0;
			}

			// todo: defer accept

			// Automatically starts connections with noDelay if specified
			if (ctxt.noDelay) {
				if (!setOption(fd, TCPOption.NODELAY, true)) {
					static if (LOG) try log("Closing connection"); catch (Throwable e) {}
					close(fd);
					return 0;
				}
			}

		} catch (Throwable e) { assert(false, "Error in synchronized listener starter"); }

		// Setup the event polling
		if (!initTCPListener(fd, ctxt, del, reusing)) {
			static if (LOG) log("Close socket");
			close(fd);
			return 0;
		}


		return fd;

	}

	/**
	 * Creates (or adopts `ctxt.preInitializedSocket`) a non-blocking UDP
	 * socket and hands it to `initUDPSocket`.
	 *
	 * Returns: the socket descriptor, or 0 on failure.
	 */
	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
		m_status = StatusInfo.init;

		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
		import core.sys.posix.unistd;
		fd_t fd = ctxt.preInitializedSocket;

		static if (LOG) try log("Address: " ~ ctxt.local.toString()); catch (Throwable e) {}

		if (fd == fd_t.init)
			fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);


		if (catchError!("run AsyncUDPSocket")(fd))
			return 0;

		if (!setNonBlock(fd)) {
			close(fd);
			return 0;
		}

		if (!initUDPSocket(fd, ctxt, del)) {
			close(fd);
			return 0;
		}

		// logging is best effort; same explicit catch form as used elsewhere in this file
		static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string);
		catch (Throwable e) {}
		/*
		static if (!EPOLL) {
			gs_fdPool.insert(fd);
		}*/

		return fd;
	}

	/**
	 * Starts a notifier. With epoll an `eventfd` is created and registered;
	 * with kqueue an `EVFILT_USER` event is added under an id obtained from
	 * `createIndex()`. Events invoke `ctxt.handler()`, with exceptions
	 * thrown by the handler swallowed.
	 *
	 * If `ctxt.evInfo` already exists it is reused (its handler object is
	 * refreshed), matching what `run(AsyncTimer)` does.
	 *
	 * Returns: the notifier id, or 0 / `fd_t.init` on failure.
	 */
	fd_t run(AsyncNotifier ctxt)
	{

		m_status = StatusInfo.init;

		fd_t err;
		static if (EPOLL) {
			import core.sys.posix.unistd : close;
			fd_t fd = eventfd(0, EFD_NONBLOCK);

			if (catchSocketError!("run AsyncNotifier")(fd))
				return 0;

			epoll_event _event;
			_event.events = EPOLLIN | EPOLLET;
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			fd_t fd = cast(fd_t)createIndex();
		}
		EventType evtype = EventType.Notifier;
		NotifierHandler evh;
		evh.ctxt = ctxt;

		evh.fct = (AsyncNotifier ctxt) {
			try {
				ctxt.handler();
			} catch (Exception e) {
				//setInternalError!"AsyncTimer handler"(Status.ERROR);
			}
		};

		EventObject eobj;
		eobj.notifierHandler = evh;

		EventInfo* evinfo;

		if (!ctxt.evInfo) {
			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;
		}
		else {
			// Never register a null user-data pointer: reuse the existing record
			evinfo = ctxt.evInfo;
			evinfo.evObj = eobj;
		}

		static if (EPOLL) {
			_event.data.ptr = cast(void*) evinfo;

			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchSocketError!("epoll_add(eventfd)")(err)) {
				// the eventfd was created above and is unknown to the caller
				close(fd);
				return fd_t.init;
			}
		}
		else /* if KQUEUE */
		{
			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);

			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_addSignal"(err))
				return fd_t.init;
		}

		return fd;


	}

	/**
	 * Attaches the loop's shared signal event record (`m_evSignal`) to
	 * `ctxt`.
	 *
	 * Returns: with epoll, the value of `__libc_current_sigrtmin()`; with
	 *          kqueue, the id produced by `createIndex(ctxt)`.
	 */
	fd_t run(shared AsyncSignal ctxt)
	{
		static if (EPOLL) {

			m_status = StatusInfo.init;

			ctxt.evInfo = cast(shared) m_evSignal;

			return cast(fd_t) (__libc_current_sigrtmin());
		}
		else
		{
			m_status = StatusInfo.init;

			ctxt.evInfo = cast(shared) m_evSignal;

			return cast(fd_t) createIndex(ctxt);

		}
	}

	/**
	 * Arms (or re-arms) a timer that fires `del` after `timeout`, repeating
	 * with the same period unless `ctxt.oneShot` is set (epoll branch).
	 *
	 * With epoll a `timerfd` is created on first use and (re)registered;
	 * with kqueue an `EVFILT_TIMER` event is added with `msecs + 30` as its
	 * period. An existing `ctxt.evInfo` is reused with its handler updated.
	 *
	 * Returns: the timer id, or 0 / `fd_t.init` on failure.
	 */
	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
			import core.sys.posix.unistd : close;

			fd_t fd = ctxt.id;
			itimerspec its;

			its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) timeout.split!("seconds", "nsecs")().seconds;
			its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) timeout.split!("seconds", "nsecs")().nsecs;
			if (!ctxt.oneShot)
			{
				its.it_interval.tv_sec = its.it_value.tv_sec;
				its.it_interval.tv_nsec = its.it_value.tv_nsec;

			}

			// true when the timerfd was created by this call and must be
			// closed here on failure (the caller never learns its value)
			bool created;
			if (fd == fd_t.init) {
				fd = timerfd_create(CLOCK_REALTIME, 0);
				if (catchError!"timer_create"(fd))
					return 0;
				created = true;
			}
			int err = timerfd_settime(fd, 0, &its, null);

			if (catchError!"timer_settime"(err)) {
				if (created) close(fd);
				return 0;
			}
			epoll_event _event;

			EventType evtype;
			evtype = EventType.Timer;
			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}
			_event.events |= EPOLLIN | EPOLLET;
			_event.data.ptr = evinfo;
			// A timer that is already running is removed before being re-added
			if (ctxt.id > 0) {
				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null);

				if (catchError!"epoll_ctl"(err))
					return fd_t.init;
			}
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"timer_epoll_add"(err)) {
				if (created) close(fd);
				return 0;
			}
			return fd;
		}
		else /* if KQUEUE */
		{
			fd_t fd = ctxt.id;

			if (ctxt.id == 0)
				fd = cast(fd_t) createIndex();
			EventType evtype;
			evtype = EventType.Timer;

			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}

			kevent_t _event;
			int msecs = cast(int) timeout.total!"msecs";
			ushort flags_ = EV_ADD | EV_ENABLE;
			//if (ctxt.oneShot)
			//	flags_ |= EV_CLEAR;

			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
			EV_SET(&_event, fd, EVFILT_TIMER, flags_, 0, msecs + 30, cast(void*) evinfo);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_timer_add"(err))
				return 0;

			return fd;

		}

	}

	/**
	 * Starts a directory watcher. With epoll a non-blocking inotify instance
	 * is created and registered; with kqueue a process-local id is generated
	 * and bookkeeping entries are created in `m_watchers` / `m_changes`
	 * (the kernel events themselves are created later, in `watch()`).
	 *
	 * Returns: the watcher id, or `fd_t.init` on failure.
	 */
	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {


		static if (EPOLL)
		{
			import core.sys.linux.sys.inotify;
			import core.sys.posix.unistd : close;
			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
			assert(ctxt.fd == fd_t.init);
			int fd = inotify_init1(IN_NONBLOCK);
			if (catchError!"inotify_init1"(fd)) {
				return fd_t.init;
			}
			epoll_event _event;

			EventType evtype;

			evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;

			_event.events |= EPOLLIN;
			_event.data.ptr = evinfo;

			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"epoll_ctl"(err)) {
				// The inotify descriptor is only known here; close it so it
				// does not leak. evInfo stays attached to ctxt as before.
				close(fd);
				return fd_t.init;
			}
			return fd;
		}
		else /* if KQUEUE */ {
			static size_t id = 0;

			fd_t fd = cast(uint)++id;

			EventType evtype;

			evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}
			ctxt.evInfo = evinfo;
			// NOTE(review): a failure to record the watcher is silently ignored here
			try m_watchers[fd] = evinfo; catch (Throwable e) {}

			try m_changes[fd] = ThreadMem.alloc!(Array!DWChangeInfo)();
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			// events will be created in watch()

			return fd;

		}
	}

	/**
	 * Accepts one pending connection on a Unix domain listener.
	 *
	 * Returns: a newly allocated inbound `AsyncUDSConnection`, or `null`
	 *          when no connection is pending (EAGAIN/EWOULDBLOCK) or
	 *          `accept` failed.
	 */
	AsyncUDSConnection accept(AsyncUDSListener ctxt) {
		import core.stdc.errno : errno, EAGAIN, EWOULDBLOCK;
		import libasync.internals.socket_compat : accept;

		auto clientSocket = accept(ctxt.socket, null, null);
		if (clientSocket == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
			// No more new incoming connections
			return null;
		} else if (catchError!".accept"(clientSocket)) {
			// An error occurred
			return null;
		}

		// Allocate a new connection handler
		AsyncUDSConnection conn;
		try conn = ThreadMem.alloc!AsyncUDSConnection(ctxt.m_evLoop, clientSocket);
		catch (Exception e){ assert(false, "Allocation failure"); }
		conn.inbound = true;

		return conn;
	}

	/**
	 * Stops a directory watcher: unwatches every folder registered under
	 * `ctxt.fd`, then releases the watcher's resources (epoll: closes the
	 * descriptor and frees `ctxt.evInfo`; kqueue: frees and removes the
	 * `m_changes` / `m_watchers` entries).
	 *
	 * Returns: `false` (with an internal error set) if an exception occurs.
	 */
	bool kill(AsyncDirectoryWatcher ctxt) {

		static if (EPOLL) {
			import core.sys.posix.unistd : close;
			try
			{
				// Collect first: unwatch() modifies m_dwFolders
				Array!(Tuple!(fd_t, uint)) remove_list;
				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(key);
				}

				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
					unwatch(key[0] /*fd_t*/, key[1]);
				}

				close(ctxt.fd);
				ThreadMem.free(ctxt.evInfo);
				ctxt.evInfo = null;
			}
			catch (Exception e)
			{
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}

		} else /* if KQUEUE */ {
			try {
				Array!fd_t remove_list;

				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(wd);
				}

				foreach (wd; remove_list[]) {
					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
				}

				ThreadMem.free(m_changes[ctxt.fd]);
				m_watchers.remove(ctxt.fd);
				m_changes.remove(ctxt.fd);
			}
			catch (Exception e) {
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}
		}
		return true;
	}

	/**
	 * Closes a TCP connection via `closeSocket`. When `forced`, the
	 * connection state is reset and `ctxt.evInfo` (and, for inbound
	 * connections, `ctxt` itself) is freed before the socket is closed.
	 *
	 * Returns: the result of `closeSocket`, or `true` if there was no socket.
	 */
	bool kill(AsyncTCPConnection ctxt, bool forced = false)
	{
		static if (LOG) log("Kill socket");
		m_status = StatusInfo.init;
		// read before ctxt may be freed below
		fd_t fd = ctxt.socket;
		bool has_socket = fd > 0;
		ctxt.disconnecting = true;
		if (forced) {
			ctxt.connected = false;
			ctxt.disconnecting = false;
			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Throwable e) { assert(false, "Failed to free resources"); }
				ctxt.evInfo = null;
			}
			if (ctxt.inbound) try ThreadMem.free(ctxt);
			catch (Throwable t) { assert(false, "Failed to free resources for context " ~ (cast(void*)ctxt).to!string ~ ": " ~ t.to!string); }
		}
		return has_socket ? closeSocket(fd, true, forced) : true;
	}

	/**
	 * Closes a TCP listener's socket via `closeSocket` and, on scope exit,
	 * frees `ctxt.evInfo`.
	 */
	bool kill(AsyncTCPListener ctxt)
	{
		static if (LOG) log("Kill listener");
		m_status = StatusInfo.init;
		nothrow void cleanup() {
			try ThreadMem.free(ctxt.evInfo);
			catch (Throwable e) { assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
		}
		scope(exit) {
			cleanup();
		}

		fd_t fd = ctxt.socket;

		return closeSocket(fd, false, true);
	}

	/**
	 * Stops a notifier (epoll: closes the eventfd; kqueue: deletes the
	 * `EVFILT_USER` event and destroys the index) and releases its event
	 * record. `ctxt.evInfo` is reset to null so that a later `run` does not
	 * see a dangling pointer.
	 *
	 * Returns: `false` if the descriptor/event could not be removed.
	 */
	bool kill(AsyncNotifier ctxt)
	{
		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);

			if (catchError!"close(eventfd)"(err))
				return false;

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }
				ctxt.evInfo = null;
			}

			return true;
		}
		else /* if KQUEUE */
		{
			scope(exit) destroyIndex(ctxt);

			if (ctxt.id == fd_t.init)
				return false;

			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e){ assert(false, "Error freeing resources"); }
				ctxt.evInfo = null;
			}

			if (catchError!"kevent_del(notifier)"(err)) {
				return false;
			}
			return true;
		}

	}

	/**
	 * Detaches a signal from the loop (epoll: clears `ctxt.evInfo`; kqueue:
	 * destroys its index). Always returns `true`.
	 */
	bool kill(shared AsyncSignal ctxt)
	{

		static if (EPOLL) {
			ctxt.evInfo = null;
		}
		else
		{
			m_status = StatusInfo.init;
			destroyIndex(ctxt);
		}
		return true;
	}

	/**
	 * Stops a timer (epoll: closes the timerfd; kqueue: deletes the
	 * `EVFILT_TIMER` event and destroys the index) and frees `ctxt.evInfo`.
	 *
	 * Returns: `false` if the descriptor/event could not be removed.
	 */
	bool kill(AsyncTimer ctxt) {
		import core.sys.posix.time;
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);
			if (catchError!"timer_kill"(err))
				return false;

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}

		}
		else /* if KQUEUE */
		{
			scope(exit)
				destroyIndex(ctxt);

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}
			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
			if (catchError!"kevent_del(timer)"(err))
				return false;
		}
		return true;
	}

	/**
	 * Closes a UDP socket and frees `ctxt.evInfo`.
	 *
	 * With kqueue the read/write filters are deleted *before* the descriptor
	 * is closed: closing a descriptor already removes its kevents, so an
	 * `EV_DELETE` issued afterwards fails and would make this function
	 * report an error and leak the event record.
	 *
	 * Returns: `false` if deregistration or `close` fails.
	 */
	bool kill(AsyncUDPSocket ctxt) {
		import core.sys.posix.unistd : close;


		m_status = StatusInfo.init;

		fd_t fd = ctxt.socket;
		fd_t err;

		static if (!EPOLL)
		{
			kevent_t[2] events;
			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, null);
			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, null);
			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);

			if (catchError!"event_del(udp)"(err))
				return false;
		}

		err = close(fd);

		if (catchError!"udp close"(err))
			return false;

		try ThreadMem.free(ctxt.evInfo);
		catch (Exception e){
			assert(false, "Failed to free resources: " ~ e.msg);
		}
		ctxt.evInfo = null;
		return true;
	}

	/**
	 * Stops an `AsyncEvent`. Stateful events are shut down through
	 * `closeSocket` (with their state and event record reset first when
	 * `forced`); stateless ones are simply closed and their event record
	 * freed.
	 *
	 * Returns: `false` if closing fails.
	 */
	bool kill(AsyncEvent ctxt, bool forced = false) {
		import core.sys.posix.unistd : close;

		static if (LOG) log("Kill event");
		m_status = StatusInfo.init;
		fd_t fd = ctxt.id;

		if (ctxt.stateful) {
			bool has_socket = fd > 0;
			ctxt.disconnecting = true;

			if (forced) {
				ctxt.connected = false;
				ctxt.disconnecting = false;
				if (ctxt.evInfo) {
					try ThreadMem.free(ctxt.evInfo);
					catch (Exception e) {
						assert(false, "Failed to free resources: " ~ e.msg);
					}
					ctxt.evInfo = null;
				}
			}

			return has_socket ? closeSocket(fd, true, forced) : true;
		}

		fd_t err = close(fd);
		if (catchError!"event close"(err))
			return false;

		if (ctxt.evInfo) {
			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e) {
				assert(false, "Failed to free resources: " ~ e.msg);
			}
			ctxt.evInfo = null;
		}

		return true;
	}

}