1 module libasync.test;
2 version(unittest):
3 import libasync.events;
4 import std.stdio;
5 import std.datetime;
6 import libasync.file;
7 import std.conv : to;
8 import core.stdc.stdlib : getenv;
9 import std..string : fromStringz, toStringz;
10 
11 AsyncDirectoryWatcher g_watcher;
12 shared AsyncDNS g_dns;
13 string cache_path;
14 		
15 
16 unittest {
17 	cache_path = ".";
18 	version(iOS) cache_path = getenv("HOME".toStringz).fromStringz.to!string ~ "/Library/Caches";
19 
20 	spawnAsyncThreads();
21 	scope(exit)
22 		destroyAsyncThreads();
23 	//writeln("Unit test started");
24 	g_cbCheck = new shared bool[19];
25 	g_lastTimer = Clock.currTime();
26 	gs_start = Clock.currTime();
27 	g_evl = getThreadEventLoop();
28 	g_evl.loop(1.msecs);
29 	//writeln("Loading objects...");
30 	testDirectoryWatcher();
31 
32 	testDNS();
33  	testOneshotTimer();
34 	testMultiTimer();
35 	gs_tlsEvent = new shared AsyncSignal(g_evl);
36 	testSignal();
37 	testEvents();
38 	testTCPListen("localhost", 8081);
39 	testHTTPConnect();
40 	//writeln("Loaded. Running event loop...");
41 	testFile();
42 	testTCPConnect("localhost", 8081);
43 	while(Clock.currTime() - gs_start < 7.seconds)
44 		g_evl.loop(100.msecs);
45 
46 	int i;
47 	foreach (bool b; g_cbCheck) {
48 		assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]");
49 		i++;
50 	}
51 	writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start);
52 
53 	assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times
54 	g_watcher.kill();
55 	g_notifier.kill();
56 	g_listnr.kill();
57 	version(LDC) {
58 		import core.stdc.stdlib; exit(0);
59 	}
60 
61 }
62 
63 StopWatch g_swDns;
64 void testDNS() {
65 	g_dns = new shared AsyncDNS(g_evl);
66 	g_swDns.start();
67 	g_dns.handler((NetworkAddress addr) {
68 		g_cbCheck[17] = true;
69 		writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs");
70 	}).resolveHost("httpbin.org", false, true);
71 }
72 
73 void testDirectoryWatcher() {
74 	import std.file : mkdir, rmdir, exists;
75 	if (exists(cache_path ~ "/hey/tmp.tmp"))
76 		remove((cache_path ~ "/hey/tmp.tmp").toStringz);
77 	if (exists(cache_path ~ "/hey"))
78 		rmdir(cache_path ~ "/hey");
79 	g_watcher = new AsyncDirectoryWatcher(g_evl);
80 	g_watcher.run({
81 		DWChangeInfo[1] change;
82 		DWChangeInfo[] changeRef = change.ptr[0..1];
83 		bool done;
84 		while(g_watcher.readChanges(changeRef))
85 		{
86 			g_cbCheck[18] = true;
87 			writeln(change);
88 			if (change[0].event == DWFileEvent.DELETED)
89 				done = true;
90 		}
91 	});
92 	g_watcher.watchDir(cache_path);
93 	AsyncTimer tm = new AsyncTimer(g_evl);
94 	tm.duration(1.seconds).run({
95 		writeln("Creating directory ./hey");
96 		mkdir(cache_path ~ "/hey");
97 		assert(g_watcher.watchDir(cache_path ~ "/hey/"));
98 		tm.duration(1.seconds).run({
99 			static import std.file;
100 			writeln("Writing to ./hey/tmp.tmp for the first time");
101 			std.file.write(cache_path ~ "/hey/tmp.tmp", "some string");
102 			tm.duration(100.msecs).run({
103 				writeln("Removing ./hey/tmp.tmp");
104 				remove((cache_path ~ "/hey/tmp.tmp").toStringz);
105 				tm.kill();
106 			});
107 		});
108 	});
109 }
110 
111 void testFile() {
112 	gs_file = new shared AsyncFile(g_evl);
113 
114 	{
115 		File file = File(cache_path ~ "/test.txt", "w+");
116 		file.rawWrite("This is the file content.");
117 		file.close();
118 		//writeln("Wrote to file");
119 	}
120 	gs_file.onReady({
121 		//writeln("Created and wrote to test.txt through AsyncFile");
122 		auto file = gs_file;
123 		//if (file.status.code == Status.ERROR)
124 			//writeln("ERROR: ", file.status.text);
125 		
126 		import std..string : startsWith;
127 		if ((cast(string)file.buffer).startsWith("This is the file content.")) {
128 			g_cbCheck[7] = true;
129 		}
130 		else {
131 			writeln("ERROR: ", cast(string)file.buffer);
132 			assert(false);
133 		}
134 		import std.file : remove;
135 		gs_file.kill();
136 		remove(cache_path ~ "/test.txt");
137 		//writeln("Removed test.txt .. ");
138 	}).read(cache_path ~ "/test.txt");
139 
140 }
141 
142 
143 void testSignal() {
144 	g_notifier = new AsyncNotifier(g_evl);
145 	auto title = "This is my title";
146 
147 	void delegate() del = {
148 		import std.stdio;
149 		assert(title == "This is my title");
150 		g_cbCheck[0] = true;
151 		//writeln("Got signal title");
152 
153 		return;
154 	};
155 
156 	g_notifier.run(del);
157 	g_notifier.trigger(); // will be completed in the event loop
158 }
159 
160 void testEvents() {
161 
162 	gs_tlsEvent.run({
163 		assert(g_message == "Some message here");
164 		g_cbCheck[1] = true;
165 		//writeln("Got valid TLS Event");
166 	});
167 
168 	gs_shrEvent = new shared AsyncSignal(g_evl);
169 
170 	gs_shrEvent.run({
171 		assert(gs_hshr.message == "Hello from shared!");
172 		g_cbCheck[2] = true;
173 		//writeln("Got valid shared event!");
174 	});
175 
176 	testTLSEvent();
177 
178 	import std.concurrency;
179 	Tid t2 = spawn(&testSharedEvent);
180 	import core.thread : Thread;
181 	while (!gs_shrEvent2 || gs_shrEvent2.id == 0)
182 		Thread.sleep(100.msecs);
183 
184 	gs_shrEvent2.trigger(g_evl);
185 }
186 
187 void testTLSEvent() {
188 	gs_tlsEvent.trigger();
189 }
190 
191 void testSharedEvent() {
192 	EventLoop evl2 = new EventLoop;
193 
194 	gs_shrEvent2 = new shared AsyncSignal(evl2);
195 	gs_shrEvent2.run({
196 		g_cbCheck[3] = true;
197 		return;
198 	});
199 
200 	gs_shrEvent.trigger(evl2);
201 
202 	while(Clock.currTime() - gs_start < 1.seconds)
203 		evl2.loop();
204 
205 	gs_shrEvent.trigger(evl2);
206 
207 	while(Clock.currTime() - gs_start < 4.seconds)
208 		evl2.loop();
209 }
210 
211 void testOneshotTimer() {
212 	AsyncTimer g_timerOneShot = new AsyncTimer(g_evl);
213 	g_timerOneShot.duration(1.seconds).run({
214 		assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1400.msecs, "Timer completed in " ~ (Clock.currTime() - gs_start).total!"msecs".to!string ~ "ms" );
215 		assert(g_timerOneShot.id != 0);
216 		//writeln("Got timer callback!");
217 		g_cbCheck[4] = true;
218 
219 	});
220 }
221 
222 void testMultiTimer() {
223 	AsyncTimer g_timerMulti = new AsyncTimer(g_evl);
224 	g_timerMulti.periodic().duration(1.seconds).run({
225 		assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1400.msecs, "Timer completed in " ~ (Clock.currTime() - gs_start).total!"msecs".to!string ~ "ms" );
226 		assert(g_timerMulti.id > 0);
227 		assert(!g_timerMulti.oneShot);
228 		g_lastTimer = Clock.currTime();
229 		g_cbTimerCnt++;
230 		//writeln("Got timer callback #", g_cbTimerCnt.to!string);
231 		g_cbCheck[5] = true;
232 	});
233 
234 }
235 
236 
237 void trafficHandler(TCPEvent ev){
238 	//writeln("##TrafficHandler!");
239 	void doRead() {
240 		static ubyte[] bin = new ubyte[4092];
241 		while (true) {
242 			uint len = g_conn.recv(bin);
243 			//writeln("!!Server Received " ~ len.to!string ~ " bytes");
244 			// import std.file;
245 			if (len > 0) {
246 				auto res = cast(string)bin[0..len];
247 				//writeln(res);
248 				import std.algorithm : canFind;
249 				if (res.canFind("Client Hello"))
250 					g_cbCheck[8] = true;
251 
252 				if (res.canFind("Client WRITE"))
253 					g_cbCheck[8] = false;
254 
255 				if (res.canFind("Client READ"))
256 					g_cbCheck[9] = true;
257 
258 				if (res.canFind("Client KILL"))
259 					g_cbCheck[10] = true;
260 			}
261 			if (len < bin.length)
262 				break;
263 		}
264 	}
265 
266 	final switch (ev) {
267 		case TCPEvent.CONNECT:
268 			//writeln("!!Server Connected");
269 			doRead();
270 			if (g_conn.socket != 0)
271 				g_conn.send(cast(ubyte[])"Server Connect");
272 			break;
273 		case TCPEvent.READ:
274 			//writeln("!!Server Read is ready");
275 			g_cbCheck[11] = true;
276 			if (g_conn.socket != 0)
277 				g_conn.send(cast(ubyte[])"Server READ");
278 			doRead();
279 			break;
280 		case TCPEvent.WRITE:
281 			//writeln("!!Server Write is ready");
282 			if (g_conn.socket != 0)
283 				g_conn.send(cast(ubyte[])"Server WRITE");
284 			break;
285 		case TCPEvent.CLOSE:
286 			doRead();
287 			//writeln("!!Server Disconnect!");
288 			g_cbCheck[12] = true;
289 			break;
290 		case TCPEvent.ERROR:
291 			//writeln("!!Server Error!");
292 			break;
293 	}
294 
295 	return;
296 }
297 
298 void testTCPListen(string ip, ushort port) {
299 
300 	g_listnr = new AsyncTCPListener(g_evl);
301 
302 	void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
303 		g_conn = conn;
304 		g_cbCheck[6] = true;
305 		import std.functional : toDelegate;
306 		//writeln("Got handler TCPListen");
307 		return toDelegate(&trafficHandler);
308 	}
309 
310 	auto success = g_listnr.host(ip, port).run(&handler);
311 	assert(success, g_listnr.error);
312 }
313 
314 void testTCPConnect(string ip, ushort port) {
315 	auto conn = new AsyncTCPConnection(g_evl);
316 	conn.peer = g_evl.resolveHost(ip, port);
317 
318 	void delegate(TCPEvent) connHandler = (TCPEvent ev){
319 		void doRead() {
320 			static ubyte[] bin = new ubyte[4092];
321 			while (true) {
322 				assert(conn.socket > 0);
323 				uint len = conn.recv(bin);
324 				//writeln("!!Client Received " ~ len.to!string ~ " bytes");
325 				//if (len > 0)
326 				//	writeln(cast(string)bin[0..len]);
327 				if (len < bin.length)
328 					break;
329 			}
330 		}
331 		final switch (ev) {
332 			case TCPEvent.CONNECT:
333 				// writeln("!!Client Connected");
334 				conn.setOption(TCPOption.QUICK_ACK, true);
335 				conn.setOption(TCPOption.NODELAY, true);
336 				g_cbCheck[14] = true;
337 				if (conn.socket != 0)
338 					conn.send(cast(ubyte[])"Client Hello");
339 				assert(conn.socket > 0);
340 				break;
341 			case TCPEvent.READ:
342 				//writeln("!!Client Read is ready at writes: ", g_writes);
343 				doRead();
344 
345 				// respond
346 				g_writes += 1;
347 				if (g_writes > 3) {
348 					if (conn.socket != 0)
349 						conn.send(cast(ubyte[])"Client KILL");
350 					conn.kill();
351 
352 					g_cbCheck[13] = true;
353 				}
354 				else
355 					if (conn.socket != 0)
356 						conn.send(cast(ubyte[])"Client READ");
357 
358 				break;
359 			case TCPEvent.WRITE:
360 
361 				g_writes += 1;
362 				//writeln("!!Client Write is ready");
363 				if (conn.socket != 0)
364 					conn.send(cast(ubyte[])"Client WRITE");
365 				break;
366 			case TCPEvent.CLOSE:
367 				//writeln("!!Client Disconnected");
368 				break;
369 			case TCPEvent.ERROR:
370 				//writeln("!!Client Error!");
371 				break;
372 		}
373 		return;
374 	};
375 
376 	auto success = conn.run(connHandler);
377 	assert(success);
378 
379 }
380 
381 void testHTTPConnect() {
382 	auto conn = new AsyncTCPConnection(g_evl);
383 	conn.peer = g_evl.resolveHost("httpbin.org", 80);
384 
385 	auto del = (TCPEvent ev){
386 		final switch (ev) {
387 			case TCPEvent.CONNECT:
388 				//writeln("!!Connected");
389 				static ubyte[] abin = new ubyte[4092];
390 				while (true) {
391 					uint len = conn.recv(abin);
392 					if (len < abin.length)
393 						break;
394 				}
395 				g_cbCheck[15] = true;
396 				//writeln(conn.local.toString());
397 				//writeln(conn.peer.toString());
398 				conn.send(cast(ubyte[])"GET /ip\nHost: httpbin.org\nConnection: close\n\n");
399 				break;
400 			case TCPEvent.READ:
401 				static ubyte[] bin = new ubyte[4092];
402 				while (true) {
403 					uint len = conn.recv(bin);
404 					g_cbCheck[16] = true;
405 					if (len > 0) writeln("HTTP Response: ", cast(string)bin.ptr[0 .. len]);
406 					//writeln("!!Received " ~ len.to!string ~ " bytes");
407 					if (len < bin.length)
408 						break;
409 				}
410 				break;
411 			case TCPEvent.WRITE:
412 				//writeln("!!Write is ready");
413 				break;
414 			case TCPEvent.CLOSE:
415 				//writeln("!!Disconnected");
416 				break;
417 			case TCPEvent.ERROR:
418 				//writeln("!!Error!");
419 				break;
420 		}
421 		return;
422 	};
423 
424 
425 	conn.run(del);
426 }
427 
428 EventLoop g_evl;
429 AsyncTimer g_timerOneShot;
430 AsyncTimer g_timerMulti;
431 AsyncTCPConnection g_tcpConnect;
432 AsyncTCPConnection g_httpConnect;
433 AsyncTCPConnection g_conn; // incoming
434 AsyncNotifier g_notifier;
435 AsyncTCPListener g_listnr;
436 shared AsyncSignal gs_tlsEvent;
437 shared AsyncSignal gs_shrEvent;
438 shared AsyncSignal gs_shrEvent2;
439 shared AsyncFile gs_file;
440 __gshared SysTime gs_start;
441 string g_message = "Some message here";
442 shared Msg* gs_hshr = new shared Msg("Hello from shared!");
443 shared bool[] g_cbCheck;
444 int g_cbTimerCnt;
445 int g_writes;
446 SysTime g_lastTimer;
447 
448 shared struct Msg {
449 	string message;
450 }