Play 2.0 framework and XA transactions

XA transactions are useful, but Play 2.0 does not support them out of the box.  Here I show how to add that support.

First off, some examples of when XA is useful:

– JPA uses two physical connections if you use entities from two different persistence.xml files. If those two connections need to be committed in one transaction, XA is your only option.
– Committing a change in a database and at the same time committing a message to JMS.  For example, you want to guarantee that an email is sent, asynchronously, once an order has been successfully committed to the database.  There are other ways, but JMS provides a transactional way to do this without having to think much about failure handling.
– Writing to a physically different database for any of several political reasons (legacy system, different department responsible for a different database server, different budgets).
– See http://docs.codehaus.org/display/BTM/FAQ#FAQ-WhywouldIneedatransactionmanager

So the way I see it, XA is something Play needs to "support".

Adding support is very easy.  I have created a Play plugin which is based on Bitronix.  Resources are configured in the Bitronix JNDI tree (why on earth does Play use a config file rather than JNDI?! anyway…)  You start a transaction with "withXaTransaction", like this:

    def someControllerMethod = Action {
        withXaTransaction { ctx =>
            TicketRepository.addValidation(user.get, bookingRef, ctx)
            ValidationRepository.addValidation(bookingRef, user.get, ctx)
        }
        val tickets = TicketRepository.getByEventUid(eventUid)
        Ok(views.html.ticketsInEvent(eventUid, getTickets(eventUid), user, eventValidationForm))
    }

The ctx object is an XAContext (my own class) which lets you look up resources like a datasource, or mark the transaction for rollback in case of a failure.  So the validation repo does this, using ScalaQuery (note that I used "withSession" rather than "withTransaction", since the commit has to be left to the transaction manager):

    def addValidation(bookingRef: String, validator: User, ctx: XAContext) = {
        val ds = ctx.lookupDS("jdbc/maxant/scalabook_admin")
        Database.forDataSource(ds) withSession { implicit db: Session =>
            Validations.insert(Validation(bookingRef, validator.email, new java.sql.Timestamp(now)))
        }
    }

And the ticket repo does the following with JMS:

    def addValidation(user: User, bookingRef: String, ctx: XAContext) = {

        val xml =
            <ticketValidation>
                <bookingReference>{bookingRef}</bookingReference>
                <validatorId>{user.email}</validatorId>
            </ticketValidation>

        val qcf = ctx.lookupCF("jms/maxant/scalabook/ticketvalidations")
        val qc = qcf.createConnection("ticketValidation","password")
        val qs = qc.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val q = qs.createQueue("ticketValidationQueue") //val q = ctx.lookup(QUEUE).asInstanceOf[Queue]
        val sender = qs.createProducer(q)
        val m = qs.createTextMessage(xml.toString)
        sender.send(m)
        sender.close
        qs.close
        qc.close
    }
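
One thing to watch in the JMS code above: the producer, session and connection are only closed if everything before them succeeds.  If "send" throws, the XA transaction is rolled back, but the three close calls are skipped.  Below is a minimal sketch of a loan-style helper that always closes what it opened.  The names "Closing" and "closing" are my own invention for illustration; they are not part of the plugin, Play or Bitronix.

```scala
object Closing {

    // Hands `resource` to `body` and always calls `close` afterwards,
    // whether `body` returned normally or threw an exception.
    def closing[R, T](resource: R)(close: R => Unit)(body: R => T): T =
        try body(resource) finally close(resource)
}

// Applied to the JMS code above it would look roughly like this
// (left as a comment, because it needs a JMS provider to run):
//
//   closing(qcf.createConnection("ticketValidation", "password"))(_.close()) { qc =>
//       closing(qc.createSession(false, Session.AUTO_ACKNOWLEDGE))(_.close()) { qs =>
//           closing(qs.createProducer(qs.createQueue("ticketValidationQueue")))(_.close()) { sender =>
//               sender.send(qs.createTextMessage(xml.toString))
//           }
//       }
//   }
```

Because the calls are nested, the producer is closed first, then the session, then the connection, which is the same order the original code uses.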

I’ve tested it by writing to MySQL and sending a JMS message to JBoss (HornetQ) and it seems to work well (except getting HornetQ to play with Bitronix was a bitch – see here: https://community.jboss.org/thread/206180?tstart=0).

The Scala code for the XA support is:

package ch.maxant.scalabook.play20.plugins.xasupport

import play.api.mvc.RequestHeader
import play.api.mvc.Results
import play.api.mvc.Request
import play.api.mvc.AnyContent
import play.api.mvc.Result
import play.api.mvc.Action
import play.api.mvc.Security
import play.api._
import play.api.mvc._
import play.api.data._
import play.api.data.Forms._
import ch.maxant.scalabook.persistence.UserRepository
import bitronix.tm.TransactionManagerServices
import java.util.Hashtable
import javax.naming.Context._
import javax.naming.InitialContext
import javax.sql.DataSource
import bitronix.tm.BitronixTransaction
import java.io.File
import org.scalaquery.session.Database
import org.scalaquery.SQueryException
import scala.collection.mutable.ListBuffer
import java.sql.Connection
import java.sql.SQLException
import org.scalaquery.session.Session
import bitronix.tm.BitronixTransactionManager
import javax.jms.ConnectionFactory

class XAContext {

    private val env = new Hashtable[String, String]()
    env.put(INITIAL_CONTEXT_FACTORY, "bitronix.tm.jndi.BitronixInitialContextFactory")
    private val namingCtx = new InitialContext(env);

    var rollbackOnly = false
   
    def lookup(name: String) = {
        namingCtx.lookup(name)
    }
    def lookupDS(name: String) = {
        lookup(name).asInstanceOf[DataSource]
    }
    def lookupCF(name: String) = {
        lookup(name).asInstanceOf[ConnectionFactory]
    }
}

trait XASupport { self: Controller =>

    private lazy val tm = play.api.Play.current.plugin[XASupportPlugin] match {
      case Some(plugin) => plugin.tm
      case None => throw new Exception("There is no XASupport plugin registered. Make sure it is enabled. See play documentation. (Hint: add it to play.plugins)")
    }

    /**
     * Use this flow control to make resources used inside `f` commit with the XA protocol.
     * Conditions: get resources like drivers or connection factories out of the context passed to f.
     * Connections are opened and closed as normal, for example by the withSession flow control offered
     * by ScalaQuery / SLICK.
     */
    def withXaTransaction[T](f: XAContext => T): T = {
        tm.begin

        //get a ref to the transaction, in case when we want to commit we are no longer on the same thread and TLS has lost the TX.
        //we have no idea what happens inside f!  they might spawn new threads or send work to akka asyncly
        val t = tm.getCurrentTransaction
        Logger("XASupport").info("Started XA transaction " + t.getGtrid())
        val ctx = new XAContext()
        var completed = false
        try{
            val result = f(ctx)
            completed = true
            if(!ctx.rollbackOnly){
                Logger("XASupport").info("committing " + t.getGtrid() + "…")
                t.commit
                Logger("XASupport").info("committed " + t.getGtrid())
            }
            result
        }finally{
            if(!completed || ctx.rollbackOnly){
                //in case of exception, or in case of set rollbackOnly = true
                Logger("XASupport").warn("rolling back (completed=" + completed + "/ctx.rollbackOnly=" + ctx.rollbackOnly + ")")
                t.rollback
            }
        }
    }
}

class XASupportPlugin(app: play.Application) extends Plugin {

    protected[plugins] var tm: BitronixTransactionManager = null
   
    override def onStart {
        //TODO how about getting config out of jar!
        val file = new File(".", "app/bitronix-default-config.properties").getAbsolutePath
        Logger("XASupport").info("Using Bitronix config at " + file)
        val prop = System.getProperty("bitronix.tm.configuration", file) //default
        System.setProperty("bitronix.tm.configuration", prop) //override with default, if not set

        //start the TM
        tm = TransactionManagerServices.getTransactionManager

        Logger("XASupport").info("Started TM with resource config " + TransactionManagerServices.getConfiguration.getResourceConfigurationFilename)
    }

    override def onStop {
        //on graceful shutdown, we want to shutdown the TM too
        Logger("XASupport").info("Shutting down TM")
        tm.shutdown
        Logger("XASupport").info("TM shut down")
    }

}
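
If you want to convince yourself of the commit-or-rollback rules in withXaTransaction without starting Play or Bitronix, here is a minimal sketch that repeats the same decision logic against stand-ins.  "Tx", "RecordingTx", "Ctx" and "XaSketch.withTx" are hypothetical names made up for this sketch; only the try/finally structure is taken from the code above.

```scala
// Hypothetical stand-in for the transaction handed out by the TM.
trait Tx {
    def commit(): Unit
    def rollback(): Unit
}

// Remembers the last thing that was done to it, so it can be inspected.
final class RecordingTx extends Tx {
    var outcome: String = "active"
    def commit(): Unit = { outcome = "committed" }
    def rollback(): Unit = { outcome = "rolledback" }
}

// Stand-in for XAContext, reduced to the one flag that matters here.
final class Ctx {
    var rollbackOnly: Boolean = false
}

object XaSketch {

    // Same decision logic as withXaTransaction: commit only if `f` returned
    // normally and nobody set rollbackOnly; otherwise roll back.
    def withTx[T](tx: Tx)(f: Ctx => T): T = {
        val ctx = new Ctx
        var completed = false
        try {
            val result = f(ctx)
            completed = true
            if (!ctx.rollbackOnly) tx.commit()
            result
        } finally {
            if (!completed || ctx.rollbackOnly) tx.rollback()
        }
    }
}
```

So there are three cases: the block returns normally and the transaction is committed; the block sets ctx.rollbackOnly = true and the transaction is rolled back while the result is still returned; or the block throws, in which case the transaction is rolled back and the exception carries on up to the caller.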

Use the code as you like; I’m giving it away for free 🙂  Just don’t complain if it doesn’t work 😉

It would be nice to see this plugin extended and turned into something a little more production ready.  Even nicer would be for Play to support a transaction manager natively, including fetching resources out of JNDI.

Have fun!

(copyright Ant Kutschera 2012)