1 module libasync.threads;
2 import core.sync.mutex;
3 import core.sync.condition;
4 import core.thread;
5 import libasync.events;
6 import std.stdio;
7 import std.container : Array;
8 import core.atomic;
9 
nothrow {
	/// A mutex together with the condition variable built on it.
	/// Each CmdProcessor owns one and blocks on `cond` until it is woken.
	struct Waiter {
		Mutex mtx;
		Condition cond;
	}

	// --- pool lifetime -------------------------------------------------

	/// Every CmdProcessor that was spawned is registered in this group.
	__gshared ThreadGroup gs_threads; // daemon threads
	/// Number of CmdProcessor threads currently inside run().
	shared(int) gs_threadCnt;
	/// Becomes true once destroyAsyncThreads() has been entered.
	shared(bool) gs_closing;

	// --- work hand-off -------------------------------------------------

	/// Lock taken around accesses to gs_waiters and gs_jobs.
	__gshared Mutex gs_wlock;
	/// Condition (created on gs_wlock) notified whenever a worker
	/// registers or re-registers its Waiter.
	__gshared Condition gs_started;
	/// Waiters of workers that are currently available.
	__gshared Array!Waiter gs_waiters;
	/// Pending commands; workers take them from the back.
	__gshared Array!CommandInfo gs_jobs;
}
26 
/// Worker thread that executes file and DNS commands taken from `gs_jobs`.
///
/// Each instance owns a `Waiter` (mutex + condition). The thread registers
/// that waiter in `gs_waiters`, sleeps on its condition, and when woken pops
/// one command from `gs_jobs`, runs it, triggers the command's `ready`
/// signal and registers the waiter again.
final class CmdProcessor : Thread 
{
nothrow:
private:
	EventLoop m_evLoop;   // created in run(), on the worker thread itself
	Waiter m_waiter;      // this worker's private wake-up pair
	shared(bool) g_stop;  // set by stop() to make the loop exit
	
	/// Builds the waiter pair and binds run() as the thread entry point.
	/// Any failure is swallowed (and printed when DEBUG is set).
	this() {
		try {
			Mutex mtx = new Mutex;
			Condition cond = new Condition(mtx);
			m_waiter = Waiter(mtx, cond);
			super(&run);
		}
		catch (Throwable e) {
			static if (DEBUG) {
				import std.stdio;
				try writeln("Failed to run thread ... ", e.toString()); catch (Throwable) {}
			}
		}
	}

	/// Runs one DNS command (`RESOLVEHOST` / `RESOLVEIP`) and stores the
	/// result in `*ctxt.addr`. Failures are reported through `ctxt.status`.
	void process(shared AsyncDNS ctxt) {
		DNSCmdInfo cmdInfo = ctxt.cmdInfo();
		DNSCmd cmd = cmdInfo.command;
		Waiter waiter = cast(Waiter)cmdInfo.waiter;
		string url = cmdInfo.url;
		// Sanity check only: a failing assertion is deliberately swallowed.
		try assert(m_waiter == waiter, "DNS processor is handling a command from the wrong thread"); catch (Throwable) {}

		try final switch (cmd)
		{
			case DNSCmd.RESOLVEHOST:
				*ctxt.addr = cast(shared) m_evLoop.resolveHost(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no);
				break;

			case DNSCmd.RESOLVEIP:
				*ctxt.addr = cast(shared) m_evLoop.resolveIP(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no);
				break;

		} catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}
		
