## About me
Java and Scala developer, writer, consultant and training instructor
Open source developer:
* Apache CXF web services for Java
* JiBX XML data binding for Java
* Apache Axis2 web services for Java, other older projects
Author (mostly IBM developerWorks):
* [JVM Concurrency series](http://www.ibm.com/developerworks/views/java/libraryview.jsp?search_by=jvm+concurrency:)
Most recent experience in enterprise systems, especially data integration and web services
Consulting and training services to clients worldwide
## Outline
Akka basics
Akka I/O module
* UDP client/server
* TCP client/server
Building SMTP
* SMTP protocol basics
* Client and server configurations
* Client example using FSM
* Client/server code work-in-progress
Akka reactive streams
Spray I/O background
## Akka concurrency
Akka is a Scala library, usable from both Java and Scala applications
Designed to support writing correct concurrent, fault-tolerant and scalable applications
* Actors
* Fault tolerance
* Location transparency
Supports scaling both up and out
* Scaling up to use added resources effectively within a system
* Scaling out to effectively use added systems in a cloud/cluster
Applications needing high throughput and low latency are ideal candidates for Akka
## Actors
Actors are light-weight entities with state
* May have multiple (many) instances of a particular actor type
* Each actor a processing step
All interactions in terms of messages:
* Do something in response to a received message
* Initiate something external with a sent message (to another actor)
Design actors by looking at the flow of data through your application
Leads to event-driven "reactive" applications
## Akka actors
Abstracts out the details of threading and synchronization
* Thread needed to run an actor only when an incoming message is available
* Messages are immutable, so no synchronization required
Provides message guarantees:
* At-most-once delivery (as opposed to at-least-once, exactly-once)
* Message ordering enforced between each actor pair (in absence of priority)
High performance message queues (mailboxes)
* No blocking on the "hot path"
* Many different versions of queues
## Akka I/O
Akka tries to avoid blocking wherever possible
* Blocking ties up a thread waiting for something
* Thread switches are costly:
    1. Direct overhead of context switch
    2. Added latency, both from switch and core state changes
I/O a common cause of blocking
* Especially network I/O for connected applications
`akka.io` incorporates asynchronous networking into actor model:
* UDP
* TCP
Originally part of Spray project
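## Akka UDP server
UDP socket server example:
* Bind to listen to a local socket
* Echo any received datagrams back to sender

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Udp}

class UdpEchoService(address: InetSocketAddress) extends Actor {
  import context.system

  // Ask the UDP manager to bind the socket, with this actor as handler
  IO(Udp) ! Udp.Bind(self, address)

  def receive = {
    // The sender of Bound is the socket actor
    case Udp.Bound(_) =>
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case Udp.Received(data, remote) =>
      socket ! Udp.Send(data, remote) // echo back to the sender
    case Udp.Unbind  => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
```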
## Akka UDP client
UDP socket client example:
* "Bind" to a local socket
* Send string messages as UDP to remote address
* Print received UDP messages to console

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Udp}
import akka.util.ByteString

class UdpEchoClient(local: InetSocketAddress, remote: InetSocketAddress) extends Actor {
  import context.system

  // Bind a local socket so that replies can be received
  IO(Udp) ! Udp.Bind(self, local)

  def receive = {
    // The sender of Bound is the socket actor
    case Udp.Bound(_) =>
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    // Strings sent to this actor go out as datagrams
    case msg: String =>
      socket ! Udp.Send(ByteString(msg, "UTF-8"), remote)
    case Udp.Received(data, _) =>
      println(s"Client received ${data.decodeString("UTF-8")}")
    case Udp.Unbind  => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
```
## Akka I/O TCP handling
Connection-based `Tcp` manager notifies you when connection established:
* `Bind` message to open server socket and attach server listener
* Manager will create connection actor when client connects
* `Tcp.Connected` message from connection to server listener when client connects
Server listener normally passes connection to a connection handler actor:
* `Tcp.Register` message to connection to attach connection handler
* `Tcp.Received` message to handler for received data
* `Tcp.ConnectionClosed` message to handler when client disconnects
* `Terminated` message (DeathWatch) if connection dies
* Send data to client with `Tcp.Write` message to connection
TCP echo example code
## SMTP basics
**S**imple **M**ail **T**ransfer **P**rotocol
RFC 821 by Jon Postel in 1982
Replaced by RFC 2821 from Network Working Group in 2001:
* Simplified some aspects to match actual "modern" usage
* Added more options and extensions
* Base protocol remains the same
Client sends commands (and message data) to server
Server sends responses to client
Both commands and responses are line-structured (terminated by CR-LF sequence)
Basic demonstration using telnet
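To make the line structure concrete, here is a minimal sketch of a typical exchange and a parser for a single server reply line. It assumes the reply format from the SMTP RFCs: a three-digit code, then a space (final line) or a hyphen (continuation line), then free text. The names `SmtpReplySketch`, `ReplyLine` and `parseReplyLine`, and the example.com addresses, are hypothetical and used only for illustration.

```scala
object SmtpReplySketch {
  // A typical exchange (S = server, C = client), each line ending in CR-LF:
  //   S: 220 mail.example.com ready
  //   C: HELO client.example.com
  //   S: 250 mail.example.com
  //   C: MAIL FROM:<alice@example.com>
  //   S: 250 OK
  //   C: RCPT TO:<bob@example.com>
  //   S: 250 OK
  //   C: DATA
  //   S: 354 End data with <CR><LF>.<CR><LF>
  //   C: (message lines, then a line holding a single ".")
  //   S: 250 OK
  //   C: QUIT
  //   S: 221 Bye

  /** One reply line; isLast is false for "250-..." continuation lines. */
  final case class ReplyLine(code: Int, isLast: Boolean, text: String)

  private val WithText = """(\d{3})([ -])(.*)""".r
  private val CodeOnly = """(\d{3})""".r

  /** Parses a reply line from which the CR-LF has already been stripped. */
  def parseReplyLine(line: String): Option[ReplyLine] = line match {
    case WithText(code, sep, text) => Some(ReplyLine(code.toInt, sep == " ", text))
    case CodeOnly(code)            => Some(ReplyLine(code.toInt, isLast = true, ""))
    case _                         => None
  }
}
```

A client would keep reading lines until it sees one with `isLast` set, then act on the code.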
## Handling mode changes
Three reasonable approaches to handling mode changes in actors:
1. `become` makes the actor take on a new persona
    * Pass mode information in arguments to new receive method
    * Can use stack to allow popping back to prior persona
    * Simplest approach, but potentially requires more error handling
1. Create another actor and pass control
    * Pass mode information in actor constructor parameters
    * Supervision allows "Let it fail" error handling
1. Use Finite State Machine (FSM) `Actor` mixin
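The first approach can be illustrated without Akka at all. The sketch below is a toy model, not Akka code: `ModalHandler`, `handle`, and the message shapes are hypothetical, and the private `become`/`unbecome` helpers only imitate what `context.become(..., discardOld = false)` and `context.unbecome()` do for a real actor. It shows mode information being passed as an argument to the new receive method, and the stack being used to pop back to the prior persona.

```scala
import scala.collection.mutable.ArrayBuffer

object BecomeSketch {
  type Receive = PartialFunction[Any, Unit]

  /** Toy stand-in for an actor; this is not Akka code. */
  final class ModalHandler {
    val log = ArrayBuffer.empty[String]

    // Stack of behaviors; the head handles the next message
    private var behaviors: List[Receive] = List(idle)

    private def become(next: Receive): Unit = behaviors = next :: behaviors
    private def unbecome(): Unit = if (behaviors.size > 1) behaviors = behaviors.tail

    def handle(msg: Any): Unit =
      if (behaviors.head.isDefinedAt(msg)) behaviors.head(msg)
      else log += s"unhandled: $msg"

    // Initial persona: only knows how to connect
    private def idle: Receive = {
      case ("connect", host: String) =>
        log += s"connected to $host"
        become(connected(host)) // mode information passed as an argument
    }

    // Connected persona: forwards lines until told to quit
    private def connected(host: String): Receive = {
      case "quit" =>
        log += s"closing $host"
        unbecome() // pop back to the prior persona
      case line: String =>
        log += s"$host <- $line"
    }
  }
}
```

Note how a message that arrives in the wrong mode simply goes unhandled, which is the extra error handling the first approach tends to need.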
## FSM basics
An FSM is a set of relations State (S) x Event (E) -> Actions (A), State (S')
If you're in state S and event E occurs, do A and change to state S'
Basic usage:
* Define class hierarchy for state (case classes or regular)
* Define class hierarchy for associated data (case classes or regular)
* Use `with FSM[state-base, data-base]` on class definition
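Before looking at the Akka mixin, the relation itself can be written as a plain function from (state, event) to (action, next state). The following is a minimal sketch under simplifying assumptions: the state names, the `Reply` event, the fixed addresses and the one-recipient message are all hypothetical, and a real client would need more states and richer data.

```scala
object SmtpFsmSketch {
  // States (S)
  sealed trait State
  case object AwaitGreeting extends State
  case object AwaitHelo extends State
  case object AwaitMailFrom extends State
  case object AwaitRcptTo extends State
  case object AwaitDataGoAhead extends State
  case object AwaitAccepted extends State
  case object AwaitBye extends State
  case object Done extends State
  case object Failed extends State

  // Events (E): here just the numeric code of a server reply
  final case class Reply(code: Int)

  // Action (A): the text to send next, if any
  type Action = Option[String]

  /** In state S with event E, do A and change to state S'. */
  def transition(state: State, event: Reply): (Action, State) = (state, event) match {
    case (AwaitGreeting, Reply(220))    => (Some("HELO client.example.com"), AwaitHelo)
    case (AwaitHelo, Reply(250))        => (Some("MAIL FROM:<alice@example.com>"), AwaitMailFrom)
    case (AwaitMailFrom, Reply(250))    => (Some("RCPT TO:<bob@example.com>"), AwaitRcptTo)
    case (AwaitRcptTo, Reply(250))      => (Some("DATA"), AwaitDataGoAhead)
    case (AwaitDataGoAhead, Reply(354)) => (Some("Subject: hi\r\n\r\nHello\r\n."), AwaitAccepted)
    case (AwaitAccepted, Reply(250))    => (Some("QUIT"), AwaitBye)
    case (AwaitBye, Reply(221))         => (None, Done)
    case (Done | Failed, _)             => (None, state) // terminal states ignore events
    case _                              => (Some("QUIT"), Failed) // unexpected reply: give up
  }

  /** Feeds reply codes through the machine, collecting the commands sent. */
  def run(codes: List[Int]): (List[String], State) =
    codes.foldLeft((List.empty[String], AwaitGreeting: State)) {
      case ((sent, state), code) =>
        val (action, next) = transition(state, Reply(code))
        (sent ++ action, next)
    }
}
```

For example, `SmtpFsmSketch.run(List(220, 250, 250, 250, 354, 250, 221))` walks the whole happy path and ends in `Done`, while an unexpected code at any step sends `QUIT` and ends in `Failed`.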
## FSM implementation
Lets you use:
* `startWith(state, data)` to define initial state
* `when (state) { ... }` for the action method to use in each state
* `whenUnhandled ...` for cases not handled by state
* `onTransition { ... }` for actions on state transitions
* `initialize` method to start it going
The `when` blocks match on tuples `Event(message, data)` and end with a state transition method:
* `stay` to stay in the same state
* `stay using data` to stay in same state with different data
* `goto state [using data]` to change state
## Timeout handling
You can set timeouts on states in several ways:
* `when (state, stateTimeout = 1 second)` as inactive timeout on state
* `stay forMax 5.millis...` to stay with timeout
* `goto state forMax 5.millis...` as timeout on state
* `setTimer("name", MsgClass, 1 second, false)` for one-shot timer
* `setTimer("name", MsgClass, 1 second, true)` for repeated timer
* `cancelTimer("name")` to cancel, `isTimerActive("name")` to check
SMTP FSM demo and code review
## Improving on FSM code
FSM great for complex graphs, but overkill for simple SMTP state changes
Limitations in code:
* Need to handle multiple to/cc/bcc addresses
* Need to allow request or response to be split across data blobs
Build a simpler solution using multiple actors and `become`?
* Actor to merge byte blobs and transform into lines (client and server)
* Initialization actor to set up connection and pass to actual sender actor
* Enumeration for sender/receiver actor states
* Single sender for each destination server:
* Tells manager when it can accept another email
* Timeouts to close connection when no longer needed
* Failure recovery
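The blob-merging step listed above is independent of the actor machinery, so it can be sketched on its own. This is a minimal illustration, not the code from the demonstration: `LineFramer` and `feed` are hypothetical names, and it works on `String` chunks for brevity where a real `akka.io` handler would receive `ByteString` data.

```scala
object LineFramerSketch {
  /** Accumulates arbitrary chunks and emits only complete CR-LF terminated lines. */
  final class LineFramer {
    private val buffer = new java.lang.StringBuilder

    /** Adds a chunk; returns the lines it completed, without their CR-LF. */
    def feed(chunk: String): List[String] = {
      buffer.append(chunk)
      val lines = List.newBuilder[String]
      var idx = buffer.indexOf("\r\n")
      while (idx >= 0) {
        lines += buffer.substring(0, idx)
        buffer.delete(0, idx + 2) // drop the line and its terminator
        idx = buffer.indexOf("\r\n")
      }
      lines.result()
    }
  }
}
```

Anything after the last CR-LF stays in the buffer until a later chunk completes it, so a request or response split across blobs, even in the middle of the terminator, comes out as whole lines.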
Demonstration and discussion
## Securing connections
Earlier akka.io releases included support for TLS connections
* Construct pipelines to support layered protocols
* Layers each handle appropriate messages directly
* Pass application data on to next higher level
Pipeline approach dropped in Akka 2.3
New alternative is supposed to be Reactive Streams
Unfortunately still only available as 0.4 preview release
* No support for TLS
## Reactive Streams
Developing a standard for asynchronous stream processing on the JVM
Asynchronous stream processing is complex:
* Can't require infinite buffering (especially for intermediaries)
* Asynchronous means can't just wait for ack/nak responses
* Need some form of asynchronous back pressure to prevent overruns
* TCP/IP example
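The back pressure idea can be shown with a toy model in which the consumer states how many items it is ready for and the producer never sends more than that. The sketch below is deliberately synchronous and single-threaded, and its interfaces (`ToySubscriber`, `ToySubscription`, `Collector`) are hypothetical simplifications, not the Reactive Streams API.

```scala
import scala.collection.mutable.ArrayBuffer

object BackPressureSketch {
  /** Hypothetical, simplified consumer interface (not org.reactivestreams). */
  trait ToySubscriber[A] {
    def onNext(item: A): Unit
    def onComplete(): Unit
  }

  /** Emits items only in response to explicit demand from the subscriber. */
  final class ToySubscription[A](source: Iterator[A], subscriber: ToySubscriber[A]) {
    private var completed = false

    /** Signals that the subscriber can accept up to n more items. */
    def request(n: Long): Unit = {
      var remaining = n
      while (remaining > 0 && source.hasNext) {
        subscriber.onNext(source.next())
        remaining -= 1
      }
      if (!completed && !source.hasNext) {
        completed = true
        subscriber.onComplete()
      }
    }
  }

  /** Subscriber that records what it receives. */
  final class Collector[A] extends ToySubscriber[A] {
    val received = ArrayBuffer.empty[A]
    var done = false
    def onNext(item: A): Unit = received += item
    def onComplete(): Unit = done = true
  }
}
```

Because nothing is emitted without demand, neither side needs an unbounded buffer; the hard part that the real initiative addresses is doing this asynchronously across thread and network boundaries.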
The Reactive Streams initiative is intended to find a minimal set of interfaces and methods for this purpose
Intended to be consistent across JVM languages
Includes some top people from across the community
Basic demonstration
[http://www.reactive-streams.org/](http://www.reactive-streams.org/)
## Spray I/O
Spray team has now joined the Akka team
Spray extension components include:
* `spray-can` high-performance direct HTTP web server
* `spray-servlet` adapting standard Java servlet API to actors
* `spray-json` serializing/deserializing JSON
* `spray-client` HTTP client actor model
* `spray-routing` routing DSL
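## Spray HTTP server example

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, Props, Terminated}
import akka.io.{IO, Tcp}
import spray.can.Http
import spray.http.{HttpMethods, HttpRequest, HttpResponse}

class EchoHttpService(host: String, port: Int) extends Actor {
  import context.system

  // Ask the HTTP manager to bind, with this actor as the listener
  IO(Http) ! Http.Bind(self, interface = host, port = port)

  def receive: Receive = {
    // Register one handler actor per connection
    case Http.Connected(remote, _) =>
      sender() ! Http.Register(context.actorOf(EchoHttpConnectionHandler.props(remote, sender())))
  }
}

object EchoHttpConnectionHandler {
  def props(remote: InetSocketAddress, connection: ActorRef): Props =
    Props(new EchoHttpConnectionHandler(remote, connection))
}

class EchoHttpConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor {
  // Get a Terminated message if the connection actor dies
  context.watch(connection)

  def receive: Receive = {
    // Reply to a GET with the request path as the response body
    case HttpRequest(HttpMethods.GET, uri, _, _, _) =>
      sender() ! HttpResponse(entity = uri.path.toString)
    case _: Tcp.ConnectionClosed =>
      context.stop(self)
    case Terminated(`connection`) =>
      context.stop(self)
  }
}
```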
## Spray I/O limitations
Spray works at a higher level (not always a good thing)
* HTTP message transfer as a single blob
* Messages can be very large
* Large memory consumption while it's being built
* Latency issue in waiting until entire input available
* Sending can embed file blobs directly
* No good solution for receive
spray-can good solution for:
* JSON REST API for existing application
* Stand-alone HTTP application interface
spray-client good solution for:
* Small request/response messages
* Larger messages that can't be processed piecemeal
## Conclusion
`akka.io` is going to be great, but some practical problems for now:
* No support for HTTP or higher-layer protocols
* No support for secure connections
`spray.io` somewhat usable as a stopgap for HTTP, but also practical problems:
* No streaming of requests or responses
* Adds latency and memory overhead
Reactive Streams *should* be great, but too early to tell
## References
Akka site - http://www.akka.io
Akka io documentation - http://doc.akka.io/docs/akka/snapshot/scala/io.html
Reactive streams site - http://www.reactive-streams.org/
Spray io site - http://spray.io/
Scalable Scala site - http://www.scalablescala.com
Sosnoski Software Associates Ltd.
* New Zealand and worldwide - http://www.sosnoski.co.nz
* United States - http://www.sosnoski.com
Dennis Sosnoski - dms@sosnoski.com