1 module libasync.test; 2 version(unittest): 3 import libasync.events; 4 import std.stdio; 5 import std.datetime; 6 import libasync.file; 7 8 AsyncDirectoryWatcher g_watcher; 9 shared AsyncDNS g_dns; 10 11 12 unittest { 13 // writeln("Unit test started"); 14 g_cbCheck = new shared bool[19]; 15 g_lastTimer = Clock.currTime(); 16 gs_start = Clock.currTime(); 17 g_evl = getThreadEventLoop(); 18 // writeln("Loading objects..."); 19 testDirectoryWatcher(); 20 testDNS(); 21 testFile(); 22 testOneshotTimer(); 23 testMultiTimer(); 24 gs_tlsEvent = new shared AsyncSignal(g_evl); 25 testSignal(); 26 testEvents(); 27 testTCPListen("localhost", 8081); 28 testHTTPConnect(); 29 // writeln("Loaded. Running event loop..."); 30 31 testTCPConnect("localhost", 8081); 32 33 while(Clock.currTime() - gs_start < 4.seconds) 34 g_evl.loop(100.msecs); 35 36 int i; 37 foreach (bool b; g_cbCheck) { 38 assert(b, "Callback not triggered: g_cbCheck[" ~ i.to!string ~ "]"); 39 i++; 40 } 41 // writeln("Callback triggers were successful, run time: ", Clock.currTime - gs_start); 42 43 assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times 44 45 g_listnr.kill(); 46 47 destroyAsyncThreads(); 48 } 49 50 StopWatch g_swDns; 51 void testDNS() { 52 g_dns = new shared AsyncDNS(g_evl); 53 g_swDns.start(); 54 g_dns.handler((NetworkAddress addr) { 55 g_cbCheck[17] = true; 56 // writeln("Resolved to: ", addr.toString(), ", it took: ", g_swDns.peek().usecs, " usecs"); 57 }).resolveHost("127.0.0.1"); 58 } 59 60 void testDirectoryWatcher() { 61 import std.file : mkdir, write, rmdir, exists; 62 if (exists("./hey/tmp.tmp")) 63 remove("./hey/tmp.tmp"); 64 if (exists("./hey")) 65 rmdir("./hey"); 66 g_watcher = new AsyncDirectoryWatcher(g_evl); 67 g_watcher.run({ 68 DWChangeInfo[1] change; 69 DWChangeInfo[] changeRef = change.ptr[0..1]; 70 while(g_watcher.readChanges(changeRef)){ 71 g_cbCheck[18] = true; 72 // writeln(change); 73 } 74 }); 75 g_watcher.watchDir("."); 76 AsyncTimer tm = new AsyncTimer(g_evl); 77 tm.duration(1.seconds).run({ 78 mkdir("./hey"); 79 if (!g_watcher.watchDir("./hey/")) 80 // writeln("ERROR: ", g_watcher.status); 81 tm.duration(1.seconds).run({ 82 // writeln("Writing to ./hey/tmp.tmp"); 83 std.file.write("./hey/tmp.tmp", "some string"); 84 tm.duration(100.msecs).run({ 85 // writeln("Removing ./hey/tmp.tmp"); 86 remove("./hey/tmp.tmp"); 87 }); 88 }); 89 }); 90 } 91 92 void testFile() { 93 94 gs_file = new shared AsyncFile(g_evl); 95 96 { 97 File file = File("test.txt", "w"); 98 file.rawWrite("This is the file content."); 99 } 100 gs_file.onReady({ 101 auto file = gs_file; 102 if (file.status.code == Status.ERROR) 103 writeln("ERROR: ", file.status.text); 104 import std.algorithm; 105 if ((cast(string)file.buffer).startsWith("This is the file content.")) { 106 g_cbCheck[7] = true; 107 } 108 else { 109 //import std.stdio : writeln; 110 // writeln("ERROR: ", cast(string)file.buffer); 111 assert(false); 112 } 113 import std.file : remove; 114 remove("test.txt"); 115 }).read("test.txt"); 116 117 } 118 119 120 void testSignal() { 121 g_notifier = new AsyncNotifier(g_evl); 122 auto title = "This is my title"; 123 124 void delegate() del = { 125 import std.stdio; 126 assert(title == "This is my title"); 127 g_cbCheck[0] = true; 128 129 return; 130 }; 131 132 g_notifier.run(del); 133 g_notifier.trigger(); // will be completed in the event loop 134 } 135 136 void testEvents() { 137 138 gs_tlsEvent.run({ 139 assert(g_message == "Some message here"); 140 g_cbCheck[1] = true; 141 }); 142 143 gs_shrEvent = new shared AsyncSignal(g_evl); 144 145 gs_shrEvent.run({ 146 assert(gs_hshr.message == "Hello from shared!"); 147 g_cbCheck[2] = true; 148 }); 149 150 testTLSEvent(); 151 152 import std.concurrency; 153 Tid t2 = spawn(&testSharedEvent); 154 import core.thread : Thread; 155 while (!gs_shrEvent2 || gs_shrEvent2.id == 0) 156 Thread.sleep(100.msecs); 157 158 gs_shrEvent2.trigger(g_evl); 159 } 160 161 void testTLSEvent() { 162 gs_tlsEvent.trigger(); 163 } 164 165 void testSharedEvent() { 166 EventLoop evl2 = new EventLoop; 167 168 gs_shrEvent2 = new shared AsyncSignal(evl2); 169 gs_shrEvent2.run({ 170 g_cbCheck[3] = true; 171 return; 172 }); 173 174 gs_shrEvent.trigger(evl2); 175 176 while(Clock.currTime() - gs_start < 1.seconds) 177 evl2.loop(); 178 179 gs_shrEvent.trigger(evl2); 180 181 while(Clock.currTime() - gs_start < 4.seconds) 182 evl2.loop(); 183 } 184 185 void testOneshotTimer() { 186 AsyncTimer g_timerOneShot = new AsyncTimer(g_evl); 187 g_timerOneShot.duration(1.seconds).run({ 188 assert(!g_cbCheck[4] && Clock.currTime() - gs_start > 900.msecs && Clock.currTime() - gs_start < 1100.msecs); 189 assert(g_timerOneShot.id > 0); 190 g_cbCheck[4] = true; 191 192 }); 193 } 194 195 void testMultiTimer() { 196 AsyncTimer g_timerMulti = new AsyncTimer(g_evl); 197 g_timerMulti.periodic().duration(1.seconds).run({ 198 assert(g_lastTimer !is SysTime.init && Clock.currTime() - g_lastTimer > 900.msecs && Clock.currTime() - g_lastTimer < 1100.msecs); 199 assert(g_timerMulti.id > 0); 200 assert(!g_timerMulti.oneShot); 201 g_lastTimer = Clock.currTime(); 202 g_cbTimerCnt++; 203 g_cbCheck[5] = true; 204 }); 205 206 } 207 208 209 void trafficHandler(TCPEvent ev){ 210 // writeln("##TrafficHandler!"); 211 void doRead() { 212 static ubyte[] bin = new ubyte[4092]; 213 while (true) { 214 uint len = g_conn.recv(bin); 215 // writeln("!!Server Received " ~ len.to!string ~ " bytes"); 216 // import std.file; 217 if (len > 0) { 218 auto res = cast(string)bin[0..len]; 219 // writeln(res); 220 import std.algorithm : canFind; 221 if (res.canFind("Client Hello")) 222 g_cbCheck[8] = true; 223 224 if (res.canFind("Client WRITE")) 225 g_cbCheck[8] = false; 226 227 if (res.canFind("Client READ")) 228 g_cbCheck[9] = true; 229 230 if (res.canFind("Client KILL")) 231 g_cbCheck[10] = true; 232 } 233 if (len < bin.length) 234 break; 235 } 236 } 237 238 final switch (ev) { 239 case TCPEvent.CONNECT: 240 // writeln("!!Server Connected"); 241 doRead(); 242 if (g_conn.socket != 0) 243 g_conn.send(cast(ubyte[])"Server Connect"); 244 break; 245 case TCPEvent.READ: 246 // writeln("!!Server Read is ready"); 247 g_cbCheck[11] = true; 248 if (g_conn.socket != 0) 249 g_conn.send(cast(ubyte[])"Server READ"); 250 doRead(); 251 break; 252 case TCPEvent.WRITE: 253 // writeln("!!Server Write is ready"); 254 if (g_conn.socket != 0) 255 g_conn.send(cast(ubyte[])"Server WRITE"); 256 break; 257 case TCPEvent.CLOSE: 258 doRead(); 259 // writeln("!!Server Disconnect!"); 260 g_cbCheck[12] = true; 261 break; 262 case TCPEvent.ERROR: 263 // writeln("!!Server Error!"); 264 break; 265 } 266 267 return; 268 } 269 270 void testTCPListen(string ip, ushort port) { 271 272 g_listnr = new AsyncTCPListener(g_evl); 273 274 void delegate(TCPEvent) handler(AsyncTCPConnection conn) { 275 g_conn = conn; 276 g_cbCheck[6] = true; 277 import std.functional : toDelegate; 278 return toDelegate(&trafficHandler); 279 } 280 281 auto success = g_listnr.host(ip, port).run(&handler); 282 assert(success, g_listnr.error); 283 } 284 285 void testTCPConnect(string ip, ushort port) { 286 auto conn = new AsyncTCPConnection(g_evl); 287 conn.peer = g_evl.resolveHost(ip, port); 288 289 void delegate(TCPEvent) connHandler = (TCPEvent ev){ 290 void doRead() { 291 static ubyte[] bin = new ubyte[4092]; 292 while (true) { 293 assert(conn.socket > 0); 294 uint len = conn.recv(bin); 295 // writeln("!!Client Received " ~ len.to!string ~ " bytes"); 296 // if (len > 0) 297 // writeln(cast(string)bin[0..len]); 298 if (len < bin.length) 299 break; 300 } 301 } 302 final switch (ev) { 303 case TCPEvent.CONNECT: 304 // writeln("!!Client Connected"); 305 conn.setOption(TCPOption.QUICK_ACK, true); 306 conn.setOption(TCPOption.NODELAY, true); 307 g_cbCheck[14] = true; 308 if (conn.socket != 0) 309 conn.send(cast(ubyte[])"Client Hello"); 310 assert(conn.socket > 0); 311 break; 312 case TCPEvent.READ: 313 // writeln("!!Client Read is ready at writes: ", g_writes); 314 doRead(); 315 316 // respond 317 g_writes += 1; 318 if (g_writes > 3) { 319 if (conn.socket != 0) 320 conn.send(cast(ubyte[])"Client KILL"); 321 conn.kill(); 322 323 g_cbCheck[13] = true; 324 } 325 else 326 if (conn.socket != 0) 327 conn.send(cast(ubyte[])"Client READ"); 328 329 break; 330 case TCPEvent.WRITE: 331 332 g_writes += 1; 333 // writeln("!!Client Write is ready"); 334 if (conn.socket != 0) 335 conn.send(cast(ubyte[])"Client WRITE"); 336 break; 337 case TCPEvent.CLOSE: 338 // writeln("!!Client Disconnected"); 339 break; 340 case TCPEvent.ERROR: 341 // writeln("!!Client Error!"); 342 break; 343 } 344 return; 345 }; 346 347 auto success = conn.run(connHandler); 348 assert(success); 349 350 } 351 352 void testHTTPConnect() { 353 auto conn = new AsyncTCPConnection(g_evl); 354 conn.peer = g_evl.resolveHost("example.org", 80); 355 356 auto del = (TCPEvent ev){ 357 final switch (ev) { 358 case TCPEvent.CONNECT: 359 // writeln("!!Connected"); 360 static ubyte[] abin = new ubyte[4092]; 361 while (true) { 362 uint len = conn.recv(abin); 363 if (len < abin.length) 364 break; 365 } 366 g_cbCheck[15] = true; 367 // writeln(conn.local.toString()); 368 // writeln(conn.peer.toString()); 369 conn.send(cast(ubyte[])"GET http://example.org/\nHost: example.org\nConnection: close"); 370 break; 371 case TCPEvent.READ: 372 static ubyte[] bin = new ubyte[4092]; 373 while (true) { 374 uint len = conn.recv(bin); 375 g_cbCheck[16] = true; 376 // writeln("!!Received " ~ len.to!string ~ " bytes"); 377 if (len < bin.length) 378 break; 379 } 380 break; 381 case TCPEvent.WRITE: 382 // writeln("!!Write is ready"); 383 break; 384 case TCPEvent.CLOSE: 385 // writeln("!!Disconnected"); 386 break; 387 case TCPEvent.ERROR: 388 // writeln("!!Error!"); 389 break; 390 } 391 return; 392 }; 393 394 395 conn.run(del); 396 } 397 398 EventLoop g_evl; 399 AsyncTimer g_timerOneShot; 400 AsyncTimer g_timerMulti; 401 AsyncTCPConnection g_tcpConnect; 402 AsyncTCPConnection g_httpConnect; 403 AsyncTCPConnection g_conn; // incoming 404 AsyncNotifier g_notifier; 405 AsyncTCPListener g_listnr; 406 shared AsyncSignal gs_tlsEvent; 407 shared AsyncSignal gs_shrEvent; 408 shared AsyncSignal gs_shrEvent2; 409 shared AsyncFile gs_file; 410 __gshared SysTime gs_start; 411 string g_message = "Some message here"; 412 shared Msg* gs_hshr = new shared Msg("Hello from shared!"); 413 shared bool[] g_cbCheck; 414 int g_cbTimerCnt; 415 int g_writes; 416 SysTime g_lastTimer; 417 418 shared struct Msg { 419 string message; 420 }