Friday, July 25, 2008

A Short Road to TCPoly(morphism) in Scala

In my previous post, I talked about adding rich methods to Java Collections with the help of traits. If you go back and look at that post, you will see that I stayed well away from rich methods of a certain kind: those that return a new Collection when they are applied to an existing Collection. Examples of such methods are: map, flatMap, and filter.

Why did I not talk about methods of this nature? Well - because they would have led me down a short and slippery slope to TCPoly (aka Type Constructor Polymorphism)!

I'm going to navigate down that slope today...

But first, a quick recap. Here's a trait that we saw in the previous post:
trait RichIterableOps[A] {

// required method
def iterator: Iterator[A]

def foreach(f: A => Unit) = {
val iter = iterator
while (iter.hasNext) f(iter.next)
}

def foldLeft[B](seed: B)(f: (B, A) => B) = {
var result = seed
foreach(e => result = f(result, e))
result
}
}
The job of this trait is to add rich operations to an Iterable, which is anything that has an iterator method; Java Collection classes are Iterables.

Let's say that we now want to add a map method to this trait:
  def map[B](f: A => B): ??? = { /* todo */}
Right away, I'm faced with the challenge of figuring out the return type of the map method. Some requirements for this return type are:
  • Methods available on the original Collection type that mixes in this trait should be available via the return type. So, for example, if we mix the trait into a Set, and then call map on this Set, the methods of the Set interface/type should be available to us via the return type of map.
  • We should be able to chain rich methods.
  • Equality should work in a natural fashion for the enriched Collection.
Let me capture these requirements as (JUnit-4) tests:
class TestRichCollections {
val richSet = new RichHashSet[Int]

@Before
def setupSet: Unit = {
richSet.add(1); richSet.add(2); richSet.add(3)
}

@Test
def testFoldForSet = {
assertEquals(6, richSet.foldLeft(0)((x,y) => x+y))
}

@Test
def testOriginalCollectionOpForSet = {
assertTrue(richSet.map(e => 2*e).contains(4))
}

@Test
def testChainedRichOpForSet = {
assertTrue(richSet.map(e => 2*e).map(e => 2*e).contains(8))
}

@Test
def testEqualityForSet = {
val expected = new HashSet[Int]
expected.add(2); expected.add(4); expected.add(6)
assertEquals(expected, richSet.map(e => 2*e))
}
}
So, based on the set of requirements before us, what can we say about the return type of RichIterableOps.map?

It seems pretty clear that the return type of map should be:
  • a subtype of RichIterableOps: to allow chaining of rich methods.
  • a subtype of the Collection into which RichIterableOps is mixed: to make the methods of that Collection available via the return type.
A potential solution is to get Collections that mix in this trait to redefine the map method, to make it return the appropriate type. But this is highly sub-optimal, because it forces every mixing-in Collection to do substantial extra work. Plus, if such a Collection is accessed via a supertype variable, the redefined return type is not available via this variable without type-casting.
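To make the cost concrete, here is a sketch (hypothetical code, not from the post) of what each mixing-in Collection would have to write by hand under that approach:

```scala
import java.util.HashSet

// Hypothetical illustration of the per-collection boilerplate: each
// collection must re-implement map from scratch just to get the return
// type right, duplicating the iteration logic every time.
class NaiveRichHashSet[A] extends HashSet[A] {
  def map[B](f: A => B): NaiveRichHashSet[B] = {
    val result = new NaiveRichHashSet[B]
    val iter = iterator
    while (iter.hasNext) result.add(f(iter.next))
    result
  }
}
```

Every other Collection (an ArrayList, a LinkedHashSet, ...) would need its own near-identical copy of this method, and none of them could share the iteration logic.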

Ideally, I want to capture the logic of the map method right within the RichIterableOps trait. I also want to be able to say that the return type of map is a Container with certain constraints; subclasses should be able to specify the exact type of the Container.

This is precisely what TCPoly allows me to do.

So what exactly is TCPoly? To give a clear definition of that term, I first need to define a couple of other terms:

Parametric Polymorphism: a mechanism that allows us to abstract over types using type parameters, so that a single piece of code can work with multiple types. An example of this is:
class List[A] {}
In the above code snippet, class List employs parametric polymorphism to abstract over the type of its members, and makes use of a type parameter: A.
This mechanism is known by the name Generics in the Java world.

Type Constructor: A type that is used to construct another (proper) type. In the above code snippet, List is a type constructor. It can be used, for example, to construct proper types like List[Int] and List[String]. A type constructor is also a higher-kinded type. Type constructors cannot have instances; only proper types can have instances.

Given these definitions, here, finally, is the definition of TCPoly:

Type Constructor Polymorphism: a form of Parametric Polymorphism which allows Type Constructors as type parameters. TCPoly also allows Type Constructors as abstract type members.

As far as the definition of TCPoly is concerned, that is it! But let me dig a little deeper...

Without TCPoly, the following is invalid:
class C1[A[_]]
Here, I'm trying to say that C1 takes a type parameter that itself needs a type parameter to denote a proper type.

TCPoly enables this kind of definition.
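As a quick, made-up illustration (not code from the post): once type parameters can themselves take type parameters, a single class can abstract over the container as well as the element type:

```scala
// C is a type constructor (List, Option, ...); C[A] is a proper type.
class Wrapper[C[_], A](val contents: C[A])

val listWrapper = new Wrapper[List, Int](List(1, 2, 3))
val optWrapper = new Wrapper[Option, String](Some("hi"))
```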

So how does that help me with giving RichIterableOps.map an appropriate return type? Here's a first attempt at answering that question:
import java.util.Set
import java.util.Collection
import java.util.HashSet
import java.util.LinkedHashSet
import java.util.Iterator

// a builder for containers of type C with element A
trait Builder[C[_], A] {
def add(el: A): Boolean
def build(): C[A]

def using(op: Builder[C, A] => Unit): C[A] = {
op(this); build()
}
}

trait RichIterableOps[A, Iterable[_], Container[X] <: RichIterableOps[X, Iterable, Container] with Iterable[X]] {

def iterator: Iterator[A]
def builder[T]: Builder[Container, T]

def foreach(f: A => Unit) = {
val iter = iterator
while (iter.hasNext) f(iter.next)
}

def foldLeft[B](seed: B)(f: (B, A) => B) = {
var result = seed
foreach(e => result = f(result, e))
result
}

def map[B](f: A => B): Container[B] = builder.using { b =>
foreach(e => b.add(f(e)))
}
}

class RichHashSet[A] extends HashSet[A] with RichIterableOps[A, HashSet, RichHashSet] {
def builder[T] = new RichHashSet[T] with Builder[RichHashSet, T] {def build() = this}
}

The important points in the above code fragment are:
  • RichIterableOps now uses a Type Constructor, Container[_], as a type parameter. Container[_] stands for the return type of the map method. The ability to do this, as enabled by TCPoly, is the key to the whole problem (of returning an appropriate type from map).
  • RichIterableOps uses another Type Constructor, Iterable[_], as a type parameter. This type parameter stands for the Collection/Iterable into which RichIterableOps is mixed, and is used to enforce the property that the Container should be a subtype of the mixing-in Collection; this is shown next.
  • Bounds on the Container capture our requirements for the return type of map:
    Container[X] <: RichIterableOps[X, Iterable, Container] with Iterable[X].
  • map now returns the Container:
    def map[B](f: A => B): Container[B]
  • Subclasses specify the precise type of the Container:
    class RichHashSet[A] extends HashSet[A] with RichIterableOps[A, HashSet, RichHashSet]
  • RichIterableOps makes use of a builder to create the Container; subclasses supply an appropriate builder:
    class RichHashSet[A] extends HashSet[A] with RichIterableOps[A, HashSet, RichHashSet] {
    def builder[T] = new RichHashSet[T] with Builder[RichHashSet, T] {def build() = this}
    }
The code above, which employs TCPoly and uses Type Constructors as type parameters, allows us to do exactly what we want to do with the return type of map (and other higher order functions that return Collections). But it has one drawback: it is a little difficult to understand because of the use of a self-reference for specifying the bounds of the Container. Let me re-write the code using an abstract type member as opposed to a type parameter:
trait RichIterableOps[A, Iterable[_]] {

type Container[X] <: RichIterableOps[X, Iterable] with Iterable[X]

def iterator: Iterator[A]
def builder[T]: Builder[Container, T]

def foreach(f: A => Unit) = {
val iter = iterator
while (iter.hasNext) f(iter.next)
}

def foldLeft[B](seed: B)(f: (B, A) => B) = {
var result = seed
foreach(e => result = f(result, e))
result
}

def map[B](f: A => B): Container[B] = builder.using { b =>
foreach(e => b.add(f(e)))
}
}

class RichHashSet[A] extends HashSet[A] with RichIterableOps[A, HashSet] {
type Container[X] = RichHashSet[X]
def builder[T] = new RichHashSet[T] with Builder[Container, T] {def build() = this}
}

This version definitely looks easier to read. Also, given that RichIterableOps is designed to be mixed into a Collection via a subclass, the use of an abstract type member (which forces the use of a subclass) is not a burden.

The key difference here from the previous code fragment is that Container[_] is now an abstract type member as opposed to a type parameter.

Let me run my tests to make sure that the code does what it is supposed to do:



That looks good.

RichIterableOps should also work with Collections other than Sets. Let me try it with an ArrayList:
class RichArrayList[A] extends ArrayList[A] with RichIterableOps[A, ArrayList] {
type Container[X] = RichArrayList[X]
def builder[T] = new RichArrayList[T] with Builder[Container, T] {def build() = this}
}
The tests for this are:
class TestRichCollections {
// [snip]

val richList = new RichArrayList[Int]

@Before
def setupList: Unit = {
richList.add(1); richList.add(2); richList.add(3)
}

@Test
def testFoldForList = {
assertEquals(6, richList.foldLeft(0)((x,y) => x+y))
}

@Test
def testOriginalCollectionOpForList = {
assertTrue(richList.map(e => 2*e).contains(4))
}

@Test
def testChainedRichOpForList = {
assertTrue(richList.map(e => 2*e).map(e => 2*e).contains(8))
}

@Test
def testEqualityForList = {
val expected = new ArrayList[Int]
expected.add(2); expected.add(4); expected.add(6)
assertEquals(expected, richList.map(e => 2*e))
}
}
Let me run the full set of tests:



That's exactly what we wanted!

Conclusion


The availability of good Data-Structures/Collections is an important aspect of the 'power' of a programming language. As we saw in this post, TCPoly quickly becomes useful when we want to provide powerful Collection operations like map, flatMap, and filter in the context of a statically typed and parametrically polymorphic language like Scala. Without TCPoly, Collections in Scala would have to resort to code duplication and type-casting to support such operations. We saw this problem when I tried to add the map method to existing Java Collections via a trait.

TCPoly provides an elegant solution to this problem. Using TCPoly, we saw how I was able to extend RichIterableOps with a map method that abstracts over its return type. We saw this method working without any problem, and without any code duplication, in the context of two very different types of Java Collections: a HashSet and an ArrayList.

Relevant Reads:
Adriaan Moors's TCPoly Page

Sunday, July 13, 2008

Traits in Scala: a Powerful Design Tool

Traits in Scala are an extremely powerful Object Oriented (OO) design tool. They also provide a powerful mechanism for code reuse by making good use of subtyping and delegation.

In terms of raw functionality, traits allow us to:
  • Define types by specifying the signatures of supported methods. This is similar to how interfaces work in Java.
  • Provide partial/full implementations that can be mixed-in to classes that use the trait. A class can mix in multiple traits, as opposed to (single) inheritance, where a class can extend from only one superclass.
This core functionality can be used in different ways to provide powerful usage patterns. But before I start digging into that, let me define, for quick reference, some frequently used terms in the area of OO design:


Type: a set of methods that an object responds to, as defined by an Interface.

Class: a definition of an object's implementation.

Subtype: a relation between Types. A given Type is a subtype of another Type if its Interface is a superset of the Interface of the other type.

Subclass: a relation between Classes. A subclass inherits its Type and its Implementation from its parent class.

Interface Inheritance: enables the creation of a subtype relation between two interfaces.

Class Inheritance: enables the creation of a subclass relation between two classes. Class inheritance is essentially a code reuse mechanism that is defined statically at compile time. Class inheritance also implies subtyping: a class that inherits from another class is a subtype of (the type of) the parent class.

Composition: Another code reuse mechanism, in which new functionality is obtained by assembling or composing objects - at runtime. Composition does not imply subtyping. Composition involves a wrapped object, a wrapper object, and method forwarding.


With that out of the way, let's move on...

Inheritance and composition are the primary mechanisms of code reuse within Object Oriented systems. A lot has been written about the relative merits and demerits of these two approaches, so I will not go into that in a lot of detail here. In general, composition is preferred to inheritance for the following reasons:
  • Dynamic reconfigurability: with composition (and its slightly looser variant - aggregation), objects are wired together at runtime to achieve desired functionality. Consequently, these objects can be rewired at runtime to tweak behavior. Inheritance, on the other hand, results in systems with runtime structures that are statically fixed at compile time.
  • Ability to compose multiple objects without complication, to provide different facets of an object's behavior. A similar effect can potentially be achieved with multiple-inheritance (MI), but MI has a host of problems associated with it.
  • Ability to compose multiple objects in a chain without complication, to refine a particular facet of an object's behavior. An example of this is a chain of Interceptor objects attached to an object. This kind of effect is difficult to achieve via inheritance.

Another touted benefit of composition is that it involves black-box reuse with loose coupling and good encapsulation. Inheritance, on the other hand, involves white-box reuse, and consequently breaks encapsulation.

I think this argument is a double-edged sword. Inheritance definitely involves stronger coupling than composition for a scenario where a class wants to reuse the code in another (supplier) class; but this comes with potential benefits. To dig deeper into this, let's focus on just the public interface of a supplier class (as opposed to also talking about its protected interface, which is not available via composition). The primary reason for the stronger coupling shown by inheritance is the potential for self-use of public methods. This happens when a public method (say m1) in the supplier class calls another public method (say m2) in the same class. If a subclass overrides m2 (but not m1), then a call to the supplier's m1 method on an instance of the subclass will result in a call to the overridden m2. In other words, a subclass can alter the behavior of a method in its parent supplier class without redefining it. As opposed to this, with composition, there is no way that a wrapper object can interfere with a method call to a wrapped supplier object.
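Here is that m1/m2 scenario in miniature (hypothetical class names):

```scala
// m1 self-uses the public method m2. Since calls on `this` are dynamically
// dispatched, overriding m2 alone also changes what m1 returns.
class Supplier {
  def m1: String = "m1 -> " + m2
  def m2: String = "base m2"
}

class Sub extends Supplier {
  override def m2: String = "overridden m2"
}
```

Calling m1 on a Sub instance yields "m1 -> overridden m2": Sub altered m1's behavior without redefining it.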

So we see that, for our scenario of interest, inheritance involves stronger coupling than composition because of self-use; but if this self-use is done in a controlled fashion, the stronger coupling afforded by inheritance can actually be useful. We will see an example of this later in the post. In a similar scenario, composition does not work out quite as well; this is because of the Self Problem.

On the other side of the coin, the stronger coupling shown by Inheritance has potential drawbacks. These are documented by Joshua Bloch in Effective Java, 1st ed., Item 14.


Composition, as a mechanism for reuse, has a lot of benefits compared to inheritance. But it also has some drawbacks:
  • Unlike inheritance, it does not play well with polymorphism in situations where it is used to enhance the functionality of an existing class. Consider, for example, a class A, and another class that contains a List of As. If a class B extends A via composition, instances of B cannot be added to the List of As, thus defeating the whole purpose of polymorphism.
    This is easily fixed via subtyping, with class B inheriting from an interface that it shares with A (say X), and then using composition to implement X. The List of As now becomes a List of Xs.
    But when this fix is applied, there is a lot of grunt work and code noise involved in forwarding all the methods of the implemented interface to the wrapped object.
  • Unlike inheritance, it exhibits the Self Problem.
This is where traits come in. Traits address the problems with composition, while keeping most of its advantages. A couple of points are worth discussing:
  • One element of composition that is compromised by traits is 'dynamic reconfigurability', but this is not a big issue in practice. Most cases of 'dynamic reconfigurability' make use of Dependency Injection to wire dependencies into an object, and this is normally done only once at program startup. Traits provide similar functionality, albeit at compile time, via required methods or self-types.
  • With regard to the Self Problem, traits make use of true delegation; they consequently do not suffer from the Self Problem. The use of delegation introduces tighter coupling than composition for self-use scenarios, with the attendant risks and benefits.
That's more than enough talk! Let's look at some code.

Let's say that we want to enrich Java Sets so that we can do folds and foreachs and all that good functional stuff with them. Here's a trait that defines the methods that we want to provide:
trait RichIterableOps[A] {

// required method
def iterator: Iterator[A]

def foreach(f: A => Unit) = {
val iter = iterator
while (iter.hasNext) f(iter.next)
}

def foldLeft[B](seed: B)(f: (B, A) => B) = {
var result = seed
foreach(e => result = f(result, e))
result
}
}
Let me fire up the Scala Interpreter to play with this. I am going to mix the trait into a HashSet, and then do some operation on the Set; let's see how that plays out:
scala> val richSet = new HashSet[Int] with RichIterableOps[Int]
richSet: java.util.HashSet[Int] with traits.RichIterableOps[Int] = []

scala> richSet.add(1); richSet.add(2)

scala> richSet
res0: java.util.HashSet[Int] with traits.RichIterableOps[Int] = [1, 2]

scala> richSet.foldLeft(1)((x,y) => x+y)
res1: Int = 4
That's exactly what we wanted!

So why did I call the trait RichIterableOps as opposed to RichSetOps? Because it makes no assumptions about the kind of collection it is working with. All it needs to do its work is for the collection to provide an iterator() method. To validate this point, let me try it with a List now:
scala> val richList = new ArrayList[Int] with RichIterableOps[Int]
richList: java.util.ArrayList[Int] with traits.RichIterableOps[Int] = []

scala> richList.add(1)
res2: Boolean = true

scala> richList.add(2)
res3: Boolean = true

scala> richList
res4: java.util.ArrayList[Int] with traits.RichIterableOps[Int] = [1, 2]

scala> richList.foldLeft(1)((x,y) => x+y)
res5: Int = 4
Already, we're starting to see the power of traits. Let me pause for a moment here and enumerate the trait features that we've seen so far:
  • Trait usage Pattern 1: A trait can be used to provide a rich interface for a class. It does this by declaring required methods, which a class that mixes in the trait needs to implement. Based on these required methods, the trait can provide additional (rich) methods to the class. Required methods in a trait lead to low, controlled, and good coupling; more on that later.
  • A trait functions as a powerful unit of code reuse: it can be mixed into unrelated classes within an existing class hierarchy to provide extra functionality. This works despite the following potential obstacles:
    • The existing classes already extend other classes.
    • No source code is available for the (possibly third-party) class hierarchy, so making tweaks to the existing classes is not possible.
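Pattern 1 in miniature (a made-up Temperature example; the names are mine, not the post's):

```scala
// The trait declares a single required method...
trait RichTemperature {
  def degrees: Double // required method

  // ...and provides rich methods built on top of it.
  def isFreezing: Boolean = degrees <= 0.0
  def toFahrenheit: Double = degrees * 9 / 5 + 32
}

// The class supplies only the required method; the rich interface comes free.
class Celsius(val degrees: Double) extends RichTemperature
```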
Next, let's say that we want a Set of Strings that does not care about the case of its elements. This looks like something that can be implemented using an existing Set class, with minor code enhancements. I'll capture the desired enhancement in a trait:
trait IgnoreCaseSet extends Set[String] {

abstract override def add(e: String) = {
super.add(e.toLowerCase)
}

abstract override def contains(e: Any) = {
super.contains(e.asInstanceOf[String].toLowerCase)
}

abstract override def remove(e: Any) = {
super.remove(e.asInstanceOf[String].toLowerCase)
}
}
Let me try this out:
scala> val icSet = new HashSet[String] with IgnoreCaseSet
icSet: java.util.HashSet[String] with traits.IgnoreCaseSet = []

scala> icSet.add("Hi There")
res0: Boolean = true

scala> icSet
res1: java.util.HashSet[String] with traits.IgnoreCaseSet = [hi there]

scala> icSet.contains("hi there")
res2: Boolean = true

scala> icSet.remove("hi there")
res3: Boolean = true

scala> icSet
res4: java.util.HashSet[String] with traits.IgnoreCaseSet = []
Looks good!

Once again, let me enumerate the trait features that we just saw:
  • Trait usage Pattern 2: A trait can decorate the behavior of an existing class. It does this by:
    • extending an Interface that it shares with the class.
    • overriding the methods that it wants to decorate with the help of the abstract override modifier.
    • delegating to the original methods within its overridden methods using the super keyword.
It is possible for multiple traits to decorate the same method in a class by forming a chain of stackable decorators. In this scenario, the use of super calls within the traits provides control flow along the stack of decorators. The precise rules for the order in which this happens is governed by a process called Linearization. Details about Linearization are available in the Scala Language Spec, if you're interested.
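Here's a small sketch of two stacked decorators in the same style as IgnoreCaseSet (TrimSet and LowerSet are my hypothetical names): with the mix-in order below, linearization runs TrimSet's add first, and each super call moves one step down the stack.

```scala
import java.util.HashSet

trait TrimSet extends java.util.Set[String] {
  abstract override def add(e: String) = super.add(e.trim)
}

trait LowerSet extends java.util.Set[String] {
  abstract override def add(e: String) = super.add(e.toLowerCase)
}

// TrimSet (the rightmost mixin) runs first, then LowerSet, then HashSet.add:
val s = new HashSet[String] with LowerSet with TrimSet
s.add("  Hi There  ") // stored as "hi there"
```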

Let's test something here. I want to see what happens when I add multiple Strings to the Set using the addAll() method. The interesting thing here is that the IgnoreCaseSet trait does not override the addAll() method. Does that mean that we can sneak in behind the covers and add Strings with uppercase letters to the Set - by using the addAll() method? Or is the trait going to be able to hook into this operation?
scala> val list = new ArrayList[String]
list: java.util.ArrayList[String] = []

scala> list.add("Element 1"); list.add("Element 2"); list.add("Element 3")

scala> list
res5: java.util.ArrayList[String] = [Element 1, Element 2, Element 3]

scala> icSet.addAll(list)
res6: Boolean = true

scala> icSet
res7: java.util.HashSet[String] with traits.IgnoreCaseSet = [element 3, element 2, element 1]
As we can see, the trait was able to hook into the functioning of the addAll() method, to help us do the right thing. This worked out because of two reasons:
  • HashSet's addAll() method calls its add() method (which it should, to keep things DRY).
  • The self variable for the call chain is such that HashSet.addAll()'s call to add() gets routed to the mixed in Trait. This feature distinguishes the delegation used by Traits from the simpler notion of composition; it also overcomes the Self Problem.
Note: if we use composition in this scenario, we will need to wrap the addAll() method to get the right behavior.
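To see why, here's a sketch of that composition-based version (a hypothetical wrapper, not code from the post):

```scala
import java.util.{Arrays, HashSet}

// With plain composition, HashSet.addAll's internal calls to add() go to
// the wrapped HashSet itself, never back to the wrapper: the Self Problem.
class IgnoreCaseSetWrapper {
  val wrapped = new HashSet[String]
  def add(e: String): Boolean = wrapped.add(e.toLowerCase)
  // Forwarding addAll verbatim lets mixed-case strings sneak in:
  def addAll(c: java.util.Collection[String]): Boolean = wrapped.addAll(c)
}

val w = new IgnoreCaseSetWrapper
w.add("Hi There") // stored as "hi there"
w.addAll(Arrays.asList("My Friends")) // stored as "My Friends" -- wrong!
```

Unless the wrapper also rewrites addAll element by element, the lower-case invariant leaks.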

Now - let's say that we want a String Set that provides rich operations, but also ignores the case of its elements. I should be able to combine the two traits that I just created to get the desired behavior:
scala> val icRichSet = new LinkedHashSet[String] with IgnoreCaseSet with RichIterableOps[String]
icRichSet: java.util.LinkedHashSet[String] with traits.IgnoreCaseSet with traits.RichIterableOps[String] = []

scala> icRichSet.add("Hi There"); icRichSet.add("My Friends")

scala> icRichSet
res8: java.util.LinkedHashSet[String] with traits.IgnoreCaseSet with traits.RichIterableOps[String] = [hi there, my friends]

scala> icRichSet.foldLeft("Namaskar, and")((x,y) => x + " " + y)
res9: java.lang.String = Namaskar, and hi there my friends
Looks good.

Btw, I used a LinkedHashSet above because I needed the fold operation to return deterministic output based on the order in which I put things into the Set.

Again, let me enumerate the trait features that we just saw:
  • Trait usage Pattern 3: Multiple traits can be mixed into a class to provide different facets of a class's behavior. This is an extremely powerful capability, because it works at the level of two of the most fundamental forces in OO design: cohesion and coupling:
    • Cohesion: traits encourage small, cohesive chunks of code that focus on doing one thing well. These chunks can then be mixed together into a class to provide the class's functionality.
    • Coupling: when a trait is mixed into a class, there is very little coupling between it and the class. In general, three different kinds of coupling are possible when a class mixes in a trait:
      • No coupling: the trait just mixes in and does its thing.
      • Coupling based on required methods: in this case, the class mixing in a trait just needs to implement methods that match the required methods of the trait.
      • Coupling based on overridden methods in a shared Interface: in this case, the coupling is based on an interface. This is the highest degree of coupling that we encounter when a trait is mixed into a class. But even this is good coupling, because it is based on well-defined interfaces.

So, do traits have any downsides? A few minor ones:
  • Traits do not have constructor parameters (but this should not be an issue in practice).
  • Traits impose a slight performance overhead (but this is unlikely to impact the overall performance of your program).
  • Traits introduce compilation fragility for classes that mix them in. If a trait changes, a class that mixes it in has to be recompiled.
And what happens to Abstract Classes now that we have traits? Traits definitely encroach on the design space occupied by Abstract Classes: which, in my mind, is the capturing of commonality within a very localized class hierarchy. If you find that even three (or two?) classes that implement an interface share some commonality, this commonality is a good candidate for being pulled up into the interface, which can now be a trait. This would fall under the 'provide rich interface' mode of trait usage.

Conclusion

I have covered a fair bit of ground in this post. We saw that traits are a powerful tool for OO design, and that they serve as a great mechanism for code reuse. We also looked at the three primary usage patterns for traits:
  • To provide rich interfaces for classes that mix them in.
  • To decorate the behavior of existing classes in a stackable manner.
  • To provide different facets of a class's behavior.
Along the way we also saw how traits:
  • Eliminate the grunt work and code noise involved in doing manual composition with interface inheritance/subtyping.
  • Overcome the Self Problem.
  • Help in achieving high cohesion and low coupling.
I hope all of this is starting to make you see that traits are a great tool to have in your design toolbox.

Happy Trait-ing (and Scala-ing)!

Relevant Reads:
Programming in Scala; the chapter on Traits
Traits: Composable Units of Behaviour
Using Prototypical Objects to Implement Shared Behavior in Object Oriented Systems
What is (Not) Delegation
Scala Language Spec; the section on Trait Linearization

Thursday, July 3, 2008

Parallel Folds: Sample Code

To enable easy access to the sample source code for my previous blog post on Clustered Scala Actors, I have set up a GitHub project at: http://github.com/litan/clustered-fold/tree/master

If you don't want to mess with Git, the sample files can be downloaded (as a gzipped tarball) from: http://github.com/litan/clustered-fold/tarball/master

I have been playing with this code using Java 1.6.0_05, Scala 2.7.1, and Terracotta 2.6.1.

After downloading and extracting the files, and navigating to the root of extracted directory tree, here's what you need to do to run the sample:

Console 0:
Run the Terracotta Server
Console 1:
# modify run.sh and run-cclient.sh: set M2_REPO and SCALA_HOME appropriately
mvn compile
cd target/classes
run.sh -c
Console 2:
cd target/classes
run.sh -c
Console 3:
cd target/classes
run-cclient.sh

Enjoy!

Tuesday, July 1, 2008

Clustered Scala Actors

I have recently been looking at the Terracotta Integration Module (TIM) for Scala actors that was announced earlier this year by Jonas Boner.

As I started playing with the TIM, I decided that I wanted to do the following:
  • Write a simple but hopefully interesting actors based program
  • Try to cluster it using the Scala TIM
  • Identify the lessons learned on the way
At this point, I needed a sample problem to work on. After giving this some thought, I settled upon parallel-Folds as an interesting problem to try out.


As a quick aside: a Fold operation combines the elements of a sequence and a seed using a binary operator.

For associative binary operators, there is no ambiguity about the result. But for operators that are not associative, the order in which the elements are combined makes a difference.
Also, for non-commutative operators, if the seed value is not the identity element of the operation, it makes a difference whether the seed is combined at the end or the beginning of the sequence. To account for these factors, programming languages provide left-fold and right-fold operations.
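A concrete illustration with subtraction, which is neither associative nor commutative:

```scala
val xs = List(1, 2, 3)

// foldLeft combines from the left: ((0 - 1) - 2) - 3
val left = xs.foldLeft(0)(_ - _) // -6

// foldRight combines from the right: 1 - (2 - (3 - 0))
val right = xs.foldRight(0)(_ - _) // 2
```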

All of this raises issues about the parallelizability of the Fold operation. For the sake of the current discussion, I will assume that we are dealing with Fold operators that are commutative and associative.

Here's a quick example of a (sequential) fold:
scala>val list = List(1,2,3)
list: List[Int] = List(1, 2, 3)

scala> list.foldLeft(0)((x, y) => x+y)
res5: Int = 6
In this example, the different elements of the sequence are combined from left to right using the plus operator.

With a parallel-Fold, the general idea is that the different elements of a sequence are combined in parallel.


So I went ahead and coded a class called ParallelFolder that implements parallel-Folds. This class uses a Master actor and a couple of Worker actors to do parallel-Fold operations. Here's how that works:
  • The Master receives a fold request containing a data sequence and a fold operator
  • The Master splits the provided data and farms the work out to the Workers
  • The Workers apply the fold operator to the provided data, and send the results back to the Master
  • The Master aggregates the results from the Workers by applying the provided fold operator, and then sends the final result back to the Requester
Enough talk. Let's look at some code.


Update: instructions for downloading and running the sample code for this Post are available at:
http://lalitpant.blogspot.com/2008/07/parallel-folds-sample-code.html


Here's the definition of the messages that flow through the system:
case class ParallelFoldReq[A](data: Seq[A], folder: (A, A) => A, seed: A, requester: AnyRef)
case class ParallelFoldWorkReq[A](taskId: Int, data: Iterable[A], folder: (A, A) => A, fromMaster: Actor)
case class ParallelFoldWorkResult[A](taskId: Int, result: A, folder: (A, A) => A, fromWorker: Actor)
case class Tick(s: String) // cluster-support
case class Maim(s: String)
case object Quit
Here's the Master:
class Master extends Actor {
  val (worker1, worker2, numWorkers) = (new Worker, new Worker, 2)
  val taskStatus = new HashMap[Int, (Any, Int, AnyRef)]
  var nextTaskId = 0

  def go = {
    worker1.go; worker2.go
    this.start
  }

  def act = {
    init // cluster-support
    log.debug("Scheduling ticks for Master")
    ActorPing.scheduleAtFixedRate(this, Tick("master"), 0L, 5000L) // cluster-support
    loop {
      react {
        case ParallelFoldReq(data, folder, seed, requester) =>
          log.info("Master Received Request. Current Task Id: {}", nextTaskId)

          val partitions = partition(data)
          worker1 ! ParallelFoldWorkReq(nextTaskId, partitions(0), folder, self)
          worker2 ! ParallelFoldWorkReq(nextTaskId, partitions(1), folder, self)

          taskStatus += nextTaskId -> (seed, 0, requester)
          nextTaskId += 1

        case ParallelFoldWorkResult(taskId, wresult, folder, worker) =>
          log.info("Master received result: {} - from worker {}. Task Id: " + taskId, wresult, worker)

          var result = taskStatus(taskId)._1
          result = folder(result, wresult)

          var resultsReceived = taskStatus(taskId)._2
          resultsReceived += 1
          log.info("Results received: {}. Current Task Id: {}", resultsReceived, taskId)

          if (resultsReceived == numWorkers) {
            sendResultToRequester(taskId, result, taskStatus(taskId)._3)
            taskStatus -= taskId
          }
          else {
            taskStatus.update(taskId, (result, resultsReceived, taskStatus(taskId)._3))
            log.info("Waiting for more results from worker. Current Task Id: {}", taskId)
          }

        case Maim(x) =>
          log.info("Master asked to Cripple itself")
          worker2 ! Quit

        case Tick(x) =>
          log.debug("Master got a Tick")
      }
    }
  }
}
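Note that the Master calls a partition method that is not shown in the listing above. A plausible minimal version (my sketch, not the post's code) just splits the sequence into one chunk per Worker:

```scala
// Hypothetical stand-in for the Master's partition method:
// splits the data into two roughly equal halves, one per Worker.
def partition[A](data: Seq[A]): List[Seq[A]] = {
  val mid = data.length / 2
  List(data.take(mid), data.drop(mid))
}

val parts = partition(List(1, 2, 3, 4, 5, 6))
// parts is List(List(1, 2, 3), List(4, 5, 6)); folding those halves with +
// gives the partial results 6 and 15 that show up in the run logs.
```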
And here's the code for the two Workers:
object WorkerIdGenerator {
  var nid = 0
  def nextId = {nid += 1; nid}
}

class Worker extends Actor {
  val id = WorkerIdGenerator.nextId
  log.debug("Worker created with Id: {}", id)

  def go = start

  def act = {
    init // cluster-support
    log.debug("Scheduling ticks for Worker: {}", id)
    ActorPing.scheduleAtFixedRate(this, Tick("worker"), 0L, 5000L) // cluster-support
    loop {
      react {
        case ParallelFoldWorkReq(taskId, data, folder, master) =>
          log.info("Worker {} received request. Current Task Id: {}", id, taskId)
          Thread.sleep(1000 * 1)
          val result = data.reduceLeft(folder)
          master ! ParallelFoldWorkResult(taskId, result, folder, self)

        case Quit =>
          log.info("Worker asked to Quit: {}", id)
          throw new RuntimeException("Bye from: " + this)

        case Tick(x) =>
          log.debug("Worker {} got a Tick", id)
      }
    }
  }
}
And finally, here's the Application object that creates and runs the Actors:
object ParallelFolder extends util.Logged {
  log.info("Parallel Folder Starting...")
  val master = new Master
  master.go
  log.info("Parallel Folder ready to go.")

  def main(args: Array[String]): Unit = {
    if (args.size == 0 || !args(0).trim.equals("-c")) {
      // not running in cluster. Initiate some work right here
      for (i <- 1 to 1) {
        val work = List(1,2,3,4,5,6)
        log.info("Sending sequence to master for Parallel Fold: {}", work)
        master ! ParallelFoldReq(work, (x: Int, y: Int) => x+y, 0, "host://protocol/requesterLocation")
      }
    }
  }

  def fold[A](data: Seq[A], folder: (A, A) => A, x: A): A = {
    master ! ParallelFoldReq(data, folder, x, self)
    val ret = self.receive({case x => x})
    ret.asInstanceOf[A]
  }
}
It's time for some action. Let me go ahead and run the Parallel Folder:
$ pwd
/c/home/lalitp/work/clustered-fold/target/classes

$ cat run-plain.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
java -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ParallelFolder $*

$ run-plain.sh
12:05:38.127 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
12:05:38.221 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.
12:05:38.252 [main] INFO sample.ParallelFolder$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
12:05:38.252 [Thread-4] INFO sample.Master - Master Received Request. Current Task Id: 0
12:05:38.252 [Thread-3] INFO sample.Worker - Worker 1 received request. Current Task Id: 0
12:05:38.252 [Thread-1] INFO sample.Worker - Worker 2 received request. Current Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Master received result: 6 - from worker sample.Worker@8b819f. Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Results received: 1. Current Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Waiting for more results from worker. Current Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Master received result: 15 - from worker sample.Worker@120a47e. Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Results received: 2. Current Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Task Id: 0. The Result is: 21
As you can see, ParallelFolder gets the Master going when it starts up. Then, the main method of ParallelFolder sends in a List to the Master for Folding. The Master does its thing with the help of the Workers, and reports the result.

Everything looks hunky-dory, so let me ramp it up and try to run the same program within a Terracotta cluster. Here's the plan of action for this scenario:
  • Start the Terracotta Server
  • Start an instance of ParallelFolder in Console#1
  • Start another instance of ParallelFolder in Console#2. These two instances are clustered via Terracotta
  • Start a client that feeds in work into the cluster
  • See what happens

Before going any further, let me show you the Cluster Client:
object ClusterClient extends util.Logged {
  def main(args: Array[String]): Unit = {
    log.info("Attaching to WorkManager")
    val master = new Master
    master.init

    for (i <- 1 to 10) {
      val work = List(1,2,3,4,5,6)
      log.info("Sending sequence to master for Parallel Fold: {}", work)
      master ! ParallelFoldReq(work, (x: Int, y: Int) => x+y, 0, "host://protocol/requesterLocation")
    }

    // Kill a worker
    // log.info("Crippling the Master")
    // master ! Maim("cripple")
  }
}
And here's the Scala TIM actor configuration file:
$ cat clustered-scala-actors.conf

sample.Master: class
sample.Worker: custom
The tc-config.xml file is pretty standard (except for the transient-fields section; but more on that later), so I will not show it here. The file is available with the sample code for this Post.

Everything looks set, so let me go ahead and run the Terracotta Server:
$ start-tc-server.bat

[snip]
2008-07-01 13:36:49,141 INFO - Terracotta Server has started up as ACTIVE node on 0.0.0.0:9510 successfully, and is now ready for work.

Next, I'll run the two ParallelFolder instances.
Console#1:

$ cat run.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
dso-java.bat -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ParallelFolder $*

$ run.sh -c

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
13:50:40.493 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
13:50:41.258 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.

Console#2:

$ run.sh -c

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
13:52:33.988 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
13:52:34.800 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.
And finally, the moment of truth! Let me run the Cluster Client:

Console#3:

$ cat run-cclient.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
dso-java.bat -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ClusterClient

$ run-cclient.sh

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
14:03:29.159 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
Here's the output that shows up on Console#1 and Console#2:

Console#1:

14:03:35.687 [Thread-14] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 0
14:03:35.718 [Thread-14] INFO sample.Master - Results received: 1. Current Task Id: 0
14:03:35.718 [Thread-14] INFO sample.Master - Waiting for more results from worker. Current Task Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Master received result: 15 - from worker sample.Worker@1e06fc2. Task Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Results received: 2. Current Task Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Task Id: 0. The Result is: 21
14:03:38.920 [Thread-15] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 1
14:03:38.920 [Thread-15] INFO sample.Master - Results received: 2. Current Task Id: 1
14:03:38.920 [Thread-15] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Task Id: 1. The Result is: 21
14:03:39.045 [Thread-14] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 2
14:03:39.045 [Thread-14] INFO sample.Master - Results received: 2. Current Task Id: 2
14:03:39.045 [Thread-14] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Task Id: 2. The Result is: 21

Console#2:

14:03:32.361 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 0
14:03:32.454 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 0
14:03:33.485 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 0
14:03:34.500 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 1
14:03:34.610 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 1
14:03:35.734 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 2
14:03:35.750 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 1
14:03:36.796 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 2
14:03:37.827 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 2
14:03:38.873 [Thread-16] INFO sample.Master - Master received result: 15 - from worker sample.Worker@a86dfb. Task Id: 1
14:03:38.873 [Thread-16] INFO sample.Master - Results received: 1. Current Task Id: 1
14:03:38.873 [Thread-16] INFO sample.Master - Waiting for more results from worker. Current Task Id: 1
14:03:39.045 [Thread-13] INFO sample.Master - Master received result: 15 - from worker sample.Worker@a86dfb. Task Id: 2
14:03:39.045 [Thread-13] INFO sample.Master - Results received: 1. Current Task Id: 2
14:03:39.045 [Thread-13] INFO sample.Master - Waiting for more results from worker. Current Task Id: 2

It seems to work!

Follow the log trace for Task Id 1 in the console output above to see how the Master and the Workers handle a single task.

Let me take a moment to talk about exactly what's going on here. In the above scenario, Terracotta has clustered the Master and the two Workers. What exactly does that mean?
For any actor, clustering via Terracotta means the following:
  • The actor's mailbox is clustered and lives within the Terracotta NAM (Network Attached Memory)
  • Multiple copies of the actor run within the cluster, one per JVM/node
  • All the different copies of the actor are waiting to pull messages out of the clustered mailbox
  • An entity within any node in the cluster can send a message to the actor
  • This message shows up in the clustered mailbox of the actor
  • Only one copy of the actor gets the message. At this point, the message disappears from the mailbox. This provides for load balancing and fault tolerance:
    • load balancing: if a copy of the actor is busy in one VM, a different copy in another VM picks up the message
    • fault tolerance: if a copy of the actor dies in one VM, the remaining copies are still around, and a different copy in another VM picks up the message
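The clustered-mailbox behavior described above - multiple copies of an actor competing to pull messages, with each message consumed exactly once - is the same contract that a shared blocking queue gives competing consumer threads within a single JVM. Here's a plain-JVM analogy of that contract (my sketch; no Terracotta involved):

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Shared "mailbox" holding six messages.
val mailbox = new LinkedBlockingQueue[Integer]
(1 to 6).foreach(i => mailbox.put(Integer.valueOf(i)))

val sum = new AtomicInteger(0)      // combined value of consumed messages
val consumed = new AtomicInteger(0) // number of messages consumed

// Two "copies" of the consumer, racing to drain the same mailbox.
val consumers = (1 to 2).map { _ =>
  new Thread(new Runnable {
    def run(): Unit = {
      var msg: Integer = mailbox.poll()
      while (msg ne null) {
        sum.addAndGet(msg.intValue)
        consumed.incrementAndGet()
        msg = mailbox.poll()
      }
    }
  })
}
consumers.foreach(_.start())
consumers.foreach(_.join())

// However the race plays out, each message is taken by exactly one
// consumer: consumed ends up 6 and sum ends up 21.
```

Terracotta extends this picture across JVM boundaries by putting the mailbox in Network Attached Memory, which is what makes the load balancing and fault tolerance above possible.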

So - can we conclude that the Scala TIM auto-magically clusters an actor-based program and gives us all the great-sounding features described above? Well - not quite. The following concessions have to be made to make standalone code cluster-friendly (in the listings above, such sections of code are marked with '// cluster-support'):
  • The waitingFor field within the Actor trait has to be marked transient for Terracotta (within tc-config.xml). Consequently, this field has to be initialized outside of a constructor for copies that attach to Terracotta
  • Every clustered actor needs to schedule a heartbeat to pump messages from the clustered mailbox (but this is not as messed-up as it might first sound)

That's not too bad, is it?

I should mention that the current Scala TIM implementation uses named locks to protect the internal state of actors. This is a potential bottleneck, because it introduces lock contention during mailbox access by even unrelated actors. But, based on my currently rather limited knowledge of Terracotta, I think this can be improved.

In a future post, I'll talk more about the features and limitations discussed above. Till then, have fun.