Sunday, January 12, 2014

Reactive programming with Servlets 3 and Akka

Servlets 3 add asynchronous support, which makes it possible to develop in a reactive way. See The Reactive Manifesto.
Simply put, one can process a request in the background and send the response when it is ready, without holding a thread from the servlet container's thread pool in the meantime. A simple example is below. The full source code of the examples in this article is available.
package test;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "/test/async", asyncSupported = true)
public class TestAsyncServlet extends HttpServlet {

    // Worker pool that processes requests outside the container's request threads.
    private ExecutorService executor;

    @Override
    public void init() throws ServletException {
        executor = Executors.newFixedThreadPool(30);
    }

    @Override
    public void destroy() {
        // Stop the worker threads when the servlet is taken out of service.
        executor.shutdown();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        final AsyncContext asyncContext = req.startAsync();
        executor.execute(new Runnable() {

            @Override
            public void run() {
                HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
                resp.setContentType("text/plain");
                try (PrintWriter writer = resp.getWriter()) {
                    writer.print("Ok");
                } catch (IOException ex) {
                    Logger.getLogger(TestAsyncServlet.class.getName()).log(Level.SEVERE, null, ex);
                }
                asyncContext.complete();
            }
        });
    }
}
A fixed thread pool processes requests in the background, and the response is written directly from one of its threads when it is ready.
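The hand-off itself can be tried without a servlet container. The following is only a minimal sketch in plain Java: `PendingResponse` is a hypothetical stand-in for `AsyncContext`, and `handle()` plays the role of `doGet()`. Neither name comes from the Servlet API. The latch keeps the worker busy so that one can observe that `handle()` returns while the response is still pending.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    // Hypothetical stand-in for AsyncContext: collects the body until complete() is called.
    static final class PendingResponse {
        private final StringBuilder body = new StringBuilder();
        private final CompletableFuture<String> done = new CompletableFuture<>();

        void write(String text) { body.append(text); }
        void complete() { done.complete(body.toString()); }
        boolean isCompleted() { return done.isDone(); }

        String awaitBody() {
            try {
                return done.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("response was not completed", e);
            }
        }
    }

    // Plays the role of doGet(): schedules the work and returns without waiting for it.
    static PendingResponse handle(ExecutorService workers, Callable<String> work) {
        PendingResponse response = new PendingResponse();
        workers.execute(() -> {
            try {
                response.write(work.call());
            } catch (Exception e) {
                response.write("Error");
            } finally {
                response.complete(); // always release the pending request
            }
        });
        return response;
    }

    public static String demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        CountDownLatch gate = new CountDownLatch(1); // keeps the worker busy until released
        try {
            PendingResponse response = handle(workers, () -> {
                gate.await();
                return "Ok";
            });
            boolean pendingAfterReturn = !response.isCompleted(); // handle() came back first
            gate.countDown();
            return "pendingAfterReturn=" + pendingAfterReturn + " body=" + response.awaitBody();
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "pendingAfterReturn=true body=Ok"
    }
}
```

Calling `complete()` in a `finally` block is a design choice of this sketch: a request that is never completed stays open until the container times it out.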
This model makes it possible to integrate with frameworks such as Akka. See the example below.
package test;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import scala.concurrent.duration.Duration;

@WebServlet(urlPatterns = "/test/akka", asyncSupported = true)
public class TestAkkaServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        final AsyncContext asyncContext = req.startAsync();
        final ActorSystem system = (ActorSystem) req.getServletContext()
                .getAttribute("ActorSystem");
        system.actorOf(Props.create(AskActor.class, asyncContext));
    }

    static class TestActor extends UntypedActor {

        @Override
        public void onReceive(Object msg) throws Exception {
            // Compare by value; == only checks reference identity.
            if ("Test!!!".equals(msg)) {
                getSender().tell("Ok", getSelf());
            } else {
                unhandled(msg);
            }
        }
    }

    static class AskActor extends UntypedActor {

        final private AsyncContext asyncContext;

        public AskActor(AsyncContext asyncContext) {
            this.asyncContext = asyncContext;
            ActorRef testActor = getContext()
                    .actorOf(Props.create(TestActor.class), "TestActor");
            getContext().watch(testActor);
            getContext().setReceiveTimeout(Duration.create("5 seconds"));
            testActor.tell("Test!!!", getSelf());
        }

        @Override
        public void onReceive(Object msg) throws IOException {
            HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
            if (msg instanceof ReceiveTimeout) {
                getContext().stop(getSelf());
                resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                        "Timeout");
                asyncContext.complete();
            } else if (msg instanceof Terminated) {
                getContext().stop(getSelf());
                resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                        "Unexpectedly Stopped");
                asyncContext.complete();
            } else if (msg instanceof String) {
                getContext().stop(getSelf());
                resp.setContentType("text/plain");
                try (PrintWriter writer = resp.getWriter()) {
                    writer.print("Ok");
                }
                asyncContext.complete();
            } else {
                unhandled(msg);
            }
        }

    }

}
The workflow is as follows.
  1. doGet() uses the ActorSystem to create an actor of type AskActor, which becomes the root of a supervision subtree dedicated to processing this request. See What Supervision Means.
  2. AskActor creates an instance of TestActor and sends it a message. Meanwhile it watches the TestActor instance, so if TestActor terminates for any reason AskActor receives a Terminated message. It also sets a receive timeout: if no message arrives within the specified time, AskActor gets a ReceiveTimeout message.
  3. TestActor receives the message from AskActor, processes it and sends a response.
  4. AskActor receives either the response from TestActor or, in case of failure, a Terminated or ReceiveTimeout message. It writes the response using the AsyncContext and stops itself. Stopping an actor terminates its whole supervision subtree. For instance, if AskActor gets ReceiveTimeout, it stops itself and, with it, TestActor. This cleans up all resources used to process the request.
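The three outcomes of step 4 can be illustrated without Akka. The sketch below is only a plain-Java analogue, not Akka code: it assumes Java 9 or later for `CompletableFuture.orTimeout`, and the class name, the `respond()` helper and the "status body" strings are hypothetical. A future that completes normally stands for the reply, a future that never completes stands for ReceiveTimeout, and a future that fails stands for Terminated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyOrTimeoutSketch {

    // Maps the worker's outcome to "status body", mirroring the branches of AskActor.onReceive().
    // Note: orTimeout() completes the passed-in future itself when the deadline is reached.
    static CompletableFuture<String> respond(CompletableFuture<String> reply, long timeoutMillis) {
        return reply
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((body, err) -> {
                    if (err == null) {
                        return "200 " + body;
                    }
                    // Failures propagated through dependent stages arrive wrapped.
                    Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                            ? err.getCause() : err;
                    if (cause instanceof TimeoutException) {
                        return "500 Timeout";
                    }
                    return "500 Unexpectedly Stopped";
                });
    }

    public static String demo() {
        CompletableFuture<String> replied = CompletableFuture.completedFuture("Ok");
        CompletableFuture<String> silent = new CompletableFuture<>(); // never replies
        CompletableFuture<String> died = new CompletableFuture<>();
        died.completeExceptionally(new IllegalStateException("worker died"));

        return respond(replied, 200).join()
                + " | " + respond(silent, 200).join()
                + " | " + respond(died, 200).join();
    }

    public static void main(String[] args) {
        // prints "200 Ok | 500 Timeout | 500 Unexpectedly Stopped"
        System.out.println(demo());
    }
}
```

Unlike the actor version, this sketch does not clean up the worker. In Akka, stopping AskActor also stops its child TestActor.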
One can also configure Akka declaratively to restart TestActor automatically, run it on another node, run it on a separate dispatcher (thread pool), etc. See Akka configuration for more details.

Is there a performance penalty? Almost none. A simple benchmark:
build/default/weighttp -n 1000000 -k -c 100 http://localhost:8080/TestServlet3WithAkka/test/async
weighttp - a lightweight and simple webserver benchmarking tool

starting benchmark...
spawning thread #1: 100 concurrent requests, 1000000 total requests
progress:  10% done
...
progress: 100% done

finished in 67 sec, 289 millisec and 340 microsec, 14861 req/s, 4093 kbyte/s
requests: 1000000 total, 1000000 started, 1000000 done, 1000000 succeeded, 0 failed, 0 errored
status codes: 1000000 2xx, 0 3xx, 0 4xx, 0 5xx
traffic: 282074708 bytes total, 280074708 bytes http, 2000000 bytes data


build/default/weighttp -n 1000000 -k -c 100 http://localhost:8080/TestServlet3WithAkka/test/akka
weighttp - a lightweight and simple webserver benchmarking tool

starting benchmark...
spawning thread #1: 100 concurrent requests, 1000000 total requests
progress:  10% done
...
progress: 100% done

finished in 81 sec, 380 millisec and 532 microsec, 12287 req/s, 3384 kbyte/s
requests: 1000000 total, 1000000 started, 1000000 done, 1000000 succeeded, 0 failed, 0 errored
status codes: 1000000 2xx, 0 3xx, 0 4xx, 0 5xx
traffic: 282074765 bytes total, 280074765 bytes http, 2000000 bytes data
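In this run the Akka variant handles about 17% fewer requests per second than the plain asynchronous servlet (12287 vs. 14861 req/s). But as you can see, asynchronous servlet support itself is far from ideal; compare the benchmarks at Performance analysis of our own full blown HTTP server with Netty 4.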
