1 module libasync.test;
2 version(unittest):
3 import libasync.events;
4 import std.stdio;
5 import std.datetime;
6 import libasync.file;
7 
// Directory watcher created by testDirectoryWatcher() and killed at the end of the unittest.
AsyncDirectoryWatcher g_watcher;
// DNS resolver created by testDNS().
shared AsyncDNS g_dns;
10 
11 
// Smoke test for the whole library: every test*() helper below registers one
// or more callbacks on the thread's event loop, each callback flips its own
// slot in g_cbCheck, and after pumping the loop for 7 seconds every slot must
// be set.
unittest {
	spawnAsyncThreads();
	scope(exit)
		destroyAsyncThreads();

	// One flag per expected callback (indices 0 .. 18).
	g_cbCheck = new shared bool[19];
	g_lastTimer = Clock.currTime();
	gs_start = Clock.currTime();
	g_evl = getThreadEventLoop();
	g_evl.loop(1.msecs);

	// Register all asynchronous objects; nothing completes until the loop runs.
	testDirectoryWatcher();

	testDNS();
	testOneshotTimer();
	testMultiTimer();
	gs_tlsEvent = new shared AsyncSignal(g_evl);
	testSignal();
	testEvents();
	testTCPListen("localhost", 8081);
	testHTTPConnect();
	testFile();
	testTCPConnect("localhost", 8081);

	// Pump the loop until 7 seconds have passed since gs_start.
	while (Clock.currTime() - gs_start < 7.seconds)
		g_evl.loop(100.msecs);

	// The indexed foreach supplies the slot number for the failure message.
	foreach (i, bool b; g_cbCheck)
		assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]");

	assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times
	g_watcher.kill();
	g_notifier.kill();
	g_listnr.kill();
}
52 
// Stopwatch started right before the DNS lookup below is issued.
StopWatch g_swDns;

