1 module libasync.threads;
2 import core.sync.mutex;
3 import core.sync.condition;
4 import core.thread;
5 import libasync.events;
6 import std.stdio;
7 import std.container : Array;
8 
9 nothrow {
10 	struct Waiter {
11 		Mutex mtx;
12 		Condition cond;
13 	}
14 
15 	__gshared Mutex gs_wlock;
16 	__gshared Array!Waiter gs_waiters;
17 	__gshared Array!CommandInfo gs_jobs;
18 	__gshared Condition gs_started;
19 	__gshared bool gs_closing;
20 
21 	__gshared ThreadGroup gs_threads; // daemon threads
22 	shared(int) gs_threadCnt;
23 	
24 }
25 
26 final class CmdProcessor : Thread 
27 {
28 nothrow:
29 private:
30 	EventLoop m_evLoop;
31 	Waiter m_waiter;
32 	bool m_stop;
33 	
34 	this() {
35 		try {
36 			Mutex mtx = new Mutex;
37 			Condition cond = new Condition(mtx);
38 			m_waiter = Waiter(mtx, cond);
39 			super(&run);
40 		}
41 		catch (Throwable e) {
42 			import std.stdio;
43 			try writeln("Failed to run thread ... ", e.toString()); catch {}
44 		}
45 	}
46 
47 	void process(shared AsyncDNS ctxt) {
48 		DNSCmdInfo cmdInfo = ctxt.cmdInfo();
49 		auto mutex = cmdInfo.mtx;
50 		DNSCmd cmd;
51 		Waiter waiter;
52 		string url;
53 		cmd = cmdInfo.command; 
54 		waiter = cast(Waiter)cmdInfo.waiter;
55 		url = cmdInfo.url;
56 		try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch {}
57 
58 		try final switch (cmd)
59 		{
60 			case DNSCmd.RESOLVEHOST:
61 				*ctxt.addr = cast(shared) m_evLoop.resolveHost(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no);
62 				break;
63 
64 			case DNSCmd.RESOLVEIP:
65 				*ctxt.addr = cast(shared) m_evLoop.resolveIP(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no);
66 				break;
67 
68 		} catch (Throwable e) {
69 			auto status = StatusInfo.init;
70 			status.code = Status.ERROR;
71 			try status.text = e.toString(); catch {}
72 			ctxt.status = status;
73 		}
74 		
75 		try {
76 			cmdInfo.ready.trigger(m_evLoop);
77 			synchronized(gs_wlock)
78 				gs_waiters.insertBack(m_waiter);
79 			gs_started.notifyAll(); // saves some waiting on a new thread
80 		}
81 		catch (Throwable e) {
82 			auto status = StatusInfo.init;
83 			status.code = Status.ERROR;
84 			try status.text = e.toString(); catch {}
85 			ctxt.status = status;
86 		}
87 	}
88 	
89 	void process(shared AsyncFile ctxt) {
90 		auto cmdInfo = ctxt.cmdInfo;
91 		auto mutex = cmdInfo.mtx;
92 		FileCmd cmd;
93 		Waiter waiter;
94 		cmd = cmdInfo.command; 
95 		waiter = cast(Waiter)cmdInfo.waiter;
96 
97 		try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch {}
98 		import std.file : exists;
99 		try if (cmdInfo.create_if_not_exists || cmdInfo.truncate_if_exists) {
100 			bool flag;
101 			if (cmdInfo.create_if_not_exists && !exists(ctxt.filePath.toNativeString()))
102 				flag = true;
103 			else if (cmdInfo.truncate_if_exists && exists(ctxt.filePath.toNativeString()))
104 				flag = true;
105 			if (flag) // touch
106 			{	File dummy = File(ctxt.filePath.toNativeString(), "w"); }
107 		}
108 		catch (Exception e){
109 			auto status = StatusInfo.init;
110 			status.code = Status.ERROR;
111 			try status.text = "Could not create the file in destination: " ~ e.toString(); catch {}
112 			ctxt.status = status;
113 		}
114 
115 		
116 		try final switch (cmd)
117 		{
118 			case FileCmd.READ:
119 				File file = File(ctxt.filePath.toNativeString(), "rb");
120 				if (ctxt.offset != -1)
121 					file.seek(cast(long)ctxt.offset);
122 				ubyte[] res;
123 				synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer);
124 				if (res)
125 					ctxt.offset = cast(ulong) (ctxt.offset + res.length);
126 
127 				break;
128 				
129 			case FileCmd.WRITE:
130 
131 				File file = File(ctxt.filePath.toNativeString(), "r+b");
132 				if (ctxt.offset != -1)
133 					file.seek(cast(long)ctxt.offset);
134 				synchronized(mutex) {
135 					file.rawWrite(cast(ubyte[])ctxt.buffer);
136 				}
137 				file.flush();
138 				ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
139 				break;
140 
141 			case FileCmd.APPEND:
142 				
143 				File file = File(ctxt.filePath.toNativeString(), "a+b");
144 				synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
145 				ctxt.offset = cast(ulong) file.size();
146 				break;
147 		} catch (Throwable e) {
148 			auto status = StatusInfo.init;
149 			status.code = Status.ERROR;
150 			try status.text = "Error in " ~  cmd.to!string ~ ", " ~ e.toString(); catch {}
151 			ctxt.status = status;
152 		}
153 
154 
155 		try {
156 
157 			cmdInfo.ready.trigger(m_evLoop);
158 
159 			synchronized(gs_wlock)
160 				gs_waiters.insertBack(m_waiter);
161 			gs_started.notifyAll(); // saves some waiting on a new thread
162 		}
163 		catch (Throwable e) {
164 			try writeln("ERROR"); catch {}
165 			auto status = StatusInfo.init;
166 			status.code = Status.ERROR;
167 			try status.text = e.toString(); catch {}
168 			ctxt.status = status;
169 		}
170 	}
171 	
172 	void run()
173 	{
174 		try {
175 			m_evLoop = new EventLoop;
176 			synchronized(gs_wlock) {
177 				gs_waiters.insertBack(m_waiter);
178 			}
179 			
180 			gs_started.notifyAll();
181 
182 			process();
183 		} catch (Throwable e) {
184 			try writeln("Error inserting in waiters " ~ e.toString()); catch {}
185 		}
186 
187 		core.atomic.atomicOp!"-="(gs_threadCnt, cast(int) 1);
188 	}
189 	
190 	void stop()
191 	{
192 		m_stop = true;
193 		try (cast(Waiter)m_waiter).cond.notifyAll();
194 		catch (Exception e) {
195 			try writeln("Exception occured notifying foreign thread: ", e); catch {}
196 		}
197 	}
198 	
199 	private void process() {
200 		while(!m_stop) {
201 			CommandInfo cmd;
202 			try synchronized(m_waiter.mtx)
203 				m_waiter.cond.wait();
204 			catch {}
205 
206 			if (m_stop) break;
207 
208 			try synchronized(gs_wlock) {
209 				if (gs_jobs.empty) continue;
210 				cmd = gs_jobs.back;
211 				gs_jobs.removeBack();
212 			} catch {}
213 
214 			final switch (cmd.type) {
215 				case CmdInfoType.FILE:
216 					process(cast(shared AsyncFile) cmd.data);
217 					break;
218 				case CmdInfoType.DNS:
219 					process(cast(shared AsyncDNS) cmd.data);
220 					break;
221 			}
222 
223 		}
224 	}
225 
226 }
227 
228 Waiter popWaiter() {
229 	Waiter cmd_handler;
230 	bool start_thread;
231 	do {
232 		if (start_thread) {
233 			Thread thr = new CmdProcessor;
234 			thr.isDaemon = true;
235 			thr.name = "CmdProcessor";
236 			thr.start();
237 			core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
238 			gs_threads.add(thr);
239 		}
240 		
241 		synchronized(gs_wlock) {
242 			if (start_thread && !gs_started.wait(5.seconds))
243 				continue;
244 			
245 			try {
246 				if (!cmd_handler.mtx && !gs_waiters.empty) {
247 					cmd_handler = gs_waiters.back;
248 					gs_waiters.removeBack();
249 				}
250 				else if (core.atomic.atomicLoad(gs_threadCnt) < 16) {
251 					start_thread = true;
252 				}
253 				else {
254 					Thread.sleep(50.usecs);
255 				}
256 			} catch (Exception e){
257 				writeln("Exception in popWaiter: ", e);
258 			}
259 		}
260 	} while(!cmd_handler.cond);
261 	return cmd_handler;
262 }
263 
264 shared static this() {
265 	import std.stdio : writeln;
266 	gs_wlock = new Mutex;
267 	gs_threads = new ThreadGroup;
268 	gs_started = new Condition(gs_wlock);
269 	foreach (i; 0 .. 4) {
270 		Thread thr = new CmdProcessor;
271 		gs_threads.add(thr);
272 		thr.isDaemon = true;
273 		thr.name = "CmdProcessor";
274 		thr.start();
275 		core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
276 		synchronized(gs_wlock)
277 			gs_started.wait(1.seconds);
278 	}
279 }
280 
281 void destroyAsyncThreads() {
282 	if (!gs_closing) gs_closing = true;
283 	else return;
284 	import core.memory : GC;
285 	GC.disable();
286 	synchronized(gs_wlock) foreach (thr; gs_threads) {
287 		CmdProcessor thread = cast(CmdProcessor)thr;
288 		thread.stop();
289 		thread.join();
290 	}
291 }
292 
293 shared static ~this() {
294 	assert(core.atomic.atomicLoad(gs_threadCnt) == 0, "You must call libasync.threads.destroyAsyncThreads() upon termination of the program to avoid segfaulting");
295 }
296 
297 enum CmdInfoType {
298 	FILE,
299 	DNS
300 }
301 
302 struct CommandInfo {
303 	CmdInfoType type;
304 	void* data;
305 }