module libasync.posix2;

// workaround for an IDE indentation bug on very large files
mixin template RunKill()
{

	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
	in { assert(ctxt.socket == fd_t.init, "TCP Connection is active. Use another instance."); }
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		fd_t fd = ctxt.preInitializedSocket;

		if (fd == fd_t.init)
			fd = socket(cast(int) ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP);

		if (catchError!("run AsyncTCPConnection")(fd))
			return 0;

		/// Make sure the socket doesn't block on recv/send
		if (!setNonBlock(fd)) {
			log("Close socket");
			close(fd);
			return 0;
		}

		/// Disable Nagle's algorithm if requested
		if (ctxt.noDelay) {
			if (!setOption(fd, TCPOption.NODELAY, true)) {
				try log("Closing connection"); catch {}
				close(fd);
				return 0;
			}
		}

		/// Trigger the connection setup instructions
		if (!initTCPConnection(fd, ctxt, del)) {
			close(fd);
			return 0;
		}

		return fd;
	}

	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
	in {
		//assert(ctxt.socket == fd_t.init, "TCP Listener already bound. Please run another instance.");
		assert(ctxt.local.addr !is typeof(ctxt.local.addr).init, "No locally binding address specified. Use AsyncTCPListener.local = EventLoop.resolve*");
	}
	body {
		m_status = StatusInfo.init;
		import libasync.internals.socket_compat : socket, SOCK_STREAM, socklen_t, setsockopt, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP;
		import core.sys.posix.unistd : close;
		import core.sync.mutex;
		__gshared Mutex g_mutex;
		fd_t fd = ctxt.socket;
		bool reusing = true;
		try if (fd == 0) {
			reusing = false;
			/// Create the listening socket
			synchronized(g_mutex) {
				fd = socket(cast(int) ctxt.local.family, SOCK_STREAM, IPPROTO_TCP);
				if (catchError!("run AsyncTCPAccept")(fd))
					return 0;
				/// Allow multiple threads to listen on this address
				if (!setOption(fd, TCPOption.REUSEADDR, true)) {
					log("Close socket");
					close(fd);
					return 0;
				}
			}

			/// Make sure the socket doesn't block when calling accept()
			if (!setNonBlock(fd)) {
				log("Close socket");
				close(fd);
				return 0;
			}

			// todo: defer accept

			/// Start accepted connections with noDelay if requested
			if (ctxt.noDelay) {
				if (!setOption(fd, TCPOption.NODELAY, true)) {
					try log("Closing connection"); catch {}
					close(fd);
					return 0;
				}
			}

		} catch { assert(false, "Error in synchronized listener starter"); }

		/// Set up the event polling
		if (!initTCPListener(fd, ctxt, del, reusing)) {
			log("Close socket");
			close(fd);
			return 0;
		}

		return fd;
	}

	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
		m_status = StatusInfo.init;

		import libasync.internals.socket_compat : socket, SOCK_DGRAM, IPPROTO_UDP;
		import core.sys.posix.unistd;
		fd_t fd = ctxt.preInitializedSocket;

		try log("Address: " ~ ctxt.local.toString()); catch {}

		if (fd == fd_t.init)
			fd = socket(cast(int) ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP);

		if (catchError!("run AsyncUDPSocket")(fd))
			return 0;

		if (!setNonBlock(fd))
			return 0;

		if (!initUDPSocket(fd, ctxt, del))
			return 0;

		try log("UDP Socket started FD#" ~ fd.to!string);
		catch {}
		/*
		static if (!EPOLL) {
			gs_fdPool.insert(fd);
		}*/

		return fd;
	}
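	/+ Editor's illustrative sketch, not part of libasync: roughly what
	   setNonBlock() and setOption(fd, TCPOption.NODELAY, true) reduce to at
	   the POSIX level, using only druntime bindings. The helper names are
	   hypothetical and error handling is collapsed into a bool result.

		bool makeNonBlocking(int fd) {
			import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
			int flags = fcntl(fd, F_GETFL, 0);            // fetch current file status flags
			if (flags == -1) return false;
			return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
		}

		bool disableNagle(int fd) {
			import core.sys.posix.sys.socket : setsockopt, socklen_t;
			import core.sys.posix.netinet.in_ : IPPROTO_TCP;
			enum TCP_NODELAY = 1;                         // from <netinet/tcp.h>; 1 on Linux and the BSDs
			int on = 1;                                   // send small segments immediately
			return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, cast(socklen_t) on.sizeof) == 0;
		}
	+/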
	fd_t run(AsyncNotifier ctxt)
	{
		m_status = StatusInfo.init;

		fd_t err;
		static if (EPOLL) {
			fd_t fd = eventfd(0, EFD_NONBLOCK);

			if (catchSocketError!("run AsyncNotifier")(fd))
				return 0;

			epoll_event _event;
			_event.events = EPOLLIN | EPOLLET;
		}
		else /* if KQUEUE */
		{
			kevent_t _event;
			fd_t fd = cast(fd_t) createIndex();
		}
		EventType evtype = EventType.Notifier;
		NotifierHandler evh;
		evh.ctxt = ctxt;

		evh.fct = (AsyncNotifier ctxt) {
			try {
				ctxt.handler();
			} catch (Exception e) {
				//setInternalError!"AsyncNotifier handler"(Status.ERROR);
			}
		};

		EventObject eobj;
		eobj.notifierHandler = evh;

		EventInfo* evinfo;

		if (!ctxt.evInfo) {
			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;
		}
		else evinfo = ctxt.evInfo; // reuse the previously allocated event info rather than registering null

		static if (EPOLL) {
			_event.data.ptr = cast(void*) evinfo;

			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
			if (catchSocketError!("epoll_add(eventfd)")(err))
				return fd_t.init;
		}
		else /* if KQUEUE */
		{
			EV_SET(&_event, fd, EVFILT_USER, EV_ADD | EV_CLEAR, NOTE_FFCOPY, 0, evinfo);

			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_addNotifier"(err))
				return fd_t.init;
		}

		return fd;
	}

	fd_t run(shared AsyncSignal ctxt)
	{
		m_status = StatusInfo.init;

		ctxt.evInfo = cast(shared) m_evSignal;

		static if (EPOLL)
			return cast(fd_t) __libc_current_sigrtmin();
		else
			return cast(fd_t) createIndex(ctxt);
	}
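	/+ Editor's illustrative sketch, not part of libasync: the eventfd
	   mechanism behind the EPOLL notifier path above. A write() of a 64-bit
	   value increments a kernel counter and wakes any poller watching the
	   descriptor; a read() drains and resets it. Linux-only; the function
	   name is hypothetical.

		version (linux) bool eventfdRoundTrip() {
			import core.sys.linux.sys.eventfd : eventfd, EFD_NONBLOCK;
			import core.sys.posix.unistd : read, write, close;
			int efd = eventfd(0, EFD_NONBLOCK);   // counter starts at zero
			if (efd == -1) return false;
			scope(exit) close(efd);
			ulong one = 1;
			write(efd, &one, one.sizeof);         // "notify": counter += 1, pollers wake
			ulong seen;
			auto got = read(efd, &seen, seen.sizeof); // drains and resets the counter
			return got == seen.sizeof && seen == 1;
		}
	+/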
	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;

			fd_t fd = ctxt.id;
			itimerspec its;

			auto parts = timeout.split!("seconds", "nsecs")();
			its.it_value.tv_sec = cast(typeof(its.it_value.tv_sec)) parts.seconds;
			its.it_value.tv_nsec = cast(typeof(its.it_value.tv_nsec)) parts.nsecs;
			if (!ctxt.oneShot)
			{
				its.it_interval.tv_sec = its.it_value.tv_sec;
				its.it_interval.tv_nsec = its.it_value.tv_nsec;
			}

			if (fd == fd_t.init) {
				fd = timerfd_create(CLOCK_REALTIME, 0);
				if (catchError!"timer_create"(fd))
					return 0;
			}
			int err = timerfd_settime(fd, 0, &its, null);

			if (catchError!"timer_settime"(err))
				return 0;
			epoll_event _event;

			EventType evtype = EventType.Timer;
			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}
			_event.events |= EPOLLIN | EPOLLET;
			_event.data.ptr = evinfo;
			if (ctxt.id > 0) {
				err = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, ctxt.id, null);

				if (catchError!"epoll_ctl"(err))
					return fd_t.init;
			}
			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"timer_epoll_add"(err))
				return 0;
			return fd;
		}
		else /* if KQUEUE */
		{
			fd_t fd = ctxt.id;

			if (ctxt.id == 0)
				fd = cast(fd_t) createIndex();

			EventType evtype = EventType.Timer;

			EventObject eobj;
			eobj.timerHandler = del;

			EventInfo* evinfo;

			if (!ctxt.evInfo) {
				try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
				catch (Exception e) {
					assert(false, "Failed to allocate resources: " ~ e.msg);
				}

				ctxt.evInfo = evinfo;
			}
			else {
				evinfo = ctxt.evInfo;
				evinfo.evObj = eobj;
			}

			kevent_t _event;
			int msecs = cast(int) timeout.total!"msecs";
			ushort flags_ = EV_ADD | EV_ENABLE;
			//if (ctxt.oneShot)
			//	flags_ |= EV_CLEAR;

			// www.khmere.com/freebsd_book/html/ch06.html - EV_CLEAR set internally
			EV_SET(&_event, fd, EVFILT_TIMER, flags_, 0, msecs + 30, cast(void*) evinfo);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			if (catchError!"kevent_timer_add"(err))
				return 0;

			return fd;
		}
	}

	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {

		static if (EPOLL)
		{
			import core.sys.linux.sys.inotify;
			enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
			assert(ctxt.fd == fd_t.init);
			int fd = inotify_init1(IN_NONBLOCK);
			if (catchError!"inotify_init1"(fd)) {
				return fd_t.init;
			}
			epoll_event _event;

			EventType evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert(!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			ctxt.evInfo = evinfo;

			_event.events |= EPOLLIN;
			_event.data.ptr = evinfo;

			int err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);

			if (catchError!"epoll_ctl"(err))
				return fd_t.init;
			return fd;
		}
		else /* if KQUEUE */ {
			static size_t id = 0;

			fd_t fd = cast(uint) ++id;

			EventType evtype = EventType.DirectoryWatcher;
			EventObject eobj;
			eobj.dwHandler = del;

			EventInfo* evinfo;

			assert(!ctxt.evInfo, "Cannot run the same DirectoryWatcher again. This should have been caught earlier...");

			try evinfo = ThreadMem.alloc!EventInfo(fd, evtype, eobj, m_instanceId);
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}
			ctxt.evInfo = evinfo;
			try m_watchers[fd] = evinfo; catch {}

			try m_changes[fd] = ThreadMem.alloc!(Array!DWChangeInfo)();
			catch (Exception e) {
				assert(false, "Failed to allocate resources: " ~ e.msg);
			}

			/// kevents will be created in watch()

			return fd;
		}
	}
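	/+ Editor's illustrative sketch, not part of libasync: the timerfd calls
	   used by the EPOLL timer path above, reduced to a standalone helper.
	   A zeroed it_interval makes the timer one-shot; copying it_value into
	   it_interval makes it periodic. Linux-only; the function name is
	   hypothetical.

		version (linux) int armTimer(int msecs, bool periodic) {
			import core.sys.linux.timerfd : timerfd_create, timerfd_settime;
			import core.sys.posix.time : itimerspec, CLOCK_REALTIME;
			import core.sys.posix.unistd : close;
			int fd = timerfd_create(CLOCK_REALTIME, 0);
			if (fd == -1) return -1;
			itimerspec its;
			its.it_value.tv_sec  = msecs / 1000;
			its.it_value.tv_nsec = (msecs % 1000) * 1_000_000;
			if (periodic)
				its.it_interval = its.it_value;  // re-fire at the same cadence
			if (timerfd_settime(fd, 0, &its, null) == -1) {
				close(fd);
				return -1;
			}
			return fd; // add to epoll with EPOLLIN; read() yields the expiration count
		}
	+/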
	bool kill(AsyncDirectoryWatcher ctxt) {

		static if (EPOLL) {
			import core.sys.posix.unistd : close;
			try
			{
				Array!(Tuple!(fd_t, uint)) remove_list;
				foreach (ref const Tuple!(fd_t, uint) key, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(key);
				}

				foreach (Tuple!(fd_t, uint) key; remove_list[]) {
					unwatch(key[0] /*fd*/, key[1] /*wd*/);
				}

				close(ctxt.fd);
				ThreadMem.free(ctxt.evInfo);
				ctxt.evInfo = null;
			}
			catch (Exception e)
			{
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}

		} else /* if KQUEUE */ {
			try {
				Array!fd_t remove_list;

				foreach (ref const fd_t wd, ref const DWFolderInfo info; m_dwFolders) {
					if (info.fd == ctxt.fd)
						remove_list.insertBack(wd);
				}

				foreach (wd; remove_list[]) {
					unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
				}

				ThreadMem.free(m_changes[ctxt.fd]);
				m_watchers.remove(ctxt.fd);
				m_changes.remove(ctxt.fd);
			}
			catch (Exception e) {
				setInternalError!"Kill.DirectoryWatcher"(Status.ERROR, "Error killing directory watcher");
				return false;
			}
		}
		return true;
	}

	bool kill(AsyncTCPConnection ctxt, bool forced = false)
	{
		log("Kill socket");
		m_status = StatusInfo.init;
		fd_t fd = ctxt.socket;
		bool has_socket = fd > 0;
		ctxt.disconnecting = true;
		if (forced) {
			ctxt.connected = false;
			ctxt.disconnecting = false;
			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch { assert(false, "Failed to free resources"); }
				ctxt.evInfo = null;
			}
			if (ctxt.inbound) {
				try ThreadMem.free(ctxt);
				catch (Throwable t) { assert(false, "Failed to free resources for context " ~ (cast(void*) ctxt).to!string ~ ": " ~ t.to!string); }
			}
		}
		return has_socket ? closeSocket(fd, true, forced) : true;
	}
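	/+ Editor's illustrative sketch, not part of libasync: the usual POSIX
	   distinction behind a graceful versus forced teardown like the one
	   closeSocket(fd, ., forced) performs. Graceful: half-close with
	   shutdown() so the peer sees EOF and pending data still drains. Forced:
	   SO_LINGER with a zero timeout makes close() discard the send queue and
	   reset the connection. The function name is hypothetical.

		void closeTcp(int fd, bool forced) {
			import core.sys.posix.sys.socket : shutdown, setsockopt, socklen_t,
				SOL_SOCKET, SO_LINGER, linger, SHUT_WR;
			import core.sys.posix.unistd : close;
			if (forced) {
				linger lng = { 1, 0 };                  // l_onoff=1, l_linger=0 => RST on close
				setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, cast(socklen_t) lng.sizeof);
			} else {
				shutdown(fd, SHUT_WR);                  // send FIN, keep reading until peer EOF
			}
			close(fd);
		}
	+/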
	bool kill(AsyncTCPListener ctxt)
	{
		log("Kill listener");
		m_status = StatusInfo.init;
		nothrow void cleanup() {
			try ThreadMem.free(ctxt.evInfo);
			catch { assert(false, "Failed to free resources"); }
			ctxt.evInfo = null;
		}
		scope(exit) cleanup();

		fd_t fd = ctxt.socket;

		return closeSocket(fd, false, true);
	}

	bool kill(AsyncNotifier ctxt)
	{
		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);

			if (catchError!"close(eventfd)"(err))
				return false;

			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e) { assert(false, "Error freeing resources"); }

			return true;
		}
		else /* if KQUEUE */
		{
			scope(exit) destroyIndex(ctxt);

			if (ctxt.id == fd_t.init)
				return false;

			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_USER, EV_DELETE | EV_DISABLE, 0, 0, null);

			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);

			try ThreadMem.free(ctxt.evInfo);
			catch (Exception e) { assert(false, "Error freeing resources"); }

			if (catchError!"kevent_del(notifier)"(err)) {
				return false;
			}
			return true;
		}
	}

	bool kill(shared AsyncSignal ctxt)
	{
		static if (EPOLL) {
			ctxt.evInfo = null;
		}
		else
		{
			m_status = StatusInfo.init;
			destroyIndex(ctxt);
		}
		return true;
	}

	bool kill(AsyncTimer ctxt) {
		import core.sys.posix.time;
		m_status = StatusInfo.init;

		static if (EPOLL)
		{
			import core.sys.posix.unistd : close;
			fd_t err = close(ctxt.id);
			if (catchError!"timer_kill"(err))
				return false;

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}
		}
		else /* if KQUEUE */
		{
			scope(exit) destroyIndex(ctxt);

			if (ctxt.evInfo) {
				try ThreadMem.free(ctxt.evInfo);
				catch (Exception e) { assert(false, "Failed to free resources: " ~ e.msg); }
				ctxt.evInfo = null;
			}
			kevent_t _event;
			EV_SET(&_event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
			if (catchError!"kevent_del(timer)"(err))
				return false;
		}
		return true;
	}

	bool kill(AsyncUDPSocket ctxt) {
		import core.sys.posix.unistd : close;

		m_status = StatusInfo.init;

		fd_t fd = ctxt.socket;
		fd_t err = close(fd);

		if (catchError!"udp close"(err))
			return false;

		static if (!EPOLL)
		{
			kevent_t[2] events;
			EV_SET(&(events[0]), ctxt.socket, EVFILT_READ, EV_DELETE, 0, 0, null);
			EV_SET(&(events[1]), ctxt.socket, EVFILT_WRITE, EV_DELETE, 0, 0, null);
			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);

			if (catchError!"event_del(udp)"(err))
				return false;
		}

		try ThreadMem.free(ctxt.evInfo);
		catch (Exception e) {
			assert(false, "Failed to free resources: " ~ e.msg);
		}
		ctxt.evInfo = null;
		return true;
	}

}
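/+ Editor's illustrative sketch, not part of libasync: the inotify calls that
   back the EPOLL AsyncDirectoryWatcher. run() creates the inotify descriptor
   (above) and watch()/unwatch() manage per-directory watch descriptors along
   these lines. Linux-only; the function name is hypothetical.

	version (linux) void inotifyLifecycle(string dir) {
		import core.sys.linux.sys.inotify : inotify_init1, inotify_add_watch,
			inotify_rm_watch, IN_CREATE, IN_DELETE, IN_MODIFY;
		import core.sys.posix.unistd : close;
		import std.string : toStringz;
		enum IN_NONBLOCK = 0x800;  // same workaround as in run() above
		int fd = inotify_init1(IN_NONBLOCK);
		if (fd == -1) return;
		scope(exit) close(fd);
		// one watch descriptor per directory; events arrive as inotify_event
		// records on fd, which the event loop reads when epoll signals EPOLLIN
		int wd = inotify_add_watch(fd, dir.toStringz, IN_CREATE | IN_DELETE | IN_MODIFY);
		if (wd == -1) return;
		inotify_rm_watch(fd, wd);
	}
+/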