Thursday, December 3, 2015

Play Json date format customization

By default Play Json just truncates time zone when working with ZonedDateTime from Java 8.
So the following code:

case class MyClass(createdAt: ZonedDateTime)
implicit val myWrites = Json.writes[MyClass]
...
val d = MyClass(ZonedDateTime.parse("2015-10-01T12:13:14.00+02:00"))
val json = Json.toJson(d)

would produce following Json: {"createdAt": "2015-10-01T12:13:14"}. That’s because play.api.libs.json.DefaultWrites.DefaultZonedDateTimeWrites uses same formatter as DefaultLocalDateTimeWrites and simply disregards time zone. To display the time zone together with date time you’ll need to add following code before myWrites:

  implicit val timeWrites: Writes[ZonedDateTime] =
     Writes.temporalWrites[ZonedDateTime, DateTimeFormatter](DateTimeFormatter.ISO_DATE_TIME)

And then Json result will change to {"createdAt": "2015-10-01T12:13:14+02:00"}

Friday, November 13, 2015

javaOptions in sbt integration tests

My goal was to make separate config files for run, tests, and integration tests in Play application - so that default settings for database in run and integration test environments would be different, and in tests database settings would be unavailable to make sure that no unit test works with real DB. This may be achieved via config.resource system property. I’ve tried to apply solution from http://stackoverflow.com/questions/15399161/how-do-i-specify-a-config-file-with-sbt-0-12-2-for-sbt-test with such code in build.sbt:

lazy val root = (project in file(".")).enablePlugins(PlayScala).
configs(IntegrationTest).
settings(Defaults.itSettings : _*).
settings(javaOptions in Test ++= Seq("-Dconfig.resource=testing.conf")).
settings(javaOptions in IntegrationTest ++= Seq("-Dconfig.resource=integration.conf"))

But turned out that settings for integration tests have no effect, and default file application.conf is still being used. The reason is that IntegrationTest configuration is inherited from Runtime, not from Test, and by default doesn’t fork the JVM. And javaOptions are applied only to newly started JVMs. So there are few options available how to provide settings for integration tests:

  • Make integration tests fork the JVM with fork in IntegrationTest := true. Simple enough, but as system properties from original JVM aren’t inherited by forked one, you won’t be abie to override settings from configuration file using command line - and it might be essential when running integration tests on CI environment, where DB credentials differ from default.
  • Specify configuration file location as command-line option when starting sbt: sbt -Dconfig.resource=integration.conf it:test That leaves more space for mistakes when running tests, as developer needs to keep in mind location of config file (or remember that special script should be used for launching integration tests rather then raw sbt).
  • use Tests.Setup to provide custom initialization code before tests run. This way default name of configuration file is defined in build.sbt, but can be overriden from command line, as well as individual options.

I’ve chosen the last way, here how it looks in build.sbt:

lazy val root = (project in file(".")).enablePlugins(PlayScala).
configs(IntegrationTest).
settings(Defaults.itSettings : _*).
settings(javaOptions in Test ++= Seq("-Dconfig.resource=testing.conf")).
settings(testOptions in IntegrationTest += Tests.Setup(() =>
  if(System.getProperty("config.resource") == null)
    System.setProperty("config.resource", "integration.conf")
))

Saturday, April 18, 2015

Scala traits internals or what needs to be recompiled

I was wondering if classes inherited from traits need to be recompiled if trait code changes, so I’ve investigated a bit traits internals - how are they represented after compilation. I’d like to share some observations.

Suppose we have file SuperTrait1.scala:

trait SuperTrait1 {
  def doOp1(): Unit = {
    println("do op1 V1")
  }
}

SuperTrait2.scala:

trait SuperTrait2 {
  def doOp2(): Unit = {
    println("do op2 V1")
  }
}

and SubClass.scala:

class SubClass extends SuperTrait1 with SuperTrait2 {
  def doOps(): Unit = {
    doOp1()
    doOp2()
  }
}

After compilation we’ll get 5 class files: SubClass.class, SuperTrait1.class, SuperTrait1$class.class, SuperTrait2.class, SuperTrait2$class.class. Let’s take a look inside of SuperTrait1.class and SuperTrait1$class.class:

$ javap -p SuperTrait1
Compiled from "SuperTrait1.scala"
public interface SuperTrait1 {
  public abstract void doOp1();
}
$ javap -p SuperTrait1.class
Compiled from "SuperTrait.scala"
public abstract class SuperTrait1$class {
  public static void doOp1(SuperTrait1);
  public static void $init$(SuperTrait1);
}

So SuperTrait1 is an interface and SuperTrait1$class contains body of trait code, in form of static methods. Now to SubClass.class:

$ javap -p SubClass
Compiled from "SubClass.scala"
public class SubClass implements SuperTrait1,SuperTrait2 {
  public void doOp2();
  public void doOp1();
  public void doOps();
  public SubClass();
}

So SubClass implements (in strict meaning) interfaces of traits. But how it implements - by copying or by referencing? Let’s find it out:

$ javap -c SubClass
...
  public void doOp1();
Code:
   0: aload_0
   1: invokestatic  #27                 // Method SuperTrait1$class.doOp1:(LSuperTrait1;)V
   4: return

...

It’s clear from this decompilation that subclass doesn’t copy methods body, it just references static methods with actual impementation. So we may rest assured that changing trait methods body doesn’t require subclasses re-compilation. On the other hand, if methods signature is changed or new methods added to traits then there’s a reason for subclass re-compilation, as it doesn’t follow the contract of the interface any more.

Friday, March 20, 2015

Futures and blocking in Scala

Quite often I see around ignorance about core concept around Futures in Scala: transition from synchronous to asynchronous code and back, execution context configuration and so on. I’ve shot myself in the leg with same mistakes once, and then had to dive into the details and make the picture clear for myself. Here I’d like to share some lessons learned and make Futures a little less “magic”.

Let’s remind ourselves some basic primitives

Just to make sure we’re on the same page. Synchronous code always runs in a single thread and waits for step 1 to complete before going to step 2. So dependency between operations is expressed by order of expressions in the code. Asynchronous may run in multiple threads and use different forms of callbacks to build dependencies between operations. Future is an example of asynchronous code.

Whenever you’d like to run some code in background (therefore turning synchronous code into asynchronous):

val f = Future({doSomeStuff()})
doOtherStuff()

In this piece of code doSomeStuff() starts to run and then doOtherStuff() starts - without waiting for doSomeStuff() to complete. These 2 functions most probably will run in parallel.

When you have some Future you may specify some transformations on it. They will run asynchronously. Example:

val f2 = f.map({num => num*2})
yetAnotherStuff()

Again, this would return immidiately and proceed to yetAnotherSutff(), and eventually when result of f is available - calculate f2 as well

Whenever you have some Future and you absolutely need to get its result right now (even if you might need to stop the world for it):

import scala.concurrent.duration._
val v = Await.result(f, 5 seconds)
moreNewStuff()

This converts asynchronous code to synchronous - current thread blocks until result is available for up to 5 seconds. Afterwards it either goes on with some known value v, or throws timeout exception. Unlike previous examples moreNewStuff() isn’t started until f is complete (or times out).

In order for all this code to run you need an ExecutionContext - it provides thread pool which will be used for running background tasks. Simplest way to get it:

import scala.concurrent.ExecutionContext.Implicits.global

Now let’s take a look what might go wrong.

Blocking in standard pool

Moving slow I/O operations to background is often highly desired. It may be easy when underlying I/O layer runs asynchronously and only needs a thread to prepare data for sending/handle received data. Such API may be efficiently wrapped into “honest” Futures. A good example is Finagle/Netty stack. But what if you’re obliged to use API which is inherently blocking? For example, some wrapper for WebDriver, or Amazon Java SDK, or JDBC. Luckily Future.apply may help:

import scala.concurrent.ExecutionContext.Implicits.global
val res = Future({myLongIOOpertion()})
val res2 = res.map(r => myPostProcessing(r))

