module libasync.events;

import std.stdio;

import core.thread;
import std.container : Array;
import std.datetime : Duration;
import std.typecons : Flag;
import libasync.internals.memory : FreeListObjectAlloc;

public import libasync.types;
public import libasync.bufferedtcp;
public import libasync.tcp;
public import libasync.udp;
public import libasync.notifier;
public import libasync.dns;
public import libasync.timer;
public import libasync.signal;
public import libasync.watcher;
public import libasync.file;
public import libasync.threads;

version(Windows) {
	public import libasync.windows;
}

version(Posix) {
	public import libasync.posix;
}

/// Returns the event loop cached in a function-local static, creating it on
/// first use. Function-local statics are thread-local by default in D, so each
/// thread that calls this gets its own lazily constructed instance.
EventLoop getThreadEventLoop() nothrow {
	static EventLoop evLoop;
	if (evLoop is null)
		evLoop = new EventLoop;
	return evLoop;
}

/// Event handlers can be registered to the event loop by being run(), all events
/// associated with them will trigger the OS to resume the underlying thread which
/// enables the existence of all the asynchronous event objects in this library.
///
/// Almost every member simply forwards to the `EventLoopImpl` held in `m_evLoop`.
final class EventLoop
{

package:
	/// Implementation object every call in this class is forwarded to.
	EventLoopImpl m_evLoop;
	/// Set once `loop()` has successfully called `spawnAsyncThreads()`.
	bool m_asyncThreadsStarted = false;

nothrow:
public:
	/// Initializes the underlying implementation. Halts with an assertion
	/// failure if the implementation reports it is already started or if its
	/// `init` call returns false.
	this() {
		// Success path first; `init` is only attempted when not already started.
		if (!m_evLoop.started && m_evLoop.init(this))
			return;
		assert(false, "Event loop initialization failure");
	}

	/// Call this to clean up underlying OS resources. The implementation is currently incomplete
	/// and requires the process to be shut down for the resources to be collected automatically.
	/// Used as a placeholder in the meantime.
	void exit() {
		m_evLoop.exit();
	}

	/// Builds a NetworkAddress from a textual IP address.
	///
	/// When `force` is set and the loop status after the first lookup is not
	/// `Status.OK`, the lookup is repeated once with the IPv6 flag inverted and
	/// the result of that second attempt is returned as-is.
	NetworkAddress resolveIP(in string ip, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress addr = m_evLoop.getAddressFromIP(ip, port, ipv6, tcp);
		// Retry with the other address family only when forcing is requested.
		if (force && status.code != Status.OK)
			addr = m_evLoop.getAddressFromIP(ip, port, !ipv6, tcp);
		return addr;
	}

	/// Blocks until the hostname is resolved, unless it's invalid.
	///
	/// When `force` is set and the loop status after the first lookup is not
	/// `Status.OK`, the lookup is repeated once with the IPv6 flag inverted and
	/// the result of that second attempt is returned as-is.
	NetworkAddress resolveHost(in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress addr = m_evLoop.getAddressFromDNS(host, port, ipv6, tcp);
		// Retry with the other address family only when forcing is requested.
		if (force && status.code != Status.OK)
			addr = m_evLoop.getAddressFromDNS(host, port, !ipv6, tcp);
		return addr;
	}

package:

	/// Status reported by the underlying implementation.
	@property StatusInfo status() const
	{
		return m_evLoop.status;
	}

	/// Error string reported by the underlying implementation.
	@property string error() const
	{
		return m_evLoop.error;
	}

