Saturday, December 29, 2012

Adding concurrency to our functional framework

In previous posts, I hinted that someday I would discuss concurrent evaluation of functions. That time has finally come. Also, I've made the switch to Java 1.7 at home, so don't be shocked by some of the syntax here (like multi-catch or try-with-resources).

First, let's acknowledge that Function0 is effectively a Callable with result caching. Let's make this explicit by having Function0 implement Callable:

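import java.util.concurrent.Callable;

public abstract class Function0<R> implements Callable<R> {
    /* previous definition of Function0 */

    // Callable's entry point simply delegates to the caching get().
    @Override
    public R call() throws Exception {
        return get();
    }
}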

Using that, it's quite easy to dispatch evaluation of a Function0 to an ExecutorService using a helper class:

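import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ConcurrentEvaluator {
    private final ExecutorService executorService;

    public ConcurrentEvaluator(final ExecutorService executorService) {
        this.executorService = executorService;
    }

    // Submits f0 for background evaluation immediately, and returns a new
    // Function0 that blocks on the background result when it is evaluated.
    public <T> Function0<T> evaluateConcurrent(final Function0<T> f0) {
        final Future<T> futureVal = executorService.submit(f0);
        return new Function0<T>() {
            @Override
            public T evaluate() {
                try {
                    return futureVal.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
}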

This is a pretty powerful (but simple) pattern, since a Function0 can be built up at runtime based on successive lazy applications of functions. (Effectively, the Function0 type is a form of thunk.) Once you have several work units of decent size, you can dispatch them to background threads for evaluation, and then pass the resulting Function0s (whose background evaluation is probably still running) to other lazy functions. Once you need a result (e.g. for output), you invoke get() and, if necessary, your thread will block until the required background tasks have finished.
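To make the shape of this pattern concrete without depending on the Function0 class from the earlier posts, here's a minimal sketch that uses only java.util.concurrent. The names LazyValue, LazyValueDemo and slowSquare are hypothetical stand-ins invented for this illustration (they are not part of the framework), and the 100ms sleep just simulates a work unit of decent size.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for Function0: a thunk that caches its result.
abstract class LazyValue<T> implements Callable<T> {
    private T value;
    private boolean evaluated = false;

    protected abstract T evaluate();

    // Evaluates at most once; later calls return the cached value.
    public synchronized T get() {
        if (!evaluated) {
            value = evaluate();
            evaluated = true;
        }
        return value;
    }

    @Override
    public T call() {
        return get();
    }
}

class LazyValueDemo {
    // Starts evaluating the thunk on the executor right away, and returns a
    // new thunk whose get() blocks until the background result is available.
    static <T> LazyValue<T> evaluateConcurrent(ExecutorService executor,
                                               LazyValue<T> thunk) {
        final Future<T> future = executor.submit(thunk);
        return new LazyValue<T>() {
            @Override
            protected T evaluate() {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    // A deliberately slow computation, standing in for a page download.
    static LazyValue<Integer> slowSquare(final int n) {
        return new LazyValue<Integer>() {
            @Override
            protected Integer evaluate() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return n * n;
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Both computations start running in the background here...
        LazyValue<Integer> a = evaluateConcurrent(executor, slowSquare(3));
        LazyValue<Integer> b = evaluateConcurrent(executor, slowSquare(4));
        // ...and we only block when we actually need the values.
        System.out.println(a.get() + b.get()); // prints 25
        executor.shutdown();
    }
}
```

The key point is that submit() is called eagerly inside evaluateConcurrent, so the work is already in flight by the time anyone asks for the value. The returned thunk is just a lazy handle on the Future.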

Let's see this in action:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test; // assuming JUnit 4

public class ConcurrentEvaluatorTest {
    private static final String url1 =
        "http://mikefroh.blogspot.com/2012/11/what-is-functional.html";
    private static final String url2 =
        "http://mikefroh.blogspot.com/2012/07/why-i-like-scala.html";

    @Test
    public void testSerialPageDownload() throws Exception {
        final long start = System.currentTimeMillis();
        int byteCount = addByteCounts(getPageBytes(url1), getPageBytes(url2)).get();
        final long time = System.currentTimeMillis() - start;
        System.out.println("testSerialPageDownload returned " + byteCount +
            " after " + time + "ms");
    }

    @Test
    public void testConcurrentPageDownload() throws Exception {
        ExecutorService executorService =
            Executors.newFixedThreadPool(2);
        ConcurrentEvaluator evaluator = new ConcurrentEvaluator(executorService);
        final long start = System.currentTimeMillis();
        // Both downloads are submitted before get() blocks on either result.
        int byteCount = addByteCounts(evaluator.evaluateConcurrent(getPageBytes(url1)),
            evaluator.evaluateConcurrent(getPageBytes(url2))).get();
        final long time = System.currentTimeMillis() - start;
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("testConcurrentPageDownload returned " + byteCount +
            " after " + time + "ms");
    }

    // Returns a Function0 that, when evaluated, will compute the length of each
    // pageBytes iterable and then add the two lengths together.
    private static Function0<Integer>
    addByteCounts(Function0<Iterable<? extends Byte>> pageBytes1,
                  Function0<Iterable<? extends Byte>> pageBytes2) {
        // Fold step: ignores the byte itself and increments the running count.
        Function2<Integer, Integer, Byte> accumulateByteCount =
            new Function2<Integer, Integer, Byte>() {
                @Override
                public Integer evaluate(final Function2<Integer, Integer, Byte> self,
                                        final Integer i1, final Byte i2) {
                    return i1 + 1;
                }
            };
        Function2<Integer, Integer, Integer> addInts =
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer evaluate(final Function2<Integer, Integer, Integer> self,
                                        final Integer i1, final Integer i2) {
                    return i1 + i2;
                }
            };
        return addInts.apply(ListUtils.foldLeft(accumulateByteCount).apply(0).apply(pageBytes1))
            .apply(ListUtils.foldLeft(accumulateByteCount).apply(0).apply(pageBytes2));
    }

    // Returns a Function0 that, when evaluated, downloads the resource at the
    // given HTTP URL and returns its contents as an ImmutableList of bytes.
    private static Function0<Iterable<? extends Byte>> getPageBytes(final String url) {
        return new Function0<Iterable<? extends Byte>>() {
            @Override
            public Iterable<Byte> evaluate() {
                try {
                    HttpURLConnection httpURLConnection =
                        (HttpURLConnection) new URL(url).openConnection();
                    ImmutableList<Byte> pageBytesReversed = ImmutableList.nil();
                    try (BufferedInputStream bufferedStream =
                             new BufferedInputStream(httpURLConnection.getInputStream())) {
                        // read() returns an int in the range 0..255, or -1 at end of
                        // stream. Compare before narrowing to byte; otherwise a 0xFF
                        // byte in the page would look like end of stream.
                        int nextByte;
                        while ((nextByte = bufferedStream.read()) != -1) {
                            pageBytesReversed = pageBytesReversed.prepend((byte) nextByte);
                        }
                    }
                    return pageBytesReversed.reverse();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
}

These test cases download the two previous posts on this blog, count their bytes (using a foldLeft), and output the sum of the byte counts, along with the time taken to compute the result. The only difference is that testConcurrentPageDownload kicks off evaluation of the two calls to getPageBytes on an ExecutorService. On my computer, the serial test typically takes about 1100ms, while the concurrent test takes about 600ms, suggesting that it takes about 500ms to download each page (they're pretty similar in size) and about 100ms in total to count the bytes (since we're not using a particularly efficient data structure).
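For anyone who wants to check that estimate, here's the arithmetic as a small sketch. It assumes a simple model that the tests don't measure directly: the two downloads take the same time, they overlap completely in the concurrent case, and the counting time is the same in both tests. The class and method names (TimingEstimate, downloadMillis, countMillis) are made up for this illustration.

```java
class TimingEstimate {
    // Assumed model (an approximation, not something the tests measure):
    //   serial     = 2 * download + count
    //   concurrent =     download + count

    // Subtracting the two equations leaves exactly one download.
    static long downloadMillis(long serialMillis, long concurrentMillis) {
        return serialMillis - concurrentMillis;
    }

    // Whatever is left of the concurrent time after one download is counting.
    static long countMillis(long serialMillis, long concurrentMillis) {
        return 2 * concurrentMillis - serialMillis;
    }

    public static void main(String[] args) {
        long serial = 1100;
        long concurrent = 600;
        System.out.println("download per page: "
            + downloadMillis(serial, concurrent) + "ms");    // 500ms
        System.out.println("counting, both pages: "
            + countMillis(serial, concurrent) + "ms");       // 100ms
    }
}
```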

That's probably enough for now. I've got some code written that extends Function0 to include asynchronous evaluation of callbacks when the result is ready. Unfortunately, I haven't yet come up with a good example that makes it look useful. Once that's ready, though, I'll write it up. Stay tuned.