1 module libasync.posix; 2 3 version (Posix): 4 5 import libasync.types; 6 import std.string : toStringz; 7 import std.conv : to; 8 import std.datetime : Duration, msecs, seconds, SysTime; 9 import std.traits : isIntegral; 10 import std.typecons : Tuple, tuple; 11 import std.container : Array; 12 13 import core.stdc.errno; 14 import libasync.events; 15 import libasync.internals.path; 16 import core.sys.posix.signal; 17 import libasync.posix2; 18 import core.sync.mutex; 19 import memutils.utils; 20 import memutils.hashmap; 21 22 enum SOCKET_ERROR = -1; 23 alias fd_t = int; 24 25 26 version(linux) { 27 import libasync.internals.epoll; 28 const EPOLL = true; 29 extern(C) nothrow @nogc { 30 int __libc_current_sigrtmin(); 31 int __libc_current_sigrtmax(); 32 } 33 bool g_signalsBlocked; 34 package nothrow void blockSignals() { 35 try { 36 /// Block signals to reserve SIGRTMIN .. " +30 for AsyncSignal 37 sigset_t mask; 38 // todo: use more signals for more event loops per thread.. (is this necessary?) 39 //foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) { 40 //import std.stdio : writeln; 41 //try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch {} 42 sigemptyset(&mask); 43 sigaddset(&mask, cast(int) __libc_current_sigrtmin()); 44 pthread_sigmask(SIG_BLOCK, &mask, null); 45 //} 46 } catch {} 47 } 48 static this() { 49 blockSignals(); 50 g_signalsBlocked = true; 51 } 52 } 53 version(OSX) { 54 import libasync.internals.kqueue; 55 const EPOLL = false; 56 } 57 version(FreeBSD) { 58 import libasync.internals.kqueue; 59 const EPOLL = false; 60 } 61 62 __gshared Mutex g_mutex; 63 64 static if (!EPOLL) { 65 private struct DWFileInfo { 66 fd_t folder; 67 Path path; 68 SysTime lastModified; 69 bool is_dir; 70 } 71 } 72 73 private struct DWFolderInfo { 74 WatchInfo wi; 75 fd_t fd; 76 } 77 78 package struct EventLoopImpl { 79 static if (EPOLL) { 80 pragma(msg, "Using Linux EPOLL for events"); 81 } 82 else /* if KQUEUE */ 83 { 84 pragma(msg, "Using FreeBSD KQueue for events"); 85 } 86 87 package: 88 alias error_t = EPosix; 89 90 nothrow: 91 private: 92 93 /// members 94 EventLoop m_evLoop; 95 ushort m_instanceId; 96 bool m_started; 97 StatusInfo m_status; 98 error_t m_error = EPosix.EOK; 99 EventInfo* m_evSignal; 100 static if (EPOLL){ 101 fd_t m_epollfd; 102 HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path) 103 } 104 else /* if KQUEUE */ 105 { 106 fd_t m_kqueuefd; 107 HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher 108 HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder) 109 HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file) 110 HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher 111 112 } 113 114 package: 115 116 /// workaround for IDE indent bug on too big files 117 mixin RunKill!(); 118 119 @property bool started() const { 120 return m_started; 121 } 122 123 bool init(EventLoop evl) 124 in { assert(!m_started); } 125 body 126 { 127 128 import core.atomic; 129 shared static ushort i; 130 string* failer = null; 131 132 133 m_instanceId = i; 134 static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId); 135 136 core.atomic.atomicOp!"+="(i, cast(ushort) 1); 137 m_evLoop = evl; 138 139 import core.thread; 140 try Thread.getThis().priority = Thread.PRIORITY_MAX; 141 catch (Exception e) { assert(false, "Could not set thread priority"); } 142 143 try 144 if (!g_mutex) 145 g_mutex = new Mutex; 146 catch {} 147 148 static if (EPOLL) 149 { 150 151 if (!g_signalsBlocked) 152 blockSignals(); 153 assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel"); 154 m_epollfd = epoll_create1(0); 155 156 if (catchError!"epoll_create1"(m_epollfd)) 157 return false; 158 159 import core.sys.linux.sys.signalfd; 160 import core.thread : getpid; 161 162 fd_t err; 163 fd_t sfd; 164 165 sigset_t mask; 166 167 try { 168 sigemptyset(&mask); 169 sigaddset(&mask, __libc_current_sigrtmin()); 170 err = pthread_sigmask(SIG_BLOCK, &mask, null); 171 if (catchError!"sigprocmask"(err)) 172 { 173 m_status.code = Status.EVLOOP_FAILURE; 174 return false; 175 } 176 } catch { } 177 178 179 180 sfd = signalfd(-1, &mask, SFD_NONBLOCK); 181 assert(sfd > 0, "Failed to setup signalfd in epoll"); 182 183 EventType evtype; 184 185 epoll_event _event; 186 _event.events = EPOLLIN; 187 evtype = EventType.Signal; 188 try 189 m_evSignal = ThreadMem.alloc!EventInfo(sfd, evtype, EventObject.init, m_instanceId); 190 catch (Exception e){ 191 assert(false, "Allocation error"); 192 } 193 _event.data.ptr = cast(void*) m_evSignal; 194 195 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event); 196 if (catchError!"EPOLL_CTL_ADD(sfd)"(err)) 197 { 198 return false; 199 } 200 201 } 202 else /* if KQUEUE */ 203 { 204 try { 205 if (!gs_queueMutex) { 206 gs_queueMutex = ThreadMem.alloc!ReadWriteMutex(); 207 gs_signalQueue = Array!(Array!AsyncSignal)(); 208 gs_idxQueue = Array!(Array!size_t)(); 209 } 210 if (g_evIdxAvailable.empty) { 211 g_evIdxAvailable.reserve(32); 212 213 foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) { 214 g_evIdxAvailable.insertBack(k + 1); 215 } 216 g_evIdxCapacity = 32; 217 g_idxCapacity = 32; 218 } 219 } catch { assert(false, "Initialization failed"); } 220 m_kqueuefd = kqueue(); 221 int err; 222 try { 223 sigset_t mask; 224 sigemptyset(&mask); 225 sigaddset(&mask, SIGXCPU); 226 227 err = sigprocmask(SIG_BLOCK, &mask, null); 228 } catch {} 229 230 EventType evtype = EventType.Signal; 231 232 // use GC because ThreadMem fails at emplace for shared objects 233 try 234 m_evSignal = ThreadMem.alloc!EventInfo(SIGXCPU, evtype, EventObject.init, m_instanceId); 235 catch (Exception e) { 236 assert(false, "Failed to allocate resources"); 237 } 238 239 if (catchError!"siprocmask"(err)) 240 return 0; 241 242 kevent_t _event; 243 EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal); 244 err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 245 if (catchError!"kevent_add(SIGXCPU)"(err)) 246 assert(false, "Add SIGXCPU failed at kevent call"); 247 } 248 249 try log("init in thread " ~ Thread.getThis().name); catch {} 250 251 return true; 252 } 253 254 void exit() { 255 import core.sys.posix.unistd : close; 256 static if (EPOLL) { 257 close(m_epollfd); // not necessary? 258 259 // not necessary: 260 //try ThreadMem.free(m_evSignal); 261 //catch (Exception e) { assert(false, "Failed to free resources"); } 262 263 } 264 else 265 close(m_kqueuefd); 266 } 267 268 @property const(StatusInfo) status() const { 269 return m_status; 270 } 271 272 @property string error() const { 273 string* ptr; 274 return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init; 275 } 276 277 bool loop(Duration timeout = 0.seconds) 278 //in { assert(Fiber.getThis() is null); } 279 { 280 281 import libasync.internals.memory; 282 bool success = true; 283 int num; 284 285 static if (EPOLL) { 286 287 static align(1) epoll_event[] events; 288 if (events is null) 289 { 290 try events = new epoll_event[128]; 291 catch (Exception e) { 292 assert(false, "Could not allocate events array: " ~ e.msg); 293 } 294 } 295 int timeout_ms; 296 if (timeout == 0.seconds) // return immediately 297 timeout_ms = 0; 298 else timeout_ms = cast(int)timeout.total!"msecs"; 299 /// Retrieve pending events 300 num = epoll_wait(m_epollfd, cast(epoll_event*)&events[0], 128, timeout_ms); 301 assert(events !is null && events.length <= 128); 302 303 304 } 305 else /* if KQUEUE */ { 306 import core.sys.posix.time : time_t; 307 import core.sys.posix.config : c_long; 308 static kevent_t[] events; 309 if (events.length == 0) { 310 try events = allocArray!kevent_t(manualAllocator(), 128); 311 catch (Exception e) { assert(false, "Could not allocate events array"); } 312 } 313 time_t secs = timeout.split!("seconds", "nsecs")().seconds; 314 c_long ns = timeout.split!("seconds", "nsecs")().nsecs; 315 auto tspec = libasync.internals.kqueue.timespec(secs, ns); 316 317 num = kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec); 318 319 } 320 321 auto errors = [ tuple(EINTR, Status.EVLOOP_TIMEOUT) ]; 322 323 if (catchEvLoopErrors!"event_poll'ing"(num, errors)) 324 return false; 325 326 if (num > 0) 327 log("Got " ~ num.to!string ~ " event(s)"); 328 329 foreach(i; 0 .. num) { 330 success = false; 331 m_status = StatusInfo.init; 332 static if (EPOLL) 333 { 334 epoll_event _event = events[i]; 335 try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch {} 336 EventInfo* info = cast(EventInfo*) _event.data.ptr; 337 int event_flags = cast(int) _event.events; 338 } 339 else /* if KQUEUE */ 340 { 341 kevent_t _event = events[i]; 342 EventInfo* info = cast(EventInfo*) _event.udata; 343 //log("Got info"); 344 int event_flags = (_event.filter << 16) | (_event.flags & 0xffff); 345 //log("event flags"); 346 } 347 348 //if (info.owner != m_instanceId) 349 // try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string); 350 // catch{} 351 //log("owner"); 352 switch (info.evType) { 353 case EventType.TCPAccept: 354 if (info.fd == 0) 355 break; 356 success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags); 357 break; 358 359 case EventType.Notifier: 360 361 log("Got notifier!"); 362 try info.evObj.notifierHandler(); 363 catch (Exception e) { 364 setInternalError!"notifierHandler"(Status.ERROR); 365 } 366 break; 367 368 case EventType.DirectoryWatcher: 369 log("Got DirectoryWatcher event!"); 370 static if (!EPOLL) { 371 // in KQUEUE all events will be consumed here, because they must be pre-processed 372 try { 373 DWFileEvent fevent; 374 if (_event.fflags & (NOTE_LINK | NOTE_WRITE)) 375 fevent = DWFileEvent.CREATED; 376 else if (_event.fflags & NOTE_DELETE) 377 fevent = DWFileEvent.DELETED; 378 else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE)) 379 fevent = DWFileEvent.MODIFIED; 380 else if (_event.fflags & NOTE_RENAME) 381 fevent = DWFileEvent.MOVED_FROM; 382 else if (_event.fflags & NOTE_RENAME) 383 fevent = DWFileEvent.MOVED_TO; 384 else 385 assert(false, "No event found?"); 386 387 DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init); 388 389 if (fi == DWFolderInfo.init) { 390 DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init); 391 assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher"); 392 fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init); 393 assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher"); 394 } 395 396 // all recursive events will be generated here 397 if (!compareFolderFiles(fi, fevent)) { 398 continue; 399 } 400 401 } catch (Exception e) { 402 log("Could not process DirectoryWatcher event: " ~ e.msg); 403 break; 404 } 405 406 } 407 408 try info.evObj.dwHandler(); 409 catch (Exception e) { 410 setInternalError!"dwHandler"(Status.ERROR); 411 } 412 break; 413 414 case EventType.Timer: 415 try log("Got timer! " ~ info.fd.to!string); catch {} 416 static if (EPOLL) { 417 static long val; 418 import core.sys.posix.unistd : read; 419 read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof); 420 } else { 421 } 422 try info.evObj.timerHandler(); 423 catch (Exception e) { 424 setInternalError!"timerHandler"(Status.ERROR); 425 } 426 static if (!EPOLL) { 427 auto ctxt = info.evObj.timerHandler.ctxt; 428 if (ctxt && ctxt.oneShot && !ctxt.rearmed) { 429 kevent_t __event; 430 EV_SET(&__event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null); 431 int err = kevent(m_kqueuefd, &__event, 1, null, 0, null); 432 if (catchError!"kevent_del(timer)"(err)) 433 return false; 434 } 435 } 436 break; 437 438 case EventType.Signal: 439 try log("Got signal!"); catch {} 440 441 static if (EPOLL) { 442 443 try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch {} 444 import core.sys.linux.sys.signalfd : signalfd_siginfo; 445 import core.sys.posix.unistd : read; 446 signalfd_siginfo fdsi; 447 fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof); 448 shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr; 449 450 try sig.handler(); 451 catch (Exception e) { 452 setInternalError!"signal handler"(Status.ERROR); 453 } 454 455 456 } 457 else /* if KQUEUE */ 458 { 459 static AsyncSignal[] sigarr; 460 461 if (sigarr.length == 0) { 462 try sigarr = new AsyncSignal[32]; 463 catch (Exception e) { assert(false, "Could not allocate signals array"); } 464 } 465 466 bool more = popSignals(sigarr); 467 foreach (AsyncSignal sig; sigarr) 468 { 469 shared AsyncSignal ptr = cast(shared AsyncSignal) sig; 470 if (ptr is null) 471 break; 472 try (cast(shared AsyncSignal)sig).handler(); 473 catch (Exception e) { 474 setInternalError!"signal handler"(Status.ERROR); 475 } 476 } 477 } 478 break; 479 480 case EventType.UDPSocket: 481 import core.sys.posix.unistd : close; 482 success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags); 483 484 nothrow void abortHandler(bool graceful) { 485 486 close(info.fd); 487 info.evObj.udpHandler.conn.socket = 0; 488 try info.evObj.udpHandler(UDPEvent.ERROR); 489 catch (Exception e) { } 490 try ThreadMem.free(info); 491 catch (Exception e){ assert(false, "Error freeing resources"); } 492 } 493 494 if (!success && m_status.code == Status.ABORT) { 495 abortHandler(true); 496 497 } 498 else if (!success && m_status.code == Status.ERROR) { 499 abortHandler(false); 500 } 501 break; 502 case EventType.TCPTraffic: 503 assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid"); 504 505 success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn); 506 507 nothrow void abortTCPHandler(bool graceful) { 508 509 nothrow void closeAll() { 510 try log("closeAll()"); catch {} 511 if (info.evObj.tcpEvHandler.conn.connected) 512 closeSocket(info.fd, true, true); 513 514 info.evObj.tcpEvHandler.conn.socket = 0; 515 } 516 517 /// Close the connection after an unexpected socket error 518 if (graceful) { 519 try info.evObj.tcpEvHandler(TCPEvent.CLOSE); 520 catch (Exception e) { static if(LOG) log("Close failure"); } 521 closeAll(); 522 } 523 524 /// Kill the connection after an internal error 525 else { 526 try info.evObj.tcpEvHandler(TCPEvent.ERROR); 527 catch (Exception e) { static if(LOG) log("Error failure"); } 528 closeAll(); 529 } 530 531 if (info.evObj.tcpEvHandler.conn.inbound) { 532 log("Freeing inbound connection FD#" ~ info.fd.to!string); 533 try ThreadMem.free(info.evObj.tcpEvHandler.conn); 534 catch (Exception e){ assert(false, "Error freeing resources"); } 535 } 536 try ThreadMem.free(info); 537 catch (Exception e){ assert(false, "Error freeing resources"); } 538 } 539 540 if (!success && m_status.code == Status.ABORT) { 541 abortTCPHandler(true); 542 } 543 else if (!success && m_status.code == Status.ERROR) { 544 abortTCPHandler(false); 545 } 546 break; 547 default: 548 break; 549 } 550 551 } 552 return success; 553 } 554 555 bool setOption(T)(fd_t fd, TCPOption option, in T value) { 556 m_status = StatusInfo.init; 557 import std.traits : isIntegral; 558 559 import libasync.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT; 560 int err; 561 nothrow bool errorHandler() { 562 if (catchError!"setOption:"(err)) { 563 try m_status.text ~= option.to!string; 564 catch (Exception e){ assert(false, "to!string conversion failure"); } 565 return false; 566 } 567 568 return true; 569 } 570 final switch (option) { 571 case TCPOption.NODELAY: // true/false 572 static if (!is(T == bool)) 573 assert(false, "NODELAY value type must be bool, not " ~ T.stringof); 574 else { 575 int val = value?1:0; 576 socklen_t len = val.sizeof; 577 err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); 578 return errorHandler(); 579 } 580 case TCPOption.REUSEADDR: // true/false 581 static if (!is(T == bool)) 582 assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof); 583 else { 584 int val = value?1:0; 585 socklen_t len = val.sizeof; 586 err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); 587 if (!errorHandler()) 588 return false; 589 590 // BSD systems have SO_REUSEPORT 591 import libasync.internals.socket_compat : SO_REUSEPORT; 592 err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len); 593 594 // Not all linux kernels support SO_REUSEPORT 595 version(linux) { 596 // ignore invalid and not supported errors on linux 597 if (errno == EINVAL || errno == ENOPROTOOPT) { 598 return true; 599 } 600 } 601 602 return errorHandler(); 603 } 604 case TCPOption.QUICK_ACK: 605 static if (!is(T == bool)) 606 assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof); 607 else { 608 static if (EPOLL) { 609 int val = value?1:0; 610 socklen_t len = val.sizeof; 611 err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len); 612 return errorHandler(); 613 } 614 else /* not linux */ { 615 return false; 616 } 617 } 618 case TCPOption.KEEPALIVE_ENABLE: // true/false 619 static if (!is(T == bool)) 620 assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof); 621 else 622 { 623 int val = value?1:0; 624 socklen_t len = val.sizeof; 625 err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); 626 return errorHandler(); 627 } 628 case TCPOption.KEEPALIVE_COUNT: // ## 629 static if (!isIntegral!T) 630 assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof); 631 else { 632 int val = value.total!"msecs".to!uint; 633 socklen_t len = val.sizeof; 634 err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len); 635 return errorHandler(); 636 } 637 case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds 638 static if (!is(T == Duration)) 639 assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof); 640 else { 641 int val; 642 try val = value.total!"seconds".to!uint; catch { return false; } 643 socklen_t len = val.sizeof; 644 err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len); 645 return errorHandler(); 646 } 647 case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start 648 static if (!is(T == Duration)) 649 assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof); 650 else { 651 int val; 652 try val = value.total!"seconds".to!uint; catch { return false; } 653 socklen_t len = val.sizeof; 654 err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len); 655 return errorHandler(); 656 } 657 case TCPOption.BUFFER_RECV: // bytes 658 static if (!isIntegral!T) 659 assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof); 660 else { 661 int val = value.to!int; 662 socklen_t len = val.sizeof; 663 err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); 664 return errorHandler(); 665 } 666 case TCPOption.BUFFER_SEND: // bytes 667 static if (!isIntegral!T) 668 assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof); 669 else { 670 int val = value.to!int; 671 socklen_t len = val.sizeof; 672 err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); 673 return errorHandler(); 674 } 675 case TCPOption.TIMEOUT_RECV: 676 static if (!is(T == Duration)) 677 assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof); 678 else { 679 import core.sys.posix.sys.time : timeval; 680 time_t secs = cast(time_t) value.split!("seconds", "usecs")().seconds; 681 suseconds_t us; 682 try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {} 683 timeval t = timeval(secs, us); 684 socklen_t len = t.sizeof; 685 err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len); 686 return errorHandler(); 687 } 688 case TCPOption.TIMEOUT_SEND: 689 static if (!is(T == Duration)) 690 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 691 else { 692 import core.sys.posix.sys.time : timeval; 693 auto timeout = value.split!("seconds", "usecs")(); 694 timeval t; 695 try t = timeval(timeout.seconds.to!time_t, timeout.usecs.to!suseconds_t); 696 catch (Exception) { return false; } 697 socklen_t len = t.sizeof; 698 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len); 699 return errorHandler(); 700 } 701 case TCPOption.TIMEOUT_HALFOPEN: 702 static if (!is(T == Duration)) 703 assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof); 704 else { 705 uint val; 706 try val = value.total!"msecs".to!uint; catch { 707 return false; 708 } 709 socklen_t len = val.sizeof; 710 err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len); 711 return errorHandler(); 712 } 713 case TCPOption.LINGER: // bool onOff, int seconds 714 static if (!is(T == Tuple!(bool, int))) 715 assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof); 716 else { 717 linger l = linger(val[0]?1:0, val[1]); 718 socklen_t llen = l.sizeof; 719 err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen); 720 return errorHandler(); 721 } 722 case TCPOption.CONGESTION: 723 static if (!isIntegral!T) 724 assert(false, "CONGESTION value type must be integral, not " ~ T.stringof); 725 else { 726 int val = value.to!int; 727 len = int.sizeof; 728 err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len); 729 return errorHandler(); 730 } 731 case TCPOption.CORK: 732 static if (!isIntegral!T) 733 assert(false, "CORK value type must be int, not " ~ T.stringof); 734 else { 735 static if (EPOLL) { 736 int val = value.to!int; 737 socklen_t len = val.sizeof; 738 err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len); 739 return errorHandler(); 740 } 741 else /* if KQUEUE */ { 742 int val = value.to!int; 743 socklen_t len = val.sizeof; 744 err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len); 745 return errorHandler(); 746 747 } 748 } 749 case TCPOption.DEFER_ACCEPT: // seconds 750 static if (!isIntegral!T) 751 assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof); 752 else { 753 static if (EPOLL) { 754 int val = value.to!int; 755 socklen_t len = val.sizeof; 756 err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len); 757 return errorHandler(); 758 } 759 else /* if KQUEUE */ { 760 // todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9) 761 /*int val = value.to!int; 762 socklen_t len = val.sizeof; 763 err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len); 764 return errorHandler(); 765 */ 766 assert(false, "TCPOption.DEFER_ACCEPT is not implemented"); 767 } 768 } 769 } 770 771 } 772 773 uint recv(in fd_t fd, ref ubyte[] data) 774 { 775 try log("Recv from FD: " ~ fd.to!string); catch {} 776 m_status = StatusInfo.init; 777 import libasync.internals.socket_compat : recv; 778 int ret = cast(int) recv(fd, cast(void*) data.ptr, data.length, cast(int)0); 779 780 static if (LOG) log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string); 781 if (catchError!".recv"(ret)){ // ret == SOCKET_ERROR == -1 ? 782 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 783 m_status.code = Status.ASYNC; 784 785 return 0; // TODO: handle some errors more specifically 786 } 787 788 m_status.code = Status.OK; 789 790 return cast(uint) ret < 0 ? 0 : ret; 791 } 792 793 uint send(in fd_t fd, in ubyte[] data) 794 { 795 try log("Send to FD: " ~ fd.to!string); catch {} 796 m_status = StatusInfo.init; 797 import libasync.internals.socket_compat : send; 798 int ret = cast(int) send(fd, cast(const(void)*) data.ptr, data.length, cast(int)0); 799 800 if (catchError!"send"(ret)) { // ret == -1 801 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) { 802 m_status.code = Status.ASYNC; 803 return 0; 804 } 805 } else 806 m_status.code = Status.OK; 807 return cast(uint) ret < 0 ? 0 : ret; 808 } 809 810 uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr) 811 { 812 import libasync.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t; 813 814 m_status = StatusInfo.init; 815 816 addr.family = AF_INET6; 817 socklen_t addrLen = addr.sockAddrLen; 818 long ret = recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen); 819 820 if (addrLen < addr.sockAddrLen) { 821 addr.family = AF_INET; 822 } 823 824 try log("RECVFROM " ~ ret.to!string ~ "B"); catch {} 825 if (catchError!".recvfrom"(ret)) { // ret == -1 826 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 827 m_status.code = Status.ASYNC; 828 return 0; 829 } 830 831 m_status.code = Status.OK; 832 833 return cast(uint) ret; 834 } 835 836 uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr) 837 { 838 import libasync.internals.socket_compat : sendto; 839 840 m_status = StatusInfo.init; 841 842 try log("SENDTO " ~ data.length.to!string ~ "B"); 843 catch{} 844 long ret = sendto(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen); 845 846 if (catchError!".sendto"(ret)) { // ret == -1 847 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 848 m_status.code = Status.ASYNC; 849 return 0; 850 } 851 852 m_status.code = Status.OK; 853 return cast(uint) ret; 854 } 855 856 NetworkAddress localAddr(in fd_t fd, bool ipv6) { 857 NetworkAddress ret; 858 import libasync.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr; 859 860 if (ipv6) 861 ret.family = AF_INET6; 862 else 863 ret.family = AF_INET; 864 865 socklen_t len = ret.sockAddrLen; 866 int err = getsockname(fd, ret.sockAddr, &len); 867 if (catchError!"getsockname"(err)) 868 return NetworkAddress.init; 869 if (len > ret.sockAddrLen) 870 ret.family = AF_INET6; 871 872 return ret; 873 } 874 875 bool notify(in fd_t fd, AsyncNotifier ctxt) 876 { 877 static if (EPOLL) 878 { 879 import core.sys.posix.unistd : write; 880 881 long val = 1; 882 fd_t err = cast(fd_t) write(fd, &val, long.sizeof); 883 884 if (catchError!"write(notify)"(err)) { 885 return false; 886 } 887 return true; 888 } 889 else /* if KQUEUE */ 890 { 891 kevent_t _event; 892 EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo); 893 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 894 895 if (catchError!"kevent_notify"(err)) { 896 return false; 897 } 898 return true; 899 } 900 } 901 902 bool notify(in fd_t fd, shared AsyncSignal ctxt) 903 { 904 static if (EPOLL) 905 { 906 907 sigval sigvl; 908 fd_t err; 909 sigvl.sival_ptr = cast(void*) ctxt; 910 try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch {} 911 if (catchError!"sigqueue"(err)) { 912 return false; 913 } 914 } 915 else /* if KQUEUE */ 916 { 917 918 import core.thread : getpid; 919 920 addSignal(ctxt); 921 922 try { 923 log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string); 924 int err = core.sys.posix.signal.kill(getpid(), SIGXCPU); 925 if (catchError!"notify(signal)"(err)) 926 assert(false, "Signal could not be raised"); 927 } catch {} 928 } 929 930 return true; 931 } 932 933 // no known uses 934 uint read(in fd_t fd, ref ubyte[] data) 935 { 936 m_status = StatusInfo.init; 937 return 0; 938 } 939 940 // no known uses 941 uint write(in fd_t fd, in ubyte[] data) 942 { 943 m_status = StatusInfo.init; 944 return 0; 945 } 946 947 uint watch(in fd_t fd, in WatchInfo info) { 948 // note: info.wd is still 0 at this point. 949 m_status = StatusInfo.init; 950 import core.sys.linux.sys.inotify; 951 import std.file : dirEntries, isDir, SpanMode; 952 953 static if (EPOLL) { 954 // Manually handle recursivity... All events show up under the same inotify 955 uint events = info.events; // values for this API were pulled from inotify 956 if (events & IN_DELETE) 957 events |= IN_DELETE_SELF; 958 if (events & IN_MOVED_FROM) 959 events |= IN_MOVE_SELF; 960 961 nothrow fd_t addFolderRecursive(Path path) { 962 fd_t ret; 963 try { 964 ret = inotify_add_watch(fd, path.toNativeString().toStringz, events); 965 if (catchError!"inotify_add_watch"(ret)) 966 return fd_t.init; 967 try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch {} 968 assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string); 969 m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd); 970 if (info.recursive) { 971 foreach (de; path.toNativeString().dirEntries(SpanMode.shallow)) 972 { 973 Path de_path = Path(de.name); 974 if (!de_path.absolute) 975 de_path = path ~ Path(de.name); 976 if (isDir(de_path.toNativeString())) 977 if (addFolderRecursive(de_path) == 0) 978 return 0; 979 } 980 } 981 982 } catch (Exception e) { 983 try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch {} 984 return 0; 985 } 986 987 return ret; 988 } 989 990 return addFolderRecursive(info.path); 991 992 } else /* if KQUEUE */ { 993 /// Manually handle recursivity & file tracking. Each folder is an event! 994 /// E.g. file creation shows up as a folder change, we must be prepared to seek the file. 995 import core.sys.posix.fcntl; 996 import libasync.internals.kqueue; 997 998 uint events; 999 if (info.events & DWFileEvent.CREATED) 1000 events |= NOTE_LINK | NOTE_WRITE; 1001 if (info.events & DWFileEvent.DELETED) 1002 events |= NOTE_DELETE; 1003 if (info.events & DWFileEvent.MODIFIED) 1004 events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE; 1005 if (info.events & DWFileEvent.MOVED_FROM) 1006 events |= NOTE_RENAME; 1007 if (info.events & DWFileEvent.MOVED_TO) 1008 events |= NOTE_RENAME; 1009 1010 EventInfo* evinfo; 1011 try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); } 1012 1013 /// we need a file descriptor for the containers, so we open files but we don't monitor them 1014 /// todo: track indexes internally? 1015 nothrow fd_t addRecursive(Path path, bool is_dir) { 1016 int ret; 1017 try { 1018 log("Adding path: " ~ path.toNativeString()); 1019 1020 ret = open(path.toNativeString().toStringz, O_EVTONLY); 1021 if (catchError!"open(watch)"(ret)) 1022 return 0; 1023 1024 if (is_dir) 1025 m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd); 1026 1027 kevent_t _event; 1028 1029 EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo); 1030 1031 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 1032 1033 if (catchError!"kevent_timer_add"(err)) 1034 return 0; 1035 1036 1037 if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) { 1038 Path filePath = Path(de.name); 1039 if (!filePath.absolute) 1040 filePath = path ~ filePath; 1041 fd_t fwd; 1042 if (info.recursive && isDir(filePath.toNativeString())) 1043 fwd = addRecursive(filePath, true); 1044 else { 1045 fwd = addRecursive(filePath, false); // gets an ID but will not scan 1046 m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString())); 1047 } 1048 1049 } 1050 1051 } catch (Exception e) { 1052 try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg); catch {} 1053 return 0; 1054 } 1055 return ret; 1056 } 1057 1058 fd_t wd; 1059 1060 try { 1061 wd = addRecursive(info.path, isDir(info.path.toNativeString())); 1062 1063 if (wd == 0) 1064 return 0; 1065 1066 } 1067 catch (Exception e) { 1068 setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg); 1069 } 1070 1071 return cast(uint) wd; 1072 } 1073 } 1074 1075 bool unwatch(in fd_t fd, in uint wd) { 1076 // the wd can be used with m_dwFolders to find the DWFolderInfo 1077 // and unwatch everything recursively. 1078 1079 m_status = StatusInfo.init; 1080 static if (EPOLL) { 1081 /// If recursive, all subfolders must also be unwatched recursively by removing them 1082 /// from containers and from inotify 1083 import core.sys.linux.sys.inotify; 1084 1085 nothrow bool removeAll(DWFolderInfo fi) { 1086 int err; 1087 try { 1088 1089 bool inotify_unwatch(uint wd) { 1090 err = inotify_rm_watch(fd, wd); 1091 1092 if (catchError!"inotify_rm_watch"(err)) 1093 return false; 1094 return true; 1095 } 1096 1097 if (!inotify_unwatch(fi.wi.wd)) 1098 return false; 1099 1100 /*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) 1101 { 1102 if (file.folder == fi.wi.wd) { 1103 inotify_unwatch(id); 1104 m_dwFiles.remove(id); 1105 } 1106 }*/ 1107 m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd)); 1108 1109 if (fi.wi.recursive) { 1110 // find all subdirectories by comparing the path 1111 Array!uint remove_list; 1112 foreach (ref const DWFolderInfo folder; m_dwFolders) { 1113 if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) { 1114 1115 if (!inotify_unwatch(folder.wi.wd)) 1116 return false; 1117 1118 remove_list.insertBack(fi.wi.wd); 1119 } 1120 } 1121 foreach (rm_wd; remove_list[]) 1122 m_dwFolders.remove(tuple(cast(fd_t) fd, rm_wd)); 1123 1124 } 1125 return true; 1126 } catch (Exception e) { 1127 try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString()); catch {} 1128 return false; 1129 } 1130 } 1131 1132 DWFolderInfo info; 1133 1134 try { 1135 info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init); 1136 if (info == DWFolderInfo.init) { 1137 setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string); 1138 return false; 1139 } 1140 } catch { } 1141 1142 return removeAll(info); 1143 } 1144 else /* if KQUEUE */ { 1145 1146 /// Recursivity must be handled manually, so we must unwatch subfiles and subfolders 1147 /// recursively, remove the container entries, close the file descriptor, and disable the vnode events. 1148 1149 nothrow bool removeAll(DWFolderInfo fi) { 1150 import core.sys.posix.unistd : close; 1151 1152 1153 bool event_unset(uint id) { 1154 kevent_t _event; 1155 EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null); 1156 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 1157 if (catchError!"kevent_unwatch"(err)) 1158 return false; 1159 return true; 1160 } 1161 1162 bool removeFolder(uint wd) { 1163 if (!event_unset(fi.wi.wd)) 1164 return false; 1165 m_dwFolders.remove(fi.wi.wd); 1166 int err = close(fi.wi.wd); 1167 if (catchError!"close dir"(err)) 1168 return false; 1169 return true; 1170 } 1171 1172 try { 1173 removeFolder(fi.wi.wd); 1174 1175 if (fi.wi.recursive) { 1176 import std.container.array; 1177 Array!fd_t remove_list; // keep track of unwatched folders recursively 1178 Array!fd_t remove_file_list; 1179 // search for subfolders and unset them / close their wd 1180 foreach (ref const DWFolderInfo folder; m_dwFolders) { 1181 if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) { 1182 1183 if (!event_unset(folder.wi.wd)) 1184 return false; 1185 1186 // search for subfiles, close their descriptors and remove them from the file list 1187 foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) { 1188 if (file.folder == folder.wi.wd) { 1189 close(fwd); 1190 remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop 1191 } 1192 } 1193 1194 remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop 1195 } 1196 } 1197 1198 foreach (wd; remove_file_list[]) 1199 m_dwFiles.remove(wd); 1200 1201 foreach (rm_wd; remove_list[]) 1202 removeFolder(rm_wd); 1203 1204 1205 } 1206 } catch (Exception e) { 1207 try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString()); catch {} 1208 return false; 1209 } 1210 1211 return true; 1212 } 1213 1214 DWFolderInfo info; 1215 try info = m_dwFolders.get(wd, DWFolderInfo.init); catch {} 1216 1217 if (!removeAll(info)) 1218 return false; 1219 return true; 1220 } 1221 } 1222 1223 // returns the amount of changes 1224 uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) { 1225 m_status = StatusInfo.init; 1226 1227 static if (EPOLL) { 1228 assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array"); 1229 import core.sys.linux.sys.inotify; 1230 import core.sys.posix.unistd : read; 1231 import core.stdc.stdio : FILENAME_MAX; 1232 import core.stdc.string : strlen; 1233 ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void; 1234 ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof); 1235 if (catchError!"read()"(nread)) 1236 { 1237 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 1238 m_status.code = Status.ASYNC; 1239 return 0; 1240 } 1241 assert(nread > 0); 1242 1243 1244 /// starts (recursively) watching all newly created folders in a recursive entry, 1245 /// creates events for additional files/folders founds, and unwatches all deleted folders 1246 void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) { 1247 import std.file : dirEntries, SpanMode, isDir; 1248 assert(fi.wi.recursive); 1249 // get a list of stuff in the created/moved folder 1250 if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) { 1251 foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) { 1252 Path entryPath = fi.wi.path ~ Path(de.name); 1253 1254 if (fi.wi.recursive && isDir(entryPath.toNativeString())) { 1255 1256 watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) ); 1257 void genEvents(Path subpath) { 1258 foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) { 1259 auto subsubpath = subpath ~ Path(de.name); 1260 changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath)); 1261 if (isDir(subsubpath.toNativeString())) 1262 genEvents(subsubpath); 1263 } 1264 } 1265 1266 genEvents(entryPath); 1267 1268 } 1269 } 1270 } 1271 } 1272 1273 size_t i; 1274 do 1275 { 1276 for (auto p = buf.ptr; p < buf.ptr + nread; ) 1277 { 1278 inotify_event* ev = cast(inotify_event*)p; 1279 p += inotify_event.sizeof + ev.len; 1280 1281 DWFileEvent evtype; 1282 evtype = DWFileEvent.CREATED; 1283 if (ev.mask & IN_CREATE) 1284 evtype = DWFileEvent.CREATED; 1285 if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF) 1286 evtype = DWFileEvent.DELETED; 1287 if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF) 1288 evtype = DWFileEvent.MOVED_FROM; 1289 if (ev.mask & (IN_MOVED_TO)) 1290 evtype = DWFileEvent.MOVED_TO; 1291 if (ev.mask & IN_MODIFY) 1292 evtype = DWFileEvent.MODIFIED; 1293 1294 import std.path : buildPath; 1295 import core.stdc.string : strlen; 1296 string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup; 1297 DWFolderInfo fi; 1298 Path path; 1299 try { 1300 fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init); 1301 if (fi == DWFolderInfo.init) { 1302 setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string); 1303 continue; 1304 } 1305 path = fi.wi.path ~ Path(name); 1306 } 1307 catch (Exception e) { 1308 setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders"); 1309 return 0; 1310 } 1311 1312 dst[i] = DWChangeInfo(evtype, path); 1313 import std.file : isDir; 1314 bool is_dir; 1315 try is_dir = isDir(path.toNativeString()); catch {} 1316 if (fi.wi.recursive && is_dir) { 1317 1318 try { 1319 Array!DWChangeInfo changes; 1320 recurseInto(fi, evtype, changes); 1321 // stop watching if the folder was deleted 1322 if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) { 1323 unwatch(fi.fd, fi.wi.wd); 1324 } 1325 foreach (change; changes[]) { 1326 i++; 1327 if (dst.length <= i) 1328 dst ~= change; 1329 else dst[i] = change; 1330 } 1331 } 1332 catch (Exception e) { 1333 setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively."); 1334 return 0; 1335 } 1336 1337 } 1338 1339 1340 i++; 1341 if (i >= dst.length) 1342 return cast(uint) i; 1343 } 1344 static if (LOG) foreach (j; 0 .. i) { 1345 try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch {} 1346 } 1347 nread = read(fd, buf.ptr, buf.sizeof); 1348 if (catchError!"read()"(nread)) { 1349 if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) 1350 m_status.code = Status.ASYNC; 1351 return cast(uint) i; 1352 } 1353 } while (nread > 0); 1354 1355 return cast(uint) i; 1356 } 1357 else /* if KQUEUE */ { 1358 Array!(DWChangeInfo)* changes; 1359 size_t i; 1360 try { 1361 changes = m_changes[fd]; 1362 import std.algorithm : min; 1363 size_t cnt = min(dst.length, changes.length); 1364 foreach (DWChangeInfo change; (*changes)[0 .. cnt]) { 1365 dst[i] = (*changes)[i]; 1366 i++; 1367 } 1368 changes.linearRemove((*changes)[0 .. cnt]); 1369 } 1370 catch (Exception e) { 1371 setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); 1372 return false; 1373 } 1374 return cast(uint) i; 1375 } 1376 } 1377 1378 bool broadcast(in fd_t fd, bool b) { 1379 m_status = StatusInfo.init; 1380 1381 import libasync.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET; 1382 1383 int val = b?1:0; 1384 socklen_t len = val.sizeof; 1385 int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len); 1386 if (catchError!"setsockopt"(err)) 1387 return false; 1388 1389 return true; 1390 } 1391 1392 private bool closeRemoteSocket(fd_t fd, bool forced) { 1393 1394 int err; 1395 log("shutdown"); 1396 import libasync.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD; 1397 if (forced) 1398 err = shutdown(fd, SHUT_RDWR); 1399 else 1400 err = shutdown(fd, SHUT_WR); 1401 1402 static if (!EPOLL) { 1403 kevent_t[2] events; 1404 try log("!!DISC delete events"); catch {} 1405 EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null); 1406 EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null); 1407 kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 1408 1409 } 1410 1411 if (catchError!"shutdown"(err)) 1412 return false; 1413 1414 return true; 1415 } 1416 1417 // for connected sockets 1418 bool closeSocket(fd_t fd, bool connected, bool forced = false) 1419 { 1420 log("closeSocket"); 1421 if (connected && !closeRemoteSocket(fd, forced) && !forced) 1422 return false; 1423 1424 if (!connected || forced) { 1425 // todo: flush the socket here? 1426 1427 import core.sys.posix.unistd : close; 1428 log("close"); 1429 int err = close(fd); 1430 if (catchError!"closesocket"(err)) 1431 return false; 1432 } 1433 return true; 1434 } 1435 1436 1437 NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true) 1438 { 1439 import libasync.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV; 1440 addrinfo hints; 1441 hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver! 1442 1443 return getAddressInfo(ipAddr, port, ipv6, tcp, hints); 1444 } 1445 1446 1447 NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true) 1448 /*in { 1449 debug import libasync.internals.validator : validateHost; 1450 debug assert(validateHost(host), "Trying to connect to an invalid domain"); 1451 } 1452 body */{ 1453 import libasync.internals.socket_compat : addrinfo; 1454 addrinfo hints; 1455 return getAddressInfo(host, port, ipv6, tcp, hints); 1456 } 1457 1458 void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EPosix.EACCES) 1459 { 1460 if (details.length > 0) 1461 m_status.text = TRACE ~ ": " ~ details; 1462 else m_status.text = TRACE; 1463 m_error = error; 1464 m_status.code = s; 1465 static if(LOG) log(m_status); 1466 } 1467 private: 1468 1469 /// For DirectoryWatcher 1470 /// In kqueue/vnode, all we get is the folder in which changes occured. 1471 /// We have to figure out what changed exactly and put the results in a container 1472 /// for the readChanges call. 1473 static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) { 1474 import std.file; 1475 import std.path : buildPath; 1476 try { 1477 Array!Path currFiles; 1478 auto wd = fi.wi.wd; 1479 auto path = fi.wi.path; 1480 auto fd = fi.fd; 1481 Array!(DWChangeInfo)* changes = m_changes.get(fd, null); 1482 assert(changes !is null, "Invalid wd, could not find changes array."); 1483 //import std.stdio : writeln; 1484 //writeln("Scanning path: ", path.toNativeString()); 1485 //writeln("m_dwFiles length: ", m_dwFiles.length); 1486 1487 // get a list of the folder 1488 foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) { 1489 //writeln(de.name); 1490 Path entryPath = Path(de.name); 1491 if (!entryPath.absolute) 1492 entryPath = path ~ entryPath; 1493 bool found; 1494 1495 // compare it to the cached list fixme: make it faster using another container? 1496 foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) { 1497 if (file.folder != wd) continue; // this file isn't in the evented folder 1498 if (file.path == entryPath) { 1499 found = true; 1500 log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string); 1501 // Check if it was modified 1502 if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified) 1503 { 1504 DWFileInfo dwf = file; 1505 dwf.lastModified = de.timeLastModified; 1506 m_dwFiles[id] = dwf; 1507 changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path)); 1508 } 1509 break; 1510 } 1511 } 1512 1513 // This file/folder is new in the folder 1514 if (!found) { 1515 changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath)); 1516 1517 if (fi.wi.recursive && isDir(entryPath.toNativeString())) { 1518 /// This is the complicated part. The folder needs to be watched, and all the events 1519 /// generated for every file/folder found recursively inside it, 1520 /// Useful e.g. when mkdir -p is used. 1521 watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) ); 1522 void genEvents(Path subpath) { 1523 foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) { 1524 auto subsubpath = Path(de.name); 1525 if (!subsubpath.absolute()) 1526 subsubpath = subpath ~ subsubpath; 1527 changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath)); 1528 if (isDir(subsubpath.toNativeString())) 1529 genEvents(subsubpath); 1530 } 1531 } 1532 1533 genEvents(entryPath); 1534 1535 } 1536 else { 1537 EventInfo* evinfo; 1538 try evinfo = m_watchers[fd]; catch { assert(false, "Could retrieve event info, directory watcher was not initialized properly, or you are operating on a closed directory watcher."); } 1539 1540 log("Adding path: " ~ path.toNativeString()); 1541 1542 import core.sys.posix.fcntl : open; 1543 fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY); 1544 if (catchError!"open(watch)"(fwd)) 1545 return 0; 1546 1547 kevent_t _event; 1548 1549 EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo); 1550 1551 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 1552 1553 if (catchError!"kevent_timer_add"(err)) 1554 return 0; 1555 1556 m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false); 1557 1558 } 1559 } 1560 1561 // This file/folder is now current. This avoids a deletion event. 1562 currFiles.insert(entryPath); 1563 } 1564 1565 /// Now search for files/folders that were deleted in this directory (no recursivity needed). 1566 /// Unwatch this directory and generate delete event only for the root dir 1567 foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) { 1568 if (file.folder != wd) continue; // skip those files in another folder than the evented one 1569 bool found; 1570 foreach (Path curr; currFiles) { 1571 if (file.path == curr){ 1572 found = true; 1573 break; 1574 } 1575 } 1576 // this file/folder was in the folder but it's not there anymore 1577 if (!found) { 1578 // writeln("Deleting: ", file.path.toNativeString()); 1579 kevent_t _event; 1580 EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null); 1581 int err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 1582 if (catchError!"kevent_unwatch"(err)) 1583 return false; 1584 import core.sys.posix.unistd : close; 1585 err = close(id); 1586 if (catchError!"close(dwFile)"(err)) 1587 return false; 1588 changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path)); 1589 1590 if (fi.wi.recursive && file.is_dir) 1591 unwatch(fd, id); 1592 1593 m_dwFiles.remove(id); 1594 1595 } 1596 1597 } 1598 if(changes.empty) 1599 return false; // unhandled event, skip the callback 1600 1601 // fixme: how to implement moved_from moved_to for rename? 1602 } 1603 catch (Exception e) 1604 { 1605 try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch {} 1606 return false; 1607 } 1608 return true; 1609 } 1610 1611 // socket must not be connected 1612 bool setNonBlock(fd_t fd) { 1613 import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK; 1614 int flags = fcntl(fd, F_GETFL); 1615 flags |= O_NONBLOCK; 1616 int err = fcntl(fd, F_SETFL, flags); 1617 if (catchError!"F_SETFL O_NONBLOCK"(err)) { 1618 closeSocket(fd, false); 1619 return false; 1620 } 1621 return true; 1622 } 1623 1624 bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events) 1625 { 1626 import libasync.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept; 1627 enum O_NONBLOCK = 0x800; // octal 04000 1628 1629 static if (EPOLL) 1630 { 1631 const uint epoll_events = cast(uint) events; 1632 const bool incoming = cast(bool) (epoll_events & EPOLLIN); 1633 const bool error = cast(bool) (epoll_events & EPOLLERR); 1634 } 1635 else 1636 { 1637 const short kqueue_events = cast(short) (events >> 16); 1638 const ushort kqueue_flags = cast(ushort) (events & 0xffff); 1639 const bool incoming = cast(bool)(kqueue_events & EVFILT_READ); 1640 const bool error = cast(bool)(kqueue_flags & EV_ERROR); 1641 } 1642 1643 if (incoming) { // accept incoming connection 1644 do { 1645 NetworkAddress addr; 1646 addr.family = AF_INET; 1647 socklen_t addrlen = addr.sockAddrLen; 1648 1649 bool ret; 1650 static if (EPOLL) { 1651 /// Accept the connection and create a client socket 1652 fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK); 1653 1654 if (catchError!".accept"(csock)) { 1655 ret = false; 1656 return ret; 1657 } 1658 } else /* if KQUEUE */ { 1659 fd_t csock = accept(fd, addr.sockAddr, &addrlen); 1660 1661 if (catchError!".accept"(csock)) { 1662 ret = false; 1663 return ret; 1664 } 1665 1666 // Make non-blocking so subsequent calls to recv/send return immediately 1667 if (!setNonBlock(csock)) { 1668 ret = false; 1669 return ret; 1670 } 1671 } 1672 1673 // Set client address family based on address length 1674 if (addrlen > addr.sockAddrLen) 1675 addr.family = AF_INET6; 1676 if (addrlen == socklen_t.init) { 1677 setInternalError!"addrlen"(Status.ABORT); 1678 import core.sys.posix.unistd : close; 1679 close(csock); 1680 { 1681 ret = false; 1682 return ret; 1683 } 1684 } 1685 1686 // Allocate a new connection handler object 1687 AsyncTCPConnection conn; 1688 try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop); 1689 catch (Exception e){ assert(false, "Allocation failure"); } 1690 conn.peer = addr; 1691 conn.socket = csock; 1692 conn.inbound = true; 1693 1694 nothrow bool closeClient() { 1695 try ThreadMem.free(conn); 1696 catch (Exception e){ assert(false, "Free failure"); } 1697 closeSocket(csock, true, true); 1698 { 1699 ret = false; 1700 return ret; 1701 } 1702 } 1703 1704 // Get the connection handler from the callback 1705 TCPEventHandler evh; 1706 try { 1707 evh = del(conn); 1708 if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) { 1709 try log("Failed to connect"); catch {} 1710 return closeClient(); 1711 } 1712 try log("Connection Started with " ~ csock.to!string); catch {} 1713 } 1714 catch (Exception e) { 1715 log("Close socket"); 1716 return closeClient(); 1717 } 1718 1719 // Announce connection state to the connection handler 1720 try { 1721 log("Connected to: " ~ addr.toString()); 1722 evh.conn.connected = true; 1723 evh(TCPEvent.CONNECT); 1724 } 1725 catch (Exception e) { 1726 setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT); 1727 { 1728 ret = false; 1729 return ret; 1730 } 1731 } 1732 } while(true); 1733 1734 } 1735 1736 if (error) { // socket failure 1737 m_status.text = "listen socket error"; 1738 int err; 1739 import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR; 1740 socklen_t len = int.sizeof; 1741 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); 1742 m_error = cast(error_t) err; 1743 m_status.code = Status.ABORT; 1744 static if(LOG) log(m_status); 1745 1746 // call with null to announce a failure 1747 try del(null); 1748 catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); } 1749 1750 /// close the listener? 1751 // closeSocket(fd, false); 1752 } 1753 return true; 1754 } 1755 1756 bool onUDPTraffic(fd_t fd, UDPHandler del, int events) 1757 { 1758 static if (EPOLL) 1759 { 1760 const uint epoll_events = cast(uint) events; 1761 const bool read = cast(bool) (epoll_events & EPOLLIN); 1762 const bool write = cast(bool) (epoll_events & EPOLLOUT); 1763 const bool error = cast(bool) (epoll_events & EPOLLERR); 1764 } 1765 else 1766 { 1767 const short kqueue_events = cast(short) (events >> 16); 1768 const ushort kqueue_flags = cast(ushort) (events & 0xffff); 1769 const bool read = cast(bool) (kqueue_events & EVFILT_READ); 1770 const bool write = cast(bool) (kqueue_events & EVFILT_WRITE); 1771 const bool error = cast(bool) (kqueue_flags & EV_ERROR); 1772 } 1773 1774 if (read) { 1775 try { 1776 del(UDPEvent.READ); 1777 } 1778 catch (Exception e) { 1779 setInternalError!"del@UDPEvent.READ"(Status.ABORT); 1780 return false; 1781 } 1782 } 1783 1784 if (write) { 1785 1786 try { 1787 del(UDPEvent.WRITE); 1788 } 1789 catch (Exception e) { 1790 setInternalError!"del@UDPEvent.WRITE"(Status.ABORT); 1791 return false; 1792 } 1793 } 1794 1795 if (error) // socket failure 1796 { 1797 1798 import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR; 1799 import core.sys.posix.unistd : close; 1800 int err; 1801 socklen_t errlen = err.sizeof; 1802 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen); 1803 setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err); 1804 close(fd); 1805 } 1806 1807 return true; 1808 } 1809 1810 bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn) 1811 { 1812 log("TCP Traffic at FD#" ~ fd.to!string); 1813 1814 static if (EPOLL) 1815 { 1816 const uint epoll_events = cast(uint) events; 1817 const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected; 1818 bool read = cast(bool) (epoll_events & EPOLLIN); 1819 const bool write = cast(bool) (epoll_events & EPOLLOUT); 1820 const bool error = cast(bool) (epoll_events & EPOLLERR); 1821 const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP)); 1822 } 1823 else /* if KQUEUE */ 1824 { 1825 const short kqueue_events = cast(short) (events >> 16); 1826 const ushort kqueue_flags = cast(ushort) (events & 0xffff); 1827 const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected); 1828 bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect; 1829 const bool write = cast(bool) (kqueue_events & EVFILT_WRITE); 1830 const bool error = cast(bool) (kqueue_flags & EV_ERROR); 1831 const bool close = cast(bool) (kqueue_flags & EV_EOF); 1832 } 1833 1834 if (error) 1835 { 1836 import libasync.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR; 1837 int err; 1838 try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch {} 1839 socklen_t errlen = err.sizeof; 1840 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen); 1841 setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err); 1842 try 1843 del(TCPEvent.ERROR); 1844 catch (Exception e) 1845 { 1846 setInternalError!"del@TCPEvent.ERROR"(Status.ABORT); 1847 // ignore failure... 1848 } 1849 return false; 1850 } 1851 1852 1853 if (connect) 1854 { 1855 try log("!connect"); catch {} 1856 conn.connected = true; 1857 try del(TCPEvent.CONNECT); 1858 catch (Exception e) { 1859 setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT); 1860 return false; 1861 } 1862 return true; 1863 } 1864 1865 1866 if (write && conn.connected && !conn.disconnecting && conn.writeBlocked) 1867 { 1868 conn.writeBlocked = false; 1869 try log("!write"); catch {} 1870 try del(TCPEvent.WRITE); 1871 catch (Exception e) { 1872 setInternalError!"del@TCPEvent.WRITE"(Status.ABORT); 1873 return false; 1874 } 1875 } 1876 else { 1877 read = true; 1878 } 1879 1880 if (read && conn.connected && !conn.disconnecting) 1881 { 1882 try log("!read"); catch {} 1883 try del(TCPEvent.READ); 1884 catch (Exception e) { 1885 setInternalError!"del@TCPEvent.READ"(Status.ABORT); 1886 return false; 1887 } 1888 } 1889 1890 if (close && conn.connected && !conn.disconnecting) 1891 { 1892 try log("!close"); catch {} 1893 // todo: See if this hack is still necessary 1894 if (!conn.connected && conn.disconnecting) 1895 return true; 1896 1897 try del(TCPEvent.CLOSE); 1898 catch (Exception e) { 1899 setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT); 1900 return false; 1901 } 1902 1903 // Careful here, the delegate might have closed the connection already 1904 if (conn.connected) { 1905 closeSocket(fd, !conn.disconnecting, conn.connected); 1906 1907 m_status.code = Status.ABORT; 1908 conn.disconnecting = true; 1909 conn.connected = false; 1910 conn.writeBlocked = true; 1911 del.conn.socket = 0; 1912 1913 try ThreadMem.free(del.conn.evInfo); 1914 catch (Exception e){ assert(false, "Error freeing resources"); } 1915 1916 if (del.conn.inbound) { 1917 log("Freeing inbound connection"); 1918 try ThreadMem.free(del.conn); 1919 catch (Exception e){ assert(false, "Error freeing resources"); } 1920 } 1921 } 1922 } 1923 return true; 1924 } 1925 1926 bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del) 1927 { 1928 import libasync.internals.socket_compat : bind; 1929 import core.sys.posix.unistd; 1930 1931 fd_t err; 1932 1933 EventObject eo; 1934 eo.udpHandler = del; 1935 EventInfo* ev; 1936 try ev = ThreadMem.alloc!EventInfo(fd, EventType.UDPSocket, eo, m_instanceId); 1937 catch (Exception e){ assert(false, "Allocation error"); } 1938 ctxt.evInfo = ev; 1939 nothrow bool closeAll() { 1940 try ThreadMem.free(ev); 1941 catch(Exception e){ assert(false, "Failed to free resources"); } 1942 ctxt.evInfo = null; 1943 // socket will be closed by caller if return false 1944 return false; 1945 } 1946 1947 static if (EPOLL) 1948 { 1949 epoll_event _event; 1950 _event.data.ptr = ev; 1951 _event.events = EPOLLIN | EPOLLOUT | EPOLLET; 1952 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 1953 if (catchError!"epoll_ctl"(err)) { 1954 return closeAll(); 1955 } 1956 nothrow void deregisterEvent() 1957 { 1958 epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &_event); 1959 } 1960 } 1961 else /* if KQUEUE */ 1962 { 1963 kevent_t[2] _event; 1964 EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev); 1965 EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, ev); 1966 err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null); 1967 if (catchError!"kevent_add_udp"(err)) 1968 return closeAll(); 1969 1970 nothrow void deregisterEvent() { 1971 EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null); 1972 EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null); 1973 kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(libasync.internals.kqueue.timespec*) null); 1974 } 1975 1976 } 1977 1978 /// Start accepting packets 1979 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 1980 if (catchError!"bind"(err)) { 1981 deregisterEvent(); 1982 return closeAll(); 1983 } 1984 1985 return true; 1986 } 1987 1988 1989 bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false) 1990 in { 1991 assert(ctxt.local !is NetworkAddress.init); 1992 } 1993 body { 1994 import libasync.internals.socket_compat : bind, listen, SOMAXCONN; 1995 fd_t err; 1996 1997 /// Create callback object 1998 EventObject eo; 1999 eo.tcpAcceptHandler = del; 2000 EventInfo* ev; 2001 2002 try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPAccept, eo, m_instanceId); 2003 catch (Exception e){ assert(false, "Allocation error"); } 2004 ctxt.evInfo = ev; 2005 nothrow bool closeAll() { 2006 try ThreadMem.free(ev); 2007 catch(Exception e){ assert(false, "Failed free"); } 2008 ctxt.evInfo = null; 2009 // Socket is closed by run() 2010 //closeSocket(fd, false); 2011 return false; 2012 } 2013 2014 /// Add socket to event loop 2015 static if (EPOLL) 2016 { 2017 epoll_event _event; 2018 _event.data.ptr = ev; 2019 _event.events = EPOLLIN | EPOLLET; 2020 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 2021 if (catchError!"epoll_ctl_add"(err)) 2022 return closeAll(); 2023 2024 nothrow void deregisterEvent() { 2025 // epoll cleans itself when closing the socket 2026 } 2027 } 2028 else /* if KQUEUE */ 2029 { 2030 kevent_t _event; 2031 EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev); 2032 err = kevent(m_kqueuefd, &_event, 1, null, 0, null); 2033 if (catchError!"kevent_add_listener"(err)) 2034 return closeAll(); 2035 2036 nothrow void deregisterEvent() { 2037 EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null); 2038 kevent(m_kqueuefd, &_event, 1, null, 0, null); 2039 // wouldn't know how to deal with errors here... 2040 } 2041 } 2042 2043 /// Bind and listen to socket 2044 if (!reusing) { 2045 err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen); 2046 if (catchError!"bind"(err)) { 2047 deregisterEvent(); 2048 return closeAll(); 2049 } 2050 2051 err = listen(fd, SOMAXCONN); 2052 if (catchError!"listen"(err)) { 2053 deregisterEvent(); 2054 return closeAll(); 2055 } 2056 2057 } 2058 return true; 2059 } 2060 2061 bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false) 2062 in { 2063 assert(ctxt.peer.port != 0, "Connecting to an invalid port"); 2064 } 2065 body { 2066 2067 fd_t err; 2068 2069 /// Create callback object 2070 import libasync.internals.socket_compat : connect; 2071 EventObject eo; 2072 eo.tcpEvHandler = del; 2073 EventInfo* ev; 2074 2075 try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPTraffic, eo, m_instanceId); 2076 catch (Exception e){ assert(false, "Allocation error"); } 2077 assert(ev !is null); 2078 ctxt.evInfo = ev; 2079 nothrow bool destroyEvInfo() { 2080 try ThreadMem.free(ev); 2081 catch(Exception e){ assert(false, "Failed to free resources"); } 2082 ctxt.evInfo = null; 2083 2084 // Socket will be closed by run() 2085 // closeSocket(fd, false); 2086 return false; 2087 } 2088 2089 /// Add socket and callback object to event loop 2090 static if (EPOLL) 2091 { 2092 epoll_event _event = void; 2093 _event.data.ptr = ev; 2094 _event.events = 0 | EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET; 2095 err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event); 2096 log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string); 2097 if (catchError!"epoll_ctl_add"(err)) 2098 return destroyEvInfo(); 2099 2100 nothrow void deregisterEvent() { 2101 // will be handled automatically when socket is closed 2102 } 2103 } 2104 else /* if KQUEUE */ 2105 { 2106 kevent_t[2] events = void; 2107 try log("Register event ptr " ~ ev.to!string); catch {} 2108 assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection"); 2109 EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev); 2110 EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev); 2111 assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev); 2112 assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId); 2113 err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 2114 if (catchError!"kevent_add_tcp"(err)) 2115 return destroyEvInfo(); 2116 2117 // todo: verify if this allocates on the GC? 2118 nothrow void deregisterEvent() { 2119 EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null); 2120 EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null); 2121 kevent(m_kqueuefd, &(events[0]), 2, null, 0, null); 2122 // wouldn't know how to deal with errors here... 2123 } 2124 } 2125 2126 // Inbound objects are already connected 2127 if (inbound) return true; 2128 2129 // Connect is blocking, but this makes the socket non-blocking for send/recv 2130 if (!setNonBlock(fd)) { 2131 deregisterEvent(); 2132 return destroyEvInfo(); 2133 } 2134 2135 /// Start the connection 2136 err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen); 2137 if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ])) 2138 return true; 2139 if (catchError!"connect"(err)) { 2140 deregisterEvent(); 2141 return destroyEvInfo(); 2142 } 2143 2144 return true; 2145 } 2146 2147 bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR) 2148 if (isIntegral!T) 2149 { 2150 if (val == cmp) { 2151 m_status.text = TRACE; 2152 m_error = lastError(); 2153 m_status.code = Status.ABORT; 2154 static if(LOG) log(m_status); 2155 return true; 2156 } 2157 return false; 2158 } 2159 2160 bool catchSocketError(string TRACE)(fd_t fd) 2161 { 2162 m_status.text = TRACE; 2163 int err; 2164 import libasync.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR; 2165 socklen_t len = int.sizeof; 2166 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); 2167 m_error = cast(error_t) err; 2168 if (m_error != EPosix.EOK) { 2169 m_status.code = Status.ABORT; 2170 static if(LOG) log(m_status); 2171 return true; 2172 } 2173 2174 return false; 2175 } 2176 2177 bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...) 2178 if (isIntegral!T) 2179 { 2180 if (val == SOCKET_ERROR) { 2181 int err = errno; 2182 foreach (validator ; cmp) { 2183 if (errno == validator[0]) { 2184 m_status.text = TRACE; 2185 m_error = lastError(); 2186 m_status.code = validator[1]; 2187 static if(LOG) log(m_status); 2188 return true; 2189 } 2190 } 2191 2192 m_status.text = TRACE; 2193 m_status.code = Status.EVLOOP_FAILURE; 2194 m_error = lastError(); 2195 log(m_status); 2196 return true; 2197 } 2198 return false; 2199 } 2200 2201 /** 2202 * If the value at val matches the tuple first argument T, get the last error, 2203 * and if the last error matches tuple second argument error_t, set the Status as 2204 * tuple third argument Status. 2205 * 2206 * Repeats for each comparison tuple until a match in which case returns true. 2207 */ 2208 bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...) 2209 if (isIntegral!T) 2210 { 2211 error_t err; 2212 foreach (validator ; cmp) { 2213 if (val == validator[0]) { 2214 if (err is EPosix.init) err = lastError(); 2215 if (err == validator[1]) { 2216 m_status.text = TRACE; 2217 m_status.code = validator[2]; 2218 if (m_status.code == Status.EVLOOP_TIMEOUT) { 2219 log(m_status); 2220 break; 2221 } 2222 m_error = lastError(); 2223 static if(LOG) log(m_status); 2224 return true; 2225 } 2226 } 2227 } 2228 return false; 2229 } 2230 2231 2232 error_t lastError() { 2233 try { 2234 return cast(error_t) errno; 2235 } catch(Exception e) { 2236 return EPosix.EACCES; 2237 } 2238 2239 } 2240 2241 void log(StatusInfo val) 2242 { 2243 static if (LOG) { 2244 import std.stdio; 2245 try { 2246 writeln("Backtrace: ", m_status.text); 2247 writeln(" | Status: ", m_status.code); 2248 writeln(" | Error: " , m_error); 2249 if ((m_error in EPosixMessages) !is null) 2250 writeln(" | Message: ", EPosixMessages[m_error]); 2251 } catch(Exception e) { 2252 return; 2253 } 2254 } 2255 } 2256 2257 void log(T)(T val) 2258 { 2259 static if (LOG) { 2260 import std.stdio; 2261 try { 2262 writeln(val); 2263 } catch(Exception e) { 2264 return; 2265 } 2266 } 2267 } 2268 2269 NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints) 2270 { 2271 m_status = StatusInfo.init; 2272 import libasync.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo; 2273 2274 NetworkAddress addr; 2275 addrinfo* infos; 2276 error_t err; 2277 if (ipv6) { 2278 addr.family = AF_INET6; 2279 hints.ai_family = AF_INET6; 2280 } 2281 else { 2282 addr.family = AF_INET; 2283 hints.ai_family = AF_INET; 2284 } 2285 if (tcp) { 2286 hints.ai_socktype = SOCK_STREAM; 2287 hints.ai_protocol = IPPROTO_TCP; 2288 } 2289 else { 2290 hints.ai_socktype = SOCK_DGRAM; 2291 hints.ai_protocol = IPPROTO_UDP; 2292 } 2293 2294 static if (LOG) { 2295 log("Resolving " ~ host ~ ":" ~ port.to!string); 2296 } 2297 2298 auto chost = host.toStringz(); 2299 2300 if (port != 0) { 2301 addr.port = port; 2302 const(char)* cPort = cast(const(char)*) port.to!string.toStringz; 2303 err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos); 2304 } 2305 else { 2306 err = cast(error_t) getaddrinfo(chost, null, &hints, &infos); 2307 } 2308 2309 if (err != EPosix.EOK) { 2310 setInternalError!"getAddressInfo"(Status.ERROR, string.init, err); 2311 return NetworkAddress.init; 2312 } 2313 ubyte* pAddr = cast(ubyte*) infos.ai_addr; 2314 ubyte* data = cast(ubyte*) addr.sockAddr; 2315 data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy 2316 freeaddrinfo(infos); 2317 return addr; 2318 } 2319 2320 2321 2322 } 2323 2324 2325 static if (!EPOLL) 2326 { 2327 import std.container : Array; 2328 import core.sync.mutex : Mutex; 2329 import core.sync.rwmutex : ReadWriteMutex; 2330 size_t g_evIdxCapacity; 2331 Array!size_t g_evIdxAvailable; 2332 2333 // called on run 2334 nothrow size_t createIndex() { 2335 size_t idx; 2336 import std.algorithm : max; 2337 try { 2338 2339 size_t getIdx() { 2340 2341 if (!g_evIdxAvailable.empty) { 2342 immutable size_t ret = g_evIdxAvailable.back; 2343 g_evIdxAvailable.removeBack(); 2344 return ret; 2345 } 2346 return 0; 2347 } 2348 2349 idx = getIdx(); 2350 if (idx == 0) { 2351 import std.range : iota; 2352 g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) ); 2353 g_evIdxCapacity = max(32, g_evIdxCapacity * 2); 2354 idx = getIdx(); 2355 } 2356 2357 } catch (Throwable e) { 2358 static if (DEBUG) { 2359 import std.stdio : writeln; 2360 try writeln(e.toString()); catch {} 2361 } 2362 2363 } 2364 return idx; 2365 } 2366 2367 nothrow void destroyIndex(AsyncNotifier ctxt) { 2368 try { 2369 g_evIdxAvailable.insert(ctxt.id); 2370 } 2371 catch (Exception e) { 2372 assert(false, "Error destroying index: " ~ e.msg); 2373 } 2374 } 2375 2376 nothrow void destroyIndex(AsyncTimer ctxt) { 2377 try { 2378 g_evIdxAvailable.insert(ctxt.id); 2379 } 2380 catch (Exception e) { 2381 assert(false, "Error destroying index: " ~ e.msg); 2382 } 2383 } 2384 2385 size_t* g_threadId; 2386 size_t g_idxCapacity; 2387 Array!size_t g_idxAvailable; 2388 2389 __gshared ReadWriteMutex gs_queueMutex; 2390 __gshared Array!(Array!AsyncSignal) gs_signalQueue; 2391 __gshared Array!(Array!size_t) gs_idxQueue; // signals notified 2392 2393 2394 // loop 2395 nothrow bool popSignals(ref AsyncSignal[] sigarr) { 2396 bool more; 2397 try { 2398 foreach (ref AsyncSignal sig; sigarr) { 2399 if (!sig) 2400 break; 2401 sig = null; 2402 } 2403 size_t len; 2404 synchronized(gs_queueMutex.reader) { 2405 2406 if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty) 2407 return false; 2408 2409 len = gs_idxQueue[*g_threadId].length; 2410 import std.stdio; 2411 if (sigarr.length < len) { 2412 more = true; 2413 len = sigarr.length; 2414 } 2415 2416 size_t i; 2417 foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){ 2418 sigarr[i] = gs_signalQueue[*g_threadId][idx]; 2419 i++; 2420 } 2421 } 2422 2423 synchronized (gs_queueMutex.writer) { 2424 gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]); 2425 } 2426 } 2427 catch (Exception e) { 2428 assert(false, "Could not get pending signals: " ~ e.msg); 2429 } 2430 return more; 2431 } 2432 2433 // notify 2434 nothrow void addSignal(shared AsyncSignal ctxt) { 2435 try { 2436 size_t thread_id = ctxt.threadId; 2437 bool must_resize; 2438 import std.stdio; 2439 synchronized (gs_queueMutex.writer) { 2440 if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) { 2441 gs_idxQueue.reserve(thread_id + 1); 2442 foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) { 2443 gs_idxQueue.insertBack(Array!size_t.init); 2444 } 2445 } 2446 if (gs_idxQueue[thread_id].empty) 2447 { 2448 gs_idxQueue[thread_id].reserve(32); 2449 } 2450 2451 gs_idxQueue[thread_id].insertBack(ctxt.id); 2452 2453 } 2454 2455 } 2456 catch (Exception e) { 2457 assert(false, "Array error: " ~ e.msg); 2458 } 2459 } 2460 2461 // called on run 2462 nothrow size_t createIndex(shared AsyncSignal ctxt) { 2463 size_t idx; 2464 import std.algorithm : max; 2465 try { 2466 bool must_resize; 2467 2468 synchronized (gs_queueMutex.reader) { 2469 if (gs_signalQueue.length < *g_threadId) 2470 must_resize = true; 2471 } 2472 2473 /// make sure the signal queue is big enough for this thread ID 2474 if (must_resize) { 2475 synchronized (gs_queueMutex.writer) { 2476 while (gs_signalQueue.length <= *g_threadId) 2477 gs_signalQueue.insertBack(Array!AsyncSignal.init); 2478 } 2479 } 2480 2481 size_t getIdx() { 2482 2483 if (!g_idxAvailable.empty) { 2484 immutable size_t ret = g_idxAvailable.back; 2485 g_idxAvailable.removeBack(); 2486 return ret; 2487 } 2488 return 0; 2489 } 2490 2491 idx = getIdx(); 2492 if (idx == 0) { 2493 import std.range : iota; 2494 g_idxAvailable.insert( iota(g_idxCapacity + 1, max(32, g_idxCapacity * 2), 1) ); 2495 g_idxCapacity = g_idxAvailable[$-1]; 2496 idx = getIdx(); 2497 } 2498 2499 synchronized (gs_queueMutex.writer) { 2500 if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) { 2501 2502 gs_signalQueue.reserve(*g_threadId + 1); 2503 foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) { 2504 gs_signalQueue.insertBack(Array!AsyncSignal.init); 2505 } 2506 2507 } 2508 2509 if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) { 2510 2511 gs_signalQueue[*g_threadId].reserve(idx + 1); 2512 foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) { 2513 gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null); 2514 } 2515 2516 } 2517 2518 gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt; 2519 } 2520 } catch {} 2521 2522 return idx; 2523 } 2524 2525 // called on kill 2526 nothrow void destroyIndex(shared AsyncSignal ctxt) { 2527 try { 2528 g_idxAvailable.insert(ctxt.id); 2529 synchronized (gs_queueMutex.writer) { 2530 gs_signalQueue[*g_threadId][ctxt.id] = null; 2531 } 2532 } 2533 catch (Exception e) { 2534 assert(false, "Error destroying index: " ~ e.msg); 2535 } 2536 } 2537 } 2538 2539 mixin template TCPConnectionMixins() { 2540 2541 private CleanupData m_impl; 2542 2543 struct CleanupData { 2544 EventInfo* evInfo; 2545 bool connected; 2546 bool disconnecting; 2547 bool writeBlocked; 2548 } 2549 2550 @property bool disconnecting() const { 2551 return m_impl.disconnecting; 2552 } 2553 2554 @property void disconnecting(bool b) { 2555 m_impl.disconnecting = b; 2556 } 2557 2558 @property bool connected() const { 2559 return m_impl.connected; 2560 } 2561 2562 @property void connected(bool b) { 2563 m_impl.connected = b; 2564 } 2565 2566 @property bool writeBlocked() const { 2567 return m_impl.writeBlocked; 2568 } 2569 2570 @property void writeBlocked(bool b) { 2571 m_impl.writeBlocked = b; 2572 } 2573 2574 @property EventInfo* evInfo() { 2575 return m_impl.evInfo; 2576 } 2577 2578 @property void evInfo(EventInfo* info) { 2579 m_impl.evInfo = info; 2580 } 2581 2582 } 2583 2584 mixin template EvInfoMixinsShared() { 2585 2586 private CleanupData m_impl; 2587 2588 shared struct CleanupData { 2589 EventInfo* evInfo; 2590 } 2591 2592 static if (EPOLL) { 2593 import core.sys.posix.pthread : pthread_t; 2594 private pthread_t m_pthreadId; 2595 synchronized @property pthread_t pthreadId() { 2596 return cast(pthread_t) m_pthreadId; 2597 } 2598 /* todo: support multiple event loops per thread? 2599 private ushort m_sigId; 2600 synchronized @property ushort sigId() { 2601 return cast(ushort)m_loopId; 2602 } 2603 synchronized @property void sigId(ushort id) { 2604 m_loopId = cast(shared)id; 2605 } 2606 */ 2607 } 2608 else /* if KQUEUE */ 2609 { 2610 private shared(size_t)* m_owner_id; 2611 synchronized @property size_t threadId() { 2612 return cast(size_t) *m_owner_id; 2613 } 2614 } 2615 2616 @property shared(EventInfo*) evInfo() { 2617 return m_impl.evInfo; 2618 } 2619 2620 @property void evInfo(shared(EventInfo*) info) { 2621 m_impl.evInfo = info; 2622 } 2623 2624 } 2625 2626 mixin template EvInfoMixins() { 2627 2628 private CleanupData m_impl; 2629 2630 struct CleanupData { 2631 EventInfo* evInfo; 2632 } 2633 2634 @property EventInfo* evInfo() { 2635 return m_impl.evInfo; 2636 } 2637 2638 @property void evInfo(EventInfo* info) { 2639 m_impl.evInfo = info; 2640 } 2641 } 2642 2643 union EventObject { 2644 TCPAcceptHandler tcpAcceptHandler; 2645 TCPEventHandler tcpEvHandler; 2646 TimerHandler timerHandler; 2647 DWHandler dwHandler; 2648 UDPHandler udpHandler; 2649 NotifierHandler notifierHandler; 2650 } 2651 2652 enum EventType : char { 2653 TCPAccept, 2654 TCPTraffic, 2655 UDPSocket, 2656 Notifier, 2657 Signal, 2658 Timer, 2659 DirectoryWatcher 2660 } 2661 2662 struct EventInfo { 2663 fd_t fd; 2664 EventType evType; 2665 EventObject evObj; 2666 ushort owner; 2667 } 2668 2669 2670 2671 /** 2672 Represents a network/socket address. (taken from vibe.core.net) 2673 */ 2674 public struct NetworkAddress { 2675 import libasync.internals.socket_compat : sockaddr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6; 2676 private union { 2677 sockaddr addr; 2678 sockaddr_in addr_ip4; 2679 sockaddr_in6 addr_ip6; 2680 } 2681 2682 @property bool ipv6() const pure nothrow { return this.family == AF_INET6; } 2683 2684 /** Family (AF_) of the socket address. 2685 */ 2686 @property ushort family() const pure nothrow { return addr.sa_family; } 2687 /// ditto 2688 @property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; } 2689 2690 /** The port in host byte order. 2691 */ 2692 @property ushort port() 2693 const pure nothrow { 2694 switch (this.family) { 2695 default: assert(false, "port() called for invalid address family."); 2696 case AF_INET: return ntoh(addr_ip4.sin_port); 2697 case AF_INET6: return ntoh(addr_ip6.sin6_port); 2698 } 2699 } 2700 /// ditto 2701 @property void port(ushort val) 2702 pure nothrow { 2703 switch (this.family) { 2704 default: assert(false, "port() called for invalid address family."); 2705 case AF_INET: addr_ip4.sin_port = hton(val); break; 2706 case AF_INET6: addr_ip6.sin6_port = hton(val); break; 2707 } 2708 } 2709 2710 /** A pointer to a sockaddr struct suitable for passing to socket functions. 2711 */ 2712 @property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; } 2713 2714 /** Size of the sockaddr struct that is returned by sockAddr(). 2715 */ 2716 @property uint sockAddrLen() 2717 const pure nothrow { 2718 switch (this.family) { 2719 default: assert(false, "sockAddrLen() called for invalid address family."); 2720 case AF_INET: return addr_ip4.sizeof; 2721 case AF_INET6: return addr_ip6.sizeof; 2722 } 2723 } 2724 2725 @property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow 2726 in { assert (family == AF_INET); } 2727 body { return &addr_ip4; } 2728 2729 @property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow 2730 in { assert (family == AF_INET6); } 2731 body { return &addr_ip6; } 2732 2733 /** Returns a string representation of the IP address 2734 */ 2735 string toAddressString() 2736 const { 2737 import std.array : appender; 2738 import std.string : format; 2739 import std.format : formattedWrite; 2740 2741 switch (this.family) { 2742 default: assert(false, "toAddressString() called for invalid address family."); 2743 case AF_INET: 2744 ubyte[4] ip = (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; 2745 return format("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]); 2746 case AF_INET6: 2747 ubyte[16] ip = addr_ip6.sin6_addr.s6_addr; 2748 auto ret = appender!string(); 2749 ret.reserve(40); 2750 foreach (i; 0 .. 8) { 2751 if (i > 0) ret.put(':'); 2752 ret.formattedWrite("%x", bigEndianToNative!ushort(cast(ubyte[2])ip[i*2 .. i*2+2].ptr[0 .. 2])); 2753 } 2754 return ret.data; 2755 } 2756 } 2757 2758 /** Returns a full string representation of the address, including the port number. 2759 */ 2760 string toString() 2761 const { 2762 2763 import std.string : format; 2764 2765 auto ret = toAddressString(); 2766 switch (this.family) { 2767 default: assert(false, "toString() called for invalid address family."); 2768 case AF_INET: return ret ~ format(":%s", port); 2769 case AF_INET6: return format("[%s]:%s", ret, port); 2770 } 2771 } 2772 2773 } 2774 2775 private pure nothrow { 2776 import std.bitmanip; 2777 2778 ushort ntoh(ushort val) 2779 { 2780 version (LittleEndian) return swapEndian(val); 2781 else version (BigEndian) return val; 2782 else static assert(false, "Unknown endianness."); 2783 } 2784 2785 ushort hton(ushort val) 2786 { 2787 version (LittleEndian) return swapEndian(val); 2788 else version (BigEndian) return val; 2789 else static assert(false, "Unknown endianness."); 2790 } 2791 }