1 module libasync.test;
2 version(unittest):
3 import libasync.events;
4 import std.stdio;
5 import std.datetime;
6 import libasync.file;
7 
// Directory watcher instance; assigned by testDirectoryWatcher().
AsyncDirectoryWatcher g_watcher;
// DNS resolver instance; assigned by testDNS().
shared AsyncDNS g_dns;
10 
11 
// Integration test driver: arms every sub-test on the thread's event loop,
// pumps the loop for seven seconds, then verifies that each callback flag in
// g_cbCheck was raised and that the periodic timer ticked at least three times.
unittest {
	spawnAsyncThreads();

	// Reset the bookkeeping before any callback can fire.
	g_cbCheck = new shared bool[19];
	g_lastTimer = Clock.currTime();
	gs_start = Clock.currTime();

	g_evl = getThreadEventLoop();
	g_evl.loop(1.msecs);

	// Arm the individual sub-tests; their callbacks run from the loop below.
	testDirectoryWatcher();
	testDNS();
	testOneshotTimer();
	testMultiTimer();

	gs_tlsEvent = new shared AsyncSignal(g_evl);
	testSignal();
	testEvents();

	testTCPListen("localhost", 8081);
	testHTTPConnect();
	testFile();
	testTCPConnect("localhost", 8081);

	// Keep pumping events until seven seconds have passed since gs_start.
	for (;;) {
		if (Clock.currTime() - gs_start >= 7.seconds)
			break;
		g_evl.loop(100.msecs);
	}

	// Every slot must have been flipped by its callback.
	foreach (slot, bool fired; g_cbCheck)
		assert(fired, "Callback not triggered: g_cbCheck[" ~ slot.to!string ~ "]");

	// MultiTimer expired 3-4 times
	assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times");

	g_listnr.kill();
}
50 
// Module shutdown hook: calls destroyAsyncThreads(), the counterpart of the
// spawnAsyncThreads() call made at the start of the unittest.
shared static ~this()
{
	destroyAsyncThreads();
}
52 
// Stopwatch started right before the DNS lookup is issued.
StopWatch g_swDns;

// Creates the shared DNS resolver, installs a handler that raises
// g_cbCheck[17] once an address is delivered, and asks it to resolve
// "127.0.0.1".
void testDNS() {
	auto resolver = new shared AsyncDNS(g_evl);
	g_dns = resolver;
	g_swDns.start();

	auto armed = resolver.handler((NetworkAddress addr) {
		g_cbCheck[17] = true;
	});
	armed.resolveHost("127.0.0.1");
}
62 
/**
 * Exercises AsyncDirectoryWatcher.
 *
 * Watches the current directory, then uses a chain of timers to create
 * ./hey, watch it, write ./hey/tmp.tmp and finally remove that file again.
 * Any change batch delivered to the watcher callback raises g_cbCheck[18].
 */