// Creates the shared resolver, installs a handler that sets g_cbCheck[17]
// and asks it to resolve "127.0.0.1".
void testDNS() {
	g_dns = new shared AsyncDNS(g_evl);
	g_swDns.start();

	auto resolver = g_dns.handler((NetworkAddress addr) {
		// Reaching this handler is all the test verifies.
		g_cbCheck[17] = true;
	});
	resolver.resolveHost("127.0.0.1");
}
62 
// Watches "." and then "./hey/" with an AsyncDirectoryWatcher and drives a
// chain of timers that creates the directory, writes a file into it and
// removes that file again. Any change reported by the watcher sets
// g_cbCheck[18].
void testDirectoryWatcher() {
	import std.file : mkdir, write, rmdir, exists;
	// Clear whatever a previous run left behind.
	// NOTE(review): `remove` is not part of the selective std.file import
	// above, so it resolves to another `remove` that happens to be in scope
	// -- confirm this is the intended function.
	if (exists("./hey/tmp.tmp"))
		remove("./hey/tmp.tmp");
	if (exists("./hey"))
		rmdir("./hey");
	g_watcher = new AsyncDirectoryWatcher(g_evl);
	g_watcher.run({
		// Changes are pulled one at a time through a single-element buffer.
		DWChangeInfo[1] change;
		DWChangeInfo[] changeRef = change.ptr[0..1];
		while(g_watcher.readChanges(changeRef))
		{
			g_cbCheck[18] = true;
			writeln(change);
		}
	});
	g_watcher.watchDir(".");
	// The same timer object is re-armed from inside each of its callbacks.
	AsyncTimer tm = new AsyncTimer(g_evl);
	tm.duration(1.seconds).run({
		writeln("Creating directory ./hey");
		mkdir("./hey");
		assert(g_watcher.watchDir("./hey/"));
		tm.duration(1.seconds).run({
			writeln("Writing to ./hey/tmp.tmp for the first time");
			std.file.write("./hey/tmp.tmp", "some string");
			tm.duration(100.msecs).run({
				writeln("Removing ./hey/tmp.tmp");
				remove("./hey/tmp.tmp");
				tm.kill();
			});
		});
	});
}
99 
// Writes test.txt synchronously, reads it back through AsyncFile and sets
// g_cbCheck[7] when the bytes read start with the text that was written.
void testFile() {
	gs_file = new shared AsyncFile(g_evl);

	// Create the fixture with plain blocking I/O.
	{
		File file = File("test.txt", "w");
		file.rawWrite("This is the file content.");
		file.close();
	}
	gs_file.onReady({
		import std.algorithm : startsWith;
		import std.file : remove;

		writeln("AsyncFile finished reading test.txt");
		auto file = gs_file;

		// Release the AsyncFile and delete the fixture even when the content
		// assertion below fails.
		scope(exit) {
			gs_file.kill();
			remove("test.txt");
			writeln("Removed test.txt .. ");
		}

		if (file.status.code == Status.ERROR)
			writeln("ERROR: ", file.status.text);

		if ((cast(string)file.buffer).startsWith("This is the file content."))
			g_cbCheck[7] = true;
		else
			assert(false, "Unexpected content read from test.txt: " ~ cast(string)file.buffer);
	}).read("test.txt");

}
129 
130 
// Creates the notifier, attaches a callback that checks a captured local and
// sets g_cbCheck[0], then triggers it.
void testSignal() {
	auto title = "This is my title";

	// Reads `title` from the enclosing frame.
	void onNotify() {
		assert(title == "This is my title");
		g_cbCheck[0] = true;
	}

	g_notifier = new AsyncNotifier(g_evl);
	g_notifier.run(&onNotify);
	g_notifier.trigger(); // will be completed in the event loop
}
146 
// Wires up the signal callbacks for g_cbCheck[1] and g_cbCheck[2], fires the
// thread-local signal, spawns testSharedEvent() on another thread and, once
// that thread has published gs_shrEvent2, triggers it from this loop.
void testEvents() {
	import std.concurrency : spawn, Tid;
	import core.thread : Thread;

	gs_tlsEvent.run({
		assert(g_message == "Some message here");
		g_cbCheck[1] = true;
	});

	gs_shrEvent = new shared AsyncSignal(g_evl);
	gs_shrEvent.run({
		assert(gs_hshr.message == "Hello from shared!");
		g_cbCheck[2] = true;
	});

	testTLSEvent();

	Tid worker = spawn(&testSharedEvent);

	// Poll until the worker thread has created its signal and it has a
	// non-zero id.
	for (;;) {
		if (gs_shrEvent2 && gs_shrEvent2.id != 0)
			break;
		Thread.sleep(100.msecs);
	}

	gs_shrEvent2.trigger(g_evl);
}
171 
// Triggers gs_tlsEvent without passing an event loop; the callback attached
// in testEvents() sets g_cbCheck[1].
void testTLSEvent() {
	gs_tlsEvent.trigger();
}
175 
// Thread entry point spawned by testEvents(). Runs its own event loop,
// publishes gs_shrEvent2 (callback sets g_cbCheck[3]) and triggers
// gs_shrEvent twice: once immediately and once after the 1-second mark.
void testSharedEvent() {
	EventLoop workerLoop = new EventLoop;

	// Pumps this thread's loop until `limit` has elapsed since gs_start.
	void pumpUntil(Duration limit) {
		while (Clock.currTime() - gs_start < limit)
			workerLoop.loop();
	}

	gs_shrEvent2 = new shared AsyncSignal(workerLoop);
	gs_shrEvent2.run({
		g_cbCheck[3] = true;
	});

	gs_shrEvent.trigger(workerLoop);
	pumpUntil(1.seconds);

	gs_shrEvent.trigger(workerLoop);
	pumpUntil(4.seconds);
}
195 
// Arms a 1-second timer whose callback must run exactly once, between 900
// and 1100 ms after gs_start, and then sets g_cbCheck[4].
void testOneshotTimer() {
	// Local timer; deliberately not named like the module-level g_timerOneShot.
	AsyncTimer oneShot = new AsyncTimer(g_evl);

	oneShot.duration(1.seconds).run({
		// Must not have fired before.
		assert(!g_cbCheck[4]);
		// Must fire roughly one second after the test started.
		assert(Clock.currTime() - gs_start > 900.msecs);
		assert(Clock.currTime() - gs_start < 1100.msecs);
		assert(oneShot.id > 0);
		g_cbCheck[4] = true;
	});
}
205 
// Arms a periodic 1-second timer. Every expiry must come 900-1100 ms after
// the previous one (tracked in g_lastTimer); it bumps g_cbTimerCnt and sets
// g_cbCheck[5].
void testMultiTimer() {
	// Local timer; deliberately not named like the module-level g_timerMulti.
	AsyncTimer periodicTimer = new AsyncTimer(g_evl);

	periodicTimer.periodic().duration(1.seconds).run({
		assert(g_lastTimer !is SysTime.init);
		assert(Clock.currTime() - g_lastTimer > 900.msecs);
		assert(Clock.currTime() - g_lastTimer < 1100.msecs);
		assert(periodicTimer.id > 0);
		assert(!periodicTimer.oneShot);

		g_lastTimer = Clock.currTime();
		g_cbTimerCnt++;
		g_cbCheck[5] = true;
	});

}
218 
219 
// Server-side handler for the connection accepted in testTCPListen()
// (stored in g_conn). Records what the client sent in g_cbCheck[8..12] and
// answers CONNECT/READ/WRITE events with a short text message.
void trafficHandler(TCPEvent ev){
	// Reads from g_conn until a recv() returns less than a full buffer and
	// flags the client messages found in each chunk.
	void drain() {
		import std.algorithm : canFind;
		static ubyte[] bin = new ubyte[4092];
		uint got;
		do {
			got = g_conn.recv(bin);
			if (got > 0) {
				auto text = cast(string)bin[0..got];

				if (text.canFind("Client Hello"))
					g_cbCheck[8] = true;

				// A "Client WRITE" message clears slot 8 again.
				// NOTE(review): presumably marks an unexpected client WRITE
				// event as a failure -- confirm.
				if (text.canFind("Client WRITE"))
					g_cbCheck[8] = false;

				if (text.canFind("Client READ"))
					g_cbCheck[9] = true;

				if (text.canFind("Client KILL"))
					g_cbCheck[10] = true;
			}
		} while (got >= bin.length);
	}

	// Sends `msg` only while the connection still has a socket.
	void reply(string msg) {
		if (g_conn.socket != 0)
			g_conn.send(cast(ubyte[])msg);
	}

	final switch (ev) {
		case TCPEvent.CONNECT:
			drain();
			reply("Server Connect");
			break;
		case TCPEvent.READ:
			g_cbCheck[11] = true;
			reply("Server READ");
			drain();
			break;
		case TCPEvent.WRITE:
			reply("Server WRITE");
			break;
		case TCPEvent.CLOSE:
			drain();
			g_cbCheck[12] = true;
			break;
		case TCPEvent.ERROR:
			break;
	}
}
280 
// Starts a TCP listener on ip:port. Each accepted connection is stored in
// g_conn, sets g_cbCheck[6] and is served by trafficHandler().
void testTCPListen(string ip, ushort port) {
	import std.functional : toDelegate;

	// Accept callback: returns the event handler for the new connection.
	void delegate(TCPEvent) onAccept(AsyncTCPConnection incoming) {
		g_cbCheck[6] = true;
		g_conn = incoming;
		return toDelegate(&trafficHandler);
	}

	g_listnr = new AsyncTCPListener(g_evl);

	auto listening = g_listnr.host(ip, port).run(&onAccept);
	assert(listening, g_listnr.error);
}
295 
// Client side of the TCP test: connects to ip:port, greets the server and
// answers READ/WRITE events until g_writes exceeds 3, then sends
// "Client KILL" and kills the connection. Sets g_cbCheck[14] on connect and
// g_cbCheck[13] when the connection is killed.
void testTCPConnect(string ip, ushort port) {
	auto conn = new AsyncTCPConnection(g_evl);
	conn.peer = g_evl.resolveHost(ip, port);

	// Reads and discards data until a recv() returns less than a full buffer.
	void drain() {
		static ubyte[] bin = new ubyte[4092];
		uint got;
		do {
			assert(conn.socket > 0);
			got = conn.recv(bin);
		} while (got >= bin.length);
	}

	// Sends `msg` only while the connection still has a socket.
	void say(string msg) {
		if (conn.socket != 0)
			conn.send(cast(ubyte[])msg);
	}

	void onEvent(TCPEvent ev) {
		final switch (ev) {
			case TCPEvent.CONNECT:
				conn.setOption(TCPOption.QUICK_ACK, true);
				conn.setOption(TCPOption.NODELAY, true);
				g_cbCheck[14] = true;
				say("Client Hello");
				assert(conn.socket > 0);
				break;
			case TCPEvent.READ:
				drain();

				// respond
				g_writes += 1;
				if (g_writes <= 3) {
					say("Client READ");
				}
				else {
					// Enough round trips: say goodbye and close.
					say("Client KILL");
					conn.kill();

					g_cbCheck[13] = true;
				}
				break;
			case TCPEvent.WRITE:
				g_writes += 1;
				say("Client WRITE");
				break;
			case TCPEvent.CLOSE:
				break;
			case TCPEvent.ERROR:
				break;
		}
	}

	auto success = conn.run(&onEvent);
	assert(success);

}
362 
// Connects to example.org:80, sends a minimal HTTP/1.1 request and waits for
// a READ event. Sets g_cbCheck[15] on connect and g_cbCheck[16] once data
// has been read.
void testHTTPConnect() {
	auto conn = new AsyncTCPConnection(g_evl);
	conn.peer = g_evl.resolveHost("example.org", 80);

	auto del = (TCPEvent ev){
		final switch (ev) {
			case TCPEvent.CONNECT:
				// Drain anything already pending before sending the request.
				static ubyte[] abin = new ubyte[4092];
				while (true) {
					uint len = conn.recv(abin);
					if (len < abin.length)
						break;
				}
				g_cbCheck[15] = true;
				// Well-formed request: origin-form target, explicit HTTP
				// version, CRLF line endings and the blank line that ends
				// the header section.
				conn.send(cast(ubyte[])"GET / HTTP/1.1\r\nHost: example.org\r\nConnection: close\r\n\r\n");
				break;
			case TCPEvent.READ:
				// Read until a recv() returns less than a full buffer.
				static ubyte[] bin = new ubyte[4092];
				while (true) {
					uint len = conn.recv(bin);
					g_cbCheck[16] = true;
					if (len < bin.length)
						break;
				}
				break;
			case TCPEvent.WRITE:
				break;
			case TCPEvent.CLOSE:
				break;
			case TCPEvent.ERROR:
				break;
		}
		return;
	};

	// Fail right here, like testTCPConnect() does, instead of only through
	// the g_cbCheck scan at the end of the unittest.
	auto success = conn.run(del);
	assert(success, "Could not start the HTTP connection to example.org:80");
}
408 
// Event loop of the test thread; assigned in the unittest from getThreadEventLoop().
EventLoop g_evl;
// Not assigned anywhere in this file: testOneshotTimer() and testMultiTimer()
// each declare a local variable of the same name instead.
AsyncTimer g_timerOneShot;
AsyncTimer g_timerMulti;
// Not referenced by any of the test functions in this file.
AsyncTCPConnection g_tcpConnect;
AsyncTCPConnection g_httpConnect;
// Set by the accept callback in testTCPListen(); used by trafficHandler().
AsyncTCPConnection g_conn; // incoming
// Created in testSignal(), killed at the end of the unittest.
AsyncNotifier g_notifier;
// Created in testTCPListen(), killed at the end of the unittest.
AsyncTCPListener g_listnr;
// Signal created in the unittest body and triggered by testTLSEvent().
shared AsyncSignal gs_tlsEvent;
// Signal created in testEvents() and triggered from testSharedEvent().
shared AsyncSignal gs_shrEvent;
// Signal created in testSharedEvent() and triggered from testEvents().
shared AsyncSignal gs_shrEvent2;
// Created in testFile().
shared AsyncFile gs_file;
// Time at which the unittest started; the timing loops and assertions measure against it.
__gshared SysTime gs_start;
// Compared against in the gs_tlsEvent callback.
string g_message = "Some message here";
// Compared against in the gs_shrEvent callback.
shared Msg* gs_hshr = new shared Msg("Hello from shared!");
// One flag per expected callback; allocated with 19 entries in the unittest.
shared bool[] g_cbCheck;
// Number of expirations counted by the periodic timer callback.
int g_cbTimerCnt;
// Incremented by the client handler in testTCPConnect() on READ and WRITE events.
int g_writes;
// Time of the previous periodic-timer expiry (initialised at unittest start).
SysTime g_lastTimer;
428 
// Holder for a single string; gs_hshr points at an instance whose `message`
// is checked in the gs_shrEvent callback.
shared struct Msg {
	string message;
}