	/// Forwards to `m_evLoop.recvFrom`.
	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr) {
		return m_evLoop.recvFrom(fd, data, addr);
	}

	/// Forwards to `m_evLoop.sendTo`.
	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr) {
		return m_evLoop.sendTo(fd, data, addr);
	}

	/// Forwards to `m_evLoop.recv`.
	uint recv(in fd_t fd, ref ubyte[] data)
	{
		return m_evLoop.recv(fd, data);
	}

	/*uint recv(out ubyte[] data, in fd_t fd, in NetworkAddress dst)
	{
		return m_evLoop.recv(data, fd, dst);
	}*/

	/// Forwards to `m_evLoop.send`.
	uint send(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.send(fd, data);
	}

	/// Forwards to `m_evLoop.read`.
	uint read(in fd_t fd, ref ubyte[] data)
	{
		return m_evLoop.read(fd, data);
	}

	/// Forwards to `m_evLoop.readChanges`.
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
		return m_evLoop.readChanges(fd, dst);
	}

	/// Forwards to `m_evLoop.write`.
	uint write(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.write(fd, data);
	}

	/// Forwards to `m_evLoop.watch`.
	uint watch(in fd_t fd, in WatchInfo info) {
		return m_evLoop.watch(fd, info);
	}

	/// Forwards to `m_evLoop.unwatch`.
	bool unwatch(in fd_t fd, in fd_t wd) {
		return m_evLoop.unwatch(fd, wd);
	}

	/// Forwards to `m_evLoop.broadcast`.
	bool broadcast(in fd_t fd, bool b)
	{
		return m_evLoop.broadcast(fd, b);
	}

	/// Forwards to `m_evLoop.localAddr`.
	NetworkAddress localAddr(in fd_t fd, bool ipv6 = false) {
		return m_evLoop.localAddr(fd, ipv6);
	}

	/// Forwards to `m_evLoop.notify`; only instantiable for
	/// `shared AsyncSignal` or `AsyncNotifier` payloads.
	bool notify(T)(in fd_t fd, T payload)
		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
	{
		return m_evLoop.notify(fd, payload);
	}

	/// Forwards to `m_evLoop.setOption`.
	bool setOption(T)(in fd_t fd, TCPOption option, in T val) {
		return m_evLoop.setOption(fd, option, val);
	}

	/*uint send(in ubyte[] data, in fd_t fd, in NetworkAddress dst)
	{
		return m_evLoop.send(data, fd, dst);
	}*/

	/// Forwards to `m_evLoop.closeSocket`.
	bool closeSocket(fd_t fd, bool connected, bool listener = false)
	{
		return m_evLoop.closeSocket(fd, connected, listener);
	}

	// --- run() overloads: each forwards its arguments to m_evLoop.run ---

	/// ditto
	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del) {
		return m_evLoop.run(ctxt, del);
	}

	/// ditto
	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del) {
		return m_evLoop.run(ctxt, del);
	}

	/// ditto
	fd_t run(shared AsyncSignal ctxt) {
		return m_evLoop.run(ctxt);
	}

	/// ditto
	fd_t run(AsyncNotifier ctxt) {
		return m_evLoop.run(ctxt);
	}

	/// ditto
	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
		return m_evLoop.run(ctxt, del, timeout);
	}

	/// ditto
	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
		return m_evLoop.run(ctxt, del);
	}

	/// ditto
	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del) {
		return m_evLoop.run(ctxt, del);
	}

	// --- kill() overloads: each forwards its arguments to m_evLoop.kill ---

	/// ditto
	bool kill(AsyncDirectoryWatcher obj) {
		return m_evLoop.kill(obj);
	}

	/// ditto
	bool kill(AsyncTCPConnection obj, bool forced = false) {
		return m_evLoop.kill(obj, forced);
	}

	/// ditto
	bool kill(AsyncTCPListener obj) {
		return m_evLoop.kill(obj);
	}

	/// ditto
	bool kill(shared AsyncSignal obj) {
		return m_evLoop.kill(obj);
	}

	/// ditto
	bool kill(AsyncNotifier obj) {
		return m_evLoop.kill(obj);
	}

	/// ditto
	bool kill(AsyncTimer obj) {
		return m_evLoop.kill(obj);
	}

	/// ditto
	bool kill(AsyncUDPSocket obj) {
		return m_evLoop.kill(obj);
	}

	/**
		Runs the event loop once and returns false if an unrecoverable error occurred.

		On the first call, `spawnAsyncThreads()` is invoked; if it fails this
		returns false without running the loop and will try again on the next
		call. A failed iteration is only reported as false when the
		implementation's status code is `Status.EVLOOP_FAILURE`.
	*/
	public bool loop(Duration max_timeout = 100.msecs)
	{
		if (!m_asyncThreadsStarted) {
			if (!spawnAsyncThreads())
				return false;
			m_asyncThreadsStarted = true;
		}

		// The status is consulted only when the iteration itself returned false.
		return m_evLoop.loop(max_timeout) || m_evLoop.status.code != Status.EVLOOP_FAILURE;
	}

}