		try {
			// Signal completion, then make this worker available again.
			cmdInfo.ready.trigger(m_evLoop);
			synchronized(gs_wlock)
				gs_waiters.insertBack(m_waiter);
			gs_started.notifyAll(); // saves some waiting on a new thread
		}
		catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}
	}
	
	/// Runs one file command (`READ` / `WRITE` / `APPEND`) on `ctxt.file`
	/// using `ctxt.buffer`, updating `ctxt.offset`. Failures are reported
	/// through `ctxt.status`.
	void process(shared AsyncFile ctxt) {
		auto cmdInfo = ctxt.cmdInfo;
		auto mutex = cmdInfo.mtx;
		FileCmd cmd = cmdInfo.command;
		Waiter waiter = cast(Waiter)cmdInfo.waiter;

		// Sanity check only: a failing assertion is deliberately swallowed.
		try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch (Throwable) {}

		try final switch (cmd)
		{
			case FileCmd.READ:
				File file = ctxt.file;
				// An offset of -1 means "do not seek".
				if (ctxt.offset != -1)
					file.seek(cast(long)ctxt.offset);
				ubyte[] res;
				synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer);
				if (res)
					ctxt.offset = cast(ulong) (ctxt.offset + res.length);

				break;
				
			case FileCmd.WRITE:

				File file = cast(File)ctxt.file;
				// An offset of -1 means "do not seek".
				if (ctxt.offset != -1)
					file.seek(cast(long)ctxt.offset);
				synchronized(mutex) {
					file.rawWrite(cast(ubyte[])ctxt.buffer);
				}
				file.flush();
				ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
				break;

			case FileCmd.APPEND:				
				File file = cast(File)ctxt.file;
				synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
				ctxt.offset = cast(ulong) file.size();
				file.flush();
				break;
		} catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = "Error in " ~  cmd.to!string ~ ", " ~ e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}


		try {
			// Signal completion, then make this worker available again.
			cmdInfo.ready.trigger(m_evLoop);

			synchronized(gs_wlock)
				gs_waiters.insertBack(m_waiter);
			gs_started.notifyAll(); // saves some waiting on a new thread
		}
		catch (Throwable e) {
			static if (DEBUG) {
				try writeln("AsyncFile Thread Error: ", e.toString()); catch (Throwable) {}
			}
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}
	}
	
	/// Thread entry point: counts the thread in `gs_threadCnt`, creates its
	/// event loop, registers its waiter, announces itself on `gs_started`
	/// and enters the command loop. The count is decremented on exit.
	void run()
	{
		core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
		try {
			m_evLoop = new EventLoop;
			synchronized(gs_wlock) {
				gs_waiters.insertBack(m_waiter);
			}
			
			gs_started.notifyAll();

			process();
		} catch (Throwable e) {
			static if (DEBUG) {
				try writeln("Error inserting in waiters " ~ e.toString()); catch (Throwable) {}
			}
		}

		core.atomic.atomicOp!"-="(gs_threadCnt, cast(int) 1);
	}
	
	/// Asks the worker to leave its command loop and wakes it up.
	///
	/// The notification is sent while holding the waiter's mutex; together
	/// with the re-check of `g_stop` under the same mutex in the command
	/// loop this guarantees the request cannot slip in between the loop's
	/// test and its wait.
	void stop()
	{
		atomicStore(g_stop, true);
		try {
			// Nothing to wake if the constructor failed to build the pair.
			if (m_waiter.cond is null || m_waiter.mtx is null) return;
			synchronized(m_waiter.mtx)
				m_waiter.cond.notifyAll();
		}
		catch (Exception e) {
			static if (DEBUG) {
				try writeln("Exception occured notifying foreign thread: ", e.toString()); catch (Throwable) {}
			}
		}
	}
	
	/// Command loop: sleep on the waiter's condition, then pop and run one
	/// command from `gs_jobs`. Returns once `g_stop` is set or the waiter
	/// has no condition.
	private void process() {
		while(!atomicLoad(g_stop)) {
			CommandInfo cmd;
			bool have_job;
			try synchronized(m_waiter.mtx) {
				if (!m_waiter.cond) return;
				// Re-check under the mutex: stop() notifies while holding it,
				// so a stop request issued after the loop test is seen here
				// instead of being lost before wait() starts.
				if (atomicLoad(g_stop)) return;
				m_waiter.cond.wait();
			}
			catch (Throwable) {}
			if (atomicLoad(g_stop)) break;
			try synchronized(gs_wlock) {
				if (!gs_jobs.empty) {
					cmd = gs_jobs.back;
					gs_jobs.removeBack();
					have_job = true;
				}
			} catch (Throwable) {}

			// No job (spurious wake-up, or the dequeue itself failed) or a
			// job without payload: never dispatch a default-initialised
			// CommandInfo, whose null data would be dereferenced below.
			if (!have_job || cmd.data is null) continue;

			final switch (cmd.type) {
				case CmdInfoType.FILE:
					process(cast(shared AsyncFile) cmd.data);
					break;
				case CmdInfoType.DNS:
					process(cast(shared AsyncDNS) cmd.data);
					break;
			}

		}
	}

}
220 
/// Obtains the Waiter of an available worker, removing it from `gs_waiters`.
///
/// When no waiter is available and fewer than 16 workers are counted in
/// `gs_threadCnt`, one additional CmdProcessor is started and given up to
/// 5 seconds to register. Otherwise the call backs off for 50 microseconds
/// and tries again.
///
/// Returns: a Waiter with a non-null `cond`, or `Waiter.init` when no
///          worker thread is running at all.
Waiter popWaiter() {
	if (atomicLoad(gs_threadCnt) == 0) return Waiter.init; // we're in a static ctor probably...
	Waiter cmd_handler;
	bool start_thread;
	do {
		bool just_started;
		if (start_thread) {
			// One request spawns exactly one thread; the decision is taken
			// again below, after looking at the current thread count.
			start_thread = false;
			Thread thr = new CmdProcessor;
			thr.isDaemon = true;
			thr.name = "CmdProcessor";
			thr.start();
			gs_threads.add(thr);
			just_started = true;
		}
		
		bool back_off;
		synchronized(gs_wlock) {
			try {
				// Only wait when there is really nothing to take: the new
				// worker may already have registered and notified before we
				// got here, in which case waiting would just burn the timeout.
				if (just_started && gs_waiters.empty)
					gs_started.wait(5.seconds);

				if (!gs_waiters.empty) {
					cmd_handler = gs_waiters.back;
					gs_waiters.removeBack();
				}
				else if (core.atomic.atomicLoad(gs_threadCnt) < 16) {
					start_thread = true;
				}
				else {
					back_off = true;
				}
			} catch (Exception e){
				static if (DEBUG) {
					import std.stdio : writeln;
					writeln("Exception in popWaiter: ", e);
				}
			}
		}
		// Sleep only after releasing gs_wlock: workers need that lock to
		// hand their waiter back, which is exactly what we are waiting for.
		if (back_off)
			Thread.sleep(50.usecs);
	} while(!cmd_handler.cond);
	return cmd_handler;
}
259 
/// Process-wide initialisation of the pool's synchronisation objects.
shared static this() {
	gs_threads = new ThreadGroup;

	// gs_started is a condition on gs_wlock, so the lock is created first.
	auto wlock = new Mutex;
	gs_wlock = wlock;
	gs_started = new Condition(wlock);
}
266 
/// Starts the initial set of 4 CmdProcessor daemon threads, unless the pool
/// is closing or threads are already counted in `gs_threadCnt`.
///
/// After starting each thread the call waits up to one second on
/// `gs_started` for it to announce itself.
///
/// Returns: `false` if an Exception was thrown while spawning, `true`
///          otherwise (including when nothing had to be done).
bool spawnAsyncThreads() nothrow {
	import core.atomic : atomicLoad;
	try {
		// Cheap unlocked test first, repeated inside the critical section.
		if (atomicLoad(gs_closing) || atomicLoad(gs_threadCnt) != 0)
			return true;

		synchronized {
			if (atomicLoad(gs_closing) || atomicLoad(gs_threadCnt) != 0)
				return true;

			foreach (i; 0 .. 4) {
				Thread thr = new CmdProcessor;
				thr.isDaemon = true;
				thr.name = "CmdProcessor";
				gs_threads.add(thr);
				// Start the thread while holding gs_wlock: the worker needs
				// this lock to register its waiter before it notifies
				// gs_started, so its notification cannot fire before wait()
				// has released the lock and begun waiting.
				synchronized(gs_wlock) {
					thr.start();
					gs_started.wait(1.seconds);
				}
			}
		}
	} catch(Exception ignore) {
		return false;
	}
	return true;
}
290 
/// Marks the pool as closing and asks every registered worker to stop.
/// When called from the main thread, each worker is also joined.
///
/// Only the first call does any work; later calls return immediately.
void destroyAsyncThreads() {
	import core.thread : thread_isMainThread;

	// Atomic test-and-set, so that two concurrent callers cannot both
	// observe "not closing" and run the shutdown sequence twice.
	if (!cas(&gs_closing, false, true))
		return;

	immutable bool join_workers = thread_isMainThread();

	foreach (thr; gs_threads) {
		CmdProcessor worker = cast(CmdProcessor)thr;
		if (worker is null) continue; // not one of ours

		synchronized(gs_wlock)
			worker.stop();

		// Join only after gs_wlock has been released: a worker that is
		// finishing a command takes gs_wlock to re-register its waiter and
		// could never terminate while we hold it.
		if (join_workers)
			worker.join();
	}
}
302 
/// Per-thread module destructor: shuts the pool down when it runs on the
/// main thread; on any other thread it does nothing.
static ~this() {
	import core.thread : thread_isMainThread;

	if (!thread_isMainThread)
		return;
	destroyAsyncThreads();
}
308 
/// Process-wide module destructor: asserts that the pool was closed.
shared static ~this() {
	immutable bool was_closed = atomicLoad(gs_closing);
	assert(was_closed, "You must call libasync.threads.destroyAsyncThreads() upon termination of the program to avoid segfaulting");
}
312 
/// Discriminates what kind of context a CommandInfo's `data` points to.
enum CmdInfoType {
	/// `data` is cast to `shared AsyncFile` by the worker.
	FILE = 0,
	/// `data` is cast to `shared AsyncDNS` by the worker.
	DNS = 1,
}
317 
/// One queued job: a type tag plus an untyped pointer to its context.
struct CommandInfo {
	/// Selects how the worker interprets `data`.
	CmdInfoType type;
	/// Context object; cast to the type selected by `type` when dispatched.
	void* data;
}