By Dennis M. Sosnoski, ScalableScala.com / Sosnoski Software Associates Ltd
Java and Scala developer, writer, consultant and training instructor
Open source developer
Author (mostly IBM developerWorks):
Most recent experience in enterprise systems, especially data integration and web services
Performance a special area of expertise
Consulting and training services to clients worldwide
Multiple computations operating simultaneously and potentially interacting
Long history of analysis of concurrency models and approaches
Until last decade, processor performance grew by increasing clock speed
Now performance grows mainly by adding cores
Performant applications require concurrency to use multiple cores
Enterprise applications need to be scalable
Coarse-grained scaling distributes users across multiple systems
Finer-grained scaling distributes tasks across multiple systems
Performance gain from concurrency limited
For doing more of a task:
For doing a task faster:
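For the "doing a task faster" case, the ceiling is commonly expressed by Amdahl's law (added here as a reminder, not from the original slide): with parallelizable fraction $p$ of the work and $n$ cores,

    S(n) = \frac{1}{(1 - p) + p/n}

so even with unlimited cores the speedup cannot exceed $1/(1 - p)$.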
Concurrency requires handling asynchronous events (completions, in particular)
Two basic approaches to handling asynchronous events:
Blocking limits performance and scalability:
Non-blocking best for scalable applications
Big gains for concurrent programming in Java 8:
// task definitions
private static CompletableFuture<Integer> task1(int input) {
    return TimedEventSupport.delayedSuccess(1, input + 1);
}
private static CompletableFuture<Integer> task2(int input) {
    return TimedEventSupport.delayedSuccess(2, input + 2);
}
private static CompletableFuture<Integer> task3(int input) {
    return TimedEventSupport.delayedSuccess(3, input + 3);
}
private static CompletableFuture<Integer> task4(int input) {
    return TimedEventSupport.delayedSuccess(1, input + 4);
}

/**
 * Run events with blocking waits.
 *
 * @return future for result (already complete)
 */
private static CompletableFuture<Integer> runBlocking() {
    Integer i1 = task1(1).join();
    CompletableFuture<Integer> future2 = task2(i1);
    CompletableFuture<Integer> future3 = task3(i1);
    Integer result = task4(future2.join() + future3.join()).join();
    return CompletableFuture.completedFuture(result);
}
/** Run events with composition. */
private static CompletableFuture<Integer> runNonblocking() {
    return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1)
            .thenCombine(task3(i1), (i2,i3) -> i2+i3)))
        .thenCompose(i4 -> task4(i4));
}

/** Run task2 and task3 and combine the results. This is just a refactoring of {@link
 *  #runNonblocking()} to make it easier to understand the code. */
private static CompletableFuture<Integer> runTask2and3(Integer i1) {
    CompletableFuture<Integer> task2 = task2(i1);
    CompletableFuture<Integer> task3 = task3(i1);
    BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
    return task2.thenCombine(task3, sum);
}

/** Run events with composition. This is just a refactoring of {@link #runNonblocking()} to make
 *  it easier to understand the code. */
private static CompletableFuture<Integer> runNonblockingAlt() {
    CompletableFuture<Integer> task1 = task1(1);
    CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);
    return comp123.thenCompose(EventComposition::task4);
}
Automatically partition work to be done across threads
private final List<ChunkDistanceChecker> chunkCheckers;

/**
 * Find best match to word executing chunk checkers in parallel. Each chunk checker finds the best
 * match for the set of words it knows. The individual results are combined here to find the
 * overall best result.
 */
public DistancePair bestMatch(String target) {
    return chunkCheckers.parallelStream()
        .map(checker -> checker.bestDistance(target))
        .reduce(DistancePair.worstMatch(), (a, b) -> DistancePair.best(a, b));
}
Great for the special case of parallel operations
Scala has had features similar to Java 8's for some time:
But also async macro
// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

/** Run tasks with blocking waits. */
def runBlocking() = {
  val v1 = Await.result(task1(1), Duration.Inf)
  val future2 = task2(v1)
  val future3 = task3(v1)
  val v2 = Await.result(future2, Duration.Inf)
  val v3 = Await.result(future3, Duration.Inf)
  val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  val result = Promise[Int]
  result.success(v4)
  result.future
}
Can use standard collections (monadic) operators such as foreach
/** Run tasks with flatMap. */
def runFlatMap() = {
  task1(1) flatMap { v1 =>
    val a = task2(v1)
    val b = task3(v1)
    a flatMap { v2 =>
      b flatMap { v3 =>
        task4(v2 + v3)
      }
    }
  }
}
for comprehension composition for more complex cases
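As an illustration (not from the original deck), the nested flatMap calls above can be rewritten with a for comprehension; this sketch assumes the same task1-task4 definitions and an implicit ExecutionContext in scope:

/** Run tasks with a for comprehension (hypothetical rewrite of runFlatMap). */
def runForComprehension(): Future[Int] =
  for {
    v1 <- task1(1)
    // start task2 and task3 before awaiting either, so they run concurrently
    a = task2(v1)
    b = task3(v1)
    v2 <- a
    v3 <- b
    v4 <- task4(v2 + v3)
  } yield v4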
async macro
Scala macros perform compile-time transformations of code
async macro converts what looks like sequential code to asynchronous:
/** Run tasks with async macro. */
def runAsync(): Future[Int] = {
  async {
    val v1 = await(task1(1))
    val a = task2(v1)
    val b = task3(v1)
    await(task4(await(a) + await(b)))
  }
}
Similar to C# and F# feature, but cleaner and more flexible
Some limitations, but a very powerful abstraction
async details
async has several effects:
Allows await to be used inside the block (suspends until future completed)
Implemented with a generated state machine class
Limitations a result of implementation:
No await inside nested closure, class definition
No await inside nested try / catch block
Great for simple situations
Complex scenarios run into problems:
Is it possible to abstract management out of the code?
Long-established approach to analyzing and modeling concurrency
Actors are light-weight entities with state
All interactions in terms of messages:
Large body of practical experience from Erlang
Akka a Scala library, for both Java and Scala applications
Designed to support writing correct concurrent, fault-tolerant and scalable applications
Supports scaling both up and out
Applications needing high throughput and low latency are ideal candidates for Akka
Abstracts out the details of threading and synchronization
Provides message guarantees:
High performance message queues (mailboxes)
Usage is up to you and application needs:
Keep actor execution non-blocking where possible
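One common way to keep an actor non-blocking (a sketch, not from the deck; the Lookup messages and service function are made up) is to pipe a future's result back as a message instead of awaiting it:

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

// hypothetical messages and service used only for this sketch
case class Lookup(key: String)
case class LookupResult(key: String, value: String)

class LookupActor(service: String => Future[String]) extends Actor {
  import context.dispatcher  // the actor's dispatcher serves as the ExecutionContext

  def receive = {
    case Lookup(key) =>
      val replyTo = sender()  // capture the sender before the future completes
      // no Await: the result is delivered to the requester when the future completes
      service(key).map(value => LookupResult(key, value)).pipeTo(replyTo)
  }
}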
Creates an actor of type Root and sends it a Start message
ask (?) returns a Future for the response
timeout is an implicit argument to some of the ask calls on the next slide
case class Start()
case class Generate()
case class Number(n: Int)
case class Report()
case class Sum(n: Int)

implicit val timeout = Timeout(2000)
val system = ActorSystem("actor-demo")
system.actorOf(Props[Root]) ! Start
Thread sleep 2000
system shutdown
class Root extends Actor {
  def receive = {
    case Start => {
      val summer = context.actorOf(Props[Summer])
      context.actorOf(Props[Generator]).tell(Generate, summer)
      context.actorOf(Props[Generator]).tell(Generate, summer)
      val sum = summer ? Report
      println("Sum of values is " + Await.result(sum, Duration.Inf))
    }
  }
}

class Generator extends Actor {
  var lastOut = 0
  def receive = {
    case Generate => {
      lastOut = lastOut + 1
      sender ! Number(lastOut)
    }
  }
}

class Summer extends Actor {
  var total = 0
  def receive = {
    case Number(n) => total = total + n
    case Report => sender ! Sum(total)
  }
}
public static final Object START = new Object();
public static final Object STOP = new Object();
public static final Object GENERATE = new Object();
public static final Object REPORT = new Object();

public static class Number {
    public final int value;
    public Number(int val) { value = val; }
}
public static class Sum {
    public final int value;
    public Sum(int val) { value = val; }
}

public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("actor-demo-java");
    ActorRef root = system.actorOf(Props.create(Root.class));
    root.tell(START, ActorRef.noSender());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) { /* ignored */ }
    root.tell(STOP, ActorRef.noSender());
}
public static class Root extends UntypedActor {
    private final ActorRef summer = getContext().actorOf(Props.create(Summer.class));

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg == START) {
            getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
            getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
        } else if (msg == STOP) {
            Future<Object> future = ask(summer, REPORT, 1000);
            Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
            System.out.println("Sum of values is " + ((Sum)result).value);
        } else {
            unhandled(msg);
        }
    }
}

public static class Generator extends UntypedActor {
    private int lastOut;

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg == GENERATE) {
            getSender().tell(new Number(++lastOut), self());
        } else {
            unhandled(msg);
        }
    }
}
public static class Summer extends UntypedActor {
    private int total;

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof Number) {
            total += ((Number) msg).value;
        } else if (msg == REPORT) {
            getSender().tell(new Sum(total), self());
        } else {
            unhandled(msg);
        }
    }
}
Actors supervise the actors they create
When an actor fails (throws an exception) the supervisor can:
Easy to fail up to a level where everything can be restarted
"Let it Crash" philosophy simpler than error handling
Akka actor system organizes a hierarchy of actors
Dispatchers actually run actors
A dispatcher is also an ExecutionContext
Actor system configuration (normally from file) links actors to dispatchers
Changing the configuration lets you restructure your system
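As a sketch of what that looks like (the dispatcher name and settings here are made up), configuration can define a dispatcher and an actor can be assigned to it without touching the actor's code:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class BlockingWorker extends Actor {
  def receive = { case msg => println(s"handling $msg") }
}

// normally lives in application.conf; shown inline for the sketch
val config = ConfigFactory.parseString("""
  my-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 2
      parallelism-max = 8
    }
  }
""")

val system = ActorSystem("dispatcher-demo", config)
// the deployment choice lives in configuration plus this one line of wiring
val worker = system.actorOf(Props[BlockingWorker].withDispatcher("my-dispatcher"))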
Routers distribute messages to a pool of actors of the same type
Routers aren't required - only needed when using a pool
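A sketch of the pool case (names are illustrative; the router API differs a little across Akka versions, and this assumes one with RoundRobinPool):

import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool

class PoolWorker extends Actor {
  def receive = {
    case job: String => println(s"${self.path.name} handling $job")
  }
}

val system = ActorSystem("router-demo")
// a single router ref in front of five identical PoolWorker actors
val workers = system.actorOf(RoundRobinPool(5).props(Props[PoolWorker]), "workers")
(1 to 10).foreach(i => workers ! s"job-$i")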
Actors have names (set explicitly, or generated by default)
Name is decoupled from how it is deployed
Lets you scale out to cluster or cloud without code changes
Akka tries to avoid blocking wherever possible
I/O a common cause of blocking (especially network I/O for connected applications)
akka.io incorporates asynchronous networking into the actor model:
Originally part of Spray project, which still supplies other support
Further extensions part of Reactive Streams initiative
UDP socket server example:
class UdpEchoService(address: InetSocketAddress) extends Actor {
  import context.system
  IO(Udp) ! Udp.Bind(self, address)

  def receive = {
    case Udp.Bound(_) => context.become(ready(sender))
  }

  def ready(socket: ActorRef): Receive = {
    case Udp.Received(data, remote) => socket ! Udp.Send(data, remote)
    case Udp.Unbind => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
UDP socket client example:
class UdpEchoClient(local: InetSocketAddress, remote: InetSocketAddress) extends Actor {
  import context.system
  IO(Udp) ! Udp.Bind(self, local)

  def receive = {
    case Udp.Bound(_) => context.become(ready(sender))
  }

  def ready(socket: ActorRef): Receive = {
    case msg: String => socket ! Udp.Send(ByteString(msg, "UTF-8"), remote)
    case Udp.Received(data, _) => println(s"Client received ${data.decodeString("UTF-8")}")
    case Udp.Unbind => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
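A possible driver for the two UDP actors above (not part of the deck; addresses and timing are arbitrary):

import java.net.InetSocketAddress
import akka.actor.{ActorSystem, Props}

val system = ActorSystem("udp-echo-demo")
val serverAddress = new InetSocketAddress("localhost", 9999)

// start the echo service, then a client bound to an ephemeral local port
system.actorOf(Props(classOf[UdpEchoService], serverAddress))
val client = system.actorOf(Props(classOf[UdpEchoClient],
  new InetSocketAddress("localhost", 0), serverAddress))

Thread.sleep(500)        // crude wait for both sockets to bind
client ! "hello, echo"   // client sends the text and prints the echoed response
Thread.sleep(500)
system.shutdown()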
Forget object-oriented concepts! (at actor level)
Identify potentially concurrent operations (processing steps or separate tasks)
Often convenient to supplement with special-purpose actors
Actor task should be significant processing
Low creation / termination overhead for actors
Existing Java applications may not be designed for concurrency
Look at application data flows in terms of actors
Some potential issues:
Clean approach combines Scala and Java:
Example of WSDL analysis application
Akka supports automatic cluster membership and notification
Configuration gives you control over distribution:
Doesn't avoid all distributed system issues:
But actor model makes many issues easier to handle
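A minimal sketch (not from the deck) of reacting to cluster membership events, assuming the akka-cluster module is on the classpath and cluster settings are configured:

import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{MemberUp, UnreachableMember}

class ClusterWatcher extends Actor {
  val cluster = Cluster(context.system)

  // register for membership notifications on startup, clean up on stop
  override def preStart(): Unit =
    cluster.subscribe(self, classOf[MemberUp], classOf[UnreachableMember])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member)          => println(s"Member up: ${member.address}")
    case UnreachableMember(member) => println(s"Member unreachable: ${member.address}")
  }
}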
Scala language site - http://www.scala.org
Typesafe site - http://www.typesafe.com
Akka site - http://www.akka.io
Effective Actors - http://www.infoq.com/presentations/akka-scala-actor-patterns
Scaling out with Akka Actors - http://www.infoq.com/presentations/akka-scala-actors-distributed-system
Scalable Scala site - http://www.scalablescala.com
Sosnoski Software Associates Ltd.
Dennis Sosnoski - dms@sosnoski.com