void testDirectoryWatcher() {
	// All file-system helpers come from std.file. `write` is renamed so it
	// cannot be confused with std.stdio.write, and `remove` is imported
	// explicitly so the throwing std.file version is used rather than the C
	// `remove` that std.stdio re-exports (whose error return was ignored).
	import std.file : exists, mkdir, remove, rmdir, writeFile = write;

	// Clear leftovers from a previous run.
	if (exists("./hey/tmp.tmp"))
		remove("./hey/tmp.tmp");
	if (exists("./hey"))
		rmdir("./hey");

	g_watcher = new AsyncDirectoryWatcher(g_evl);
	g_watcher.run({
		// Read changes one at a time through a slice over a single-element buffer.
		DWChangeInfo[1] change;
		DWChangeInfo[] changeRef = change.ptr[0..1];
		while(g_watcher.readChanges(changeRef)){
			g_cbCheck[18] = true;
			writeln(change);
		}
	});
	g_watcher.watchDir(".");

	AsyncTimer tm = new AsyncTimer(g_evl);
	tm.duration(1.seconds).run({
		writeln("Creating directory ./hey");
		mkdir("./hey");
		// The call has a side effect, so it must not live inside the assert
		// expression: it would vanish in builds where asserts are compiled out.
		auto watching = g_watcher.watchDir("./hey/");
		assert(watching, "Could not start watching ./hey/");
		// The same timer is re-armed from inside its own callback.
		tm.duration(1.seconds).run({
			writeln("Writing to ./hey/tmp.tmp for the first time");
			writeFile("./hey/tmp.tmp", "some string");
			tm.duration(100.msecs).run({
				writeln("Removing ./hey/tmp.tmp");
				remove("./hey/tmp.tmp");
			});
		});
	});
}
94 
// Writes test.txt synchronously, then reads it back through the shared
// AsyncFile. The ready callback raises g_cbCheck[7] when the buffer starts
// with the expected text and deletes the file afterwards.
void testFile() {
	gs_file = new shared AsyncFile(g_evl);

	// Seed the file; leaving the scope destroys the File handle.
	{
		auto seed = File("test.txt", "w");
		seed.rawWrite("This is the file content.");
	}

	gs_file.onReady({
		import std.algorithm;
		import std.file : remove;

		writeln("Created and wrote to test.txt through AsyncFile");
		auto file = gs_file;
		if (file.status.code == Status.ERROR)
			writeln("ERROR: ", file.status.text);

		const bool matches = (cast(string)file.buffer).startsWith("This is the file content.");
		// Unexpected content aborts before the flag is set or the file removed.
		if (!matches)
			assert(false);
		g_cbCheck[7] = true;

		remove("test.txt");
		writeln("Removed test.txt .. ");
	}).read("test.txt");
}
122 
123 
// Creates the notifier, attaches a callback that checks a captured local
// string and raises g_cbCheck[0], then triggers the notifier.
void testSignal() {
	g_notifier = new AsyncNotifier(g_evl);
	string title = "This is my title";

	// Captures `title` from the enclosing frame.
	void onNotify() {
		assert(title == "This is my title");
		g_cbCheck[0] = true;
	}

	g_notifier.run(&onNotify);
	// will be completed in the event loop
	g_notifier.trigger();
}
139 
// Wires up the two signals owned by this thread (flags 1 and 2), fires the
// first one locally, spawns testSharedEvent on another thread, waits until
// that thread has published gs_shrEvent2 and then triggers it from here.
void testEvents() {
	import core.thread : Thread;
	import std.concurrency : spawn;

	gs_tlsEvent.run({
		assert(g_message == "Some message here");
		g_cbCheck[1] = true;
	});

	gs_shrEvent = new shared AsyncSignal(g_evl);
	gs_shrEvent.run({
		assert(gs_hshr.message == "Hello from shared!");
		g_cbCheck[2] = true;
	});

	testTLSEvent();

	auto worker = spawn(&testSharedEvent);

	// Poll every 100 ms until the second signal exists and has a non-zero id.
	for (;;) {
		if (gs_shrEvent2 !is null && gs_shrEvent2.id != 0)
			break;
		Thread.sleep(100.msecs);
	}

	gs_shrEvent2.trigger(g_evl);
}
164 
// Fires gs_tlsEvent without passing an event loop argument.
void testTLSEvent()
{
	auto signal = gs_tlsEvent;
	signal.trigger();
}
168 
// Body of the thread spawned by testEvents(): builds its own event loop,
// publishes gs_shrEvent2 (flag 3) and triggers gs_shrEvent twice, pumping its
// loop until 1 s and then 4 s have elapsed since gs_start.
void testSharedEvent() {
	auto workerLoop = new EventLoop;

	// Runs the worker's loop until `sinceStart` has elapsed since gs_start.
	void pumpUntil(Duration sinceStart) {
		while (Clock.currTime() - gs_start < sinceStart)
			workerLoop.loop();
	}

	gs_shrEvent2 = new shared AsyncSignal(workerLoop);
	gs_shrEvent2.run({
		g_cbCheck[3] = true;
	});

	gs_shrEvent.trigger(workerLoop);
	pumpUntil(1.seconds);

	gs_shrEvent.trigger(workerLoop);
	pumpUntil(4.seconds);
}
188 
/**
 * Arms a one-second one-shot timer.
 *
 * The callback asserts that it fires only once, that it fires between 900 ms
 * and 1100 ms after gs_start, and that the timer has a valid id; it then
 * raises g_cbCheck[4].
 */
void testOneshotTimer() {
	// Assign the module-level g_timerOneShot. The previous code declared a
	// local of the same name, which shadowed the global and left it null.
	g_timerOneShot = new AsyncTimer(g_evl);
	g_timerOneShot.duration(1.seconds).run({
		// Sample the clock once so both bounds are checked against the same instant.
		immutable elapsed = Clock.currTime() - gs_start;
		assert(!g_cbCheck[4], "One-shot timer fired more than once");
		assert(elapsed > 900.msecs && elapsed < 1100.msecs,
			"One-shot timer fired after " ~ elapsed.toString());
		assert(g_timerOneShot.id > 0);
		g_cbCheck[4] = true;
	});
}
198 
/**
 * Arms a periodic one-second timer.
 *
 * Each tick asserts that it arrived between 900 ms and 1100 ms after the
 * previous one (tracked in g_lastTimer), that the timer has a valid id and is
 * not one-shot; it then updates g_lastTimer, increments g_cbTimerCnt and
 * raises g_cbCheck[5].
 */