But at what cost does it come? At cost of exhausting precious resource of threads in default ExecutionContext. How many threads are there? The answer can be found in scala.concurrent.impl.ExecutionContextImpl in standard Scala library. It’s controlled by scala.concurrent.context.minThreads, scala.concurrent.context.numThreads and scala.concurrent.context.maxThreads. By default they’re all equal to Runtime.getRuntime.availableProcessors. So on modern desktop you’ll probably have 4, 8 or 16 threads. If code runs on some VM in the cloud - there’s high chance that you’ll get only one thread in that pool. Therefore while you have even one long I/O operation running - no other operations on Futures or parallel collections can run. Even simplest .map or .filter on results of asynchronous operations. What can we do about it if we still do need long tasks on background? In the end, there may be some long calculations which do need a thread for all their time.

Alternative thread pool

Luckily, you may specify custom ExecutionContext in which Future-related operations should run. Here is example how you can define it:

import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
val THREAD_POOL_SIZE = 5
implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(THREAD_POOL_SIZE))

Now all performance issues become a local issue, and long operation can’t kill performance of all application.

Tell current pool to grow

If your execution context is backed by ForkJoinPool (default one usually is), you may instruct it to grow to some extent by wrapping blocking code in scala.concurrent.blocking:

import scala.concurrent.blocking

Future {
  blocking {myLongIOOpertion()}
}

How about Finagle Futures?

At it’s core Finagle Futures are very similar to standard Scala Futures (in fact, Finagle inspired all this API). However, important difference is that Finagle doesn’t allow to specify custom thread pool. To good or bad, Finagle uses single global object com.twitter.concurrent.Scheduler to run its jobs, and it can be customized only globally. Default implementation sticks to Netty’s pool of worker threads which is pretty limited. Therefore for Finagle things get stricter: long operations (including Awaits) in their methods will almost certainly lead to severe problems. If you really need to so some kind of such thing as a reaction to Finagle’s Future completion - wrap it in Scala’s standard Future and define custom ExecutionContext. Therefore the job of Finagle’s Scheduler would be just to resolve Scala’s Future - it’s blazingly fast. And all the heavy lifting would run in custom ExecutionContext

Conclusion

Therefore rule of thumb: avoid running long operations in standard ExecutionContext, it’s intended for relatively simple operations that convert the result retrieved from elsewhere. Another lesson that I’ve learned from my past failures - beware of Await.result(Future.apply()) chains. They may rise as a result of attempt to make your API look asynchronously, while you really need to use it in synchronous context. Such chains give you no benefit, they just waste resources and increase risks.

Saturday, February 7, 2015

SBT integration tests: automatically launch application

I’d like to share my experience in automatic launch of tested web application in SBT (tested with version 0.13.6) before running integration tests and shutting it down after tests. That wasn’t very straight-forward and involved custom tasks creation. I’d be happy to hear about easier ways if you know some.

SBT documentation about testing describes how to enable integration tests and run custom code before them via testOptions in IntegrationTest += Tests.Setup(...), but there’s a class visibility issue: this custom code resides in project definition code (project of project), so it has no direct access to project main classes - it is responsible for building main classes, so it has to be fully compiled before them. This leaves 2 options:

  1. Move integration tests to separate project, where build definition depends on the project where main classes are defined.
  2. Use dynamic class resolution - class which is to be launched should be referenced by string containing its name, and launched via SBT API: sbt.Fork.java.fork(...)

I’ve choosen the 2nd option as it would keep the project structure small enough. However, this leaves an open question: how would custom pre-integration test code know the correct classpath? I’ve solved it via shared object (let’s name it “remote control”) which gets initialized in main code build, and later used to start/stop application in projects build. Here full source code of my_project/project/Build.scala:

import java.io.IOException
import java.net.URL
import sbt._
import Keys._
import scala.util.{Failure, Success, Try}

//rather generic project build definition - enabling integration tests
object MyBuildBuild extends Build {
  lazy val root = Project(id = "my-project-id",
    base = file(".")).
    configs(IntegrationTest).
    settings(Defaults.itSettings : _*).
      settings(testOptions in IntegrationTest += Tests.Setup({_ => AppRunnerRemoteControl.start()})).
      settings(testOptions in IntegrationTest += Tests.Cleanup({_ => AppRunnerRemoteControl.stop()})).
      settings(parallelExecution in IntegrationTest := false)
}

