1 module libasync.events;
2 
3 import std.stdio;
4 
5 import core.thread;
6 import std.container : Array;
7 import std.datetime : Duration;
8 import std.typecons : Flag;
9 import libasync.internals.memory : FreeListObjectAlloc;
10 
11 public import libasync.types;
12 public import libasync.bufferedtcp;
13 public import libasync.tcp;
14 public import libasync.udp;
15 public import libasync.notifier;
16 public import libasync.dns;
17 public import libasync.timer;
18 public import libasync.signal;
19 public import libasync.watcher;
20 public import libasync.file;
21 public import libasync.threads;
22 
23 version(Windows) {
24 	public import libasync.windows;
25 }
26 
27 version(Posix) {
28 	public import libasync.posix;
29 }
30 
/// Returns the EventLoop cached in this function's static variable,
/// constructing it on the first call. A plain `static` local in D is
/// thread-local, so every thread that calls this gets its own instance.
EventLoop getThreadEventLoop() nothrow {
	static EventLoop s_cachedLoop;
	if (s_cachedLoop is null)
		s_cachedLoop = new EventLoop;
	return s_cachedLoop;
}
38 
/// Event handlers are registered with the event loop by being passed to run().
/// All events associated with them will trigger the OS to resume the underlying
/// thread, which enables the existence of all the asynchronous event objects in
/// this library.
final class EventLoop
{

package:
	/// Platform implementation that every member below delegates to.
	EventLoopImpl m_evLoop;
	/// Becomes true once loop() has seen spawnAsyncThreads() succeed.
	bool m_asyncThreadsStarted = false;

nothrow:
public:
	/// Initializes the underlying implementation. Halts with an assertion
	/// failure when the implementation reports it is already started or when
	/// its init() call fails.
	this()
	{
		if (!m_evLoop.started && m_evLoop.init(this))
			return;
		assert(false, "Event loop initialization failure");
	}

	/// Call this to clean up underlying OS resources. The implementation is
	/// currently incomplete and requires the process to be shut down for the
	/// resources to be collected automatically. Used as a placeholder in the
	/// meantime.
	void exit()
	{
		m_evLoop.exit();
	}

	/// Builds a NetworkAddress from a textual IP via the implementation's
	/// getAddressFromIP. When `force` is set and the loop status is not
	/// Status.OK after the first attempt, the lookup is repeated once with
	/// the `ipv6` flag inverted and that second result is returned.
	NetworkAddress resolveIP(in string ip, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress resolved = m_evLoop.getAddressFromIP(ip, port, ipv6, tcp);
		// Not forced, or the first attempt succeeded: nothing more to try.
		if (!force || status.code == Status.OK)
			return resolved;
		resolved = m_evLoop.getAddressFromIP(ip, port, !ipv6, tcp);
		return resolved;
	}

	/* Blocks until the hostname is resolved, unless it's invalid.
	   Same retry rule as resolveIP: when `force` is set and the status is not
	   Status.OK after the first attempt, the lookup is repeated once with the
	   `ipv6` flag inverted. */
	NetworkAddress resolveHost(in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes, isForced force = isForced.yes)
	{
		NetworkAddress resolved = m_evLoop.getAddressFromDNS(host, port, ipv6, tcp);
		// Not forced, or the first attempt succeeded: nothing more to try.
		if (!force || status.code == Status.OK)
			return resolved;
		resolved = m_evLoop.getAddressFromDNS(host, port, !ipv6, tcp);
		return resolved;
	}

package:

	/// Status currently reported by the underlying implementation.
	@property StatusInfo status() const
	{
		return m_evLoop.status;
	}

	/// Error string currently reported by the underlying implementation.
	@property string error() const
	{
		return m_evLoop.error;
	}

	/// Forwards to the implementation's recvFrom.
	uint recvFrom(in fd_t fd, ref ubyte[] data, ref NetworkAddress addr)
	{
		return m_evLoop.recvFrom(fd, data, addr);
	}

	/// Forwards to the implementation's sendTo.
	uint sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
	{
		return m_evLoop.sendTo(fd, data, addr);
	}

	/// Forwards to the implementation's recv.
	uint recv(in fd_t fd, ref ubyte[] data)
	{
		return m_evLoop.recv(fd, data);
	}

	// Disabled overload, kept for reference.
	/*uint recv(out ubyte[] data, in fd_t fd, in NetworkAddress dst)
	{
		return m_evLoop.recv(data, fd, dst);
	}*/

