Vespa elasticity

Vespa allows applications to grow (and shrink) their hardware while serving queries and accepting writes as normal. Data is automatically redistributed in the background using the minimal amount of data movement required to reestablish an even data distribution. No restarts or other operations are needed, just change the hardware listed in the configuration and redeploy the application.

The same mechanism is used to automatically recover from an unexpected loss of a machine - new copies of the data are created automatically to reestablish the configured data redundancy. Hence, faulty nodes are not a problem requiring immediate attention in Vespa. As long as the system has enough capacity in total to deal with the data and traffic, it will self-heal from any hardware failure.

This article details three subjects: storage and retrieval, resizing, and limitations and tradeoffs.

Storage and retrieval

The data distribution system in Vespa consists of two main components: distributors and storage nodes. The storage nodes provide a Service Provider Interface (SPI) that abstracts how documents are stored in the elastic system. Documents are both written to and read from the system through a distributor.

To handle a large number of documents, Vespa groups them in buckets, either through hashing or through hints located in the document id. The SPI is the link between the elastic bucket management system and the document storage. In Vespa, the SPI is implemented by proton nodes.
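
For illustration, the bucket a document belongs to can be influenced through optional location hints in the id scheme. Assuming the hypothetical namespace "music" and document type "album", these ids are all valid:

  id:music:album::a-head-full-of-dreams                    (bucket derived by hashing the id)
  id:music:album:n=12345:a-head-full-of-dreams             (numeric location hint)
  id:music:album:g=some-group:a-head-full-of-dreams        (group-name location hint)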

When feeding, a document is sent to all replicas of its bucket. If bucket replicas are out of sync, a bucket merge operation is run to re-sync the bucket. Buckets are split when they grow too large, and joined when they shrink. A bucket contains tombstones of recently removed documents. Read more about document expiry and batch removes in document expiry.
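
A sketch of how document expiry is typically configured - the document type and the selection expression below are assumptions for illustration; see document expiry for the authoritative syntax:

  <documents garbage-collection="true">
    <!-- Keep only documents with a timestamp less than a day old; "music" and "timestamp" are assumed names -->
    <document type="music" mode="index" selection="music.timestamp &gt; now() - 86400"/>
  </documents>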

Writing documents

Documents enter Vespa using the Document API - through vespa-feeder, the Vespa HTTP Client or the Java Document API directly (all these clients use the Java Document API). If no route is set, clients feed to the default route. Next, documents enter document processing (round-robin), where they are prepared for indexing, before entering the distributor. The distributor determines which bucket should hold each document, and sends them to content nodes:

Document processing The document processing chain is a chain of processors that manipulate documents before they are stored. Document processors can be user defined. When using indexed search, the final step in the chain prepares documents for indexing. The document processing chain calculates a bucket id for each document, which is used to select the correct distributor, and passes the document on using the Document API. The Document API forwards requests to distributors. It calculates the correct distributors to talk to using the distribution algorithm and knowledge of the cluster state. With no known cluster state, the client library sends requests to a random node, which replies with the updated state if it was not the correct one. Cluster states are versioned, so that clients hitting outdated distributors do not override updated states with old states.
Distributor The distributor keeps track of which search nodes should hold which copy of each bucket, based on the redundancy setting and information from the cluster controller. Distributors are responsible for keeping track of metadata for a non-overlapping subset of the data in the cluster. The distributors each keep a bucket database containing metadata of buckets it is responsible for. This metadata indicates what content nodes store copies of the buckets, the checksum of the bucket content and the number of documents and meta entries within the bucket. Each incoming document is assigned to a bucket and forwarded to the right search nodes. All requests related to these buckets are sent through the distributors. Read more.
Cluster controller The cluster controller keeps track of the state of the nodes in the installation. This cluster state is used by the document processing chains to know which distributor to send documents to, as well as by the distributor to know which search nodes should have which bucket. Read more.
Search node The search node consists of a bucket management system which sends requests to a set of document databases, each of which consists of three sub-databases. In short, this node activates and deactivates buckets for search. Refer to proton for details.

Other aspects of feeding:

Redundancy redundancy sets how many backend instances store replicas of the same data. searchable-copies configures how many replicas to keep in the Ready sub-database. Redundancy is handled at the bucket level, by maintaining redundancy replicas of each bucket. No node may store more than one replica of a bucket. The distributors detect whether there are enough bucket replicas on the content nodes and add/remove as needed. Write operations wait for replies from every replica and fail if fewer than redundancy replicas are persisted within the timeout. Replica placement is described in document distribution. A configuration sketch follows below this list.
Consistency Consistency is maintained at bucket level. Content nodes calculate checksums based on the bucket contents, and the distributors compare checksums among the bucket copies. If inconsistencies are detected, a merge is issued to resolve them. While there are inconsistent bucket replicas, the distributors try to route external operations to the best replica.

As buckets are split and joined, it is possible for replicas of a bucket to be split at different levels. A node may have been down while its buckets have been split or joined. This is called inconsistent bucket splitting. Bucket checksums can not be compared across buckets with different split levels. Consequently, distributors do not know whether all documents exist in enough replicas in this state. Due to this, inconsistent splitting is one of the highest maintenance priorities. After all buckets are split or joined back to the same level, the distributors can verify that all the replicas are consistent and fix any detected issues with a merge. Read more.
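
A configuration sketch of the redundancy and searchable-copies settings discussed above - the cluster name and values are illustrative:

  <content id="mycluster" version="1.0">
    <!-- Number of replicas stored per bucket -->
    <redundancy>3</redundancy>
    <engine>
      <proton>
        <!-- Replicas kept in the Ready sub-database, i.e. indexed and searchable -->
        <searchable-copies>2</searchable-copies>
      </proton>
    </engine>
    <!-- documents and nodes elements omitted for brevity -->
  </content>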

Detailed process overview:

Content node expanded:

Retrieving documents

Retrieving documents is done either by specifying a single id to get, or by using a selection expression to visit a range of documents - refer to the Document API. An example is given after the overview. Overview:

Get When the persistence engine receives a get request, it scans through all the document databases, and for each one it checks all three sub-databases. Once the document is found, the scan is stopped and the document returned. If the document is found in a Ready sub-database, the document retriever will patch in any changes that are stored in the attributes before returning the document.
Visit A visit request creates an iterator over each candidate bucket. This iterator will retrieve matching documents from all sub-databases of all document databases. As for get, attributes are patched into documents in the Ready sub-database.
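
For illustration, both operations can also be issued over the document/v1 HTTP API in the container - the namespace, document type and selection below are assumptions, and URL encoding is omitted:

  GET /document/v1/music/album/docid/a-head-full-of-dreams                          (get a single document)
  GET /document/v1/music/album/docid?selection=album.year>2000&cluster=mycluster    (visit by selection)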

Indexed search has a separate pathway through the system. It does not use the distributor, nor does it have anything to do with the SPI. It is orthogonal to the elasticity set up by the storage and retrieval described above. How queries move through the system:

A query enters the system through the QR-server (Query/Result server). The QR-server issues one query per document type to the top level dispatcher, which in turn keeps track of all search nodes and relays queries to them:

Container The Container knows all the document types and rewrites queries as a collection of queries, one for each type. Queries may have a restrict parameter, in which case the container will send the query only to the specified document types. An example is shown after this list.
Top level dispatcher The top level dispatcher is a small process, by default running on each container node, which is responsible for sending the query to each content node and collecting partial results. It pings all content nodes every second to know whether they are alive, and keeps open TCP connections to each one. If a node goes down, the elastic system will make the documents available on other nodes.
Content node matching The match engine receives queries and routes them to the right document database based on the document type. In the document database the query is passed straight to the Ready sub-database, where the searchable documents are. Based on information stored in the document meta store, the query is augmented with a blacklist that ensures only active documents can get a match.
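
An illustration of the restrict parameter mentioned above - hostname, port, query and document type are hypothetical:

  http://container-host:8080/search/?query=best+album&restrict=music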

Resizing

Resizing is orchestrated by the distributor, and happens gradually in the background. It uses a variation of the RUSH algorithms to distribute documents. Under normal circumstances it moves a minimal number of documents when nodes are added or removed.

Modify the configuration to add/remove nodes, then deploy. Add an elastic content cluster to the application by adding a content element in services.xml. Documents are distributed over nodes with a configured redundancy.
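
A minimal content cluster sketch in services.xml - the cluster id, document type and host aliases are illustrative:

  <content id="mycluster" version="1.0">
    <redundancy>2</redundancy>
    <documents>
      <document type="music" mode="index"/>
    </documents>
    <nodes>
      <node hostalias="node0" distribution-key="0"/>
      <node hostalias="node1" distribution-key="1"/>
      <node hostalias="node2" distribution-key="2"/>
    </nodes>
  </content>

Resizing is then a matter of adding or removing node elements in this list and redeploying.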

Bucket distribution algorithm

Central to the ideal state distribution algorithm is the assignment of a node sequence to each bucket. The illustration shows how each bucket uses a pseudo-random sequence of numbers to derive the node sequence. The pseudo-random generator is seeded with the bucket id, so each bucket will always get the same sequence:

Each node is assigned a distribution-key, which is an index in the number sequence. The set of number/index pairs is sorted on descending numbers, and this new sequence of indexes determines which nodes the copies are placed on, with the first node being host to the primary copy, i.e. the active copy. This specification of where to place a bucket is called the bucket's ideal state.

Adding nodes

When adding a new node, new ideal states are calculated for all buckets. The buckets that should have a copy on the new node are moved, while the superfluous copies are removed. Redistribution example - add a new node to the system, with redundancy 2:

The distribution algorithm generates a pseudo-random node sequence for each bucket. In this example with a redundancy of two, the two nodes sorted first will store copies of the data. The figure shows how random placement onto two nodes changes as a third node is introduced. The new node takes over as host of the primary copy for the buckets where it sorts first, and of the secondary copy for the buckets where it sorts second. This ensures minimal data movement when nodes come or go, and allows capacity to be changed easily.

No buckets are moved between the existing nodes when a new node is added. Based on the pseudo-random sequences, some buckets change from primary to secondary, or are removed. Many nodes can be added in the same deployment. Adding more than one minimizes the total bucket redistribution, but increases the time to get to the ideal state. Procedure:

  1. Node setup: Prepare nodes by installing software, set up the file systems / directories and set configuration server(s). Details. Start the node.
  2. Modify configuration: Add a node-element in services.xml and hosts.xml - see the sketch after this list. Refer to multinode install. It is key that the node's distribution-key is higher than the highest existing index.
  3. Deploy: Observe metrics to track progress as the cluster redistributes documents. Use the cluster controller to monitor the state of the cluster.
  4. Restart dispatch nodes?: ToDo check this.
  5. Tune performance: Use maxpendingidealstateoperations to tune concurrency of bucket merge operations from distributor nodes. Likewise, tune merges - concurrent merge operations per content node. The tradeoff is speed of bucket replication vs use of resources, which impacts the application's regular load. ToDo: add something on bucket-oriented summary store as well.
  6. Finish: The cluster is done redistributing when idealstate.merge_bucket.pending is zero on all distributors.
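
A sketch of the configuration change in step 2 - host names and aliases are hypothetical:

  <!-- hosts.xml: add the new host -->
  <host name="node3.example.com">
    <alias>node3</alias>
  </host>

  <!-- services.xml: add a node element inside the content cluster's nodes element,
       giving it a distribution-key higher than any existing one -->
  <node hostalias="node3" distribution-key="3"/>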

Removing nodes

Whether a node fails or is deliberately removed, the same redistribution happens. If the removal is planned, the node remains up until it has been drained. Example of redistribution after node failure, redundancy 2:

In the figure, node 2 fails. This node held the active copies of buckets 2 and 6. Once the node fails, the secondary copies are set active. If they were already in a ready state, they can start serving queries immediately. Otherwise they will have to index their data. All buckets that no longer have secondary copies are copied to the remaining nodes according to their pseudo-random sequence. It is important that the nodes retain their original index; otherwise the buckets would all have to move to regain their ideal states.

Do not remove more than redundancy-1 nodes at a time. Observe idealstate.merge_bucket.pending to know the bucket replica status; when it is zero on all distributor nodes, it is safe to remove more nodes. If hierarchical distribution is used to control bucket replicas, remove all nodes in a group at once, provided the redundancy settings ensure replicas in each group.

To increase the bucket redundancy level before taking nodes out, retire the nodes. Again, track idealstate.merge_bucket.pending to know when done. Use the State API or vespa-set-node-state to set a node to retired. The cluster controller's status page lists node states.

Merge clusters

To merge two content clusters, add nodes to the cluster as when adding nodes above, considering:

Migrate clusters

An alternative to increasing cluster size is building a new cluster, then migrate documents to it. This is supported using visiting.

Limitations and tradeoffs

Availability vs resources

Keeping index structures costs resources. Not all bucket replicas are necessarily searchable, unless configured using searchable-copies. As Vespa indexes buckets on demand, the most cost-efficient setting is 1, if one can tolerate a temporary coverage loss during node failures. Note that searchable-copies does not apply to streaming search, as it does not use index structures.

Distributing buckets using a hierarchy helps implement policies for availability and query performance.
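
A hierarchical distribution sketch - group names and the partition expression are illustrative. Replace the flat nodes element in the content cluster with nested groups to control replica placement:

  <group>
    <!-- With redundancy 2, place one replica in each of the two groups -->
    <distribution partitions="1|*"/>
    <group name="group0" distribution-key="0">
      <node hostalias="node0" distribution-key="0"/>
      <node hostalias="node1" distribution-key="1"/>
    </group>
    <group name="group1" distribution-key="1">
      <node hostalias="node2" distribution-key="2"/>
      <node hostalias="node3" distribution-key="3"/>
    </group>
  </group>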

Data retention vs size

When a document is removed, the document data is not immediately purged. Instead, a content cluster keeps remove-entries (tombstones of removed documents) for a configurable amount of time. The default is two weeks, refer to pruneremoveddocumentsage. This ensures that removed documents stay removed in a distributed system where nodes change state. Entries are removed periodically after expiry. Hence, if a node comes back up after having been down for longer than this period, removed documents may become available again, unless the data on the node is wiped first. A larger pruneremoveddocumentsage will hence grow the storage size, as documents and tombstones are kept longer.
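
One way to adjust the prune age, assuming the setting is exposed through the proton config as shown below - verify the exact element against the pruneremoveddocumentsage reference - is a config override on the content cluster:

  <content id="mycluster" version="1.0">
    <config name="vespa.config.search.core.proton">
      <!-- Assumed field name; value in seconds (one week instead of the two-week default) -->
      <pruneremoveddocumentsage>604800</pruneremoveddocumentsage>
    </config>
    <!-- redundancy, documents and nodes as before -->
  </content>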

Note: The backend does not store remove-entries for nonexistent documents. This is to prevent clients sending wrong document identifiers from filling a cluster with invalid remove-entries. A side effect is that if a problem has caused all replicas of a bucket to be unavailable, documents in this bucket cannot be marked removed until at least one replica is available again. Documents are written to new bucket replicas while the others are down - if these documents are then removed, older versions of them will not re-emerge when the original replicas return, as the most recent change wins.

Transition time See transition-time for the tradeoff between how quickly nodes are set down and system stability.
Removing unstable nodes One can configure how many times a node is allowed to crash before it will automatically be removed. The crash count is reset if the node has been up or down continuously for more than the stable state period. If the crash count exceeds max premature crashes, the node will be disabled. Refer to troubleshooting.
Minimal amount of nodes required to be available A cluster is typically sized to handle a given amount of load. A given percentage of the cluster resources is required for normal operations, and the remainder is spare capacity that can be used if some of the nodes are no longer usable. If the cluster loses enough hardware, it will be overloaded:

  • Remaining nodes may hit a disk full situation. This will likely fail a lot of write operations, and if the disk is shared with the OS, it may also stop the node from functioning.
  • Partition queues will grow to maximum size. As queues are processed in FIFO order, operations are likely to get long latencies.
  • Many operations may time out while being processed, causing the operations to be resent, adding more load to the cluster.
  • When new hardware is added, it cannot serve any requests before data is moved to the new nodes from the already overloaded nodes. Moving data puts even more load on the existing nodes, and as moving data is typically not high priority this may never actually happen.
To configure the minimal cluster size, use min-distributor-up-ratio and min-storage-up-ratio.
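
The cluster controller settings referenced in this section are set under tuning in the content cluster. The values below are illustrative only - consult the reference for units and defaults:

  <content id="mycluster" version="1.0">
    <tuning>
      <cluster-controller>
        <transition-time>0</transition-time>
        <max-premature-crashes>4</max-premature-crashes>
        <stable-state-period>3600</stable-state-period>
        <min-distributor-up-ratio>0.5</min-distributor-up-ratio>
        <min-storage-up-ratio>0.5</min-storage-up-ratio>
      </cluster-controller>
    </tuning>
    <!-- redundancy, documents and nodes as before -->
  </content>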