Tuesday, July 1, 2008

Clustered Scala Actors

I have recently been looking at the Terracotta Integration Module (TIM) for Scala actors that was announced earlier this year by Jonas Boner.

As I started playing with the TIM, I decided that I wanted to do the following:
  • Write a simple but hopefully interesting actor-based program
  • Try to cluster it using the Scala TIM
  • Identify the lessons learned on the way
At this point, I needed a sample problem to work on. After giving this some thought, I settled upon parallel-Folds as an interesting problem to try out.


As a quick aside: a Fold operation combines the elements of a sequence and a seed using a binary operator.

For associative binary operators, there is no ambiguity about the result. But for operators that are not associative, the order in which the elements are combined makes a difference.
Also, for non-commutative operators, if the seed value is not the identity element of the operation, it makes a difference whether the seed is combined at the end or the beginning of the sequence. To account for these factors, programming languages provide left-fold and right-fold operations.
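To make this concrete, here's a quick illustrative REPL session (mine, not part of the sample code) with subtraction, which is neither associative nor commutative; the res numbers will of course vary:
scala> val list = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> list.foldLeft(0)((x, y) => x - y)   // ((0 - 1) - 2) - 3
res0: Int = -6

scala> list.foldRight(0)((x, y) => x - y)  // 1 - (2 - (3 - 0))
res1: Int = 2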

All of this raises issues about the parallelizability of the Fold operation. For the sake of the current discussion, I will assume that we are dealing with Fold operators that are commutative and associative.

Here's a quick example of a (sequential) fold:
scala> val list = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> list.foldLeft(0)((x, y) => x+y)
res5: Int = 6
In this example, the different elements of the sequence are combined from left to right using the plus operator.

With a parallel-Fold, the general idea is that the different elements of a sequence are combined in parallel.
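
To show the shape of the idea before any actors enter the picture, here's a minimal sequential sketch (my own illustration, not part of the sample code). It splits the data in two, reduces each half independently (the two reductions are the parts that could run in parallel), and then combines the partial results with the seed; it only agrees with a plain fold because of the associativity and commutativity assumption above:
// Illustrative sketch only: split, reduce each half, combine the partials.
// Assumes both halves are non-empty and the operator is associative and commutative.
def parallelFoldSketch[A](data: List[A], folder: (A, A) => A, seed: A): A = {
  val (left, right) = data.splitAt(data.length / 2)
  val partials = List(left, right).map(part => part.reduceLeft(folder))
  partials.foldLeft(seed)(folder)
}

// parallelFoldSketch(List(1, 2, 3, 4, 5, 6), (x: Int, y: Int) => x + y, 0) == 21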


So I went ahead and wrote a small application called ParallelFolder that implements parallel folds. It uses a Master actor and a couple of Worker actors to do the parallel-Fold operations. Here's how that works:
  • The Master receives a fold request containing a data sequence and a fold operator
  • The Master splits the provided data and farms the work out to the Workers
  • The Workers apply the fold operator to the provided data, and send the results back to the Master
  • The Master aggregates the results from the Workers by applying the provided fold operator, and then sends the final result back to the Requester
Enough talk. Let's look at some code.


Update: instructions for downloading and running the sample code for this Post are available at:
http://lalitpant.blogspot.com/2008/07/parallel-folds-sample-code.html


Here's the definition of the messages that flow through the system:
case class ParallelFoldReq[A](data: Seq[A], folder: (A, A) => A, seed: A, requester: AnyRef)
case class ParallelFoldWorkReq[A](taskId: Int, data: Iterable[A], folder: (A, A) => A, fromMaster: Actor)
case class ParallelFoldWorkResult[A](taskId: Int, result: A, folder: (A, A) => A, fromWorker: Actor)
case class Tick(s: String) // cluster-support
case class Maim(s: String)
case object Quit
Here's the Master:
class Master extends Actor {
  val (worker1, worker2, numWorkers) = (new Worker, new Worker, 2)
  // taskId -> (partial result, number of worker results received, requester)
  val taskStatus = new HashMap[Int, (Any, Int, AnyRef)]
  var nextTaskId = 0

  def go = {
    worker1.go; worker2.go
    this.start
  }

  def act = {
    init // cluster-support
    log.debug("Scheduling ticks for Master")
    ActorPing.scheduleAtFixedRate(this, Tick("master"), 0L, 5000L) // cluster-support
    loop {
      react {
        case ParallelFoldReq(data, folder, seed, requester) =>
          log.info("Master Received Request. Current Task Id: {}", nextTaskId)

          val partitions = partition(data)
          worker1 ! ParallelFoldWorkReq(nextTaskId, partitions(0), folder, self)
          worker2 ! ParallelFoldWorkReq(nextTaskId, partitions(1), folder, self)

          taskStatus += nextTaskId -> (seed, 0, requester)
          nextTaskId += 1

        case ParallelFoldWorkResult(taskId, wresult, folder, worker) =>
          log.info("Master received result: {} - from worker {}. Task Id: " + taskId, wresult, worker)

          var result = taskStatus(taskId)._1
          result = folder(result, wresult)

          var resultsReceived = taskStatus(taskId)._2
          resultsReceived += 1
          log.info("Results received: {}. Current Taskk Id: {}", resultsReceived, taskId)

          if (resultsReceived == numWorkers) {
            sendResultToRequester(taskId, result, taskStatus(taskId)._3)
            taskStatus -= taskId
          }
          else {
            taskStatus.update(taskId, (result, resultsReceived, taskStatus(taskId)._3))
            log.info("Waiting for more results from worker. Current Taskk Id: {}", taskId)
          }

        case Maim(x) =>
          log.info("Master asked to Cripple itself")
          worker2 ! Quit

        case Tick(x) =>
          log.debug("Master got a Tick")
      }
    }
  }
}
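The Master uses two helpers that are not shown in the listing above: partition, which splits the incoming data between the Workers, and sendResultToRequester, which delivers the final result. Both are part of the downloadable sample code; purely as a guess (reverse-engineered from the log output further down, so treat this as a sketch rather than the actual implementation), they might look something like this, as methods on Master:
// Hypothetical sketches of the two helpers referenced by the Master.
// numWorkers and log are Master members; the real code ships with the sample.

// Split the data into one chunk per Worker; for the sample's List(1,2,3,4,5,6)
// this yields List(1, 2, 3) and List(4, 5, 6), matching the worker results
// (6 and 15) seen in the logs.
def partition[A](data: Seq[A]): List[Iterable[A]] = {
  val mid = data.length / numWorkers
  List(data.take(mid), data.drop(mid))
}

// Hand the final result back to whoever asked for it. A String requester is
// just logged; an Actor requester (as used by ParallelFolder.fold below)
// presumably gets the result as a reply message.
def sendResultToRequester(taskId: Int, result: Any, requester: AnyRef) = {
  log.info("Result available for Requester: {}. Current Taskk Id: {}. The Result is: " + result, requester, taskId)
  requester match {
    case a: Actor => a ! result
    case _ =>
  }
}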
And here's the code for the two Workers:
object WorkerIdGenerator {
  var nid = 0
  def nextId = {nid += 1; nid}
}

class Worker extends Actor {
  val id = WorkerIdGenerator.nextId
  log.debug("Worker created with Id: {}", id)

  def go = start

  def act = {
    init // cluster-support
    log.debug("Scheduling ticks for Worker: {}", id)
    ActorPing.scheduleAtFixedRate(this, Tick("worker"), 0L, 5000L) // cluster-support
    loop {
      react {
        case ParallelFoldWorkReq(taskId, data, folder, master) =>
          log.info("Worker {} received request. Current Task Id: {}", id, taskId)
          Thread.sleep(1000 * 1)
          val result = data.reduceLeft(folder)
          master ! ParallelFoldWorkResult(taskId, result, folder, self)

        case Quit =>
          log.info("Worker asked to Quit: {}", id)
          throw new RuntimeException("Bye from: " + this)

        case Tick(x) =>
          log.debug("Worker {} got a Tick", id)
      }
    }
  }
}
And finally, here's the Application object that creates and runs the Actors:
object ParallelFolder extends util.Logged {
  log.info("Parallel Folder Starting...")
  val master = new Master
  master.go
  log.info("Parallel Folder ready to go.")

  def main(args: Array[String]): Unit = {
    if (args.size == 0 || !args(0).trim.equals("-c")) {
      // not running in cluster. Initiate some work right here
      for (i <- 1 to 1) {
        val work = List(1, 2, 3, 4, 5, 6)
        log.info("Sending sequence to master for Parallel Fold: {}", work)
        master ! ParallelFoldReq(work, (x: Int, y: Int) => x + y, 0, "host://protocol/requesterLocation")
      }
    }
  }

  def fold[A](data: Seq[A], folder: (A, A) => A, x: A): A = {
    master ! ParallelFoldReq(data, folder, x, self)
    val ret = self.receive({case x => x})
    ret.asInstanceOf[A]
  }
}
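As an aside, if sendResultToRequester does reply to requesters that are themselves actors (as in my sketch earlier), the blocking fold helper above could be used along these lines (hypothetical usage; the post itself only exercises the fire-and-forget path):
// Hypothetical usage of ParallelFolder.fold; blocks until the Master replies.
val sum = ParallelFolder.fold(List(1, 2, 3, 4, 5, 6), (x: Int, y: Int) => x + y, 0)
// sum == 21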
It's time for some action. Let me go ahead and run the Parallel Folder:
$ pwd
/c/home/lalitp/work/clustered-fold/target/classes

$ cat run-plain.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
java -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ParallelFolder $*

$ run-plain.sh
12:05:38.127 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
12:05:38.221 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.
12:05:38.252 [main] INFO sample.ParallelFolder$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
12:05:38.252 [Thread-4] INFO sample.Master - Master Received Request. Current Task Id: 0
12:05:38.252 [Thread-3] INFO sample.Worker - Worker 1 received request. Current Task Id: 0
12:05:38.252 [Thread-1] INFO sample.Worker - Worker 2 received request. Current Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Master received result: 6 - from worker sample.Worker@8b819f. Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Results received: 1. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Waiting for more results from worker. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Master received result: 15 - from worker sample.Worker@120a47e. Task Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Results received: 2. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 0. The Result is: 21
As you can see, ParallelFolder gets the Master going when it starts up. Then, the main method of ParallelFolder sends in a List to the Master for Folding. The Master does its thing with the help of the Workers, and reports the result.

Everything looks hunky-dory, so let me ramp it up and try to run the same program within a Terracotta cluster. Here's the plan of action for this scenario:
  • Start the Terracotta Server
  • Start an instance of ParallelFolder in Console#1
  • Start another instance of ParallelFolder in Console#2. These two instances are clustered via Terracotta
  • Start a client that feeds work into the cluster
  • See what happens

Before going any further, let me show you the Cluster Client:
object ClusterClient extends util.Logged {
  def main(args: Array[String]): Unit = {
    log.info("Attaching to WorkManager")
    val master = new Master
    master.init

    for (i <- 1 to 10) {
      val work = List(1, 2, 3, 4, 5, 6)
      log.info("Sending sequence to master for Parallel Fold: {}", work)
      master ! ParallelFoldReq(work, (x: Int, y: Int) => x + y, 0, "host://protocol/requesterLocation")
    }

    // Kill a worker
    // log.info("Crippling the Master")
    // master ! Maim("cripple")
  }
}
And here's the Scala TIM actor configuration file:
$ cat clustered-scala-actors.conf

sample.Master: class
sample.Worker: custom
The tc-config.xml file is pretty standard (except for the transient-fields section; but more on that later), so I will not show it here. The file is available with the sample code for this Post.

Everything looks set, so let me go ahead and run the Terracotta Server:
$ start-tc-server.bat

[snip]
2008-07-01 13:36:49,141 INFO - Terracotta Server has started up as ACTIVE node on 0.0.0.0:9510 successfully, and is now ready for work.

Next, I'll run the two ParallelFolder instances.

Console#1:

$ cat run.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
dso-java.bat -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ParallelFolder $*

$ run.sh -c

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
13:50:40.493 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
13:50:41.258 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.

Console#2:

$ run.sh -c

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
13:52:33.988 [main] INFO sample.ParallelFolder$ - Parallel Folder Starting...
13:52:34.800 [main] INFO sample.ParallelFolder$ - Parallel Folder ready to go.
And finally, the moment of truth! Let me run the Cluster Client:

Console#3:

$ cat run-cclient.sh
M2_REPO="C:/Documents and Settings/lpant/.m2/repository"
dso-java.bat -cp ".;C:\home\lalitp\tools\scala-2.7.1.final\lib\scala-library.jar;$M2_REPO/org/slf4j/slf4j-api/1.5.2/slf4j-api-1.5.2.jar;$M2_REPO/ch/qos/logback/logback-classic/0.9.9/logback-classic-0.9.9.jar;$M2_REPO/ch/qos/logback/logback-core/0.9.9/logback-core-0.9.9.jar" sample.ClusterClient

$ run-cclient.sh

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [sample.Master] with scope [class]
Configuring clustering for Scala Actor [sample.Worker] with scope [custom]
14:03:29.159 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO sample.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
Here's the output that shows up on Console#1 and Console#2:

Console#1:

14:03:35.687 [Thread-14] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 0
14:03:35.718 [Thread-14] INFO sample.Master - Results received: 1. Current Taskk Id: 0
14:03:35.718 [Thread-14] INFO sample.Master - Waiting for more results from worker. Current Taskk Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Master received result: 15 - from worker sample.Worker@1e06fc2. Task Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Results received: 2. Current Taskk Id: 0
14:03:37.811 [Thread-14] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 0. The Result is: 21
14:03:38.920 [Thread-15] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 1
14:03:38.920 [Thread-15] INFO sample.Master - Results received: 2. Current Taskk Id: 1
14:03:38.920 [Thread-15] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 1. The Result is: 21
14:03:39.045 [Thread-14] INFO sample.Master - Master received result: 6 - from worker sample.Worker@86554. Task Id: 2
14:03:39.045 [Thread-14] INFO sample.Master - Results received: 2. Current Taskk Id: 2
14:03:39.045 [Thread-14] INFO sample.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 2. The Result is: 21

Console#2:

14:03:32.361 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 0
14:03:32.454 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 0
14:03:33.485 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 0
14:03:34.500 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 1
14:03:34.610 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 1
14:03:35.734 [Thread-16] INFO sample.Master - Master Received Request. Current Task Id: 2
14:03:35.750 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 1
14:03:36.796 [Thread-13] INFO sample.Worker - Worker 2 received request. Current Task Id: 2
14:03:37.827 [Thread-12] INFO sample.Worker - Worker 1 received request. Current Task Id: 2
14:03:38.873 [Thread-16] INFO sample.Master - Master received result: 15 - from worker sample.Worker@a86dfb. Task Id: 1
14:03:38.873 [Thread-16] INFO sample.Master - Results received: 1. Current Taskk Id: 1
14:03:38.873 [Thread-16] INFO sample.Master - Waiting for more results from worker. Current Taskk Id: 1
14:03:39.045 [Thread-13] INFO sample.Master - Master received result: 15 - from worker sample.Worker@a86dfb. Task Id: 2
14:03:39.045 [Thread-13] INFO sample.Master - Results received: 1. Current Taskk Id: 2
14:03:39.045 [Thread-13] INFO sample.Master - Waiting for more results from worker. Current Taskk Id: 2

It seems to work!

If you follow the log trace for Task Id 1 in the console output above, you can see how the Master and the Workers handle a single task across the two nodes.

Let me take a moment to talk about exactly what's going on here. In the above scenario, Terracotta has clustered the Master and the two Workers. What exactly does that mean?
For any actor, clustering via Terracotta means the following:
  • The actor's mailbox is clustered and lives within the Terracotta NAM (Network Attached Memory)
  • Multiple copies of the actor run within the cluster, one per JVM/node
  • All the different copies of the actor are waiting to pull messages out of the clustered mailbox
  • An entity within any node in the cluster can send a message to the actor
  • This message shows up in the clustered mailbox of the actor
  • Only one copy of the actor gets the message. At this point, the message disappears from the mailbox. This provides for load balancing and fault tolerance:
    • load balancing: if a copy of the actor is busy in one VM, a different copy in another VM picks up the message
    • fault tolerance: if a copy of the actor dies in one VM, the remaining copies are still around, and a different copy in another VM picks up the message

So, can we conclude that the Scala TIM auto-magically clusters an actor-based program and gives us all the great-sounding features described above? Well, not quite. The following concessions have to be made to make standalone code cluster-friendly (in the listings above, such sections of code are marked with '// cluster-support'):
  • The waitingFor field within the Actor trait has to be marked transient for Terracotta (within tc-config.xml; see the sketch after this list). Consequently, this field has to be initialized outside of a constructor for copies that attach to Terracotta
  • Every clustered actor needs to schedule a heartbeat to pump messages from the clustered mailbox (but this is not as messed-up as it might first sound)
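
For reference, the transient-fields declaration in tc-config.xml looks roughly like the snippet below. This is only a sketch: the exact fully-qualified field names depend on which instrumented classes end up holding the Actor trait's waitingFor field, so check the tc-config.xml that ships with the sample code for the real thing.
<!-- Sketch only; not the exact tc-config.xml from the sample code -->
<application>
  <dso>
    <transient-fields>
      <!-- keep the Actor trait's waitingFor field out of the clustered object graph -->
      <field-name>sample.Master.waitingFor</field-name>
      <field-name>sample.Worker.waitingFor</field-name>
    </transient-fields>
  </dso>
</application>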

That's not too bad, is it?

I should mention that the current Scala TIM implementation uses named locks to protect the internal state of actors. This is a potential bottleneck, because it introduces lock contention on mailbox access even between unrelated actors. But, based on my currently rather limited knowledge of Terracotta, I think this can be improved.

In a future post, I'll talk more about the features and limitations discussed above. Till then, have fun.

3 comments:

Jesper Nordenberg said...

The problem is of course that fold is not parallelizable except when using associative functions. What you have implemented is essentially some variant of MapReduce.

Lalit Pant said...

nordenberg:
I was hoping that my choice of problem (parallel fold) would not distract from the main point of the post (which is to investigate the clustering of Scala actors).

It seems to have ;)

But thanks for the feedback. I have gone back and updated the post to make my 'fold' related assumptions clear(er).

And yes - what I have implemented could lead to an interesting variant of MapReduce (and some day (soon) it well might!); but for now it's just a simple toy to demonstrate actor clustering.

Anonymous said...

Great post. Thanks. Clusterable Scala Actors is very cool and useful.