//the core part of solution - shared object
object AppRunnerRemoteControl {
  //receive class path from main build definition
  def setClassPath(cp: Seq[File]): Unit = {
    this.cp = cp
  }
  //in order to have remote control logs in same style as the build logs
  def setLog(log: Logger): Unit = {
    this.log = Option(log)
  }

  def start(): Unit = {
    log.foreach(_.info("starting application ..."))
    val options = ForkOptions(outputStrategy = Some(StdoutOutput))
    //build classpath string
    val cpStr = cp.map(_.getAbsolutePath).mkString(":")
    val arguments: Seq[String] = List("-classpath", cpStr, "-Dmy.custom.property=myCustomValue")
    //Here goes the name of the class which would be launched
    val mainClass: String = "my.pkg.AppRunner"
    //Launch it. Pay attention that class name comes last in the list of arguments
    proc = Option(Fork.java.fork(options, arguments :+ mainClass))

    //make sure application really started or failed before proceed to the tests
    waitForStart().recover({case e =>
      stop()
      throw e
    }).get
  }

  def stop(): Unit = {
    log.foreach(_.info(s"stopping application $proc ..."))
    //kill application
    proc.foreach(_.destroy())
    proc = None
  }

  private def waitForStart(): Try[_] = {
    val maxAttempts = 10
    val u = new URL("http://localhost:8080")
    val c = u.openConnection()
    val result = (1 to maxAttempts).toStream map {i =>
      log.foreach(_.info(s"connection attempt $i of $maxAttempts"))
      Try {c.connect()}} find {
      case Success(_) => true
      case Failure(e: IOException) => Thread.sleep(1000); false
      case Failure(_) => false
    }
    if(result.isEmpty)
      Failure(new RuntimeException(s"Failed to connect to application after $maxAttempts attempts"))
    else
      Success(None)
  }

  var log: Option[Logger] = None
  var cp: Seq[File] = Nil
  var proc: Option[Process] = None
}

In order to use these capabilities main build has to be amended as well. Here is excerpt from my_project/build.sbt:

lazy val integrate = taskKey[Unit]("Starts REST API server and runs integration tests")

lazy val preIntegrationTests = taskKey[Unit]("Starts REST API server and runs integration tests")

preIntegrationTests := {
  val cp: Seq[File] = (fullClasspath in IntegrationTest).value.files
  AppRunnerRemoteControl.setClassPath(cp)
  AppRunnerRemoteControl.setLog(streams.value.log)
}

integrate := {
  preIntegrationTests.value
  (test in IntegrationTest).value
}

Now you may run this command to start the application, run integration tests, and stop the application:

sbt integrate

Initially I was planning to have just one custom task - integrate. But it turned out that macroses used during defining tasks make sure that all dependencies of the tasks are invoked before running the tasks - not at the moment when they’re mentioned in task code. So the following code:

integrate := {
    val cp: Seq[File] = (fullClasspath in IntegrationTest).value.files
    AppRunnerRemoteControl.setClassPath(cp)
    AppRunnerRemoteControl.setLog(streams.value.log)
    (test in IntegrationTest).value
}

would first run integration tests code ((test in IntegrationTest).value), as integrate depends on that task. And only then run code of integrate itself which should run the application for testing.

Friday, January 2, 2015

Iteratees raison d'ĂȘtre

