pysyncobj package

SyncObj

class pysyncobj.SyncObj(selfNode, otherNodes, conf=None, consumers=None, nodeClass=<class 'pysyncobj.node.TCPNode'>, transport=None, transportClass=<class 'pysyncobj.transport.TCPTransport'>)

Main SyncObj class, you should inherit your own class from it.

Parameters
  • selfNode (Node or str) – object representing the self-node or address of the current node server ‘host:port’

  • otherNodes (iterable of Node or iterable of str) – objects representing the other nodes or addresses of partner nodes [‘host1:port1’, ‘host2:port2’, …]

  • conf (SyncObjConf) – configuration object

  • consumers (list of SyncObjConsumer inherited objects) – objects to be replicated

  • nodeClass (class) – class used for representation of nodes

  • transport (Transport or None) – transport object; if None, transportClass is used to initialise such an object

  • transportClass (class) – the Transport subclass to be used for transferring messages to and from other nodes

addNodeToCluster(node, callback=None)

Add single node to cluster (dynamic membership changes). Async. You should wait until node successfully added before adding next node.

Parameters
  • node (Node | str) – node object or ‘nodeHost:nodePort’

  • callback (function(FAIL_REASON, None)) – will be called on success or fail

destroy()

Correctly destroy SyncObj. Stop autoTickThread, close connections, etc.

destroy_synchronous()

Correctly destroy SyncObj. Stop autoTickThread, close connections, etc. and ensure the threads are gone.

doTick(timeToWait=0.0)

Performs single tick. Should be called manually if autoTick disabled

Parameters

timeToWait (float) – max time to wait for next tick. If zero - perform single tick without waiting for new events. Otherwise - wait for new socket event and return.

forceLogCompaction()

Force to start log compaction (without waiting required time or required number of entries)

getStatus()

Dumps different debug info about cluster to dict and return it

property hasQuorum

Does the cluster have a quorum according to this node

Return type

bool

isNodeConnected(node)

Checks if the given node is connected :param node: node to check :type node: Node :rtype: bool

isReady()

Check if current node is initially synced with others and has an actual data.

Returns

True if ready, False otherwise

Return type

bool

property otherNodes
Return type

set of Node

printStatus()

Dumps different debug info about cluster to default logger

property raftCommitIndex
Return type

int

property raftCurrentTerm
Return type

int

property raftLastApplied
Return type

int

property readonlyNodes
Return type

set of Node

removeNodeFromCluster(node, callback=None)

Remove single node from cluster (dynamic membership changes). Async. You should wait until node successfully added before adding next node.

Parameters
  • node (Node | str) – node object or ‘nodeHost:nodePort’

  • callback – will be called on success or fail

property selfNode
Return type

Node

setCodeVersion(newVersion, callback=None)

Switch to a new code version on all cluster nodes. You should ensure that cluster nodes are updated, otherwise they won’t be able to apply commands.

Parameters

newVersion – new code version

:type int :param callback: will be called on success or fail :type callback: function(FAIL_REASON, None)

tick_thread_alive()

Check if the tick thread is alive.

waitBinded()

Waits until initialized (binded port). If success - just returns. If failed to initialized after conf.maxBindRetries - raise SyncObjException.

waitReady()

Waits until the transport is ready for operation.

Raises

TransportNotReadyError – if the transport fails to get ready

replicated

pysyncobj.replicated(*decArgs, **decKwargs)

Replicated decorator. Use it to mark your class members that modifies a class state. Function will be called asynchronously. Function accepts flowing additional parameters (optional):

‘callback’: callback(result, failReason), failReason - FAIL_REASON. ‘sync’: True - to block execution and wait for result, False - async call. If callback is passed,

‘sync’ option is ignored.

‘timeout’: if ‘sync’ is enabled, and no result is available for ‘timeout’ seconds -

SyncObjException will be raised.

These parameters are reserved and should not be used in kwargs of your replicated method.

Parameters
  • func (function) – arbitrary class member

  • ver (int) – (optional) - code version (for zero deployment)

replicated_sync

pysyncobj.replicated_sync(*decArgs, **decKwargs)

SyncObjConf

class pysyncobj.SyncObjConf(**kwargs)

PySyncObj configuration object

appendEntriesBatchSizeBytes

Max number of bytes per single append_entries command.

appendEntriesPeriod

Interval of sending append_entries (ping) command. Should be less than raftMinTimeout.

appendEntriesUseBatch

Send multiple entries in a single command. Enabled (default) - improve overall performance (requests per second) Disabled - improve single request speed (don’t wait till batch ready)

autoTick