	/// Forwards to the implementation's send.
	uint send(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.send(fd, data);
	}

	/// Forwards to the implementation's read.
	uint read(in fd_t fd, ref ubyte[] data)
	{
		return m_evLoop.read(fd, data);
	}

	/// Forwards to the implementation's readChanges.
	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst)
	{
		return m_evLoop.readChanges(fd, dst);
	}

	/// Forwards to the implementation's write.
	uint write(in fd_t fd, in ubyte[] data)
	{
		return m_evLoop.write(fd, data);
	}

	/// Forwards to the implementation's watch.
	uint watch(in fd_t fd, in WatchInfo info)
	{
		return m_evLoop.watch(fd, info);
	}

	/// Forwards to the implementation's unwatch.
	bool unwatch(in fd_t fd, in fd_t wd)
	{
		return m_evLoop.unwatch(fd, wd);
	}

	/// Forwards to the implementation's broadcast.
	bool broadcast(in fd_t fd, bool b)
	{
		return m_evLoop.broadcast(fd, b);
	}

	/// Forwards to the implementation's localAddr.
	NetworkAddress localAddr(in fd_t fd, bool ipv6 = false)
	{
		return m_evLoop.localAddr(fd, ipv6);
	}

	/// Forwards to the implementation's notify. Only instantiable for
	/// `shared AsyncSignal` and `AsyncNotifier` payloads.
	bool notify(T)(in fd_t fd, T payload)
		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
	{
		return m_evLoop.notify(fd, payload);
	}

	/// Forwards to the implementation's setOption.
	bool setOption(T)(in fd_t fd, TCPOption option, in T val)
	{
		return m_evLoop.setOption(fd, option, val);
	}

	// Disabled overload, kept for reference.
	/*uint send(in ubyte[] data, in fd_t fd, in NetworkAddress dst)
	{
		return m_evLoop.send(data, fd, dst);
	}*/

	/// Forwards to the implementation's closeSocket.
	bool closeSocket(fd_t fd, bool connected, bool listener = false)
	{
		return m_evLoop.closeSocket(fd, connected, listener);
	}

	// run() overloads: each one hands the context object (and its handler,
	// where there is one) to the implementation and returns the fd_t it yields.

	/// ditto-style forwarder for TCP connections.
	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwarder for TCP listeners.
	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwarder for shared signals.
	fd_t run(shared AsyncSignal ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwarder for notifiers.
	fd_t run(AsyncNotifier ctxt)
	{
		return m_evLoop.run(ctxt);
	}

	/// Forwarder for timers; `timeout` is passed through unchanged.
	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout)
	{
		return m_evLoop.run(ctxt, del, timeout);
	}

	/// Forwarder for UDP sockets.
	fd_t run(AsyncUDPSocket ctxt, UDPHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	/// Forwarder for directory watchers.
	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
	{
		return m_evLoop.run(ctxt, del);
	}

	// kill() overloads: each one forwards the object to the implementation's
	// matching kill() and returns its result.

	/// Forwarder for directory watchers.
	bool kill(AsyncDirectoryWatcher obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwarder for TCP connections; `forced` is passed through unchanged.
	bool kill(AsyncTCPConnection obj, bool forced = false)
	{
		return m_evLoop.kill(obj, forced);
	}

	/// Forwarder for TCP listeners.
	bool kill(AsyncTCPListener obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwarder for shared signals.
	bool kill(shared AsyncSignal obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwarder for notifiers.
	bool kill(AsyncNotifier obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwarder for timers.
	bool kill(AsyncTimer obj)
	{
		return m_evLoop.kill(obj);
	}

	/// Forwarder for UDP sockets.
	bool kill(AsyncUDPSocket obj)
	{
		return m_evLoop.kill(obj);
	}

	/**
		Runs the event loop once and returns false if an unrecoverable error occurred.

		On the first call (and on later calls, until it succeeds)
		spawnAsyncThreads() is invoked; if that fails, false is returned
		without running the loop. Otherwise false is returned only when the
		implementation's loop() fails and its status code is
		Status.EVLOOP_FAILURE.
	*/
	public bool loop(Duration max_timeout = 100.msecs)
	{
		if (!m_asyncThreadsStarted)
		{
			if (!spawnAsyncThreads())
				return false;
			m_asyncThreadsStarted = true;
		}

		if (m_evLoop.loop(max_timeout))
			return true;

		// The iteration failed; only EVLOOP_FAILURE is treated as fatal.
		return m_evLoop.status.code != Status.EVLOOP_FAILURE;
	}

}