module libasync.threads;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import core.time;
import std.container : Array;
import std.conv : to;
import std.stdio;

import libasync.events;

nothrow {

	/// A parked worker thread's synchronization handles. Signalling `cond`
	/// (whose owning mutex is `mtx`) wakes the thread so it can pull a job
	/// from `gs_jobs`.
	struct Waiter {
		Mutex mtx;
		Condition cond;
	}

	__gshared Mutex gs_wlock;            // guards gs_waiters, gs_jobs and gs_started
	__gshared Array!Waiter gs_waiters;   // idle worker threads, available to take a job
	__gshared Array!CommandInfo gs_jobs; // pending commands, consumed LIFO
	__gshared Condition gs_started;      // signaled when a worker becomes available
	__gshared bool gs_closing;           // set once by destroyAsyncThreads()

	__gshared ThreadGroup gs_threads; // daemon threads
	shared(int) gs_threadCnt;         // live CmdProcessor count (atomic)

}

/// Worker thread that executes blocking DNS and file commands on behalf of
/// an event loop, then reports completion through the command's ready event
/// and parks itself back in the global waiter pool.
final class CmdProcessor : Thread
{
nothrow:
private:
	EventLoop m_evLoop; // thread-local loop used to resolve and to trigger events
	Waiter m_waiter;    // this thread's park-bench (mutex + condition)
	bool m_stop;        // set by stop(); read/written under m_waiter.mtx

	this() {
		try {
			Mutex mtx = new Mutex;
			Condition cond = new Condition(mtx);
			m_waiter = Waiter(mtx, cond);
			super(&run);
		}
		catch (Throwable e) {
			import std.stdio;
			try writeln("Failed to run thread ... ", e.toString()); catch (Throwable) {}
		}
	}

	/// Executes one DNS resolution command, stores any failure in the
	/// context's status, triggers the ready event, and returns this thread
	/// to the idle-waiter pool.
	void process(shared AsyncDNS ctxt) {
		DNSCmdInfo cmdInfo = ctxt.cmdInfo();
		DNSCmd cmd = cmdInfo.command;
		Waiter waiter = cast(Waiter) cmdInfo.waiter;
		string url = cmdInfo.url;
		try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch (Throwable) {}

		try final switch (cmd)
		{
			case DNSCmd.RESOLVEHOST:
				*ctxt.addr = cast(shared) m_evLoop.resolveHost(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
				break;

			case DNSCmd.RESOLVEIP:
				*ctxt.addr = cast(shared) m_evLoop.resolveIP(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
				break;

		} catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}

		try {
			cmdInfo.ready.trigger(m_evLoop);
			synchronized (gs_wlock)
				gs_waiters.insertBack(m_waiter);
			gs_started.notifyAll(); // saves some waiting on a new thread
		}
		catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}
	}

