Sunday, January 13, 2013

Asynchronous Evaluation

As promised at the end of my previous post, I've added (what I think is) a fairly simple asynchronous execution structure.

It's worth noting that this post has nothing to do with functional programming as such (especially since I will be producing side-effects). What I've written works because of lazy evaluation, which is less a functional programming concept than something that happens to come up in some functional programming environments (most notably Haskell). The beauty of programming without side-effects is that your logic stays the same regardless of the actual evaluation semantics. That, in turn, makes it easier to "make it work, make it right, make it fast". This post largely deals with the last part.

Adding callbacks to a concurrent evaluation

When I started writing this, I had planned to create an AsyncFunction0 class, which would decorate a Function0 with asynchronous callback evaluation while still supporting Function0 semantics. On further thought, I figured it would be easier to keep the asynchronous evaluation behaviour totally separate. Instead, I opted to create an AsyncEvaluator class, which would:

  • be built from a Function0 and an Executor, and provide an invoke method that triggers evaluation, on the Executor, of the result lazily defined by the Function0.
  • accept callbacks, which are called (on the same thread as the evaluation) once the Function0 has been evaluated.
  • support convenient chaining of Function0 evaluations, so that evaluating one result can trigger evaluation of later results.

For callbacks, rather than defining a new type, we'll use a function that takes the result of evaluation as input. It doesn't really make sense for the callback to return anything, though (to whom would it return it?). Instead, the callback should "do" something; that is, it should have a side-effect. So the callback should return void, or the closest equivalent. Since we already have tuples, we can represent the lack of a value with the empty tuple, Tuple0. This is a singleton:

public enum Tuple0 {
    INSTANCE;

    @Override
    public String toString() {
        return "()";
    }
}
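To make the design choice concrete, here is a small sketch of side-effect-only callbacks that return the unit value. It uses Java 8's java.util.function.Function purely as a stand-in for the Function1 type from the earlier posts (an assumption; note that Function<T, R> lists the input type first, the reverse of Function1<Tuple0, T>), and CallbackDemo with its fire method is a hypothetical helper. Returning Tuple0.INSTANCE avoids the usual alternative, Function<T, Void>, where every callback has to end with return null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Mirrors the post's Tuple0: a one-value enum standing in for "no result".
enum Tuple0 {
    INSTANCE;

    @Override
    public String toString() {
        return "()";
    }
}

// Hypothetical helper; java.util.function.Function stands in for Function1.
class CallbackDemo {
    // Records the side-effects performed by the callbacks.
    static final List<String> log = new ArrayList<>();

    // Passes the value to every callback; the Tuple0 results carry no information.
    static <T> void fire(T value, List<Function<? super T, Tuple0>> callbacks) {
        for (Function<? super T, Tuple0> callback : callbacks) {
            callback.apply(value);
        }
    }

    static void demo() {
        List<Function<? super Integer, Tuple0>> callbacks = new ArrayList<>();
        // This callback exists only for its side-effect, so it returns the unit value.
        callbacks.add(i -> {
            log.add("got " + i);
            return Tuple0.INSTANCE;
        });
        fire(42, callbacks);
    }
}
```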

With that, we can move on to the core code of AsyncEvaluator:

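import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.Executor;

// Function0 and Function1 are the lazy function types from the earlier posts.
public class AsyncEvaluator<T> {
    private final Function0<T> value;
    private final Executor executor;
    // Guarded by "this"; once true, no more callbacks may be added.
    private boolean invoked = false;
    private final Collection<Function1<Tuple0, ? super T>> callbacks =
        new LinkedList<>();

    public AsyncEvaluator(final Function0<T> value, final Executor executor) {
        this.value = value;
        this.executor = executor;
    }

    // Submits evaluation to the Executor (at most once), then runs every
    // callback with the result on that same thread. All callbacks were added
    // before invoke(), and Executor.execute guarantees they are visible to
    // the task.
    public synchronized void invoke() {
        if (invoked) {
            return;
        }
        invoked = true;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                T t = value.get();
                for (Function1<Tuple0, ? super T> callback : callbacks) {
                    callback.evaluate(t);
                }
            }
        });
    }

    public synchronized void addCallback(Function1<Tuple0, ? super T> callback) {
        if (invoked) {
            throw new IllegalStateException("Should not add callback to an " +
                "invoked AsyncEvaluator");
        }
        callbacks.add(callback);
    }
}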

This is simple enough, and still useful. You could write an asynchronous event handler that responds to a UI event (like a button click) by creating a Function0 that returns a result, wrapping it in an AsyncEvaluator, and adding a callback that writes the result to the UI. Because the callback runs on the Executor's thread, most UI toolkits (Swing, for example) would require that write to be posted back to the UI thread. Before your event handler returns, it would call invoke to trigger the background processing.
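The event-handler scenario can be sketched in a self-contained way. Since Function0 and Function1 come from earlier posts, this version swaps in Java 8's Supplier and Consumer as stand-ins (an assumption, not the original types) and uses a BlockingQueue in place of a real UI. SimpleAsyncEvaluator, ButtonHandlerDemo and onButtonClick are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for AsyncEvaluator: Supplier replaces Function0,
// Consumer replaces Function1<Tuple0, T>.
class SimpleAsyncEvaluator<T> {
    private final Supplier<T> value;
    private final Executor executor;
    private final List<Consumer<? super T>> callbacks = new ArrayList<>();
    private boolean invoked = false;

    SimpleAsyncEvaluator(Supplier<T> value, Executor executor) {
        this.value = value;
        this.executor = executor;
    }

    // Submits evaluation at most once; callbacks then run on the executor's thread.
    synchronized void invoke() {
        if (invoked) {
            return;
        }
        invoked = true;
        executor.execute(() -> {
            T t = value.get();
            for (Consumer<? super T> callback : callbacks) {
                callback.accept(t);
            }
        });
    }

    synchronized void addCallback(Consumer<? super T> callback) {
        if (invoked) {
            throw new IllegalStateException("Should not add callback to an invoked evaluator");
        }
        callbacks.add(callback);
    }
}

class ButtonHandlerDemo {
    // Hypothetical "UI": results land here instead of on screen.
    static final BlockingQueue<String> display = new LinkedBlockingQueue<>();

    // Hypothetical handler: set up the work, attach the callback, invoke, return at once.
    static void onButtonClick(Executor background) {
        SimpleAsyncEvaluator<Integer> evaluator =
            new SimpleAsyncEvaluator<>(() -> 6 * 7, background);
        // A real Swing or JavaFX callback would post this update back to the UI thread.
        evaluator.addCallback(result -> display.add("Answer: " + result));
        evaluator.invoke();
    }

    static String demo() {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            onButtonClick(background);
            return display.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            background.shutdown();
        }
    }
}
```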

Chaining function evaluations

Suppose you've written a nice, working program that runs well on a single thread, but you see various pieces that could run concurrently. Assuming you have lazy values representing certain "checkpoints" in the process, it would be nice to be able to say, "When result A has been calculated, start evaluating B and C (which depend on A), then when B finishes, kick off D. When C and D are both done, invoke E." We can do that without changing any actual logic: we just need to trigger evaluation of each Function0 once the earlier values it depends on have been evaluated.
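For comparison, the same dependency graph (A feeds B and C, B feeds D, and E waits on both C and D) can be expressed with Java 8's CompletableFuture. That is a different, standard-library mechanism, not the approach this post builds. The arithmetic in each stage is a placeholder for real work, and DependencyGraphDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DependencyGraphDemo {
    static int run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // A: the first checkpoint.
            CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1, pool);
            // B and C both depend on A and may run concurrently once A completes.
            CompletableFuture<Integer> b = a.thenApplyAsync(x -> x + 10, pool);
            CompletableFuture<Integer> c = a.thenApplyAsync(x -> x * 100, pool);
            // D starts as soon as B finishes, independently of C.
            CompletableFuture<Integer> d = b.thenApplyAsync(x -> x * 2, pool);
            // E waits for both C and D.
            CompletableFuture<Integer> e = c.thenCombineAsync(d, (cv, dv) -> cv + dv, pool);
            // Blocking here only to hand the result back to the caller.
            return e.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

With these placeholder stages, A is 1, B is 11, C is 100, D is 22 and E is 122. One difference worth noting: CompletableFuture's *Async methods that take no Executor argument run on its default asynchronous executor (normally ForkJoinPool.commonPool()), whereas the chain method in this post reuses the current evaluator's Executor when none is given.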

First, we'll look at creating simple or branching chains. That is, from a single Function0 that has been wrapped in an AsyncEvaluator, we can trigger one or more later Function0 evaluations:

public class AsyncEvaluator<T> {
    /* previous definition of AsyncEvaluator */

    // Creates an evaluator for a later value and invokes it on the given
    // Executor as soon as this evaluator's value has been computed.
    public <U> AsyncEvaluator<U> chain(final Function0<U> value,
                                       final Executor executor) {
        final AsyncEvaluator<U> evaluator = new AsyncEvaluator<>(value, executor);
        addCallback(new Function1<Tuple0, T>() {
            @Override
            public Tuple0 evaluate(final Function1<Tuple0, T> self, final T i1) {
                evaluator.invoke();
                return Tuple0.INSTANCE;
            }
        });
        return evaluator;
    }

    // Same as above, but evaluates on this evaluator's Executor.
    public <U> AsyncEvaluator<U> chain(final Function0<U> value) {
        return chain(value, executor);
    }
}

We may not want all evaluations to occur on the same Executor, so we allow an alternative Executor to be specified for the chained evaluation. If no Executor is specified, we'll use the same one used to evaluate the current result.

Here is a silly unit test that counts to ten asynchronously:

@Test
public void testChainedCalls() throws Exception {
    // Core pieces: a value-holder that returns zero, and a function that
    // adds 1 to its input.
    final Function0<Integer> zero = new Function0<Integer>() {
        @Override
        public Integer evaluate() {
            return 0;
        }
    };
    final Function1<Integer, Integer> addOne = new Function1<Integer, Integer>() {
        @Override
        public Integer evaluate(final Function1<Integer, Integer> self, final Integer i1) {
            return i1 + 1;
        }
    };
    final ExecutorService executorService =
        Executors.newSingleThreadExecutor();
    // Wrap our "zero" value in an evaluator so we can "evaluate" it
    // asynchronously.
    final AsyncEvaluator<Integer> zeroEvaluator =
        new AsyncEvaluator<>(zero, executorService);
    // Chain 10 calls to addOne, so they execute asynchronously as soon as the
    // previous one completes.
    AsyncEvaluator<Integer> previousEvaluator = zeroEvaluator;
    Function0<Integer> v = zero;
    for (int i = 0; i < 10; i++) {
        v = addOne.apply(v);
        previousEvaluator = previousEvaluator.chain(v);
    }
    // Create a place to keep our final answer, and add a callback to the last
    // value to add the answer to the queue.
    final BlockingQueue<Integer> answerQueue = new LinkedBlockingQueue<>();
    previousEvaluator.addCallback(new Function1<Tuple0, Integer>() {
        @Override
        public Tuple0 evaluate(final Function1<Tuple0, Integer> self,
                               final Integer i1) {
            answerQueue.add(i1);
            return Tuple0.INSTANCE;
        }
    });
    // Finally, kick off the chained execution and wait (with a timeout, so a
    // failure can't hang the test) for the result to show up in answerQueue.
    zeroEvaluator.invoke();
    final Integer answer = answerQueue.poll(5, TimeUnit.SECONDS);
    executorService.shutdown();
    assertEquals(Integer.valueOf(10), answer);
}

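What if a value is built from several earlier, asynchronously computed values? In that case, we want the new value to be computed as soon as the last of those values is available. That is the job of the static joinedChain method, which takes the Function0 to evaluate, the Executor to evaluate it on, and the AsyncEvaluators it depends on.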
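Based on how joinedChain is called in the test that follows, one way to implement it is to count down the outstanding dependencies and invoke the new evaluator when the count reaches zero. The sketch below does that with an AtomicInteger. It is an assumed implementation, not necessarily the one behind this post, and it uses Java 8's Supplier and Consumer as stand-ins for Function0 and Function1; JoinableEvaluator, JoinDemo and memoize are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for AsyncEvaluator (Supplier ~ Function0, Consumer ~ callback).
class JoinableEvaluator<T> {
    private final Supplier<T> value;
    private final Executor executor;
    private final List<Consumer<? super T>> callbacks = new ArrayList<>();
    private boolean invoked = false;

    JoinableEvaluator(Supplier<T> value, Executor executor) {
        this.value = value;
        this.executor = executor;
    }

    synchronized void invoke() {
        if (invoked) {
            return;
        }
        invoked = true;
        executor.execute(() -> {
            T t = value.get();
            for (Consumer<? super T> callback : callbacks) {
                callback.accept(t);
            }
        });
    }

    synchronized void addCallback(Consumer<? super T> callback) {
        if (invoked) {
            throw new IllegalStateException("Should not add callback to an invoked evaluator");
        }
        callbacks.add(callback);
    }

    // Assumed joinedChain (expects at least one dependency): invoke the new
    // evaluator once every dependency has been evaluated.
    static <U> JoinableEvaluator<U> joinedChain(Supplier<U> value, Executor executor,
                                                JoinableEvaluator<?>... dependencies) {
        JoinableEvaluator<U> evaluator = new JoinableEvaluator<>(value, executor);
        AtomicInteger remaining = new AtomicInteger(dependencies.length);
        for (JoinableEvaluator<?> dependency : dependencies) {
            // The last dependency to finish brings the count to zero and triggers evaluation.
            dependency.addCallback(ignored -> {
                if (remaining.decrementAndGet() == 0) {
                    evaluator.invoke();
                }
            });
        }
        return evaluator;
    }
}

class JoinDemo {
    // Minimal memoizing Supplier, standing in for the lazily evaluated Function0.
    static <T> Supplier<T> memoize(Supplier<T> supplier) {
        return new Supplier<T>() {
            private T cached;
            private boolean done;
            @Override
            public synchronized T get() {
                if (!done) {
                    cached = supplier.get();
                    done = true;
                }
                return cached;
            }
        };
    }

    static Integer demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Supplier<Integer> a = memoize(() -> 20);
            Supplier<Integer> b = memoize(() -> 22);
            Supplier<Integer> sum = memoize(() -> a.get() + b.get());
            JoinableEvaluator<Integer> aEval = new JoinableEvaluator<>(a, pool);
            JoinableEvaluator<Integer> bEval = new JoinableEvaluator<>(b, pool);
            BlockingQueue<Integer> answers = new LinkedBlockingQueue<>();
            JoinableEvaluator.joinedChain(sum, pool, aEval, bEval).addCallback(answers::add);
            aEval.invoke();
            bEval.invoke();
            return answers.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the inputs are memoized, evaluating sum after both dependencies have finished reuses their cached values rather than recomputing them, which mirrors the role laziness plays in the post.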

With that, we can write an asynchronous version of our example from the previous post, where we fetch two webpages and sum their byte counts:

@Test
public void testAsyncPageDownload() throws Exception {
    ExecutorService executorService =
        Executors.newFixedThreadPool(2);
    // Set up the lazy evaluation structure leading to our answer.
    // If we call .get() on sumOfPageBytes, we get the same answer, just
    // evaluated synchronously on a single thread.
    Function0<Iterable<? extends Byte>> page1Bytes = getPageBytes(url1);
    Function0<Iterable<? extends Byte>> page2Bytes = getPageBytes(url2);
    Function0<Integer> sumOfPageBytes = addByteCounts(page1Bytes, page2Bytes);
    // Create evaluators for the two page loads
    AsyncEvaluator<?> page1Evaluator = new AsyncEvaluator<>(page1Bytes,
        executorService);
    AsyncEvaluator<?> page2Evaluator = new AsyncEvaluator<>(page2Bytes,
        executorService);
    // We'll need somewhere to store our answer. In a real program, we would
    // ideally avoid blocking by simply sending our answer across the network
    // or outputting it to the user.
    final BlockingQueue<Integer> answerQueue = new LinkedBlockingQueue<>();
    // Trigger evaluation of the answer once the two page loads are done and add
    // a callback to put its value in the answerQueue.
    AsyncEvaluator.joinedChain(sumOfPageBytes, executorService, page1Evaluator,
            page2Evaluator)
        .addCallback(new Function1<Tuple0, Integer>() {
            @Override
            public Tuple0 evaluate(final Function1<Tuple0, Integer> self,
                                   final Integer i1) {
                answerQueue.add(i1);
                return Tuple0.INSTANCE;
            }
        });
    // Kick off the two page loads
    final long start = System.currentTimeMillis();
    page1Evaluator.invoke();
    page2Evaluator.invoke();
    // Block until our answerQueue gets a value (giving up after 5 seconds)
    Integer byteCount = answerQueue.poll(5, TimeUnit.SECONDS);
    final long time = System.currentTimeMillis() - start;
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println("testAsyncPageDownload returned " + byteCount +
        " after " + time + "ms");
    // poll returns null on timeout, so check for that before unboxing.
    assertNotNull("Timed out waiting for the page downloads", byteCount);
    assertTrue(byteCount > 100000);
}

As mentioned in the code comments, neither of these tests really reflects what you would actually want to do with an asynchronous execution path. In the real world, your final callback would push the result as a network response or output it to the user, such that you would never actually need to block while waiting for an answer.

Conclusion

Asynchronous processing is a pretty powerful tool, made quite popular (and certainly more familiar) thanks to AJAX calls from the browser. Now that many of us are comfortable with this approach, it's not a bad idea to add more asynchronous processing to our server-side or desktop software. In particular, while a blocked thread uses fewer resources than a busy-waiting thread (or a thread that periodically polls for a result), it's better to only use a thread when there's stuff to be done. In the second unit test example above, we still have two threads blocking on the HttpURLConnection responses. A better (more scalable) program would use Java NIO to fire off the requests and react to the responses asynchronously. Unfortunately, that's outside the scope of the current post.

