module libasync.events;

import std.stdio;

import core.thread;
import std.container : Array;
import std.datetime : Duration;
import std.typecons : Flag;
import libasync.internals.memory : FreeListObjectAlloc;

public import libasync.types;
public import libasync.tcp;
public import libasync.udp;
public import libasync.notifier;
public import libasync.dns;
public import libasync.timer;
public import libasync.signal;
public import libasync.watcher;
public import libasync.file;
public import libasync.threads;

version(Windows) {
    public import libasync.windows;
}

version(Posix) {
    public import libasync.posix;
}

/// Returns the calling thread's EventLoop, constructing it on first use.
///
/// The instance is cached in a function-local `static`; in D such a variable
/// is thread-local unless marked `shared`/`__gshared`, so each thread that
/// calls this function lazily gets its own loop object.
EventLoop getThreadEventLoop()
{
    static EventLoop threadLoop;
    if (threadLoop is null)
        threadLoop = new EventLoop;
    return threadLoop;
}

/// Front-end of the platform event loop.
///
/// Event handlers are registered with the loop through the `run()` overloads.
/// Events associated with a registered handler cause the OS to resume the
/// underlying thread, which is what makes the asynchronous event objects of
/// this library work.
///
/// Almost every member simply forwards to `m_evLoop`.
/// NOTE(review): `EventLoopImpl` is presumably supplied by `libasync.windows`
/// or `libasync.posix` depending on the target - confirm.
final class EventLoop
{

package:
    /// Implementation object that every call below is delegated to.
    EventLoopImpl m_evLoop;

nothrow:
public:
    /// Initializes the underlying implementation.
    ///
    /// Halts with `assert(false)` when the implementation reports that it was
    /// already started, or when its `init` call fails. `init` is only invoked
    /// if the implementation was not already started.
    this()
    {
        const bool ready = !m_evLoop.started && m_evLoop.init(this);
        if (!ready)
            assert(false, "Event loop initialization failure");
    }

    /// Call this to clean up underlying OS resources. The implementation is
    /// currently incomplete and requires the process to be shut down for the
    /// resources to be collected automatically. Used as a placeholder in the
    /// meantime.
    void exit()
    {
        m_evLoop.exit();
    }