void testMultiTimer() {
	// Assign the module-level g_timerMulti. The previous code declared a
	// local of the same name, which shadowed the global and left it null.
	g_timerMulti = new AsyncTimer(g_evl);
	g_timerMulti.periodic().duration(1.seconds).run({
		assert(g_lastTimer !is SysTime.init, "g_lastTimer was never initialised");
		// Sample the clock once so both bounds are checked against the same instant.
		immutable sinceLast = Clock.currTime() - g_lastTimer;
		assert(sinceLast > 900.msecs && sinceLast < 1100.msecs,
			"Periodic timer tick arrived after " ~ sinceLast.toString());
		assert(g_timerMulti.id > 0);
		assert(!g_timerMulti.oneShot);
		g_lastTimer = Clock.currTime();
		g_cbTimerCnt++;
		g_cbCheck[5] = true;
	});
}
211 
212 
// Event handler for the accepted (server-side) connection stored in g_conn.
// Drains incoming data, records which client messages were seen in g_cbCheck
// and answers with a short text for CONNECT, READ and WRITE events.
void trafficHandler(TCPEvent ev){
	import std.algorithm : canFind;

	// Reads from g_conn until a read returns fewer bytes than the buffer
	// holds, flagging the client messages found in each chunk.
	void drainIncoming() {
		static ubyte[] bin = new ubyte[4092];
		uint len;
		do {
			len = g_conn.recv(bin);
			if (len > 0) {
				auto res = cast(string)bin[0..len];

				if (res.canFind("Client Hello"))
					g_cbCheck[8] = true;

				if (res.canFind("Client WRITE"))
					g_cbCheck[8] = false;

				if (res.canFind("Client READ"))
					g_cbCheck[9] = true;

				if (res.canFind("Client KILL"))
					g_cbCheck[10] = true;
			}
		} while (len >= bin.length);
	}

	// Sends `text` only while the connection still has a socket.
	void reply(string text) {
		if (g_conn.socket != 0)
			g_conn.send(cast(ubyte[])text);
	}

	final switch (ev) with (TCPEvent) {
		case CONNECT:
			drainIncoming();
			reply("Server Connect");
			break;
		case READ:
			g_cbCheck[11] = true;
			reply("Server READ");
			drainIncoming();
			break;
		case WRITE:
			reply("Server WRITE");
			break;
		case CLOSE:
			drainIncoming();
			g_cbCheck[12] = true;
			break;
		case ERROR:
			break;
	}
}
273 
// Starts the TCP listener on ip:port. The accept callback stores the incoming
// connection in g_conn, raises g_cbCheck[6] and hands back trafficHandler as
// the per-connection event handler.
void testTCPListen(string ip, ushort port) {
	import std.functional : toDelegate;

	g_listnr = new AsyncTCPListener(g_evl);

	void delegate(TCPEvent) delegate(AsyncTCPConnection) onAccept = (AsyncTCPConnection conn) {
		g_conn = conn;
		g_cbCheck[6] = true;
		return toDelegate(&trafficHandler);
	};

	auto listener = g_listnr.host(ip, port);
	auto success = listener.run(onAccept);
	assert(success, g_listnr.error);
}
288 
// Opens a client connection to ip:port and runs a small conversation: it says
// hello on connect, answers READ/WRITE events, and after more than three
// exchanges (counted in g_writes) sends a kill message and closes the
// connection. Flags 14 and 13 record the connect and the kill respectively.
void testTCPConnect(string ip, ushort port) {
	auto conn = new AsyncTCPConnection(g_evl);
	conn.peer = g_evl.resolveHost(ip, port);

	// Sends `text` only while the connection still has a socket.
	void say(string text) {
		if (conn.socket != 0)
			conn.send(cast(ubyte[])text);
	}

	// Reads until a read returns fewer bytes than the buffer holds.
	void drain() {
		static ubyte[] bin = new ubyte[4092];
		uint len;
		do {
			assert(conn.socket > 0);
			len = conn.recv(bin);
		} while (len >= bin.length);
	}

	void onEvent(TCPEvent ev) {
		final switch (ev) with (TCPEvent) {
			case CONNECT:
				conn.setOption(TCPOption.QUICK_ACK, true);
				conn.setOption(TCPOption.NODELAY, true);
				g_cbCheck[14] = true;
				say("Client Hello");
				assert(conn.socket > 0);
				break;
			case READ:
				drain();

				// respond
				g_writes += 1;
				if (g_writes <= 3) {
					say("Client READ");
					break;
				}
				say("Client KILL");
				conn.kill();
				g_cbCheck[13] = true;
				break;
			case WRITE:
				g_writes += 1;
				say("Client WRITE");
				break;
			case CLOSE:
				break;
			case ERROR:
				break;
		}
	}

	auto success = conn.run(&onEvent);
	assert(success);
}
355 
/**
 * Connects to example.org:80 and issues a minimal HTTP request.
 *
 * g_cbCheck[15] is raised once the connection is established and
 * g_cbCheck[16] as soon as a READ event delivers data.
 */