	/// Executes one file command (READ / WRITE / APPEND), updating the
	/// context's offset on success and its status on failure, then triggers
	/// the ready event and returns this thread to the idle-waiter pool.
	void process(shared AsyncFile ctxt) {
		auto cmdInfo = ctxt.cmdInfo;
		auto mutex = cmdInfo.mtx; // guards the shared buffer during raw I/O
		FileCmd cmd = cmdInfo.command;
		Waiter waiter = cast(Waiter) cmdInfo.waiter;

		try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch (Throwable) {}
		import std.file : exists;
		// Optionally create or truncate the destination before the command runs.
		try if (cmdInfo.create_if_not_exists || cmdInfo.truncate_if_exists) {
			bool flag;
			if (cmdInfo.create_if_not_exists && !exists(ctxt.filePath.toNativeString()))
				flag = true;
			else if (cmdInfo.truncate_if_exists && exists(ctxt.filePath.toNativeString()))
				flag = true;
			if (flag) // touch
			{ File dummy = File(ctxt.filePath.toNativeString(), "w"); }
		}
		catch (Exception e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = "Could not create the file in destination: " ~ e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}

		try final switch (cmd)
		{
			case FileCmd.READ:
				File file = File(ctxt.filePath.toNativeString(), "rb");
				if (ctxt.offset != -1)
					file.seek(cast(long) ctxt.offset);
				ubyte[] res;
				synchronized (mutex) res = file.rawRead(cast(ubyte[]) ctxt.buffer);
				if (res)
					ctxt.offset = cast(ulong) (ctxt.offset + res.length);
				break;

			case FileCmd.WRITE:
				File file = File(ctxt.filePath.toNativeString(), "r+b");
				if (ctxt.offset != -1)
					file.seek(cast(long) ctxt.offset);
				synchronized (mutex) {
					file.rawWrite(cast(ubyte[]) ctxt.buffer);
				}
				file.flush();
				ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
				break;

			case FileCmd.APPEND:
				File file = File(ctxt.filePath.toNativeString(), "a+b");
				synchronized (mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
				ctxt.offset = cast(ulong) file.size();
				break;
		} catch (Throwable e) {
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}

		try {
			cmdInfo.ready.trigger(m_evLoop);

			synchronized (gs_wlock)
				gs_waiters.insertBack(m_waiter);
			gs_started.notifyAll(); // saves some waiting on a new thread
		}
		catch (Throwable e) {
			try writeln("ERROR"); catch (Throwable) {}
			auto status = StatusInfo.init;
			status.code = Status.ERROR;
			try status.text = e.toString(); catch (Throwable) {}
			ctxt.status = status;
		}
	}

	/// Thread entry point: builds this thread's event loop, registers the
	/// waiter so popWaiter() can find it, and runs the job loop until
	/// stop() is requested.
	void run()
	{
		try {
			m_evLoop = new EventLoop;
			synchronized (gs_wlock) {
				gs_waiters.insertBack(m_waiter);
			}

			gs_started.notifyAll();

			process();
		} catch (Throwable e) {
			try writeln("Error inserting in waiters " ~ e.toString()); catch (Throwable) {}
		}

		core.atomic.atomicOp!"-="(gs_threadCnt, cast(int) 1);
	}

	/// Requests this processor to exit. Called from a foreign thread.
	void stop()
	{
		try {
			// FIX: set the flag and notify while holding the waiter's mutex.
			// Previously m_stop was written and the condition notified without
			// the lock, so a worker busy with a job could miss the wakeup and
			// block in wait() forever, hanging the subsequent join().
			synchronized (m_waiter.mtx) {
				m_stop = true;
				m_waiter.cond.notifyAll();
			}
		}
		catch (Exception e) {
			try writeln("Exception occured notifying foreign thread: ", e); catch (Throwable) {}
		}
	}

	/// Job loop: sleep until signaled, pop one job from the global LIFO
	/// queue, dispatch it by type, repeat until stop() is requested.
	private void process() {
		while (!m_stop) {
			CommandInfo cmd;

			// The m_stop re-check happens under the same mutex stop() uses
			// to set it, so a stop notification can never be lost between
			// the check and the wait.
			try synchronized (m_waiter.mtx) {
				if (!m_stop)
					m_waiter.cond.wait();
			}
			catch (Throwable) {}

			if (m_stop) break;

			try synchronized (gs_wlock) {
				if (gs_jobs.empty) continue; // spurious or stale wakeup
				cmd = gs_jobs.back;
				gs_jobs.removeBack();
			} catch (Throwable) {}

			final switch (cmd.type) {
				case CmdInfoType.FILE:
					process(cast(shared AsyncFile) cmd.data);
					break;
				case CmdInfoType.DNS:
					process(cast(shared AsyncDNS) cmd.data);
					break;
			}

		}
	}

}

/// Takes an idle worker's Waiter from the pool, spawning a new daemon
/// CmdProcessor (up to 16) when none is available. Blocks until a waiter
/// is obtained.
Waiter popWaiter() {
	Waiter cmd_handler;
	bool start_thread;
	do {
		if (start_thread) {
			Thread thr = new CmdProcessor;
			thr.isDaemon = true;
			thr.name = "CmdProcessor";
			thr.start();
			core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
			gs_threads.add(thr);
		}

		bool contended; // pool empty and thread cap reached: back off
		synchronized (gs_wlock) {
			if (start_thread && !gs_started.wait(5.seconds))
				continue;

			try {
				if (!cmd_handler.mtx && !gs_waiters.empty) {
					cmd_handler = gs_waiters.back;
					gs_waiters.removeBack();
				}
				else if (core.atomic.atomicLoad(gs_threadCnt) < 16) {
					start_thread = true;
				}
				else {
					contended = true;
				}
			} catch (Exception e) {
				writeln("Exception in popWaiter: ", e);
			}
		}
		// FIX: back off OUTSIDE the lock. Sleeping while holding gs_wlock
		// blocked the very workers that need it to return their waiters.
		if (contended)
			Thread.sleep(50.usecs);
	} while (!cmd_handler.cond);
	return cmd_handler;
}

/// Module start-up: create the global lock/condition and pre-spawn four
/// daemon worker threads, waiting briefly for each to come online.
shared static this() {
	import std.stdio : writeln;
	gs_wlock = new Mutex;
	gs_threads = new ThreadGroup;
	gs_started = new Condition(gs_wlock);
	foreach (i; 0 .. 4) {
		Thread thr = new CmdProcessor;
		gs_threads.add(thr);
		thr.isDaemon = true;
		thr.name = "CmdProcessor";
		thr.start();
		core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1);
		synchronized (gs_wlock)
			gs_started.wait(1.seconds); // best-effort: timeout is tolerated
	}
}

/// Stops and joins every worker thread. Must be called before program
/// termination (see the module destructor's assertion). Idempotent.
void destroyAsyncThreads() {
	if (!gs_closing) gs_closing = true;
	else return;
	import core.memory : GC;
	GC.disable(); // no collections while threads are being torn down at exit
	// FIX: signal every worker first, then join WITHOUT holding gs_wlock.
	// A worker finishing a job must take gs_wlock to return its waiter, so
	// joining while holding the lock could deadlock.
	foreach (thr; gs_threads) {
		CmdProcessor thread = cast(CmdProcessor) thr;
		thread.stop();
	}
	foreach (thr; gs_threads)
		thr.join();
}

shared static ~this() {
	assert(core.atomic.atomicLoad(gs_threadCnt) == 0, "You must call libasync.threads.destroyAsyncThreads() upon termination of the program to avoid segfaulting");
}

/// Discriminates the payload type behind CommandInfo.data.
enum CmdInfoType {
	FILE,
	DNS
}

/// One queued job: a type tag plus an untyped pointer to the shared
/// AsyncFile / AsyncDNS context it refers to.
struct CommandInfo {
	CmdInfoType type;
	void* data;
}