1 module libasync.file; 2 import libasync.types; 3 import libasync.events; 4 import core.thread : Thread, ThreadGroup; 5 import core.sync.mutex; 6 import core.sync.condition; 7 import std.stdio; 8 import core.atomic; 9 import libasync.internals.path; 10 import libasync.threads; 11 12 /// Runs all blocking file I/O commands in a thread pool and calls the handler 13 /// upon completion. 14 shared final class AsyncFile 15 { 16 nothrow: 17 private: 18 EventLoop m_evLoop; 19 bool m_busy; 20 bool m_error; 21 FileReadyHandler m_handler; 22 FileCmdInfo m_cmdInfo; 23 StatusInfo m_status; 24 ulong m_cursorOffset; 25 Thread m_owner; 26 27 public: 28 this(EventLoop evl) { 29 m_evLoop = cast(shared) evl; 30 m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop); 31 m_cmdInfo.ready.run(cast(void delegate())&handler); 32 m_owner = cast(shared)Thread.getThis(); 33 try m_cmdInfo.mtx = cast(shared) new Mutex; catch {} 34 } 35 36 /// Cleans up the underlying resources. todo: make this dispose? 37 bool kill() { 38 m_cmdInfo.ready.kill(); 39 return true; 40 } 41 42 synchronized @property StatusInfo status() const 43 { 44 return cast(StatusInfo) m_status; 45 } 46 47 @property string error() const 48 { 49 return status.text; 50 } 51 52 /// Retrieve the buffer from the last command. Must be called upon completion. 53 shared(ubyte[]) buffer() { 54 try synchronized(m_cmdInfo.mtx) 55 return m_cmdInfo.buffer; 56 catch {} 57 return null; 58 } 59 60 /// The current offset updated after the command execution 61 synchronized @property ulong offset() const { 62 return m_cursorOffset; 63 } 64 65 /// Sets the handler called by the owner thread's event loop after the command is completed. 66 shared(typeof(this)) onReady(void delegate() del) { 67 shared FileReadyHandler handler; 68 handler.del = del; 69 handler.ctxt = this; 70 try synchronized(this) m_handler = handler; catch {} 71 return this; 72 } 73 74 /// Creates a new buffer with the specified length and uses it to read the 75 /// file data at the specified path starting at the specified offset byte. 76 bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) { 77 return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists); 78 } 79 80 /// Reads the file into the buffer starting at offset byte position. 81 bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 82 in { 83 assert(!m_busy, "File is busy or closed"); 84 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 85 } 86 body { 87 try synchronized(m_cmdInfo.mtx) { 88 m_cmdInfo.buffer = buffer; 89 m_cmdInfo.command = FileCmd.READ; 90 m_cmdInfo.create_if_not_exists = create_if_not_exists; 91 m_cmdInfo.truncate_if_exists = truncate_if_exists; 92 filePath = Path(file_path); 93 } catch {} 94 offset = off; 95 return sendCommand(); 96 } 97 98 /// Writes the data from the buffer into the file at the specified path starting at the 99 /// given offset byte position. 100 bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 101 in { 102 assert(!m_busy, "File is busy or closed"); 103 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 104 } 105 body { 106 try synchronized(m_cmdInfo.mtx) { 107 m_cmdInfo.buffer = cast(shared(ubyte[])) buffer; 108 m_cmdInfo.command = FileCmd.WRITE; 109 m_cmdInfo.create_if_not_exists = create_if_not_exists; 110 m_cmdInfo.truncate_if_exists = truncate_if_exists; 111 filePath = Path(file_path); 112 } catch {} 113 offset = off; 114 return sendCommand(); 115 116 } 117 118 /// Appends the data from the buffer into a file at the specified path. 119 bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false) 120 in { 121 assert(!m_busy, "File is busy or closed"); 122 assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on."); 123 } 124 body { 125 try synchronized(m_cmdInfo.mtx) { 126 m_cmdInfo.command = FileCmd.APPEND; 127 m_cmdInfo.buffer = buffer; 128 m_cmdInfo.create_if_not_exists = create_if_not_exists; 129 m_cmdInfo.truncate_if_exists = truncate_if_exists; 130 filePath = Path(file_path); 131 } catch {} 132 return sendCommand(); 133 } 134 135 private bool sendCommand() 136 in { assert(!waiting, "File is busy or closed"); } 137 body { 138 waiting = true; 139 m_error = false; 140 status = StatusInfo.init; 141 142 Waiter cmd_handler; 143 144 try { 145 cmd_handler = popWaiter(); 146 147 } catch (Throwable e) { 148 import std.stdio; 149 try { 150 status = StatusInfo(Status.ERROR, e.toString()); 151 m_error = true; 152 } catch {} 153 154 return false; 155 156 } 157 assert(cmd_handler.cond); 158 159 m_cmdInfo.waiter = cast(shared) cmd_handler; 160 try { 161 synchronized(gs_wlock) 162 gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this)); 163 cmd_handler.cond.notifyAll(); 164 } 165 catch (Exception e){ 166 import std.stdio; 167 try writeln("Exception occured notifying foreign thread: ", e); catch {} 168 } 169 return true; 170 } 171 package: 172 173 synchronized @property FileCmdInfo cmdInfo() { 174 return m_cmdInfo; 175 } 176 177 synchronized @property Path filePath() { 178 return cast(Path) m_cmdInfo.filePath; 179 } 180 181 synchronized @property bool waiting() const { 182 return cast(bool) m_busy; 183 } 184 185 synchronized @property void filePath(Path file_path) { 186 m_cmdInfo.filePath = cast(shared) file_path; 187 } 188 189 synchronized @property void status(StatusInfo stat) { 190 m_status = cast(shared) stat; 191 } 192 193 synchronized @property void offset(ulong val) { 194 m_cursorOffset = cast(shared) val; 195 } 196 197 synchronized @property void waiting(bool b) { 198 m_busy = cast(shared) b; 199 } 200 201 void handler() { 202 try m_handler(); 203 catch (Throwable e) { 204 import std.stdio : writeln; 205 try writeln("Failed to send command. ", e.toString()); catch {} 206 } 207 } 208 } 209 210 package enum FileCmd { 211 READ, 212 WRITE, 213 APPEND 214 } 215 216 package shared struct FileCmdInfo 217 { 218 FileCmd command; 219 Path filePath; 220 ubyte[] buffer; 221 Waiter waiter; 222 AsyncSignal ready; 223 AsyncFile file; 224 Mutex mtx; // for buffer writing 225 bool truncate_if_exists; 226 bool create_if_not_exists; 227 } 228 229 package shared struct FileReadyHandler { 230 AsyncFile ctxt; 231 void delegate() del; 232 233 void opCall() { 234 assert(ctxt !is null); 235 ctxt.waiting = false; 236 del(); 237 return; 238 } 239 }