void testHTTPConnect() {
	auto conn = new AsyncTCPConnection(g_evl);
	conn.peer = g_evl.resolveHost("example.org", 80);

	auto del = (TCPEvent ev){
		final switch (ev) {
			case TCPEvent.CONNECT:
				// Drain anything already pending before sending the request.
				static ubyte[] abin = new ubyte[4092];
				while (true) {
					uint len = conn.recv(abin);
					if (len < abin.length)
						break;
				}
				g_cbCheck[15] = true;
				// Well-formed HTTP/1.1 request: origin-form target, explicit
				// version, CRLF line endings and the empty line that ends the
				// header section. The previous text had none of these, so a
				// server could reject it or wait indefinitely for more headers.
				conn.send(cast(ubyte[])"GET / HTTP/1.1\r\nHost: example.org\r\nConnection: close\r\n\r\n");
				break;
			case TCPEvent.READ:
				// Read until a read returns fewer bytes than the buffer holds.
				static ubyte[] bin = new ubyte[4092];
				while (true) {
					uint len = conn.recv(bin);
					g_cbCheck[16] = true;
					if (len < bin.length)
						break;
				}
				break;
			case TCPEvent.WRITE:
				break;
			case TCPEvent.CLOSE:
				break;
			case TCPEvent.ERROR:
				break;
		}
		return;
	};

	// Check the result like testTCPConnect does, so a failed start is reported
	// here rather than only as a missing callback flag at the end of the test.
	auto success = conn.run(del);
	assert(success, "Could not start the HTTP test connection");
}
401 
// --- State shared between the test functions in this module ---

// Event loop obtained from getThreadEventLoop() in the unittest; the test
// objects are constructed on it.
EventLoop g_evl;
// Timer handles named after the one-shot and periodic timer tests.
AsyncTimer g_timerOneShot;
AsyncTimer g_timerMulti;
// NOTE(review): no use of these two connections is visible in this file.
AsyncTCPConnection g_tcpConnect;
AsyncTCPConnection g_httpConnect;
// Assigned by the accept callback in testTCPListen(); used by trafficHandler().
AsyncTCPConnection g_conn; // incoming
// Created and triggered in testSignal().
AsyncNotifier g_notifier;
// Created in testTCPListen(); killed at the end of the unittest.
AsyncTCPListener g_listnr;
// Created in the unittest, given its handler in testEvents(), fired by testTLSEvent().
shared AsyncSignal gs_tlsEvent;
// Created in testEvents(); triggered from testSharedEvent().
shared AsyncSignal gs_shrEvent;
// Created in testSharedEvent(); triggered from testEvents().
shared AsyncSignal gs_shrEvent2;
// Created in testFile().
shared AsyncFile gs_file;
// Timestamp taken at the start of the unittest; the timing checks are relative to it.
__gshared SysTime gs_start;
// Text checked by the gs_tlsEvent handler.
string g_message = "Some message here";
// Message checked by the gs_shrEvent handler.
shared Msg* gs_hshr = new shared Msg("Hello from shared!");
// One flag per expected callback; allocated with 19 slots in the unittest,
// which asserts at the end that every slot is true.
shared bool[] g_cbCheck;
// Incremented on each tick of the periodic timer.
int g_cbTimerCnt;
// Incremented by the client handler in testTCPConnect() on READ and WRITE events.
int g_writes;
// Set at the start of the unittest and updated on each periodic timer tick.
SysTime g_lastTimer;
421 
// Shared wrapper around a single string; gs_hshr points to an instance of it.
shared struct Msg {
	// The wrapped text.
	string message;
}