
NEO Protocol Specification

Specification of the NEO architecture and protocol
  • Last Update: 2019-09-20
  • Version: 001
  • Language: en


  • Architecture
  • Network Layer
  • States
  • Workflows
  • Storage Backends

(this document is being written: see below for its state)

This document first introduces the NEO architecture and the different roles of each node in a NEO cluster. Then it gives a precise definition of the communication protocol and the semantics of the workflows involved:

In Network Layer, we explain how data is exchanged between nodes: serialization format and link establishment. But we do not explain the semantics of the messages exchanged between nodes.

In States, we define the different states of the cluster, of each node and cell.

In Workflows, we describe cluster workflows with the sequences of messages exchanged between nodes and their semantics in relation to a workflow.

In Storage Backends, we describe the typical architecture of a possible implementation of a storage backend.

Notice to implementers: A NEO deployment can be seen as a single application, in that a node can trust peers to implement the same algorithm. However, failures like a node being stopped or an unpredictable network may happen at any time and such misbehaviour should be handled gracefully.

State of this Document

The following sections are finished:

  • Architecture:

    all except slides for Cluster Overview and Partitioning

  • Network Layer:

    itself and Messages

  • States:

    Node/Cell/Cluster States

  • Workflows:

    • slides for read, commit and conflict resolution
    • deadlock avoidance (missing slide ?)


Architecture

NEO is a distributed, redundant and scalable implementation of the ZODB API, which means it is a transactional noSQL database that ZODB clients can use. More technically, NEO is a ZODB storage backend and this document assumes you have read the ZODB book.

Let us start by explaining the architecture of NEO. This section is inspired by Linux Solutions Presentation and summarized with less technical details.

Cluster Overview

NEO consists of three components: master nodes (M), storage nodes (S) and client nodes (C). Here, node means a software component which runs on a computer: it can either be a process or a set of threads inside another process. They can run in the same computer or different computers in a network.

At the level of an application accessing a NEO database, there's the client part of NEO, which is used through ZODB. Client nodes interact with the primary master node and storage nodes to perform transactions and retrieve data.

Among master nodes, one is the primary: as the arbitrator of the cluster, its tasks are to generate new identifiers for objects (OID) and transactions (TID), broadcast invalidations, control transactions, take decisions about topology changes. It does the minimum to guarantee the consistency of the database and in particular, it never sees any application data.

Other master nodes are called secondary. They are spare master nodes. One of them can take over the role of primary master if the current one fails. A master node becomes primary after an election.

Storage nodes are the only ones storing data persistently: mainly application data, i.e. objects and transactions that form the database, but also all the cluster metadata that the primary master needs to recover.

A fourth node type is optional. The administration of a NEO cluster is done via an admin node (A), which issues commands to the primary master. It comes with a command-line tool called neoctl.


Partitioning

A NEO database is split into partitions and each storage node is assigned to several partitions. These assignments form the partition table: each assignment is called a cell.

The partition to which an OID belongs is found by computing OID % NP, where NP is the number of partitions. Then, the partition table is queried to know which storage node(s) contain(s) the object.
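
The lookup can be sketched in Python as follows. This is only an illustration: it assumes OIDs are 8-byte big-endian strings (as the constants later in this document suggest), and the in-memory table layout (a list indexed by partition number, each entry listing storage node names) is hypothetical.

```python
import struct

def partition_of(oid: bytes, num_partitions: int) -> int:
    """Return the partition an 8-byte OID belongs to, i.e. OID % NP."""
    # Assumption: an OID is an unsigned 64-bit big-endian integer.
    (value,) = struct.unpack('>Q', oid)
    return value % num_partitions

def storages_for(oid: bytes, partition_table: list) -> list:
    """Query a (hypothetical) partition table for the nodes holding an OID."""
    return partition_table[partition_of(oid, len(partition_table))]

# Hypothetical table: 4 partitions, each assigned to two storage nodes.
example_table = [['S1', 'S2'], ['S2', 'S3'], ['S3', 'S1'], ['S1', 'S2']]
```

For instance, with this table, OID 6 falls in partition 6 % 4 = 2, so it would be looked up on S3 and S1.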

Load balancing is achieved by having several storage nodes, similarly to RAID0. For high availability, each partition can also be assigned to NR+1 storage nodes, similarly to RAID1+0: NR is the number of replicas.

Each cell has a state: for example, it is not readable if it does not contain all the partition data. By also looking at node states, a client knows where to load or store data.

The primary node broadcasts all changes to the partition table to all other nodes, and it also notifies of all node state changes.


Common ZODB implementations do database-level locking, which means that the processing of a tpc_begin must wait until any other transaction being committed has finished. In some cases, like ZEO for 2 distinct server connections, the database is locked during vote, as described in the above schema.

For better scalability, NEO implements object-level locking, so that transactions modifying disjoint sets of oids can be committed simultaneously. This is done by using temporary TID (TTID) during the commit, and solving possible deadlocks.

During a commit, modified oids are write-locked. On tpc_finish, they are also read-locked because storage nodes may not be ready yet when the clients are notified of the new transaction.

NEO also aims to efficiently handle transactions of any size, in number of bytes or number of changed OIDs. For this, the protocol is designed in such a way that the client and storage nodes can process a stream of data even on systems with limited RAM.

Network Layer

Communication between two nodes is done by exchanging messages over a streaming point-to-point protocol that supports guaranteed order and delivery of messages. Only TCP and SSL over TCP are implemented for now (over IPv4 and IPv6), but other protocols like UNIX sockets or QUIC would be possible.

Connection failures shall also be detected. The NEO protocol has no mechanism for that, so it has to be done at the level of the underlying protocol, e.g. by enabling TCP keep-alives.

We start by describing how these messages are serialized; their semantics will be explained in the sections describing workflows. MessagePack is used for all packets. The handshake packet is an array with the following 2 fields:

  • Magic: "NEO" (3 bytes)
  • Version: positive integer

All other packets are arrays of 3 fields, as described in the next section:

  • Message ID: positive integer
  • Message Code: 16-bit positive integer
  • Arguments: array


Messages

The message code encodes the type of the message. The following table lists the 70 different types that can be exchanged. 44 of them are requests with response packets. 1 is a generic response packet for error handling. The remaining 25 are notification packets.

The code (#) of a response packet is the same as that of the corresponding request, with the highest-order bit set. In Python, this translates as follows:

response_code = request_code | 0x8000
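
The same rule can be wrapped in small helpers to go back and forth between request and response codes. This is a sketch; the function and constant names are illustrative and not part of the protocol.

```python
RESPONSE_BIT = 0x8000  # highest-order bit of the 16-bit message code

def response_code(request_code: int) -> int:
    """Code of the response packet matching a request code."""
    return request_code | RESPONSE_BIT

def is_response(code: int) -> bool:
    """True if the code has the response bit set."""
    return bool(code & RESPONSE_BIT)

def request_code_of(code: int) -> int:
    """Strip the response bit to recover the request code."""
    return code & (RESPONSE_BIT - 1)
```

For example, the response to AskBeginTransaction (code 18 in the table below) would carry code 0x8012.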

Message IDs are used to identify response packets: each node sends a request with a unique value and the peer replies using the same id as the request. Notification packets normally follow the same rule as request packets, for debugging purposes. In some complex cases where replying is done with several notification packets followed by a response one (e.g. replication), the notification packets must have the same id as the response.

Notice to implementers:

  • A 32-bit counter can be used for Message IDs, 0 being the value of the first sent message, and the value is reset to 0 after 0xffffffff.
  • On error, the implementer is free to answer with an Error packet before aborting the connection, so that the requesting node logs debugging information. We will only document other uses of Error.
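
The 32-bit counter suggested in the first point above could look like the following sketch (the class name is illustrative; a real implementation would keep one counter per connection):

```python
class MessageIdCounter:
    """Per-connection counter for Message IDs, wrapping after 0xffffffff."""

    def __init__(self):
        self._next = 0  # the first sent message gets id 0

    def next_id(self) -> int:
        msg_id = self._next
        # Wrap around to 0 once 0xffffffff has been used.
        self._next = (msg_id + 1) & 0xFFFFFFFF
        return msg_id
```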
Message Types
# Message Description Workflow Nodes
0 Error Error is a special type of message, because this can be sent against any other message, even if such a message does not expect a reply usually. * → *
1 RequestIdentification Request a node identification. This must be the first packet for any connection. * ⇄ *
2 Ping Empty request used as network barrier. * ⇄ *
3 CloseClient Tell peer that it can close the connection if it has finished with us. * → *
4 AskPrimary Ask node identifier of the current primary master. ctl ⇄ A
5 NotPrimaryMaster Notify peer that I'm not the primary master. Attach any extra information to help the peer joining the cluster. SM → *
6 NotifyNodeInformation Notify information about one or more nodes. M → *
7 AskRecovery Ask storage nodes data needed by master to recover. Reused by `neoctl print ids`. M ⇄ S
ctl ⇄ A ⇄ M
8 AskLastIDs Ask the last OID/TID so that a master can initialize its TransactionManager. Reused by `neoctl print ids`. M ⇄ S
ctl ⇄ A ⇄ M
9 AskPartitionTable Ask storage node the remaining data needed by master to recover. M ⇄ S
10 SendPartitionTable Send the full partition table to admin/client/storage nodes on connection. M → A, C, S
11 NotifyPartitionChanges Notify about changes in the partition table. M → *
12 StartOperation Tell a storage node to start operation. Before this message, it must only communicate with the primary master. M → S
13 StopOperation Notify that the cluster is not operational anymore. Any operation between nodes must be aborted. M → S, C
14 AskUnfinishedTransactions Ask unfinished transactions, which will be replicated when they're finished. S ⇄ M
15 AskLockedTransactions Ask locked transactions to replay committed transactions that haven't been unlocked. M ⇄ S
16 AskFinalTID Return final tid if ttid has been committed, to recover from certain failures during tpc_finish. M ⇄ S
C ⇄ M, S
17 ValidateTransaction Do replay a committed transaction that was not unlocked. M → S
18 AskBeginTransaction Ask to begin a new transaction. This maps to `tpc_begin`. C ⇄ M
19 FailedVote Report storage nodes for which vote failed. True is returned if it's still possible to finish the transaction. C ⇄ M
20 AskFinishTransaction Finish a transaction. Return the TID of the committed transaction. This maps to `tpc_finish`. C ⇄ M
21 AskLockInformation Commit a transaction. The new data is read-locked. M ⇄ S
22 InvalidateObjects Notify about a new transaction modifying objects, invalidating client caches. M → C
23 NotifyUnlockInformation Notify about a successfully committed transaction. The new data can be unlocked. M → S
24 AskNewOIDs Ask new OIDs to create objects. C ⇄ M
25 NotifyDeadlock Ask master to generate a new TTID that will be used by the client to solve a deadlock by rebasing the transaction on top of concurrent changes. S → M → C
26 AskRebaseTransaction Rebase a transaction to solve a deadlock. C ⇄ S
27 AskRebaseObject Rebase an object change to solve a deadlock. C ⇄ S
28 AskStoreObject Ask to create/modify an object. This maps to `store`. C ⇄ S
29 AbortTransaction Abort a transaction. This maps to `tpc_abort`. C → S
C → M → S
30 AskStoreTransaction Ask to store a transaction. Implies vote. C ⇄ S
31 AskVoteTransaction Ask to vote a transaction. C ⇄ S
32 AskObject Ask a stored object by its OID, optionally at/before a specific tid. This maps to `load/loadBefore/loadSerial`. C ⇄ S
33 AskTIDs Ask for TIDs between a range of offsets. The order of TIDs is descending, and the range is [first, last). This maps to `undoLog`. C ⇄ S
34 AskTransactionInformation Ask for transaction metadata. C ⇄ S
35 AskObjectHistory Ask history information for a given object. The order of serials is descending, and the range is [first, last]. This maps to `history`. C ⇄ S
36 AskPartitionList Ask information about partitions. ctl ⇄ A
37 AskNodeList Ask information about nodes. ctl ⇄ A
38 SetNodeState Change the state of a node. ctl ⇄ A ⇄ M
39 AddPendingNodes Mark given pending nodes as running, for future inclusion when tweaking the partition table. ctl ⇄ A ⇄ M
40 TweakPartitionTable Ask the master to balance the partition table, optionally excluding specific nodes in anticipation of removing them. ctl ⇄ A ⇄ M
41 SetNumReplicas Set the number of replicas. ctl ⇄ A ⇄ M
42 SetClusterState Set the cluster state. ctl ⇄ A ⇄ M
43 Repair Ask storage nodes to repair their databases. ctl ⇄ A ⇄ M
44 NotifyRepair Repair is translated to this message, asking a specific storage node to repair its database. M → S
45 NotifyClusterInformation Notify about a cluster state change. M → *
46 AskClusterState Ask the state of the cluster. ctl ⇄ A
A ⇄ M
47 AskObjectUndoSerial Ask storage the serial where object data is when undoing given transaction, for a list of OIDs. C ⇄ S
48 AskTIDsFrom Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Used by `iterator`. C ⇄ S
49 AskPack Request a pack at given TID. C ⇄ M ⇄ S
50 CheckReplicas Ask the cluster to search for mismatches between replicas, metadata only, and optionally within a specific range. Reference nodes can be specified. ctl ⇄ A ⇄ M
51 CheckPartition Ask a storage node to compare a partition with all other nodes. Like for CheckReplicas, only metadata are checked, optionally within a specific range. A reference node can be specified. M → S
52 AskCheckTIDRange Ask some stats about a range of transactions. Used to know if there are differences between a replicating node and reference node. S ⇄ S
53 AskCheckSerialRange Ask some stats about a range of object history. Used to know if there are differences between a replicating node and reference node. S ⇄ S
54 NotifyPartitionCorrupted Notify that mismatches were found while checking replicas for a partition. S → M
55 NotifyReady Notify that we're ready to serve requests. S → M
56 AskLastTransaction Ask last committed TID. C ⇄ M
ctl ⇄ A ⇄ M
57 AskCheckCurrentSerial Check if given serial is current for the given oid, and lock it so that this state is not altered until transaction ends. This maps to `checkCurrentSerialInTransaction`. C ⇄ S
58 NotifyTransactionFinished Notify that a transaction blocking a replication is now finished. M → S
59 Replicate Notify a storage node to replicate partitions up to given 'tid' and from given sources. M → S
60 NotifyReplicationDone Notify the master node that a partition has been successfully replicated from a storage to another. S → M
61 AskFetchTransactions Ask a storage node to send all transaction data we don't have, and reply with the list of transactions we should not have. S ⇄ S
62 AskFetchObjects Ask a storage node to send object records we don't have, and reply with the list of records we should not have. S ⇄ S
63 AddTransaction Send metadata of a transaction to a node that does not have them. S → S
64 AddObject Send an object record to a node that does not have it. S → S
65 Truncate Request DB to be truncated. Also used to leave backup mode. ctl ⇄ A ⇄ M
M ⇄ S
66 FlushLog Request all nodes to flush their logs. ctl → A → M → *
67 AskMonitorInformation ctl ⇄ A
68 NotifyMonitorInformation A → A
69 NotifyUpstreamAdmin M → A
Enum Types
  0. CellStates
    0. OUT_OF_DATE
    1. UP_TO_DATE
    2. FEEDING
  1. ClusterStates
    2. RUNNING
  2. ErrorCodes
    0. ACK
    1. DENIED
    2. NOT_READY
  3. NodeStates
    0. UNKNOWN
    1. DOWN
    2. RUNNING
    3. PENDING
  4. NodeTypes
    0. MASTER
    1. STORAGE
    2. CLIENT
    3. ADMIN

Enum values are serialized using the MessagePack Extension mechanism: type is the number of the Enum type (as listed above, counting from 0), data is the MessagePack serialization of the Enum value (i.e. a positive integer). For example, NodeStates.RUNNING (type 3, value 2) is encoded as \xd4\x03\x02.
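
The example bytes can be reproduced by hand, without a MessagePack library. The sketch below assumes that both the type number and the value fit in a single byte (0 to 127), in which case MessagePack uses the "fixext 1" format: the byte 0xd4, then the type, then one byte of data (the value as a positive fixint). The helper names are illustrative.

```python
FIXEXT1 = 0xd4  # MessagePack "fixext 1": extension with 1 byte of data

def encode_enum(enum_type: int, value: int) -> bytes:
    """Encode a small Enum value as a MessagePack extension."""
    if not (0 <= enum_type <= 127 and 0 <= value <= 127):
        raise ValueError("this sketch only handles one-byte types and values")
    return bytes([FIXEXT1, enum_type, value])

def decode_enum(data: bytes) -> tuple:
    """Return (enum_type, value) from a 3-byte fixext 1 encoding."""
    if len(data) != 3 or data[0] != FIXEXT1:
        raise ValueError("not a fixext 1 encoded Enum")
    return data[1], data[2]

# Type 3, value 2: the bytes of the NodeStates.RUNNING example.
running = encode_enum(3, 2)
```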

Naming choice: For cell states, node states and node types, names are chosen to have unambiguous initials, which is useful to produce shorter logs or have a more compact user interface. This explains for example why RUNNING was preferred over UP.

Other Constants
INVALID_TID '\xff\xff\xff\xff\xff\xff\xff\xff'
INVALID_OID '\xff\xff\xff\xff\xff\xff\xff\xff'
ZERO_HASH '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_TID '\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_OID '\x00\x00\x00\x00\x00\x00\x00\x00'
MAX_TID '\x7f\xff\xff\xff\xff\xff\xff\xff'

MAX_TID could be bigger but in the Python implementation, TIDs are stored as integers and some storage backends may have no support for values above 2⁶³-1 (e.g. SQLite).

Node IDs are 32-bit integers. NID namespaces are required to prevent conflicts when the master generates new ids before it knows those of existing storage nodes. The high-order byte of node ids is one of the following values:


Actually, only the high-order bit is really important and the 31 other bits could be random, but the extra namespace information and the non-randomness of the 3 low-order bytes help to read logs.

Link Establishment

Before two nodes can exchange messages, they first need to connect to each other by way of the underlying network.

For a given connection, one node was listening and the other dialed the listening node's address. How the address a node listens on is set is outside the scope of this specification. Nodes publish their addresses to the primary master node during identification (see the RequestIdentification message). Node information is then broadcast using NotifyNodeInformation.

In the context of an established link, the node which dialed the listener is called the "dialing node", while the node which was listening is called the "server node".

Notice to implementers:

  • To avoid log flooding, a node should wait a while before retrying to establish a link, e.g. 1 second.
  • Bad SSL implementations like OpenSSL 1.0.2 do not try to reject non-SSL connections as soon as possible. MSG_PEEK can be used to check that the first byte is \x16 (SSL handshake).
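
The MSG_PEEK check mentioned in the last point can be sketched with the standard socket module. This is a minimal illustration that assumes a blocking, already accepted socket; the function name is hypothetical.

```python
import socket

SSL_HANDSHAKE_BYTE = b'\x16'  # first byte expected for an SSL handshake

def looks_like_ssl(sock: socket.socket) -> bool:
    """Peek at the first byte of the stream without consuming it."""
    first = sock.recv(1, socket.MSG_PEEK)
    # An empty result means the peer closed the connection.
    return first == SSL_HANDSHAKE_BYTE
```

Because the byte is only peeked, it remains available to the SSL layer if the check passes; otherwise the connection can be closed right away.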
Handshake (1/2)

The NEO-specific handshake follows once the underlying network connection is established, to make sure both nodes talk the same protocol. Handshake transmissions are not ordered with respect to each other and can go in parallel.

To avoid accidental DoS or stuck communication in case a connection is made with a non-NEO node, MessagePack should not be used to decode this first packet. Comparison should be done:

  • at the lowest level against the expected sequence of bytes;
  • as soon as bytes are received, so that the connection is rejected if the beginning does not match.

To simplify the implementation, the MessagePack encoding must be as small as possible, using bin format for the magic, so '\x92\xa3NEO\x01' for the current version.

Handshake succeeds if the comparison matches exactly. Else, the underlying network link is closed, thus cancelling link establishment.
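
A byte-level comparison of this kind can be sketched as follows. The function is illustrative: it is meant to be called each time more bytes arrive, so that a mismatch is detected as early as possible. Extra bytes after the handshake are tolerated, on the assumption that the peer may send messages right behind it.

```python
from typing import Optional

HANDSHAKE = b'\x92\xa3NEO\x01'  # expected bytes for the current version

def check_handshake(received: bytes) -> Optional[bool]:
    """Compare the bytes received so far with the expected handshake.

    Returns True once the whole handshake matched, False as soon as any
    received byte differs, and None while more bytes are needed.
    """
    n = min(len(received), len(HANDSHAKE))
    if received[:n] != HANDSHAKE[:n]:
        return False  # reject without waiting for the rest
    return True if n == len(HANDSHAKE) else None
```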

The version is increased whenever upgrading a node may require upgrading other nodes.

Handshake (2/2)

Notice to implementers:

  • When only the version differs, a different error should be logged.
  • To avoid log flooding, a node should wait a while before retrying to connect, e.g. 1 second.
  • Messages can be sent without waiting for the handshake phase to complete, even within the same network packet as the handshake.

After a successful handshake, any node, except tools connecting to the admin node to send commands, must identify with the server node, which either accepts or rejects it. On success, the link becomes established and further message exchange on it can happen depending on peer types, workflow and current cluster state.

RequestIdentification arguments:

  • node_type (NodeTypes): Type of the requesting node.
  • nid (int | nil): Node ID of the requesting node if one was already assigned.
  • address ([host: bin, port: int] | nil): Address the requesting node itself listens on.
  • name (bin): Name of the NEO cluster, to catch misconfiguration mistakes.
  • id_timestamp (float | nil): Time when the requesting node was identified by the master.
  • extra ({key: bin, value: any}): Extra node properties.

AcceptIdentification arguments:

  • node_type (NodeTypes): Type of the server node.
  • nid (int | nil): Node ID of the server node.
  • your_nid (int | nil): Node ID assigned to requesting node if server is master, otherwise echo of nid parameter.

Identification to the Primary Master

Once the cluster name is checked, the master should check nid and address and reject with PROTOCOL_ERROR if either conflicts with an already identified node. In particular, it should protect against misconfiguration when cloning a node. For example, address must not be the same as that of the master. The wanted nid can be ignored for all nodes except storages (a negative nid is considered temporary).

See the beginning of IdentificationHandler.requestIdentification for an example of implementation (note: the uuid variable should be renamed into nid).

Then, processing depends on node_type. Unknown values result in PROTOCOL_ERROR. The master rejects with NOT_READY if the connecting node can't be serviced for the current cluster state (an untested idea is to suspend the identification until the cluster enters into an appropriate state). If accepted:

  1. A new id_timestamp is generated for the node using a strictly increasing function. Although a counter would work, an epoch value (with a protection against date jumps in the past) is more useful because the user can easily know when a node has joined the cluster.
  2. Information about the new node is broadcast (NotifyNodeInformation) to other nodes.
  3. Optionally, storage node cloning.
  4. The node is marked as identified.
  5. AcceptIdentification is answered, immediately followed by information about nodes: a node knows its id_timestamp via this NotifyNodeInformation packet.
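
The strictly increasing id_timestamp of step 1 could be generated as in the sketch below. It is only an illustration: the class name is hypothetical, the clock is injectable to make the behaviour testable, and math.nextafter requires Python 3.9 or later.

```python
import math
import time

class IdTimestampGenerator:
    """Epoch-based timestamps that never go backwards or repeat."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._last = float('-inf')

    def new(self) -> float:
        now = self._clock()
        if now <= self._last:
            # The clock jumped back (or did not advance): return the
            # smallest float strictly greater than the previous value.
            now = math.nextafter(self._last, math.inf)
        self._last = now
        return now
```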
Extra Node Properties
  • Admin
    • backup: List of backup cluster names, for monitoring.
  • Client
    • backup: Backup cluster name. The client node is actually the primary master of a backup cluster.
  • Storage
    • new_nid: Storage node cloning, nid and address must be nil.
    • devpath: A list of strings, e.g. [room, machine, disk]. For maximum resiliency, cells of each partition are assigned as far as possible from each other, by checking the topology path of nodes.
Identification to Other Nodes

The general case is to only accept known nodes (from the master), with the same cluster name, and with an id_timestamp that matches. It can mismatch in 2 ways:

  • if the id_timestamp value from master is smaller, the identification can be delayed, waiting for a NotifyNodeInformation packet;
  • else, the connecting node got disconnected from the master and NOT_READY must be returned.
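
The id_timestamp comparison described above can be summarized as a small decision function. This is a sketch for an already known node with non-nil timestamps; the Decision names are illustrative, not protocol constants.

```python
from enum import Enum

class Decision(Enum):
    ACCEPT = 'accept'        # timestamps match
    DELAY = 'delay'          # wait for a NotifyNodeInformation packet
    NOT_READY = 'not_ready'  # answer with the NOT_READY error

def check_id_timestamp(from_master: float, from_peer: float) -> Decision:
    """Compare the id_timestamp known from the master with the one
    announced by the connecting node."""
    if from_master == from_peer:
        return Decision.ACCEPT
    if from_master < from_peer:
        # Our information from the master is older: it may just be late.
        return Decision.DELAY
    # The connecting node got disconnected from the master.
    return Decision.NOT_READY
```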

Admin and storage nodes also accept connections from nodes of same type in a related backup cluster. In such case, nid, address and id_timestamp are nil. For storage, the cluster name must match as usual, whereas backup admins specify their own cluster name.

There's no identification between neoctl and an admin node.

Simultaneous Connection between 2 Nodes

As part of replication, it is possible that 2 storage nodes of the same cluster connect to each other at the same time. The race is solved by comparing nid: if the peer id is greater, identification succeeds, replacing the current outgoing link establishment, else it is rejected with PROTOCOL_ERROR.
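
The tie-break can be expressed as a one-line rule. The sketch below uses hypothetical names and only covers the decision taken by a storage node that receives an identification request while it may itself be dialing the same peer.

```python
def accept_incoming(my_nid: int, peer_nid: int, outgoing_pending: bool) -> bool:
    """Decide whether to accept an incoming identification.

    Returns False when the request must be rejected with PROTOCOL_ERROR.
    """
    if not outgoing_pending:
        return True  # no race: nothing to arbitrate
    # Race: the incoming link wins only if the peer id is greater, in
    # which case it replaces our own outgoing link establishment.
    return peer_nid > my_nid
```

Since node ids are distinct, exactly one of the two nodes accepts, so a single link survives the race.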

It implies that the established link is used for requests from both sides. To prevent storage nodes from remaining connected to each other forever, the CloseClient packet (no argument) can be used to notify the peer that it's free to close the connection.

XXX Reusing a connection as client whereas CloseClient was already sent is unlikely to work reliably, because the peer may close at any time. The packet should be changed into AsClient(bool).

Operating Data

Besides the data actually stored in the database, there is also state maintained among nodes, representing e.g. the current set of peers, the partition table, the master's idea of what is currently going on with the cluster, etc. The primary master maintains the main version of this state, makes changes to it and shares it with all its peers.

These data consist of:

  • Node table
  • Partition table
  • Cluster state

Whenever some information is shared with a node, it is sent in whole just after identification; then incremental updates are sent after each change.

The next sections describe them.

Node Table

NodeTable contains information about known nodes in a cluster. It is a

{} UUID -> (NodeType, NodeAddr, NodeState)

mapping, associating each node UUID with information about that node.

The master maintains this table and provides it to its peers, so that they know each other, as follows:

  • when a node initially connects to M, M sends it the whole node table;
  • whenever the node table changes, M broadcasts the changes to all its connected peers.

This way all nodes eventually stay informed about their peers in the cluster.
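
On the receiving side, keeping a local copy of the node table up to date can be sketched as below. The types and names are illustrative: node types and states are plain strings here, node ids are used as keys, and an UNKNOWN state is taken to mean that the node can be forgotten.

```python
from typing import Dict, NamedTuple, Optional, Tuple

class NodeInfo(NamedTuple):
    node_type: str                      # e.g. 'STORAGE'
    address: Optional[Tuple[str, int]]  # (host, port), or None
    state: str                          # e.g. 'RUNNING'

NodeTable = Dict[int, NodeInfo]

def apply_node_changes(node_table: NodeTable, changes: NodeTable) -> None:
    """Merge a full table or an incremental update into node_table."""
    for nid, info in changes.items():
        if info.state == 'UNKNOWN':
            node_table.pop(nid, None)  # the node can be forgotten
        else:
            node_table[nid] = info
```

The same function handles both cases listed above: the whole table sent on connection is just an update applied to an empty table.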

Node States

Node states describe the current status of nodes from the primary master's point of view. By definition, the primary master node is therefore always running and, in this section, master only refers to secondary master nodes. Admin and client nodes are also either running or forgotten, so node states are mostly useful for the other types.

  • UNKNOWN: Used to notify that a node can be forgotten (notification only).
  • DOWN: The node is not connected to the primary master and other nodes must not (try to) communicate with it.
  • RUNNING: The node is up and running normally as part of the cluster.
  • PENDING: The storage node is connected to the cluster but it's still unused. It is an intermediate state before adding it to the partition table.

(Table: possible values for each node type.)

Internally, each node keeps a list of nodes for which it must know the state, e.g. clients only know the states of master and storage nodes.

(Figure: notification matrix.)

If the primary master node fails, all nodes must be able to find the newly elected one, which explains why any change to the list of master nodes is broadcast to all nodes without exception. Apart from that, the following changes are considered:

  • The current implementation of the admin node is minimal and we don't know yet if notifying admins about the state of admins would be useful.
  • The PENDING state may be removed. There's at least the following minor issue: a RUNNING storage node without any assigned partition is restarted as PENDING.

Partition Table

The partition table represents the partitioning of the object space in a cluster.

It is an

oid -> []Storage # XXX actually oid -> []uuid

mapping, associating an object id with the list of storage nodes where data for this oid should be written to or loaded from.

This mapping is organized as follows:

The oid space is divided (partitioned) into Np parts via the

pid(oid) = oid % Np

rule. The value oid % Np is known as the partition identifier of oid.

There is also an externally set "redundancy factor" R, which describes the minimum number of separate nodes on which a particular object's data should be stored at the same time.

Given Np, R and []Storage, the partition table tries to organize the

pid -> []Storage

mapping so that

  • the redundancy level set by R is met;
  • storages associated with adjacent pids are different.

When such an organization is reached, the partition table is called operational; otherwise it is non-operational. XXX and if storages are ready

The master maintains the partition table and saves it persistently to storage nodes for recovery purposes.

XXX schematic PT organization from kirr's notes:

        #Np (how-many partitions)    #R (replication factor)
        .node   (-> .nodeid, .addr)

        .backup_tid         # last tid this cell has all data for
        .replicating        # currently replicating up to this (.replicating) tid

        .partition_list [#Np] of []Cell
        .count_dict     {} node -> #node_used_in_pt

        | |
        +-+  +----------+ +------------+ +-----+
        | |  |node,state| |node2,state2| |cell3| ...
        +-+  +----------+ +------------+ +-----+
     Np | |
        | |
        +-+     oid -> PTentry (as PT[oid % Np]
        | |     tid

Cell States

  • UP_TO_DATE: Normal state: cell is writable/readable, and it isn't planned to drop it.
  • OUT_OF_DATE: Write-only cell. Last transactions are missing because storage is/was down for a while, or because it is new for the partition. It usually becomes UP_TO_DATE when replication is done.
  • FEEDING: Same as UP_TO_DATE, except that it will be discarded as soon as another node finishes replicating it. It means a partition is being moved from one node to another. It is also discarded immediately if out-of-date.
  • DISCARDED: Not really a state: only used in network packets to tell storages to drop partitions.
  • CORRUPTED: A check revealed that data differs from other replicas. Cell is neither readable nor writable.

UP_TO_DATE is the initial cell state when the database is created. Afterwards, i.e. after a tweak, new cells always start in the OUT_OF_DATE state.

Cluster States


The primary master indicates to other nodes its idea about what is currently going on with the cluster via the cluster state value.

RECOVERING: The cluster is initially in the RECOVERING state, and it goes back to this state whenever the partition table becomes non-operational again. An election of the primary master always happens, in case of a network cut between a primary master and all other nodes. The primary master:
  • first recovers its own data by reading it from storage nodes;
  • waits for the partition table to be operational;
  • automatically switches to VERIFYING if the cluster can be safely started.

VERIFYING: Transient state, used to:
  • replay the transaction log, in case of unclean shutdown;
  • and actually truncate the DB if the user asked to do so.
Then, the cluster either goes to RUNNING or STARTING_BACKUP state.

RUNNING: Normal operation. The DB is read-writable by clients.

STOPPING: Transient state to shut down the whole cluster.

STARTING_BACKUP: Transient state, during which the master (re)connects to the upstream master.

BACKINGUP: Backup operation. The master is notified of new transactions thanks to invalidations and orders storage nodes to fetch them from upstream. Because cells are synchronized independently, the DB is often inconsistent.

STOPPING_BACKUP: Transient state, when the user decides to go back to RUNNING state. The master stays in this state until the DB is consistent again. In case of failure, the cluster will go back to backup mode.



Workflows

The way a NEO cluster works is described via workflows. Workflows describe the semantics and the messages exchanged in relation to a particular function of NEO. Some workflows, like read and commit, directly correspond to a user-visible service. Others, e.g. recovery and verification, are internal to NEO.

Most of the complexity lies in workflows that are not directly related to user-visible service and also in how workflows interact with each other.

There are 17 workflows in NEO:

  • election JM
  • state change propagation
  • cluster start
  • recovery
  • verification
  • running
  • read
  • commit
  • invalidation
  • conflict resolution
  • stopping
  • replication
  • backup
  • partition change JM
  • cluster state change JM
  • node state change JM
  • background check JM

Please see next sections which describe workflows one by one.

old text:

Base workflows are the workflows that relate to the absence of exceptions in NEO protocol operation. There are 12 base workflows in NEO: backup, recovery, election, read, commit, replication, topology change.

PROPOSAL(kirr) I would not split the workflows to those that do not have exceptions and those that do have. As NEO consists of several nodes and links - elements that might be failing, and it is / should be designed to still work in such conditions the errors should be expected and documented / programmed explicitly together with the more common "non-failing" workflow subpart.


Election

XXX make a drawing to show what happens and which messages are involved XXX

Election is the workflow responsible for selecting a single node among the potentially many master nodes of the cluster. The selected master node becomes known as the primary master. Once the primary master is elected, it becomes responsible for driving the cluster in every aspect where the master is documented to be involved.

The election keeps running continuously even after a primary master has been elected. If the current primary master goes down, another one should be elected to become primary and continue to drive the cluster.

XXX(jm) document election details and messages.

State change propagation

In NEO, the whole state of the cluster is centralized in how the primary master views it. The primary master observes external and internal events and adjusts the cluster state accordingly. It then propagates state changes to every other node it has a link to, in order to keep them updated with the current cluster state.

XXX make a drawing ? XXX

Every node of the cluster establishes a link to the master node.

Once this link is established, the master sends the newcomer node the full node table:

. < M   NotifyNodeInformation{.RowList = all nodes from Node Table}

and continues to update it with similar messages when node table changes:

. < M   NotifyNodeInformation{.RowList = δNodeTab}

This way every node stays eventually informed about the other nodes in the cluster.
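
The two messages above can be handled by one and the same routine on the receiving side. The following is only a minimal sketch in Python: the NodeRow fields, the NodeTableSnapshot class and the addresses are hypothetical names chosen for illustration and are not part of the wire format.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeRow:
    """One row of NotifyNodeInformation.RowList (illustrative fields)."""
    uuid: int
    node_type: str   # e.g. "MASTER", "STORAGE", "CLIENT"
    address: str
    state: str       # e.g. "PENDING", "RUNNING", "DOWN"


class NodeTableSnapshot:
    """Local view of the Node Table kept by a non-master node."""

    def __init__(self):
        self.rows = {}  # uuid -> NodeRow

    def notify_node_information(self, row_list):
        # The same handler serves the initial full table and later deltas:
        # each row replaces whatever was known for that uuid.
        for row in row_list:
            self.rows[row.uuid] = row


snapshot = NodeTableSnapshot()
# First message after the link is established: all nodes.
snapshot.notify_node_information([
    NodeRow(1, "MASTER", "10.0.0.1:5551", "RUNNING"),
    NodeRow(2, "STORAGE", "10.0.0.2:5552", "PENDING"),
])
# Later message: only the rows that changed.
snapshot.notify_node_information([
    NodeRow(2, "STORAGE", "10.0.0.2:5552", "RUNNING"),
])
```

Treating the full table as just a large delta keeps the receiver simple, at the cost of never forgetting a node unless the master sends a row saying so.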

XXX(jm) why ClusterState is not handled in exactly the same way with NotifyClusterState?

XXX(jm) same for partition table?

XXX(jm) how does this interact with the primary master being re-elected and changed?

  • SendPartitionTable (M sends whole PT)
  • NotifyPartitionChanges (M sends δPT)

Cluster start

Cluster start is the workflow responsible for bringing the cluster to the operational state, where it can provide service to clients.

The main task of a NEO cluster is to provide read/write service to clients. However, before this service can be provided, the cluster has to go through several startup states whose purpose is to make sure the cluster is indeed ready.

The logic that decides which server-side node does what during the startup states and state transitions is located inside the master node. For startup the master conducts the cluster through the Recovery -> Verification -> Running workflows and states.


XXX drawing XXX

Recovery is the process of making sure the cluster's partition table can be operational. It consists of the following steps:

  • retrieve partition table saved to storages;
  • wait till enough storage nodes connect to cluster so that retrieved partition table could be operational.

The recovery workflow recovers the partition table from the storages and waits until enough storages are connected to M for M to see that the partition table can be operational. Recovery is successful when all the above preconditions are met and M has received a command to start the cluster.

M performing recovery:

  • starts with potentially no storage nodes known;
  • accepts connections from storage nodes;
  • retrieves and recovers the latest previously saved partition table from the storages;
  • monitors whether the partition table becomes operational w.r.t. the set of currently up nodes;
  • if it does, finishes recovery upon receiving an explicit "start" command or upon reaching the "autostart" criteria;
  • also allows start if the connected storages report that no partition table is saved on them (empty new cluster case).

When a storage node successfully connects to master during recovery, it is put into the Node Table with PENDING state.

For every storage node - either already connected to M before recovery started, or that was connected to M during recovery, M reads saved-on-storage partition table:

M > S    Recovery{}
M < S    AnswerRecovery{.PTid}        XXX also .BackupTid, .TruncateTid

M > S    AskPartitionTable{}
M < S    AnswerPartitionTable{.PTid, .RowList}

The partition table with the highest ptid is considered current by the master.

If the set of storage nodes connected to M is such that the current partition table can be operational w.r.t. that set, the master decides that the cluster is ready to start. If at any time this condition becomes false, the master cancels the decision.

If the cluster has been decided ready to start and the start command arrives, the master marks all currently connected storage nodes as RUNNING and transitions the cluster to the VERIFICATION state.
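
The decision logic of recovery can be sketched as follows. This is a simplified illustration in Python, not the reference implementation: the function names are hypothetical, the partition table is modelled as a list of cell lists, and "operational" is assumed here to mean that every partition has at least one UP_TO_DATE cell on a connected storage. The empty new cluster case is not covered.

```python
def pick_current_partition_table(answers):
    """answers: {storage_uuid: (ptid, row_list) or None if nothing is saved}.

    Returns the (ptid, row_list) pair with the highest ptid, or None.
    """
    saved = [pt for pt in answers.values() if pt is not None]
    return max(saved, key=lambda pt: pt[0]) if saved else None


def is_operational(row_list, connected):
    """row_list[partition] is a list of (storage_uuid, cell_state).

    Assumption of this sketch: only UP_TO_DATE cells count as readable.
    """
    return bool(row_list) and all(
        any(uuid in connected and state == "UP_TO_DATE" for uuid, state in cells)
        for cells in row_list
    )


def ready_to_start(answers, connected):
    """Re-evaluated by M whenever a storage connects or disconnects."""
    current = pick_current_partition_table(answers)
    return current is not None and is_operational(current[1], connected)


answers = {
    1: (7, [[(1, "UP_TO_DATE")], [(2, "UP_TO_DATE")]]),
    2: (9, [[(1, "UP_TO_DATE"), (2, "UP_TO_DATE")], [(2, "UP_TO_DATE")]]),
}
current_ptid = pick_current_partition_table(answers)[0]
ready_with_both = ready_to_start(answers, connected={1, 2})
ready_with_one = ready_to_start(answers, connected={1})  # partition 1 lives only on storage 2
```

Because ready_to_start is recomputed on every change of the connected set, the "cancel the decision" rule needs no separate code path.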

TODO also describe startup of new cluster when there is no PT saved on any storage.


XXX drawing XXX

Verification (data recovery) is the process that starts after successful Recovery. It:

  • finishes partly committed transactions by replaying the transaction log, in case the previous shutdown was unclean, and
  • performs DB truncation if a request to do so was previously issued.

VERIFICATION starts with operational partition table (see RECOVERY)

For every storage node - either already connected to M before verification started, or that was connected to M during verification (XXX(jm) - correct?) M performs the following steps:

  • saves recovered partition table to all currently up storages;
  • asks all storages for partly finished transactions and decides cluster-wide which such transactions need to be either finished or rolled back;
  • executes decided finishes and rollbacks on the storages;
  • truncates the database, if it was previously requested;
  • retrieves last ids from storages along the way;
  • once all of this is done without losing too many storages in the process (so that the partition table is still operational), the cluster is ready to enter the servicing state.

XXX when accepting S -> S.State = known ? RUNNING : PENDING
XXX recheck it just overwrites PT of newly came node
XXX is it good to just overwrite PT if it had .ptid > ptid we are currently working with?

saves recovered partition table to all currently up storages:

M > S   SendPartitionTable{...}

asks all storages for partly finished transactions and decides cluster-wide which such transactions need to be either finished or rolled back:

M > S   LockedTransactions{}
M < S   AnswerLockedTransactions{.TidDict}    # {} ttid -> tid

# TODO(jm) document what happens if .TidDict != {}
# XXX recheck about UnfinishedTransactions

executes decided finishes and rollbacks on the storages:

# see ^^^

XXX(jm) truncation happens somewhere here

retrieve last ids from storages along the way:

M > S   LastIDs{}
M < S   AnswerLastIDs{.last_oid, .last_tid}


XXX drawing XXX

Running is cluster state and associated workflow when the cluster is operational and read/write service is being provided to clients.

  • starts with an operational partition table and with partly finished transactions consistently either finished or rolled back on all present storage nodes;
  • monitors storages coming and going, and leaves to recovery if the partition table becomes non-operational;
  • provides service to clients in the meantime.

For a storage - either already connected to M before entering RUNNING state, or that was connected to M during RUNNING (XXX(jm) - correct?) master commands it to start to provide service to clients:

M > S   StartOperation{}    # XXX .Backup
M < S   NotifyReady{}

After replying NotifyReady to master the storage node should accept and handle request messages from clients.

When master decides to leave RUNNING state it sends command to stop operation to all storages:

M > S    StopOperation


When the master decides to shut down the whole cluster, it takes care not to abruptly interrupt transactions that are already in the second phase of being committed. For this reason it puts the cluster into the transient STOPPING state and waits until such ongoing transactions are finished. At the same time, the master node blocks any request to begin new transactions.

After all in-progress transactions are finished, the master commands all storages to shut down:

M > S    NotifyNodeInformation{S.UUID, ... DOWN}


To load an object, the client first consults its partition table and node table snapshots (see the state change propagation workflow) to get the list of storage nodes that should currently have the corresponding data. The client then randomly selects one storage node from that list and sends it a request to load the object:

C > S    GetObject{oid, serial | tid_before}

and expects the storage to return corresponding object data:

C < S    AnswerObject{oid, serial, next_serial, compression, checksum, data, data_serial}

Due to the distributed nature of the cluster, it is possible that at the time the GetObject request is made, the storage referenced by the client's table snapshots is either no longer working or does not have the data for the requested oid. In particular, this can happen if the master has sent a table update that has not yet been delivered to the client. The client has to handle such situations gracefully and properly retry the load.

Currently the client, upon seeing an inconsistency between its table snapshots and what it actually gets from the storage, sends Ping to the master, with the idea that by the time Pong is delivered back, the queued updates will already have been delivered. XXX(jm) review.
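
One possible shape of the retry logic on the client side is sketched below in Python. All names (load, StorageUnavailable, the three callables) are hypothetical, and trying the remaining replicas before re-synchronizing with the master is a design choice of this sketch rather than something the protocol mandates.

```python
import random


class StorageUnavailable(Exception):
    """The storage is down or does not have the requested data (hypothetical)."""


def load(oid, tid_before, storages_for, get_object, sync_with_master, max_rounds=3):
    """Load oid as of tid_before, retrying when the table snapshots are stale.

    storages_for(oid)  -> storages that should hold oid per the local snapshots
    get_object(s, ...) -> AnswerObject payload, or raises StorageUnavailable
    sync_with_master() -> returns once pending table updates are applied
    """
    for _ in range(max_rounds):
        candidates = list(storages_for(oid))
        random.shuffle(candidates)              # random choice of the storage
        for storage in candidates:
            try:
                return get_object(storage, oid, tid_before)
            except StorageUnavailable:
                continue                        # try another replica
        sync_with_master()                      # e.g. Ping / Pong, then retry
    raise StorageUnavailable("oid %r: no storage could serve the request" % (oid,))


# Usage with stand-ins for the network: S1 is gone, S2 answers.
def fake_get_object(storage, oid, tid_before):
    if storage == "S1":
        raise StorageUnavailable(storage)
    return {"oid": oid, "serial": tid_before - 1, "data": b"payload"}


result = load(0x10, 0x20, lambda oid: ["S1", "S2"], fake_get_object, lambda: None)
```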

PROPOSAL(kirr) don't return next_serial at all as this can be made not used at all on client side (by changing NEO client cache), and on the other side returning next_serial forces storage to perform 2 SQL queries instead of only 1 for every GetObject.

XXX(kirr) the Ping trick makes many implicit assumptions about e.g. Master being single threaded and other implicit orderings that are true for current NEO/py implementation, but are generally not true if implementation is not made serial. If any ordering is required for correct working the protocol must specify it explicitly.


Commit is split into phases in accordance with the ZODB Two-Phase Commit Protocol:

XXX(kirr) neither locking nor conflict handling is described and thus it is not clear where deadlock might come from, or how conflicts are handled
TODO(jm) please add locking and conflict resolution and integrate or add proper references to deadlock avoidance.
TODO(jm) interaction with concurrent nodetab/parttab changes.
TODO(jm) interaction with synchronous replication.


The client first asks the master to begin a transaction. If this request succeeds, the master returns a "temporary" transaction ID (ttid), which denotes the ongoing transaction until it is finished and a transaction ID (tid) is assigned to it.

  C -> M        AskBeginTransaction(tid?)      // XXX(jm) please clarify why we also have tid in BeginTransaction
  C <- M        AnswerBeginTransaction(ttid)

data store

After the transaction has begun, the client sends requests to storages to store the corresponding object data:

XXX(kirr) also document askNewOIDs.

store(oid, serial, data, ...):

  for S in storagesFor(oid):
        C -> S  AskStoreObject(oid, serial, ttid ...)

or if it wants an object to be unchanged during the transaction

storeKeepCurrent(oid, serial, ...):

  for S in storagesFor(oid):
        C -> S  CheckCurrentSerial(oid, serial, ttid)

--- eventually ---

  some S from ^^^ (both from AskStoreObject and CheckCurrentSerial):
        C <- S  AnswerStoreObject(conflicting, oid, serial)

Here storagesFor(oid) is the function that consults the client's partition table snapshot and returns all (XXX(jm) review "all") storage nodes associated with oid whose cell is writeable.
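
A minimal Python sketch of such a function is given below. It rests on two assumptions that are not stated in this section: that the partition of an oid is oid modulo the number of partitions, and that the writeable cell states are UP_TO_DATE, OUT_OF_DATE and FEEDING. The partition table layout is likewise illustrative.

```python
# Assumption of this sketch: cells in these states accept writes.
WRITABLE_STATES = frozenset({"UP_TO_DATE", "OUT_OF_DATE", "FEEDING"})


def storages_for(oid, partition_table):
    """Return the uuids of storages whose cell for oid's partition is writeable.

    partition_table[partition] is a list of (storage_uuid, cell_state).
    """
    partition = oid % len(partition_table)   # assumed oid -> partition mapping
    return [
        uuid
        for uuid, state in partition_table[partition]
        if state in WRITABLE_STATES
    ]


partition_table = [
    [(1, "UP_TO_DATE"), (2, "OUT_OF_DATE")],   # partition 0
    [(2, "UP_TO_DATE"), (3, "DISCARDED")],     # partition 1
]
targets_for_oid_4 = storages_for(4, partition_table)
targets_for_oid_5 = storages_for(5, partition_table)
```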

The client does not wait for all answers from all storages it sent StoreObject request to. In order to proceed to vote phase it is enough to know that at least one storage successfully persisted object data.


At vote phase client waits for the answers from the rest of the storages and performs vote request for the transaction. If voting on a storage succeeds the semantic promise is that data is now properly persisted on that storage node.

  - wait for all responses from S (see "eventually" ^^^)
    if conflicting:
        resolve conflicts                       // XXX(jm) please document

  for S' in storagesFor(ttid):
        C -> S'   AskStoreTransaction(ttid)     # empty answer

  for S" in (storagesFor(oid) - storagesFor(ttid)):
        C -> S"  AskVoteTransaction(ttid)       # empty answer

  - wait for all responses from S' and S" ^^^


Upon successful voting, the client sends the master a request to finish the transaction and waits for the corresponding response. The master in turn, at the client's request, assigns the real tid (until then the transaction being committed was identified only by its ttid) and issues requests to the storages to mark the transaction as committed for real:

C -> M  AskFinishTransaction(ttid, cache_dict, checked_list)

     for S in M.allStorages:
        M -> S  AskLockInformation(ttid, tid)

        M <- S  AnswerInformationLocked(ttid)

C <- M  AnswerTransactionFinished(ttid, tid)

     for S in M.allStorages:
        M -> S  NotifyUnlockInformation(ttid)
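
The ordering constraints of the exchange above can be made explicit with a small Python sketch that only produces the message trace. The function name and the tid allocation rule (previous tid plus one) are assumptions for illustration; what storages do internally on lock and unlock is deliberately left out.

```python
def finish_transaction(ttid, last_tid, storages):
    """Return (tid, trace) for one AskFinishTransaction handled by the master.

    Each trace entry is (sender, receiver, message, arguments).
    """
    tid = last_tid + 1  # assumption: any tid greater than all previous ones
    trace = [("C", "M", "AskFinishTransaction", ttid)]
    for s in storages:
        trace.append(("M", s, "AskLockInformation", (ttid, tid)))
    for s in storages:
        trace.append((s, "M", "AnswerInformationLocked", ttid))
    # Only after every storage has answered is the client told the real tid.
    trace.append(("M", "C", "AnswerTransactionFinished", (ttid, tid)))
    # Unlock notifications are sent last and expect no answer.
    for s in storages:
        trace.append(("M", s, "NotifyUnlockInformation", ttid))
    return tid, trace


tid, trace = finish_transaction(ttid=100, last_tid=100, storages=["S1", "S2"])
names = [message for _, _, message, _ in trace]
```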

XXX(jm) please document the "unlock information" message better. I already deduced the meaning from the code at least 2 or 3 times and always forget it as the LockInformation / UnlockInformation naming is too unclear from my (kirr) point of view.


In case client wants to abort the transaction, it sends AbortTransaction to involved storages and master at any point before tpc_finish phase:

C -> Sv(participating for oidv & tidv) + M

XXX(jm) describe why there is no reply for abort to client and what happens in between M - S during abort.


Invalidation is the workflow responsible for keeping other clients notified of changes whenever one client commits something. Invalidation is used to keep client caches in sync with the database and in backup implementation (see "Backup").

Whenever a transaction is committed, primary master sends to all clients, except the one who performed the commit, the following notification:

M > C   InvalidateObjects{tid, []oid}

with the meaning that there was new transaction with ID=tid committed, and it changed objects in []oid.
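
On the receiving client, handling this notification can be as small as the following Python sketch. The ClientCache class and its fields are hypothetical; a real client cache also has to deal with older revisions and with loads that race with invalidations, which this illustration ignores.

```python
class ClientCache:
    """Cache of what the client believes to be the latest object revisions."""

    def __init__(self):
        self.current = {}   # oid -> (serial, data)
        self.last_tid = 0   # last committed tid this client knows about

    def store(self, oid, serial, data):
        self.current[oid] = (serial, data)

    def invalidate_objects(self, tid, oid_list):
        # The cached revisions of these oids are no longer the latest ones.
        for oid in oid_list:
            self.current.pop(oid, None)
        self.last_tid = max(self.last_tid, tid)


cache = ClientCache()
cache.store(1, 10, b"a")
cache.store(2, 10, b"b")
cache.invalidate_objects(20, [1, 3])   # oid 3 was never cached: nothing to drop
```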

Conflict Resolution


Deadlock avoidance workflows

There are 236 deadlock avoidance workflows. XXX(jm) document


XXX drawing XXX

Replication is the workflow responsible for keeping multiple copies of data on several storage nodes, so that if a storage node becomes inaccessible, the others are used and the NEO cluster continues to provide service. The desired level of replication is usually set at cluster configuration time with the Nreplica parameter, which sets on how many nodes the same data should be stored simultaneously.

In NEO there are two types of replication:

  • synchronous replication, and
  • asynchronous replication.


The data is replicated synchronously when it is committed: the client who performs the commit sends changes to all storages that are currently up and should be keeping committed data according to the partition table. Synchronous replication is tightly interrelated to commit process. XXX(jm) document how synchronous (= internal in code speak, right) replication and commit interoperate.


Asynchronous replication is used whenever there is a need to replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:

  • A new storage is added to an existing cluster,
  • A node was separated from cluster and rejoins it,
  • In a backup cluster (see "Backup"), the master notifies a node that new data exists upstream.

Replication happens per partition. Reference node can change between partitions.

synchronous replication

XXX(jm) document synchronous replication

asynchronous replication

XXX(jm) document in detail how results of replication affect partition table (cell states changes).

The primary master is responsible for deciding to replicate data from one node to another. Whenever such a decision is made, the master commands the storage node that should receive the data to start replicating it from a reference node. The replicating node reports completion to the master on a per-partition basis when done:

// master commands a storage node to replicate partitions up to given 'tid_max' and from given sources.
M -> R  Replicate{tid_max, upstream_name, source_dict /* {} partition -> address */ }

// eventually after R finishes replicating requested range for a partition it reports "done" to master.
M <- R  ReplicationDone{partition1, max_tid1}
M <- R  ReplicationDone{partition2, max_tid2}
M <- R  ReplicationDone{partitionN, max_tidN}

Replication of one partition is performed in rsync-style manner in 2 parts, done sequentially:

  • Transaction (metadata) replication
  • Object (metadata+data) replication

Both parts follow the same mechanism:

  • The range of data to replicate is split into chunks of some N items (transaction or object).
  • For every chunk, the requesting node sends to seeding node the list of items it already has.
  • Before answering, the seeding node sends 1 packet for every missing item. For items that are already on the replicating node, there is no check that values match.
  • The seeding node finally answers with the list of items to delete (usually empty).
R -> S  AskFetchTransactions{partition, max_batch, min_tid, max_tid, tid_list_already_have}
R <- S  AddTransaction{tid, user, desc, ext, packed, ttid, oid_list}
R <- S  AddTransaction{...}
...     up to max_batch transactions in [min_tid, max_tid] that R does not have
R <- S  AddTransaction{...}
R <- S  AnswerFetchTransactions{pack_tid, next_tid, tid_list_to_delete}

XXX pack_tid: currently always None

next_tid returned by AnswerFetchTransactions indicates whether there are more transactions to replicate after max_tid and, if so, it is the first tid of those. If next_tid != None, R should start the next AskFetchTransactions cycle with min_tid = next_tid.

If next_tid is None, S is saying that all transaction metadata in the requested range has been replicated, and R starts to replicate objects:

R -> S  AskFetchObjects{partition, max_batch, min_tid, max_tid, min_oid, already_have_oid_dict /* {} serial -> []oid */}
R <- S  AddObject{oid, serial, compression, checksum, data, data_serial}
R <- S  AddObject{...}
...     up to max_batch objects in the requested range that R does not have
R <- S  AddObject{...}
R <- S  AnswerFetchObjects{pack_tid, next_tid, next_oid, oid_to_delete_dict /* {} serial -> []oid */}

XXX pack_tid: currently always None

next_tid, next_oid returned by AnswerFetchObjects indicate whether there are more objects to fetch in requested range. If (next_tid, next_oid) != (None, None) - yes, there are, and R starts new AskFetchObjects cycle with min_tid, min_oid = next_tid, next_oid.
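
The chunked, rsync-style exchange can be illustrated for the transaction part with the Python sketch below. Transactions are reduced to bare tids, and both functions are hypothetical stand-ins for the two ends of the link. The sketch assumes that max_batch limits how many of the seeding node's transactions are examined per cycle and that next_tid is the first tid left over after that batch; the actual batching rules may differ.

```python
def answer_fetch_transactions(seed_tids, max_batch, min_tid, max_tid, already_have):
    """Seeding side (S). Returns (tids_to_add, next_tid, tids_to_delete)."""
    in_range = sorted(t for t in seed_tids if min_tid <= t <= max_tid)
    chunk, rest = in_range[:max_batch], in_range[max_batch:]
    next_tid = rest[0] if rest else None
    # This cycle is authoritative for [min_tid, upper] only.
    upper = chunk[-1] if rest else max_tid
    known = set(chunk)
    have = set(already_have)
    to_add = [t for t in chunk if t not in have]          # one AddTransaction each
    to_delete = sorted(t for t in have if min_tid <= t <= upper and t not in known)
    return to_add, next_tid, to_delete


def replicate_transactions(local_tids, seed_tids, max_batch, min_tid, max_tid):
    """Replicating side (R): loop until S answers with next_tid = None."""
    local = set(local_tids)
    while min_tid is not None:
        have = [t for t in local if min_tid <= t <= max_tid]
        to_add, next_tid, to_delete = answer_fetch_transactions(
            seed_tids, max_batch, min_tid, max_tid, have)
        local.update(to_add)
        local.difference_update(to_delete)
        min_tid = next_tid                                # start of the next cycle
    return local


# R has tid 1 and a stray tid 7 that S does not know about.
replicated = replicate_transactions({1, 7}, {1, 2, 3, 4, 5}, max_batch=2,
                                    min_tid=1, max_tid=10)
```

The object part follows the same pattern with (tid, oid) pairs in place of tids.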


XXX drawing XXX

Backup, or more exactly online backup, is the workflow responsible for keeping the data of one NEO cluster also available at another NEO cluster. If the first cluster is the main one used for a service, and the second cluster is located in e.g. another building or another country, the link latency between the clusters will be much higher than intra-cluster link latencies. With the current state of NEO it is not practical to mix storage nodes with different link latencies in one cluster, as the slower ones would slow down most operations (otherwise it would make more sense to just use all nodes as replicas). However, a cluster located at another physical place has a high chance of surviving a physical failure at the original place, and thus the backup cluster (likely with a backup service instance also moved near it) can be used in case of such damage.

Online backup relies on Invalidation and Replication. Whenever a backup cluster is set up, its primary master connects as a client node to the master of the main cluster to receive invalidation messages, this way staying informed about committed transactions. The backup master issues commands to its storage nodes to replicate the corresponding data from the storages of the main cluster. The storages in turn use the same replication protocol that is used in the main cluster for asynchronous replication.

XXX(jm) document in more detail how backing up works, in particular how backup cluster reports its partition table and cell states there.

Deadlock avoidance

A typical scenario is the following one:

  1. C1 does a first store (X or Y)
  2. C2 stores X and Y; one is delayed
  3. C1 stores the other -> deadlock

When solving the deadlock, the data of the first store may only exist on the storage nodes.

Deadlocks are detected by comparing locking TIDs; initially a locking TID is the same as the TTID. When trying to (write-)lock an OID that is already locked by a smaller locking TID, the store is delayed: this behaviour is chosen because a transaction that starts first is likely to store first. Otherwise, when trying to lock with a smaller locking TID, a deadlock is issued.
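
The comparison rule can be written down in a few lines. The Python sketch below is only an illustration on a single storage node: the function name and the dictionary of locks are hypothetical, and the two oids are called X and Y here.

```python
def try_write_lock(locks, oid, locking_tid):
    """Decide what happens when a transaction tries to write-lock oid.

    locks maps oid -> locking tid of the current holder.
    Returns "locked", "delayed" or "deadlock".
    """
    holder = locks.get(oid)
    if holder is None or holder == locking_tid:
        locks[oid] = locking_tid
        return "locked"
    if holder < locking_tid:
        return "delayed"    # the holder started first: wait for it
    return "deadlock"       # the requester is older than the holder


locks = {}
outcomes = [
    try_write_lock(locks, "X", 1),   # C1 does a first store
    try_write_lock(locks, "X", 2),   # C2 stores the same oid
    try_write_lock(locks, "Y", 2),   # C2 stores the other oid
    try_write_lock(locks, "Y", 1),   # C1 stores the other oid
]
```

With locking tids 1 for C1 and 2 for C2, the fourth attempt is the one that reports the deadlock, since C1 is older than the holder of Y.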

Deadlock avoidance Protocol

The main idea behind deadlock avoidance is to rebase OIDs that have already been stored. So that the comparison of locking TIDs keeps working, the master is involved to deliver a new locking TID:

1. Sa: mark the transaction as being rebased
2. Sa: release all write-locks
3. Sa: sortAndExecuteQueuedEvents
4. Sa > M   NotifyDeadlock{
                ttid,                  # to identify the transaction
                current locking tid}   # M must ignore late/redundant packets
5. Sa: the store stays delayed until the transaction is rebased
6. M > C    NotifyDeadlock{ttid, new locking tid}
7. C > S    # to all involved storage nodes
            RebaseTransaction{ttid, new locking tid}
8. Sb: sets new locking tid
9. Sb (a!=b): same as 2
10. Sb: sortAndExecuteQueuedEvents

At step 10, a new deadlock may happen, in which case it loops to 1 and an empty answer is sent back to the client:

11. Sb > C   AnswerRebaseTransaction{[]}

If there's no new deadlock:

11. lock all oids that can be relocked immediately
12. Sb > C   AnswerRebaseTransaction{
                oids that are locked by another transaction
                + those that conflict}

Then for each oid returned by AnswerRebaseTransaction:

13. C > Sb   RebaseObject{ttid,oid}

The processing of RebaseObject is similar to that of StoreObject, with the following differences:

  • The storage node already has the data.
  • Nothing to do in the following case: there was a previous rebase for this oid, it was still delayed during the second RebaseTransaction, and then a conflict was reported when another transaction was committed. A similar case is when a partition is dropped.
  • On conflict, the data must be sent back to the client in case it does not have it anymore. In this case, the storage node forgets this oid.

Possible answers:

  • locked: AnswerRebaseObject{}
  • conflict: AnswerRebaseObject{old tid,saved tid,data} where data is None for checkCurrent or undo (the client has enough information on its side to distinguish the 2 cases)

On the client, AnswerRebaseObject is handled in a way similar to AnswerStoreObject.


RebaseObject is a request packet to simplify the implementation. For more efficiency, this should be turned into a notification, and RebaseTransaction should be answered once all objects are rebased (so that the client can still wait on something).

Cascaded deadlocks

A so-called cascaded deadlock is when a new deadlock happens during deadlock avoidance. The above protocol does handle this case. Here is an example with a single storage node:

# locking tids: t1 < t2 < t3

1. A2 (t2 stores A)
2. B1, c2 (t2 checks C)
3. A3 (delayed), B3 (delayed), D3 (delayed)
4. C1 -> deadlock: B3
5. d2 -> deadlock: A3

# locking tids: t3 < t1 < t2

6. t3 commits
7. t2 rebase: conflicts on A and D
8. t1 rebase: new deadlock on C
9. t2 aborts (D non current)

all locks released for t1, which rebases and resolves conflicts

Deadlock avoidance also depends on correct ordering of delayed events (sortAndExecuteQueuedEvents), otherwise the cluster could enter in an infinite cascade of deadlocks. Here is an overview with 3 storage nodes and 3 transactions modifying the same oid (2 replicas):

S1     S2     S3     order of locking tids          # abbreviations:
l1     l1     l2     123                            #  l: lock
q23    q23    d1q3   231                            #  d: deadlock triggered
r1:l3  r1:l2  (r1)   # for S3, we still have l2     #  q: queued
d2q1   q13    q13    312                            #  r: rebase

Above, we show what happens when a random transaction gets a lock just after another is rebased. Here, the result is that the last 2 lines are a permutation of the first 2, and with bad luck this can repeat indefinitely.

The probability of deadlock must be reduced by processing delayed stores/checks in the order of their locking tid. In the above example, S1 would give the lock to 2 when 1 is rebased, and 2 would vote successfully.
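
A queue that replays delayed events in the order of their locking tid could look like the Python sketch below. It borrows the name of the sortAndExecuteQueuedEvents step but is not its actual implementation: the DelayedEvents class, the try_execute callback and the single-lock example are assumptions made for illustration.

```python
import heapq


class DelayedEvents:
    """Delayed stores/checks, replayed by increasing locking tid."""

    def __init__(self):
        self._heap = []
        self._count = 0   # tie-breaker: keeps arrival order for equal tids

    def queue(self, locking_tid, event):
        heapq.heappush(self._heap, (locking_tid, self._count, event))
        self._count += 1

    def sort_and_execute(self, try_execute):
        """Retry every delayed event; those that still cannot run stay queued."""
        still_delayed = []
        while self._heap:
            entry = heapq.heappop(self._heap)
            if not try_execute(entry[2]):
                still_delayed.append(entry)
        for entry in still_delayed:
            heapq.heappush(self._heap, entry)

    def __len__(self):
        return len(self._heap)


# S1 in the example above: 2 and 3 are queued behind 1, in arrival order 3, 2.
delayed = DelayedEvents()
delayed.queue(3, "t3 stores oid")
delayed.queue(2, "t2 stores oid")

lock_holder = []   # at most one event may hold the lock


def try_lock(event):
    if lock_holder:
        return False
    lock_holder.append(event)
    return True


# Transaction 1 is rebased and releases its lock: replay the queue.
delayed.sort_and_execute(try_lock)
```

Although the store of transaction 3 arrived first, the lock goes to transaction 2, which is the outcome described above for S1.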

SQL Reference Data Structure



The purpose of config table is to persist storage node configuration.

XXX(jm) review

The schema is:

        .name   str
        .value  str
        (cluster name, node id, partitions, ptid, replicas, version, zodb=pickle...)


The purpose of pt table is to persist partition table.

XXX(jm) review

Pt schema is:

        .rid    int     // = row id = part of oid space
        .nid    int
        .state  tinyint // = cell state

        pkey (rid, nid)


The purpose of trans table is to store committed transactions metadata.

Trans schema is:

        .partition      smallint
        .tid            bigint
        .packed         bool
        .oids           mediumblob      // []oid
        .user           blob
        .description    blob
        .ext            blob
        .ttid           bigint          // XXX ?

        pkey (partition, tid)


The purpose of obj table is to persist committed object metadata.

XXX(jm) review

Obj schema is:

        .partition      smallint
        .oid            bigint
        .tid            bigint
        .data_id        bigint | NULL   // ->
        .value_tid      bigint | NULL

        pkey (partition, tid, oid)
        key (partition, oid, tid)
        key data_id
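
To show how the second key serves reads, here is a hedged Python sketch that recreates the obj table in an in-memory SQLite database and looks up the latest revision of an object before a given tid (the tid_before form of GetObject). SQLite, the column types, the index name and the load_before helper are choices of this sketch, not of the reference backend.

```python
import sqlite3

db = sqlite3.connect(":memory:")
# "partition" is quoted because it is an SQL keyword.
db.execute("""
    CREATE TABLE obj (
        "partition" INTEGER NOT NULL,
        oid         INTEGER NOT NULL,
        tid         INTEGER NOT NULL,
        data_id     INTEGER,
        value_tid   INTEGER,
        PRIMARY KEY ("partition", tid, oid)
    )""")
db.execute('CREATE INDEX obj_oid_tid ON obj ("partition", oid, tid)')
db.execute("CREATE INDEX obj_data_id ON obj (data_id)")


def load_before(db, partition, oid, tid_before):
    """Return (tid, data_id, value_tid) of the last revision with tid < tid_before."""
    return db.execute(
        'SELECT tid, data_id, value_tid FROM obj'
        ' WHERE "partition" = ? AND oid = ? AND tid < ?'
        ' ORDER BY tid DESC LIMIT 1',
        (partition, oid, tid_before),
    ).fetchone()


# Three revisions of oid 5 in partition 0.
db.executemany(
    "INSERT INTO obj VALUES (?, ?, ?, ?, ?)",
    [(0, 5, 10, 1, None), (0, 5, 20, 2, None), (0, 5, 30, 3, None)],
)
revision = load_before(db, 0, 5, 25)
```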


The purpose of data table is to store actual object content.

XXX(jm) review

Data schema is:

        .id             bigint
        .hash           sha1            // UNIQUE (hash, compression)
        .compression    tinyint
        .value          mediumblob

        key id
        key (hash, compression) // <- from UNIQUE ^^^
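
The UNIQUE (hash, compression) constraint allows identical object content to be stored only once. The following Python sketch illustrates this with an in-memory SQLite database; SQLite, the column types and the store_data helper are assumptions of the example, and compression is treated as an opaque small integer.

```python
import hashlib
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""
    CREATE TABLE data (
        id          INTEGER PRIMARY KEY,
        hash        BLOB NOT NULL,       -- SHA-1 digest of value
        compression INTEGER NOT NULL,
        value       BLOB NOT NULL,
        UNIQUE (hash, compression)
    )""")


def store_data(db, value, compression=0):
    """Store value once and return its data id, reusing an existing row if any."""
    digest = hashlib.sha1(value).digest()
    row = db.execute(
        "SELECT id FROM data WHERE hash = ? AND compression = ?",
        (digest, compression),
    ).fetchone()
    if row is not None:
        return row[0]
    cursor = db.execute(
        "INSERT INTO data (hash, compression, value) VALUES (?, ?, ?)",
        (digest, compression, value),
    )
    return cursor.lastrowid


first_id = store_data(db, b"hello")
second_id = store_data(db, b"hello")    # same content: same row
other_id = store_data(db, b"world")
row_count = db.execute("SELECT COUNT(*) FROM data").fetchone()[0]
```

Rows of the obj table can then reference the returned id through their data_id column.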

Ttrans & tobj

The purpose of ttrans and tobj tables is to store not-yet-committed transactions and objects metadata.

TODO(jm) explain t{trans,obj} -> {trans,obj} process.




The original design from Yoshinori Okuji introduces most concepts of the NEO architecture and protocol, and describes the network protocol in great detail. Although the original protocol has changed since then, the concepts described in that document are still valid, and reading it helps in understanding the architecture of NEO.

DZUG presentation XXXX





  • JM:
    • make drawings and explanation for READH + JM marked workflows + all deadlock avoidances
  • Kirr
    • fill the document with all you know
  • JPS
    • state change drawings