module libasync.posix2;

// workaround for IDE indent bug on too big files

/**
 * `run` / `kill` overloads for the POSIX event loop back-ends.
 *
 * Every `run` overload creates (or re-arms) the OS-level resource behind a
 * context object, registers it with the poller where applicable, and returns
 * its descriptor/index, or `0` / `fd_t.init` on failure. Every `kill` overload
 * tears that resource down and returns `true` on success.
 *
 * The template does not declare `m_status`, `m_epollfd`, `m_kqueuefd`,
 * `m_instanceId`, `catchError`, `setNonBlock`, `EPOLL` and the other helpers
 * it uses; they must be provided by the scope that mixes it in.
 */
mixin template RunKill()
{

    /**
     * Creates a non-blocking stream socket for `ctxt` and hands it to
     * `initTCPConnection`.
     *
     * Params:
     *   ctxt = connection context; its `socket` must still be `fd_t.init`
     *   del  = event handler forwarded to `initTCPConnection`
     * Returns: the new descriptor, or `0` if any step failed (the socket is
     *          closed before returning in that case).
     */
    fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
    in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
    body {
        m_status = StatusInfo.init;
        import libasync.internals.socket_compat : socket, SOCK_STREAM;
        import core.sys.posix.unistd : close;

        fd_t fd = socket(cast(int)ctxt.peer.family, SOCK_STREAM, 0);

        if (catchError!("run AsyncTCPConnection")(fd))
            return 0;

        // Make sure the socket doesn't block on recv/send
        if (!setNonBlock(fd)) {
            log("Close socket");
            close(fd);
            return 0;
        }

        // NODELAY turns Nagle's algorithm *off* (presumably maps to
        // TCP_NODELAY inside setOption - confirm there).
        if (ctxt.noDelay && !setOption(fd, TCPOption.NODELAY, true)) {
            try log("Closing connection"); catch {}
            close(fd);
            return 0;
        }

        // Trigger the connection setup instructions
        if (!initTCPConnection(fd, ctxt, del)) {
            close(fd);
            return 0;
        }

        return fd;
    }

    /**
     * Starts (or re-registers) a TCP listener.
     *
     * When `ctxt.socket` is `0` a new socket is created, configured with
     * REUSEADDR, made non-blocking and optionally given NODELAY; otherwise the
     * existing descriptor is reused. In both cases `initTCPListener` is then
     * called, with its last argument telling it whether the descriptor was
     * reused.
     *
     * Params:
     *   ctxt = listener context; `ctxt.local` must hold an address
     *   del  = accept handler forwarded to `initTCPListener`
     * Returns: the listening descriptor, or `0` on failure.
     */
    fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
    in {
        //assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance.");
        assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*");
    }
    body {
        m_status = StatusInfo.init;
        import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
        import core.sys.posix.unistd : close;
        import core.sync.mutex;
        // (an unused local `__gshared Mutex g_mtx` used to be declared here;
        //  the code below synchronizes on `g_mutex` from the enclosing scope)
        fd_t fd = ctxt.socket;
        bool reusing = true;
        try if (fd == 0) {
            reusing = false;
            // Create the listening socket
            synchronized(g_mutex) {
                fd = socket(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
                if (catchError!("run AsyncTCPAccept")(fd))
                    return 0;
                // Allow multiple threads to listen on this address
                if (!setOption(fd, TCPOption.REUSEADDR, true)) {
                    log("Close socket");
                    close(fd);
                    return 0;
                }
            }

            // Make sure the socket returns instantly when calling listen()
            if (!setNonBlock(fd)) {
                log("Close socket");
                close(fd);
                return 0;
            }

            // todo: defer accept

            // Automatically starts connections with noDelay if specified
            if (ctxt.noDelay && !setOption(fd, TCPOption.NODELAY, true)) {
                try log("Closing connection"); catch {}
                close(fd);
                return 0;
            }

        } catch { assert(false, "Error in synchronized listener starter"); }

        // Setup the event polling
        if (!initTCPListener(fd, ctxt, del, reusing)) {
            log("Close socket");
            close(fd);
            return 0;
        }

        return fd;
    }

    /**
     * Creates a non-blocking UDP socket for `ctxt` and hands it to
     * `initUDPSocket`.
     *
     * Params:
     *   ctxt = UDP context; `ctxt.local.family` selects the address family
     *   del  = handler forwarded to `initUDPSocket`
     * Returns: the new descriptor, or `0` on failure. The socket is closed on
     *          every failure path after it has been created, matching the TCP
     *          overloads.
     */
    fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
        m_status = StatusInfo.init;

        import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
        import core.sys.posix.unistd;
        import core.sys.posix.unistd : close;

        try log("Address: " ~ ctxt.local.toString()); catch {}

        fd_t fd = socket(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);

        if (catchError!("run AsyncUDPSocket")(fd))
            return 0;

        if (!setNonBlock(fd)) {
            close(fd); // do not leak the descriptor
            return 0;
        }

        // NOTE(review): assumes initUDPSocket leaves closing the socket to its
        // caller on failure, as the TCP overloads above do - confirm.
        if (!initUDPSocket(fd, ctxt, del)) {
            close(fd);
            return 0;
        }

        try log("UDP Socket started FD#" ~ fd.to!string);
        catch{}
        /*
        static if (!EPOLL) {
            gs_fdPool.insert(fd);
        }*/

        return fd;
    }

    /**
     * Registers a notifier with the poller.
     *
     * On epoll an `eventfd` is created and added edge-triggered for input; on
     * kqueue an index from `createIndex()` is registered as an `EVFILT_USER`
     * event. The stored handler calls `ctxt.handler()` and discards any
     * `Exception` it throws.
     *
     * Returns: the eventfd / index, `0` if the eventfd could not be created,
     *          or `fd_t.init` if registration failed. On registration failure
     *          the eventfd is closed and an `EventInfo` allocated by this call
     *          is released again.
     */
    fd_t run(AsyncNotifier ctxt)
    {
        m_status = StatusInfo.init;

        fd_t err;
        static if (EPOLL) {
            fd_t fd = eventfd(0, EFD_NONBLOCK);

            if (catchSocketError!("run AsyncNotifier")(fd))
                return 0;

            epoll_event _event;
            _event.events = EPOLLIN | EPOLLET;
        }
        else /* if KQUEUE */
        {
            kevent_t _event;
            fd_t fd = cast(fd_t)createIndex();
        }
        EventType evtype = EventType.Notifier;
        NotifierHandler evh;
        evh.ctxt = ctxt;

        evh.fct = (AsyncNotifier ctxt) {
            try {
                ctxt.handler();
            } catch (Exception e) {
                //setInternalError!"AsyncTimer handler"(Status.ERROR);
            }
        };

        EventObject eobj;
        eobj.notifierHandler = evh;

        EventInfo* evinfo;
        bool allocatedInfo = false;

        // NOTE(review): when ctxt.evInfo is already set, evinfo stays null and
        // null is what gets registered below - unchanged from the original.
        if (!ctxt.evInfo) {
            try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
            catch (Exception e) {
                assert(false, "Failed to allocate resources: " ~ e.msg);
            }

            ctxt.evInfo = evinfo;
            allocatedInfo = true;
        }

        // Gives back the EventInfo allocated by this call after a failed registration.
        nothrow void releaseInfo() {
            if (!allocatedInfo)
                return;
            try FreeListObjectAlloc!EventInfo.free(evinfo);
            catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
            ctxt.evInfo = null;
        }

        static if (EPOLL) {
            import core.sys.posix.unistd : close;

            _event.data.ptr = cast(void*) evinfo;

            err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
            if (catchSocketError!("epoll_add(eventfd)")(err)) {
                close(fd); // the eventfd was never handed out
                releaseInfo();
                return fd_t.init;
            }
        }
        else /* if KQUEUE */
        {
            EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);

            err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

            if (catchError!"kevent_addSignal"(err)) {
                releaseInfo();
                return fd_t.init;
            }
        }

        return fd;
    }

    /**
     * Attaches the loop's shared signal `EventInfo` (`m_evSignal`) to `ctxt`.
     *
     * Returns: `__libc_current_sigrtmin()` on epoll, or the result of
     *          `createIndex(ctxt)` on kqueue, cast to `fd_t`.
     */
    fd_t run(shared AsyncSignal ctxt)
    {
        m_status = StatusInfo.init;

        ctxt.evInfo = cast(shared) m_evSignal;

        static if (EPOLL)
            return cast(fd_t) (__libc_current_sigrtmin());
        else
            return cast(fd_t) createIndex(ctxt);
    }

    /**
     * Arms (or re-arms) a timer.
     *
     * epoll: uses a `timerfd` (created when `ctxt.id` is `fd_t.init`), sets
     * its expiration to `timeout` (repeating unless `ctxt.oneShot`), then
     * (re)adds it to the epoll set. kqueue: registers an `EVFILT_TIMER` with
     * the timeout in milliseconds.
     *
     * Params:
     *   ctxt    = timer context (`id`, `oneShot`, `evInfo` are read/written)
     *   del     = handler stored in the timer's `EventInfo`
     *   timeout = interval / delay
     * Returns: the timer descriptor/index, or `0` / `fd_t.init` on failure.
     *          A timerfd or `EventInfo` created by a failing call is released
     *          before returning.
     */
    fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
        m_status = StatusInfo.init;

        static if (EPOLL)
        {
            import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
            import core.sys.posix.unistd : close;

            fd_t fd = ctxt.id;
            itimerspec its;

            // split once instead of once per field
            auto parts = timeout.split!("seconds", "nsecs")();
            its.it_value.tv_sec = parts.seconds;
            its.it_value.tv_nsec = parts.nsecs;
            if (!ctxt.oneShot)
            {
                its.it_interval.tv_sec = its.it_value.tv_sec;
                its.it_interval.tv_nsec = its.it_value.tv_nsec;
            }

            EventInfo* evinfo;
            bool createdFd = false;
            bool allocatedInfo = false;

            // Undoes whatever this call created so a failure leaks nothing.
            nothrow void rollback() {
                if (createdFd)
                    close(fd);
                if (allocatedInfo) {
                    try FreeListObjectAlloc!EventInfo.free(evinfo);
                    catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
                    ctxt.evInfo = null;
                }
            }

            if (fd == fd_t.init) {
                fd = timerfd_create(CLOCK_REALTIME, 0);
                if (catchError!"timer_create"(fd))
                    return 0;
                createdFd = true;
            }
            int err = timerfd_settime(fd, 0, &its, null);

            if (catchError!"timer_settime"(err)) {
                rollback();
                return 0;
            }
            epoll_event _event;

            EventType evtype;
            evtype = EventType.Timer;
            EventObject eobj;
            eobj.timerHandler = del;

            if (!ctxt.evInfo) {
                try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
                catch (Exception e) {
                    assert(false, "Failed to allocate resources: " ~ e.msg);
                }

                ctxt.evInfo = evinfo;
                allocatedInfo = true;
            }
            else {
                // re-arming: keep the EventInfo, swap in the new handler
                evinfo = ctxt.evInfo;
                evinfo.evObj = eobj;
            }
            _event.events |= EPOLLIN | EPOLLET;
            _event.data.ptr = evinfo;
            if (ctxt.id > 0) {
                // already registered: remove first, then add again below
                err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null);

                if (catchError!"epoll_ctl"(err)) {
                    rollback();
                    return fd_t.init;
                }
            }
            err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

            if (catchError!"timer_epoll_add"(err)) {
                rollback();
                return 0;
            }
            return fd;
        }
        else /* if KQUEUE */
        {
            fd_t fd = ctxt.id;

            if (ctxt.id == 0)
                fd = cast(fd_t) createIndex();
            EventType evtype;
            evtype = EventType.Timer;

            EventObject eobj;
            eobj.timerHandler = del;

            EventInfo* evinfo;

            if (!ctxt.evInfo) {
                try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
                catch (Exception e) {
                    assert(false, "Failed to allocate resources: " ~ e.msg);
                }

                ctxt.evInfo = evinfo;
            }
            else {
                evinfo = ctxt.evInfo;
                evinfo.evObj = eobj;
            }

            kevent_t _event;

            int msecs = cast(int) timeout.total!"msecs";

            // www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
            EV_SET(&_event, fd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, msecs, cast(void*) evinfo);

            if (ctxt.oneShot)
                _event.flags |= EV_ONESHOT;

            int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

            if (catchError!"kevent_timer_add"(err))
                return 0;

            return fd;
        }
    }

    /**
     * Creates the OS object backing a directory watcher.
     *
     * epoll: opens a non-blocking inotify instance and adds it to the epoll
     * set. kqueue: hands out the next value of a function-local counter as
     * identifier and creates the `m_watchers` / `m_changes` entries for it;
     * the actual kevents are created in `watch()`.
     *
     * Params:
     *   ctxt = watcher context; must not have an `evInfo` yet
     *   del  = handler stored in the watcher's `EventInfo`
     * Returns: the inotify descriptor / identifier, or `fd_t.init` on failure.
     */
    fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {

        static if (EPOLL)
        {
            import core.sys.linux.sys.inotify;
            import core.sys.posix.unistd : close;
            enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
            assert(ctxt.fd == fd_t.init);
            int fd = inotify_init1(IN_NONBLOCK);
            if (catchError!"inotify_init1"(fd)) {
                return fd_t.init;
            }
            epoll_event _event;

            EventType evtype;

            evtype = EventType.DirectoryWatcher;
            EventObject eobj;
            eobj.dwHandler = del;

            EventInfo* evinfo;

            assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

            try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
            catch (Exception e) {
                assert(false, "Failed to allocate resources: " ~ e.msg);
            }

            ctxt.evInfo = evinfo;

            _event.events |= EPOLLIN;
            _event.data.ptr = evinfo;

            int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

            if (catchError!"epoll_ctl"(err)) {
                // Roll back: otherwise the inotify fd leaks and a retry trips
                // the "cannot run again" assert above.
                close(fd);
                try FreeListObjectAlloc!EventInfo.free(evinfo);
                catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
                ctxt.evInfo = null;
                return fd_t.init;
            }
            return fd;
        }
        else /* if KQUEUE */ {
            static size_t id = 0;

            fd_t fd = cast(uint)++id;

            EventType evtype;

            evtype = EventType.DirectoryWatcher;
            EventObject eobj;
            eobj.dwHandler = del;

            EventInfo* evinfo;

            assert (!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

            try evinfo = FreeListObjectAlloc!EventInfo.alloc(fd, evtype, eobj, m_instanceId);
            catch (Exception e) {
                assert(false, "Failed to allocate resources: " ~ e.msg);
            }
            ctxt.evInfo = evinfo;
            try m_watchers[fd] = evinfo; catch {}

            try m_changes[fd] = FreeListObjectAlloc!(Array!DWChangeInfo).alloc();
            catch (Exception e) {
                assert(false, "Failed to allocate resources: " ~ e.msg);
            }

            // events will be created in watch()

            return fd;
        }
    }

    /**
     * Stops a directory watcher: unwatches every `m_dwFolders` entry whose
     * `fd` equals `ctxt.fd`, then releases the watcher's own resources.
     *
     * Returns: `false` (after recording an internal error) if anything threw,
     *          `true` otherwise.
     */
    bool kill(AsyncDirectoryWatcher ctxt) {

        static if (EPOLL) {
            import core.sys.posix.unistd : close;
            try
            {
                // collect first, unwatch afterwards: the removal list is
                // built before any unwatch() call is made
                Array!(Tuple!(fd_t, uint)) remove_list;
                foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
                    if (info.fd == ctxt.fd)
                        remove_list.insertBack(key);
                }

                foreach (Tuple!(fd_t, uint) key; remove_list[]) {
                    unwatch(key[0] /*fd_t*/, key[1]);
                }

                close(ctxt.fd);
                FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
                ctxt.evInfo = null;
            }
            catch (Exception e)
            {
                setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
                return false;
            }

        } else /* if KQUEUE */ {
            // NOTE(review): unlike the epoll branch, this one neither frees
            // nor clears ctxt.evInfo - confirm whether that is intended.
            try {
                Array!fd_t remove_list;

                foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
                    if (info.fd == ctxt.fd)
                        remove_list.insertBack(wd);
                }

                foreach (wd; remove_list[]) {
                    unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
                }

                FreeListObjectAlloc!(Array!DWChangeInfo).free(m_changes[ctxt.fd]);
                m_watchers.remove(ctxt.fd);
                m_changes.remove(ctxt.fd);
            }
            catch (Exception e) {
                setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
                return false;
            }
        }
        return true;
    }

    /**
     * Closes a TCP connection through `closeSocket` if it is still connected.
     *
     * On exit `ctxt.disconnecting` is always set, and `ctxt.connected` is
     * cleared when `forced` is true.
     *
     * Returns: the result of `closeSocket`, or `true` when the context was
     *          not connected.
     */
    bool kill(AsyncTCPConnection ctxt, bool forced = false)
    {
        log("Kill socket");
        m_status = StatusInfo.init;
        fd_t fd = ctxt.socket;
        scope(exit) {
            if (forced) {
                ctxt.connected = false;
            }
            ctxt.disconnecting = true;
        }
        if (!ctxt.connected)
            return true;
        return closeSocket(fd, true, forced);
    }

    /**
     * Closes a listener through `closeSocket(fd, false, true)`; the
     * listener's `EventInfo` is freed and cleared on exit either way.
     */
    bool kill(AsyncTCPListener ctxt)
    {
        log("Kill listener");
        m_status = StatusInfo.init;
        nothrow void cleanup() {
            try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
            catch { assert(false, "Failed to free resources"); }
            ctxt.evInfo = null;
        }
        scope(exit) {
            cleanup();
        }

        fd_t fd = ctxt.socket;

        return closeSocket(fd, false, true);
    }

    /**
     * Destroys a notifier: closes its eventfd (epoll) or deletes its
     * `EVFILT_USER` event and index (kqueue), then frees its `EventInfo`.
     *
     * `ctxt.evInfo` is cleared after the free so the pointer cannot dangle or
     * be freed twice, and so a later `run` allocates a fresh one.
     *
     * Returns: `true` on success, `false` if the OS call failed (or, on
     *          kqueue, if the notifier was never started).
     */
    bool kill(AsyncNotifier ctxt)
    {
        static if (EPOLL)
        {
            import core.sys.posix.unistd : close;
            fd_t err = close(ctxt.id);

            if (catchError!"close(eventfd)"(err))
                return false;

            if (ctxt.evInfo) {
                try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
                catch (Exception e){ assert(false, "Error freeing resources"); }
                ctxt.evInfo = null;
            }

            return true;
        }
        else /* if KQUEUE */
        {
            scope(exit) destroyIndex(ctxt);

            if (ctxt.id == fd_t.init)
                return false;

            kevent_t _event;
            EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);

            int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

            if (ctxt.evInfo) {
                try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
                catch (Exception e){ assert(false, "Error freeing resources"); }
                ctxt.evInfo = null;
            }

            if (catchError!"kevent_del(notifier)"(err)) {
                return false;
            }
            return true;
        }
    }

    /**
     * Detaches a signal context: clears `ctxt.evInfo` on epoll, or calls
     * `destroyIndex(ctxt)` on kqueue. Always returns `true`.
     */
    bool kill(shared AsyncSignal ctxt)
    {
        static if (EPOLL) {
            ctxt.evInfo = null;
        }
        else
        {
            m_status = StatusInfo.init;
            destroyIndex(ctxt);
        }
        return true;
    }

    /**
     * Destroys a timer: closes the timerfd (epoll) or deletes the
     * `EVFILT_TIMER` event and its index (kqueue), and frees the timer's
     * `EventInfo` if one is attached.
     *
     * Returns: `true` on success, `false` if the OS call failed.
     */
    bool kill(AsyncTimer ctxt) {
        import core.sys.posix.time;
        m_status = StatusInfo.init;

        static if (EPOLL)
        {
            import core.sys.posix.unistd : close;
            fd_t err = close(ctxt.id);
            if (catchError!"timer_kill"(err))
                return false;

            if (ctxt.evInfo) {
                try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
                catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
                ctxt.evInfo = null;
            }
        }
        else /* if KQUEUE */
        {
            scope(exit)
                destroyIndex(ctxt);

            if (ctxt.evInfo) {
                try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
                catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
                ctxt.evInfo = null;
            }
            kevent_t _event;
            EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
            int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
            if (catchError!"kevent_del(timer)"(err))
                return false;
        }
        return true;
    }

    /**
     * Destroys a UDP socket: on kqueue its read/write kevents are deleted
     * first, then the descriptor is closed and the `EventInfo` freed.
     *
     * The kevents are deleted *before* `close` because closing a descriptor
     * already drops its kevents, so a later `EV_DELETE` would be issued
     * against a dead descriptor.
     *
     * Returns: `true` on success; `false` if deregistration or `close`
     *          failed (the `EventInfo` is left untouched in that case, as
     *          before).
     */
    bool kill(AsyncUDPSocket ctxt) {
        import core.sys.posix.unistd : close;

        m_status = StatusInfo.init;

        fd_t fd = ctxt.socket;

        static if (!EPOLL)
        {
            kevent_t[2] events;
            EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
            EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
            fd_t delErr = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);

            if (catchError!"event_del(udp)"(delErr)) {
                close(fd); // still release the descriptor
                return false;
            }
        }

        fd_t err = close(fd);

        if (catchError!"udp close"(err))
            return false;

        try FreeListObjectAlloc!EventInfo.free(ctxt.evInfo);
        catch (Exception e){
            assert(false, "Failed to free resources: " ~ e.msg);
        }
        ctxt.evInfo = null;
        return true;
    }

}