Wednesday, October 27, 2010

Neo4j Internals: Transactions (Part 2) - XaResources, Transactions and TransactionManagers

We continue ploughing through the Neo codebase, peeling away the layers that make up the transaction handling mechanism. If you came here hoping for a walkthrough of the code paths that lead from a beginTx() to a tx.finish(), you will be disappointed, as there are some more encapsulated behaviours we must discuss first. This time we will look at a higher level than last time, touching first on XAResources and then discussing the Transaction class and its implementations, Commands and TransactionManagers. As far as prerequisites go, everything that was assumed last time applies here also. If you do not know what that is, I suggest you read at least the previous post. Having said that, let's dig in.

What was it that txs manage again?

Neo's tx handling model is heavily influenced by the XA specification, being designed to fit into a JTA environment (although it does not do that yet). Describing the XA specification is something I do not intend to do here, as it would go way out of scope, although I hope to revisit the subject some time later. I advise you to look it up if you are completely unfamiliar with it, especially this article.
Txs in Neo deal primarily in terms of XAResources. An XAResource is an abstraction of a connection to a transactional resource, such as an RDBMS or, in our case, the Neo persistence store (the Lucene indexer also supports operations in a transactional context, but we do not discuss indexing here). XAResources are enlisted with a tx during its lifetime to perform whatever operations are needed on whichever persistence backend they refer to. Since we discuss the Neo kernel here, we concern ourselves only with resources over the NeoStore and conveniently ignore the Indexer component, even though indexing has been integrated into the kernel since 1.2M2.
The above leads to a layered approach. A high-level tx manages the XAResources that participate in an operation, and one or more lower-level txs each encapsulate the operations on a single XAResource. This is a miniature of the 2PC protocol found in distributed tx environments, implemented in Neo's case with a resource-encompassing TransactionImpl and an XAResource-specific XaTransaction. We now look at each one in turn.

XaTransaction, the resource-specific flavour, and WriteTransaction, the NeoStore-specific implementation

An XaTransaction is a Neo-specific class, meaning that, despite its name, it does not belong to the XA framework. It defines the basic structure of a transaction over a Neo resource. It holds an XaLogicalLog, the state of the tx and its identifier. It provides the expected operations of a tx (commit(), rollback() etc.) plus addCommand() and injectCommand(), the first used during normal operation and the second (you guessed it) during recovery. It is extended by WriteTransaction, the core tx implementation over a NeoStore.
WriteTransaction exposes the expected interface: all operations that can be performed on a NeoStore are there. Its fields fall into two categories, Records and Commands. Records are stored in Maps from Integer to the corresponding primitive's Record type, the Integer being the primitive's id. Commands are stored in lists of Command objects. During normal (i.e. non-recovery) operation, every action on a primitive of the database is recorded as a Record object obtained from the backing store manager (NodeStore for nodes, RelationshipStore for relationships etc.). The Record is altered as the operation requires and is inserted in the proper map. On prepare, all stored records are turned into the corresponding Commands and stored in the matching list. A walkthrough of a common operation will be enlightening here, so let's create a Relationship between two Nodes.
The method for that is WriteTransaction.relationshipCreate(). It accepts the id of the relationship, the ids of the two nodes and the type of the relationship. First, the two nodes are fetched from the backing store and added to the nodeRecord map, unless they are already there. Then a new RelationshipRecord is created, marked as inUse() and added to the relRecord map. Next, the relationship pointers of the two Node records must be updated (recall that the relationships a node participates in are kept in a doubly linked list in the relationship records). The new RelationshipRecord is thread confined and does not need to be locked. The node's other relationships, however, are potentially shared, so they must be locked. For that purpose, a dummy implementation of the Relationship interface is created that only knows the RelationshipRecord's id, so that the LockManager can obtain the lock on it. For each participating Node, if another Relationship already exists in its chain, that Relationship is locked in this way and its RelationshipRecord is obtained. The record's previous-relationship pointer on that node's side is then updated to point to the new Relationship, and the record is added to the relRecord map. That completes the changes. Note that the WRITE locks have not been released, as is expected: they are held until the tx completes.
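To make the pointer juggling concrete, here is a minimal Java sketch of the walkthrough above. It is not Neo's code. The NodeRecord and RelationshipRecord classes below are simplified hypothetical stand-ins that model only the chain pointers, and plain Maps play the part of NodeStore and RelationshipStore. Locking is reduced to a comment, and self-relationships are rejected to keep the sketch short. Records are copied out of the "store" into the tx-local maps, so nothing the tx does is visible in the store before commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified record: only the head of the node's relationship chain.
class NodeRecord {
    final int id;
    int nextRel = -1; // first relationship in this node's chain (-1 = none)
    NodeRecord(int id) { this.id = id; }
    NodeRecord copy() { NodeRecord c = new NodeRecord(id); c.nextRel = nextRel; return c; }
}

// Hypothetical, simplified record: one prev/next pair per node, since it sits in two chains.
class RelationshipRecord {
    final int id, firstNode, secondNode, type;
    boolean inUse;
    int firstPrevRel = -1, firstNextRel = -1, secondPrevRel = -1, secondNextRel = -1;
    RelationshipRecord(int id, int firstNode, int secondNode, int type) {
        this.id = id; this.firstNode = firstNode; this.secondNode = secondNode; this.type = type;
    }
    RelationshipRecord copy() {
        RelationshipRecord c = new RelationshipRecord(id, firstNode, secondNode, type);
        c.inUse = inUse;
        c.firstPrevRel = firstPrevRel; c.firstNextRel = firstNextRel;
        c.secondPrevRel = secondPrevRel; c.secondNextRel = secondNextRel;
        return c;
    }
}

class RelCreateSketch {
    // Stand-ins for NodeStore / RelationshipStore (committed state only).
    final Map<Integer, NodeRecord> nodeStore;
    final Map<Integer, RelationshipRecord> relStore;
    // Tx-local record maps keyed by primitive id, as in WriteTransaction.
    final Map<Integer, NodeRecord> nodeRecords = new HashMap<>();
    final Map<Integer, RelationshipRecord> relRecords = new HashMap<>();

    RelCreateSketch(Map<Integer, NodeRecord> nodeStore, Map<Integer, RelationshipRecord> relStore) {
        this.nodeStore = nodeStore;
        this.relStore = relStore;
    }

    // Load from the store only if this tx has not touched the record yet.
    NodeRecord nodeRecord(int id) { return nodeRecords.computeIfAbsent(id, k -> nodeStore.get(k).copy()); }
    RelationshipRecord relRecord(int id) { return relRecords.computeIfAbsent(id, k -> relStore.get(k).copy()); }

    void relationshipCreate(int id, int firstNodeId, int secondNodeId, int type) {
        if (firstNodeId == secondNodeId) throw new IllegalArgumentException("loops are not handled in this sketch");
        NodeRecord first = nodeRecord(firstNodeId);
        NodeRecord second = nodeRecord(secondNodeId);
        RelationshipRecord rel = new RelationshipRecord(id, firstNodeId, secondNodeId, type);
        rel.inUse = true;
        relRecords.put(id, rel);
        connect(first, rel);
        connect(second, rel);
    }

    // Insert rel at the head of node's doubly linked relationship chain.
    private void connect(NodeRecord node, RelationshipRecord rel) {
        int oldHead = node.nextRel;
        if (oldHead != -1) {
            // The real code takes a WRITE lock on this (possibly shared) relationship first.
            RelationshipRecord old = relRecord(oldHead);
            if (old.firstNode == node.id) old.firstPrevRel = rel.id; else old.secondPrevRel = rel.id;
        }
        if (rel.firstNode == node.id) rel.firstNextRel = oldHead; else rel.secondNextRel = oldHead;
        node.nextRel = rel.id;
    }
}
```

Creating two relationships between the same pair of nodes in one tx leaves the newer one at the head of both chains, with the older one's prev pointers referring back to it. The store-side records stay untouched.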
When doPrepare() is called on this tx, all record maps are scanned and the records they hold are transformed into Command objects. As each Command is created, a corresponding LogEntry.Command entry is written to the referenced XaLogicalLog. Upon doCommit(), the created commands are execute()d one by one, then the held locks are released (via the LockReleaser) and the record and command collections are cleared. If a doRollback() is requested instead, what must be undone is basically the allocation of entity ids from the backing stores, since nothing has been written to the stores yet. For that purpose, for every record present in the maps that is marked as created, the proper backing store is asked to free() the record's id. Then the record and command collections are cleared and we are done.
During recovery, we do not hold records. Recall that LogEntry.Command entries are written to the XaLogicalLog. So when the WAL finds a dangling tx, it reads in the Commands and calls injectCommand() on the tx. There they are added directly to the Command lists and executed during the subsequent doCommit(). This means that if a tx fails before the doPrepare() call, it is implicitly rolled back: no Commands were logged for it, so there is nothing to replay. The ids it obtained but never freed are recovered from the IdGenerator of each backing store when the database restarts.
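The prepare/commit/rollback cycle can be sketched in the same spirit. Everything here is hypothetical: SimpleRecord, SimpleStore with its toy id generator, SimpleCommand and LifecycleSketch are made-up names. The XaLogicalLog is reduced to a list of strings and lock release is left out. The point is the order of events. Records become logged commands on prepare, commands are executed on commit, and a rollback only hands created ids back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical record: id, payload, and whether this tx allocated the id.
class SimpleRecord {
    final int id;
    final boolean created;
    final String value;
    SimpleRecord(int id, boolean created, String value) { this.id = id; this.created = created; this.value = value; }
}

// Hypothetical store with a tiny id generator that reuses freed ids.
class SimpleStore {
    final Map<Integer, String> data = new HashMap<>();
    private final TreeSet<Integer> freeIds = new TreeSet<>();
    private int highId = 0;
    int nextId() { return freeIds.isEmpty() ? highId++ : freeIds.pollFirst(); }
    void free(int id) { freeIds.add(id); }
    void update(SimpleRecord r) { data.put(r.id, r.value); }
}

// Like a Command: a record plus the store it will be written to.
class SimpleCommand {
    final SimpleRecord record;
    final SimpleStore store;
    SimpleCommand(SimpleRecord record, SimpleStore store) { this.record = record; this.store = store; }
    void execute() { store.update(record); }
}

class LifecycleSketch {
    final SimpleStore store;
    final List<String> logicalLog; // stands in for the XaLogicalLog
    final Map<Integer, SimpleRecord> records = new HashMap<>();
    final List<SimpleCommand> commands = new ArrayList<>();

    LifecycleSketch(SimpleStore store, List<String> logicalLog) { this.store = store; this.logicalLog = logicalLog; }

    void doPrepare() {
        for (SimpleRecord r : records.values()) {
            commands.add(new SimpleCommand(r, store));
            logicalLog.add("COMMAND " + r.id); // one log entry per command
        }
    }

    void doCommit() {
        for (SimpleCommand c : commands) c.execute();
        // (lock release via the LockReleaser omitted)
        records.clear();
        commands.clear();
    }

    void doRollback() {
        // Nothing reached the store; only the allocated ids must be given back.
        for (SimpleRecord r : records.values()) if (r.created) store.free(r.id);
        records.clear();
        commands.clear();
    }
}
```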
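Here is a recovery-side sketch, again with hypothetical names. Commands read back from the log are grouped per dangling tx and injected directly into a RecoveredTx, which never sees a record. For simplicity it assumes that every dangling tx passed in should be completed, glossing over how the log entries decide that. A tx that died before doPrepare() has no logged commands, so nothing is replayed for it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical command as read back from the logical log.
class LoggedCommand {
    final int txId, recordId;
    final String value;
    LoggedCommand(int txId, int recordId, String value) { this.txId = txId; this.recordId = recordId; this.value = value; }
}

class RecoveredTx {
    final List<LoggedCommand> commands = new ArrayList<>();
    // No records during recovery: commands go straight into the command list.
    void injectCommand(LoggedCommand c) { commands.add(c); }
    void doCommit(Map<Integer, String> store) {
        for (LoggedCommand c : commands) store.put(c.recordId, c.value);
        commands.clear();
    }
}

class RecoverySketch {
    // danglingTxIds: txs the log shows as unfinished (how they are found is out of scope here).
    static Map<Integer, RecoveredTx> recover(List<LoggedCommand> logged, Set<Integer> danglingTxIds,
                                             Map<Integer, String> store) {
        Map<Integer, RecoveredTx> txs = new LinkedHashMap<>();
        for (LoggedCommand c : logged) {
            if (danglingTxIds.contains(c.txId)) {
                txs.computeIfAbsent(c.txId, k -> new RecoveredTx()).injectCommand(c);
            }
        }
        for (RecoveredTx tx : txs.values()) tx.doCommit(store);
        return txs;
    }
}
```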

Commands

It is worth taking a look at how commands are implemented, since they are such an integral part of Neo's tx operations. Just as WriteTransaction extends XaTransaction, Command extends XaCommand, because NeoStore is not the only component that knows about commands (again, indexing makes use of this framework). Implementation-wise, Command is pretty simple. It needs to define a way to persist itself to a LogBuffer, to read itself back from a ReadableByteChannel and to execute itself. Obviously a NodeCommand will be different from a RelationshipCommand, for instance, so a different command class must be implemented for every kind of primitive. Also, DynamicRecords are primitive-agnostic, so a static utility method is provided for every command that needs it. The operations are implemented in conceptually the same way across Commands. If you are familiar with the different persistent store operations explained elsewhere (you are, aren't you?), you will have no problem understanding what is going on there.
As an example, look at Command.NodeCommand. Every such instance holds the two obvious things it needs to do its work. The first is a NodeRecord, which represents the changes to be performed on the store. The second is a NodeStore, since this is where the changes will be persisted. execute() simply asks the store to update() the NodeRecord. writeToFile() persists the Command, first writing a flag marking the entry as a NodeCommand and then writing out the fields of the Record. readCommand() assumes that the NodeCommand identifying flag has already been read, so the bytes that follow are known to belong to a NodeCommand. It reads in those bytes and reconstructs the NodeCommand. The same pattern holds for each of the remaining four types of Commands.
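Below is a hypothetical rendering of the NodeCommand write/read pair. It uses java.nio.ByteBuffer in place of Neo's LogBuffer and ReadableByteChannel. The flag value and field layout are assumptions made for illustration, not a description of Neo's on-disk format: the id, an in-use byte, and then the next relationship and next property ids, written only for records in use.

```java
import java.nio.ByteBuffer;

// Hypothetical NodeCommand serialization; flag value and field layout are assumptions.
class NodeCommandSketch {
    static final byte NODE_COMMAND = 1;

    final int id;
    final boolean inUse;
    final int nextRel, nextProp;

    NodeCommandSketch(int id, boolean inUse, int nextRel, int nextProp) {
        this.id = id; this.inUse = inUse; this.nextRel = nextRel; this.nextProp = nextProp;
    }

    // Flag first, so a reader can tell what kind of command follows.
    void writeToFile(ByteBuffer buffer) {
        buffer.put(NODE_COMMAND);
        buffer.putInt(id);
        buffer.put(inUse ? (byte) 1 : (byte) 0);
        if (inUse) { // a deleted record needs no pointers
            buffer.putInt(nextRel);
            buffer.putInt(nextProp);
        }
    }

    // Assumes the caller has already consumed the NODE_COMMAND flag.
    static NodeCommandSketch readCommand(ByteBuffer buffer) {
        int id = buffer.getInt();
        boolean inUse = buffer.get() == 1;
        if (!inUse) return new NodeCommandSketch(id, false, -1, -1);
        int nextRel = buffer.getInt();
        int nextProp = buffer.getInt();
        return new NodeCommandSketch(id, true, nextRel, nextProp);
    }

    // Reads the flag, then dispatches to the matching reader.
    static NodeCommandSketch readEntry(ByteBuffer buffer) {
        byte flag = buffer.get();
        if (flag == NODE_COMMAND) return readCommand(buffer);
        throw new IllegalStateException("unknown command flag " + flag);
    }
}
```

Splitting flag handling (readEntry) from field decoding (readCommand) mirrors the contract described above: readCommand() only ever sees the bytes after the flag.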

TransactionImpl, the glue between XaTransactions

Ok, we have XaTransactions over a NeoStore (more precisely, over a NeoStoreXaDataSource, but that will be explained in the next post) and we can commit them. We may also have transactions over a Lucene index (via the same framework) and we can commit those too. But these are connected txs: if the WriteTransaction fails, we must make sure that the LuceneTransaction also fails, and vice versa. You should recognise this as the essence of 2PC, and this is what TransactionImpl takes care of. Instances of this class hold a list of XaResources, each of which is added to this top-level tx via a call to enlistResource() (and removed via delistResource()). Consider adding a resource such as NeoStoreXaConnection.NeoStoreXaResource (the NeoStore-specific implementation of XaResource, discussed in the next post) to a TransactionImpl. This causes a record to be written to the txLog (discussed below) and binds the resource to the scope of this tx.
All XaResources enlisted in a TransactionImpl are bound together in a sort of informal 2PC protocol. When TransactionImpl.doCommit() is called, all enlisted resources are asked to prepare() their operations. Each returns an indicator of whether its changes succeeded. If all resources return XAResource.XA_OK, they can be committed, so each one is asked in turn to commit(). There are two different paths here, in essence distinguishing between the 1PC and 2PC cases. If there is more than one distinct XaResource, the full 2PC protocol is followed, with voting and everything. If only one XaResource is enlisted, however, all this is bypassed. The resource is asked directly to commit(), with a boolean flag informing it that this is a 1PC, relying on the implementation to take care of the prepare() step itself. doRollback() is simpler, iterating over all enlisted resources and asking them to rollback().
There are some more things TransactionImpl does that are worth noting. First, every time the tx status changes (for example, from ACTIVE to PREPARING, as defined in Status), the txManager is notified and a corresponding record is added to the txLog. This way, a Write Ahead Log of these high-level txs is kept for recovering from failures; this bookkeeping procedure is explained below. Second, there is the management of synchronization hooks, as defined in the JTA spec. A list of Synchronizations is maintained, and the doBeforeCompletion() and doAfterCompletion() methods take care of notifying them when required. To fully understand TransactionImpl and high-level txs, however, we must also look at the TxManager.
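The commit logic of TransactionImpl boils down to something like the following sketch. SketchResource is a hypothetical three-method interface modelled on javax.transaction.xa.XAResource. The real interface has more methods and takes an Xid everywhere. Status bookkeeping and txLog writes are left out, and RecordingResource is only a test double.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interface modelled on javax.transaction.xa.XAResource.
interface SketchResource {
    int XA_OK = 0; // same value as XAResource.XA_OK
    int prepare();
    void commit(boolean onePhase);
    void rollback();
}

class SketchTransactionImpl {
    private final List<SketchResource> resources = new ArrayList<>();

    void enlistResource(SketchResource r) { if (!resources.contains(r)) resources.add(r); }
    boolean delistResource(SketchResource r) { return resources.remove(r); }

    // Returns true on commit, false if the tx was rolled back.
    boolean doCommit() {
        if (resources.size() == 1) {
            resources.get(0).commit(true); // 1PC: no vote, the resource prepares itself
            return true;
        }
        for (SketchResource r : resources) { // phase 1: voting
            if (r.prepare() != SketchResource.XA_OK) {
                doRollback();
                return false;
            }
        }
        for (SketchResource r : resources) r.commit(false); // phase 2
        return true;
    }

    void doRollback() { for (SketchResource r : resources) r.rollback(); }
}

// Test double that records the calls it receives.
class RecordingResource implements SketchResource {
    private final int vote;
    final List<String> calls = new ArrayList<>();
    RecordingResource(int vote) { this.vote = vote; }
    public int prepare() { calls.add("prepare"); return vote; }
    public void commit(boolean onePhase) { calls.add(onePhase ? "commit-1pc" : "commit-2pc"); }
    public void rollback() { calls.add("rollback"); }
}
```

With a single resource the vote is skipped entirely. With two or more, one negative vote rolls back every enlisted resource.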

The TxManager and its TxLog

The existence of these classes should come as no surprise. Since we have these so-called high-level txs, we must provide a WAL and the means to recover them in case of failure. In essence, we have a mechanism similar to XaLogicalLog, but TxManager is agnostic of specific XaResources and as such has no interest in Commands. Instead, it is interested in XaResources in the abstract, so it needs a means of distinguishing them. Say hello to globalId and branchId. Each TransactionImpl is identified by a globalId (a byte[]) and every XaResource it has enlisted is identified by a branchId (again, a byte[]). This corresponds to the XA specification, which binds these two together in an abstraction known as Xid. The first time a resource is enlisted with a TransactionImpl, a writeStartRecord() call is made to the txManager, writing a record to the txLog that marks the beginning of a new tx. Then, each time a resource that has not been enlisted before is enlisted, a new branch is added to the txLog for this tx via a call to txManager.addBranch(). The check matters because a resource could, for instance, have been suspended and then re-enlisted to make it active again. This way, when reconstructing a tx from the log after a failure, it is possible to reassociate a recovered tx with the resources it was managing.
TxManager "hides" the TransactionImpl objects from the rest of the code. Whenever a new tx must start, the begin() method is called, and a new TransactionImpl is created and mapped to the current thread. This mapping is kept in txThreadMap, a Map<Thread,TransactionImpl>. None of the public methods of TxManager that manage the lifecycle of txs accept a TransactionImpl argument. Instead, they get the currentThread() and retrieve the corresponding TransactionImpl. If begin() is called from a thread that already has a mapped tx, an exception is thrown, reminding you that nested txs are not supported. commit(), rollback(), suspend(), getStatus() and the rest follow the same pattern. The exception is resume(), which does accept a TransactionImpl; after all, how else would it know which tx to resume?
The main operations in TxManager may seem a little complicated, but there is not much to them. There is a lot of error handling, since so many things can go wrong, and tracking the actual state through flags requires a lot of if-else code. Let's walk through the commit() method, which is probably the most challenging.
As described before, the current TransactionImpl is first obtained from the txThreadMap. If it is STATUS_ACTIVE, the beforeCompletion hooks are called and then commit() is called on the tx (we are now in the private commit() method). Suppose the tx did not change its status to STATUS_COMMITTED. In that case:
doRollback() is asked of it, the afterCompletion hooks are called, it is removed from the txThreadMap, a done entry is written to the log, its status is changed to STATUS_NO_TRANSACTION and the proper exception is thrown. (A failure during commit always leads to a rollback attempt and a thrown exception, you know that, right?)
Otherwise, if the commit() succeeded, the afterCompletion hooks are called, the tx is removed from the txThreadMap, a done entry is written to the log and the status is changed to STATUS_NO_TRANSACTION. If the status of the tx was STATUS_MARKED_ROLLBACK, then doRollback() is asked of the tx and the rest proceeds as before.
Finally, a few words about the TxLog. Its philosophy is the same as that of the XaLogicalLog. It writes entries formatted as per its Record inner class, flagged for their kind by the public static final byte fields of the class. It also rotates between files (named tm_tx_log.1 and tm_tx_log.2, the active one being indicated by the contents of active_tx_log). This rotation, however, is triggered and managed by the TxManager in its getTxLog() and changeActiveLog() methods and performed by TxLog.switchToLogFile(). Of note is the getDanglingRecords() method, used during initialization of the TxManager and during log rotation. It goes over the contents of the log, reading Record entries, and keeps a map from Xid to List<Record>. When a TX_START Record is read, a mapping from its Xid to an empty list is added to the map. BRANCH_ADD and MARK_COMMIT records are added to the list of the corresponding Xid. For TX_DONE records, the corresponding Xid entry is removed from the map, since that tx has successfully committed or rolled back.
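The enlistment bookkeeping can be sketched like this. SketchTxLog keeps human-readable strings instead of binary records. Apart from writeStartRecord() and addBranch(), the class and method names are hypothetical. Note the Set of branch ids: byte[] compares by identity in Java, so the sketch tracks branches by their hex encoding. That way a re-enlisted resource does not produce a second BRANCH_ADD.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TxLog: entries are kept as readable strings.
class SketchTxLog {
    final List<String> entries = new ArrayList<>();
    void writeStartRecord(byte[] globalId) { entries.add("TX_START " + hex(globalId)); }
    void addBranch(byte[] globalId, byte[] branchId) {
        entries.add("BRANCH_ADD " + hex(globalId) + " " + hex(branchId));
    }
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b & 0xff));
        return sb.toString();
    }
}

class EnlistmentSketch {
    private final byte[] globalId;
    private final SketchTxLog txLog;
    // byte[] uses identity equals, so branches are tracked by their hex form.
    private final Set<String> branches = new HashSet<>();
    private boolean started;

    EnlistmentSketch(byte[] globalId, SketchTxLog txLog) { this.globalId = globalId; this.txLog = txLog; }

    void enlistResource(byte[] branchId) {
        if (!started) { // first enlistment: the tx itself enters the log
            txLog.writeStartRecord(globalId);
            started = true;
        }
        if (branches.add(SketchTxLog.hex(branchId))) { // re-enlisting a known branch logs nothing
            txLog.addBranch(globalId, branchId);
        }
    }
}
```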
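Here is a stripped-down version of the thread-to-tx mapping, with hypothetical class names. IllegalStateException stands in for the checked exceptions the real JTA methods declare. The choice of ConcurrentHashMap is the sketch's own; the post only says that txThreadMap is a Map<Thread,TransactionImpl>.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadBoundTx { }

class ThreadMapTxManager {
    // Sketch's choice: many threads may begin and finish txs concurrently.
    private final Map<Thread, ThreadBoundTx> txThreadMap = new ConcurrentHashMap<>();

    void begin() {
        Thread current = Thread.currentThread();
        if (txThreadMap.containsKey(current)) throw new IllegalStateException("Nested transactions are not supported");
        txThreadMap.put(current, new ThreadBoundTx());
    }

    ThreadBoundTx getTransaction() { return txThreadMap.get(Thread.currentThread()); }

    // Detach the current thread's tx so that it can be resumed later.
    ThreadBoundTx suspend() { return txThreadMap.remove(Thread.currentThread()); }

    // The one call that takes a tx: there is no way to infer which one to resume.
    void resume(ThreadBoundTx tx) {
        Thread current = Thread.currentThread();
        if (txThreadMap.containsKey(current)) throw new IllegalStateException("Thread already has a transaction");
        txThreadMap.put(current, tx);
    }

    void commit() {
        ThreadBoundTx tx = txThreadMap.remove(Thread.currentThread());
        if (tx == null) throw new IllegalStateException("No transaction bound to this thread");
        // ... the actual commit work would happen here
    }
}
```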
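Here is the commit() walkthrough above as a sketch. TxStatus, FlowTx and CommitFlowTxManager are hypothetical, and the hooks and the log merely record what happened. A plain RuntimeException stands in for the JTA exceptions that a real TransactionManager.commit() declares.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum TxStatus { ACTIVE, MARKED_ROLLBACK, COMMITTED, ROLLEDBACK, NO_TRANSACTION }

// Hypothetical tx; failCommit simulates a commit that never reaches COMMITTED.
class FlowTx {
    TxStatus status = TxStatus.ACTIVE;
    private final boolean failCommit;
    private final List<String> events;
    FlowTx(boolean failCommit, List<String> events) { this.failCommit = failCommit; this.events = events; }
    void doBeforeCompletion() { events.add("beforeCompletion"); }
    void doAfterCompletion() { events.add("afterCompletion"); }
    void commit() { events.add("commit"); if (!failCommit) status = TxStatus.COMMITTED; }
    void doRollback() { events.add("rollback"); status = TxStatus.ROLLEDBACK; }
}

class CommitFlowTxManager {
    final Map<Thread, FlowTx> txThreadMap = new HashMap<>();
    final List<String> txLog = new ArrayList<>();

    void commit() {
        Thread thread = Thread.currentThread();
        FlowTx tx = txThreadMap.get(thread);
        if (tx == null) throw new IllegalStateException("No transaction");
        if (tx.status == TxStatus.ACTIVE) {
            tx.doBeforeCompletion();
            tx.commit();
            if (tx.status != TxStatus.COMMITTED) {
                tx.doRollback();
                complete(thread, tx);
                throw new RuntimeException("Commit failed, transaction rolled back");
            }
            complete(thread, tx);
        } else if (tx.status == TxStatus.MARKED_ROLLBACK) {
            tx.doRollback();
            complete(thread, tx);
            throw new RuntimeException("Transaction was marked for rollback");
        } else {
            throw new IllegalStateException("Unexpected status " + tx.status);
        }
    }

    // Common tail of every path: hooks, unmap, log the done entry, reset status.
    private void complete(Thread thread, FlowTx tx) {
        tx.doAfterCompletion();
        txThreadMap.remove(thread);
        txLog.add("TX_DONE");
        tx.status = TxStatus.NO_TRANSACTION;
    }
}
```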
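getDanglingRecords() translates almost directly into code. In this hypothetical sketch the Xid is a plain String, branch ids are left out of the records and the byte flag values are made up. A branch or commit mark for an unknown Xid is treated as an error, which is a choice of the sketch rather than something described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical TxLog record: the flag values are assumptions, the Xid is a String.
class TxLogRecord {
    static final byte TX_START = 1, BRANCH_ADD = 2, MARK_COMMIT = 3, TX_DONE = 4;
    final byte type;
    final String xid;
    TxLogRecord(byte type, String xid) { this.type = type; this.xid = xid; }
}

class DanglingScan {
    // Returns, per unfinished tx, the BRANCH_ADD / MARK_COMMIT records seen for it.
    static Map<String, List<TxLogRecord>> getDanglingRecords(List<TxLogRecord> log) {
        Map<String, List<TxLogRecord>> recordMap = new LinkedHashMap<>();
        for (TxLogRecord r : log) {
            if (r.type == TxLogRecord.TX_START) {
                recordMap.put(r.xid, new ArrayList<>());
            } else if (r.type == TxLogRecord.BRANCH_ADD || r.type == TxLogRecord.MARK_COMMIT) {
                List<TxLogRecord> records = recordMap.get(r.xid);
                if (records == null) throw new IllegalStateException("No TX_START for " + r.xid);
                records.add(r);
            } else if (r.type == TxLogRecord.TX_DONE) {
                recordMap.remove(r.xid); // committed or rolled back: nothing to recover
            }
        }
        return recordMap;
    }
}
```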

Replaying half-committed txs

An interesting question is what happens if the system crashes during the execution of a tx's Commands, without having a chance to roll back. In that case, the whole tx will be replayed, meaning that the commands that were executed before the failure will be executed again. The answer to this is: so what? Since commands are composed of Records, every Command execution amounts to persisting the changed record in the store. Writing a record again is simply wasted I/O; nothing changes but the timestamps on the files. Also, since the recovered txs are sorted and executed serially before normal operation resumes, no locks are necessary: these are not concurrent txs.
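The "so what?" argument rests on commands being idempotent. This hypothetical sketch runs part of a tx, "crashes", and then replays the whole tx. Full-record writes end up in the same state as a clean run. A delta-style command, included only for contrast (Neo's commands do not work this way), gets counted twice.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface ReplayableCommand { void execute(Map<Integer, Integer> store); }

// Writes a record's full after-image: executing it again changes nothing.
class WriteRecordCommand implements ReplayableCommand {
    private final int id, value;
    WriteRecordCommand(int id, int value) { this.id = id; this.value = value; }
    public void execute(Map<Integer, Integer> store) { store.put(id, value); }
}

// Contrast only, not how Neo works: a delta is applied twice when replayed.
class AddDeltaCommand implements ReplayableCommand {
    private final int id, delta;
    AddDeltaCommand(int id, int delta) { this.id = id; this.delta = delta; }
    public void execute(Map<Integer, Integer> store) { store.merge(id, delta, Integer::sum); }
}

class ReplaySketch {
    // Executes the first crashAfter commands, "crashes", then recovery replays the whole tx.
    static Map<Integer, Integer> crashAndReplay(List<ReplayableCommand> tx, int crashAfter) {
        Map<Integer, Integer> store = new HashMap<>();
        for (int i = 0; i < crashAfter; i++) tx.get(i).execute(store);
        for (ReplayableCommand c : tx) c.execute(store); // serial replay, no locks needed
        return store;
    }
}
```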

We are done for now

The above went a level higher in the hierarchy of the transaction handling code than the previous article. It also made a small excursion into the realm of the XA framework, most of which will be discussed next time. As the posts go by, I tend to focus more on the mechanics than on analysing the code line by line. I feel that if you have followed my writings on this subject, you should have developed a similar "feel" for the actual implementations. Before closing, I want to thank the people at Neo for their kind words concerning my work, and especially Peter Neubauer, whose one-line answer to a question I asked helped me more than he can imagine.


This work is licensed under a Creative Commons Attribution 3.0 Unported License.
