1 module libasync.test; 2 version(unittest): 3 import libasync.events; 4 import std.stdio; 5 import std.datetime; 6 import libasync.file; 7 import std.conv : to; 8 import core.stdc.stdlib : getenv; 9 import std..string : fromStringz, toStringz; 10 11 AsyncDirectoryWatcher g_watcher; 12 shared AsyncDNS g_dns; 13 string cache_path; 14 15 16 unittest { 17 cache_path = "."; 18 version(iOS) cache_path = getenv("HOME".toStringz).fromStringz.to!string ~ "/Library/Caches"; 19 20 spawnAsyncThreads(); 21 scope(exit) 22 destroyAsyncThreads(); 23 //writeln("Unit test started"); 24 g_cbCheck = new shared bool[19]; 25 g_lastTimer = Clock.currTime(); 26 gs_start = Clock.currTime(); 27 g_evl = getThreadEventLoop(); 28 g_evl.loop(1.msecs); 29 //writeln("Loading objects..."); 30 testDirectoryWatcher(); 31 32 testDNS(); 33 testOneshotTimer(); 34 testMultiTimer(); 35 gs_tlsEvent = new shared AsyncSignal(g_evl); 36 testSignal(); 37 testEvents(); 38 testTCPListen("localhost", 8081); 39 testHTTPConnect(); 40 //writeln("Loaded. Running event loop..."); 41 testFile(); 42 testTCPConnect("localhost", 8081); 43 while(Clock.currTime() - gs_start < 7.seconds) 44 g_evl.loop(100.msecs); 45 46 int i; 47 foreach (bool b; g_cbCheck) { 48 assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]"); 49 i++; 50 } 51 writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start); 52 53 assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times 54 g_watcher.kill(); 55 g_notifier.kill(); 56 g_listnr.kill(); 57 version(LDC) { 58 import core.stdc.stdlib; exit(0); 59 } 60 61 } 62 63 StopWatch g_swDns; 64 void testDNS() { 65 g_dns = new shared AsyncDNS(g_evl); 66 g_swDns.start(); 67 g_dns.handler((NetworkAddress addr) { 68 g_cbCheck[17] = true; 69 writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs"); 70 }).resolveHost("httpbin.org", false, true); 71 } 72 73 void testDirectoryWatcher() { 74 import std.file : mkdir, rmdir, exists; 75 if (exists(cache_path ~ "/hey/tmp.tmp")) 76 remove((cache_path ~ "/hey/tmp.tmp").toStringz); 77 if (exists(cache_path ~ "/hey")) 78 rmdir(cache_path ~ "/hey"); 79 g_watcher = new AsyncDirectoryWatcher(g_evl); 80 g_watcher.run({ 81 DWChangeInfo[1] change; 82 DWChangeInfo[] changeRef = change.ptr[0..1]; 83 bool done; 84 while(g_watcher.readChanges(changeRef)) 85 { 86 g_cbCheck[18] = true; 87 writeln(change); 88 if (change[0].event == DWFileEvent.DELETED) 89 done = true; 90 } 91 }); 92 g_watcher.watchDir(cache_path); 93 AsyncTimer tm = new AsyncTimer(g_evl); 94 tm.duration(1.seconds).run({ 95 writeln("Creating directory ./hey"); 96 mkdir(cache_path ~ "/hey"); 97 assert(g_watcher.watchDir(cache_path ~ "/hey/")); 98 tm.duration(1.seconds).run({ 99 static import std.file; 100 writeln("Writing to ./hey/tmp.tmp for the first time"); 101 std.file.write(cache_path ~ "/hey/tmp.tmp", "some string"); 102 tm.duration(100.msecs).run({ 103 writeln("Removing ./hey/tmp.tmp"); 104 remove((cache_path ~ "/hey/tmp.tmp").toStringz); 105 tm.kill(); 106 }); 107 }); 108 }); 109 } 110 111 void testFile() { 112 gs_file = new shared AsyncFile(g_evl); 113 114 { 115 File file = File(cache_path ~ "/test.txt", "w+"); 116 file.rawWrite("This is the file content."); 117 file.close(); 118 //writeln("Wrote to file"); 119 } 120 gs_file.onReady({ 121 //writeln("Created and wrote to test.txt through AsyncFile"); 122 auto file = gs_file; 123 //if (file.status.code == Status.ERROR) 124 //writeln("ERROR: ", file.status.text); 125 126 import std..string : startsWith; 127 if ((cast(string)file.buffer).startsWith("This is the file content.")) { 128 g_cbCheck[7] = true; 129 } 130 else { 131 writeln("ERROR: ", cast(string)file.buffer); 132 assert(false); 133 } 134 import std.file : remove; 135 gs_file.kill(); 136 remove(cache_path ~ "/test.txt"); 137 //writeln("Removed test.txt .. "); 138 }).read(cache_path ~ "/test.txt"); 139 140 } 141 142 143 void testSignal() { 144 g_notifier = new AsyncNotifier(g_evl); 145 auto title = "This is my title"; 146 147 void delegate() del = { 148 import std.stdio; 149 assert(title == "This is my title"); 150 g_cbCheck[0] = true; 151 //writeln("Got signal title"); 152 153 return; 154 }; 155 156 g_notifier.run(del); 157 g_notifier.trigger(); // will be completed in the event loop 158 } 159 160 void testEvents() { 161 162 gs_tlsEvent.run({ 163 assert(g_message == "Some message here"); 164 g_cbCheck[1] = true; 165 //writeln("Got valid TLS Event"); 166 }); 167 168 gs_shrEvent = new shared AsyncSignal(g_evl); 169 170 gs_shrEvent.run({ 171 assert(gs_hshr.message == "Hello from shared!"); 172 g_cbCheck[2] = true; 173 //writeln("Got valid shared event!"); 174 }); 175 176 testTLSEvent(); 177 178 import std.concurrency; 179 Tid t2 = spawn(&testSharedEvent); 180 import core.thread : Thread; 181 while (!gs_shrEvent2 || gs_shrEvent2.id == 0) 182 Thread.sleep(100.msecs); 183 184 gs_shrEvent2.trigger(g_evl); 185 } 186 187 void testTLSEvent() { 188 gs_tlsEvent.trigger(); 189 } 190 191 void testSharedEvent() { 192 EventLoop evl2 = new EventLoop; 193 194 gs_shrEvent2 = new shared AsyncSignal(evl2); 195 gs_shrEvent2.run({ 196 g_cbCheck[3] = true; 197 return; 198 }); 199 200 gs_shrEvent.trigger(evl2); 201 202 while(Clock.currTime() - gs_start < 1.seconds) 203 evl2.loop(); 204 205 gs_shrEvent.trigger(evl2); 206 207 while(Clock.currTime() - gs_start < 4.seconds) 208 evl2.loop(); 209 } 210 211 void testOneshotTimer() { 212 AsyncTimer g_timerOneShot = new AsyncTimer(g_evl); 213 g_timerOneShot.duration(1.seconds).run({ 214 assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1400.msecs, "Timer completed in " ~ (Clock.currTime() - gs_start).total!"msecs".to!string ~ "ms" ); 215 assert(g_timerOneShot.id != 0); 216 //writeln("Got timer callback!"); 217 g_cbCheck[4] = true; 218 219 }); 220 } 221 222 void testMultiTimer() { 223 AsyncTimer g_timerMulti = new AsyncTimer(g_evl); 224 g_timerMulti.periodic().duration(1.seconds).run({ 225 assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1400.msecs, "Timer completed in " ~ (Clock.currTime() - gs_start).total!"msecs".to!string ~ "ms" ); 226 assert(g_timerMulti.id > 0); 227 assert(!g_timerMulti.oneShot); 228 g_lastTimer = Clock.currTime(); 229 g_cbTimerCnt++; 230 //writeln("Got timer callback #", g_cbTimerCnt.to!string); 231 g_cbCheck[5] = true; 232 }); 233 234 } 235 236 237 void trafficHandler(TCPEvent ev){ 238 //writeln("##TrafficHandler!"); 239 void doRead() { 240 static ubyte[] bin = new ubyte[4092]; 241 while (true) { 242 uint len = g_conn.recv(bin); 243 //writeln("!!Server Received " ~ len.to!string ~ " bytes"); 244 // import std.file; 245 if (len > 0) { 246 auto res = cast(string)bin[0..len]; 247 //writeln(res); 248 import std.algorithm : canFind; 249 if (res.canFind("Client Hello")) 250 g_cbCheck[8] = true; 251 252 if (res.canFind("Client WRITE")) 253 g_cbCheck[8] = false; 254 255 if (res.canFind("Client READ")) 256 g_cbCheck[9] = true; 257 258 if (res.canFind("Client KILL")) 259 g_cbCheck[10] = true; 260 } 261 if (len < bin.length) 262 break; 263 } 264 } 265 266 final switch (ev) { 267 case TCPEvent.CONNECT: 268 //writeln("!!Server Connected"); 269 doRead(); 270 if (g_conn.socket != 0) 271 g_conn.send(cast(ubyte[])"Server Connect"); 272 break; 273 case TCPEvent.READ: 274 //writeln("!!Server Read is ready"); 275 g_cbCheck[11] = true; 276 if (g_conn.socket != 0) 277 g_conn.send(cast(ubyte[])"Server READ"); 278 doRead(); 279 break; 280 case TCPEvent.WRITE: 281 //writeln("!!Server Write is ready"); 282 if (g_conn.socket != 0) 283 g_conn.send(cast(ubyte[])"Server WRITE"); 284 break; 285 case TCPEvent.CLOSE: 286 doRead(); 287 //writeln("!!Server Disconnect!"); 288 g_cbCheck[12] = true; 289 break; 290 case TCPEvent.ERROR: 291 //writeln("!!Server Error!"); 292 break; 293 } 294 295 return; 296 } 297 298 void testTCPListen(string ip, ushort port) { 299 300 g_listnr = new AsyncTCPListener(g_evl); 301 302 void delegate(TCPEvent) handler(AsyncTCPConnection conn) { 303 g_conn = conn; 304 g_cbCheck[6] = true; 305 import std.functional : toDelegate; 306 //writeln("Got handler TCPListen"); 307 return toDelegate(&trafficHandler); 308 } 309 310 auto success = g_listnr.host(ip, port).run(&handler); 311 assert(success, g_listnr.error); 312 } 313 314 void testTCPConnect(string ip, ushort port) { 315 auto conn = new AsyncTCPConnection(g_evl); 316 conn.peer = g_evl.resolveHost(ip, port); 317 318 void delegate(TCPEvent) connHandler = (TCPEvent ev){ 319 void doRead() { 320 static ubyte[] bin = new ubyte[4092]; 321 while (true) { 322 assert(conn.socket > 0); 323 uint len = conn.recv(bin); 324 //writeln("!!Client Received " ~ len.to!string ~ " bytes"); 325 //if (len > 0) 326 // writeln(cast(string)bin[0..len]); 327 if (len < bin.length) 328 break; 329 } 330 } 331 final switch (ev) { 332 case TCPEvent.CONNECT: 333 // writeln("!!Client Connected"); 334 conn.setOption(TCPOption.QUICK_ACK, true); 335 conn.setOption(TCPOption.NODELAY, true); 336 g_cbCheck[14] = true; 337 if (conn.socket != 0) 338 conn.send(cast(ubyte[])"Client Hello"); 339 assert(conn.socket > 0); 340 break; 341 case TCPEvent.READ: 342 //writeln("!!Client Read is ready at writes: ", g_writes); 343 doRead(); 344 345 // respond 346 g_writes += 1; 347 if (g_writes > 3) { 348 if (conn.socket != 0) 349 conn.send(cast(ubyte[])"Client KILL"); 350 conn.kill(); 351 352 g_cbCheck[13] = true; 353 } 354 else 355 if (conn.socket != 0) 356 conn.send(cast(ubyte[])"Client READ"); 357 358 break; 359 case TCPEvent.WRITE: 360 361 g_writes += 1; 362 //writeln("!!Client Write is ready"); 363 if (conn.socket != 0) 364 conn.send(cast(ubyte[])"Client WRITE"); 365 break; 366 case TCPEvent.CLOSE: 367 //writeln("!!Client Disconnected"); 368 break; 369 case TCPEvent.ERROR: 370 //writeln("!!Client Error!"); 371 break; 372 } 373 return; 374 }; 375 376 auto success = conn.run(connHandler); 377 assert(success); 378 379 } 380 381 void testHTTPConnect() { 382 auto conn = new AsyncTCPConnection(g_evl); 383 conn.peer = g_evl.resolveHost("httpbin.org", 80); 384 385 auto del = (TCPEvent ev){ 386 final switch (ev) { 387 case TCPEvent.CONNECT: 388 //writeln("!!Connected"); 389 static ubyte[] abin = new ubyte[4092]; 390 while (true) { 391 uint len = conn.recv(abin); 392 if (len < abin.length) 393 break; 394 } 395 g_cbCheck[15] = true; 396 //writeln(conn.local.toString()); 397 //writeln(conn.peer.toString()); 398 conn.send(cast(ubyte[])"GET /ip\nHost: httpbin.org\nConnection: close\n\n"); 399 break; 400 case TCPEvent.READ: 401 static ubyte[] bin = new ubyte[4092]; 402 while (true) { 403 uint len = conn.recv(bin); 404 g_cbCheck[16] = true; 405 if (len > 0) writeln("HTTP Response: ", cast(string)bin.ptr[0 .. len]); 406 //writeln("!!Received " ~ len.to!string ~ " bytes"); 407 if (len < bin.length) 408 break; 409 } 410 break; 411 case TCPEvent.WRITE: 412 //writeln("!!Write is ready"); 413 break; 414 case TCPEvent.CLOSE: 415 //writeln("!!Disconnected"); 416 break; 417 case TCPEvent.ERROR: 418 //writeln("!!Error!"); 419 break; 420 } 421 return; 422 }; 423 424 425 conn.run(del); 426 } 427 428 EventLoop g_evl; 429 AsyncTimer g_timerOneShot; 430 AsyncTimer g_timerMulti; 431 AsyncTCPConnection g_tcpConnect; 432 AsyncTCPConnection g_httpConnect; 433 AsyncTCPConnection g_conn; // incoming 434 AsyncNotifier g_notifier; 435 AsyncTCPListener g_listnr; 436 shared AsyncSignal gs_tlsEvent; 437 shared AsyncSignal gs_shrEvent; 438 shared AsyncSignal gs_shrEvent2; 439 shared AsyncFile gs_file; 440 __gshared SysTime gs_start; 441 string g_message = "Some message here"; 442 shared Msg* gs_hshr = new shared Msg("Hello from shared!"); 443 shared bool[] g_cbCheck; 444 int g_cbTimerCnt; 445 int g_writes; 446 SysTime g_lastTimer; 447 448 shared struct Msg { 449 string message; 450 }