1 module future;
2 
3 import core.thread;
4 import std.datetime;
5 import std.concurrency : spawn;
6 import std.experimental.logger;
7 import std.typetuple;
8 import std.functional : compose;
9 import std.typecons;
10 import std.traits;
11 import std.range;
12 import std.algorithm;
13 import std.format;
14 import std.conv;
15 import universal;
16 import universal.extras;
17 import universal.meta;
18 
/// Default wait strategy used by `await` between polls of a pending future:
/// the calling thread yields via `Thread.yield`.
alias defaultWait = Thread.yield;
20 
/*
  Executable walkthrough of the module's public surface:
  pending/ready/fulfill/result, then, when, race, async/await, sync and next.
*/
@("EXAMPLES") unittest
{
  /*
    create a future object to be assigned a value later
  */
  auto ex1 = pending!int;
  assert(ex1.isPending);
  ex1.fulfill(6);
  assert(ex1.isReady);
  assert(ex1.result == 6);
  /*
    create a trivial, already-fulfilled future object out of a value
    (equivalent to haskell's `pure` or `return`)
  */
  auto ex1a = ready!int(2);
  assert(ex1a.isReady);
  assert(ex1a.result == 2);

  /*
    then: map over futures

    takes a function from A to B, and lifts it into a function from Future!A to Future!B

    (equivalent to haskell's `fmap`)
  */
  auto ex2a = pending!int;
  auto ex2b = ex2a.then!((int x) => x * 2);
  assert(ex2a.isPending && ex2b.isPending);
  ex2a.fulfill(3);
  assert(ex2a.isReady && ex2b.isReady);
  assert(ex2b.result == 6);

  /*
    when: maps a tuple of futures to a future tuple
      completes when all complete

    (the logical AND of Futures)
  */
  auto ex3a = pending!int;
  auto ex3b = pending!int;
  auto ex3c = pending!int;
  auto ex3 = when(ex3a, ex3b, ex3c);
  assert(ex3.isPending);
  ex3a.fulfill = 1;
  ex3b.fulfill = 2;
  assert(ex3.isPending);
  ex3c.fulfill = 3;
  assert(ex3.isReady);
  assert(ex3.result == tuple(1,2,3));

  /*
    race: maps a tuple of futures to a future union
      inhabited by the first of the futures to complete

    (the logical OR of Futures)
  */
  auto ex4a = pending!int;
  auto ex4b = pending!(Tuple!(int, int));
  auto ex4 = race(ex4a, ex4b);
  assert(ex4.isPending);
  ex4b.fulfill(t_(1,2));
  assert(ex4.isReady);
  assert(ex4.result.visit!(
    (x) => x,
    (x,y) => x + y
  ) == 3);

  /*
    note: when and race can both accept named fields as template arguments
  */

  /*
    async/await: multithreaded function calls
  */
  auto ex5a = tuple(3, 2)[].async!((int x, int y) 
  { Thread.sleep(250.msecs); return x * y; }
  );
  // NOTE(review): this assertion relies on the worker's 250 ms sleep
  // outlasting the time it takes this thread to get here.
  assert(ex5a.isPending);
  ex5a.await;
  assert(ex5a.isReady);
  /*
    async will wrap the function's return value in a Result (see universal.extras.errors)
  */
  assert(ex5a.result.visit!(
    q{failure}, _ => 0,
    q{success}, x => x,
  ) == 6);
  /*
    which allows errors and exceptions to be easily recovered
    whereas normally, if a thread throws, it dies silently.
  */
  auto ex5b = async!((){ throw new Exception("!"); });
  auto ex5c = async!((){ assert(0, "!"); });
  assert(ex5b.await.result.failure.exception.msg == "!");
  assert(ex5c.await.result.failure.error.msg == "!");
  /*
    by the way, functions that return void can have their result visited with no arguments
  */
  assert(async!((){}).await.result.visit!(
    q{failure}, _ => false,
    q{success}, () => true,
  ));

  /*
    sync: flattens nested futures into one future
      the new future waits until both nested futures are complete
      then forwards the result from the inner future

    in other words, it is a function from Future!(Future!A) to Future!A

    (equivalent to haskell's `join`)
  */
  auto ex6a = pending!(shared(Future!(int)));
  auto ex6 = sync(ex6a);
  assert(ex6.isPending);
  ex6a.fulfill(pending!int);
  assert(ex6.isPending);
  ex6a.result.fulfill(6);
  assert(ex6.isReady);
  assert(ex6.result == 6);

  /*
    next: chains the fulfillment of one future into the launching of another
      enables comfortable future sequencing

    takes a function from A to Future!B and lifts it into a function from Future!A to Future!B
    it is equivalent to calling `then` followed by `sync`

    (equivalent to haskell's `bind` or `>>=`)
  */
  auto ex7a = pending!(int);
  auto ex7b = ex7a.next!(async!((int i) => i));
  auto ex7c = ex7a.then!(async!((int i) => i));
  assert(ex7b.isPending && ex7c.isPending);
  ex7a.fulfill(6);
  ex7b.await; ex7c.await;
  assert(ex7b.isReady && ex7c.isReady);
  assert(ex7a.result == 6);
  // ex7b was flattened by `next`; ex7c still holds the inner future produced by `async`
  assert(ex7b.result.success == 6);
  assert(ex7c.result.await.result.success == 6);
}
162 
163 static:
164 
/**
  A shared cell that starts out empty and is later given a value of type `A`.

  The state is private to this module; it is read and written through the
  module-level functions (`fulfill`, `onFulfill`, `result`, `isReady`, ...).
*/
template Future(A)
{
  final shared class Future
  {
    /// The type of the value this future eventually holds.
    alias Eventual = A;

    private
    {
      /// Callbacks registered via `onFulfill` while the future was still pending.
      void delegate(A)[] _onFulfill;

      /// The stored value; written by `fulfill`.
      A _result;

      /// Set to true by `fulfill` once `_result` has been written.
      bool _ready;
    }
  }
}
/**
  Compile-time predicate: true when `F` is `Future!A` for some type `A`.

  NOTE(review): the rest of this module passes futures around as
  `shared(Future!A)`; confirm this pattern matches the qualified type as intended.
*/
template isFuture(F)
{
  enum isFuture = is(F == Future!A, A);
}
178 
/**
  Creates a new, not-yet-fulfilled future for a value of type `A`.

  Returns: a freshly allocated `shared(Future!A)`.
*/
template pending(A)
{
  shared(Future!A) pending()
  {
    auto fresh = new shared(Future!A);

    return fresh;
  }
}
/**
  Wraps an existing value in a future that is fulfilled from the start.

  Params:
    a = the value the returned future holds

  Returns: a new `shared(Future!A)` that has already been fulfilled with `a`.
*/
template ready(A)
{
  shared(Future!A) ready(A a)
  {
    auto future = new shared(Future!A);

    return future.fulfill(a);
  }
}
/**
  Reads the value stored in a fulfilled future.

  Params:
    future = the future to read; the `in` contract asserts that it is ready

  Returns: a copy of the stored value, with the `shared` qualifier cast away.
*/
template result(A)
{
  A result(shared(Future!A) future)
  in
  {
    assert(future.isReady);
  }
  body
  {
    // The field is shared-qualified; view it as a plain A to copy it out.
    auto stored = cast(A*) &future._result;

    return *stored;
  }
}
/**
  Stores a value in a future and notifies every callback registered so far.

  Fulfilling a future that is already ready is a no-op: the first value wins.

  Params:
    future = the future to fulfill
    a      = the value to store

  Returns: `future`, to allow chaining.
*/
template fulfill(A)
{
  shared(Future!A) fulfill(shared(Future!A) future, A a)
  {
    // Only the ready-check and the write happen under the future's monitor.
    synchronized(future)
    {
      if(future.isReady)
        return future;

      *cast(A*)&future._result = a;
      future._ready = true;
    }

    // Callbacks run after the monitor has been released.
    foreach(notify; future._onFulfill)
      notify(future.result);

    future._onFulfill = [];
    return future;
  }
}
/**
  Registers a callback to be run with the future's value.

  If the future is still pending, the callback is queued and will be invoked
  by `fulfill`. If the future is already ready, the callback is invoked
  immediately on the calling thread.

  The ready-or-queue decision is made while holding the future's monitor, but
  the callback itself is always invoked after the monitor has been released,
  so a callback that blocks or touches other futures never does so while this
  future is locked.

  Params:
    future   = the future to observe
    callback = invoked with the future's value (or with no arguments, for the
               second overload) once the value is available
*/
template onFulfill(A)
{
  void onFulfill(shared(Future!A) future, void delegate(A) callback)
  {
    bool alreadyReady;

    synchronized(future)
    {
      alreadyReady = future.isReady;

      // Still pending: queue the callback; `fulfill` will invoke it later.
      if(!alreadyReady)
        future._onFulfill ~= callback;
    }

    // Already fulfilled: run the callback now, outside the monitor.
    if(alreadyReady)
      callback(future.result);
  }

  void onFulfill(shared(Future!A) future, void delegate() callback)
  {
    // Adapt the zero-argument callback by ignoring the fulfilled value.
    return future.onFulfill((A _){ callback(); });
  }
}
231 
/// Shorthand: `pending!()` with no type argument is `pending!Unit`.
alias pending()  = pending!Unit;
233 
/**
  Reports whether a future has been fulfilled.

  Returns: the current value of the future's ready flag.
*/
template isReady(A)
{
  bool isReady(shared(Future!A) future)
  {
    bool fulfilled = future._ready;

    return fulfilled;
  }
}
/**
  Reports whether a future is still waiting for its value.

  Returns: the negation of `isReady`.
*/
template isPending(A)
{
  bool isPending(shared(Future!A) future)
  {
    if(future.isReady)
      return false;

    return true;
  }
}
244 
/**
  Runs `f` on a newly spawned thread and returns a future for its outcome.

  `f` is wrapped with `tryCatch` before being called, so the future's eventual
  type is whatever `tryCatch!f` returns for the given argument types.

  Params:
    args = arguments forwarded to the wrapped function on the spawned thread

  Returns: a pending future that the spawned thread fulfills when `f` finishes.
*/
template async(alias f) // applied
{
  template async(A...)
  {
    alias guarded = tryCatch!f;
    alias Outcome = typeof(guarded(A.init));

    // Thread entry point: evaluate the wrapped function and publish its outcome.
    static void worker(shared(Future!Outcome) promise, A args)
    {
      promise.fulfill(guarded(args));
    }

    shared(Future!Outcome) async(A args)
    {
      auto promise = pending!Outcome;
      spawn(&worker, promise, args);
      return promise;
    }
  }
}
/**
  Blocks the calling thread until a future is fulfilled, polling it and
  calling `wait` between polls.

  Template params:
    wait = callable invoked between polls (defaults to `defaultWait`)

  Params:
    future  = the future to wait on
    timeout = (second overload) maximum time to keep waiting; when it elapses
              the future is returned even if it is still pending, so callers
              must check `isReady` before reading the result

  Returns: `future`, to allow chaining.
*/
template await(alias wait = defaultWait)
{
  template await(A)
  {
    shared(Future!A) await(shared(Future!A) future)
    {
      while(future.isPending)
        wait();

      return future;
    }

    shared(Future!A) await(shared(Future!A) future, Duration timeout)
    {
      import core.time : MonoTime;

      // Measure against the monotonic clock so that adjustments to the system
      // time cannot shorten or extend the wait; compute the deadline once.
      immutable deadline = MonoTime.currTime + timeout;

      while(future.isPending && MonoTime.currTime < deadline)
        wait();

      return future;
    }
  }
}
288 
/**
  Chains a future-returning function onto a future.

  Applies `f` via `then`, which produces a future of a future, and flattens
  the nesting with `sync`.

  Params:
    future = the future whose value is fed to `f`
    args   = extra arguments passed to `f` after the future's value

  Returns: a future for the value of the future that `f` returns.
*/
template next(alias f) // applied
{
  template next(A, B...)
  {
    alias C = typeof(apply!f(A.init, B.init).result);

    shared(Future!C) next(shared(Future!A) future, B args)
    {
      auto nested = future.then!f(args);

      return nested.sync;
    }
  }
}
/**
  Flattens a future of a future into a single future.

  Params:
    future = the outer future, whose value is itself a future of `A`

  Returns: a new future that is fulfilled with the inner future's value once
           both the outer and the inner future have been fulfilled.
*/
template sync(A)
{
  shared(Future!A) sync(shared(Future!(shared(Future!A))) future)
  {
    auto flattened = pending!A;

    // Second hop: pass the inner future's value on to the flattened future.
    void delegate(A) forward = (A value){ flattened.fulfill(value); };

    // First hop: once the outer future yields the inner one, observe that.
    future.onFulfill((shared(Future!A) inner){ inner.onFulfill(forward); });

    return flattened;
  }
}
/**
  Maps a function over a future.

  Params:
    future = the future whose value is fed to `f`
    args   = extra arguments passed to `f` after the future's value

  Returns: a new future that is fulfilled with `apply!f(value, args)` once
           `future` has been fulfilled with `value`.
*/
template then(alias f) // applied
{
  template then(A, B...)
  {
    alias C = typeof(apply!f(A.init, B.init));

    shared(Future!C) then(shared(Future!A) future, B args)
    {
      auto mapped = pending!C;

      void delegate(A) relay = (A value)
      { mapped.fulfill(apply!f(value, args)); };

      future.onFulfill(relay);

      return mapped;
    }
  }
}
330 
/**
  Combines a tuple of futures into one future of a tuple of their values.

  Params:
    futures = a `Tuple` whose fields are all futures

  Returns: a future that is fulfilled, with the tuple of all results, by
           whichever callback first observes that every input is ready.
*/
template when(A) if(isTuple!A && allSatisfy!(isFuture, A.Types))
{
  alias EventualType(F) = F.Eventual;
  alias When = Tuple!(staticMap!(EventualType, A.Types));

  shared(Future!When) when(A futures)
  {
    auto combined = pending!When;

    // Every input gets the same check; the value handed to the callback is
    // ignored because the results are re-read from the futures themselves.
    foreach(i, future; futures)
      future.onFulfill((When.Types[i])
      {
        if(!all(futures.tlift!isReady[].only))
          return;

        combined.fulfill(
          futures.tlift!result
        );
      }
      );

    return combined;
  }
}
/**
  Variadic convenience form of `when`: packs the futures into a tuple and
  forwards to the tuple-taking overload.
*/
template when(Futures...) if(allSatisfy!(isFuture, Futures))
{
  auto when(Futures futures)
  {
    auto packed = tuple(futures);

    return .when(packed);
  }
}
360 
/**
  Combines a tuple of futures into one future of a union of their value types.

  Params:
    futures = a `Tuple` whose fields are all futures

  Returns: a future fulfilled by the first input to complete, holding that
           input's value injected at the input's position in the union.
           Later completions have no effect, because `fulfill` ignores a
           future that is already ready.
*/
template race(A) if(isTuple!A && allSatisfy!(isFuture, A.Types))
{
  alias EventualType(F) = F.Eventual;
  alias Race = Union!(staticMap!(EventualType, A.Types));

  shared(Future!Race) race(A futures)
  {
    auto winner = pending!Race;

    foreach(i, future; futures)
      future.onFulfill((Race.Union.Args!i value)
      {
        winner.fulfill(Race().inject!i(value));
      }
      );

    return winner;
  }
}
/**
  Variadic convenience form of `race`: packs the futures into a tuple and
  forwards to the tuple-taking overload.
*/
template race(Futures...) if(allSatisfy!(isFuture, Futures))
{
  auto race(Futures futures)
  {
    auto packed = tuple(futures);

    return .race(packed);
  }
}