1 module libasync.file; 2 import libasync.types; 3 import libasync.events; 4 import core.thread : Thread, ThreadGroup; 5 import core.sync.mutex; 6 import core.sync.condition; 7 import std.stdio; 8 import core.atomic; 9 import libasync.internals.path; 10 import libasync.threads; 11 import std.file; 12 import libasync.internals.memory; 13 14 /// Runs all blocking file I/O commands in a thread pool and calls the handler 15 /// upon completion. 16 shared final class AsyncFile 17 { 18 nothrow: 19 private: 20 EventLoop m_evLoop; 21 bool m_busy; 22 bool m_error; 23 FileReadyHandler m_handler; 24 FileCmdInfo m_cmdInfo; 25 StatusInfo m_status; 26 ulong m_cursorOffset; 27 Thread m_owner; 28 File* m_file; 29 30 public: 31 this(EventLoop evl) { 32 m_evLoop = cast(shared) evl; 33 m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop); 34 m_cmdInfo.ready.run(cast(void delegate())&handler); 35 m_owner = cast(shared)Thread.getThis(); 36 m_file = cast(shared)new File; 37 try m_cmdInfo.mtx = cast(shared) new Mutex; catch {} 38 } 39 40 /// Cleans up the underlying resources. todo: make this dispose? 41 bool kill() { 42 scope(failure) assert(false); 43 if (file.isOpen) 44 (cast()*m_file).close(); 45 (cast()*m_file).__dtor(); 46 m_cmdInfo.ready.kill(); 47 m_cmdInfo = typeof(m_cmdInfo).init; 48 m_handler = typeof(m_handler).init; 49 return true; 50 } 51 52 synchronized @property StatusInfo status() const 53 { 54 return cast(StatusInfo) m_status; 55 } 56 57 @property string error() const 58 { 59 return status.text; 60 } 61 62 /// Retrieve the buffer from the last command. Must be called upon completion. 63 shared(ubyte[]) buffer() { 64 try synchronized(m_cmdInfo.mtx) 65 return m_cmdInfo.buffer; 66 catch {} 67 return null; 68 } 69 70 /// The current offset updated after the command execution 71 synchronized @property ulong offset() const { 72 return m_cursorOffset; 73 } 74 75 /// Sets the handler called by the owner thread's event loop after the command is completed. 76 shared(typeof(this)) onReady(void delegate() del) { 77 shared FileReadyHandler handler; 78 handler.del = del; 79 handler.ctxt = this; 80 try synchronized(this) m_handler = handler; catch {} 81 return this; 82 } 83 84 /// Creates a new buffer with the specified length and uses it to read the 85 /// file data at the specified path starting at the specified offset byte. 86 bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) { 87 return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists); 88 } 89 90 /// Reads the file into the buffer starting at offset byte position. 91 bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 92 in { 93 assert(!m_busy, "File is busy or closed"); 94 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 95 } 96 body { 97 if (buffer.length == 0) { 98 try m_handler(); catch { } 99 return true; 100 } 101 try { 102 static string last_path; 103 static string last_native_path; 104 if (last_path == file_path) 105 file_path = last_native_path; 106 else { 107 last_path = file_path; 108 file_path = Path(file_path).toNativeString(); 109 last_native_path = file_path; 110 } 111 112 bool flag; 113 if (create_if_not_exists && !m_file && !exists(file_path)) 114 flag = true; 115 else if (truncate_if_exists && (m_file || exists(file_path))) 116 flag = true; 117 if (flag) // touch 118 { 119 if (file.isOpen) 120 file.close(); 121 import std.c.stdio; 122 import std.string : toStringz; 123 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 124 fclose(f); 125 } 126 127 if (!file.isOpen || m_cmdInfo.command != FileCmd.READ) { 128 auto tmp = File(file_path, "rb"); 129 file = tmp; 130 m_cmdInfo.command = FileCmd.READ; 131 } 132 if (buffer.length <= 65_536) { 133 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 134 135 if (off != -1) 136 file.seek(cast(long)off); 137 ubyte[] res; 138 res = file.rawRead(cast(ubyte[])buffer); 139 if (res) 140 m_cursorOffset = cast(shared(ulong)) (off + res.length); 141 m_handler(); 142 return true; 143 } 144 } catch (Exception e) { 145 auto status = StatusInfo.init; 146 status.code = Status.ERROR; 147 try status.text = "Error in read, " ~ e.toString(); catch {} 148 m_status = cast(shared) status; 149 try m_handler(); catch { } 150 return false; 151 } 152 try synchronized(m_cmdInfo.mtx) { 153 m_cmdInfo.buffer = buffer; 154 m_cmdInfo.command = FileCmd.READ; 155 filePath = Path(file_path); 156 } catch {} 157 offset = off; 158 return sendCommand(); 159 } 160 161 /// Writes the data from the buffer into the file at the specified path starting at the 162 /// given offset byte position. 163 bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 164 in { 165 assert(!m_busy, "File is busy or closed"); 166 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 167 } 168 body { 169 if (buffer.length == 0) { 170 try m_handler(); catch { return false; } 171 return true; 172 } 173 try { 174 175 static string last_path; 176 static string last_native_path; 177 if (last_path == file_path) 178 file_path = last_native_path; 179 else { 180 last_path = file_path; 181 file_path = Path(file_path).toNativeString(); 182 last_native_path = file_path; 183 } 184 185 bool flag; 186 if (create_if_not_exists && !m_file && !exists(file_path)) 187 flag = true; 188 else if (truncate_if_exists && (m_file || exists(file_path))) 189 flag = true; 190 if (flag) // touch 191 { 192 if (file.isOpen) 193 file.close(); 194 import std.c.stdio; 195 import std.string : toStringz; 196 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 197 fclose(f); 198 } 199 200 if (!file.isOpen || m_cmdInfo.command != FileCmd.WRITE) { 201 auto tmp = File(file_path, "r+b"); 202 file = tmp; 203 m_cmdInfo.command = FileCmd.WRITE; 204 } 205 206 if (buffer.length <= 65_536) { 207 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 208 if (off != -1) 209 file.seek(cast(long)off); 210 file.rawWrite(cast(ubyte[])buffer); 211 file.flush(); 212 m_cursorOffset = cast(shared(ulong)) (off + buffer.length); 213 m_handler(); 214 return true; 215 } 216 } catch (Exception e) { 217 auto status = StatusInfo.init; 218 status.code = Status.ERROR; 219 try status.text = "Error in write, " ~ e.toString(); catch {} 220 m_status = cast(shared) status; 221 return false; 222 } 223 try synchronized(m_cmdInfo.mtx) { 224 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 225 m_cmdInfo.command = FileCmd.WRITE; 226 filePath = Path(file_path); 227 } catch {} 228 offset = off; 229 return sendCommand(); 230 231 } 232 233 /// Appends the data from the buffer into a file at the specified path. 234 bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false) 235 in { 236 assert(!m_busy, "File is busy or closed"); 237 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 238 } 239 body { 240 if (buffer.length == 0) { 241 try m_handler(); catch { return false; } 242 return true; 243 } 244 try { 245 static string last_path; 246 static string last_native_path; 247 if (last_path == file_path) 248 file_path = last_native_path; 249 else { 250 last_path = file_path; 251 file_path = Path(file_path).toNativeString(); 252 last_native_path = file_path; 253 } 254 255 bool flag; 256 if (create_if_not_exists && !m_file && !exists(file_path)) 257 flag = true; 258 else if (truncate_if_exists && (m_file || exists(file_path))) 259 flag = true; 260 if (flag) // touch 261 { 262 if (file.isOpen) 263 file.close(); 264 import std.c.stdio; 265 import std.string : toStringz; 266 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 267 fclose(f); 268 } 269 270 if (!file.isOpen || m_cmdInfo.command != FileCmd.APPEND) { 271 auto tmp = File(file_path, "a+"); 272 file = tmp; 273 m_cmdInfo.command = FileCmd.APPEND; 274 } 275 if (buffer.length < 65_536) { 276 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 277 file.rawWrite(cast(ubyte[]) buffer); 278 m_cursorOffset = cast(shared(ulong)) file.size(); 279 file.flush(); 280 m_handler(); 281 return true; 282 } 283 } catch (Exception e) { 284 auto status = StatusInfo.init; 285 status.code = Status.ERROR; 286 try status.text = "Error in append, " ~ e.toString(); catch {} 287 m_status = cast(shared) status; 288 return false; 289 } 290 try synchronized(m_cmdInfo.mtx) { 291 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 292 m_cmdInfo.command = FileCmd.APPEND; 293 filePath = Path(file_path); 294 } catch {} 295 return sendCommand(); 296 } 297 298 private bool sendCommand() 299 in { assert(!waiting, "File is busy or closed"); } 300 body { 301 waiting = true; 302 m_error = false; 303 status = StatusInfo.init; 304 305 Waiter cmd_handler; 306 307 try { 308 cmd_handler = popWaiter(); 309 310 } catch (Throwable e) { 311 import std.stdio; 312 try { 313 status = StatusInfo(Status.ERROR, e.toString()); 314 m_error = true; 315 } catch {} 316 317 return false; 318 } 319 320 assert(cmd_handler.cond, "Could not lock a thread for async operations. Note: Async file I/O in static constructors is unsupported."); 321 322 m_cmdInfo.waiter = cast(shared) cmd_handler; 323 try { 324 synchronized(gs_wlock) 325 gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this)); 326 cmd_handler.cond.notifyAll(); 327 } 328 catch (Exception e){ 329 static if (DEBUG) { 330 import std.stdio; 331 try writeln("Exception occured notifying foreign thread: ", e); catch {} 332 } 333 } 334 return true; 335 } 336 package: 337 338 synchronized @property FileCmdInfo cmdInfo() { 339 return m_cmdInfo; 340 } 341 342 synchronized @property Path filePath() { 343 return cast(Path) m_cmdInfo.filePath; 344 } 345 346 synchronized @property bool waiting() const { 347 return cast(bool) m_busy; 348 } 349 350 synchronized @property void filePath(Path file_path) { 351 m_cmdInfo.filePath = cast(shared) file_path; 352 } 353 354 synchronized @property File file() { 355 scope(failure) assert(false); 356 return cast()*m_file; 357 } 358 359 synchronized @property void file(ref File f) { 360 try (cast()*m_file).opAssign(f); 361 catch (Exception e) { 362 static if (DEBUG) { 363 import std.stdio : writeln; 364 try writeln(e.msg); catch {} 365 } 366 } 367 } 368 369 synchronized @property void status(StatusInfo stat) { 370 m_status = cast(shared) stat; 371 } 372 373 synchronized @property void offset(ulong val) { 374 m_cursorOffset = cast(shared) val; 375 } 376 377 synchronized @property void waiting(bool b) { 378 m_busy = cast(shared) b; 379 } 380 381 void handler() { 382 try m_handler(); 383 catch (Throwable e) { 384 static if (DEBUG) { 385 import std.stdio : writeln; 386 try writeln("Failed to send command. ", e.toString()); catch {} 387 } 388 } 389 } 390 } 391 392 package enum FileCmd { 393 READ, 394 WRITE, 395 APPEND 396 } 397 398 package shared struct FileCmdInfo 399 { 400 FileCmd command; 401 Path filePath; 402 ubyte[] buffer; 403 Waiter waiter; 404 AsyncSignal ready; 405 AsyncFile file; 406 Mutex mtx; // for buffer writing 407 } 408 409 package shared struct FileReadyHandler { 410 AsyncFile ctxt; 411 void delegate() del; 412 413 void opCall() { 414 assert(ctxt !is null); 415 ctxt.waiting = false; 416 del(); 417 return; 418 } 419 }