Thursday, December 2, 2010

Towards a Neo4j Connector Implementation

As discussed, the setup/architecture is rapidly changing as I go through the specification and do things the Right Way (TM). As a result, the deployment instructions and code walkthrough are almost obsolete. I suggest you keep an eye on this blog for new posts or refer to the README that will go up with my next commit.

One of the coolest things about working in a managed environment (mostly in the form of an application server) is that many things are taken care of for you automatically. It is completely natural to have an EJB acquire connections to different RDBMSs, do stuff with them and then expect things to commit or roll back based on heuristics/exceptions/whatever in a consistent manner, while you may not have the slightest idea that 2PC is involved somewhere. This transparency is essential for rapid development and, unfortunately, is mostly expected only for relational stores, mainly via JDBC. Since I have a thing for Neo4j and time on my hands, I thought that it might be cool to provide such facilities for it. The way to go is the Java Connector Architecture (JCA), a way of integrating Enterprise Information Systems into an application server. JDBC is actually a domain-specific implementation of JCA, with a namespace of javax.sql instead of javax.resource. I would like to tell you what I have built, how it can be used and what I hope it will become.

What does a connector do, exactly?

If you want exactly, then read the spec. You have probably seen it, however, in your app server's administration interface, near the JDBC section. Connectors provide a platform-independent way for application components to communicate with a (in general) non-relational storage manager. In addition, XA compliance and connection pooling are offered via the application server. The idea is that you get the .rar file provided by your data storage manager's vendor, much like a JDBC driver, you install it and hopefully you have access to the store from your application component, be it an EJB, a Servlet or whatever your poison is. You simply get a ConnectionFactory from JNDI, ask it for a Connection and you perform the store's specific operations (yeah, yeah, there is also CCI but that is boring). So, let's see how we could use that in an EJB to create a Neo4j node.

A usage example from an EJB

The reason for showing how to use it before explaining anything about it is simple - I do not care about the implementation that much yet. What I want is to show what is possible, what I consider the way to go and get feedback on all that. So, consider the following snippet.

public class GreeterBean implements Greeter {
    @Resource(name = "neo")
    private ConnectionFactory cnf;

    @Resource(name = "mysql")
    private DataSource sql;

    public String getMessage() {
        Node node = null;
        Connection conn1 = null;
        Connection conn2 = null;
        Connection conn3 = null;
        java.sql.Connection sqlConn = null;
        StringBuffer message = new StringBuffer();
        long currentId = -1;
        try {
            conn1 = cnf.getConnection();
            conn2 = cnf.getConnection();
            conn3 = cnf.getConnection();
            node = conn2.createNode();
            currentId = node.getId();
            node.setProperty("foo", "BAR " + currentId);
            for (Node n : conn3.getAllNodes()) {
                if (n.hasProperty("foo")) {
                    // this branch was truncated in the original post; from
                    // context, it reports the property value
                    message.append("node ").append(n.getId())
                            .append(" had property foo = ")
                            .append(n.getProperty("foo")).append("<br/>");
                } else {
                    message.append("node ").append(n.getId())
                            .append(" did not have property foo.<br/>");
                }
            }
            sqlConn = sql.getConnection();
            PreparedStatement st = sqlConn
                    .prepareStatement("insert into Sample values (?,?,?)");
            // the bound values were lost from the original post; these are
            // illustrative
            st.setLong(1, currentId);
            st.setString(2, "foo");
            st.setString(3, "BAR " + currentId);
            st.executeUpdate();
        } catch (ResourceException e) {
            throw new Error(e);
        } catch (XAException e) {
            throw new Error(e);
        } catch (SQLException e) {
            throw new Error(e);
        } finally {
            try {
                if (conn1 != null) {
                    conn1.close();
                }
                if (conn2 != null) {
                    conn2.close();
                }
                if (conn3 != null) {
                    conn3.close();
                }
                if (sqlConn != null) {
                    sqlConn.close();
                }
            } catch (Exception e) {
                throw new Error(e);
            }
        }
        return message.toString();
    }
}

We ask the application server to inject a Resource for us, with a name that we have already configured (in a container-specific way). This is a ConnectionFactory that returns, well, Connections that allow us to do what a Neo EmbeddedGraphDatabase would allow us to do, minus the indexing, the remote shell and, actually, everything except primitives manipulation. This is because every ManagedConnection allows for management of one XAResource and, given that the indexing is a different XAResource than the Neo store, things have to be made to play nice, something I have not had time to do yet. But that is not important right now, since this is not production ready.
I use three connections over the same XAResource, a guarantee that comes from the fact that they are created in the same thread, as expected. Calling close() on them simply invalidates them. The "program" creates a new Node, sets a property and then gets the same property from all currently stored Nodes in the database. Simple but enough to prove my point.
The code also manipulates a JDBC connection (I used a MySQL instance). Note that nowhere do we ask a GraphDatabase to begin a transaction, nor do we commit. As when you use a JDBC connection, entering the EJB method begins the tx (depending on the annotation, of course), exiting normally commits and any other exit rolls back. Both resources participate in a 2PC protocol, orchestrated by the application server's transaction manager. This means that crash recovery should also work, by the way, but I have not tested it yet.

So there. To make this run some legwork is needed and that is what the rest of the post is about. But while this code is small, I think it demonstrates a powerful concept: Mixing SQL and NoSQL sources in the same environment with a minimum of management overhead. Cats and dogs, living together, as others have put it.

A flash code walkthrough

So, you decided you want to play. Fair enough. The code currently comes in three components: The transaction manager proxy, the actual connector and the modified neo4j kernel. The first needs some explanation.
The TransactionManager is used in the kernel for more tasks than its interface suggests. The Transaction objects it returns are used by various classes to keep a map of resources specific to each running transaction, and it is also used for resource enlistment. The transaction status is also useful to know, and there is a need to set the transaction to rollback-only mode. In a managed environment, however, the availability of the provided TransactionManager (and from there the Transaction object) is not certain or standardized. Also, it is not the actual transaction manager that is needed but a specific subset of its functionality. So, instead of jumping through hoops to accommodate every application server out there or hacking up the kernel to remove what we find inconvenient, I went another way. The JTA 1.1 spec defines a TransactionSynchronizationRegistry interface that must be made available by conforming app servers. From there almost all the above functionality can be implemented in a portable way. Check the API for details. Actually, the only thing it does not do is the XAResource enlistment. However, this is OK because that is not where it is supposed to happen anyway. Instead, the ManagedConnection has a getXAResource() method that returns the XAResource it uses. When the ManagedConnection is first associated with a transaction, the XAResource is retrieved and enlisted by the transaction manager, transparently.
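To make this concrete, here is a rough sketch of how that portable subset can be consumed. The TransactionSynchronizationRegistry interface is reproduced in abridged form only to keep the snippet self-contained; in a container you would look the real one up from JNDI at java:comp/TransactionSynchronizationRegistry. The TxBoundResources helper and the "neo.connection" key are hypothetical, not part of any real API.

```java
// Abridged stand-in for JTA 1.1's TransactionSynchronizationRegistry, kept
// here only so the snippet is self-contained; a container binds the real one
// at java:comp/TransactionSynchronizationRegistry.
interface TransactionSynchronizationRegistry {
    Object getTransactionKey();                  // identifies the current tx
    int getTransactionStatus();                  // a javax.transaction.Status value
    void setRollbackOnly();                      // mark the current tx rollback-only
    void putResource(Object key, Object value);  // per-transaction resource map
    Object getResource(Object key);
}

// Hypothetical helper: keep one connection per running transaction, which is
// exactly the "map of resources specific to each running transaction" the
// kernel needs, without ever touching the container's TransactionManager.
class TxBoundResources {
    private final TransactionSynchronizationRegistry tsr;

    TxBoundResources(TransactionSynchronizationRegistry tsr) {
        this.tsr = tsr;
    }

    Object connectionForCurrentTx(java.util.function.Supplier<Object> opener) {
        Object conn = tsr.getResource("neo.connection");
        if (conn == null) {
            conn = opener.get();                 // open once per transaction
            tsr.putResource("neo.connection", conn);
        }
        return conn;
    }
}
```

The point is that everything here is plain JTA 1.1, so the same code runs unmodified on any conforming app server.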
The connector is a bare minimum implementation of the JCA spec, v1.6, hopefully not entirely wrong. The NeoResourceAdapter does the database instantiation and shutdown, the NeoManagedConnectionFactory keeps the ManagedConnections mapped to the current transactional context, the NeoManagedConnections return Connections that are simple shells over them and simply forward calls. The architecture is actually very simple if you read the spec, meaningless to discuss otherwise.
The jta kernel has a ManagedGraphDbImpl that enforces the use of the "container" transaction manager, returns the XAResource mapped to the current top level tx and disallows indexing operations. Nothing fancy yet.

Set up the environment

What follows has been successfully performed on Glassfish and unsuccessfully on Geronimo and JOnAS. The last two require custom descriptors alongside the ra.xml I provide. If you want to deploy to them, you should probably consult the documentation - JOnAS, for example, provides a utility that automatically adds its descriptor to the rar.

The container-provided txm is here. The connector code is here and the jta kernel, as always, here. Download and maven install; no editing should be needed. To see this in action, you must create an ear that holds an enterprise application, ready for deployment on an application server. The jta kernel jar must replace your main kernel (no functionality has been changed, it should work as a drop-in replacement) and the txm service should be added in the lib/ of the ear, or anywhere where it can be picked up. The .rar that is the connector, however, must be added as an ear module. In your application.xml, next to the EJB jar and the war, you should add

    <module>
        <connector>neo-connector.rar</connector> <!-- use your connector archive's actual file name -->
    </module>
and the archive must be on the top level, next to the other ear modules. Bundle up the ear and deploy. Next, go to the administrative console and add a connector connection pool bound to the Neo connector (it should be available) and create a connector resource over that pool bound to the JNDI location you have asked your EJB to find it (neo in the example above). You should be set to go.

From here

So, what do you think? Is the approach viable? What I want to create is something that is at least as easy to use as JDBC and with the same functionality/guarantees. Of course, the graph database is embedded now, no indexing, dubious pooling and a whole lot of other issues but the first step has been taken. I hope that soon it will be possible to seamlessly integrate Neo4j in enterprise applications in all the available layers, from the EJB up to Spring. If only we had Object to Graph mapping standards...

Monday, November 22, 2010

Using JOTM as a TransactionManager in Neo4j

If you follow the neo4j mailing list you may have noticed that lately I have been working on providing support in Neo for plugging in JTA compliant Transaction Manager implementations. In this post I will try to explain how this was done, how you can enable JOTM support that is provided mainly as an example and what further improvements are possible after completing this project.


During my series of posts on the internals of Neo, I had touched upon the whole XA business and how Neo comes with (complete, as it turned out) support for fitting into an XA environment. There I had described the various classes that expose the database store as an XAResource and how the TxManager class enlists it as needed in top level transactions, along with other XAResources, typically ones from the Index store. I will not explain more here, but it is a good idea to have the general framework in mind. The idea was that, since the TxManager implements the javax.transaction.TransactionManager interface, what keeps us from substituting it with a different, third party implementation? More importantly, how can it be done and how robust will the result be? If you want to see how this turned out and would like to hear good news, keep reading. Note that the next paragraph is storytelling, so if you want the main course, skip it.

Story time

One of the most widely used JTA-compatible transaction manager implementations out there is ObjectWeb's JOTM package. I preferred it over Atomikos' solution mainly because, being a newbie in the field, I wanted something that provided source code so it would help me with learning and debugging. A fine choice, as it turned out.
I had some teething problems, having to do with initializing the JOTM instance, enabling recovery and searching through versions for resolved bugs, but I came out victorious. After figuring all this out, I tore out the TxManager class from the TxModule in the Neo kernel and substituted it with a Jotm instance. No problems there, it worked first try. Enlisting the resources for recovery was a bit more trouble but I managed it, in a way that will be explained below. After that, I had to make sure that everything worked properly for recovery in all disaster scenarios. After getting bored of using the debugger to breakpoint/kill the JVM on commit()/rollback()/prepare(), I took a suggestion by Tobias Ivarsson on IRC, detoured a bit and hacked up a solution with JDI. After some (little) automation I had tested many scenarios with satisfactory precision and, as it turned out, all the hard work was already done by the Neo team - I did not have to touch even one line of code to make things work out.
Now, of course, the kernel had a dependency on the JOTM package, which is unacceptable. I looked at the way Lucene is enlisted as a service automatically and, using this framework, I added a hook in the kernel to discover and register services that provide TransactionManager instances. Now one can create a .jar file, add it and its dependencies to the classpath and the kernel will pick it up and make it available for use. Neat! To verify the usability and stability of this, I branched the recovery-robustness suite that the dev team uses, checking that crashes are not catastrophic when JOTM is used. For the time being, all works well.

Getting technical: The hooks

If you want to add support for a custom tx manager implementation, the first things to look at are the interface TransactionManagerImpl and the class TransactionManagerService. The first is there out of necessity - when the TxModule is started, the recovery process must be initiated. Each tx manager does it in its own way, so the init() method is there to abstract this concept. The XaDataSourceManager is passed to provide all XAResources that are registered with the kernel and which may have pending transactions. The stop() method is of course used to notify the tx manager of shutdown so that resources can be released and cleanup performed. The native implementations in Neo (TxManager and ReadOnlyTxManager) have been altered to implement this interface. In addition, the interface extends javax.transaction.TransactionManager so that implementations can be used "vanilla" after initialization.
The TransactionManagerService is a convenience class. It extends Service so that it can be discovered and loaded, and implements the TransactionManagerImpl interface so that it can be used without a cast in the TxModule. The only thing defined is a constructor with a String argument, which is the name under which the Service will be registered - your implementation will be known by this name. To add a TransactionManager implementation as a service, you must extend this class, as demonstrated by the JOTMServiceImpl.
The Config object has seen the addition of the constant TXMANAGER_IMPLEMENTATION, with a value of "tx_manager_impl". If you want your custom implementation of tx-manager-as-a-service to be used, you must start the EmbeddedGraphDatabase with a custom configuration that will contain at least this key with a value of the name of your service.
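In effect the kernel does something like the following when it decides which tx manager service to load. This is a simplified model of the lookup, not the actual kernel code; only the key name and the "native" fallback come from the text.

```java
// Simplified model of the selection described above: the kernel reads the
// "tx_manager_impl" key from the startup configuration and falls back to the
// native TxManager when no custom service is requested.
class TxManagerSelector {
    static final String TXMANAGER_IMPLEMENTATION = "tx_manager_impl";

    static String select(java.util.Map<String, String> params) {
        return params.getOrDefault(TXMANAGER_IMPLEMENTATION, "native");
    }
}
```

Passing a map with this key to the EmbeddedGraphDatabase constructor is then all the configuration the kernel needs.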

The sample implementation

Here you can see the project that holds the JOTM implementation of the above. It consists of only one class that extends TransactionManagerService, has a name of "jotm", on init() constructs a local Jotm instance, registers with it all XAResources reported by the XaDataSourceManager, requests recovery and is done. All TransactionManager interface methods are passed to this local instance. stop() simply stops the Jotm instance. The annotation @Service.Implementation is from the org.neo4j.helpers package and is there to ensure that this class is registered. For the magic to happen, however, a META-INF/services/org.neo4j.kernel.impl.transaction.TransactionManagerService resource must exist that contains a single line with the fully qualified name of the service class. Bundle all that up in a jar, put it in your classpath, change your configuration parameters and bam! nothing will work. It is obvious that you must also provide the bundles that make up the JOTM implementation and configure it right. So let's do that.
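Concretely, assuming the service class is named org.neo4j.jotm.JOTMServiceImpl (an illustrative name, not necessarily the real one), the jar would carry a resource like this:

```
# file: META-INF/services/org.neo4j.kernel.impl.transaction.TransactionManagerService
org.neo4j.jotm.JOTMServiceImpl
```

This is the standard java.util.ServiceLoader registration convention: the file name is the fully qualified name of the service interface and each line names an implementation.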

Making it play

The version I have used in all my tests is 2.1.9; I suggest you do the same. The jars needed are:

which is the JOTM core impl and requires


If you are a maven user, the dependencies to add are


Also, you will need to provide a configuration for JOTM to use, necessary for recovery, which is turned off by default. The location is passed via a JVM argument as


which must point to a file in which at least the property jotm.recovery.Enabled is set to true. Here is a sample. Another property in there is howl.log.FileDirectory, which is a path where the transaction WAL will be kept.
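Put together, a minimal configuration along those lines could look like the following; the directory path is a placeholder, and only these two properties are taken from the text above:

```
jotm.recovery.Enabled=true
howl.log.FileDirectory=logs/howl
```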
I confess to not having studied CAROL, so if you have any problems I suggest you use the whole configuration directory as provided here, which works.

After that, adding the jotm-connector-service bundle as described above, the jta kernel with the correct parameters and all, you should get a working environment. The howl logs must appear and you must not have tm_tx_log.* files in your store directory, since the native TxManager is not used. If you want to use it instead, an empty configuration or a "tx_manager_impl" parameter with a value of "native" will do the trick. There you go, you used an alternative TransactionManager implementation, as promised. Go ahead, implement a long operation that involves indexing and kill it on commit(). When restarted, the store will be brought up to a consistent state. You can also alter the sample service project to bind the Jotm instance to a JNDI location and enlist resources from a different store altogether.

For maven users

Apart from the dependencies for jotm above, you can download and install the jta kernel and the JOTMService project. Add their coordinates to your pom.xml as in


substituting the main kernel for the jta one. Things should work.

From here

This whole thing lacks some safety measures, mainly the fact that the database is allowed to come up with a different tx manager than the one that was used when it crashed. I think a check should be added and the best location I can think of is the neostore top level store file. I will have to get back to you on this.
I now know enough to go to the second phase of my work: make Neo work in an application server via JCA, providing container managed transactions in the same way JDBC resources do it. This way you can use your favorite graph database from an EJB side by side with a relational store and rest assured that the 2PC will work while you are safe from the details. How cool would that be? Until then, happy hacking.

Thursday, November 4, 2010

Neo4j Internals: Transactions (Part 3) as a complete run and a conclusion

This is the last post in the series, at least the core one. Here I will try to follow a path from the initialization of the db engine, through the begin() of a transaction and the creation of a Node, to the commit and shutdown. It will require knowledge of all the material covered so far but it shouldn't be hard to follow, since everything needed has already been presented. So, without further ado...

Creating the EmbeddedGraphDatabase

The first thing to do to start working with a Neo instance is to create a new EmbeddedGraphDatabase. This is a thin shell over an EmbeddedGraphDbImpl, so we instantiate that. We provide it with the user-provided parameter map (or an empty one) and the default factory implementations: one for LockManager instances that use a provided TransactionManager, an IdGenerator that assigns ids to created entities by delegating to the store managers, a RelationshipTypeCreator for creating RelationshipTypes, a TxIdGenerator that assigns ids to txs by delegating to the TransactionManager, a TxFinishHook for creating tx synchronization hooks on tx finish (which does nothing) and a LastCommittedTxIdSetter for storing the last successfully committed tx id (which also does nothing). EmbeddedGraphDbImpl in turn creates a new TxModule to hold a TransactionManager and a XaDataSourceManager, a new LockManager, a new Config, and a GraphDbInstance (as of late there is also an Indexer but we conveniently ignore that).

The Config object has a long story ahead of it. It accepts and stores most of the instances created by the factories at EmbeddedGraphDbImpl and also creates a new IdGeneratorModule, a new PersistenceModule, an AdaptiveCacheManager and a GraphDbModule. That last one parses the parameters map and decides on what type of Cache to ask NodeManager to create and whether the user has requested a full r/w database or a read-only one, creating as a result - wait for it - yes, the proper NodeManager instance, though that work will actually happen later on, in GraphDbInstance. References to all these are stored and the whole GraphDbModule is kept in the Config.

GraphDbInstance is where the database store is started. On start(), it uses an AutoConfigurator, which in its very brief lifetime computes some sensible defaults for the sizes of the in-memory images of the various stores, regardless of whether they will be buffered or memory mapped. These sizes are also placed in the Config object. Next comes what we all have been waiting for - that's right, the Store instantiation. The TxModule is retrieved from the Config and is asked to registerDataSource() the NIO_NEO_DB_CLASS (currently NeoStoreXaDataSource) under the DEFAULT_DATA_SOURCE_NAME. The XaDataSourceManager in TxModule is passed the parameters and instantiates the class via reflection, assuming that there is a constructor accepting a Map argument (which represents the configuration as a parameter map), and stores the result in a Map, pointed to by the resource's name. As we have seen previously, NeoStoreXaDataSource creates the actual store by instantiating a NeoStore and create()ing a XaContainer, possibly triggering a recovery or else "simply" instantiating the various data files and id stores. This is the major performance hiccup in the startup sequence, since all the above run in the same thread, a necessary measure to ensure a successful log recovery if it proves necessary. Obviously, if the database was created, from now on you can see the files in the db directory.

Going back to GraphDbInstance, a NioNeoDbPersistenceSource is created, stored in the Config and also provided to the IdGeneratorModule, where it is used as the actual source of entity ids. Note that the actual association of the PersistenceSource with the DataSource is made a few lines below, when GraphDbInstance calls start() on the PersistenceSource, passing the XaDataSourceManager as an argument. After that, init() is called on the modules in the Config (which currently do nothing) and then they are start()ed. This makes the TxModule bind the XaDataSourceManager to the TransactionManager, the PersistenceModule create the PersistenceManager, the PersistenceSource acquire the DataSource, the IdGenerator get the PersistenceSource and the GraphDbModule create and start() the NodeManager. Starting the NodeManager causes the parameter map to be parsed to discover the sizes and types of the caches and register them with the CacheManager.
So there we are. The store is initialized, as well as the DataSource over it, the TxModule is up and running with its TransactionManager, the NodeManager has built all that it needs and the LockReleaser and LockManager are in place. This is pretty much what is needed to start working, so it is about time we did that.

Beginning a transaction

Explicitly beginning a tx is necessary only if you need to perform write operations. For read-only scenarios, where no locks are acquired, you can get away with simply asking the db for the element. This is not the typical (or interesting, for that matter) scenario, so let's create something. This requires a tx, so let's start one. Calling beginTx() on an EmbeddedGraphDbImpl asks the referenced GraphDbInstance for the TransactionManager (stored in the TxModule in the Config) and then asks it to begin() one. No reference needs to be stored; recall that txs are thread bound, so as long as we are in the same thread we know which is our tx. However, an API must be provided for the demarcation of transactional code, so a TopLevelTransaction object is created, wrapped around the TxManager and returned to the user. It forwards all calls of the Transaction interface to the TxManager, relying on the thread-to-tx mapping stored there for the operation's success. That is the object you receive on beginTx() so that you can work your magic.
We have already seen in some detail the workings of the TxManager class (which is the implementation of the TransactionManager interface) but let's follow the code. Calling begin() retrieves the currentThread() and maps a new TransactionImpl to it. Creating the TxImpl also assigns it a global Tx identifier via the Xid mechanism. Note that for now, no resources are enlisted, no log entries have been written and the state of the tx is STATUS_ACTIVE. To see in action the full mechanism we have to create a Node.
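The thread-bound bookkeeping just described can be sketched with a toy class. This mirrors the idea behind the txThreadMap, not Neo's actual implementation; the class and method names here are illustrative.

```java
// Toy model of the thread-bound transaction mapping described above: begin()
// associates a fresh tx id with the calling thread, so later calls on the
// same thread find "their" tx without any handle being passed around.
class ToyTxManager {
    private final java.util.concurrent.atomic.AtomicLong ids =
            new java.util.concurrent.atomic.AtomicLong();
    private final ThreadLocal<Long> txThreadMap = new ThreadLocal<>();

    long begin() {
        if (txThreadMap.get() != null) {
            throw new IllegalStateException("tx already active on this thread");
        }
        long id = ids.incrementAndGet();      // global tx identifier
        txThreadMap.set(id);
        return id;
    }

    Long current() {                          // null means STATUS_NO_TRANSACTION
        return txThreadMap.get();
    }

    void commit() {
        if (txThreadMap.get() == null) {
            throw new IllegalStateException("no tx on this thread");
        }
        txThreadMap.remove();                 // back to STATUS_NO_TRANSACTION
    }
}
```

This is why no transaction handle ever needs to be passed around in user code: the current thread is the handle.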

Creating the Node

To create a Node we call createNode() on EmbeddedGraphDatabase which forwards it to EmbeddedGraphDbImpl which sends it to NodeManager.createNode(). There the IdGenerator is asked for the nextId() for Node objects, which hands it off to the PersistenceSource. The implementing class is NioNeoDbPersistenceSource, which forwards it to the NeoStoreXaDataSource, which finally retrieves the NodeStore and returns the nextId(). You must have noticed here that in fact there is no Resource enlisted in the current tx yet. Now that we have the id, we can create a new NodeImpl and acquire a WRITE lock on it, creating also a NodeProxy object to return to the user.
Now comes the fun part. Still in NodeManager.createNode(), we ask the PersistenceManager to nodeCreate() the brand new Node for this id. The PersistenceManager has no idea how to do that, so it calls getResource() to obtain a ResourceConnection to do it. Of course the ResourceConnection is returned by the referenced PersistenceSource instance (which in our case is a NioNeoDbPersistenceSource returning NioNeoDbResourceConnections) and it indeed has a reference to an XAResource (that is an inner class that simply holds an Object as an id). So, after retrieving the current tx it asks it to enlist the XAResource, leading to all the magic described here. Also, a TxCommitHook is added to the tx that releases the connection and then makes the resources used reclaimable by the garbage collector at the end of the connection's useful life. Note that the XaResource is registered with the XaResourceManager and mapped to a WriteTransaction when the resource is start()ed in the TransactionImpl.
After we write to the TxLog and setup the XAResource to the tx, we still have an operation to do. Recall that the Connection holds the EventConsumers which forward to the corresponding store. The related consumer in this case is the NeoStoreXaConnection.NodeEventConsumerImpl, which for createNode() events (and all others for that matter) retrieves the WriteTransaction and tells it to nodeCreate() for the id and the WriteTransaction creates the NodeRecord object and stores it.
Let's make a check here: Nothing is written to the disk, including the Resource tx. The store is untouched; not even the id store has been tampered with. The only thing in permanent storage is the global tx record marking it as STARTed and the branchId of the Resource. The fact that a record has been created exists in memory only. If we crash here, there is an implicit rollback guarantee.
We are not done yet, since the user has nothing to work with. Before returning the NodeProxy from the NodeManager, first we cache it and then we ask the LockReleaser (via NodeManager.releaseLock()) to release the Lock for this Node, an action that eventually results in keeping in memory the fact that this Node is locked and ask the current tx to add a Synchronization that will release the locks after completion. Now we can return the NodeProxy to the user.

Committing the transaction

So, the time has come to commit() the tx and make our valuable Node permanent. Calling success() on the TopLevelTransaction marks it simply as successful, meaning that on finish() it will be commit()ted. So, let's call finish(). The TransactionManager is asked for the current tx and commit() is called on it. This calls back the TransactionManager, which gets the current tx and does all those nice things that we discussed some time ago, such as writing to the TxLog and calling commit hooks. In a nutshell, the tx gets the single enlisted resource (since we are using only one) and decides that, since this is a 1PC, we just tell the resource to commit(). In our case, this leads to the NeoStoreXaConnection.NeoStoreXaResource calling the XaResourceManager to commit, which means that the WriteTransaction is asked to prepare(), compiling our new NodeRecord into a NodeCommand and writing that out to the XaLogicalLog, an action after which we can rest assured that our change will be performed no matter what. If this succeeds, XaResourceManager calls commit() on the WriteTransaction, meaning the Node creation command is executed, asking the NodeStore to write out that Record. We are now done, meaning the XaLogicalLog adds a DONE entry, the Transaction is removed from the txThreadMap in the TxManager, the TxLog marks the Transaction as TX_DONE and the Transaction status is set to STATUS_NO_TRANSACTION. Now everything is on disk and both the Resource and the global txs are marked as successful.
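The success()/finish() contract at the heart of this can be condensed into a toy sketch; again, a model of the behavior described above, not Neo's code:

```java
// Toy model of the demarcation contract described above: finish() commits
// only if success() was called first, otherwise it rolls back.
class ToyTransaction {
    private boolean successful = false;
    private String outcome = "in progress";

    void success() {
        successful = true;
    }

    void finish() {
        outcome = successful ? "committed" : "rolled back";
    }

    String outcome() {
        return outcome;
    }
}
```

Forgetting to call success() before finish() is therefore the canonical way to get a silent rollback.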

Closing the database

We wrote our target 9 bytes to the disk (you do remember that a Node record is 9 bytes, right?) and we are ready to close the database. So we go ahead and call shutdown() on the EmbeddedGraphDatabase, which ends up calling GraphDbInstance.shutdown(). The config is asked for the various modules and tells them to stop(). The GraphDbModule tells the NodeManager to clearPropertyIndexes() and clearCache() and then stop(), operations that do nothing fancy; they just null out various references. The IdGeneratorModule and PersistenceModule have no-op stop() methods. TxModule.stop() asks the TxManager to close() the TxLog, which in turn close()es the log file channel. The most interesting part is in PersistenceSource.stop(). This forwards to NeoStoreXaDataSource, which calls flushAll() on NeoStore, leading eventually to every store force()ing all buffers out to disk. This ensures a nice, clean shutdown of the disk store. The XaLogicalLog must also be closed, an operation already described in detail some time before. Then we call close() on NeoStore, which in essence closes the file channels used for persistence operations. This ends the file channel closing cycle, leaving the GraphDbImpl to call destroy() on the same modules, which currently are all no-ops. We can now exit.

From here

This post concludes a rough description of the core operations in Neo. I estimate around 2/3 of the code in the kernel component has been covered, leaving out aspects such as kernel component registrations, the indexing framework, traversals and a host of other extremely interesting components. I do not know to what extent I could have fitted them here, but I think that by understanding what I have discussed so far, one can navigate the code and understand the remaining pieces easily.
Truth be told, I have reached a point where I no longer want to write about Neo; instead I want to start hacking on it. If the need arises and I find it interesting, I may write again about some other operation, but first I want to get a better feel for the code. If you have specific requests/questions regarding my articles, I suggest you send a mail to the Neo mailing list and we can discuss it there, or hang around the #neo4j channel on freenode.

I hope that by reading my work you got at least part of the knowledge I got out of writing it.

Creative Commons License
This work is licensed under a Creative Commons Attribution 3.0 Unported License.

Tuesday, November 2, 2010

Neo4j Internals: Interlude - Xa roundup and consistency

The time has come for the most boring of these posts (imagine that!). There are some details that haven't been covered yet, mainly regarding the interactions between the various classes that lead from the persistence store up to the NodeManager and from there to you, the user. There are many classes to explain, with a lot of cross-references, but if you have followed my work thus far, it shouldn't be that difficult to digest. Possibly this post should have come before the talk about transactions, but hey, I am myself still on the path to enlightenment concerning the internals of Neo, so I understood things somewhat out of order. This is the reason I label this article an interlude: practically no tx talk will take place. The prerequisites are not that demanding, although if this is your first contact with Neo4j internals you will in all probability be overwhelmed. So, we begin.

DataSources and XaConnections over a persistence store

A XaConnection encapsulates a XAResource towards a XA-compatible persistence store. It is not part of the XA specification but is a useful abstraction provided by Neo that couples a XaTransaction with a XAResource. The concrete implementation is NeoStoreXaConnection, which holds a NeoStoreXaResource as the implementation of XAResource and a WriteTransaction as the implementation of XaTransaction. The XA-related interface exposed by NeoStoreXaConnection is getXaResource() : XAResource, which returns an instance of the inner class NeoStoreXaResource; this forwards all XAResource operations to a XaResourceManager implementation and implements isSameRM(), required of XAResources by the XA specification, as an equality comparison on the filename of the supporting store. Finally, NeoStoreXaConnection returns event consumers for operations on Neo primitives, such as NodeEventConsumer and PropertyIndexEventConsumer, that forward the requested operations to the WriteTransaction encapsulated by the parent NeoStoreXaConnection. These event consumers are used by NioNeoDbPersistenceSource to implement ResourceConnections, but that is discussed in detail later in this post.
XaDataSource is an abstract class that defines the means of obtaining XaConnections from a data source, plus some facilities for recovering txs. The idea is that classes extending XaDataSource will encapsulate a transactional resource capable of supporting XAResources, so that they can fit in a XA environment. This is obvious in the extending class LogBackedXaDataSource, where all tx recovery operations are forwarded to an underlying XaLogicalLog. Neo extends this with NeoStoreXaDataSource which, apart from creating XaConnections, is pretty busy: on instantiation it is responsible for creating the NeoStore that is the on-disk storage of the graph, creates a XaContainer to help with housekeeping (more on that next) and even creates the IdGenerators for the various Stores. It also provides implementations (as inner classes) of a XaCommandFactory and a XaTransactionFactory that it passes to the XaContainer for recovery purposes. This gives it the role of a Facade over the details of a lot of things I have described previously, summing up XaLogicalLogs, XaResourceManagers, Stores and their paraphernalia into a data source practically ready for fitting into a XA environment.
Before we leave NeoStoreXaDataSource, a note on its instantiation. Instead of the usual new call for creating an instance, there is a more roundabout way of getting a Neo DataSource up and running. When the database starts, the TxModule object held by the Config is asked to register DataSources as it goes around the various components (the Indexer service is another example of a user of DataSources). For the Neo kernel, when GraphDbInstance is start()ed, the TxModule in the Config object is asked to register a DataSource with an implementing class of NeoStoreXaDataSource, which is passed to the DataSourceManager and instantiated there via reflection. DataSourceManager keeps a mapping from identifying Strings to XaDataSource instances, maintaining this way a single instance of every data source. The identifying String is kept in the Config as DEFAULT_DATA_SOURCE_NAME.

Management of XaResources

The mapping of a XAResource to a XaTransaction represented by a XaConnection is realized in the XaResourceManager. This class mainly keeps a Map<XAResource,Xid> and a Map<Xid,XidStatus>, XidStatus being an inner class that, with the help of another inner class, TransactionStatus, holds the current status of the tx and the tx itself identified by its xid and mapped by an XAResource. Essentially, from this mapping, all tx operations on an XAResource are forwarded to the related tx. This helps the decoupling of tx operations that XaResources are asked to perform from any implementation details of the XaLogicalLog or the backing store, leaving XaResources, XaTransactions and XaConnections as thin shells that can be useful in higher layers.
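The double-map routing described above can be sketched in a few lines. This is a toy model only (Strings stand in for XAResource, Xid and TransactionStatus; none of these are Neo's actual types): a call arriving on a resource is resolved to its xid, and from there to the tx status.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of XaResourceManager's bookkeeping: resource -> xid and
// xid -> status, so tx operations on a resource reach the right tx.
public class ResourceMapSketch {
    public static class MiniResourceManager {
        private final Map<String, String> xaResourceMap = new HashMap<>(); // resource -> xid
        private final Map<String, String> xidMap = new HashMap<>();       // xid -> tx status

        public void start(String resource, String xid) {
            xaResourceMap.put(resource, xid);
            xidMap.put(xid, "ACTIVE");
        }

        // Resolve resource -> xid -> status: the two-hop lookup that lets
        // the resource stay a thin shell, ignorant of tx state.
        public String statusOf(String resource) {
            return xidMap.get(xaResourceMap.get(resource));
        }

        public void prepare(String resource) {
            xidMap.put(xaResourceMap.get(resource), "PREPARED");
        }
    }
}
```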
XaResourceManager also participates in failure recovery in conjunction with the XaLogicalLog, accepting the recreated txs as the logical log reads them and then completing them. In a sense, a XaResourceManager coupled with a XaLogicalLog is the equivalent of the TxManager plus TxLog as we saw them last time, but with the addition of a backing persistence store, in the form of a DataSource.

Binding related things together

The various components that help out a XaDataSource must be instantiated in a specific order, and it is a nice idea to keep them together since they are closely coupled. This is a job for XaContainer, which keeps a XaDataSource, a XaCommandFactory, a XaTransactionFactory, a XaLogicalLog and a XaResourceManager. The idea is that the XaResourceManager and the XaLogicalLog must have access to a txFactory and a commandFactory before they start, and additionally the log needs a XaResourceManager before being open()ed, else the recovery will be impossible to perform. This leads to a specific serialization of the instantiation/initialization operations, and this is done by the XaContainer. XaContainer in turn is created by NeoStoreXaDataSource when it is instantiated, which passes its internal implementations of CommandFactory and TransactionFactory to the create() method of XaContainer, leaving to it the creation of instances of XaLogicalLog and XaResourceManager. The log is opened (possibly triggering a recovery) only after the container has been initialized, ensuring that everything is in place.

An intermediate interface: The ResourceConnection

PersistenceSource defines an interface that exposes the expected functionality for every persistence store that Neo can use to store its data on disk. The operations themselves are abstracted as ResourceConnections that are returned by a PersistenceSource. For that reason, NioNeoDbPersistenceSource implements this as an inner class, NioNeoDbResourceConnection, that accepts a NeoStoreXaDataSource, extracts from it the XaConnection and from there the various primitive event consumers, dispatching to them the operations each is supposed to handle. This 2-level indirection is a purely engineering construct, having no other impact on the logic of any subsystem.

Addressing problems with store-memory consistency: The LockReleaser

There is an issue I haven't touched upon yet. We have seen how the various records are updated in the store and kept locked for the duration of a tx, ensuring their isolation guarantees. However, it remains to be seen how the modifications upon a Primitive are kept in memory for reading within a tx and how overlapping creations/deletions/changes of properties are managed. This is the task assigned to LockReleaser, with the more general responsibility of locking the entities that are to be modified and releasing the locks upon commit. The core idea is that, per transaction, we keep a set of changes for every element and its properties. The changes in the properties of a primitive are kept as instances of the inner classes CowNodeElement or CowRelElement for Nodes and Relationships respectively, and the set of those elements (one for each corresponding primitive) is kept as an instance of the inner class PrimitiveElement. The cowMap field is a Map<Transaction,PrimitiveElement> that keeps the mapping of the changes for the current tx. The easy part is deletion, where calling delete() on a primitive passes the call to NodeManager, which forwards the call first to LockReleaser, marking the corresponding CowElement as deleted via a boolean field, and then to the PersistenceManager, which updates the XaTransaction (WriteTransaction in the case of NioNeoDbPersistenceSource). The greater management overhead and the bulk of the code lie in the addition, deletion and changing of properties for Nodes and Relationships. Two maps are kept for each primitive, a propertyAddMap and a propertyRemoveMap. When a property is added, it is appended to the propertyAddMap for the primitive, while removals are appended to the propertyRemoveMap.
Asking a primitive for a property passes from the Proxy (that implements the Node or Relationship interface and is the user visible class) to the NodeManager, which retrieves the corresponding NodeImpl or RelationshipImpl and there propertyAddMap and propertyRemoveMap are consolidated, keeping the actual changeset and finally retrieving the requested property, if present. To make this clear, let's see an example.
Say you have a Node and you add a property via setProperty("foo","bar"). Initially, the NodeProxy simply forwards the call to the corresponding (based on id) NodeImpl. There it is locked (in the Primitive.setProperty() method) for WRITE by the LockReleaser. The full propertyMap is brought into memory if not already there (NodeManager.loadProperties()) and the addProperty and removeProperty maps for this primitive are obtained. Note that at this point there are 3 sets of properties in memory for this primitive: the ones loaded from the store (the propertyMap), the ones so far added in this tx (the addPropertyMap) and the ones so far removed (the removePropertyMap). These have to be aggregated into a currently consistent set so that we can decide whether to create a new property or to change an existing one. There are three stages to this. First, we check the currently known property indexes that are resident in memory. If it is not there, we make sure we bring all property indexes into memory and check those. If it is still not there, we create it. In the meantime, if the property value was found either in the stored property set or in the add map (i.e. it was added previously in this tx), we retrieve it, removing it from the removePropertyMap. Its value is changed, it is added to the addPropertyMap and the WRITE lock is released. Similar paths are followed in all other operations, including additions and removals of Relationships for Nodes. Finally, before commit(), the addPropertyMap, removePropertyMap and propertyMap are consolidated into the final version in Primitive.commitPropertyMaps(), which adds all properties in addPropertyMap to propertyMap and then removes all properties in removePropertyMap from it. This brings the in-memory copy of this Primitive back to a state consistent with the now updated on-disk version, getting rid of all the versions in the temporary add and remove maps.
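The consolidation step is easy to get wrong, so here is a sketch of just that apply-adds-then-removes order (a stand-alone illustration, not Neo's commitPropertyMaps() signature; property values are plain Objects keyed by name for simplicity):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the consolidation done at commit: start from the stored
// properties, apply all additions/changes, then apply all removals.
public class PropertyMapSketch {
    public static Map<String, Object> consolidate(Map<String, Object> propertyMap,
                                                  Map<String, Object> addMap,
                                                  Map<String, Object> removeMap) {
        Map<String, Object> result = new HashMap<>(propertyMap);
        result.putAll(addMap);          // additions and value changes win
        for (String key : removeMap.keySet()) {
            result.remove(key);         // removals are applied last
        }
        return result;
    }
}
```

Note that the tx logic above keeps add and remove maps disjoint (a changed property is taken out of the remove map), so the apply order here never clobbers a legitimate change.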
LockReleaser is also used by WriteTransaction to invalidate cached entries. The various removeFromCache() calls are there to ensure that after a tx which deletes a primitive is committed, the corresponding entry in the cache is removed so that it cannot be referenced again. This is used in WriteTransaction, where after a delete command, a rollback of a creation command or the execution of a recovered command, the matching removeFromCache call is made to the LockReleaser, which forwards it to the keeper of the cache, the omnipotent NodeManager.

Managing locks: Again, the LockReleaser (with help from the NodeManager)

LockManager and RagManager were described in a previous post as the core mechanism that provides isolation between txs. Now we will see where lock acquisition and release are done. First, WRITE locks are acquired on Node and Relationship creation events from NodeManager (in createNode() and createRelationship()) and from Primitives (via NodeManager.acquireLock()) on removeProperty, setProperty and delete events. Releasing them is not done right away, although the call to NodeManager.releaseLock() is made at the end of every method that acquires a lock (i.e., the aforementioned). Obviously, we cannot release a WRITE lock before the tx completes, since that would lead to other txs reading uncommitted changes (Neo currently guarantees only SERIALIZABLE isolation levels). So, we must postpone the releasing of the lock until commit. This is done in LockReleaser.addLockToTransaction(), which adds a LockElement to a List<LockElement> that is mapped to the current tx (kept in lockMap, a Map<Transaction, List<LockElement>>) and also adds a tx Synchronization hook to the current tx that, in afterCompletion(), releases all locks held by this tx.
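The deferred-release pattern can be modelled in miniature. This is a hypothetical illustration (Strings for entities, Runnables for the Synchronization hooks; not Neo's LockElement machinery): releases are queued per tx and only fired by an afterCompletion-style callback.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of addLockToTransaction(): WRITE-lock releases are queued
// and performed only when the tx completes, never mid-tx.
public class DeferredLockSketch {
    public static class MiniTx {
        private final List<String> heldLocks = new ArrayList<>();
        private final List<Runnable> afterCompletion = new ArrayList<>();

        public void acquireWriteLock(String entity) {
            heldLocks.add(entity);
            // One hook per lock for simplicity; the real code registers a
            // single Synchronization per tx that walks the lock list.
            afterCompletion.add(() -> heldLocks.remove(entity));
        }

        public int locksHeld() { return heldLocks.size(); }

        public void commit() {
            for (Runnable hook : afterCompletion) hook.run(); // release on completion
            afterCompletion.clear();
        }
    }
}
```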

Almost there

This post concludes a description of the core classes that provide the performance and ACID guarantees of Neo. What remains to be seen is a walkthrough of the path from creating a new EmbeddedGraphDatabase to its shutdown. This will be the topic of the next post.

Creative Commons License
This work is licensed under a Creative Commons Attribution 3.0 Unported License.

Wednesday, October 27, 2010

Neo4j Internals: Transactions (Part 2) - XaResources, Transactions and TransactionManagers

We continue ploughing through the Neo codebase, peeling away the layers that sum up to the transaction handling mechanism. If you came here hoping for a walkthrough of the code paths that lead from a beginTx() to a tx.finish() you will be disappointed, as there are some more encapsulated behaviours we must discuss first. This time we will look into a higher level than last time, discussing the Transaction class and its implementations, Commands and TransactionManagers, touching a bit first on the subject of XAResources. As prerequisites go, everything that was assumed last time applies here also. If you do not know what that is, I suggest you read at least the previous post. Having said that, let's dig in.

What was it that txs manage again?

Neo's tx handling model is heavily influenced by the XA specification, being designed to fit into a JTA environment (although it does not do that yet). Describing the XA specification is something I do not intend to do here, it would go way out of scope, although I hope to revisit the subject some time later. I advise you to go and look it up if you are completely unfamiliar with it, especially this article.
Txs in Neo deal primarily in terms of XAResources. A XAResource is an abstraction of a connection to a transactional resource, such as an RDBMS or, in our case, the Neo persistence store (the Lucene indexer also supports operations in a transactional context, but we do not discuss indexing here). XAResources are enlisted with a tx during its lifetime to perform whatever operations are needed on whichever persistence backend they refer to. Since we discuss the Neo kernel here, we simply concern ourselves with resources over the NeoStore and conveniently choose to ignore the Indexer component, regardless of the fact that since 1.2M2 indexing has been integrated into the kernel.
The above leads to a layered approach: a high-level tx that manages the XAResources participating in an operation and one or more lower-level txs, each of which encapsulates the operations on a single XAResource. This is a miniaturization of the 2PC protocol found in distributed tx environments, implemented in Neo's case with a resource-encompassing TransactionImpl and a XAResource-specific XaTransaction. We now look at each one in turn.

XaTransaction, the resource specific flavour and WriteTransaction, the NeoStore specific implementation

A XaTransaction is a Neo-specific class, meaning that it does not belong to the XA framework, despite its name. It defines the basic structure of a transaction over a Neo resource, holding a XaLogicalLog, the state of the tx and its identifier, and providing the expected operations of a tx (commit(), rollback() etc.) plus addCommand() and injectCommand(), the first used during normal operations and the second - you guessed it - used during recovery. It is extended by WriteTransaction, the core tx-implementing class over a NeoStore.
WriteTransaction exposes the expected interface: all operations that can be performed on a NeoStore are there. The fields are separated into two categories, Records and Commands. Records are stored in Maps from Integers to primitive's Records, the Integer being the primitive's id. Commands are stored in lists of Command objects. During normal (ie, non-recovery) operations, every action on a primitive of the database is recorded as a proper Record, which is a Record object obtained from the backing store manager (NodeStore for nodes, RelationshipStore for relationships etc). The Record is altered as the operation requires and is inserted in the proper map. On prepare, all stored records are turned into proper Commands and stored in the corresponding list. A walkthrough for a common operation will be enlightening here, so let's create a Relationship between two Nodes.
The method for that is WriteTransaction.relationshipCreate(). It accepts the id of the relationship, the ids of the nodes and the type of the relationship. First, the two nodes, if not already stored in the nodeRecord map, are fetched from the backing store and added there. Then a new RelationshipRecord is created, marked as inUse() and added to the relRecord map. Next, the relationship pointers of the two Node records must be updated (recall that the relationships a node participates in are kept in a doubly linked list in the relationship records). Although the new RelationshipRecord is thread confined and does not need to be locked, the rest of each node's relationships are potentially shared, so they must be locked. For that purpose a dummy implementation of the Relationship interface that knows only the RelationshipRecord's id is created, so that the LockManager can obtain the lock on it. For both participating Nodes, if there exists another Relationship, it is locked in this way, its RelationshipRecord obtained, the proper nodeNextRel field updated to include the new Relationship and the record added to the relRecord map. We are done with the changes; note that the WRITE lock has not been released, as is expected.
When on this tx the doPrepare() call is made, all record maps are scanned and the records they hold are transformed into Command objects. During each such creation, a proper LogEntry.Command entry is written in the referenced XaLogicalLog. Upon doCommit(), the created commands are execute()d one by one, then the held locks are released (via the LockReleaser) and the records and commands collections are cleared. If a doRollback() is requested instead then what must be undone is basically the releasing of all obtained entity ids from the backing stores. For that purpose, for every record present in the maps, if it is marked as created, the proper backing store is asked to free() the record's id. Then we clear the records and command collections and we are done.
During recovery, we do not hold records. Recall that LogEntry.Command entries are written in the XaLogicalLog, so, when the WAL finds a dangling tx, it reads in the Commands and calls injectCommand() on the tx. There they are directly added in the Command lists and executed during the subsequent doCommit(). This means that if a tx fails before the doPrepare() call, it is implicitly rolled back, with the unfreed ids recovered from the IdGenerator of each backing store when the database restarts.
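The record/command lifecycle described above can be sketched as follows. This is an illustrative shape only (Strings stand in for Records, Commands and the store; the real doPrepare() also writes each command to the XaLogicalLog): records accumulate during the tx, prepare turns them into commands, commit executes the commands against the store.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of WriteTransaction's two-phase shape: records during the tx,
// commands at prepare, execution at commit.
public class RecordCommandSketch {
    public static class MiniWriteTx {
        private final List<String> records = new ArrayList<>();
        private final List<String> commands = new ArrayList<>();
        public final List<String> store = new ArrayList<>(); // stands in for NodeStore etc.

        public void addRecord(String record) { records.add(record); }

        // During recovery there are no records: commands would be injected
        // directly into the commands list instead.
        public void injectCommand(String command) { commands.add(command); }

        public void doPrepare() {
            for (String r : records) commands.add("CMD:" + r); // would also hit the logical log
        }

        public void doCommit() {
            for (String c : commands) store.add(c); // execute() each command
            records.clear();
            commands.clear();
        }
    }
}
```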


It is worth taking a look at how commands are implemented, since they are such an integral part of Neo tx operations. Just as WriteTransaction extends XaTransaction, Command extends XaCommand, since NeoStore is not the only component that knows about commands (again, indexing makes use of this framework). Implementation-wise, Command is pretty simple - it needs to define a way to persist itself to a LogBuffer, to read itself back from a ReadableByteChannel and to execute itself. Obviously a NodeCommand will be different from a RelationshipCommand, for instance, so for every primitive operation a different command class must be implemented. Also, DynamicRecords are primitive-agnostic, so a static utility method is provided for use by everyone interested. The implementations of the Command operations are conceptually pretty much the same among them, and if you are familiar with the different persistent store operations explained elsewhere (you are, aren't you?) you will have no problem understanding what is going on there. As an example, look at Command.NodeCommand. Every such instance holds the two obvious things it needs to do its work: a NodeRecord, which represents the changes to be performed on the store, and a NodeStore, since this is where the changes will be persisted. execute() simply asks the store to update() the NodeRecord. writeToFile() persists the Command, first writing a flag marking the entry as a NodeCommand and then writing out the fields of the Record. readCommand() assumes that the NodeCommand identifying flag has already been read (and thus the bits that follow are known to be a NodeCommand) and reads in the following bytes, reconstructing a NodeCommand. The same pattern holds for each of the remaining 4 types of Commands.

TransactionImpl, the glue between XaTransactions

Ok, we have XaTransactions over a NeoStore (more precisely, over a NeoStoreXaDataSource, but that will be explained in the next post) and we can commit those. We may also have transactions over a Lucene index (via the same framework) and we can commit those as well. But these are connected txs, meaning that if the WriteTransaction fails, we must make sure that the LuceneTransaction also fails and vice versa. You should recognise this as the essence of 2PC, and this is what TransactionImpl takes care of. Instances of this class hold a list of XaResources, each of which is added to this top-level tx via a call to enlistResource() (and removed via delistResource()). Adding a resource such as NeoStoreXaConnection.NeoStoreXaResource (which is the NeoStore-specific implementation of XAResource and will be discussed in the next post) to a TransactionImpl causes a record to be written to the txLog (discussed next) and binds it to the scope of this tx. All XaResources enlisted with a TransactionImpl are bound together in a sort of informal 2PC protocol. On a TransactionImpl.doCommit(), all enlisted resources are asked to prepare() their operation, returning an indicator of whether they succeeded in their changes or not. If all resources return XAResource.XA_OK then they can be committed, so each one is asked in turn to commit(). There are two different paths here, in essence distinguishing between the 1PC and 2PC cases. If there is more than one distinct XaResource, a full 2PC protocol is followed, with voting and everything. If there is only one XaResource enlisted, however, all that is bypassed: the resource is asked directly to commit(), passing a boolean flag that informs the resource that this is a 1PC, trusting the implementation to take care of the prepare() step itself. doRollback() is simpler, iterating over all enlisted resources and asking them to rollback().
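The branch between the two paths can be sketched like so. Note this is a deliberately simplified stand-in: the Resource interface below is hypothetical (not javax.transaction.xa.XAResource, whose prepare() returns int votes and throws XAException), and real 2PC has rollback and heuristic handling that is omitted here.

```java
import java.util.List;

// Sketch of the doCommit() decision: a single enlisted resource gets a
// direct one-phase commit; several resources get prepare-then-commit.
public class CommitPathSketch {
    public interface Resource {
        boolean prepare();                 // vote: true plays the role of XA_OK
        void commit(boolean onePhase);
    }

    public static String doCommit(List<Resource> enlisted) {
        if (enlisted.size() == 1) {
            enlisted.get(0).commit(true);  // 1PC: resource handles prepare itself
            return "1PC";
        }
        for (Resource r : enlisted) {
            if (!r.prepare()) return "ROLLBACK"; // any non-OK vote aborts the lot
        }
        for (Resource r : enlisted) r.commit(false);
        return "2PC";
    }
}
```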
There are some more things TransactionImpl does that are worth noting. First, every time there is a change in the tx status (for example, from ACTIVE to PREPARING, as defined in Status), the txManager is notified and a proper record is added to the txLog. This way, a Write Ahead Log of these high-level txs is kept for recovering from failures. This bookkeeping procedure is explained below. Also, there is the management of synchronization hooks, as they are defined in the JTA spec. A list of Synchronizations is maintained and the doBeforeCompletion() and doAfterCompletion() take care to notify them properly when required. To fully understand TransactionImpl and high-level txs, however, we must look also at the TxManager.

The TxManager and its TxLog

The existence of these classes should come as no surprise. Since we have these so-called high-level txs, we must provide a WAL and the means to recover them in case of failures. In essence, we have a mechanism similar to the XaLogicalLog, but the TransactionManager is agnostic of specific XaResources and as such has no interest in Commands. Instead, it is interested in XaResources in the abstract, so it needs a means of distinguishing them. Say hello to globalId and branchId. Each TransactionImpl is identified by a globalId (a byte[]) and every XaResource that it has enlisted is identified by a branchId (again, a byte[]). This is in correspondence with the XA specification, which binds these two together in an abstraction known as Xid. The first time a resource is enlisted with a TransactionImpl, a writeStartRecord() call is made to the txManager, writing in the txLog a record indicating the beginning of a new tx. Then, after each enlisting of a resource that has not been enlisted before (for instance, a resource could have been suspended and then re-enlisted to make it active again), a new branch is added in the txLog for this tx via a call to txManager.addBranch(). This way, when reconstructing a tx from the log after a failure, it is possible to reassociate a recovered tx with the resources it was managing.
TxManager "hides" the TransactionImpl objects from the rest of the code. Whenever a new tx must start, the begin() method is called and a new TransactionImpl is created and mapped to the current thread. This map is kept in txThreadMap, a Map<Thread,TransactionImpl>. All public methods of TxManager for managing the lifecycle of txs do not accept a TransactionImpl argument - they get the currentThread() and retrieve the corresponding TransactionImpl. If a begin() call is made from a thread with a mapped tx, an exception is thrown, reminding you that there is no support for nested txs. commit(), rollback(), suspend(), getStatus() and the rest follow the same pattern. An exception is resume(), which does accept a TransactionImpl - after all, which tx should be resumed?
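The thread-to-tx mapping is simple enough to model directly. A toy version (Strings stand in for TransactionImpl; the names are illustrative) showing the two behaviours that matter: the current thread implicitly identifies the tx, and begin() refuses to nest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy version of the txThreadMap pattern: one tx per thread; lifecycle
// methods take no tx argument because the calling thread identifies it.
public class TxThreadMapSketch {
    private static final Map<Thread, String> txThreadMap = new ConcurrentHashMap<>();

    public static void begin(String txName) {
        if (txThreadMap.containsKey(Thread.currentThread())) {
            throw new IllegalStateException("Nested transactions not supported");
        }
        txThreadMap.put(Thread.currentThread(), txName);
    }

    // Resolves and removes the current thread's tx, as TxManager does.
    public static String commit() {
        return txThreadMap.remove(Thread.currentThread());
    }
}
```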
The main operations in TxManager may seem a little complicated but there is not much to them. There is a lot of error handling, since there are so many things that can go wrong and management of the actual state in terms of flags requires a lot of if-else code. Let's walk through the commit() method, that is probably the most challenging.
As described before, the current TransactionImpl is first obtained from the txThreadMap. If it is STATUS_ACTIVE, beforeCompletion hooks are called and then commit() is called on the tx (we are now in the private commit() method). If the tx did not change its status to STATUS_COMMITTED, doRollback() is asked of it, afterCompletion hooks are called, it is removed from the txThreadMap, a done entry is written to the log, its status is changed to STATUS_NO_TRANSACTION and the proper exception is thrown (failure during a commit leads to a rollback attempt and an exception is always thrown - you knew that, right?). Else, if the commit() succeeded, afterCompletion hooks are called, the tx is removed from the txThreadMap, a done entry is written to the log and the status is changed to STATUS_NO_TRANSACTION. If the status of the tx was STATUS_MARKED_ROLLBACK, then doRollback() is asked of the tx and the rest proceeds as before.
Finally, a few words about the TxLog. Its philosophy is the same as the XaLogicalLog's, writing entries formatted as per the Record inner class, flagged for their kind by the public static final byte fields of the class. It also rotates between files (named tm_tx_log.1 or .2, the active one marked by the contents of active_tx_log), but this rotation is triggered and managed by the TxManager in the getTxLog() and changeActiveLog() methods and performed by TxLog.switchToLogFile(). Of note is the getDanglingRecords() method, used during initialization of the TxManager and during log rotation. It goes over the contents of the log, reading Record entries, and keeps a map from Xid to List<Record>. If a TX_START Record is read, a mapping from its Xid to an empty list is added to the map; BRANCH_ADD and MARK_COMMIT records are added to the list of the corresponding Xid. For TX_DONE records, the corresponding Xid entry is removed from the map, since that tx has successfully committed or rolled back.
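The dangling-record scan boils down to a single pass over the log. A sketch under simplifying assumptions (records as {kind, xid} String pairs instead of the byte-flagged Record entries; record kinds named as in the text): whatever survives the pass without a TX_DONE is dangling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of getDanglingRecords(): replay the TxLog, track live txs,
// drop completed ones; the leftovers are the dangling txs to recover.
public class TxLogScanSketch {
    public static Set<String> danglingXids(List<String[]> records) { // each record: {kind, xid}
        Map<String, List<String>> live = new HashMap<>();
        for (String[] rec : records) {
            String kind = rec[0], xid = rec[1];
            switch (kind) {
                case "TX_START":    live.put(xid, new ArrayList<>()); break;
                case "BRANCH_ADD":
                case "MARK_COMMIT": live.get(xid).add(kind); break;
                case "TX_DONE":     live.remove(xid); break; // committed or rolled back
            }
        }
        return live.keySet();
    }
}
```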

Replaying half-committed txs

An interesting question is what happens if, during the execution of the Commands of a tx, the system crashes without having a chance to roll back. In that case, the whole tx will be replayed, meaning that the commands that were executed before the failure will be executed again. The answer to this is: so what? Since the commands are composed of Records, every Command execution is the persistence of the changed record in the persistence store. Writing a record again is simply wasted I/O - nothing changes but the timestamps on the files. Also, since the recovered txs are sorted and executed serially before normal operation resumes, no locks are necessary - these are not concurrent txs.
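The "so what" argument rests on command execution being an idempotent overwrite. In miniature (a hypothetical slot store, nothing like the real record format): updating a slot keyed by id twice with the same record leaves exactly the state that one update would.

```java
import java.util.HashMap;
import java.util.Map;

// Why replaying half-committed txs is safe: executing a record-based
// command overwrites a slot keyed by id, so replay is a no-op on state.
public class IdempotentReplaySketch {
    public static class MiniStore {
        private final Map<Long, String> slots = new HashMap<>();
        public void update(long id, String record) { slots.put(id, record); } // overwrite, not append
        public String read(long id) { return slots.get(id); }
        public int size() { return slots.size(); }
    }
}
```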

We are done for now

The above went a level higher in the hierarchy of the transaction handling code than the previous article. It also made a small excursion into the realm of the XA framework, most of which will be discussed next time. As the posts go by, I tend to focus more on the mechanics rather than analysing the code line by line - I feel that if you have followed me during my writings on this subject, you should have developed a similar "feel" for the actual implementations. Before closing, I want to thank the people at Neo for their kind words concerning my work and especially Peter Neubauer, whose one-line answer to a question I asked helped me more than he can imagine.

Creative Commons License
This work is licensed under a Creative Commons Attribution 3.0 Unported License.