1 module libasync.posix2; 2 3 // workaround for IDE indent bug on too big files 4 mixin template RunKill() 5 { 6 7 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) 8 in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); } 9 body { 10 m_status = StatusInfo.init; 11 import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP; 12 import core.sys.posix.unistd : close; 13 fd_t fd = ctxt.preInitializedSocket; 14 15 if (fd == fd_t.init) 16 fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP); 17 18 if (catchError!("run AsyncTCPConnection")(fd)) 19 return 0; 20 21 /// Make sure the socket doesn't block on recv/send 22 if (!setNonBlock(fd)) { 23 log("Close socket"); 24 close(fd); 25 return 0; 26 } 27 28 /// Enable Nagel's algorithm if specified 29 if (ctxt.noDelay) { 30 if (!setOption(fd, TCPOption.NODELAY, true)) { 31 try log("Closing connection"); catch {} 32 close(fd); 33 return 0; 34 } 35 } 36 37 /// Trigger the connection setup instructions 38 if (!initTCPConnection(fd, ctxt, del)) { 39 close(fd); 40 return 0; 41 } 42 43 return fd; 44 45 } 46 47 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) 48 in { 49 //assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance."); 50 assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*"); 51 } 52 body { 53 m_status = StatusInfo.init; 54 import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP; 55 import core.sys.posix.unistd : close; 56 import core.sync.mutex; 57 __gshared Mutex g_mtx; 58 fd_t fd = ctxt.socket; 59 bool reusing = true; 60 try if (fd == 0) { 61 reusing = false; 62 /// Create the listening socket 63 synchronized(g_mutex) { 64 fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP); 65 if (catchError!("run AsyncTCPAccept")(fd)) 66 return 0; 67 /// Allow multiple threads to listen on this address 68 if (!setOption(fd, TCPOption.REUSEADDR, true)) { 69 log("Close socket"); 70 close(fd); 71 return 0; 72 } 73 } 74 75 /// Make sure the socket returns instantly when calling listen() 76 if (!setNonBlock(fd)) { 77 log("Close socket"); 78 close(fd); 79 return 0; 80 } 81 82 // todo: defer accept 83 84 /// Automatically starts connections with noDelay if specified 85 if (ctxt.noDelay) { 86 if (!setOption(fd, TCPOption.NODELAY, true)) { 87 try log("Closing connection"); catch {} 88 close(fd); 89 return 0; 90 } 91 } 92 93 } catch { assert(false, "Error in synchronized listener starter"); } 94 95 /// Setup the event polling 96 if (!initTCPListener(fd, ctxt, del, reusing)) { 97 log("Close socket"); 98 close(fd); 99 return 0; 100 } 101 102 103 return fd; 104 105 } 106 107 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 108 m_status = StatusInfo.init; 109 110 import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP; 111 import core.sys.posix.unistd; 112 fd_t fd = ctxt.preInitializedSocket; 113 114 try log("Address: " ~ ctxt.local.toString()); catch {} 115 116 if (fd == fd_t.init) 117 fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP); 118 119 120 if (catchError!("run AsyncUDPSocket")(fd)) 121 return 0; 122 123 if (!setNonBlock(fd)) 124 return 0; 125 126 if (!initUDPSocket(fd, ctxt, del)) 127 return 0; 128 129 try log("UDP Socket started FD#" ~ fd.to!string); 130 catch{} 131 /* 132 static if (!EPOLL) { 133 gs_fdPool.insert(fd); 134 }*/ 135 136 return fd; 137 } 138 139 fd_t run(AsyncNotifier ctxt) 140 { 141 142 m_status = StatusInfo.init; 143 144 fd_t err; 145 static if (EPOLL) { 146 fd_t fd = eventfd(0, EFD_NONBLOCK); 147 148 if (catchSocketError!("run AsyncNotifier")(fd)) 149 return 0; 150 151 epoll_event _event; 152 _event.events = EPOLLIN | EPOLLET; 153 } 154 else /* if KQUEUE */ 155 { 156 kevent_t _event; 157 fd_t fd = cast(fd_t)createIndex(); 158 } 159 EventType evtype = EventType.Notifier; 160 NotifierHandler evh; 161 evh.ctxt = ctxt; 162 163 evh.fct = (AsyncNotifier ctxt) { 164 try { 165 ctxt.handler(); 166 } catch (Exception e) { 167 //setInternalError!"AsyncTimer handler"(Status.ERROR); 168 } 169 }; 170 171 EventObject eobj; 172 eobj.notifierHandler = evh; 173 174 EventInfo* evinfo; 175 176 if (!ctxt.evInfo) { 177 try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId); 178 catch (Exception e) { 179 assert(false, "Failed to allocate resources: " ~ e.msg); 180 } 181 182 ctxt.evInfo = evinfo; 183 } 184 185 static if (EPOLL) { 186 _event.data.ptr = cast(void*) evinfo; 187 188 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 189 if (catchSocketError!("epoll_add(eventfd)")(err)) 190 return fd_t.init; 191 } 192 else /* if KQUEUE */ 193 { 194 EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo); 195 196 err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 197 198 if (catchError!"kevent_addSignal"(err)) 199 return fd_t.init; 200 } 201 202 return fd; 203 204 205 } 206 207 fd_t run(shared AsyncSignal ctxt) 208 { 209 static if (EPOLL) { 210 211 m_status = StatusInfo.init; 212 213 ctxt.evInfo = cast(shared) m_evSignal; 214 215 return cast(fd_t) (__libc_current_sigrtmin()); 216 } 217 else 218 { 219 m_status = StatusInfo.init; 220 221 ctxt.evInfo = cast(shared) m_evSignal; 222 223 return cast(fd_t) createIndex(ctxt); 224 225 } 226 } 227 228 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 229 m_status = StatusInfo.init; 230 231 static if (EPOLL) 232 { 233 import core.sys.posix.time : itimerspec, CLOCK_REALTIME; 234 235 fd_t fd = ctxt.id; 236 itimerspec its; 237 238 its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) timeout.split!("seconds", "nsecs")().seconds; 239 its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) timeout.split!("seconds", "nsecs")().nsecs; 240 if (!ctxt.oneShot) 241 { 242 its.it_interval.tv_sec = its.it_value.tv_sec; 243 its.it_interval.tv_nsec = its.it_value.tv_nsec; 244 245 } 246 247 if (fd == fd_t.init) { 248 fd = timerfd_create(CLOCK_REALTIME, 0); 249 if (catchError!"timer_create"(fd)) 250 return 0; 251 } 252 int err = timerfd_settime(fd, 0, &its, null); 253 254 if (catchError!"timer_settime"(err)) 255 return 0; 256 epoll_event _event; 257 258 EventType evtype; 259 evtype = EventType.Timer; 260 EventObject eobj; 261 eobj.timerHandler = del; 262 263 EventInfo* evinfo; 264 265 if (!ctxt.evInfo) { 266 try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId); 267 catch (Exception e) { 268 assert(false, "Failed to allocate resources: " ~ e.msg); 269 } 270 271 ctxt.evInfo = evinfo; 272 } 273 else { 274 evinfo = ctxt.evInfo; 275 evinfo.evObj = eobj; 276 } 277 _event.events |= EPOLLIN | EPOLLET; 278 _event.data.ptr = evinfo; 279 if (ctxt.id > 0) { 280 err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null); 281 282 if (catchError!"epoll_ctl"(err)) 283 return fd_t.init; 284 } 285 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 286 287 if (catchError!"timer_epoll_add"(err)) 288 return 0; 289 return fd; 290 } 291 else /* if KQUEUE */ 292 { 293 fd_t fd = ctxt.id; 294 295 if (ctxt.id == 0) 296 fd = cast(fd_t) createIndex(); 297 EventType evtype; 298 evtype = EventType.Timer; 299 300 EventObject eobj; 301 eobj.timerHandler = del; 302 303 EventInfo* evinfo; 304 305 if (!ctxt.evInfo) { 306 try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId); 307 catch (Exception e) { 308 assert(false, "Failed to allocate resources: " ~ e.msg); 309 } 310 311 ctxt.evInfo = evinfo; 312 } 313 else { 314 evinfo = ctxt.evInfo; 315 evinfo.evObj = eobj; 316 } 317 318 kevent_t _event; 319 320 int msecs = cast(int) timeout.total!"msecs"; 321 322 // www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally 323 EV_SET(&_event, fd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, msecs, cast(void*) evinfo); 324 325 if (ctxt.oneShot) 326 _event.flags |= EV_ONESHOT; 327 328 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 329 330 if (catchError!"kevent_timer_add"(err)) 331 return 0; 332 333 return fd; 334 335 } 336 337 } 338 339 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) { 340 341 342 static if (EPOLL) 343 { 344 import core.sys.linux.sys.inotify; 345 enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect 346 assert(ctxt.fd == fd_t.init); 347 int fd = inotify_init1(IN_NONBLOCK); 348 if (catchError!"inotify_init1"(fd)) { 349 return fd_t.init; 350 } 351 epoll_event _event; 352 353 EventType evtype; 354 355 evtype = EventType.DirectoryWatcher; 356 EventObject eobj; 357 eobj.dwHandler = del; 358 359 EventInfo* evinfo; 360 361 assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier..."); 362 363 try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId); 364 catch (Exception e) { 365 assert(false, "Failed to allocate resources: " ~ e.msg); 366 } 367 368 ctxt.evInfo = evinfo; 369 370 _event.events |= EPOLLIN; 371 _event.data.ptr = evinfo; 372 373 int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 374 375 if (catchError!"epoll_ctl"(err)) 376 return fd_t.init; 377 return fd; 378 } 379 else /* if KQUEUE */ { 380 static size_t id = 0; 381 382 fd_t fd = cast(uint)++id; 383 384 EventType evtype; 385 386 evtype = EventType.DirectoryWatcher; 387 EventObject eobj; 388 eobj.dwHandler = del; 389 390 EventInfo* evinfo; 391 392 assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier..."); 393 394 try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId); 395 catch (Exception e) { 396 assert(false, "Failed to allocate resources: " ~ e.msg); 397 } 398 ctxt.evInfo = evinfo; 399 try m_watchers[fd] = evinfo; catch {} 400 401 try m_changes[fd] = FreeListObjectAlloc!(Array!DWChangeInfo).alloc(); 402 catch (Exception e) { 403 assert(false, "Failed to allocate resources: " ~ e.msg); 404 } 405 406 /// events will be created in watch() 407 408 return fd; 409 410 } 411 } 412 413 bool kill(AsyncDirectoryWatcher ctxt) { 414 415 static if (EPOLL) { 416 import core.sys.posix.unistd : close; 417 try 418 { 419 Array!(Tuple!(fd_t, uint)) remove_list; 420 foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) { 421 if (info.fd == ctxt.fd) 422 remove_list.insertBack(key); 423 } 424 425 foreach (Tuple!(fd_t, uint) key; remove_list[]) { 426 unwatch(key[0] /*fd_t*/, key[1]); 427 } 428 429 close(ctxt.fd); 430 FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 431 ctxt.evInfo = null; 432 } 433 catch (Exception e) 434 { 435 setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 436 return false; 437 } 438 439 } else /* if KQUEUE */ { 440 try { 441 Array!fd_t remove_list; 442 443 foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) { 444 if (info.fd == ctxt.fd) 445 remove_list.insertBack(wd); 446 } 447 448 foreach (wd; remove_list[]) { 449 unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries 450 } 451 452 FreeListObjectAlloc!(Array!DWChangeInfo).free(m_changes[ctxt.fd]); 453 m_watchers.remove(ctxt.fd); 454 m_changes.remove(ctxt.fd); 455 } 456 catch (Exception e) { 457 setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher"); 458 return false; 459 } 460 } 461 return true; 462 } 463 464 bool kill(AsyncTCPConnection ctxt, bool forced = false) 465 { 466 log("Kill socket"); 467 m_status = StatusInfo.init; 468 fd_t fd = ctxt.socket; 469 if (ctxt.connected) { 470 ctxt.disconnecting = true; 471 if (forced) { 472 if (ctxt.evInfo) 473 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 474 catch { assert(false, "Failed to free resources"); } 475 try FreeListObjectAlloc!AsyncTCPConnection.free(ctxt); 476 catch { assert(false, "Failed to free resources"); } 477 } 478 return closeSocket(fd, true, forced); 479 } 480 else { 481 ctxt.disconnecting = true; 482 if (forced) { 483 if (ctxt.evInfo) 484 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 485 catch { assert(false, "Failed to free resources"); } 486 try FreeListObjectAlloc!AsyncTCPConnection.free(ctxt); 487 catch { assert(false, "Failed to free resources"); } 488 } 489 return true; 490 } 491 } 492 493 bool kill(AsyncTCPListener ctxt) 494 { 495 log("Kill listener"); 496 m_status = StatusInfo.init; 497 nothrow void cleanup() { 498 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 499 catch { assert(false, "Failed to free resources"); } 500 ctxt.evInfo = null; 501 } 502 scope(exit) { 503 cleanup(); 504 } 505 506 fd_t fd = ctxt.socket; 507 508 return closeSocket(fd, false, true); 509 } 510 511 bool kill(AsyncNotifier ctxt) 512 { 513 static if (EPOLL) 514 { 515 import core.sys.posix.unistd : close; 516 fd_t err = close(ctxt.id); 517 518 if (catchError!"close(eventfd)"(err)) 519 return false; 520 521 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 522 catch (Exception e){ assert(false, "Error freeing resources"); } 523 524 return true; 525 } 526 else /* if KQUEUE */ 527 { 528 scope(exit) destroyIndex(ctxt); 529 530 if (ctxt.id == fd_t.init) 531 return false; 532 533 kevent_t _event; 534 EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null); 535 536 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 537 538 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 539 catch (Exception e){ assert(false, "Error freeing resources"); } 540 541 if (catchError!"kevent_del(notifier)"(err)) { 542 return false; 543 } 544 return true; 545 } 546 547 } 548 549 bool kill(shared AsyncSignal ctxt) 550 { 551 552 static if (EPOLL) { 553 ctxt.evInfo = null; 554 } 555 else 556 { 557 m_status = StatusInfo.init; 558 destroyIndex(ctxt); 559 } 560 return true; 561 } 562 563 bool kill(AsyncTimer ctxt) { 564 import core.sys.posix.time; 565 m_status = StatusInfo.init; 566 567 static if (EPOLL) 568 { 569 import core.sys.posix.unistd : close; 570 fd_t err = close(ctxt.id); 571 if (catchError!"timer_kill"(err)) 572 return false; 573 574 if (ctxt.evInfo) { 575 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 576 catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); } 577 ctxt.evInfo = null; 578 } 579 580 } 581 else /* if KQUEUE */ 582 { 583 scope(exit) 584 destroyIndex(ctxt); 585 586 if (ctxt.evInfo) { 587 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 588 catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); } 589 ctxt.evInfo = null; 590 } 591 kevent_t _event; 592 EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null); 593 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 594 if (catchError!"kevent_del(timer)"(err)) 595 return false; 596 } 597 return true; 598 } 599 600 bool kill(AsyncUDPSocket ctxt) { 601 import core.sys.posix.unistd : close; 602 603 604 m_status = StatusInfo.init; 605 606 fd_t fd = ctxt.socket; 607 fd_t err = close(fd); 608 609 if (catchError!"udp close"(err)) 610 return false; 611 612 static if (!EPOLL) 613 { 614 kevent_t[2] events; 615 EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null); 616 EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null); 617 err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 618 619 if (catchError!"event_del(udp)"(err)) 620 return false; 621 } 622 623 624 try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo); 625 catch (Exception e){ 626 assert(false, "Failed to free resources: " ~ e.msg); 627 } 628 ctxt.evInfo = null; 629 return true; 630 } 631 632 }