1 module libasync.test; 2 version(unittest): 3 import libasync.events; 4 import std.stdio; 5 import std.datetime; 6 import libasync.file; 7 8 AsyncDirectoryWatcher g_watcher; 9 shared AsyncDNS g_dns; 10 11 12 unittest { 13 spawnAsyncThreads(); 14 scope(exit) 15 destroyAsyncThreads(); 16 // writeln("Unit test started"); 17 g_cbCheck = new shared bool[19]; 18 g_lastTimer = Clock.currTime(); 19 gs_start = Clock.currTime(); 20 g_evl = getThreadEventLoop(); 21 g_evl.loop(1.msecs); 22 // writeln("Loading objects..."); 23 testDirectoryWatcher(); 24 25 testDNS(); 26 testOneshotTimer(); 27 testMultiTimer(); 28 gs_tlsEvent = new shared AsyncSignal(g_evl); 29 testSignal(); 30 testEvents(); 31 testTCPListen("localhost", 8081); 32 testHTTPConnect(); 33 // writeln("Loaded. Running event loop..."); 34 testFile(); 35 testTCPConnect("localhost", 8081); 36 37 while(Clock.currTime() - gs_start < 7.seconds) 38 g_evl.loop(100.msecs); 39 40 int i; 41 foreach (bool b; g_cbCheck) { 42 assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]"); 43 i++; 44 } 45 // writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start); 46 47 assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times 48 g_watcher.kill(); 49 g_notifier.kill(); 50 g_listnr.kill(); 51 } 52 53 StopWatch g_swDns; 54 void testDNS() { 55 g_dns = new shared AsyncDNS(g_evl); 56 g_swDns.start(); 57 g_dns.handler((NetworkAddress addr) { 58 g_cbCheck[17] = true; 59 // writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs"); 60 }).resolveHost("127.0.0.1"); 61 } 62 63 void testDirectoryWatcher() { 64 import std.file : mkdir, write, rmdir, exists; 65 if (exists("./hey/tmp.tmp")) 66 remove("./hey/tmp.tmp"); 67 if (exists("./hey")) 68 rmdir("./hey"); 69 g_watcher = new AsyncDirectoryWatcher(g_evl); 70 g_watcher.run({ 71 DWChangeInfo[1] change; 72 DWChangeInfo[] changeRef = change.ptr[0..1]; 73 bool done; 74 while(g_watcher.readChanges(changeRef)) 75 { 76 g_cbCheck[18] = true; 77 writeln(change); 78 if (change[0].event == DWFileEvent.DELETED) 79 done = true; 80 } 81 }); 82 g_watcher.watchDir("."); 83 AsyncTimer tm = new AsyncTimer(g_evl); 84 tm.duration(1.seconds).run({ 85 writeln("Creating directory ./hey"); 86 mkdir("./hey"); 87 assert(g_watcher.watchDir("./hey/")); 88 tm.duration(1.seconds).run({ 89 writeln("Writing to ./hey/tmp.tmp for the first time"); 90 std.file.write("./hey/tmp.tmp", "some string"); 91 tm.duration(100.msecs).run({ 92 writeln("Removing ./hey/tmp.tmp"); 93 remove("./hey/tmp.tmp"); 94 tm.kill(); 95 }); 96 }); 97 }); 98 } 99 100 void testFile() { 101 gs_file = new shared AsyncFile(g_evl); 102 103 { 104 File file = File("test.txt", "w"); 105 file.rawWrite("This is the file content."); 106 file.close(); 107 } 108 gs_file.onReady({ 109 writeln("Created and wrote to test.txt through AsyncFile"); 110 auto file = gs_file; 111 if (file.status.code == Status.ERROR) 112 writeln("ERROR: ", file.status.text); 113 import std.algorithm; 114 if ((cast(string)file.buffer).startsWith("This is the file content.")) { 115 g_cbCheck[7] = true; 116 } 117 else { 118 //import std.stdio : writeln; 119 // writeln("ERROR: ", cast(string)file.buffer); 120 assert(false); 121 } 122 import std.file : remove; 123 gs_file.kill(); 124 remove("test.txt"); 125 writeln("Removed test.txt .. "); 126 }).read("test.txt"); 127 128 } 129 130 131 void testSignal() { 132 g_notifier = new AsyncNotifier(g_evl); 133 auto title = "This is my title"; 134 135 void delegate() del = { 136 import std.stdio; 137 assert(title == "This is my title"); 138 g_cbCheck[0] = true; 139 140 return; 141 }; 142 143 g_notifier.run(del); 144 g_notifier.trigger(); // will be completed in the event loop 145 } 146 147 void testEvents() { 148 149 gs_tlsEvent.run({ 150 assert(g_message == "Some message here"); 151 g_cbCheck[1] = true; 152 }); 153 154 gs_shrEvent = new shared AsyncSignal(g_evl); 155 156 gs_shrEvent.run({ 157 assert(gs_hshr.message == "Hello from shared!"); 158 g_cbCheck[2] = true; 159 }); 160 161 testTLSEvent(); 162 163 import std.concurrency; 164 Tid t2 = spawn(&testSharedEvent); 165 import core.thread : Thread; 166 while (!gs_shrEvent2 || gs_shrEvent2.id == 0) 167 Thread.sleep(100.msecs); 168 169 gs_shrEvent2.trigger(g_evl); 170 } 171 172 void testTLSEvent() { 173 gs_tlsEvent.trigger(); 174 } 175 176 void testSharedEvent() { 177 EventLoop evl2 = new EventLoop; 178 179 gs_shrEvent2 = new shared AsyncSignal(evl2); 180 gs_shrEvent2.run({ 181 g_cbCheck[3] = true; 182 return; 183 }); 184 185 gs_shrEvent.trigger(evl2); 186 187 while(Clock.currTime() - gs_start < 1.seconds) 188 evl2.loop(); 189 190 gs_shrEvent.trigger(evl2); 191 192 while(Clock.currTime() - gs_start < 4.seconds) 193 evl2.loop(); 194 } 195 196 void testOneshotTimer() { 197 AsyncTimer g_timerOneShot = new AsyncTimer(g_evl); 198 g_timerOneShot.duration(1.seconds).run({ 199 assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1100.msecs); 200 assert(g_timerOneShot.id > 0); 201 g_cbCheck[4] = true; 202 203 }); 204 } 205 206 void testMultiTimer() { 207 AsyncTimer g_timerMulti = new AsyncTimer(g_evl); 208 g_timerMulti.periodic().duration(1.seconds).run({ 209 assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1100.msecs); 210 assert(g_timerMulti.id > 0); 211 assert(!g_timerMulti.oneShot); 212 g_lastTimer = Clock.currTime(); 213 g_cbTimerCnt++; 214 g_cbCheck[5] = true; 215 }); 216 217 } 218 219 220 void trafficHandler(TCPEvent ev){ 221 // writeln("##TrafficHandler!"); 222 void doRead() { 223 static ubyte[] bin = new ubyte[4092]; 224 while (true) { 225 uint len = g_conn.recv(bin); 226 // writeln("!!Server Received " ~ len.to!string ~ " bytes"); 227 // import std.file; 228 if (len > 0) { 229 auto res = cast(string)bin[0..len]; 230 // writeln(res); 231 import std.algorithm : canFind; 232 if (res.canFind("Client Hello")) 233 g_cbCheck[8] = true; 234 235 if (res.canFind("Client WRITE")) 236 g_cbCheck[8] = false; 237 238 if (res.canFind("Client READ")) 239 g_cbCheck[9] = true; 240 241 if (res.canFind("Client KILL")) 242 g_cbCheck[10] = true; 243 } 244 if (len < bin.length) 245 break; 246 } 247 } 248 249 final switch (ev) { 250 case TCPEvent.CONNECT: 251 // writeln("!!Server Connected"); 252 doRead(); 253 if (g_conn.socket != 0) 254 g_conn.send(cast(ubyte[])"Server Connect"); 255 break; 256 case TCPEvent.READ: 257 // writeln("!!Server Read is ready"); 258 g_cbCheck[11] = true; 259 if (g_conn.socket != 0) 260 g_conn.send(cast(ubyte[])"Server READ"); 261 doRead(); 262 break; 263 case TCPEvent.WRITE: 264 // writeln("!!Server Write is ready"); 265 if (g_conn.socket != 0) 266 g_conn.send(cast(ubyte[])"Server WRITE"); 267 break; 268 case TCPEvent.CLOSE: 269 doRead(); 270 // writeln("!!Server Disconnect!"); 271 g_cbCheck[12] = true; 272 break; 273 case TCPEvent.ERROR: 274 // writeln("!!Server Error!"); 275 break; 276 } 277 278 return; 279 } 280 281 void testTCPListen(string ip, ushort port) { 282 283 g_listnr = new AsyncTCPListener(g_evl); 284 285 void delegate(TCPEvent) handler(AsyncTCPConnection conn) { 286 g_conn = conn; 287 g_cbCheck[6] = true; 288 import std.functional : toDelegate; 289 return toDelegate(&trafficHandler); 290 } 291 292 auto success = g_listnr.host(ip, port).run(&handler); 293 assert(success, g_listnr.error); 294 } 295 296 void testTCPConnect(string ip, ushort port) { 297 auto conn = new AsyncTCPConnection(g_evl); 298 conn.peer = g_evl.resolveHost(ip, port); 299 300 void delegate(TCPEvent) connHandler = (TCPEvent ev){ 301 void doRead() { 302 static ubyte[] bin = new ubyte[4092]; 303 while (true) { 304 assert(conn.socket > 0); 305 uint len = conn.recv(bin); 306 // writeln("!!Client Received " ~ len.to!string ~ " bytes"); 307 // if (len > 0) 308 // writeln(cast(string)bin[0..len]); 309 if (len < bin.length) 310 break; 311 } 312 } 313 final switch (ev) { 314 case TCPEvent.CONNECT: 315 // writeln("!!Client Connected"); 316 conn.setOption(TCPOption.QUICK_ACK, true); 317 conn.setOption(TCPOption.NODELAY, true); 318 g_cbCheck[14] = true; 319 if (conn.socket != 0) 320 conn.send(cast(ubyte[])"Client Hello"); 321 assert(conn.socket > 0); 322 break; 323 case TCPEvent.READ: 324 // writeln("!!Client Read is ready at writes: ", g_writes); 325 doRead(); 326 327 // respond 328 g_writes += 1; 329 if (g_writes > 3) { 330 if (conn.socket != 0) 331 conn.send(cast(ubyte[])"Client KILL"); 332 conn.kill(); 333 334 g_cbCheck[13] = true; 335 } 336 else 337 if (conn.socket != 0) 338 conn.send(cast(ubyte[])"Client READ"); 339 340 break; 341 case TCPEvent.WRITE: 342 343 g_writes += 1; 344 // writeln("!!Client Write is ready"); 345 if (conn.socket != 0) 346 conn.send(cast(ubyte[])"Client WRITE"); 347 break; 348 case TCPEvent.CLOSE: 349 // writeln("!!Client Disconnected"); 350 break; 351 case TCPEvent.ERROR: 352 // writeln("!!Client Error!"); 353 break; 354 } 355 return; 356 }; 357 358 auto success = conn.run(connHandler); 359 assert(success); 360 361 } 362 363 void testHTTPConnect() { 364 auto conn = new AsyncTCPConnection(g_evl); 365 conn.peer = g_evl.resolveHost("example.org", 80); 366 367 auto del = (TCPEvent ev){ 368 final switch (ev) { 369 case TCPEvent.CONNECT: 370 // writeln("!!Connected"); 371 static ubyte[] abin = new ubyte[4092]; 372 while (true) { 373 uint len = conn.recv(abin); 374 if (len < abin.length) 375 break; 376 } 377 g_cbCheck[15] = true; 378 // writeln(conn.local.toString()); 379 // writeln(conn.peer.toString()); 380 conn.send(cast(ubyte[])"GET http://example.org/\nHost: example.org\nConnection: close"); 381 break; 382 case TCPEvent.READ: 383 static ubyte[] bin = new ubyte[4092]; 384 while (true) { 385 uint len = conn.recv(bin); 386 g_cbCheck[16] = true; 387 // writeln("!!Received " ~ len.to!string ~ " bytes"); 388 if (len < bin.length) 389 break; 390 } 391 break; 392 case TCPEvent.WRITE: 393 // writeln("!!Write is ready"); 394 break; 395 case TCPEvent.CLOSE: 396 // writeln("!!Disconnected"); 397 break; 398 case TCPEvent.ERROR: 399 // writeln("!!Error!"); 400 break; 401 } 402 return; 403 }; 404 405 406 conn.run(del); 407 } 408 409 EventLoop g_evl; 410 AsyncTimer g_timerOneShot; 411 AsyncTimer g_timerMulti; 412 AsyncTCPConnection g_tcpConnect; 413 AsyncTCPConnection g_httpConnect; 414 AsyncTCPConnection g_conn; // incoming 415 AsyncNotifier g_notifier; 416 AsyncTCPListener g_listnr; 417 shared AsyncSignal gs_tlsEvent; 418 shared AsyncSignal gs_shrEvent; 419 shared AsyncSignal gs_shrEvent2; 420 shared AsyncFile gs_file; 421 __gshared SysTime gs_start; 422 string g_message = "Some message here"; 423 shared Msg* gs_hshr = new shared Msg("Hello from shared!"); 424 shared bool[] g_cbCheck; 425 int g_cbTimerCnt; 426 int g_writes; 427 SysTime g_lastTimer; 428 429 shared struct Msg { 430 string message; 431 }