Building on Akka I/O

Using Akka Asynchronous Actor Networking

By Dennis M. Sosnoski, ScalableScala.com / Sosnoski Software Associates Ltd

## About me

Java and Scala developer, writer, consultant and training instructor

Open source developer:

* Apache CXF web services for Java
* JiBX XML data binding for Java
* Apache Axis2 web services for Java, other older projects

Author (mostly IBM developerWorks):

* [JVM Concurrency series](http://www.ibm.com/developerworks/views/java/libraryview.jsp?search_by=jvm+concurrency:)

Most recent experience in enterprise systems, especially data integration and web services

Consulting and training services to clients worldwide

## Outline

Akka basics

Akka I/O module

* UDP client/server
* TCP client/server

Building SMTP

* SMTP protocol basics
* Client and server configurations
* Client example using FSM
* Client/server code work-in-progress

Akka reactive streams

Spray I/O background

## Akka concurrency

Akka is a Scala library, for both Java and Scala applications

Designed to support writing correct concurrent, fault-tolerant and scalable applications

* Actors
* Fault tolerance
* Location transparency

Supports scaling both up and out

* Scaling up to use added resources effectively within a system
* Scaling out to effectively use added systems in a cloud/cluster

Applications needing high throughput and low latency are ideal candidates for Akka

## Actors

Actors are light-weight entities with state

* May have multiple (many) instances of a particular actor type
* Each actor a processing step

All interactions in terms of messages:

* Do something in response to a received message
* Initiate something external with a sent message (to another actor)

Design actors by looking at the flow of data through your application

Leads to event-driven "reactive" applications

## Akka actors

Abstracts out the details of threading and synchronization

* Thread needed to run an actor when an incoming message is available
* Messages are immutable, so no synchronization required

Provides message guarantees:

* At-most-once delivery (as opposed to at-least-once, exactly-once)
* Message ordering enforced between each actor pair (in absence of priority)

High performance message queues (mailboxes)

* No blocking on the "hot path"
* Many different versions of queues

## Akka I/O

Akka tries to avoid blocking wherever possible

* Blocking ties up a thread waiting for something
* Thread switches are costly:
  1. Direct overhead of context switch
  2. Added latency, both from switch and core state changes

I/O a common cause of blocking

* Especially network I/O for connected applications

`akka.io` incorporates asynchronous networking into actor model:

* UDP
* TCP

Originally part of Spray project

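## Akka UDP server

UDP socket server example:

* Bind to listen to a local socket
* Echo any received datagrams back to sender

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Udp}

class UdpEchoService(address: InetSocketAddress) extends Actor {
  import context.system
  // Ask the UDP manager to bind a socket; replies come back to this actor
  IO(Udp) ! Udp.Bind(self, address)

  def receive = {
    case Udp.Bound(_) =>
      // sender() is the socket actor handling this binding
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case Udp.Received(data, remote) =>
      socket ! Udp.Send(data, remote) // echo back to the origin
    case Udp.Unbind => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
```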

## Akka UDP client

UDP socket client example:

* "Bind" to a local socket
* Send string messages as UDP to remote address
* Print received UDP messages to console

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Udp}
import akka.util.ByteString

class UdpEchoClient(local: InetSocketAddress, remote: InetSocketAddress) extends Actor {
  import context.system
  // Bind a local socket so replies from the server can be received
  IO(Udp) ! Udp.Bind(self, local)

  def receive = {
    case Udp.Bound(_) =>
      // sender() is the socket actor handling this binding
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case msg: String =>
      socket ! Udp.Send(ByteString(msg, "UTF-8"), remote)
    case Udp.Received(data, _) =>
      println(s"Client received ${data.decodeString("UTF-8")}")
    case Udp.Unbind => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
```

## Akka I/O TCP handling

Connection-based `Tcp` manager notifies you when connection established:

* `Bind` message to open server socket and attach server listener
* Manager will create connection actor when client connects
* `Tcp.Connected` message from connection to server listener when client connects

Server listener normally passes connection to a connection handler actor:

* `Tcp.Register` message to connection to attach connection handler
* `Tcp.Received` message to handler for received data
* `Tcp.ConnectionClosed` message to handler when client disconnects
* `Terminated` message (DeathWatch) if connection dies
* Send data to client with `Tcp.Write` message to connection

TCP echo example code

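
The message flow listed above can be sketched as a small echo server. This is a minimal sketch rather than the code shown in the talk: the class names `TcpEchoServer` and `TcpEchoHandler` are hypothetical, and it assumes the classic `akka-actor` module is on the classpath.

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef, Props, Terminated}
import akka.io.{IO, Tcp}

// Hypothetical listener: binds a server socket and hands each accepted
// connection to its own handler actor.
class TcpEchoServer(address: InetSocketAddress) extends Actor {
  import context.system
  IO(Tcp) ! Tcp.Bind(self, address)

  def receive: Receive = {
    case Tcp.Bound(local) =>
      println(s"Listening on $local")
    case Tcp.CommandFailed(_: Tcp.Bind) =>
      context.stop(self) // could not bind, give up
    case Tcp.Connected(remote, _) =>
      // sender() is the connection actor created by the manager
      val connection = sender()
      println(s"Client connected from $remote")
      val handler = context.actorOf(Props(new TcpEchoHandler(connection)))
      connection ! Tcp.Register(handler)
  }
}

// Hypothetical handler: echoes received data and stops when the
// connection closes or the connection actor dies.
class TcpEchoHandler(connection: ActorRef) extends Actor {
  context.watch(connection) // DeathWatch on the connection actor

  def receive: Receive = {
    case Tcp.Received(data)       => connection ! Tcp.Write(data)
    case _: Tcp.ConnectionClosed  => context.stop(self)
    case Terminated(`connection`) => context.stop(self)
  }
}
```

Using one handler actor per connection keeps per-client state isolated, at the cost of one extra actor for each open socket.
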
## SMTP basics

**S**imple **M**ail **T**ransfer **P**rotocol

RFC 821 by Jon Postel in 1982

Replaced by RFC 2821 from Network Working Group in 2001:

* Simplified some aspects to match actual "modern" usage
* Added more options and extensions
* Base protocol remains the same

Client sends commands (and message data) to server

Server sends responses to client

Both commands and responses are line-structured (terminated by CR-LF sequence)

Basic demonstration using telnet
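
The line structure can be illustrated with a small pure-Scala sketch. The names `ReplyLine` and `SmtpLines` are hypothetical, and the sketch covers only the basic shape of a reply line: a three-digit code, then a hyphen if more lines follow or a space on the final line, then free text.

```scala
// Hypothetical model of one SMTP reply line.
final case class ReplyLine(code: Int, isLast: Boolean, text: String)

object SmtpLines {
  private val ReplyPattern = """(\d{3})([ -])(.*)""".r
  private val BareCode = """(\d{3})""".r

  // Commands are terminated by CR-LF on the wire.
  def formatCommand(verb: String, argument: String = ""): String =
    if (argument.isEmpty) s"$verb\r\n" else s"$verb $argument\r\n"

  // Parses one reply line with the CR-LF already stripped.
  def parseReply(line: String): Option[ReplyLine] = line match {
    case ReplyPattern(code, sep, text) =>
      Some(ReplyLine(code.toInt, sep == " ", text))
    case BareCode(code) =>
      Some(ReplyLine(code.toInt, isLast = true, ""))
    case _ =>
      None // not a well-formed reply line
  }
}
```

For example, `SmtpLines.parseReply("250-mail.example.com")` yields a `ReplyLine` with `isLast = false`, telling the client to keep reading before it sends the next command.
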

## SMTP state diagrams

[Diagrams: server states, command states]
## Handling mode changes

Three reasonable approaches to handling mode changes in actors:

1. `become` makes the actor take on a new persona
   * Pass mode information in arguments to new receive method
   * Can use stack to allow popping back to prior persona
   * Simplest approach, but potentially requires more error handling
2. Create another actor and pass control
   * Pass mode information in actor constructor parameters
   * Supervision allows "Let it fail" error handling
3. Use Finite State Machine (FSM) `Actor` mixin

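
The first approach might look like the following minimal sketch. The actor `ModeSwitcher` and its two modes are hypothetical, chosen to resemble SMTP command and data modes, and the classic `akka-actor` module is assumed to be on the classpath.

```scala
import akka.actor.Actor

// Hypothetical two-mode actor: in command mode each line is a command;
// after "DATA" it collects lines until a lone "." and then pops back.
class ModeSwitcher extends Actor {
  def receive: Receive = commandMode

  def commandMode: Receive = {
    case "DATA" =>
      // discardOld = false pushes the new behavior onto the stack
      context.become(dataMode(Vector.empty), discardOld = false)
    case line: String =>
      println(s"command: $line")
  }

  // Mode information (the lines so far) travels as a method argument.
  def dataMode(lines: Vector[String]): Receive = {
    case "." =>
      println(s"message body has ${lines.size} lines")
      context.unbecome() // pop back to commandMode
    case line: String =>
      // discardOld = true replaces the top of the stack with updated data
      context.become(dataMode(lines :+ line), discardOld = true)
  }
}
```

Each mode only matches the messages it expects, so anything else goes unhandled. This is where the extra error handling mentioned above comes in.
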
## FSM basics

An FSM is a set of relations of the form:

`State(S) x Event(E) -> Actions(A), State(S')`

If you're in state S and event E occurs, do A and change to state S'

Basic usage:

* Define class hierarchy for state (case classes or regular)
* Define class hierarchy for associated data (case classes or regular)
* Use `with FSM[state-base, data-base]` on class definition

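
The relation can be written down directly as a pure function before any actor is involved. The states, events and command strings below are hypothetical and heavily simplified. They are not the state set used in the talk's SMTP client.

```scala
// Hypothetical, heavily simplified SMTP client states and events.
sealed trait State
case object Connecting extends State
case object Greeted extends State
case object Sending extends State
case object Done extends State

sealed trait Event
final case class Reply(code: Int) extends Event
case object Timeout extends Event

object SmtpTransitions {
  // State(S) x Event(E) -> Actions(A), State(S')
  // Actions are modelled as the command lines to send next.
  def step(state: State, event: Event): (List[String], State) =
    (state, event) match {
      case (Connecting, Reply(220)) => (List("HELO client.example.com"), Greeted)
      case (Greeted, Reply(250))    => (List("MAIL FROM:<a@example.com>"), Sending)
      case (Sending, Reply(250))    => (List("QUIT"), Done)
      case (_, Timeout)             => (Nil, Done)
      case (s, _)                   => (Nil, s) // unhandled: no action, stay
    }
}
```

Keeping the transition table as a plain function makes it easy to test without starting an actor system.
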
## FSM implementation

Lets you use:

* `startWith(state, data)` to define initial state
* `when (state) { ... }` for the action method to use in each state
* `whenUnhandled ...` for cases not handled by state
* `onTransition { ... }` for actions on state transitions
* `initialize` method to start it going

where the `when` methods match on tuples `Event(message, data)`, and state transition methods:

* `stay` to stay in the same state
* `stay using data` to stay in same state with different data
* `goto state [using data]` to change state

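
A minimal sketch of these pieces working together, assuming the classic `akka-actor` module is on the classpath. The state and data types (`ExchangeState`, `ExchangeData`) and the actor `Exchange` are hypothetical, not the SMTP client from the demo.

```scala
import scala.concurrent.duration._

import akka.actor.{Actor, FSM}

// Hypothetical states and data for a toy request/response exchange.
sealed trait ExchangeState
case object Idle extends ExchangeState
case object AwaitingReply extends ExchangeState

sealed trait ExchangeData
case object NoData extends ExchangeData
final case class Pending(request: String) extends ExchangeData

class Exchange extends Actor with FSM[ExchangeState, ExchangeData] {
  startWith(Idle, NoData)

  when(Idle) {
    case Event(request: String, NoData) =>
      goto(AwaitingReply) using Pending(request)
  }

  // stateTimeout delivers StateTimeout if nothing arrives in time
  when(AwaitingReply, stateTimeout = 5.seconds) {
    case Event(code: Int, Pending(request)) =>
      log.info("{} answered with {}", request, code)
      goto(Idle) using NoData
    case Event(StateTimeout, _) =>
      goto(Idle) using NoData
  }

  whenUnhandled {
    case Event(other, _) =>
      log.warning("ignoring {}", other)
      stay()
  }

  onTransition {
    case Idle -> AwaitingReply => log.debug("request sent")
  }

  initialize()
}
```
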
## Timeout handling

You can set timeouts on states in several ways:

* `when (state, stateTimeout = 1 second)` as inactive timeout on state
* `stay forMax 5.millis...` to stay with timeout
* `goto state forMax 5.millis...` as timeout on state
* `setTimer("name", MsgClass, 1 second, false)` for one-shot timer
* `setTimer("name", MsgClass, 1 second, true)` for repeated timer
* `cancelTimer("name")` to cancel, `isTimerActive("name")` to check

SMTP FSM demo and code review

## Improving on FSM code

FSM great for complex graphs, but overkill for simple SMTP state changes

Limitations in code:

* Need to handle multiple to/cc/bcc addresses
* Need to allow request or response to be split across data blobs

Build a simpler solution using multiple actors and `become`?

* Actor to merge byte blobs and transform into lines (client and server)
* Initialization actor to set up connection and pass to actual sender actor
* Enumeration for sender/receiver actor states
* Single sender for each destination server:
  * Tells manager when it can accept another email
  * Timeouts to close connection when no longer needed
* Failure recovery

Demonstration and discussion

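
The blob-merging step is independent of the actor machinery and can be sketched as a small immutable accumulator. `LineBuffer` is a hypothetical name, and the sketch uses `String` chunks for brevity where an actor receiving `Tcp.Received` would hold a `ByteString`.

```scala
// Hypothetical immutable accumulator: feed it data chunks as they arrive,
// get back the complete CR-LF terminated lines plus the new accumulator.
final case class LineBuffer(pending: String = "") {
  def feed(chunk: String): (List[String], LineBuffer) = {
    val data = pending + chunk
    // Limit -1 keeps a trailing empty string when data ends with CR-LF
    val parts = data.split("\r\n", -1)
    // The last element is whatever follows the final CR-LF (possibly "")
    (parts.init.toList, LineBuffer(parts.last))
  }
}
```

An actor wrapping this would keep the current `LineBuffer` as an argument to its `become` behavior and forward each completed line to the sender or receiver actor.
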
## Securing connections

Earlier akka.io included support for TLS connections

* Construct pipelines to support layered protocols
* Layers each handle appropriate messages directly
* Pass application data on to next higher level

Pipeline approach dropped in Akka 2.3

New alternative is supposed to be Reactive Streams

Unfortunately still only available as 0.4 preview release

* No support for TLS

## Reactive Streams

Developing a standard for asynchronous stream processing on the JVM

Asynchronous stream processing is complex:

* Can't require infinite buffering (especially for intermediaries)
* Asynchronous means can't just wait for ack/nak responses
* Need some form of asynchronous back pressure to prevent overruns
* TCP/IP example

The Reactive Streams initiative is intended to find a minimal set of interfaces and methods for this purpose

Intended to be consistent across JVM languages

Includes some top people from across the community

Basic demonstration

[http://www.reactive-streams.org/](http://www.reactive-streams.org/)

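
Demand-driven back pressure can be illustrated with the `java.util.concurrent.Flow` interfaces (JDK 9 and later), which mirror the Reactive Streams interfaces. This is a synchronous, single-threaded sketch, not the Akka streams preview API: `RangePublisher` and `PullingSubscriber` are hypothetical, and a real publisher would also need to handle concurrency and error signalling.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

// Hypothetical publisher of the integers 1..count. It emits only as many
// elements as the subscriber has requested so far.
final class RangePublisher(count: Int) extends Flow.Publisher[Integer] {
  def subscribe(subscriber: Flow.Subscriber[_ >: Integer]): Unit =
    subscriber.onSubscribe(new Flow.Subscription {
      private var next = 1
      private var demand = 0L
      private var emitting = false
      private var done = false

      def request(n: Long): Unit = {
        demand += n
        if (!emitting) { // guard against re-entrant request() from onNext
          emitting = true
          while (demand > 0 && next <= count && !done) {
            demand -= 1
            val value = next
            next += 1
            subscriber.onNext(Integer.valueOf(value))
          }
          emitting = false
          if (next > count && !done) { done = true; subscriber.onComplete() }
        }
      }

      def cancel(): Unit = done = true
    })
}

// Hypothetical subscriber that receives nothing until it signals demand.
final class PullingSubscriber extends Flow.Subscriber[Integer] {
  val received = ListBuffer.empty[Int]
  var completed = false
  private var subscription: Flow.Subscription = null

  def pull(n: Long): Unit = subscription.request(n)
  def onSubscribe(s: Flow.Subscription): Unit = subscription = s
  def onNext(item: Integer): Unit = received += item.intValue
  def onError(t: Throwable): Unit = t.printStackTrace()
  def onComplete(): Unit = completed = true
}
```

Because the subscriber controls demand, the publisher never needs an unbounded buffer: after `pull(2)` it hands over two elements and then waits for the next request.
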
## Spray I/O

Spray project now merged into Akka team

Spray extension components include:

* `spray-can` high-performance direct HTTP web server
* `spray-servlet` adapting standard Java servlet API to actors
* `spray-json` serializing/deserializing JSON
* `spray-client` HTTP client actor model
* `spray-routing` routing DSL

## Spray HTTP server example

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef, Props, Terminated}
import akka.io.{IO, Tcp}
import spray.can.Http
import spray.http.{HttpMethods, HttpRequest, HttpResponse}

class EchoHttpService(host: String, port: Int) extends Actor {
  import context.system
  IO(Http) ! Http.Bind(self, interface = host, port = port)

  def receive: Receive = {
    case Http.Connected(remote, _) =>
      // sender() is the connection actor; register a handler for it
      sender() ! Http.Register(context.actorOf(EchoHttpConnectionHandler.props(remote, sender())))
  }
}

object EchoHttpConnectionHandler {
  def props(remote: InetSocketAddress, connection: ActorRef): Props =
    Props(new EchoHttpConnectionHandler(remote, connection))
}

class EchoHttpConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor {
  context.watch(connection)

  def receive: Receive = {
    // Respond to any GET with the request path as the body
    case HttpRequest(HttpMethods.GET, uri, _, _, _) =>
      sender() ! HttpResponse(entity = uri.path.toString)
    case _: Tcp.ConnectionClosed =>
      context.stop(self)
    case Terminated(`connection`) =>
      context.stop(self)
  }
}
```

## Spray I/O limitations

Spray higher level (not always a good thing)

* HTTP message transfer as a single blob
* Messages can be very large
* Large memory consumption while it's being built
* Latency issue in waiting until entire input available
* Sending can embed file blobs directly
* No good solution for receive

spray-can good solution for:

* JSON REST API for existing application
* Stand-alone HTTP application interface

spray-client good solution for:

* Small request/response messages
* Larger messages that can't be processed piecemeal

## Conclusion

`akka.io` is going to be great, but some practical problems for now:

* No support for HTTP or higher-layer protocols
* No support for secure connections

`spray.io` somewhat usable as a stopgap for HTTP, but also practical problems:

* No streaming of requests or responses
* Adds latency and memory overhead

Reactive Streams *should* be great, but too early to tell

## References

* Akka site - http://www.akka.io
* Akka io documentation - http://doc.akka.io/docs/akka/snapshot/scala/io.html
* Reactive streams site - http://www.reactive-streams.org/
* Spray io site - http://spray.io/
* Scalable Scala site - http://www.scalablescala.com
* Sosnoski Software Associates Ltd.
  * New Zealand and worldwide - http://www.sosnoski.co.nz
  * United States - http://www.sosnoski.com
* Dennis Sosnoski - dms@sosnoski.com