/// Worker-thread pool that runs blocking file I/O and DNS resolution off libasync's event loops.
module libasync.threads;

import core.sync.mutex;
import core.sync.condition;
import core.thread;
import libasync.events;
import std.stdio;
import std.conv : to;
import std.container : Array;
import core.atomic;

nothrow {
    struct Waiter {
        Mutex mtx;
        Condition cond;
    }

    __gshared Mutex gs_wlock;
    __gshared Array!Waiter gs_waiters;
    __gshared Array!CommandInfo gs_jobs;
    __gshared Condition gs_started;
    shared(bool) gs_closing;

    __gshared ThreadGroup gs_threads; // daemon threads
    shared(int) gs_threadCnt;
}

// Worker thread that executes blocking file and DNS commands outside the event loop thread.
final class CmdProcessor : Thread
{
nothrow:
private:
    EventLoop m_evLoop;
    Waiter m_waiter;
    shared(bool) g_stop;

    this() {
        try {
            Mutex mtx = new Mutex;
            Condition cond = new Condition(mtx);
            m_waiter = Waiter(mtx, cond);
            super(&run);
        }
        catch (Throwable e) {
            static if (DEBUG) {
                import std.stdio;
                try writeln("Failed to run thread ... ", e.toString()); catch (Throwable) {}
            }
        }
    }

    void process(shared AsyncDNS ctxt) {
        DNSCmdInfo cmdInfo = ctxt.cmdInfo();
        auto mutex = cmdInfo.mtx;
        DNSCmd cmd;
        Waiter waiter;
        string url;
        cmd = cmdInfo.command;
        waiter = cast(Waiter)cmdInfo.waiter;
        url = cmdInfo.url;
        try assert(m_waiter == waiter, "DNS processor is handling a command from the wrong thread"); catch (Throwable) {}

        try final switch (cmd)
        {
            case DNSCmd.RESOLVEHOST:
                *ctxt.addr = cast(shared) m_evLoop.resolveHost(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
                break;

            case DNSCmd.RESOLVEIP:
                *ctxt.addr = cast(shared) m_evLoop.resolveIP(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
                break;

        } catch (Throwable e) {
            auto status = StatusInfo.init;
            status.code = Status.ERROR;
            try status.text = e.toString(); catch (Throwable) {}
            ctxt.status = status;
        }

        try {
            cmdInfo.ready.trigger(m_evLoop);
            synchronized(gs_wlock)
                gs_waiters.insertBack(m_waiter);
            gs_started.notifyAll(); // saves some waiting on a new thread
        }
        catch (Throwable e) {
            auto status = StatusInfo.init;
            status.code = Status.ERROR;
            try status.text = e.toString(); catch (Throwable) {}
            ctxt.status = status;
        }
    }

    void process(shared AsyncFile ctxt) {
        auto cmdInfo = ctxt.cmdInfo;
        auto mutex = cmdInfo.mtx;
        FileCmd cmd;
        Waiter waiter;
        cmd = cmdInfo.command;
        waiter = cast(Waiter)cmdInfo.waiter;

        try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch (Throwable) {}

        try final switch (cmd)
        {
            case FileCmd.READ:
                File file = ctxt.file;
                if (ctxt.offset != -1)
                    file.seek(cast(long)ctxt.offset);
                ubyte[] res;
                synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer);
                if (res)
                    ctxt.offset = cast(ulong) (ctxt.offset + res.length);
                break;

            case FileCmd.WRITE:
                File file = cast(File)ctxt.file;
                if (ctxt.offset != -1)
                    file.seek(cast(long)ctxt.offset);
                synchronized(mutex) {
                    file.rawWrite(cast(ubyte[])ctxt.buffer);
                }
                file.flush();
                ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
                break;

            case FileCmd.APPEND:
                File file = cast(File)ctxt.file;
                synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
                ctxt.offset = cast(ulong) file.size();
                file.flush();
                break;
        } catch (Throwable e) {
            auto status = StatusInfo.init;
            status.code = Status.ERROR;
            try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch (Throwable) {}
            ctxt.status = status;
        }

        try {
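            // Signal completion to the waiting caller, then put this worker back on the idle list.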
            cmdInfo.ready.trigger(m_evLoop);

            synchronized(gs_wlock)
                gs_waiters.insertBack(m_waiter);
            gs_started.notifyAll(); // saves some waiting on a new thread
        }
        catch (Throwable e) {
            static if (DEBUG) {
                try writeln("AsyncFile Thread Error: ", e.toString()); catch (Throwable) {}
            }
            auto status = StatusInfo.init;
            status.code = Status.ERROR;
            try status.text = e.toString(); catch (Throwable) {}
            ctxt.status = status;
        }
    }

    void run()
    {
        core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
        try {
            m_evLoop = new EventLoop;
            synchronized(gs_wlock) {
                gs_waiters.insertBack(m_waiter);
            }

            gs_started.notifyAll();

            process();
        } catch (Throwable e) {
            static if (DEBUG) {
                try writeln("Error inserting in waiters " ~ e.toString()); catch (Throwable) {}
            }
        }

        core.atomic.atomicOp!"-="(gs_threadCnt, cast(int) 1);
    }

    void stop()
    {
        atomicStore(g_stop, true);
        try (cast(Waiter)m_waiter).cond.notifyAll();
        catch (Exception e) {
            static if (DEBUG) {
                try writeln("Exception occurred notifying foreign thread: ", e.toString()); catch (Throwable) {}
            }
        }
    }

    // Worker loop: block on this thread's condition until a job is queued, then run it.
    private void process() {
        while(!atomicLoad(g_stop)) {
            CommandInfo cmd;
            try synchronized(m_waiter.mtx) {
                if (!m_waiter.cond) return;
                m_waiter.cond.wait();
            }
            catch (Throwable) {}
            if (atomicLoad(g_stop)) break;
            try synchronized(gs_wlock) {
                if (gs_jobs.empty) continue;
                cmd = gs_jobs.back;
                gs_jobs.removeBack();
            } catch (Throwable) {}

            final switch (cmd.type) {
                case CmdInfoType.FILE:
                    process(cast(shared AsyncFile) cmd.data);
                    break;
                case CmdInfoType.DNS:
                    process(cast(shared AsyncDNS) cmd.data);
                    break;
            }
        }
    }

}

// Fetches an idle worker, spawning an additional CmdProcessor (up to 16 threads) when none is available.
Waiter popWaiter() {
    if (atomicLoad(gs_threadCnt) == 0) return Waiter.init; // we're in a static ctor probably...
    Waiter cmd_handler;
    bool start_thread;
    do {
        if (start_thread) {
            Thread thr = new CmdProcessor;
            thr.isDaemon = true;
            thr.name = "CmdProcessor";
            thr.start();
            gs_threads.add(thr);
        }

        synchronized(gs_wlock) {
            if (start_thread && !gs_started.wait(5.seconds))
                continue;

            try {
                if (!cmd_handler.mtx && !gs_waiters.empty) {
                    cmd_handler = gs_waiters.back;
                    gs_waiters.removeBack();
                }
                else if (core.atomic.atomicLoad(gs_threadCnt) < 16) {
                    start_thread = true;
                }
                else {
                    Thread.sleep(50.usecs);
                }
            } catch (Exception e) {
                static if (DEBUG) {
                    import std.stdio : writeln;
                    writeln("Exception in popWaiter: ", e);
                }
            }
        }
    } while(!cmd_handler.cond);
    return cmd_handler;
}

shared static this() {
    gs_wlock = new Mutex;
    gs_threads = new ThreadGroup;
    gs_started = new Condition(gs_wlock);
}

// Lazily starts the initial pool of four worker threads; returns false if setup failed.
bool spawnAsyncThreads() nothrow {
    import core.atomic : atomicLoad;
    try {
        if (!atomicLoad(gs_closing) && atomicLoad(gs_threadCnt) == 0) {
            synchronized {
                if (!atomicLoad(gs_closing) && atomicLoad(gs_threadCnt) == 0) {
                    foreach (i; 0 .. 4) {
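                        // Start a daemon worker and wait up to a second for it to register itself as idle.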
                        Thread thr = new CmdProcessor;
                        thr.isDaemon = true;
                        thr.name = "CmdProcessor";
                        gs_threads.add(thr);
                        thr.start();
                        synchronized(gs_wlock)
                            gs_started.wait(1.seconds);
                    }
                }
            }
        }
    } catch (Exception ignore) {
        return false;
    }
    return true;
}

// Signals every worker to stop and, when called from the main thread, joins them.
// Must be called before program termination (see the assert in the shared static destructor).
void destroyAsyncThreads() {
    if (!atomicLoad(gs_closing)) atomicStore(gs_closing, true);
    else return;
    synchronized(gs_wlock) foreach (thr; gs_threads) {
        CmdProcessor thread = cast(CmdProcessor)thr;
        thread.stop();
        import core.thread : thread_isMainThread;
        if (thread_isMainThread)
            thread.join();
    }
}

static ~this() {
    import core.thread : thread_isMainThread;
    if (thread_isMainThread)
        destroyAsyncThreads();
}

shared static ~this() {
    assert(atomicLoad(gs_closing), "You must call libasync.threads.destroyAsyncThreads() upon termination of the program to avoid segfaulting");
}

enum CmdInfoType {
    FILE,
    DNS
}

// Job descriptor queued in gs_jobs; data points to the shared AsyncFile or AsyncDNS being processed.
struct CommandInfo {
    CmdInfoType type;
    void* data;
}
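/*
    Illustrative sketch only, not part of libasync's API: roughly how a producer such as
    AsyncFile or AsyncDNS hands work to this pool (the real dispatch code lives in
    libasync.file and libasync.dns). The helper name `enqueueCommand`, its signature and
    its error handling are assumptions made for this example; `payload` stands for a
    `cast(void*)` shared AsyncFile or AsyncDNS context. It is wrapped in version(none)
    so it is never compiled into the module.
*/
version (none)
private bool enqueueCommand(CmdInfoType type, void* payload) nothrow {
    try {
        // Make sure the pool exists; this is a no-op once workers are already running.
        if (!spawnAsyncThreads())
            return false;

        // Reserve an idle worker; this may transparently start another CmdProcessor.
        Waiter handler = popWaiter();
        if (!handler.cond)
            return false; // pool unavailable, e.g. when called from a static constructor

        // Publish the job, then wake the reserved worker so it dequeues and runs it.
        synchronized (gs_wlock)
            gs_jobs.insertBack(CommandInfo(type, payload));
        synchronized (handler.mtx)
            handler.cond.notify();
        return true;
    } catch (Throwable) {
        return false;
    }
}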