1 /// 2 module libasync.file; 3 import libasync.types; 4 import libasync.events; 5 import core.thread : Thread, ThreadGroup; 6 import core.sync.mutex; 7 import core.sync.condition; 8 import std.stdio; 9 import core.atomic; 10 import libasync.internals.path; 11 import libasync.threads; 12 import std.file; 13 import std.conv : to; 14 import libasync.internals.memory; 15 import libasync.internals.logging; 16 17 /// Runs all blocking file I/O commands in a thread pool and calls the handler 18 /// upon completion. 19 shared final class AsyncFile 20 { 21 nothrow: 22 private: 23 EventLoop m_evLoop; 24 bool m_busy; 25 bool m_error; 26 FileReadyHandler m_handler; 27 FileCmdInfo m_cmdInfo; 28 StatusInfo m_status; 29 ulong m_cursorOffset; 30 Thread m_owner; 31 File* m_file; 32 33 public: 34 /// 35 this(EventLoop evl) { 36 m_evLoop = cast(shared) evl; 37 m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop); 38 m_cmdInfo.ready.run(cast(void delegate())&handler); 39 m_owner = cast(shared)Thread.getThis(); 40 m_file = cast(shared)new File; 41 try m_cmdInfo.mtx = cast(shared) new Mutex; catch (Exception) {} 42 } 43 44 /// Cleans up the underlying resources. todo: make this dispose? 45 bool kill() { 46 scope(failure) assert(false); 47 if (file.isOpen) 48 (cast()*m_file).close(); 49 (cast()*m_file).__dtor(); 50 m_cmdInfo.ready.kill(); 51 m_cmdInfo = typeof(m_cmdInfo).init; 52 m_handler = typeof(m_handler).init; 53 return true; 54 } 55 56 /// 57 synchronized @property StatusInfo status() const 58 { 59 return cast(StatusInfo) m_status; 60 } 61 62 /// 63 @property string error() const 64 { 65 return status.text; 66 } 67 68 /// Retrieve the buffer from the last command. Must be called upon completion. 69 shared(ubyte[]) buffer() { 70 try synchronized(m_cmdInfo.mtx) 71 return m_cmdInfo.buffer; 72 catch (Exception) {} 73 return null; 74 } 75 76 /// The current offset updated after the command execution 77 synchronized @property ulong offset() const { 78 return m_cursorOffset; 79 } 80 81 /// Sets the handler called by the owner thread's event loop after the command is completed. 82 shared(typeof(this)) onReady(void delegate() del) { 83 shared FileReadyHandler handler; 84 handler.del = del; 85 handler.ctxt = this; 86 try synchronized(this) m_handler = handler; catch (Exception) {} 87 return this; 88 } 89 90 /// Creates a new buffer with the specified length and uses it to read the 91 /// file data at the specified path starting at the specified offset byte. 92 bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) { 93 return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists); 94 } 95 96 /// Reads the file into the buffer starting at offset byte position. 97 bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 98 in { 99 assert(!m_busy, "File is busy or closed"); 100 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 101 } 102 body { 103 if (buffer.length == 0) { 104 try m_handler(); catch (Exception) { } 105 return true; 106 } 107 try { 108 static string last_path; 109 static string last_native_path; 110 if (last_path == file_path) 111 file_path = last_native_path; 112 else { 113 last_path = file_path; 114 file_path = Path(file_path).toNativeString(); 115 last_native_path = file_path; 116 } 117 118 bool flag; 119 if (create_if_not_exists && !m_file && !exists(file_path)) 120 flag = true; 121 else if (truncate_if_exists && (m_file || exists(file_path))) 122 flag = true; 123 if (flag) // touch 124 { 125 if (file.isOpen) 126 file.close(); 127 import core.stdc.stdio; 128 import std..string : toStringz; 129 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 130 fclose(f); 131 } 132 133 if (!file.isOpen || m_cmdInfo.command != FileCmd.READ) { 134 auto tmp = File(file_path, "rb"); 135 file = tmp; 136 m_cmdInfo.command = FileCmd.READ; 137 } 138 if (buffer.length <= 65_536) { 139 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 140 141 if (off != -1) 142 file.seek(cast(long)off); 143 ubyte[] res; 144 res = file.rawRead(cast(ubyte[])buffer); 145 if (res) 146 m_cursorOffset = cast(shared(ulong)) (off + res.length); 147 m_handler(); 148 return true; 149 } 150 } catch (Exception e) { 151 auto status = StatusInfo.init; 152 status.code = Status.ERROR; 153 try status.text = "Error in read, " ~ e.toString(); catch (Exception) {} 154 m_status = cast(shared) status; 155 try m_handler(); catch (Exception) { } 156 return false; 157 } 158 try synchronized(m_cmdInfo.mtx) { 159 m_cmdInfo.buffer = buffer; 160 m_cmdInfo.command = FileCmd.READ; 161 filePath = Path(file_path); 162 } catch (Exception) {} 163 offset = off; 164 165 return doOffThread({ process(this); }); 166 } 167 168 /// Writes the data from the buffer into the file at the specified path starting at the 169 /// given offset byte position. 170 bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 171 in { 172 assert(!m_busy, "File is busy or closed"); 173 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 174 } 175 body { 176 if (buffer.length == 0) { 177 try m_handler(); catch (Exception) { return false; } 178 return true; 179 } 180 try { 181 182 static string last_path; 183 static string last_native_path; 184 if (last_path == file_path) 185 file_path = last_native_path; 186 else { 187 last_path = file_path; 188 file_path = Path(file_path).toNativeString(); 189 last_native_path = file_path; 190 } 191 192 bool flag; 193 if (create_if_not_exists && !m_file && !exists(file_path)) 194 flag = true; 195 else if (truncate_if_exists && (m_file || exists(file_path))) 196 flag = true; 197 if (flag) // touch 198 { 199 if (file.isOpen) 200 file.close(); 201 import core.stdc.stdio; 202 import std..string : toStringz; 203 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 204 fclose(f); 205 } 206 207 if (!file.isOpen || m_cmdInfo.command != FileCmd.WRITE) { 208 auto tmp = File(file_path, "r+b"); 209 file = tmp; 210 m_cmdInfo.command = FileCmd.WRITE; 211 } 212 213 if (buffer.length <= 65_536) { 214 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 215 if (off != -1) 216 file.seek(cast(long)off); 217 file.rawWrite(cast(ubyte[])buffer); 218 file.flush(); 219 m_cursorOffset = cast(shared(ulong)) (off + buffer.length); 220 m_handler(); 221 return true; 222 } 223 } catch (Exception e) { 224 auto status = StatusInfo.init; 225 status.code = Status.ERROR; 226 try status.text = "Error in write, " ~ e.toString(); catch (Exception) {} 227 m_status = cast(shared) status; 228 return false; 229 } 230 try synchronized(m_cmdInfo.mtx) { 231 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 232 m_cmdInfo.command = FileCmd.WRITE; 233 filePath = Path(file_path); 234 } catch (Exception) {} 235 offset = off; 236 237 return doOffThread({ process(this); }); 238 } 239 240 /// Appends the data from the buffer into a file at the specified path. 241 bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false) 242 in { 243 assert(!m_busy, "File is busy or closed"); 244 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 245 } 246 body { 247 if (buffer.length == 0) { 248 try m_handler(); catch (Exception) { return false; } 249 return true; 250 } 251 try { 252 static string last_path; 253 static string last_native_path; 254 if (last_path == file_path) 255 file_path = last_native_path; 256 else { 257 last_path = file_path; 258 file_path = Path(file_path).toNativeString(); 259 last_native_path = file_path; 260 } 261 262 bool flag; 263 if (create_if_not_exists && !m_file && !exists(file_path)) 264 flag = true; 265 else if (truncate_if_exists && (m_file || exists(file_path))) 266 flag = true; 267 if (flag) // touch 268 { 269 if (file.isOpen) 270 file.close(); 271 import core.stdc.stdio; 272 import std..string : toStringz; 273 FILE * f = fopen(file_path.toStringz, "w\0".ptr); 274 fclose(f); 275 } 276 277 if (!file.isOpen || m_cmdInfo.command != FileCmd.APPEND) { 278 auto tmp = File(file_path, "a+"); 279 file = tmp; 280 m_cmdInfo.command = FileCmd.APPEND; 281 } 282 if (buffer.length < 65_536) { 283 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 284 file.rawWrite(cast(ubyte[]) buffer); 285 m_cursorOffset = cast(shared(ulong)) file.size(); 286 file.flush(); 287 m_handler(); 288 return true; 289 } 290 } catch (Exception e) { 291 auto status = StatusInfo.init; 292 status.code = Status.ERROR; 293 try status.text = "Error in append, " ~ e.toString(); catch (Exception) {} 294 m_status = cast(shared) status; 295 return false; 296 } 297 try synchronized(m_cmdInfo.mtx) { 298 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 299 m_cmdInfo.command = FileCmd.APPEND; 300 filePath = Path(file_path); 301 } catch (Exception) {} 302 303 return doOffThread({ process(this); }); 304 } 305 306 package: 307 308 synchronized @property FileCmdInfo cmdInfo() { 309 return m_cmdInfo; 310 } 311 312 synchronized @property Path filePath() { 313 return cast(Path) m_cmdInfo.filePath; 314 } 315 316 synchronized @property bool waiting() const { 317 return cast(bool) m_busy; 318 } 319 320 synchronized @property void filePath(Path file_path) { 321 m_cmdInfo.filePath = cast(shared) file_path; 322 } 323 324 synchronized @property File file() { 325 scope(failure) assert(false); 326 return cast()*m_file; 327 } 328 329 synchronized @property void file(ref File f) { 330 try (cast()*m_file).opAssign(f); 331 catch (Exception e) { 332 trace(e.msg); 333 } 334 } 335 336 synchronized @property void status(StatusInfo stat) { 337 m_status = cast(shared) stat; 338 } 339 340 synchronized @property void offset(ulong val) { 341 m_cursorOffset = cast(shared) val; 342 } 343 344 synchronized @property void waiting(bool b) { 345 m_busy = cast(shared) b; 346 } 347 348 void handler() { 349 try m_handler(); 350 catch (Throwable e) { 351 trace("Failed to send command. ", e.toString()); 352 } 353 } 354 } 355 356 package enum FileCmd { 357 READ, 358 WRITE, 359 APPEND 360 } 361 362 package shared struct FileCmdInfo 363 { 364 FileCmd command; 365 Path filePath; 366 ubyte[] buffer; 367 AsyncSignal ready; 368 AsyncFile file; 369 Mutex mtx; // for buffer writing 370 } 371 372 package shared struct FileReadyHandler { 373 AsyncFile ctxt; 374 void delegate() del; 375 376 void opCall() { 377 assert(ctxt !is null); 378 ctxt.waiting = false; 379 del(); 380 return; 381 } 382 } 383 384 private void process(shared AsyncFile ctxt) { 385 auto cmdInfo = ctxt.cmdInfo; 386 auto mutex = cmdInfo.mtx; 387 FileCmd cmd; 388 cmd = cmdInfo.command; 389 390 try final switch (cmd) 391 { 392 case FileCmd.READ: 393 File file = ctxt.file; 394 if (ctxt.offset != -1) 395 file.seek(cast(long) ctxt.offset); 396 ubyte[] res; 397 synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer); 398 if (res) 399 ctxt.offset = cast(ulong) (ctxt.offset + res.length); 400 401 break; 402 403 case FileCmd.WRITE: 404 405 File file = cast(File) ctxt.file; 406 if (ctxt.offset != -1) 407 file.seek(cast(long) ctxt.offset); 408 synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer); 409 file.flush(); 410 ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length); 411 break; 412 413 case FileCmd.APPEND: 414 File file = cast(File) ctxt.file; 415 synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer); 416 ctxt.offset = cast(ulong) file.size(); 417 file.flush(); 418 break; 419 } catch (Throwable e) { 420 auto status = StatusInfo.init; 421 status.code = Status.ERROR; 422 try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch (Exception) {} 423 ctxt.status = status; 424 } 425 426 try cmdInfo.ready.trigger(getThreadEventLoop()); 427 catch (Throwable e) { 428 trace("AsyncFile Thread Error: ", e.toString()); 429 auto status = StatusInfo.init; 430 status.code = Status.ERROR; 431 try status.text = e.toString(); catch (Exception) {} 432 ctxt.status = status; 433 } 434 }