NEO Protocol Specification

Specification of the NEO architecture and protocol
  • Last Update: 2018-04-09
  • Version: 002
  • Language: en

Agenda

  • Architecture
  • Network Layer
  • States
  • Workflows
  • Storage Backends

(this document is being written: see below for its state)

This document first introduces the NEO architecture and the different roles of each node in a NEO cluster. Then it gives a precise definition of the communication protocol and the semantics of the workflows involved:

In Network Layer, we explain how data is exchanged between nodes: serialization format and link establishment. But we do not explain the semantics of the messages exchanged between nodes.

In States, we define the different states of the cluster, nodes and cells.

In Workflows, we describe cluster workflows with the sequences of messages exchanged between nodes and their semantics in relation to each workflow.

In Storage Backends, we describe the typical architecture of a possible storage backend implementation.

Notice to implementers: A NEO deployment can be seen as a single application, in that a node can trust peers to implement the same algorithm. However, failures like a node being stopped or an unpredictable network may happen at any time and such misbehaviour should be handled gracefully.

State of this Document

The following sections are finished:

  • Architecture:

    all except slides for Cluster Overview and Partitioning

  • Network Layer:

    itself and Messages

  • States:

    Node/Cell/Cluster States

  • Workflows:

    • slides for read, commit and conflict resolution
    • deadlock avoidance (missing slide ?)

Architecture

NEO is a distributed, redundant and scalable implementation of the ZODB API, which means it is a transactional NoSQL database that ZODB clients can use. More technically, NEO is a ZODB storage backend and this document assumes you have read the ZODB book.

Let us start by explaining the architecture of NEO. This section is inspired by the Linux Solutions Presentation and summarized with fewer technical details.

Cluster Overview

NEO consists of three components: master nodes (M), storage nodes (S) and client nodes (C). Here, node means a software component which runs on a computer: it can either be a process or a set of threads inside another process. Nodes can run on the same computer or on different computers in a network.

At the level of an application accessing a NEO database, there is the client part of NEO, which is used through the ZODB. Client nodes interact with the primary master node and storage nodes to perform transactions and retrieve data.

Among master nodes, one is the primary: as the arbitrator of the cluster, its tasks are to generate new identifiers for objects (OID) and transactions (TID), broadcast invalidations, control transactions and make decisions about topology changes. It does the minimum to guarantee the consistency of the database and in particular, it never sees any application data.

Other master nodes are called secondary. They are spare master nodes. One of them can take over the role of primary master if the current one fails. A master node becomes primary after an election.

Storage nodes are the only ones storing data persistently: mainly application data, i.e. objects and transactions that form the database, but also all the cluster metadata that the primary master needs to recover.

A fourth node type is optional. The administration of a NEO cluster is done via an admin node (A), which issues commands to the primary master. It comes with a command-line tool called neoctl.

Partitioning

A NEO database is split into partitions and each storage node is assigned to several partitions. These assignments form the partition table: each assignment is called a cell.

The partition an OID belongs to is located by computing OID % NP, where NP is the number of partitions. Then, the partition table is queried to know which storage node(s) contain(s) the object.
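
For illustration, here is a minimal Python sketch of this lookup (the partition table representation used here, a plain dict mapping a partition number to its cells, is a hypothetical simplification):

import struct

def cells_for(oid, partition_table, np):
    # OIDs are 8-byte strings; interpret them as big-endian integers
    num = struct.unpack('!Q', oid)[0]
    partition = num % np              # OID % NP
    # the partition table then gives the cells (storage node assignments)
    return partition_table[partition]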

Load balancing is achieved by having several storage nodes, similarly to RAID0. For high availability, each partition can also be assigned to NR+1 storage nodes, similarly to RAID1+0: NR is the number of replicas.

Each cell has a state: for example, it is not readable if it does not contain all the partition data. By also looking at node states, a client knows where to load or store data.

The primary node broadcasts all changes to the partition table to all other nodes, and it also notifies of all node state changes.

Transactions

Common ZODB implementations do database-level locking, which means that the processing of a tpc_begin must wait until another transaction being committed has finished. In some cases, like ZEO for 2 distinct server connections, the database is locked during vote, as described in the above schema.

For better scalability, NEO implements object-level locking, so that transactions modifying disjoint sets of oids can be committed simultaneously. This is done by using temporary TID (TTID) during the commit, and solving possible deadlocks.

During a commit, modified oids are write-locked. On tpc_finish, they are also read-locked because storage nodes may not be ready yet when the clients are notified of the new transaction.

NEO also aims to handle transactions of any size efficiently, whether in number of bytes or in number of changed OIDs. For this, the protocol is designed in such a way that the client and storage nodes can process a stream of data even on systems with limited RAM.

Network Layer

Communication between two nodes is done by exchanging messages over a streaming point-to-point protocol that supports guaranteed order and delivery of messages. Only TCP and SSL over TCP are implemented for now (over IPv4 and IPv6), but other protocols like UNIX sockets or QUIC would be possible.

We start by describing how these messages are serialized. Their semantics will be explained in the sections describing workflows.

Each message consists of a header that is immediately followed by a payload. The header consists of 3 unsigned integer fields, in the following order:

Message ID       32 bits
Message Code     16 bits
Payload Length   32 bits

All numeric values use big-endian byte-order.
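
As a sketch, using Python's struct module notation (also used later in this document), the header can be packed and unpacked as follows (field values are placeholders):

import struct

HEADER_FORMAT = '!LHL'   # msg_id (u32), message code (u16), payload length (u32)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)   # 10 bytes

def pack_message(msg_id, code, payload):
    # the header is immediately followed by the payload
    return struct.pack(HEADER_FORMAT, msg_id, code, len(payload)) + payload

def unpack_header(data):
    msg_id, code, length = struct.unpack(HEADER_FORMAT, data[:HEADER_SIZE])
    return msg_id, code, length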

Messages

Example with Python implementation:

Replicate(tid, '', untouched_dict)
# where:
msg_id = 31
tid = '\x03\xc6{\x0bg\xe1\x82i'
untouched_dict = {0: None, 2: None}

The above packet is encoded into 42 bytes:

# header
0000001f 005a 00000020  # msg_id, code, length
# payload
03c67b0b67e18269        # tid
00000000                # len('')
                        # ''
00000002                # len(untouched_dict)
  00000000 00000000     # 0: None
  00000002 00000000     # 2: None

The following table lists the 64 different types of messages that can be exchanged. 41 of them are requests with response packets. 1 is a generic response packet for error handling. The remaining 22 are notification packets.

The code (#) of a response packet is the same as the corresponding request one, with the highest order bit set. Using Python language, it translates as follows:

response_code = request_code | 0x8000

The format columns refer to item types, which are described in the next table with Python literals, and in particular its struct format.

Message Types
# Message Description Workflow Nodes Format Answer Format
0 Error Error is a special type of message, because this can be sent against any other message, even if such a message does not expect a reply usually. * → * - PNumber code,
PString message
1 RequestIdentification Request a node identification. This must be the first packet for any connection. * ⇄ * PEnum type,
PUUID uuid,
PAddress address,
PString name,
PFloat id_timestamp
PEnum type,
PUUID my_uuid,
PNumber num_partitions,
PNumber num_replicas,
PUUID your_uuid
2 Ping Empty request used as network barrier. * ⇄ *
3 CloseClient Tell peer that it can close the connection if it has finished with us. * → * -
4 PrimaryMaster Ask node identifier of the current primary master. ctl ⇄ A PUUID primary_uuid
5 NotPrimaryMaster Notify peer that I'm not the primary master. Attach any extra information to help the peer joining the cluster. SM → * PSignedNull primary,
[PAddress address]
-
6 NotifyNodeInformation Notify information about one or more nodes. M → * PFloat id_timestamp,
[(PEnum type, PAddress address, PUUID uuid, PEnum state, PFloat id_timestamp)]
-
7 Recovery Ask storage nodes data needed by master to recover. Reused by `neoctl print ids`. M ⇄ S
ctl ⇄ A ⇄ M
PPTID ptid,
PTID backup_tid,
PTID truncate_tid
8 LastIDs Ask the last OID/TID so that a master can initialize its TransactionManager. Reused by `neoctl print ids`. M ⇄ S
ctl ⇄ A ⇄ M
PTID last_oid,
PTID last_tid
9 PartitionTable Ask storage node the remaining data needed by master to recover. This is also how the clients get the full partition table on connection. M ⇄ S
C ⇄ M
PPTID ptid,
[(PNumber offset, [(PUUID uuid, PEnum state)])]
10 NotifyPartitionTable Send the full partition table to admin/storage nodes on connection. M → A, S PPTID ptid,
[(PNumber offset, [(PUUID uuid, PEnum state)])]
-
11 PartitionChanges Notify about changes in the partition table. M → * PPTID ptid,
[(PNumber offset, PUUID uuid, PEnum state)]
-
12 StartOperation Tell a storage node to start operation. Before this message, it must only communicate with the primary master. M → S PBoolean backup -
13 StopOperation Notify that the cluster is not operational anymore. Any operation between nodes must be aborted. M → S, C -
14 UnfinishedTransactions Ask unfinished transactions, which will be replicated when they're finished. S ⇄ M [PNumber offset] PTID max_tid,
[PTID unfinished_tid]
15 LockedTransactions Ask locked transactions to replay committed transactions that haven't been unlocked. M ⇄ S {PTID ttid: PTID tid}
16 FinalTID Return final tid if ttid has been committed, to recover from certain failures during tpc_finish. M ⇄ S
C ⇄ M, S
PTID ttid PTID tid
17 ValidateTransaction Do replay a committed transaction that was not unlocked. M → S PTID ttid,
PTID tid
-
18 BeginTransaction Ask to begin a new transaction. This maps to `tpc_begin`. C ⇄ M PTID tid PTID tid
19 FailedVote Report storage nodes for which vote failed. True is returned if it's still possible to finish the transaction. C ⇄ M PTID tid,
[PUUID uuid]
Error
20 FinishTransaction Finish a transaction. Return the TID of the committed transaction. This maps to `tpc_finish`. C ⇄ M PTID tid,
[PTID oid],
[PTID oid]
PTID ttid,
PTID tid
21 LockInformation Commit a transaction. The new data is read-locked. M ⇄ S PTID ttid,
PTID tid
PTID ttid
22 InvalidateObjects Notify about a new transaction modifying objects, invalidating client caches. M → C PTID tid,
[PTID oid]
-
23 UnlockInformation Notify about a successfully committed transaction. The new data can be unlocked. M → S PTID ttid -
24 GenerateOIDs Ask new OIDs to create objects. C ⇄ M PNumber num_oids [PTID oid]
25 Deadlock Ask master to generate a new TTID that will be used by the client to solve a deadlock by rebasing the transaction on top of concurrent changes. S → M → C PTID ttid,
PTID locking_tid
-
26 RebaseTransaction Rebase a transaction to solve a deadlock. C ⇄ S PTID ttid,
PTID locking_tid
[PTID oid]
27 RebaseObject Rebase an object change to solve a deadlock. C ⇄ S PTID ttid,
PTID oid
(PTID serial, PTID conflict_serial, (PBoolean compression, PChecksum checksum, PString data)?)?
28 StoreObject Ask to create/modify an object. This maps to `store`. C ⇄ S PTID oid,
PTID serial,
PBoolean compression,
PChecksum checksum,
PString data,
PTID data_serial,
PTID tid
PTID conflict
29 AbortTransaction Abort a transaction. This maps to `tpc_abort`. C → S
C → M → S
PTID tid,
[PUUID uuid]
-
30 StoreTransaction Ask to store a transaction. Implies vote. C ⇄ S PTID tid,
PString user,
PString description,
PString extension,
[PTID oid]
31 VoteTransaction Ask to vote a transaction. C ⇄ S PTID tid
32 GetObject Ask a stored object by its OID, optionally at/before a specific tid. This maps to `load/loadBefore/loadSerial`. C ⇄ S PTID oid,
PTID at,
PTID before
PTID oid,
PTID serial_start,
PTID serial_end,
PBoolean compression,
PChecksum checksum,
PString data,
PTID data_serial
33 TIDList Ask for TIDs between a range of offsets. The order of TIDs is descending, and the range is [first, last). This maps to `undoLog`. C ⇄ S PIndex first,
PIndex last,
PNumber partition
[PTID tid]
34 TransactionInformation Ask for transaction metadata. C ⇄ S PTID tid PTID tid,
PString user,
PString description,
PString extension,
PBoolean packed,
[PTID oid]
35 ObjectHistory Ask history information for a given object. The order of serials is descending, and the range is [first, last]. This maps to `history`. C ⇄ S PTID oid,
PIndex first,
PIndex last
PTID oid,
[(PTID serial, PNumber size)]
36 PartitionList Ask information about partitions. ctl ⇄ A PNumber min_offset,
PNumber max_offset,
PUUID uuid
PPTID ptid,
[(PNumber offset, [(PUUID uuid, PEnum state)])]
37 NodeList Ask information about nodes. ctl ⇄ A PEnum type [(PEnum type, PAddress address, PUUID uuid, PEnum state, PFloat id_timestamp)]
38 SetNodeState Change the state of a node. ctl ⇄ A ⇄ M PUUID uuid,
PEnum state
Error
39 AddPendingNodes Mark given pending nodes as running, for future inclusion when tweaking the partition table. ctl ⇄ A ⇄ M [PUUID uuid] Error
40 TweakPartitionTable Ask the master to balance the partition table, optionally excluding specific nodes in anticipation of removing them. ctl ⇄ A ⇄ M [PUUID uuid] Error
41 SetClusterState Set the cluster state. ctl ⇄ A ⇄ M PEnum state Error
42 Repair Ask storage nodes to repair their databases. ctl ⇄ A ⇄ M [PUUID uuid],
PBoolean dry_run
Error
43 RepairOne Repair is translated to this message, asking a specific storage node to repair its database. M → S PBoolean dry_run -
44 ClusterInformation Notify about a cluster state change. M → * PEnum state -
45 ClusterState Ask the state of the cluster ctl ⇄ A
A ⇄ M
PEnum state
46 ObjectUndoSerial Ask storage the serial where object data is when undoing given transaction, for a list of OIDs. C ⇄ S PTID tid,
PTID ltid,
PTID undone_tid,
[PTID oid]
{PTID oid: (PTID current_serial, PTID undo_serial, PBoolean is_current)}
47 TIDListFrom Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Used by `iterator`. C ⇄ S PTID min_tid,
PTID max_tid,
PNumber length,
PNumber partition
[PTID tid]
48 Pack Request a pack at given TID. C ⇄ M ⇄ S PTID tid PBoolean status
49 CheckReplicas Ask the cluster to search for mismatches between replicas, metadata only, and optionally within a specific range. Reference nodes can be specified. ctl ⇄ A ⇄ M {PNumber partition: PUUID source},
PTID min_tid,
PTID max_tid
Error
50 CheckPartition Ask a storage node to compare a partition with all other nodes. Like for CheckReplicas, only metadata are checked, optionally within a specific range. A reference node can be specified. M → S PNumber partition,
(PString upstream_name, PAddress address),
PTID min_tid,
PTID max_tid
-
51 CheckTIDRange Ask some stats about a range of transactions. Used to know if there are differences between a replicating node and reference node. S ⇄ S PNumber partition,
PNumber length,
PTID min_tid,
PTID max_tid
PNumber count,
PChecksum checksum,
PTID max_tid
52 CheckSerialRange Ask some stats about a range of object history. Used to know if there are differences between a replicating node and reference node. S ⇄ S PNumber partition,
PNumber length,
PTID min_tid,
PTID max_tid,
PTID min_oid
PNumber count,
PChecksum tid_checksum,
PTID max_tid,
PChecksum oid_checksum,
PTID max_oid
53 PartitionCorrupted Notify that mismatches were found while checking replicas for a partition. S → M PNumber partition,
[PUUID uuid]
-
54 NotifyReady Notify that we're ready to serve requests. S → M -
55 LastTransaction Ask last committed TID. C ⇄ M
ctl ⇄ A ⇄ M
PTID tid
56 CheckCurrentSerial Check if given serial is current for the given oid, and lock it so that this state is not altered until transaction ends. This maps to `checkCurrentSerialInTransaction`. C ⇄ S PTID tid,
PTID oid,
PTID serial
PTID conflict
57 NotifyTransactionFinished Notify that a transaction blocking a replication is now finished. M → S PTID ttid,
PTID max_tid
-
58 Replicate Notify a storage node to replicate partitions up to given 'tid' and from given sources. M → S PTID tid,
PString upstream_name,
{PNumber partition: PAddress address}
-
59 ReplicationDone Notify the master node that a partition has been successfully replicated from a storage to another. S → M PNumber offset,
PTID tid
-
60 FetchTransactions Ask a storage node to send all transaction data we don't have, and reply with the list of transactions we should not have. S ⇄ S PNumber partition,
PNumber length,
PTID min_tid,
PTID max_tid,
[PTID tid]
PTID pack_tid,
PTID next_tid,
[PTID tid]
61 FetchObjects Ask a storage node to send object records we don't have, and reply with the list of records we should not have. S ⇄ S PNumber partition,
PNumber length,
PTID min_tid,
PTID max_tid,
PTID min_oid,
{PTID serial: [PTID oid]}
PTID pack_tid,
PTID next_tid,
PTID next_oid,
{PTID serial: [PTID oid]}
62 AddTransaction Send metadata of a transaction to a node that does not have them. S → S PTID tid,
PString user,
PString description,
PString extension,
PBoolean packed,
PTID ttid,
[PTID oid]
-
63 AddObject Send an object record to a node that does not have it. S → S PTID oid,
PTID serial,
PBoolean compression,
PChecksum checksum,
PString data,
PTID data_serial
-
Item Types
Type Encoding Null (e.g. Python's None)
(...) each item is encoded one after the other -
[...] count(!L), (...) -
{keys: values} [(key, value)] -
(...)? '\1', (...) '\0'
PAddress PString, port(!H) -
PBoolean !? -
PChecksum SHA1 (20 bytes) -
PEnum b -1
PFloat !d '\xff\xff\xff\xff\xff\xff\xff\xff'
PIndex !Q -
PNumber !L -
PPTID !Q '\x00\x00\x00\x00\x00\x00\x00\x00'
PSignedNull !l '\x00\x00\x00\x00'
PString size(!L), bytes -
PTID 8 bytes (TID or OID) '\xff\xff\xff\xff\xff\xff\xff\xff'
PUUID !l '\x00\x00\x00\x00'

Note: There's no UUID anymore in NEO and PUUID must be renamed to PNID.
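
As an illustration of the encodings above, here is a hedged Python sketch of a few of them (PString, PNumber and a homogeneous list); the helper names are hypothetical:

import struct

def encode_pnumber(n):
    # PNumber: unsigned 32-bit big-endian ('!L')
    return struct.pack('!L', n)

def encode_pstring(s):
    # PString: size ('!L') followed by the raw bytes
    return struct.pack('!L', len(s)) + s

def encode_list(items, encode_item):
    # [...]: count ('!L') followed by each encoded item
    return struct.pack('!L', len(items)) + b''.join(encode_item(i) for i in items)

# e.g. a [PNumber offset] field:
encoded_offsets = encode_list([0, 2], encode_pnumber)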

Enum Types
  • CellStates
    1. OUT_OF_DATE
    2. UP_TO_DATE
    3. FEEDING
    4. CORRUPTED
    5. DISCARDED
  • ClusterStates
    1. RECOVERING
    2. VERIFYING
    3. RUNNING
    4. STOPPING
    5. STARTING_BACKUP
    6. BACKINGUP
    7. STOPPING_BACKUP
  • ErrorCodes
    1. ACK
    2. NOT_READY
    3. OID_NOT_FOUND
    4. TID_NOT_FOUND
    5. OID_DOES_NOT_EXIST
    6. PROTOCOL_ERROR
    7. REPLICATION_ERROR
    8. CHECKING_ERROR
    9. BACKEND_NOT_IMPLEMENTED
    10. NON_READABLE_CELL
    11. READ_ONLY_ACCESS
    12. INCOMPLETE_TRANSACTION
  • NodeStates
    1. UNKNOWN
    2. DOWN
    3. RUNNING
    4. PENDING
  • NodeTypes
    1. MASTER
    2. STORAGE
    3. CLIENT
    4. ADMIN

Naming choice: For cell states, node states and node types, names are chosen to have unambiguous initials, which is useful to produce shorter logs or have a more compact user interface. This explains for example why RUNNING was preferred over UP.

Other Constants
INVALID_TID '\xff\xff\xff\xff\xff\xff\xff\xff'
INVALID_OID '\xff\xff\xff\xff\xff\xff\xff\xff'
INVALID_PARTITION 0xffffffff
ZERO_HASH '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_TID '\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_OID '\x00\x00\x00\x00\x00\x00\x00\x00'
MAX_TID '\x7f\xff\xff\xff\xff\xff\xff\xff'

MAX_TID could be bigger but in the Python implementation, TIDs are stored as integers and some storage backends may not support values above 2⁶³-1 (e.g. SQLite).

Node ID namespaces are required to prevent conflicts when the master generates new ids before it knows those of existing storage nodes. The high-order byte of node ids is one of the following values:

Storage   0x00
Master   -0x10
Client   -0x20
Admin    -0x30

Actually, only the high-order bit is really important and the 31 other bits could be random, but the extra namespace information and the non-randomness of the 3 low-order bytes help to read logs.
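
A possible Python sketch of composing and encoding such ids follows; the assumption that the 3 low-order bytes simply carry a small per-namespace number is for illustration only, not a statement about the actual implementation:

import struct

NID_NAMESPACES = {
    'storage': 0x00,
    'master': -0x10,
    'client': -0x20,
    'admin': -0x30,
}

def make_nid(node_type, number):
    # high-order byte = namespace; low 3 bytes = small number (assumed layout)
    return (NID_NAMESPACES[node_type] << 24) | (number & 0xffffff)

def encode_nid(nid):
    # PUUID (PNID) is encoded as a signed 32-bit big-endian integer ('!l')
    return struct.pack('!l', nid)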

Link Establishment

Before two nodes can exchange messages, they first need to connect to each other by way of the underlying network.

For a given connection, one node was listening and another one dialed the listening node's address. The way to set the address a node listens on is outside the scope of this specification. However, every node shares this address with peers in string form (see the RequestIdentification message).

In the context of an established link, the node which dialed the listener is called the "dialing node", while the node which was listening is called the "server node".

Handshake

The NEO-specific handshake follows after the underlying network connection is established, to make sure both nodes talk the same NEO protocol version:

HandshakeWord:
    version u32

A > B   HandshakeWord{protocol_version}
A < B   HandshakeWord{protocol_version}

A > B and B > A handshake transmissions are not ordered with respect to each other and can go in parallel.

If a node sees a handshake word with a version other than the one it is expecting, it closes the underlying network link, thus canceling link establishment with the node on the other side.

The handshake succeeds after both nodes receive from each other a handshake word with the expected version.

This document describes protocol version 1. XXX move to top + create section for protocol changes?
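
A minimal sketch of this exchange, assuming the handshake word is encoded big-endian like the other numeric values of the protocol, and that the network socket is already connected:

import struct

PROTOCOL_VERSION = 1

def _recv_exactly(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise IOError('link closed during handshake')
        buf += chunk
    return buf

def handshake(sock):
    # both sides send their handshake word; the two transmissions are unordered
    sock.sendall(struct.pack('!L', PROTOCOL_VERSION))
    version, = struct.unpack('!L', _recv_exactly(sock, 4))
    if version != PROTOCOL_VERSION:
        # unexpected version: close the underlying link, canceling establishment
        sock.close()
        raise IOError('unexpected protocol version: %r' % version)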

Identification

After a successful handshake, the dialing node requests identification and the server either accepts or rejects it.

If identification is successful, the link becomes established and further message exchange on it can happen depending on peer types, workflow and current cluster state.

The dialing node sends identification request:

a > b    RequestIdentification{
                NodeType,       # type of dialing node
                UUID,           # UUID master assigned to dialing node, or 0 if not yet
                Address,        # address dialing node itself listens on, empty if not
                ClusterName,    # name of NEO cluster (to catch misconfiguration mistakes)
                IdTime,         # time when dialing node identified to master, or 0 if not yet   TODO(jm) clarify
            }

The server node either accepts it:

a < b    AcceptIdentification{
                NodeType,       # type of server node
                MyUUID,         # UUID of server node
                NumPartitions,  # partition table parameters currently
                NumReplicas,    # fixed at cluster creation time
                YourUUID,       # UUID assigned to dialing node, if server is master, otherwise echo of RequestIdentification.UUID
                                # XXX(jm) review ^^^
            }

or rejects it for some reason with an Error message.

XXX(jm) document how in case two storage nodes connect to each other at the same time, there is only one link left.

PROPOSAL(kirr) today UUID (really node id) is assigned to a node by the master as small numbers. Previously UUIDs were generated as 16-byte randoms by the nodes themselves, but Grégory changed that so that only the master assigns IDs to nodes (commit), and later Julien shortened UUIDs from 16 to 4 bytes (commit), if I understand correctly for simpler debugging. I propose we go back to nodes randomly generating their IDs themselves (and persisting them locally) so that upon reconnection a node always comes with the same ID. This should remove the complexity of the master assigning node IDs dynamically with the property that the same node ID should be assigned when the same node reconnects. If we want short names for nodes it could be a layer above random-persistent UUIDs, itself stored in the database and spread to all nodes by the distributed consensus protocol used.
XXX(jm) please document how the master currently assigns node IDs and makes sure they stay the same after e.g. a node goes down then up, while in the meantime the master itself restarts and other nodes of the same type connect first before the original node reconnects back to the master.

Cluster State

Besides the data actually stored in the database, there is also state maintained among cluster nodes representing e.g. the current set of peers, the partition table, the master's idea about what is currently going on with the cluster, etc. The primary master maintains the main version of this state and makes changes to it. The primary master shares this state with all its peers.

The cluster state consists of:

  • Node table
  • Partition table
  • Cluster state value    XXX better name?

Please see the next sections, which describe the cluster state part by part.

Node Table

NodeTable contains information about known nodes in a cluster. It is

{} UUID -> (NodeType, NodeAddr, NodeState)

a mapping associating a node UUID with information about that node.

The master maintains this table and provides it to its peers so that they know each other, via the following:

  • when a node initially connects to M, M sends it the whole node table.
  • whenever the node table changes, M broadcasts the nodetab changes to all its connected peers.

This way all nodes stay eventually informed about their peers in the cluster.

Node States

Possible values for each type:

[Table: possible node state values (UNKNOWN, DOWN, RUNNING, PENDING) for each node type (ADMIN, CLIENT, MASTER, STORAGE); UNKNOWN is used for notification only]

Notification matrix:

[Matrix of which node types are notified about the states of which other node types (e.g. clients only know the states of master and storage nodes)]

Node states describe the current status of nodes from the primary master's point of view. By definition, the primary master node is therefore always running, and in this section, master only refers to secondary master nodes. Admin and client nodes are also either running or forgotten, so node states are mostly only useful for the other types.

UNKNOWN
Used to notify that a node can be forgotten.

Internally, each node keeps a list of nodes for which it must know the state:

DOWN
The node is not connected to the primary master and other nodes must not (try to) communicate with it.
RUNNING
The node is up and running normally as part of the cluster.
PENDING
The storage node is connected to the cluster but it's still unused. It is an intermediate state before adding it to the partition table.

If the primary master node fails, all nodes must be able to find the newly elected one, which explains why any change to the list of master nodes is broadcast to all nodes without exception. Apart from that, the following changes are considered:

  • The current implementation of the admin node is minimal and we don't know yet if notifying admins about the state of admins would be useful.
  • The PENDING state may be removed. There's at least the following minor issue: a RUNNING storage node without any assigned partition is restarted as PENDING.

Partition Table

The Partition Table represents the object space partitioning in a cluster.

It is

oid -> []Storage # XXX actually oid -> []uuid

mapping associating an object id with the list of storage nodes where data for this oid should be written to / loaded from.

This mapping is organized as follows:

The oid space is divided (partitioned) into Np parts via the

pid(oid) = oid % Np

rule. The value oid % Np is known as the partition identifier of the oid.

There is also an externally set "redundancy factor" R, which describes the minimum number of separate nodes on which a particular object's data should be stored at the same time.

Given Np, R and []Storage, the PartitionTable tries to organize the

pid -> []Storage

mapping so that

  • redundancy level set by R is met
  • storages associated with adjacent pids are different

When such an organization is reached, the partition table is called operational, and non-operational otherwise. XXX and if storages are ready

The master maintains the partition table and saves it persistently to storage nodes for recovery purposes.

XXX schematic PT organization from kirr's notes:

        #Np (how-many partitions)    #R (replication factor)
Cell
        .node   (-> .nodeid, .addr)
        .cell_state

        .backup_tid         # last tid this cell has all data for
        .replicating        # currently replicating up to this (.replicating) tid

PT
        .id↑
        .partition_list [#Np] of []Cell
        .count_dict     {} node -> #node_used_in_pt

         Pt
        +-+
        | |
        +-+  +----------+ +------------+ +-----+
        | |  |node,state| |node2,state2| |cell3| ...
        +-+  +----------+ +------------+ +-----+
     Np | |
        +-+
        | |
        +-+     oid -> PTentry (as PT[oid % Np]
        | |     tid
        +-+

Cell States

[Cell state transition diagram: OUT_OF_DATE -> UP_TO_DATE (replicated); UP_TO_DATE -> OUT_OF_DATE (node lost); UP_TO_DATE -> FEEDING (tweak); UP_TO_DATE -> DISCARDED; UP_TO_DATE -> CORRUPTED (check); FEEDING -> CORRUPTED (check)]
UP_TO_DATE
Normal state: cell is writable/readable, and it isn't planned to drop it.
OUT_OF_DATE
Write-only cell. Last transactions are missing because storage is/was down for a while, or because it is new for the partition. It usually becomes UP_TO_DATE when replication is done.
FEEDING
Same as UP_TO_DATE, except that it will be discarded as soon as another node finishes replicating it. It means a partition is moved from one node to another. It is also discarded immediately if out-of-date.
DISCARDED
Not really a state: only used in network packets to tell storages to drop partitions.
CORRUPTED
A check revealed that data differs from other replicas. Cell is neither readable nor writable.

UP_TO_DATE is the initial cell state when the database is created. Afterwards, i.e. after a tweak, new cells always start in the OUT_OF_DATE state.

Cluster States

[Cluster state transition diagram: RECOVERING -> VERIFYING; RECOVERING -> STOPPING; VERIFYING -> RUNNING; VERIFYING -> STARTING_BACKUP; RUNNING -> RECOVERING; RUNNING -> STOPPING; STARTING_BACKUP -> BACKINGUP; BACKINGUP -> STARTING_BACKUP; BACKINGUP -> STOPPING_BACKUP]

The primary master indicates to other nodes its idea about what is currently going on with the cluster via the cluster state value.

RECOVERING
The cluster is initially in the RECOVERING state, and it goes back to this state whenever the partition table becomes non-operational again. An election of the primary master always happens, in case of a network cut between a primary master and all other nodes. The primary master:
  • first recovers its own data by reading it from storage nodes;
  • waits for the partition table to be operational;
  • automatically switches to VERIFYING if the cluster can be safely started.
VERIFYING
Transient state, used to:
  • replay the transaction log, in case of unclean shutdown;
  • and actually truncate the DB if the user asked to do so.
Then, the cluster either goes to RUNNING or STARTING_BACKUP state.
RUNNING
Normal operation. The DB is read-writable by clients.
STOPPING
Transient state to shutdown the whole cluster.
STARTING_BACKUP
Transient state, during which the master (re)connects to the upstream master.
BACKINGUP
Backup operation. The master is notified of new transactions thanks to invalidations and orders storage nodes to fetch them from upstream. Because cells are synchronized independently, the DB is often inconsistent.
STOPPING_BACKUP
Transient state, when the user decides to go back to RUNNING state. The master stays in this state until the DB is consistent again. In case of failure, the cluster will go back to backup mode.

Workflows

XXX DRAWING HOW TRANSITIONS AND STATES RELATED TO WORKFLOWS XXX

The way a NEO cluster works is described via workflows. Workflows describe the semantics and the messages exchanged in relation to a particular function of NEO. Some workflows, like read and commit, directly correspond to a user-visible service. Others, e.g. recovery and verification, are internal to NEO.

Most of the complexity lies in workflows that are not directly related to user-visible service and also in how workflows interact with each other.

There are 17 workflows in NEO:

  • election JM
  • state change propagation
  • cluster start
  • recovery
  • verification
  • running
  • read
  • commit
  • invalidation
  • conflict resolution
  • stopping
  • replication
  • backup
  • partition change JM
  • cluster state change JM
  • node state change JM
  • background check JM

Please see the next sections, which describe the workflows one by one.

old text:

Base workflows are the workflows that relate to the absence of exceptions in NEO protocol operation. There are 12 base workflows in NEO: backup, recovery, election, read, commit, replication, topology change.

PROPOSAL(kirr) I would not split the workflows to those that do not have exceptions and those that do have. As NEO consists of several nodes and links - elements that might be failing, and it is / should be designed to still work in such conditions the errors should be expected and documented / programmed explicitly together with the more common "non-failing" workflow subpart.

Election

XXX make a drawing to show what happens and which messages are involved XXX

Election is the workflow responsible for selecting only one node from among the potentially many master nodes of the cluster. The selected master node becomes known as the primary master. Once the primary master is elected, it becomes responsible for driving the cluster in every aspect where the master is documented to be involved.

The election keeps running continuously even after the primary master is elected. If the current primary master goes down, another one should be re-elected to become primary and continue to drive the cluster.

XXX(jm) document election details and messages.

State change propagation

In NEO, the whole state of the cluster is centralized in how the primary master views it. The primary master observes outside and internal events and makes adjustments to the cluster state accordingly. It then propagates state changes to every other node it has a link to, in order to keep them updated with the current cluster state.

XXX make a drawing ? XXX

Every node of the cluster establishes a link to the master node.

Upon establishing this link, the master sends the newcomer node the full node table:

. < M   NotifyNodeInformation{.RowList = all nodes from Node Table}

and continues to update it with similar messages when the node table changes:

. < M   NotifyNodeInformation{.RowList = δNodeTab}

This way a node should always stay eventually informed about the other nodes in the cluster.

XXX(jm) why ClusterState is not handled in exactly the same way with NotifyClusterState?

XXX(jm) same for partition table?

XXX(jm) how does this interact with the primary master being reelected and changed?

  • SendPartitionTable (M sends whole PT)
  • NotifyPartitionChanges (M sends δPT)

Cluster start

Cluster start is the workflow responsible for bringing the cluster to an operational state where it can provide service to clients.

The main task of a NEO cluster is to provide read/write service to clients. However, before this operational service can be provided, the cluster has to go through several startup states whose purpose is to make sure the cluster is indeed ready.

The logic that drives which server-side node does what during startup states and state transitions is located inside the master node. For startup, the master conducts the cluster through the Recovery -> Verification -> Running workflows and states.

Recovery

XXX drawing XXX

Recovery is the process of making sure the cluster's partition table can be operational. It consists of the following steps:

  • retrieve the partition table saved to storages;
  • wait until enough storage nodes connect to the cluster so that the retrieved partition table can be operational.

The recovery workflow is used to recover the partition table from storages and wait until enough storages connect to M, so that M can see that the partition table can be operational. Successful recovery means all the above preconditions are met and a command came to M to start the cluster.

M performing recovery:

  • starts from potentially no storage nodes known
  • accepts connections from storage nodes
  • retrieves and recovers the latest previously saved partition table from storages
  • monitors whether the partition table becomes operational w.r.t. the currently up nodeset
  • if yes, finishes recovering upon receiving an explicit "start" command, or upon reaching the "autostart" criteria
  • start is also allowed if storages are connected and say there is no partition table saved on them (empty new cluster case).

When a storage node successfully connects to the master during recovery, it is put into the Node Table with the PENDING state.

For every storage node - either already connected to M before recovery started, or connected to M during recovery - M reads the partition table saved on that storage:

M > S    Recovery{}
M < S    AnswerRecovery{.PTid}        XXX also .BackupTid, .TruncateTid

M > S    AskPartitionTable{}
M < S    AnswerPartitionTable{.PTid, .RowList}

The partition table with the highest ptid is considered current by the master.

If, for the current partition table, the set of storage nodes connected to M is such that the partition table can be operational w.r.t. that set, the master decides that the cluster is ready to start. If at any time the condition becomes false, the master cancels the decision.

Once the cluster is decided to be ready to start and the start command comes, the master marks all currently connected storage nodes as RUNNING and transitions the cluster to the VERIFYING state.

TODO also describe startup of new cluster when there is no PT saved on any storage.

Verification

XXX drawing XXX

Verification (data recovery) is the process that starts after successful Recovery. It:

  • finishes partly committed transactions by replaying the transaction log, in case of a previous unclean shutdown, and
  • performs DB truncation if a request to do so was previously issued.

VERIFICATION starts with an operational partition table (see RECOVERY)

For every storage node - either already connected to M before verification started, or connected to M during verification (XXX(jm) - correct?) - M performs the following steps:

  • saves the recovered partition table to all currently up storages;
  • asks all storages for partly finished transactions and decides cluster-wide which such transactions need to be either finished or rolled back;
  • executes decided finishes and rollbacks on the storages;
  • truncates the database, if it was previously requested;
  • retrieves last ids from storages along the way;
  • once we are done without losing too many storages in the process (so that the partition table is still operational), we are ready to enter the servicing state.

XXX when accepting S -> S.State = known ? RUNNING : PENDING
XXX recheck it just overwrites PT of newly came node
XXX is it good to just overwrite PT if it had .ptid > ptid we are currently working with?

saves the recovered partition table to all currently up storages:

M > S   SendPartitionTable{...}

asks all storages for partly finished transactions and decides cluster-wide which such transactions need to be either finished or rolled back:

M > S   LockedTransactions{}
M < S   AnswerLockedTransactions{.TidDict}    # {} ttid -> tid

# TODO(jm) document what happens if .TidDict != {}
# XXX recheck about UnfinishedTransactions

executes decided finishes and rollbacks on the storages:

# see ^^^

XXX(jm) truncation happens somewhere here

retrieves last ids from storages along the way:

M > S   LastIDs{}
M < S   AnswerLastIDs{.last_oid, .last_tid}

Running

XXX drawing XXX

Running is the cluster state and associated workflow in which the cluster is operational and read/write service is provided to clients.

  • starts with an operational partition table and all present storage nodes having consistently either finished or rolled back partly finished transactions.
  • monitors storages coming & going, and if the partition table becomes non-operational, goes back to recovery.
  • provides service to clients while in this state.

For a storage node - either already connected to M before entering the RUNNING state, or connected to M during RUNNING (XXX(jm) - correct?) - the master commands it to start providing service to clients:

M > S   StartOperation{}    # XXX .Backup
M < S   NotifyReady{}

After replying NotifyReady to the master, the storage node should accept and handle request messages from clients.

When the master decides to leave the RUNNING state, it sends a command to stop operation to all storages:

M > S    StopOperation

Stopping

When the master decides to shut down the whole cluster, it takes care not to abruptly interrupt transactions that are already in the second phase of commit. For this reason, it puts the cluster into the transient STOPPING state to wait until such already ongoing transactions are finished. At the same time, the master node blocks any request to begin new transactions.

After all in-progress transactions are finished, the master commands all storages to shut down:

M > S    NotifyNodeInformation{S.UUID, ... DOWN}

Read

To load an object, a client first consults its partition table and node table snapshots (see the state change propagation workflow) to get the list of storage nodes which should currently have the corresponding data. Then the client randomly selects one storage node from that list and sends a request to load the object to that storage node:

C > S    GetObject{oid, serial | tid_before}

and expects the storage to return the corresponding object data:

C < S    AnswerObject{oid, serial, next_serial, compression, checksum, data, data_serial}

Due to the distributed nature of the cluster, it is possible that at the time the GetObject request is made, the storage referenced by the snapshot of the client's tables is either no longer working or does not have the data for the requested oid. In particular this is possible if the master's messages about table updates were sent but not yet delivered to the client. The client has to handle such situations gracefully and properly retry the loading.

Currently the client, upon seeing an inconsistency between its tables snapshot and what it actually gets from the storage, sends a Ping to the master with the idea that when the Pong is delivered back, the queued updates will already have been delivered. XXX(jm) review.
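
A hedged sketch of this client-side retry behaviour; all helper names (storages_for, sync_with_master, ask_get_object) are hypothetical and only illustrate the loop described above:

import random

class ObjectNotHere(Exception):
    """Hypothetical error: the chosen storage does not have the data."""

def load(client, oid, before_tid):
    while True:
        # consult the partition/node table snapshots for readable candidates
        candidates = [s for s in client.storages_for(oid) if s.is_readable()]
        if not candidates:
            client.sync_with_master()   # e.g. the Ping/Pong barrier above
            continue
        storage = random.choice(candidates)
        try:
            return storage.ask_get_object(oid, before=before_tid)
        except (IOError, ObjectNotHere):
            # the tables snapshot may be stale: resynchronize and retry
            client.sync_with_master()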

PROPOSAL(kirr) don't return next_serial at all as this can be made not used at all on client side (by changing NEO client cache), and on the other side returning next_serial forces storage to perform 2 SQL queries instead of only 1 for every GetObject.

XXX(kirr) the Ping trick makes many implicit assumptions about e.g. Master being single threaded and other implicit orderings that are true for current NEO/py implementation, but are generally not true if implementation is not made serial. If any ordering is required for correct working the protocol must specify it explicitly.

Commit

Commit is split into phases in accordance with the ZODB Two-Phase Commit Protocol:

XXX(kirr) neither locking nor conflict handling is described and thus it is not clear where deadlock might come from, or how conflicts are handled
TODO(jm) please add locking and conflict resolution and integrate or add proper references to deadlock avoidance.
TODO(jm) interaction with concurrent nodetab/parttab changes.
TODO(jm) interaction with synchronous replication.

tpc_begin

The client first asks the master to begin a transaction. If this request succeeds, the master returns a "temporary" transaction ID (ttid) which denotes the ongoing transaction until it is finished and a transaction ID (tid) is assigned to it.

  C -> M        AskBeginTransaction(tid?)      // XXX(jm) please clarify why we also have tid in BeginTransaction
  C <- M        AnswerBeginTransaction(ttid)

data store

After the transaction has begun, the client sends requests to storages to store the corresponding objects' data:

XXX(kirr) also document askNewOIDs.

store(oid, serial, data, ...):

  for S in storagesFor(oid):
        C -> S  AskStoreObject(oid, serial, ttid ...)

or if it wants an object to be unchanged during the transaction

storeKeepCurrent(oid, serial, ...):

  for S in storagesFor(oid):
        C -> S  CheckCurrentSerial(oid, serial, ttid)

--- eventually ---

  some S from ^^^ (both from AskStoreObject and CheckCurrentSerial):
        C <- S  AnswerStoreObject(conflicting, oid, serial)

Here storagesFor(oid) is the function which consults the client's partition table snapshot and returns all (XXX(jm) review "all") storage nodes associated with oid whose cell is writable.

The client does not wait for all answers from all storages it sent a StoreObject request to. In order to proceed to the vote phase, it is enough to know that at least one storage successfully persisted the object data.

tpc_vote

At the vote phase, the client waits for the answers from the rest of the storages and performs a vote request for the transaction. If voting on a storage succeeds, the semantic promise is that data is now properly persisted on that storage node.

  - wait for all responses from S (see "eventually" ^^^)
    if conflicting:
        resolve conflicts                       // XXX(jm) please document

  for S' in storagesFor(ttid):
        C -> S'   AskStoreTransaction(ttid)     # empty answer

  for S" in (storagesFor(oid) - storagesFor):
        C -> S"  AskVoteTransaction(ttid)       # empty answer

  - wait for all responses from S' and S" ^^^

tpc_finish

Upon successfully voting, the client sends a request to finish the transaction to the master and waits for the corresponding response. The master in turn, upon the client's request, assigns the real tid (the transaction, while being committed, was identified only by its ttid) and issues requests to storages to mark the transaction as committed for real:

C -> M  AskFinishTransaction(ttid, cache_dict, checked_list)

     for S in M.allStorages:
        M -> S  AskLockInformation(ttid, tid)
        ...

     eventually:
        M <- S  AnswerInformationLocked(ttid)

C <- M  AnswerTransactionFinished(ttid, tid)

     for S in M.allStorages:
        M -> S  NotifyUnlockInformation(ttid)

XXX(jm) please document the "unlock information" message better. I already deduced the meaning from the code at least 2 or 3 times and always forget it as the LockInformation / UnlockInformation naming is too unclear from my (kirr) point of view.

tpc_abort

In case the client wants to abort the transaction, it sends AbortTransaction to the involved storages and the master at any point before the tpc_finish phase:

C -> Sv(participating for oidv & tidv) + M
        AbortTransaction(ttid)

XXX(jm) describe why there is no reply for abort to client and what happens in between M - S during abort.

Invalidation

Invalidation is the workflow responsible for keeping other clients notified of changes whenever one client commits something. Invalidation is used to keep client caches in sync with the database and in the backup implementation (see "Backup").

Whenever a transaction is committed, the primary master sends to all clients, except the one who performed the commit, the following notification:

M > C   InvalidateObjects{tid, []oid}

with the meaning that a new transaction with ID=tid was committed, and that it changed the objects in []oid.
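
A minimal sketch of how a client could handle this notification to keep its cache consistent (the cache layout shown is hypothetical):

class ClientCache(object):
    def __init__(self):
        self.data = {}          # {oid: (serial, object data)}
        self.last_tid = None    # highest tid the cache is consistent with

    def on_invalidate_objects(self, tid, oids):
        # a transaction with ID=tid committed and changed these objects:
        # drop the stale entries so that later loads fetch the new revisions
        for oid in oids:
            self.data.pop(oid, None)
        self.last_tid = tid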

Conflict Resolution

XXX

Deadlock avoidance workflows

There are 236 deadlock avoidance workflows. XXX(jm) document

Replication

XXX drawing XXX

Replication is the workflow responsible for keeping multiple copies of data on several storage nodes, so that if a storage node becomes inaccessible, others can be used to keep the NEO cluster providing service. The level of desired replication is usually set up at cluster configuration time with the Nreplica parameter, which sets on how many nodes the same data should be stored simultaneously.

In NEO there are two types of replication:

  • synchronous replication, and
  • asynchronous replication.

synchronous

The data is replicated synchronously when it is committed: the client who performs the commit sends changes to all storages that are currently up and should be keeping the committed data according to the partition table. Synchronous replication is tightly interrelated with the commit process. XXX(jm) document how synchronous (= internal in code speak, right) replication and commit interoperate.

asynchronous

Asynchronous replication is used whenever there is a need to replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:

  • A new storage is added to an existing cluster,
  • A node was separated from cluster and rejoins it,
  • In a backup cluster (see "Backup"), the master notifies a node that new data exists upstream.

Replication happens per partition. The reference node can change between partitions.

synchronous replication

XXX(jm) document synchronous replication

asynchronous replication

XXX(jm) document in detail how results of replication affect partition table (cell states changes).

The primary master is responsible for making decisions to replicate data from one node to another. Whenever such a decision is made, the master commands the storage node that should also have the data to start replicating it from a reference node. The replicating node reports replication completion to the master on a per-partition basis:

// master commands a storage node to replicate partitions up to given 'tid_max' and from given sources.
M -> R  Replicate{tid_max, upstream_name, source_dict /* {} partition -> address */ }
...

// eventually after R finishes replicating requested range for a partition it reports "done" to master.
M <- R  ReplicationDone{partition1, max_tid1}
M <- R  ReplicationDone{partition2, max_tid2}
... 
M <- R  ReplicationDone{partitionN, max_tidN}

Replication of one partition is performed in an rsync-style manner in 2 parts, done sequentially:

  • Transaction (metadata) replication
  • Object (metadata+data) replication

Both parts follow the same mechanism:

  • The range of data to replicate is split into chunks of some N items (transaction or object).
  • For every chunk, the requesting node sends to the seeding node the list of items it already has.
  • Before answering, the seeding node sends 1 packet for every missing item. For items that are already on the replicating node, there is no check that values match.
  • The seeding node finally answers with the list of items to delete (usually empty).
R -> S  AskFetchTransactions{partition, max_batch, min_tid, max_tid, tid_list_already_have}
R <- S  AddTransaction{tid, user, desc, ext, packed, ttid, oid_list}
R <- S  AddTransaction{...}
...     up to max_batch transactions in [min_tid, max_tid] that R does not have
R <- S  AddTransaction{...}
R <- S  AnswerFetchTransactions{pack_tid, next_tid, tid_list_to_delete}

XXX pack_tid: currently always None

The next_tid returned by AnswerFetchTransactions indicates whether there are more transactions to replicate in the requested range, and if yes, it is the first tid of those. If next_tid != None, R should start the next AskFetchTransactions cycle with min_tid = next_tid.

If next_tid is None, S says that all transaction metadata in the requested range has been replicated, and R starts to replicate objects:

R -> S  AskFetchObjects{partition, max_batch, min_tid, max_tid, min_oid, already_have_oid_dict /* {} serial -> []oid */}
R <- S  AddObject{oid, serial, compression, checksum, data, data_serial}
R <- S  AddObject{...}
...     so many objects information for requested range that R does not have
R <- S  AddObject{...}
R <- S  AnswerFetchObjects{pack_tid, next_tid, next_oid, oid_to_delete_dict /* {} serial -> []oid */}

XXX pack_tid: currently always None

The next_tid and next_oid returned by AnswerFetchObjects indicate whether there are more objects to fetch in the requested range. If (next_tid, next_oid) != (None, None), there are, and R starts a new AskFetchObjects cycle with min_tid, min_oid = next_tid, next_oid.
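
Putting the two parts together, here is a hedged sketch of the per-partition loop on the replicating node R; the helper names and answer attributes mirror the messages above but are otherwise hypothetical:

ZERO_TID = ZERO_OID = '\x00' * 8   # see "Other Constants"

def replicate_partition(R, S, partition, max_batch, max_tid):
    # part 1: transaction metadata
    min_tid = ZERO_TID
    while min_tid is not None:
        answer = S.ask_fetch_transactions(
            partition, max_batch, min_tid, max_tid,
            R.tids_already_have(partition, min_tid, max_tid))
        # S streams AddTransaction packets before this answer; R stores them,
        # then deletes the transactions it should not have
        R.delete_transactions(answer.tid_list_to_delete)
        min_tid = answer.next_tid        # None: metadata fully replicated

    # part 2: object records (metadata + data)
    min_tid, min_oid = ZERO_TID, ZERO_OID
    while min_tid is not None:
        answer = S.ask_fetch_objects(
            partition, max_batch, min_tid, max_tid, min_oid,
            R.objects_already_have(partition, min_tid, max_tid))
        R.delete_objects(answer.oid_to_delete_dict)
        min_tid, min_oid = answer.next_tid, answer.next_oid

    # finally report completion for this partition to the master
    R.notify_replication_done(partition, max_tid)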

Backup

XXX drawing XXX

Backup, or more exactly online backup, is the workflow responsible for keeping data from one NEO cluster also available at another NEO cluster. If the first cluster is the main one used for a service, and the second cluster is located in e.g. another building or another country, the link latency between the clusters will be much higher than intra-cluster link latencies. With the current state of NEO it is not practical to mix storage nodes with different link latencies in one cluster, as the slower ones will slow down most operations (otherwise it makes more sense to just use all nodes as replicas). However, a cluster located at another physical place has a high chance of being resilient to a physical failure at the original place, and thus the backup cluster (likely with a backup service instance also moved near it) can be used in the case of such damage.

Online backup relies on Invalidation and Replication for its working. Whenever a backup cluster is set up, its primary master connects as a client node to the master of the main cluster to receive invalidation messages, this way staying informed about committed transactions. The backup master issues commands to its storage nodes to replicate the corresponding data from the storages of the main cluster. The storages in turn use the same replication protocol which is used in the main cluster for asynchronous replication.

XXX(jm) document in more detail how backing up works, in particular how backup cluster reports its partition table and cell states there.

Deadlock avoidance

A typical scenario is the following one:

  1. C1 does a first store (X or Y)
  2. C2 stores X and Y; one is delayed
  3. C1 stores the other -> deadlock

When solving the deadlock, the data of the first store may only exist on the storage nodes.

Deadlocks are detected by comparing locking TIDs; initially a locking TID is the same as the TTID. When trying to (write-)lock an OID that is already locked by a smaller locking TID, the store is delayed: this behaviour is chosen because a transaction that starts first is likely to store first. Otherwise, when trying to lock with a smaller locking TID, a deadlock is issued.
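
A hedged sketch of this write-lock decision on a storage node (the lock bookkeeping shown is a simplification):

def write_lock(locks, oid, locking_tid):
    # `locks` maps an oid to the locking TID currently holding its write-lock
    holder = locks.get(oid)
    if holder is None or holder == locking_tid:
        locks[oid] = locking_tid
        return 'granted'
    if holder < locking_tid:
        # the older transaction will likely store first: delay this store
        return 'delayed'
    # asking with a smaller locking TID: trigger deadlock avoidance
    return 'deadlock'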

Deadlock avoidance Protocol

The main idea behind deadlock avoidance is to rebase OIDs that have already been stored. In order that the comparison of locking TIDs keeps working, the master is involved to deliver a new locking TID:

1. Sa: mark the transaction as being rebased
2. Sa: release all write-locks
3. Sa: sortAndExecuteQueuedEvents
4. Sa > M   NotifyDeadlock{
                ttid,                  # to identify the transaction
                current locking tid,   # M must ignore late/redundant packets
              }
5. Sa: waiting that the transaction is rebased, the store is delayed
6. M > C    NotifyDeadlock{ttid, new locking tid}
7. C > S    # to all involved storage nodes
            RebaseTransaction{ttid, new locking tid}
8. Sb: sets new locking tid
9. Sb (a!=b): same as 2
10. Sb: sortAndExecuteQueuedEvents

At step 10, a new deadlock may happen, in which case it loops to 1 and an empty answer is sent back to the client:

11. Sb > C   AnswerRebaseTransaction{[]}

If there's no new deadlock:

11. lock all oids that can be relocked immediately
12. Sb > C   AnswerRebaseTransaction{
                oids that are locked by another transaction
                + those that conflict
              }

Then for each oid returned by AnswerRebaseTransaction:

13. C > Sb   RebaseObject{ttid,oid}

The processing of RebaseObject is similar to that of StoreObject, with the following differences:

  • The storage node already has the data.
  • Nothing to do in the following case: there was a previous rebase for this oid, it was still delayed during the second RebaseTransaction, and then a conflict was reported when another transaction was committed. A similar case is when a partition is dropped.
  • On conflict, the data must be sent back to the client in case it does not have it anymore. In this case, the storage node forgets this oid.

Possible answers:

  • locked: AnswerRebaseObject{}
  • conflict: AnswerRebaseObject{old tid,saved tid,data} where data is None for checkCurrent or undo (the client has enough information on its side to distinguish the 2 cases)

On the client, AnswerRebaseObject is also handled in a similar way to AnswerStoreObject.

Note

RebaseObject is a request packet to simplify the implementation. For more efficiency, this should be turned into a notification, and RebaseTransaction should be answered once all objects are rebased (so that the client can still wait on something).

Cascaded deadlocks

So-called cascaded deadlock is when a new deadlock happens during deadlock avoidance. The above protocol does handle this case. Here is an example with a single storage node:

# locking tids: t1 < t2 < t3

1. A2 (t2 stores A)
2. B1, c2 (t2 checks C)
3. A3 (delayed), B3 (delayed), D3 (delayed)
4. C1 -> deadlock: B3
5. d2 -> deadlock: A3

# locking tids: t3 < t1 < t2

6. t3 commits
7. t2 rebase: conflicts on A and D
8. t1 rebase: new deadlock on C
9. t2 aborts (D non current)

all locks released for t1, which rebases and resolves conflicts

Deadlock avoidance also depends on the correct ordering of delayed events (sortAndExecuteQueuedEvents), otherwise the cluster could enter an infinite cascade of deadlocks. Here is an overview with 3 storage nodes and 3 transactions modifying the same oid (2 replicas):

S1     S2     S3     order of locking tids          # abbreviations:
l1     l1     l2     123                            #  l: lock
q23    q23    d1q3   231                            #  d: deadlock triggered
r1:l3  r1:l2  (r1)   # for S3, we still have l2     #  q: queued
d2q1   q13    q13    312                            #  r: rebase

Above, we show what happens when a random transaction gets a lock just after another one is rebased. Here, the result is that the last 2 lines are a permutation of the first 2, and this can repeat indefinitely with bad luck.

The probability of deadlock must be reduced by processing delayed stores/checks in the order of their locking tid. In the above example, S1 would give the lock to 2 when 1 is rebased, and 2 would vote successfully.
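
As a sketch, a simplified stand-in for sortAndExecuteQueuedEvents that illustrates this ordering (the event objects and their attributes are hypothetical):

def sort_and_execute_queued_events(queue):
    # replay delayed stores/checks in increasing locking-tid order, which
    # reduces the probability of entering a new cascade of deadlocks
    for event in sorted(queue, key=lambda e: e.locking_tid):
        event.execute()
    del queue[:]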

SQL Reference Data Structure

XXX

Config

The purpose of config table is to persist storage node configuration.

XXX(jm) review

The schema is:

        .name   str
        .value  str
        (cluster name, node id, partitions, ptid, replicas, version, zodb=pickle...)

Pt

The purpose of pt table is to persist partition table.

XXX(jm) review

Pt schema is:

        .rid    int     // = row id = part of oid space
        .nid    int
        .state  tinyint // = cell state

        pkey (rid, nid)

Trans

The purpose of trans table is to store committed transactions metadata.

Trans schema is:

        .partition      smallint
        .tid            bigint
        .packed         bool
        .oids           mediumblob      // []oid
        .user           blob
        .description    blob
        .ext            blob
        .ttid           bigint          // XXX ?

        pkey (partition, tid)

Obj

The purpose of obj table is to persist committed object metadata.

XXX(jm) review

Obj schema is:

        .partition      smallint
        .oid            bigint
        .tid            bigint
        .data_id        bigint | NULL   // -> data.id
        .value_tid      bigint | NULL

        pkey (partition, tid, oid)
        key (partition, oid, tid)
        key data_id

Data

The purpose of data table is to store actual object content.

XXX(jm) review

Data schema is:

        .id             bigint
        .hash           sha1            // UNIQUE (hash, compression)
        .compression    tinyint
        .value          mediumblob

        key id
        key (hash, compression) // <- from UNIQUE ^^^

Ttrans & tobj

The purpose of ttrans and tobj tables is to store not-yet-committed transactions and objects metadata.

TODO(jm) explain t{trans,obj} -> {trans,obj} process.

Appendix

XXX

References

The original design from Yoshinori Okuji introduces most concepts of the NEO architecture and protocol, with many details describing the network protocol. Although the original protocol has been changed since then, the concepts described in that document are still valid and reading it helps to understand the architecture of NEO.

DZUG presentation XXXX

 

XXX

XXX

Todo

  • JM:
    • make drawings and explanation for READH + JM marked workflows + all deadlock avoidances
  • Kirr
    • fill the document with all you know
  • JPS
    • state change drawings