1 module future; 2 3 import core.thread; 4 import std.datetime; 5 import std.concurrency : spawn; 6 import std.experimental.logger; 7 import std.typetuple; 8 import std.functional : compose; 9 import std.typecons; 10 import std.traits; 11 import std.range; 12 import std.algorithm; 13 import std.format; 14 import std.conv; 15 import universal; 16 import universal.extras; 17 import universal.meta; 18 19 alias defaultWait = Thread.yield; 20 21 @("EXAMPLES") unittest 22 { 23 /* 24 create a future object to be assigned a value later 25 */ 26 auto ex1 = pending!int; 27 assert(ex1.isPending); 28 ex1.fulfill(6); 29 assert(ex1.isReady); 30 assert(ex1.result == 6); 31 /* 32 create a trivial future object out of a value 33 (equivalent to haskell's `pure` or `return`) 34 */ 35 auto ex1a = ready!int(2); 36 assert(ex1a.isReady); 37 assert(ex1a.result == 2); 38 39 /* 40 then: map over futures 41 42 takes a function from A to B, and lifts it into a function from Future!A to Future!B 43 44 (equivalent to to haskell's `fmap`) 45 */ 46 auto ex2a = pending!int; 47 auto ex2b = ex2a.then!((int x) => x * 2); 48 assert(ex2a.isPending && ex2b.isPending); 49 ex2a.fulfill(3); 50 assert(ex2a.isReady && ex2b.isReady); 51 assert(ex2b.result == 6); 52 53 /* 54 when: maps a tuple of futures to a future tuple 55 completes when all complete 56 57 (the logical AND of Futures) 58 */ 59 auto ex3a = pending!int; 60 auto ex3b = pending!int; 61 auto ex3c = pending!int; 62 auto ex3 = when(ex3a, ex3b, ex3c); 63 assert(ex3.isPending); 64 ex3a.fulfill = 1; 65 ex3b.fulfill = 2; 66 assert(ex3.isPending); 67 ex3c.fulfill = 3; 68 assert(ex3.isReady); 69 assert(ex3.result == tuple(1,2,3)); 70 71 /* 72 race: maps a tuple of futures to a future union 73 inhabited by the first of the futures to complete 74 75 (the logical OR of Futures) 76 */ 77 auto ex4a = pending!int; 78 auto ex4b = pending!(Tuple!(int, int)); 79 auto ex4 = race(ex4a, ex4b); 80 assert(ex4.isPending); 81 ex4b.fulfill(t_(1,2)); 82 assert(ex4.isReady); 83 assert(ex4.result.visit!( 84 (x) => x, 85 (x,y) => x + y 86 ) == 3); 87 88 /* 89 note: when and race can both accept named fields as template arguments 90 */ 91 92 /* 93 async/await: multithreaded function calls 94 */ 95 auto ex5a = tuple(3, 2)[].async!((int x, int y) 96 { Thread.sleep(250.msecs); return x * y; } 97 ); 98 assert(ex5a.isPending); 99 ex5a.await; 100 assert(ex5a.isReady); 101 /* 102 async will wrap the function's return value in a Result (see universal.extras.errors) 103 */ 104 assert(ex5a.result.visit!( 105 q{failure}, _ => 0, 106 q{success}, x => x, 107 ) == 6); 108 /* 109 which allows errors and exceptions to be easily recovered 110 whereas normally, if a thread throws, it dies silently. 111 */ 112 auto ex5b = async!((){ throw new Exception("!"); }); 113 auto ex5c = async!((){ assert(0, "!"); }); 114 assert(ex5b.await.result.failure.exception.msg == "!"); 115 assert(ex5c.await.result.failure.error.msg == "!"); 116 /* 117 by the way, functions that return void can have their result visited with no arguments 118 */ 119 assert(async!((){}).await.result.visit!( 120 q{failure}, _ => false, 121 q{success}, () => true, 122 )); 123 124 /* 125 sync: flattens nested futures into one future 126 the new future waits until both nested futures are complete 127 then forwards the result from the inner future 128 129 in other words, it is a function from Future!(Future!A) to Future!A 130 131 (equivalent to haskell's `join`) 132 */ 133 auto ex6a = pending!(shared(Future!(int))); 134 auto ex6 = sync(ex6a); 135 assert(ex6.isPending); 136 ex6a.fulfill(pending!int); 137 assert(ex6.isPending); 138 ex6a.result.fulfill(6); 139 assert(ex6.isReady); 140 assert(ex6.result == 6); 141 142 /* 143 next: chains the fulfillment of one future into the launching of another 144 enables comfortable future sequencing 145 146 takes a function from A to Future!B and lifts it into a function from Future!A to Future!B 147 it is equivalent to calling `then` followed by `sync` 148 149 (equivalent to haskell's `bind` or `>>=`) 150 */ 151 auto ex7a = pending!(int); 152 auto ex7b = ex7a.next!(async!((int i) => i)); 153 auto ex7c = ex7a.then!(async!((int i) => i)); 154 assert(ex7b.isPending && ex7c.isPending); 155 ex7a.fulfill(6); 156 ex7b.await; ex7c.await; 157 assert(ex7b.isReady && ex7c.isReady); 158 assert(ex7a.result == 6); 159 assert(ex7b.result.success == 6); 160 assert(ex7c.result.await.result.success == 6); 161 } 162 163 static: 164 165 template Future(A) 166 { 167 final shared class Future 168 { 169 alias Eventual = A; 170 171 private: 172 void delegate(A)[] _onFulfill; 173 A _result; 174 bool _ready; 175 } 176 } 177 enum isFuture(F) = is(F == Future!A, A); 178 179 template pending(A) 180 { 181 shared(Future!A) pending() 182 { return new typeof(return); } 183 } 184 template ready(A) 185 { 186 shared(Future!A) ready(A a) 187 { return (new typeof(return)).fulfill(a); } 188 } 189 template result(A) 190 { 191 A result(shared(Future!A) future) 192 in{ assert(future.isReady); } 193 body{ return *cast(A*)&future._result; } 194 } 195 template fulfill(A) 196 { 197 shared(Future!A) fulfill(shared(Future!A) future, A a) 198 { 199 synchronized(future) 200 if(future.isReady) 201 return future; 202 else 203 { 204 *cast(A*)&future._result = a; 205 future._ready = true; 206 } 207 208 foreach(cb; future._onFulfill) 209 cb(future.result); 210 211 future._onFulfill = []; 212 213 return future; 214 } 215 } 216 template onFulfill(A) 217 { 218 void onFulfill(shared(Future!A) future, void delegate(A) callback) 219 { 220 synchronized(future) 221 if(future.isReady) // REVIEW could this lead to deadlock? can that be analyzed with π calculus? 222 callback(future.result); 223 else 224 future._onFulfill ~= callback; 225 } 226 void onFulfill(shared(Future!A) future, void delegate() callback) 227 { 228 return future.onFulfill((A _){ callback(); }); 229 } 230 } 231 232 alias pending() = pending!Unit; 233 234 template isReady(A) 235 { 236 bool isReady(shared(Future!A) future) 237 { return future._ready; } 238 } 239 template isPending(A) 240 { 241 bool isPending(shared(Future!A) future) 242 { return ! future.isReady; } 243 } 244 245 template async(alias f) // applied 246 { 247 template async(A...) 248 { 249 alias g = tryCatch!f; 250 alias B = typeof(g(A.init)); 251 252 static void run(shared(Future!B) future, A args) 253 { future.fulfill(g(args)); } 254 255 shared(Future!B) async(A args) 256 { 257 auto future = pending!B; 258 259 spawn(&run, future, args); 260 261 return future; 262 } 263 } 264 } 265 template await(alias wait = defaultWait) 266 { 267 template await(A) 268 { 269 shared(Future!A) await(shared(Future!A) future) 270 { 271 while(future.isPending) 272 wait(); 273 274 return future; 275 } 276 277 shared(Future!A) await(shared(Future!A) future, Duration timeout) 278 { 279 auto start = Clock.currTime; 280 281 while(future.isPending && Clock.currTime < start + timeout) 282 wait(); 283 284 return future; 285 } 286 } 287 } 288 289 template next(alias f) // applied 290 { 291 template next(A, B...) 292 { 293 alias C = typeof(apply!f(A.init, B.init).result); 294 295 shared(Future!C) next(shared(Future!A) future, B args) 296 { return future.then!f(args).sync; } 297 } 298 } 299 template sync(A) 300 { 301 shared(Future!A) sync(shared(Future!(shared(Future!A))) future) 302 { 303 auto synced = pending!A; 304 305 future.onFulfill((shared(Future!A) nextFuture) 306 { nextFuture.onFulfill((A a){ synced.fulfill(a); }); } 307 ); 308 309 return synced; 310 } 311 } 312 template then(alias f) // applied 313 { 314 template then(A, B...) 315 { 316 alias C = typeof(apply!f(A.init, B.init)); 317 318 shared(Future!C) then(shared(Future!A) future, B args) 319 { 320 auto thenFuture = pending!C; 321 322 future.onFulfill((A result) 323 { thenFuture.fulfill(apply!f(result, args)); } 324 ); 325 326 return thenFuture; 327 } 328 } 329 } 330 331 template when(A) if(isTuple!A && allSatisfy!(isFuture, A.Types)) 332 { 333 alias EventualType(F) = F.Eventual; 334 alias When = Tuple!(staticMap!(EventualType, A.Types)); 335 336 shared(Future!When) when(A futures) 337 { 338 auto allFuture = pending!When; 339 340 foreach(i, future; futures) 341 future.onFulfill((When.Types[i]) 342 { 343 if(all(futures.tlift!isReady[].only)) 344 allFuture.fulfill( 345 futures.tlift!result 346 ); 347 } 348 ); 349 350 return allFuture; 351 } 352 } 353 template when(Futures...) if(allSatisfy!(isFuture, Futures)) 354 { 355 auto when(Futures futures) 356 { 357 return .when(futures.tuple); 358 } 359 } 360 361 template race(A) if(isTuple!A && allSatisfy!(isFuture, A.Types)) 362 { 363 alias EventualType(F) = F.Eventual; 364 alias Race = Union!(staticMap!(EventualType, A.Types)); 365 366 shared(Future!Race) race(A futures) 367 { 368 auto anyFuture = pending!Race; 369 370 foreach(i, future; futures) 371 future.onFulfill((Race.Union.Args!i result) 372 { anyFuture.fulfill(Race().inject!i(result)); } 373 ); 374 375 return anyFuture; 376 } 377 } 378 template race(Futures...) if(allSatisfy!(isFuture, Futures)) 379 { 380 auto race(Futures futures) 381 { 382 return .race(futures.tuple); 383 } 384 }