Iteratees were pretty hard concept to grasp for me. Thanks to nice article http://mandubian.com/2012/08/27/understanding-play2-iteratees-for-normal-humans/ I managed to understand what it is and how it works, but event then it wasn’t clear for me why one may need it - mentioned features seem to be achievable with simpler tools like
Scala lazy Streams (http://scala-lang.org/api/current/#scala.collection.immutable.Stream) and RxScala observables (http://reactivex.io/documentation/observable.html):

  1. Backpressure (produce data with such speed that consumer has time to process it) - lazy Streams do exactly this thing: element of Stream isn’t evaluated until someone attempts to retrieve it. RxScala observables at the moment seem to miss this feature.
  2. Ability to stop processing before input ends - for some cases lazy Stream has ready-made methods like find, collectFirst etc. which return result without iterating the full data set. Observables may be unsubscribed to stop processing. Iteratees as far as I see always require rather complex custom code.
  3. Composition - both lazy streams and observables allow composition of processing step in monad-like way.
  4. Asynchronous, non-blocking - observables are non-blocking as well. Lazy streams miss this point and it can be just partly emulated by wrapping into Future.

From these points in my mind arise such things as Scala lazy Streams (http://scala-lang.org/api/current/#scala.collection.immutable.Stream) and RxScala observables (http://reactivex.io/documentation/observable.html). They seem much simpler to understand then iteratees, that’s why there’s a natural question: what features do iteratees provide that make them worth learning (btw - I think this is the best introduction article: http://mandubian.com/2012/08/27/understanding-play2-iteratees-for-normal-humans/)?

I’ve implemented 2 pretty simple tasks (print all elements and calculate sum of all elements of a Seq) with each technology in order to feel the difference. Full code is available here: https://github.com/paul-lysak/misc_learning/blob/master/iteratee/play-iteratee/test/IterateeSpec.scala .

Having sample data let’s look at all implementations and then compare them:

val data = Seq[Int](10, 20, 30, 40, 50)

Iteratee implementation:

val itPrint = Iteratee.foreach[Int](a => println("element="+a))
val itSum = Iteratee.fold[Int, Int](0)(_ + _)

val en = Enumerator(data: _*)
val fp1 = en.run(itPrint)
Await.ready(fp, DurationInt(10).seconds)

val fs1 = en.run(itSum)
fs1.foreach(s => println("sum="+s))

Lazy Stream implementation:

val str = data.toStream
str.foreach(a => println("elStr="+a))

val s = str.fold(0)(_ + _)
println("sumStr="+s)

RxScala observable:

val o = Observable.from(data)
val o1 = Observable.from(data)

o.subscribe(a => println("rxItem="+a))

val so = o1.foldLeft(0)(_ + _)  
so.subscribe(a => println("rxSum="+a))

At its core trait play.api.libs.iteratee.Iteratee just defines reaction to 3 possible events (next item, empty input, end of input) in a pretty complex way. So constructing manually is pretty tedious and error-prone. However, luckily Iteratee companion object contains couple of utility methods that hide most of complexity and make Iteratee construction almost as simple as fold or map on regular collections - see examples in code above. But still - what makes Iteratees special? Here is what I can say:

  1. Unlike Streams and Observables, iteration logic is fully decoupled from data source. That means that you can define Iteretee before defining data source. I would call this killer feature of Iteratees - both stream and observable require that corresponding stream or observable already exist before doing foreach/map/fold/etc. .
  2. Ability to reduce threads consumption. Iteratee construction methods come in 2 flavours - blocking and non-blocking. For example, object Iteratee has method def fold[E, A](state: A)(f: (A, E) => A) and def foldM[E, A](state: A)(f: (A, E) => Future[A]). First one (fold) is blocking - despite the fact that Enumerator.run returns Future when called with such Iteratee and doesn’t block current thread, 1 thread from ExecutionContext will be 100% time busy with Iteratee until that Future completes - no matter what job Iteratee is doing. Second method (foldM) is non-blocking - as a reaction for new element it may run slow I/O operation and return Future that will be completed after I/O end. Thus the thread will be used only for doing actual job by CPU when sending I/O operation or processing its result. That’s clear advantage compared to lazy stream (which could be packed in future to partly emulate asynchronous behavior), but observables do the things this way too.
  3. Remainder handling - if there’s an error during some element processing, or Iteratee decides to stop before reaching input end, there are means to get failed element and remaining part of input. So we may retry operation or go on processing with another iteratee. That may be nice advantage in some special cases compared both to lazy streams and observables.

Conclusion: Iteratees seem to have richer feature set then similar tools, but it is harder to use. I would call 2 cases when I would definitely stick to Iteratees:

  1. When asynchronous handling (slow I/O for each element) and backpressure required at the same time
  2. When iteration logic should be strongly decoupled from data source