    /// Builds a NetworkAddress from a textual IP address.
    ///
    /// The address is first requested with the given `ipv6` setting. When
    /// `force` is set and the loop's status code is not `Status.OK` after that
    /// attempt, the request is repeated once with the opposite `ipv6` setting
    /// and the result of the second attempt is returned instead.
    NetworkAddress resolveIP(in string ip, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
    {
        NetworkAddress resolved = m_evLoop.getAddressFromIP(ip, port, ipv6, tcp);
        // Only a forced lookup is allowed to retry with the other address family.
        if (force && status.code != Status.OK)
            resolved = m_evLoop.getAddressFromIP(ip, port, !ipv6, tcp);
        return resolved;
    }

    /// Builds a NetworkAddress from a host name.
    /// Blocks until the hostname is resolved, unless it's invalid.
    ///
    /// The lookup is first performed with the given `ipv6` setting. When
    /// `force` is set and the loop's status code is not `Status.OK` after that
    /// attempt, the lookup is repeated once with the opposite `ipv6` setting
    /// and the result of the second attempt is returned instead.
    NetworkAddress resolveHost(in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
    {
        NetworkAddress resolved = m_evLoop.getAddressFromDNS(host, port, ipv6, tcp);
        // Only a forced lookup is allowed to retry with the other address family.
        if (force && status.code != Status.OK)
            resolved = m_evLoop.getAddressFromDNS(host, port, !ipv6, tcp);
        return resolved;
    }

package:

    /// Status currently reported by the implementation.
    @property StatusInfo status() const
    {
        return m_evLoop.status;
    }

    /// Error string currently reported by the implementation.
    @property string error() const
    {
        return m_evLoop.error;
    }

    /// Forwards to the implementation's `recvFrom` for descriptor `fd`.
    uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
    {
        return m_evLoop.recvFrom(fd, data, addr);
    }

    /// Forwards to the implementation's `sendTo` for descriptor `fd`.
    uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
    {
        return m_evLoop.sendTo(fd, data, addr);
    }

    /// Forwards to the implementation's `recv` for descriptor `fd`.
    uint recv(in fd_t fd, ref ubyte[] data)
    {
        return m_evLoop.recv(fd, data);
    }

    // Disabled overload kept from the original source:
    /*uint recv(out ubyte[] data, in fd_t fd, in NetworkAddress dst)
    {
        return m_evLoop.recv(data, fd, dst);
    }*/

    /// Forwards to the implementation's `send` for descriptor `fd`.
    uint send(in fd_t fd, in ubyte[] data)
    {
        return m_evLoop.send(fd, data);
    }

    /// Forwards to the implementation's `read` for descriptor `fd`.
    uint read(in fd_t fd, ref ubyte[] data)
    {
        return m_evLoop.read(fd, data);
    }

    /// Forwards to the implementation's `readChanges`, filling `dst`.
    uint readChanges(in fd_t fd, ref DWChangeInfo[] dst)
    {
        return m_evLoop.readChanges(fd, dst);
    }

    /// Forwards to the implementation's `write` for descriptor `fd`.
    uint write(in fd_t fd, in ubyte[] data)
    {
        return m_evLoop.write(fd, data);
    }

    /// Forwards to the implementation's `watch` with the given WatchInfo.
    uint watch(in fd_t fd, in WatchInfo info)
    {
        return m_evLoop.watch(fd, info);
    }

    /// Forwards to the implementation's `unwatch` for the pair (`fd`, `wd`).
    bool unwatch(in fd_t fd, in fd_t wd)
    {
        return m_evLoop.unwatch(fd, wd);
    }

    /// Forwards to the implementation's `broadcast` with flag `b`.
    bool broadcast(in fd_t fd, bool b)
    {
        return m_evLoop.broadcast(fd, b);
    }

    /// Forwards to the implementation's `localAddr` for descriptor `fd`.
    NetworkAddress localAddr(in fd_t fd, bool ipv6 = false)
    {
        return m_evLoop.localAddr(fd, ipv6);
    }

    /// Forwards to the implementation's `notify`.
    /// Only instantiable for `shared AsyncSignal` and `AsyncNotifier` payloads.
    bool notify(T)(in fd_t fd, T payload)
        if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
    {
        return m_evLoop.notify(fd, payload);
    }

    /// Forwards to the implementation's `setOption` for the given TCPOption.
    bool setOption(T)(in fd_t fd, TCPOption option, in T val)
    {
        return m_evLoop.setOption(fd, option, val);
    }

    // Disabled overload kept from the original source:
    /*uint send(in ubyte[] data, in fd_t fd, in NetworkAddress dst)
    {
        return m_evLoop.send(data, fd, dst);
    }*/

    /// Forwards to the implementation's `closeSocket`.
    bool closeSocket(fd_t fd, bool connected, bool listener = false)
    {
        return m_evLoop.closeSocket(fd, connected, listener);
    }

    // ---- run(): hand an event object (and its handler) to the implementation ----

    /// Registers a TCP connection together with its event handler.
    fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
    {
        return m_evLoop.run(ctxt, del);
    }

    /// Registers a TCP listener together with its accept handler.
    fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
    {
        return m_evLoop.run(ctxt, del);
    }

    /// Registers a shared signal.
    fd_t run(shared AsyncSignal ctxt)
    {
        return m_evLoop.run(ctxt);
    }

    /// Registers a notifier.
    fd_t run(AsyncNotifier ctxt)
    {
        return m_evLoop.run(ctxt);
    }

    /// Registers a timer together with its handler and timeout.
    fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout)
    {
        return m_evLoop.run(ctxt, del, timeout);
    }

    /// Registers a UDP socket together with its handler.
    fd_t run(AsyncUDPSocket ctxt, UDPHandler del)
    {
        return m_evLoop.run(ctxt, del);
    }

    /// Registers a directory watcher together with its handler.
    fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
    {
        return m_evLoop.run(ctxt, del);
    }

    // ---- kill(): forward an event object to the implementation's kill() ----

    /// Forwards a directory watcher to the implementation's `kill`.
    bool kill(AsyncDirectoryWatcher obj)
    {
        return m_evLoop.kill(obj);
    }

    /// Forwards a TCP connection to the implementation's `kill`,
    /// passing `forced` through unchanged.
    bool kill(AsyncTCPConnection obj, bool forced = false)
    {
        return m_evLoop.kill(obj, forced);
    }

    /// Forwards a TCP listener to the implementation's `kill`.
    bool kill(AsyncTCPListener obj)
    {
        return m_evLoop.kill(obj);
    }

    /// Forwards a shared signal to the implementation's `kill`.
    bool kill(shared AsyncSignal obj)
    {
        return m_evLoop.kill(obj);
    }

    /// Forwards a notifier to the implementation's `kill`.
    bool kill(AsyncNotifier obj)
    {
        return m_evLoop.kill(obj);
    }

    /// Forwards a timer to the implementation's `kill`.
    bool kill(AsyncTimer obj)
    {
        return m_evLoop.kill(obj);
    }

    /// Forwards a UDP socket to the implementation's `kill`.
    bool kill(AsyncUDPSocket obj)
    {
        return m_evLoop.kill(obj);
    }

    /**
        Runs the event loop once and returns false if an unrecoverable error
        occurred.

        A failed iteration is only reported as `false` when the implementation's
        status code is `Status.EVLOOP_FAILURE`; any other failed iteration still
        yields `true`.
    */
    public bool loop(Duration max_timeout = 100.msecs)
    {
        if (m_evLoop.loop(max_timeout))
            return true;

        // The iteration failed: only an event-loop failure counts as fatal.
        return m_evLoop.status.code != Status.EVLOOP_FAILURE;
    }

}