1 /// 2 module libasync.events; 3 4 import std.stdio; 5 6 import core.thread; 7 import std.container : Array; 8 import std.datetime : Duration; 9 import std.typecons : Flag; 10 import libasync.internals.memory : FreeListObjectAlloc; 11 12 public import libasync.types; 13 public import libasync.bufferedtcp; 14 public import libasync.tcp; 15 public import libasync.udp; 16 public import libasync.uds; 17 public import libasync.notifier; 18 public import libasync.dns; 19 public import libasync.timer; 20 public import libasync.signal; 21 public import libasync.watcher; 22 public import libasync.file; 23 public import libasync.threads; 24 public import libasync.event; 25 public import libasync.socket; 26 27 version(Windows) { 28 public import libasync.windows; 29 } 30 31 version(Posix) { 32 public import libasync.posix; 33 } 34 35 /// 36 EventLoop getThreadEventLoop() nothrow { 37 static EventLoop evLoop; 38 if (!evLoop) { 39 evLoop = new EventLoop; 40 } 41 return evLoop; 42 } 43 44 /// Event handlers can be registered to the event loop by being run(), all events 45 /// associated with them will trigger the OS to resume the underlying thread which 46 /// enables the existence of all the asynchroneous event objects in this library. 47 final class EventLoop 48 { 49 50 package: 51 EventLoopImpl m_evLoop; 52 bool m_asyncThreadsStarted = false; 53 54 nothrow: 55 public: 56 /// 57 this() { 58 if (m_evLoop.started || !m_evLoop.init(this)) 59 assert(false, "Event loop initialization failure"); 60 } 61 62 /// Call this to cleanup underlying OS resources. The implementation is currently incomplete 63 /// and requires the process to be shut down for the resources to be collected automatically. 64 /// Used as a placeholder in the meantime. 65 void exit() { 66 m_evLoop.exit(); 67 } 68 69 /// 70 NetworkAddress resolveIP(in string ip, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes) 71 { 72 if (!force) 73 return m_evLoop.getAddressFromIP(ip, port, ipv6, tcp); 74 NetworkAddress addr = m_evLoop.getAddressFromIP(ip, port, ipv6, tcp); 75 if (status.code != Status.OK) 76 addr = m_evLoop.getAddressFromIP(ip, port, !ipv6, tcp); 77 return addr; 78 } 79 80 /** Blocks until the hostname is resolved, unless it's invalid. */ 81 NetworkAddress resolveHost(in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes) 82 { 83 if (!force) 84 return m_evLoop.getAddressFromDNS(host, port, ipv6, tcp); 85 NetworkAddress addr = m_evLoop.getAddressFromDNS(host, port, ipv6, tcp); 86 if (status.code != Status.OK) 87 addr = m_evLoop.getAddressFromDNS(host, port, !ipv6, tcp); 88 return addr; 89 } 90 91 package: 92 93 @property StatusInfo status() const 94 { 95 return m_evLoop.status; 96 } 97 98 @property string error() const 99 { 100 return m_evLoop.error; 101 } 102 103 uint recvFrom(in fd_t fd, ubyte[] data, ref NetworkAddress addr) { 104 return m_evLoop.recvFrom(fd, data, addr); 105 } 106 107 uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr) { 108 return m_evLoop.sendTo(fd, data, addr); 109 } 110 111 uint recv(in fd_t fd, ubyte[] data) 112 { 113 return m_evLoop.recv(fd, data); 114 } 115 116 pragma(inline, true) 117 uint send(in fd_t fd, in ubyte[] data) 118 { 119 return m_evLoop.send(fd, data); 120 } 121 122 pragma(inline, true) 123 uint read(in fd_t fd, ref ubyte[] data) 124 { 125 return m_evLoop.read(fd, data); 126 } 127 128 uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) { 129 return m_evLoop.readChanges(fd, dst); 130 } 131 132 uint write(in fd_t fd, in ubyte[] data) 133 { 134 return m_evLoop.write(fd, data); 135 } 136 137 uint watch(in fd_t fd, in WatchInfo info) { 138 return m_evLoop.watch(fd, info); 139 } 140 141 bool unwatch(in fd_t fd, in fd_t wd) { 142 return m_evLoop.unwatch(fd, wd); 143 } 144 145 bool broadcast(in fd_t fd, bool b) 146 { 147 return m_evLoop.broadcast(fd, b); 148 } 149 150 NetworkAddress localAddr(in fd_t fd, bool ipv6 = false) { 151 return m_evLoop.localAddr(fd, ipv6); 152 } 153 154 bool notify(T)(in fd_t fd, T payload) 155 if (is(T == shared AsyncSignal) || is(T == AsyncNotifier)) 156 { 157 return m_evLoop.notify(fd, payload); 158 } 159 160 bool setOption(T)(in fd_t fd, TCPOption option, in T val) { 161 return m_evLoop.setOption(fd, option, val); 162 } 163 164 /*uint send(in ubyte[] data, in fd_t fd, in NetworkAddress dst) 165 { 166 return m_evLoop.send(data, fd, dst); 167 }*/ 168 169 bool closeSocket(fd_t fd, bool connected, bool listener = false) 170 { 171 return m_evLoop.closeSocket(fd, connected, listener); 172 } 173 174 bool run(AsyncEvent ctxt, EventHandler del) { 175 return m_evLoop.run(ctxt, del); 176 } 177 178 fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) { 179 return m_evLoop.run(ctxt, del); 180 } 181 182 fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) { 183 return m_evLoop.run(ctxt, del); 184 } 185 186 version (Posix) 187 fd_t run(AsyncUDSConnection ctxt) { 188 return m_evLoop.run(ctxt); 189 } 190 191 version (Posix) 192 fd_t run(AsyncUDSListener ctxt) { 193 return m_evLoop.run(ctxt); 194 } 195 196 fd_t run(AsyncSocket ctxt) { 197 return m_evLoop.run(ctxt); 198 } 199 200 void submitRequest(AsyncAcceptRequest* ctxt) { 201 m_evLoop.submitRequest(ctxt); 202 } 203 204 void submitRequest(AsyncReceiveRequest* ctxt) { 205 m_evLoop.submitRequest(ctxt); 206 } 207 208 void submitRequest(AsyncSendRequest* ctxt) { 209 m_evLoop.submitRequest(ctxt); 210 } 211 212 import libasync.internals.socket_compat : sockaddr, socklen_t; 213 bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen) 214 { 215 return m_evLoop.bind(ctxt, addr, addrlen); 216 } 217 218 bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen) 219 { 220 return m_evLoop.connect(ctxt, addr, addrlen); 221 } 222 223 bool listen(AsyncSocket ctxt, int backlog) 224 { 225 return m_evLoop.listen(ctxt, backlog); 226 } 227 228 fd_t run(shared AsyncSignal ctxt) { 229 return m_evLoop.run(ctxt); 230 } 231 232 fd_t run(AsyncNotifier ctxt) { 233 return m_evLoop.run(ctxt); 234 } 235 236 fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) { 237 return m_evLoop.run(ctxt, del, timeout); 238 } 239 240 fd_t run(AsyncUDPSocket ctxt, UDPHandler del) { 241 return m_evLoop.run(ctxt, del); 242 } 243 244 fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) { 245 return m_evLoop.run(ctxt, del); 246 } 247 248 version (Posix) 249 AsyncUDSConnection accept(AsyncUDSListener ctxt) { 250 return m_evLoop.accept(ctxt); 251 } 252 253 bool kill(AsyncEvent obj, bool forced = true) { 254 return m_evLoop.kill(obj, forced); 255 } 256 257 bool kill(AsyncDirectoryWatcher obj) { 258 return m_evLoop.kill(obj); 259 } 260 261 bool kill(AsyncSocket obj, bool forced = false) { 262 return m_evLoop.kill(obj, forced); 263 } 264 265 bool kill(AsyncTCPConnection obj, bool forced = false) { 266 return m_evLoop.kill(obj, forced); 267 } 268 269 bool kill(AsyncTCPListener obj) { 270 return m_evLoop.kill(obj); 271 } 272 273 bool kill(shared AsyncSignal obj) { 274 return m_evLoop.kill(obj); 275 } 276 277 bool kill(AsyncNotifier obj) { 278 return m_evLoop.kill(obj); 279 } 280 281 bool kill(AsyncTimer obj) { 282 return m_evLoop.kill(obj); 283 } 284 285 bool kill(AsyncUDPSocket obj) { 286 return m_evLoop.kill(obj); 287 } 288 289 /** 290 Runs the event loop once and returns false if a an unrecoverable error occured 291 Using a value of 0 will return immediately, while a value of -1.seconds will block indefinitely 292 */ 293 public bool loop(Duration max_timeout = 100.msecs) 294 { 295 if(!m_asyncThreadsStarted) { 296 if(!spawnAsyncThreads()) { 297 return false; 298 } 299 m_asyncThreadsStarted = true; 300 } 301 302 if (!m_evLoop.loop(max_timeout) && m_evLoop.status.code == Status.EVLOOP_FAILURE) { 303 return false; 304 } 305 306 return true; 307 } 308 309 }