1 module libasync.file;
2 import libasync.types;
3 import libasync.events;
4 import core.thread : Thread, ThreadGroup;
5 import core.sync.mutex;
6 import core.sync.condition;
7 import std.stdio;
8 import core.atomic;
9 import libasync.internals.path;
10 import libasync.threads;
11 
12 /// Runs all blocking file I/O commands in a thread pool and calls the handler
13 /// upon completion.
14 shared final class AsyncFile
15 {
16 nothrow:
17 private:
18 	EventLoop m_evLoop;
19 	bool m_busy;
20 	bool m_error;
21 	FileReadyHandler m_handler;
22 	FileCmdInfo m_cmdInfo;
23 	StatusInfo m_status;
24 	ulong m_cursorOffset;
25 	Thread m_owner;
26 
27 public:
28 	this(EventLoop evl) {
29 		m_evLoop = cast(shared) evl;
30 		m_cmdInfo.ready = new shared AsyncSignal(cast(EventLoop)m_evLoop);
31 		m_cmdInfo.ready.run(cast(void delegate())&handler);
32 		m_owner = cast(shared)Thread.getThis();
33 		try m_cmdInfo.mtx = cast(shared) new Mutex; catch {}
34 	}
35 
36 	/// Cleans up the underlying resources. todo: make this dispose?
37 	bool kill() {
38 		m_cmdInfo.ready.kill();
39 		return true;
40 	}
41 
42 	synchronized @property StatusInfo status() const
43 	{
44 		return cast(StatusInfo) m_status;
45 	}
46 
47 	@property string error() const
48 	{
49 		return status.text;
50 	}
51 
52 	/// Retrieve the buffer from the last command. Must be called upon completion.
53 	shared(ubyte[]) buffer() {
54 		try synchronized(m_cmdInfo.mtx)
55 			return m_cmdInfo.buffer;
56 		catch {}
57 		return null;
58 	}
59 
60 	/// The current offset updated after the command execution
61 	synchronized @property ulong offset() const {
62 		return m_cursorOffset;
63 	}
64 
65 	/// Sets the handler called by the owner thread's event loop after the command is completed.
66 	shared(typeof(this)) onReady(void delegate() del) {
67 		shared FileReadyHandler handler;
68 		handler.del = del;
69 		handler.ctxt = this;
70 		try synchronized(this) m_handler = handler; catch {}
71 		return this;
72 	}
73 
74 	/// Creates a new buffer with the specified length and uses it to read the
75 	/// file data at the specified path starting at the specified offset byte.
76 	bool read(string file_path, size_t len = 128, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) {
77 		return read(file_path, new shared ubyte[len], off, create_if_not_exists, truncate_if_exists);
78 	}
79 
80 	/// Reads the file into the buffer starting at offset byte position.
81 	bool read(string file_path, shared ubyte[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 
82 	in { 
83 		assert(!m_busy, "File is busy or closed");
84 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
85 	}
86 	body {
87 		try synchronized(m_cmdInfo.mtx) { 
88 			m_cmdInfo.buffer = buffer;
89 			m_cmdInfo.command = FileCmd.READ;
90 			m_cmdInfo.create_if_not_exists = create_if_not_exists;
91 			m_cmdInfo.truncate_if_exists = truncate_if_exists;
92 			filePath = Path(file_path);
93 		} catch {}
94 		offset = off;
95 		return sendCommand();
96 	}
97 
98 	/// Writes the data from the buffer into the file at the specified path starting at the
99 	/// given offset byte position.
100 	bool write(string file_path, shared const(ubyte)[] buffer, ulong off = -1, bool create_if_not_exists = true, bool truncate_if_exists = false) 
101 	in { 
102 		assert(!m_busy, "File is busy or closed"); 
103 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
104 	}
105 	body {
106 		try synchronized(m_cmdInfo.mtx) { 
107 			m_cmdInfo.buffer = cast(shared(ubyte[])) buffer;
108 			m_cmdInfo.command = FileCmd.WRITE;
109 			m_cmdInfo.create_if_not_exists = create_if_not_exists;
110 			m_cmdInfo.truncate_if_exists = truncate_if_exists;
111 			filePath = Path(file_path);
112 		} catch {}
113 		offset = off;
114 		return sendCommand();
115 
116 	}
117 
118 	/// Appends the data from the buffer into a file at the specified path.
119 	bool append(string file_path, shared ubyte[] buffer, bool create_if_not_exists = true, bool truncate_if_exists = false)
120 	in {
121 		assert(!m_busy, "File is busy or closed");
122 		assert(m_handler.ctxt !is null, "AsyncFile must be run before being operated on.");
123 	}
124 	body {
125 		try synchronized(m_cmdInfo.mtx) { 
126 			m_cmdInfo.command = FileCmd.APPEND;
127 			m_cmdInfo.buffer = buffer;
128 			m_cmdInfo.create_if_not_exists = create_if_not_exists;
129 			m_cmdInfo.truncate_if_exists = truncate_if_exists;
130 			filePath = Path(file_path);
131 		} catch {}
132 		return sendCommand();
133 	}
134 
135 	private bool sendCommand() 
136 	in { assert(!waiting, "File is busy or closed"); }
137 	body {
138 		waiting = true;
139 		m_error = false;
140 		status = StatusInfo.init;
141 		
142 		Waiter cmd_handler;
143 
144 		try {
145 			cmd_handler = popWaiter();
146 		
147 		} catch (Throwable e) {
148 			import std.stdio;
149 			try {
150 				status = StatusInfo(Status.ERROR, e.toString());
151 				m_error = true;
152 			} catch {}
153 			
154 			return false;
155 			
156 		}
157 		assert(cmd_handler.cond);
158 		
159 		m_cmdInfo.waiter = cast(shared) cmd_handler;
160 		try {
161 			synchronized(gs_wlock)
162 				gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this));
163 			cmd_handler.cond.notifyAll();
164 		}
165 		catch (Exception e){
166 			import std.stdio;
167 			try writeln("Exception occured notifying foreign thread: ", e); catch {}
168 		}
169 		return true;
170 	}
171 package:
172 
173 	synchronized @property FileCmdInfo cmdInfo() {
174 		return m_cmdInfo;
175 	}
176 
177 	synchronized @property Path filePath() {
178 		return cast(Path) m_cmdInfo.filePath;
179 	}
180 
181 	synchronized @property bool waiting() const {
182 		return cast(bool) m_busy;
183 	}
184 
185 	synchronized @property void filePath(Path file_path) {
186 		m_cmdInfo.filePath = cast(shared) file_path;
187 	}
188 	
189 	synchronized @property void status(StatusInfo stat) {
190 		m_status = cast(shared) stat;
191 	}
192 	
193 	synchronized @property void offset(ulong val) {
194 		m_cursorOffset = cast(shared) val;
195 	}
196 
197 	synchronized @property void waiting(bool b) {
198 		m_busy = cast(shared) b;
199 	}
200 
201 	void handler() {
202 		try m_handler();
203 		catch (Throwable e) {
204 			import std.stdio : writeln;
205 			try writeln("Failed to send command. ", e.toString()); catch {}
206 		}
207 	}
208 }
209 
210 package enum FileCmd {
211 	READ,
212 	WRITE,
213 	APPEND
214 }
215 
216 package shared struct FileCmdInfo
217 {
218 	FileCmd command;
219 	Path filePath;
220 	ubyte[] buffer;
221 	Waiter waiter;
222 	AsyncSignal ready;
223 	AsyncFile file;
224 	Mutex mtx; // for buffer writing
225 	bool truncate_if_exists;
226 	bool create_if_not_exists;
227 }
228 
229 package shared struct FileReadyHandler {
230 	AsyncFile ctxt;
231 	void delegate() del;
232 	
233 	void opCall() {
234 		assert(ctxt !is null);
235 		ctxt.waiting = false;
236 		del();
237 		return;
238 	}
239 }