1 module libasync.test; 2 version(unittest): 3 import libasync.events; 4 import std.stdio; 5 import std.datetime; 6 import libasync.file; 7 8 AsyncDirectoryWatcher g_watcher; 9 shared AsyncDNS g_dns; 10 11 12 unittest { 13 spawnAsyncThreads(); 14 // writeln("Unit test started"); 15 g_cbCheck = new shared bool[19]; 16 g_lastTimer = Clock.currTime(); 17 gs_start = Clock.currTime(); 18 g_evl = getThreadEventLoop(); 19 g_evl.loop(1.msecs); 20 // writeln("Loading objects..."); 21 testDirectoryWatcher(); 22 23 testDNS(); 24 testOneshotTimer(); 25 testMultiTimer(); 26 gs_tlsEvent = new shared AsyncSignal(g_evl); 27 testSignal(); 28 testEvents(); 29 testTCPListen("localhost", 8081); 30 testHTTPConnect(); 31 // writeln("Loaded. Running event loop..."); 32 testFile(); 33 testTCPConnect("localhost", 8081); 34 35 while(Clock.currTime() - gs_start < 7.seconds) 36 g_evl.loop(100.msecs); 37 38 int i; 39 foreach (bool b; g_cbCheck) { 40 assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]"); 41 i++; 42 } 43 // writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start); 44 45 assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times 46 47 g_listnr.kill(); 48 49 } 50 51 shared static ~this() { destroyAsyncThreads(); } 52 53 StopWatch g_swDns; 54 void testDNS() { 55 g_dns = new shared AsyncDNS(g_evl); 56 g_swDns.start(); 57 g_dns.handler((NetworkAddress addr) { 58 g_cbCheck[17] = true; 59 // writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs"); 60 }).resolveHost("127.0.0.1"); 61 } 62 63 void testDirectoryWatcher() { 64 import std.file : mkdir, write, rmdir, exists; 65 if (exists("./hey/tmp.tmp")) 66 remove("./hey/tmp.tmp"); 67 if (exists("./hey")) 68 rmdir("./hey"); 69 g_watcher = new AsyncDirectoryWatcher(g_evl); 70 g_watcher.run({ 71 DWChangeInfo[1] change; 72 DWChangeInfo[] changeRef = change.ptr[0..1]; 73 while(g_watcher.readChanges(changeRef)){ 74 g_cbCheck[18] = true; 75 writeln(change); 76 } 77 }); 78 g_watcher.watchDir("."); 79 AsyncTimer tm = new AsyncTimer(g_evl); 80 tm.duration(1.seconds).run({ 81 writeln("Creating directory ./hey"); 82 mkdir("./hey"); 83 assert(g_watcher.watchDir("./hey/")); 84 tm.duration(1.seconds).run({ 85 writeln("Writing to ./hey/tmp.tmp for the first time"); 86 std.file.write("./hey/tmp.tmp", "some string"); 87 tm.duration(100.msecs).run({ 88 writeln("Removing ./hey/tmp.tmp"); 89 remove("./hey/tmp.tmp"); 90 }); 91 }); 92 }); 93 } 94 95 void testFile() { 96 gs_file = new shared AsyncFile(g_evl); 97 98 { 99 File file = File("test.txt", "w"); 100 file.rawWrite("This is the file content."); 101 } 102 gs_file.onReady({ 103 writeln("Created and wrote to test.txt through AsyncFile"); 104 auto file = gs_file; 105 if (file.status.code == Status.ERROR) 106 writeln("ERROR: ", file.status.text); 107 import std.algorithm; 108 if ((cast(string)file.buffer).startsWith("This is the file content.")) { 109 g_cbCheck[7] = true; 110 } 111 else { 112 //import std.stdio : writeln; 113 // writeln("ERROR: ", cast(string)file.buffer); 114 assert(false); 115 } 116 import std.file : remove; 117 remove("test.txt"); 118 writeln("Removed test.txt .. "); 119 }).read("test.txt"); 120 121 } 122 123 124 void testSignal() { 125 g_notifier = new AsyncNotifier(g_evl); 126 auto title = "This is my title"; 127 128 void delegate() del = { 129 import std.stdio; 130 assert(title == "This is my title"); 131 g_cbCheck[0] = true; 132 133 return; 134 }; 135 136 g_notifier.run(del); 137 g_notifier.trigger(); // will be completed in the event loop 138 } 139 140 void testEvents() { 141 142 gs_tlsEvent.run({ 143 assert(g_message == "Some message here"); 144 g_cbCheck[1] = true; 145 }); 146 147 gs_shrEvent = new shared AsyncSignal(g_evl); 148 149 gs_shrEvent.run({ 150 assert(gs_hshr.message == "Hello from shared!"); 151 g_cbCheck[2] = true; 152 }); 153 154 testTLSEvent(); 155 156 import std.concurrency; 157 Tid t2 = spawn(&testSharedEvent); 158 import core.thread : Thread; 159 while (!gs_shrEvent2 || gs_shrEvent2.id == 0) 160 Thread.sleep(100.msecs); 161 162 gs_shrEvent2.trigger(g_evl); 163 } 164 165 void testTLSEvent() { 166 gs_tlsEvent.trigger(); 167 } 168 169 void testSharedEvent() { 170 EventLoop evl2 = new EventLoop; 171 172 gs_shrEvent2 = new shared AsyncSignal(evl2); 173 gs_shrEvent2.run({ 174 g_cbCheck[3] = true; 175 return; 176 }); 177 178 gs_shrEvent.trigger(evl2); 179 180 while(Clock.currTime() - gs_start < 1.seconds) 181 evl2.loop(); 182 183 gs_shrEvent.trigger(evl2); 184 185 while(Clock.currTime() - gs_start < 4.seconds) 186 evl2.loop(); 187 } 188 189 void testOneshotTimer() { 190 AsyncTimer g_timerOneShot = new AsyncTimer(g_evl); 191 g_timerOneShot.duration(1.seconds).run({ 192 assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1100.msecs); 193 assert(g_timerOneShot.id > 0); 194 g_cbCheck[4] = true; 195 196 }); 197 } 198 199 void testMultiTimer() { 200 AsyncTimer g_timerMulti = new AsyncTimer(g_evl); 201 g_timerMulti.periodic().duration(1.seconds).run({ 202 assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1100.msecs); 203 assert(g_timerMulti.id > 0); 204 assert(!g_timerMulti.oneShot); 205 g_lastTimer = Clock.currTime(); 206 g_cbTimerCnt++; 207 g_cbCheck[5] = true; 208 }); 209 210 } 211 212 213 void trafficHandler(TCPEvent ev){ 214 // writeln("##TrafficHandler!"); 215 void doRead() { 216 static ubyte[] bin = new ubyte[4092]; 217 while (true) { 218 uint len = g_conn.recv(bin); 219 // writeln("!!Server Received " ~ len.to!string ~ " bytes"); 220 // import std.file; 221 if (len > 0) { 222 auto res = cast(string)bin[0..len]; 223 // writeln(res); 224 import std.algorithm : canFind; 225 if (res.canFind("Client Hello")) 226 g_cbCheck[8] = true; 227 228 if (res.canFind("Client WRITE")) 229 g_cbCheck[8] = false; 230 231 if (res.canFind("Client READ")) 232 g_cbCheck[9] = true; 233 234 if (res.canFind("Client KILL")) 235 g_cbCheck[10] = true; 236 } 237 if (len < bin.length) 238 break; 239 } 240 } 241 242 final switch (ev) { 243 case TCPEvent.CONNECT: 244 // writeln("!!Server Connected"); 245 doRead(); 246 if (g_conn.socket != 0) 247 g_conn.send(cast(ubyte[])"Server Connect"); 248 break; 249 case TCPEvent.READ: 250 // writeln("!!Server Read is ready"); 251 g_cbCheck[11] = true; 252 if (g_conn.socket != 0) 253 g_conn.send(cast(ubyte[])"Server READ"); 254 doRead(); 255 break; 256 case TCPEvent.WRITE: 257 // writeln("!!Server Write is ready"); 258 if (g_conn.socket != 0) 259 g_conn.send(cast(ubyte[])"Server WRITE"); 260 break; 261 case TCPEvent.CLOSE: 262 doRead(); 263 // writeln("!!Server Disconnect!"); 264 g_cbCheck[12] = true; 265 break; 266 case TCPEvent.ERROR: 267 // writeln("!!Server Error!"); 268 break; 269 } 270 271 return; 272 } 273 274 void testTCPListen(string ip, ushort port) { 275 276 g_listnr = new AsyncTCPListener(g_evl); 277 278 void delegate(TCPEvent) handler(AsyncTCPConnection conn) { 279 g_conn = conn; 280 g_cbCheck[6] = true; 281 import std.functional : toDelegate; 282 return toDelegate(&trafficHandler); 283 } 284 285 auto success = g_listnr.host(ip, port).run(&handler); 286 assert(success, g_listnr.error); 287 } 288 289 void testTCPConnect(string ip, ushort port) { 290 auto conn = new AsyncTCPConnection(g_evl); 291 conn.peer = g_evl.resolveHost(ip, port); 292 293 void delegate(TCPEvent) connHandler = (TCPEvent ev){ 294 void doRead() { 295 static ubyte[] bin = new ubyte[4092]; 296 while (true) { 297 assert(conn.socket > 0); 298 uint len = conn.recv(bin); 299 // writeln("!!Client Received " ~ len.to!string ~ " bytes"); 300 // if (len > 0) 301 // writeln(cast(string)bin[0..len]); 302 if (len < bin.length) 303 break; 304 } 305 } 306 final switch (ev) { 307 case TCPEvent.CONNECT: 308 // writeln("!!Client Connected"); 309 conn.setOption(TCPOption.QUICK_ACK, true); 310 conn.setOption(TCPOption.NODELAY, true); 311 g_cbCheck[14] = true; 312 if (conn.socket != 0) 313 conn.send(cast(ubyte[])"Client Hello"); 314 assert(conn.socket > 0); 315 break; 316 case TCPEvent.READ: 317 // writeln("!!Client Read is ready at writes: ", g_writes); 318 doRead(); 319 320 // respond 321 g_writes += 1; 322 if (g_writes > 3) { 323 if (conn.socket != 0) 324 conn.send(cast(ubyte[])"Client KILL"); 325 conn.kill(); 326 327 g_cbCheck[13] = true; 328 } 329 else 330 if (conn.socket != 0) 331 conn.send(cast(ubyte[])"Client READ"); 332 333 break; 334 case TCPEvent.WRITE: 335 336 g_writes += 1; 337 // writeln("!!Client Write is ready"); 338 if (conn.socket != 0) 339 conn.send(cast(ubyte[])"Client WRITE"); 340 break; 341 case TCPEvent.CLOSE: 342 // writeln("!!Client Disconnected"); 343 break; 344 case TCPEvent.ERROR: 345 // writeln("!!Client Error!"); 346 break; 347 } 348 return; 349 }; 350 351 auto success = conn.run(connHandler); 352 assert(success); 353 354 } 355 356 void testHTTPConnect() { 357 auto conn = new AsyncTCPConnection(g_evl); 358 conn.peer = g_evl.resolveHost("example.org", 80); 359 360 auto del = (TCPEvent ev){ 361 final switch (ev) { 362 case TCPEvent.CONNECT: 363 // writeln("!!Connected"); 364 static ubyte[] abin = new ubyte[4092]; 365 while (true) { 366 uint len = conn.recv(abin); 367 if (len < abin.length) 368 break; 369 } 370 g_cbCheck[15] = true; 371 // writeln(conn.local.toString()); 372 // writeln(conn.peer.toString()); 373 conn.send(cast(ubyte[])"GET http://example.org/\nHost: example.org\nConnection: close"); 374 break; 375 case TCPEvent.READ: 376 static ubyte[] bin = new ubyte[4092]; 377 while (true) { 378 uint len = conn.recv(bin); 379 g_cbCheck[16] = true; 380 // writeln("!!Received " ~ len.to!string ~ " bytes"); 381 if (len < bin.length) 382 break; 383 } 384 break; 385 case TCPEvent.WRITE: 386 // writeln("!!Write is ready"); 387 break; 388 case TCPEvent.CLOSE: 389 // writeln("!!Disconnected"); 390 break; 391 case TCPEvent.ERROR: 392 // writeln("!!Error!"); 393 break; 394 } 395 return; 396 }; 397 398 399 conn.run(del); 400 } 401 402 EventLoop g_evl; 403 AsyncTimer g_timerOneShot; 404 AsyncTimer g_timerMulti; 405 AsyncTCPConnection g_tcpConnect; 406 AsyncTCPConnection g_httpConnect; 407 AsyncTCPConnection g_conn; // incoming 408 AsyncNotifier g_notifier; 409 AsyncTCPListener g_listnr; 410 shared AsyncSignal gs_tlsEvent; 411 shared AsyncSignal gs_shrEvent; 412 shared AsyncSignal gs_shrEvent2; 413 shared AsyncFile gs_file; 414 __gshared SysTime gs_start; 415 string g_message = "Some message here"; 416 shared Msg* gs_hshr = new shared Msg("Hello from shared!"); 417 shared bool[] g_cbCheck; 418 int g_cbTimerCnt; 419 int g_writes; 420 SysTime g_lastTimer; 421 422 shared struct Msg { 423 string message; 424 }