1 module libasync.test;
2 version(unittest):
3 import libasync.events;
4 import std.stdio;
5 import std.datetime;
6 import libasync.file;
7 
8 AsyncDirectoryWatcher g_watcher;
9 shared AsyncDNS g_dns;
10 
11 
12 unittest {
13 	// writeln("Unit test started");
14 	g_cbCheck = new shared bool[19];
15 	g_lastTimer = Clock.currTime();
16 	gs_start = Clock.currTime();
17 	g_evl = getThreadEventLoop();
18 	// writeln("Loading objects...");
19 	testDirectoryWatcher();
20 	testDNS();
21 	testFile();
22  	testOneshotTimer();
23 	testMultiTimer();
24 	gs_tlsEvent = new shared AsyncSignal(g_evl);
25 	testSignal();
26 	testEvents();
27 	testTCPListen("localhost", 8081);
28 	testHTTPConnect();
29 	// writeln("Loaded. Running event loop...");
30 
31 	testTCPConnect("localhost", 8081);
32 
33 	while(Clock.currTime() - gs_start < 4.seconds) 
34 		g_evl.loop(100.msecs);
35 
36 	int i;
37 	foreach (bool b; g_cbCheck) {
38 		assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]");
39 		i++;
40 	}
41 	// writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start);
42 
43 	assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times
44 
45 	g_listnr.kill();
46 
47 	destroyAsyncThreads();
48 }
49 
50 StopWatch g_swDns;
51 void testDNS() {
52 	g_dns = new shared AsyncDNS(g_evl);
53 	g_swDns.start();
54 	g_dns.handler((NetworkAddress addr) {
55 		g_cbCheck[17] = true;
56 		// writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs");
57 	}).resolveHost("127.0.0.1");
58 }
59 
60 void testDirectoryWatcher() {
61 	import std.file : mkdir, write, rmdir, exists;
62 	if (exists("./hey/tmp.tmp"))
63 		remove("./hey/tmp.tmp");
64 	if (exists("./hey"))
65 		rmdir("./hey");
66 	g_watcher = new AsyncDirectoryWatcher(g_evl);
67 	g_watcher.run({
68 		DWChangeInfo[1] change;
69 		DWChangeInfo[] changeRef = change.ptr[0..1];
70 		while(g_watcher.readChanges(changeRef)){
71 			g_cbCheck[18] = true;
72 			// writeln(change);
73 		}
74 	});
75 	g_watcher.watchDir(".");
76 	AsyncTimer tm = new AsyncTimer(g_evl);
77 	tm.duration(1.seconds).run({
78 		mkdir("./hey");
79 		if (!g_watcher.watchDir("./hey/"))
80 			// writeln("ERROR: ", g_watcher.status);
81 		tm.duration(1.seconds).run({
82 			// writeln("Writing to ./hey/tmp.tmp");
83 			std.file.write("./hey/tmp.tmp", "some string");
84 			tm.duration(100.msecs).run({
85 				// writeln("Removing ./hey/tmp.tmp");
86 				remove("./hey/tmp.tmp");
87 			});
88 		});
89 	});
90 }
91 
92 void testFile() {
93 
94 	gs_file = new shared AsyncFile(g_evl);
95 
96 	{
97 		File file = File("test.txt", "w");
98 		file.rawWrite("This is the file content.");
99 	}
100 	gs_file.onReady({
101 		auto file = gs_file;
102 		if (file.status.code == Status.ERROR)
103 			writeln("ERROR: ", file.status.text);
104 		import std.algorithm;
105 		if ((cast(string)file.buffer).startsWith("This is the file content.")) {
106 			g_cbCheck[7] = true;
107 		}
108 		else {
109 			//import std.stdio :  writeln;
110 			// writeln("ERROR: ", cast(string)file.buffer);
111 			assert(false);
112 		}
113 		import std.file : remove;
114 		remove("test.txt");
115 	}).read("test.txt");
116 
117 }
118 
119 
120 void testSignal() {
121 	g_notifier = new AsyncNotifier(g_evl);
122 	auto title = "This is my title";
123 
124 	void delegate() del = {
125 		import std.stdio;
126 		assert(title == "This is my title");
127 		g_cbCheck[0] = true;
128 
129 		return;
130 	};
131 
132 	g_notifier.run(del);
133 	g_notifier.trigger(); // will be completed in the event loop
134 }
135 
136 void testEvents() {
137 
138 	gs_tlsEvent.run({
139 		assert(g_message == "Some message here");
140 		g_cbCheck[1] = true;
141 	});
142 
143 	gs_shrEvent = new shared AsyncSignal(g_evl);
144 
145 	gs_shrEvent.run({
146 		assert(gs_hshr.message == "Hello from shared!");
147 		g_cbCheck[2] = true;
148 	});
149 
150 	testTLSEvent();
151 
152 	import std.concurrency;
153 	Tid t2 = spawn(&testSharedEvent);
154 	import core.thread : Thread;
155 	while (!gs_shrEvent2 || gs_shrEvent2.id == 0)
156 		Thread.sleep(100.msecs);
157 
158 	gs_shrEvent2.trigger(g_evl);
159 }
160 
161 void testTLSEvent() {
162 	gs_tlsEvent.trigger();
163 }
164 
165 void testSharedEvent() {
166 	EventLoop evl2 = new EventLoop;
167 
168 	gs_shrEvent2 = new shared AsyncSignal(evl2);
169 	gs_shrEvent2.run({
170 		g_cbCheck[3] = true;
171 		return;
172 	});
173 
174 	gs_shrEvent.trigger(evl2);
175 
176 	while(Clock.currTime() - gs_start < 1.seconds) 
177 		evl2.loop();
178 
179 	gs_shrEvent.trigger(evl2);
180 
181 	while(Clock.currTime() - gs_start < 4.seconds) 
182 		evl2.loop();
183 }
184 
185 void testOneshotTimer() {	
186 	AsyncTimer g_timerOneShot = new AsyncTimer(g_evl);
187 	g_timerOneShot.duration(1.seconds).run({
188 		assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1100.msecs);
189 		assert(g_timerOneShot.id > 0);
190 		g_cbCheck[4] = true;
191 		
192 	});
193 }
194 
195 void testMultiTimer() {	
196 	AsyncTimer g_timerMulti = new AsyncTimer(g_evl);
197 	g_timerMulti.periodic().duration(1.seconds).run({
198 		assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1100.msecs);
199 		assert(g_timerMulti.id > 0);
200 		assert(!g_timerMulti.oneShot);
201 		g_lastTimer = Clock.currTime();
202 		g_cbTimerCnt++;
203 		g_cbCheck[5] = true;
204 	});
205 
206 }
207 
208 
209 void trafficHandler(TCPEvent ev){
210 	// writeln("##TrafficHandler!");
211 	void doRead() {
212 		static ubyte[] bin = new ubyte[4092];
213 		while (true) {
214 			uint len = g_conn.recv(bin);
215 			// writeln("!!Server Received " ~ len.to!string ~ " bytes");
216 			// import std.file;
217 			if (len > 0) {
218 				auto res = cast(string)bin[0..len];
219 				// writeln(res);
220 				import std.algorithm : canFind;
221 				if (res.canFind("Client Hello"))
222 					g_cbCheck[8] = true;
223 
224 				if (res.canFind("Client WRITE"))
225 					g_cbCheck[8] = false;
226 
227 				if (res.canFind("Client READ"))
228 					g_cbCheck[9] = true;
229 
230 				if (res.canFind("Client KILL"))
231 					g_cbCheck[10] = true;
232 			}
233 			if (len < bin.length)
234 				break;
235 		}
236 	}
237 
238 	final switch (ev) {
239 		case TCPEvent.CONNECT:
240 			// writeln("!!Server Connected");
241 			doRead();
242 			if (g_conn.socket != 0)
243 				g_conn.send(cast(ubyte[])"Server Connect");
244 			break;
245 		case TCPEvent.READ:
246 			// writeln("!!Server Read is ready");
247 			g_cbCheck[11] = true;
248 			if (g_conn.socket != 0)
249 				g_conn.send(cast(ubyte[])"Server READ");
250 			doRead();
251 			break;
252 		case TCPEvent.WRITE:
253 			// writeln("!!Server Write is ready");
254 			if (g_conn.socket != 0)
255 				g_conn.send(cast(ubyte[])"Server WRITE");
256 			break;
257 		case TCPEvent.CLOSE:
258 			doRead();
259 			// writeln("!!Server Disconnect!");
260 			g_cbCheck[12] = true;
261 			break;
262 		case TCPEvent.ERROR:
263 			// writeln("!!Server Error!");
264 			break;
265 	}
266 	
267 	return;
268 }
269 
270 void testTCPListen(string ip, ushort port) {
271 
272 	g_listnr = new AsyncTCPListener(g_evl);
273 
274 	void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
275 		g_conn = conn;
276 		g_cbCheck[6] = true;
277 		import std.functional : toDelegate;
278 		return toDelegate(&trafficHandler);
279 	}
280 
281 	auto success = g_listnr.host(ip, port).run(&handler);
282 	assert(success, g_listnr.error);
283 }
284 
285 void testTCPConnect(string ip, ushort port) {
286 	auto conn = new AsyncTCPConnection(g_evl);
287 	conn.peer = g_evl.resolveHost(ip, port);
288 
289 	void delegate(TCPEvent) connHandler = (TCPEvent ev){
290 		void doRead() {
291 			static ubyte[] bin = new ubyte[4092];
292 			while (true) {
293 				assert(conn.socket > 0);
294 				uint len = conn.recv(bin);
295 				// writeln("!!Client Received " ~ len.to!string ~ " bytes");
296 				// if (len > 0)
297 					// writeln(cast(string)bin[0..len]);
298 				if (len < bin.length)
299 					break;
300 			}
301 		}
302 		final switch (ev) {
303 			case TCPEvent.CONNECT:
304 				// writeln("!!Client Connected");
305 				conn.setOption(TCPOption.QUICK_ACK, true);
306 				conn.setOption(TCPOption.NODELAY, true);
307 				g_cbCheck[14] = true;
308 				if (conn.socket != 0)
309 					conn.send(cast(ubyte[])"Client Hello");
310 				assert(conn.socket > 0);
311 				break;
312 			case TCPEvent.READ:
313 				// writeln("!!Client Read is ready at writes: ", g_writes);
314 				doRead();
315 
316 				// respond
317 				g_writes += 1;
318 				if (g_writes > 3) {
319 					if (conn.socket != 0)
320 						conn.send(cast(ubyte[])"Client KILL");
321 					conn.kill();
322 
323 					g_cbCheck[13] = true;
324 				}
325 				else
326 					if (conn.socket != 0)
327 						conn.send(cast(ubyte[])"Client READ");
328 
329 				break;
330 			case TCPEvent.WRITE:
331 
332 				g_writes += 1;
333 				// writeln("!!Client Write is ready");
334 				if (conn.socket != 0)
335 					conn.send(cast(ubyte[])"Client WRITE");
336 				break;
337 			case TCPEvent.CLOSE:
338 				// writeln("!!Client Disconnected");
339 				break;
340 			case TCPEvent.ERROR:
341 				// writeln("!!Client Error!");
342 				break;
343 		}
344 		return;
345 	};
346 
347 	auto success = conn.run(connHandler);
348 	assert(success);
349 
350 }
351 
352 void testHTTPConnect() {
353 	auto conn = new AsyncTCPConnection(g_evl);
354 	conn.peer = g_evl.resolveHost("example.org", 80);
355 
356 	auto del = (TCPEvent ev){
357 		final switch (ev) {
358 			case TCPEvent.CONNECT:
359 				// writeln("!!Connected");
360 				static ubyte[] abin = new ubyte[4092];
361 				while (true) {
362 					uint len = conn.recv(abin);
363 					if (len < abin.length)
364 						break;
365 				}
366 				g_cbCheck[15] = true;
367 				// writeln(conn.local.toString());
368 				// writeln(conn.peer.toString());
369 				conn.send(cast(ubyte[])"GET http://example.org/\nHost: example.org\nConnection: close");
370 				break;
371 			case TCPEvent.READ:
372 				static ubyte[] bin = new ubyte[4092];
373 				while (true) {
374 					uint len = conn.recv(bin);
375 					g_cbCheck[16] = true;
376 					// writeln("!!Received " ~ len.to!string ~ " bytes");
377 					if (len < bin.length)
378 						break;
379 				}
380 				break;
381 			case TCPEvent.WRITE:
382 				// writeln("!!Write is ready");
383 				break;
384 			case TCPEvent.CLOSE:
385 				// writeln("!!Disconnected");
386 				break;
387 			case TCPEvent.ERROR:
388 				// writeln("!!Error!");
389 				break;
390 		}
391 		return;
392 	};
393 
394 	
395 	conn.run(del);
396 }
397 
398 EventLoop g_evl;
399 AsyncTimer g_timerOneShot;
400 AsyncTimer g_timerMulti;
401 AsyncTCPConnection g_tcpConnect;
402 AsyncTCPConnection g_httpConnect;
403 AsyncTCPConnection g_conn; // incoming
404 AsyncNotifier g_notifier;
405 AsyncTCPListener g_listnr;
406 shared AsyncSignal gs_tlsEvent;
407 shared AsyncSignal gs_shrEvent;
408 shared AsyncSignal gs_shrEvent2;
409 shared AsyncFile gs_file;
410 __gshared SysTime gs_start;
411 string g_message = "Some message here";
412 shared Msg* gs_hshr = new shared Msg("Hello from shared!");
413 shared bool[] g_cbCheck;
414 int g_cbTimerCnt;
415 int g_writes;
416 SysTime g_lastTimer;
417 
418 shared struct Msg {
419 	string message;
420 }