Disable autoTick if you want to call onTick manually. Otherwise it will be called automatically from separate thread.

bindAddress

Bind address (address:port). Default - None. If None - selfAddress is used as bindAddress. Could be useful if selfAddress is not equal to bindAddress. Eg. with routers, nat, port forwarding, etc.

bindRetryTime

Will try to bind port every bindRetryTime seconds until success.

commandsQueueSize

Commands queue is used to store commands before real processing.

commandsWaitLeader

If true - commands will be enqueued and executed after leader detected. Otherwise - FAIL_REASON.MISSING_LEADER error will be emitted. Leader is missing when esteblishing connection or when election in progress.

connectionRetryTime

Interval between connection attempts. Will try to connect to offline nodes each connectionRetryTime.

connectionTimeout

When no data received for connectionTimeout - connection considered dead. Should be more than raftMaxTimeout.

deserializer

Custom deserialize function, it will be called when restore from fullDump. If specified - there should be a custom serializer too. Should return data - internal stuff that was passed to serialize.

dnsCacheTime

Time to cache dns requests (improves performance, no need to resolve address for each connection attempt).

dnsFailCacheTime

Time to cache failed dns request.

dynamicMembershipChange

If enabled - cluster configuration could be changed dynamically.

fullDumpFile

File to store full serialized object. Save full dump on disc when doing log compaction. None - to disable store.

journalFile

File to store operations journal. Save each record as soon as received.

leaderFallbackTimeout

When leader has no response from the majority of the cluster for leaderFallbackTimeout - it will fallback to follower state. Should be more than appendEntriesPeriod.

logCompactionBatchSize

Max number of bytes per single append_entries command while sending serialized object.

logCompactionMinEntries

Log will be compacted after it reach minEntries size or minTime after previous compaction.

logCompactionMinTime

Log will be compacted after it reach minEntries size or minTime after previous compaction.

logCompactionSplit

If true - each node will start log compaction in separate time window. eg. node1 in 12.00-12.10, node2 in 12.10-12.20, node3 12.20 - 12.30, then again node1 12.30-12.40, node2 12.40-12.50, etc.

maxBindRetries

Max number of attempts to bind port (default 0, unlimited).

onCodeVersionChanged

This callback will be called when cluster is switched to new version. onCodeVersionChanged(oldVer, newVer)

onReady

This callback will be called as soon as SyncObj sync all data from leader.

onStateChanged

This callback will be called for every change of SyncObj state. Arguments: onStateChanged(oldState, newState). WARNING: there could be multiple leaders at the same time!

password

Encrypt session with specified password. Install cryptography module to be able to set password.

pollerType

Sockets poller: * auto - auto select best available on current platform * select - use select poller * poll - use poll poller

preferredAddrType

Preferred address type. Default - ipv4. None - no preferences, select random available. ipv4 - prefer ipv4 address type, if not available us ipv6. ipv6 - prefer ipv6 address type, if not available us ipv4.

raftMaxTimeout

Same as raftMinTimeout

raftMinTimeout

After randomly selected timeout (in range from minTimeout to maxTimeout) leader considered dead, and leader election starts.

recvBufferSize

Size of receive for sockets.

sendBufferSize

Size of send buffer for sockets.

serializeChecker

Check custom serialization state, for async serializer. Should return one of SERIALIZER_STATE.

serializer

Custom serialize function, it will be called when logCompaction (fullDump) happens. If specified - there should be a custom deserializer too. Arguments: serializer(fileName, data) data - some internal stuff that is required to be serialized with your object data.

tcp_keepalive

TCP socket keepalive (keepalive_time_seconds, probe_intervals_seconds, max_fails_count) Set to None to disable

useFork

Use fork if available when serializing on disk.

FAIL_REASON

class pysyncobj.FAIL_REASON
DISCARDED = 3

Command discarded (cause of new leader elected and another command was applied instead)

LEADER_CHANGED = 5

Simmilar to NOT_LEADER - leader has changed without command commit.

MISSING_LEADER = 2

Leader is currently missing (leader election in progress, or no connection)

NOT_LEADER = 4

Leader has changed, old leader did not have time to commit command.

QUEUE_FULL = 1

Commands queue full

REQUEST_DENIED = 6

Command denied

SUCCESS = 0

Command successfully applied.

SERIALIZER_STATE

class pysyncobj.SERIALIZER_STATE
FAILED = 3

Serialization failed (should be returned only one time after finished).

NOT_SERIALIZING = 0

Serialization not started or already finished.

SERIALIZING = 1

Serialization in progress.

SUCCESS = 2

Serialization successfully finished (should be returned only one time after finished).