This week I completed my work project and had a spare day to do some experimentation. I dabble sometimes with Scala which I think offers a cool combination of FP, OO and concurrency management. Akka is a middleware framework which works with Scala (and also Java) and provides an excellent abstraction to manage concurrency. Instead of dealing with threads, thread pools and low level synchronization we manage concurrency with something called as Actors.
From Akka's site - "Actors are objects which encapsulate state and behavior, they communicate exclusively by exchanging messages which are placed into the recipient’s mailbox. In a sense, actors are the most stringent form of object-oriented programming, but it serves better to view them as persons: while modeling a solution with actors, envision a group of people and assign sub-tasks to them, arrange their functions into an organizational structure and think about how to escalate failure (all with the benefit of not actually dealing with people, which means that we need not concern ourselves with their emotional state or moral issues). The result can then serve as a mental scaffolding for building the software implementation".
I think this is a great abstraction, dealing with people rather than threads :). To understand more about Akka see this presentation here.
In my last blog, I talked about the advantages of using threads and the performace boost they give. I also warned about proper concurrency management when introducing threads to an application. If you need concurrency on a JRuby platform, Akka should be able to help you. After all anything that works on JVM should work on JRuby as well.
With this in mind I wrote a simple "Hello World" in JRuby and Akka. To make this work, download the Akka distribution, create a folder and copy the following jars from the downloaded Akka ditribution in it -
- akka-actor-xx.jar
- config-xx.jar
- scala-library.jar
In the same folder create hello.rb like this -
require 'java'
require 'scala-library.jar'
require 'config-0.3.1.jar'
require 'akka-actor-2.0.3.jar'
java_import 'java.io.Serializable'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.ActorSystem'
java_import 'akka.actor.Props'
class Greeting
include Serializable
attr_reader :who
def initialize(who)
@who = who
end
end
class GreetingActor < UntypedActor
class << self
alias_method :apply, :new
alias_method :create, :new
end
def onReceive(message)
puts "Hello " + message.who;
end
end
system = ActorSystem.create("GreetingSystem");
props = Props.new(GreetingActor)
greeter = system.actorOf(props, "greeter");
greeter.tell(Greeting.new("Rocky Jaiswal"));
system.shutdown
system.await_termination
This is dead simple code, we create an Actor (GreetingActor) by extending UnTypedActor class, provided it an onReceive method which will receive messages, we created an Actor system, setup an Actor and passed it a message and voila we have a "Hello World" Akka program.
Now if you want to do this in a more "Ruby" way, there is an excellent lightweight Ruby wrapper around Akka called Mikka which will make your life a lot easier. Since I wanted to learn the API more, I did it in the more "close to the metal" way (sorry bad joke).
Now for something more interesting and complicated. For this we will again look at the Akka site here and calculate "Pi" with JRuby and Akka. Sounds exciting doesn't it.
The formula we will use is -
This is splittable so that we can create multiple chunks like (1, -1/3, 1/5) and (-1/7, 1/9, -1/11) and so on where each chunk has a number of elements (three in the example above). As these chunks are individually calculated we just sum up their results.
To work up the CPU and to get better results our number of chunks can be as high as ten thousand and each chunk can have ten thousand elements. You can go even higher if you want.
Now we need to setup an Actor system where each worker applies the function to the elements of its chunk and reports back with the results. Depending on your CPU we can create "n" number of workers. We can then assign the chunks to the workers one by one.
Finally when the Master has finished calculating the sum of the results reported by the workers it can notify another Actor that the value of Pi has been calculated.
So we have three Actors now, the Master or the supervisor Actor, the Worker Actor and finally a Listener Actor which is notified of the final result.
The Master Actor can recieve two types of messages, a "calculate" message which is an indication to commence the calculations and a "result" message which is a wrapper around the result calculated by a worker.
Now let us see the program pi.rb :
require 'java'
require 'scala-library.jar'
require 'config-0.3.1.jar'
require 'akka-actor-2.0.3.jar'
java_import 'java.io.Serializable'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.ActorSystem'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.RoundRobinRouter'
java_import 'akka.actor.Props'
java_import 'java.lang.System'
java_import 'akka.util.Duration'
java_import 'java.util.concurrent.TimeUnit'
#Wrapper for a calculate message
class Calculate
end
#Wrapper for a work unit
class Work
attr_reader :start, :no_of_elements
def initialize(start, no_of_elements)
@start = start
@no_of_elements = no_of_elements
end
end
#Wrapper for result
class Result
attr_reader :value
def initialize(value)
@value = value
end
end
#Wrapper for final result
class PiApproximation
attr_reader :pi, :duration
def initialize(pi, duration)
@pi = pi
@duration = duration
end
end
#The Worker
class Worker < UntypedActor
class << self
alias_method :apply, :new
alias_method :create, :new
end
def calculate_for_pi(start, no_of_elements)
acc = 0.0
start_elem = start * no_of_elements
end_elem = (start + 1) * no_of_elements - 1
(start_elem..end_elem).each do |elem|
acc = acc + (4.0 * (1 - (elem % 2) * 2) / (2 * elem + 1))
end
return acc
end
def onReceive(work)
result = calculate_for_pi(work.start, work.no_of_elements)
getSender().tell(Result.new(result), get_self)
end
end
#Master
class Master < UntypedActor
attr_accessor :start, :no_of_workers, :no_of_chunks, :no_of_elements, :listener, :pi, :no_of_results
class << self
alias_method :apply, :new
alias_method :create, :new
end
def init_worker
props = Props.new(Worker).withRouter(RoundRobinRouter.new(no_of_workers))
@worker_router = self.get_context.actorOf(props, "workerRouter")
end
def onReceive(message)
if (message.is_a?(Calculate))
(0...@no_of_chunks).each do |number|
@worker_router.tell(Work.new(number, @no_of_elements), get_self)
end
else
result = message
@pi = @pi + result.value
@no_of_results = @no_of_results + 1
if (@no_of_results == @no_of_chunks)
duration = Duration.create(System.currentTimeMillis - @start, TimeUnit::MILLISECONDS)
@listener.tell(PiApproximation.new(@pi, duration), get_self)
get_context.stop(get_self)
end
end
end
end
class Listener < UntypedActor
class << self
alias_method :apply, :new
alias_method :create, :new
end
def onReceive(message)
puts "Value of Pi - " + message.pi.to_s
puts "Duration of calculation - " + message.duration.to_s
get_context.system.shutdown
#get_context.system.await_termination
end
end
class MasterFactory
include UntypedActorFactory
def initialize(listener)
@@listener = listener
end
def create
self.class.create
end
def self.create
master = Master.new
master.no_of_workers = 8
master.no_of_chunks = 10000
master.no_of_elements = 10000
master.listener = @@listener
master.start = System.currentTimeMillis
master.pi = 0
master.no_of_results = 0
master.init_worker
return master
end
end
system = ActorSystem.create("PiSystem")
listener = system.actorOf(Props.new(Listener), "listener")
props_2 = Props.new(MasterFactory.new(listener))
master = system.actorOf(props_2, "master")
master.tell(Calculate.new)
If you read my explanation above this code is simple to understand, there is a minor hack or two to make Akka work on JRuby but most of this is self explanatory.
We can run this and see the output -
$ jruby pi.rb
Value of Pi - 3.1415926435897883
Duration of calculation - 5039 milliseconds
Finally lets see the effect this has on my i7 CPU
As you can see all 8 virtual cores on the i7 CPU have been used with utilization close to 100% at the time the program ran. I think my experiment is successful, sorry for the longish post, I hope you had as much fun reading it as I had making it. Code shown above is available here.