1 ///
2 module libasync.events;
3 
4 import std.stdio;
5 
6 import core.thread;
7 import std.container : Array;
8 import std.datetime : Duration;
9 import std.typecons : Flag;
10 import libasync.internals.memory : FreeListObjectAlloc;
11 
12 public import libasync.types;
13 public import libasync.bufferedtcp;
14 public import libasync.tcp;
15 public import libasync.udp;
16 public import libasync.uds;
17 public import libasync.notifier;
18 public import libasync.dns;
19 public import libasync.timer;
20 public import libasync.signal;
21 public import libasync.watcher;
22 public import libasync.file;
23 public import libasync.threads;
24 public import libasync.event;
25 public import libasync.socket;
26 
27 version(Windows) {
28 	public import libasync.windows;
29 }
30 
31 version(Posix) {
32 	public import libasync.posix;
33 }
34 
/// Returns the calling thread's `EventLoop`, constructing it on first use.
///
/// The instance is cached in a function-local `static` that is not marked
/// `shared`, so D keeps one separate copy of it per thread; later calls from
/// the same thread hand back the object created by the first call.
EventLoop getThreadEventLoop() nothrow {
	static EventLoop threadLoop;
	if (threadLoop is null)
		threadLoop = new EventLoop;
	return threadLoop;
}
43 
/// Front end over the platform-specific event loop implementation (`EventLoopImpl`).
///
/// Event handlers are registered to the event loop by being passed to `run()`. All
/// events associated with them will trigger the OS to resume the underlying thread,
/// which enables the existence of all the asynchronous event objects in this library.
/// Apart from the constructor, `resolveIP`, `resolveHost` and `loop`, every member is
/// a direct forward to the member of the same name on `m_evLoop`.
final class EventLoop
{

package:
	/// Platform-specific implementation every call is forwarded to.
	EventLoopImpl m_evLoop;
	/// Becomes true once `loop()` has seen `spawnAsyncThreads()` succeed.
	bool m_asyncThreadsStarted = false;

nothrow:
public:
	/// Initializes the underlying implementation.
	///
	/// Halts with an assertion failure when the implementation reports that it was
	/// already started, or when its `init` call fails. `init` is not called at all
	/// in the already-started case.
	this()
	{
		const bool initialized = !m_evLoop.started && m_evLoop.init(this);
		// assert(false, ...) is deliberate: unlike assert(cond), it is kept in release builds.
		if (!initialized)
			assert(false, "Event loop initialization failure");
	}

	/// Call this to clean up underlying OS resources. The implementation is currently
	/// incomplete and requires the process to be shut down for the resources to be
	/// collected automatically. Used as a placeholder in the meantime.
	void exit()
	{
		m_evLoop.exit();
	}

	/// Builds a `NetworkAddress` from a textual IP address via `getAddressFromIP`.
	///
	/// The lookup is first made with the requested `ipv6` setting. If `force` is set
	/// and the loop status is not `Status.OK` afterwards, the lookup is repeated once
	/// with the opposite `ipv6` setting and the result of that second attempt is
	/// returned. Without `force`, the first result is returned as is.
	NetworkAddress resolveIP(in string ip, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress resolved = m_evLoop.getAddressFromIP(ip, port, ipv6, tcp);
		if (!force)
			return resolved;

		// Forced mode: fall back to the other address family when the first try failed.
		const bool firstAttemptFailed = status.code != Status.OK;
		if (firstAttemptFailed)
			resolved = m_evLoop.getAddressFromIP(ip, port, !ipv6, tcp);
		return resolved;
	}

	/// Blocks until the hostname is resolved, unless it's invalid.
	///
	/// The lookup goes through `getAddressFromDNS`, first with the requested `ipv6`
	/// setting. If `force` is set and the loop status is not `Status.OK` afterwards,
	/// the lookup is repeated once with the opposite `ipv6` setting and the result of
	/// that second attempt is returned. Without `force`, the first result is returned
	/// as is.
	NetworkAddress resolveHost(in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress resolved = m_evLoop.getAddressFromDNS(host, port, ipv6, tcp);
		if (!force)
			return resolved;

		// Forced mode: fall back to the other address family when the first try failed.
		const bool firstAttemptFailed = status.code != Status.OK;
		if (firstAttemptFailed)
			resolved = m_evLoop.getAddressFromDNS(host, port, !ipv6, tcp);
		return resolved;
	}

package:

	/// Status currently reported by the underlying implementation.
	@property StatusInfo status() const
	{
		return m_evLoop.status;
	}

	/// Error string currently reported by the underlying implementation.
	@property string error() const
	{
		return m_evLoop.error;
	}

	/// Forwards to `m_evLoop.recvFrom`; `addr` is passed by reference.
	uint recvFrom(in fd_t fd, ubyte[] data, ref NetworkAddress addr)
	{
		return m_evLoop.recvFrom(fd, data, addr);
	}

	/// Forwards to `m_evLoop.sendTo`.
	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
	{
		return m_evLoop.sendTo(fd, data, addr);
	}

	/// Forwards to `m_evLoop.recv`.
	uint recv(in fd_t fd, ubyte[] data)
	{
		return m_evLoop.recv(fd, data);
	}

	/// Forwards to `m_evLoop.send`.
	pragma(inline, true)
	uint send(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.send(fd, data);
	}

	/// Forwards to `m_evLoop.read`; the `data` slice is passed by reference.
	pragma(inline, true)
	uint read(in fd_t fd, ref ubyte[] data)
	{
		return m_evLoop.read(fd, data);
	}

	/// Forwards to `m_evLoop.readChanges`; the `dst` slice is passed by reference.
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst)
	{
		return m_evLoop.readChanges(fd, dst);
	}

	/// Forwards to `m_evLoop.write`.
	uint write(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.write(fd, data);
	}

	/// Forwards to `m_evLoop.watch`.
	uint watch(in fd_t fd, in WatchInfo info)
	{
		return m_evLoop.watch(fd, info);
	}

	/// Forwards to `m_evLoop.unwatch`.
	bool unwatch(in fd_t fd, in fd_t wd)
	{
		return m_evLoop.unwatch(fd, wd);
	}

	/// Forwards to `m_evLoop.broadcast`.
	bool broadcast(in fd_t fd, bool b)
	{
		return m_evLoop.broadcast(fd, b);
	}

	/// Forwards to `m_evLoop.localAddr`.
	NetworkAddress localAddr(in fd_t fd, bool ipv6 = false)
	{
		return m_evLoop.localAddr(fd, ipv6);
	}

	/// Forwards to `m_evLoop.notify`. Only instantiable for a
	/// `shared AsyncSignal` or an `AsyncNotifier` payload.
	bool notify(T)(in fd_t fd, T payload)
		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
	{
		return m_evLoop.notify(fd, payload);
	}

	/// Forwards to `m_evLoop.setOption` with a value of arbitrary type `T`.
	bool setOption(T)(in fd_t fd, TCPOption option, in T val)
	{
		return m_evLoop.setOption(fd, option, val);
	}

	/*uint send(in ubyte[] data, in fd_t fd, in NetworkAddress dst)
	{
		return m_evLoop.send(data, fd, dst);
	}*/

	/// Forwards to `m_evLoop.closeSocket`.
	bool closeSocket(fd_t fd, bool connected, bool listener = false)
	{
		return m_evLoop.closeSocket(fd, connected, listener);
	}

	/// Forwards an `AsyncEvent` and its handler to `m_evLoop.run`.
	bool run(AsyncEvent ctxt, EventHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwards an `AsyncTCPConnection` and its handler to `m_evLoop.run`.
	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwards an `AsyncTCPListener` and its accept handler to `m_evLoop.run`.
	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwards an `AsyncUDSConnection` to `m_evLoop.run` (Posix builds only).
	version (Posix)
	fd_t run(AsyncUDSConnection ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwards an `AsyncUDSListener` to `m_evLoop.run` (Posix builds only).
	version (Posix)
	fd_t run(AsyncUDSListener ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwards an `AsyncSocket` to `m_evLoop.run`.
	fd_t run(AsyncSocket ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwards an accept request to `m_evLoop.submitRequest`.
	void submitRequest(AsyncAcceptRequest* ctxt)
	{
		m_evLoop.submitRequest(ctxt);
	}

	/// Forwards a receive request to `m_evLoop.submitRequest`.
	void submitRequest(AsyncReceiveRequest* ctxt)
	{
		m_evLoop.submitRequest(ctxt);
	}

	/// Forwards a send request to `m_evLoop.submitRequest`.
	void submitRequest(AsyncSendRequest* ctxt)
	{
		m_evLoop.submitRequest(ctxt);
	}

	// Raw socket address types needed by bind() and connect() below.
	import libasync.internals.socket_compat : sockaddr, socklen_t;

	/// Forwards to `m_evLoop.bind`.
	bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		return m_evLoop.bind(ctxt, addr, addrlen);
	}

	/// Forwards to `m_evLoop.connect`.
	bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
	{
		return m_evLoop.connect(ctxt, addr, addrlen);
	}

	/// Forwards to `m_evLoop.listen`.
	bool listen(AsyncSocket ctxt, int backlog)
	{
		return m_evLoop.listen(ctxt, backlog);
	}

	/// Forwards a `shared AsyncSignal` to `m_evLoop.run`.
	fd_t run(shared AsyncSignal ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwards an `AsyncNotifier` to `m_evLoop.run`.
	fd_t run(AsyncNotifier ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwards an `AsyncTimer`, its handler and its timeout to `m_evLoop.run`.
	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout)
	{
		return m_evLoop.run(ctxt, del, timeout);
	}

	/// Forwards an `AsyncUDPSocket` and its handler to `m_evLoop.run`.
	fd_t run(AsyncUDPSocket ctxt, UDPHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwards an `AsyncDirectoryWatcher` and its handler to `m_evLoop.run`.
	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwards to `m_evLoop.accept` (Posix builds only).
	version (Posix)
	AsyncUDSConnection accept(AsyncUDSListener ctxt)
	{
		return m_evLoop.accept(ctxt);
	}

	/// Forwards an `AsyncEvent` to `m_evLoop.kill`. Note that `forced` defaults
	/// to true here, unlike the socket and TCP connection overloads.
	bool kill(AsyncEvent obj, bool forced = true)
	{
		return m_evLoop.kill(obj, forced);
	}

	/// Forwards an `AsyncDirectoryWatcher` to `m_evLoop.kill`.
	bool kill(AsyncDirectoryWatcher obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwards an `AsyncSocket` to `m_evLoop.kill`; `forced` defaults to false.
	bool kill(AsyncSocket obj, bool forced = false)
	{
		return m_evLoop.kill(obj, forced);
	}

	/// Forwards an `AsyncTCPConnection` to `m_evLoop.kill`; `forced` defaults to false.
	bool kill(AsyncTCPConnection obj, bool forced = false)
	{
		return m_evLoop.kill(obj, forced);
	}

	/// Forwards an `AsyncTCPListener` to `m_evLoop.kill`.
	bool kill(AsyncTCPListener obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwards a `shared AsyncSignal` to `m_evLoop.kill`.
	bool kill(shared AsyncSignal obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwards an `AsyncNotifier` to `m_evLoop.kill`.
	bool kill(AsyncNotifier obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwards an `AsyncTimer` to `m_evLoop.kill`.
	bool kill(AsyncTimer obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwards an `AsyncUDPSocket` to `m_evLoop.kill`.
	bool kill(AsyncUDPSocket obj)
	{
		return m_evLoop.kill(obj);
	}

	/**
		Runs the event loop once and returns false if an unrecoverable error occurred.
		Using a value of 0 will return immediately, while a value of -1.seconds will block indefinitely.

		Until it has succeeded once, `spawnAsyncThreads()` is called at the start of every
		invocation; if it fails, false is returned without running the loop. A failed
		iteration of the underlying loop only yields false when the status code is
		`Status.EVLOOP_FAILURE`; any other outcome yields true.
	*/
	public bool loop(Duration max_timeout = 100.msecs)
	{
		if (!m_asyncThreadsStarted) {
			if (!spawnAsyncThreads())
				return false;
			m_asyncThreadsStarted = true;
		}

		// The status is only consulted when the iteration itself reported failure.
		const bool fatal = !m_evLoop.loop(max_timeout)
			&& m_evLoop.status.code == Status.EVLOOP_FAILURE;
		return !fatal;
	}

}