Proton is Vespa's search core and runs on each content node as the vespa-proton-bin process.
Proton maintains disk and memory structures for documents (organized per document type),
handles read and write operations,
and execution of queries.
As the document data is dynamic, disk and memory structures are periodically optimized by
maintenance jobs.
The content node has a bucket management system
which sends requests to a set of document databases,
which each consists of three sub-databasesready, not ready and removed:
Bucket management
When the node starts up it first needs to get an overview of what documents and buckets it has.
Once metadata for all buckets are known, the content nodes transitions from down to up state.
As the distributors wants quick access to bucket metadata, it maintains an in-memory bucket
database to efficiently serve these requests.
The state of the bucket database can always be reconstructed from the durably persisted
search node state, but this is expensive and therefore only happens at process startup time.
This database is considered the source of truth for the state of the node's bucket metadata
for the duration of the process' lifetime. As incoming operations mutate the state of the
documents on the node, it is critical that the database is always kept in sync with these changes.
Persistence threads and operation dispatching
A content node has a pool of persistence threads that is created at startup
and remains fixed in size for the lifetime of the process. It is the responsibility of the
persistence threads to schedule incoming write and read operations received by the content
node, dispatch these to the search core, and to ensure the bucket database remains in sync
with changes caused by write operations.
Unless explicitly configured, the size of the thread pool is automatically set based on the
number of CPU cores available.
Persistence threads are backed by a persistence queue. Read/write-operations received
by the RPC subsystem are pushed onto this queue. The queue is operation deadline-aware; if
an operation has exceeded its deadline while enqueued, it is immediately failed back to
the sender without being executed. This avoids a particular failure scenario where a heavily
loaded node spends increasingly more and more time processing already doomed operations,
due to not being able to drain its queue quickly enough.
All operations bound for a particular data bucket (such as Puts, Gets etc.) execute in the
context of a bucket lock. Locks are shared for reads and exclusive for writes.
This means that multiple read operations can execute in parallel for the same bucket, but
only one write operation can execute for a bucket at any given time (and no reads can be
started concurrently with existing writes for a bucket, and vice versa). Note that some of
these locking restrictions can be relaxed when it's safe to do so—see
performance optimizations for details.
If a persistence thread tries to pop an operation from the queue and sees that the bucket it's
bound for is already locked, it will leave the operation in place in the queue and try the next
operation(s) instead. This means that although the queue acts as a FIFO for client operations
towards a single bucket, this is not the case across multiple buckets.
Write operations
Write operations are dispatched as asynchronous—i.e. non-blocking—tasks to the search core.
This increases parallelism by freeing up persistence threads to handle other operations, and a deeper
pipeline enables the search core to optimize transaction log synchronization and batching of data
structure updates.
Since a deeper pipeline comes at the potential cost of increased latency when many operations are in
flight, the maximum number of concurrent asynchronous operations is bounded by an adaptive persistence
throttling mechanism. The throttler will dynamically scale the window of concurrency until it reaches a
saturation point where further increasing the window size also results in increased operation latencies.
When the number of in-flight operations hits the current maximum, persistence threads will not
dispatch any more writes until the number goes down. Reads can still be processed during this time.
An asynchronous write-task holds on to the exclusive bucket lock for the duration of its lifetime.
Once the search core completes the write operation, the bucket database is updated with the new metadata
state of the bucket (which reflects the side effects of the write) prior to releasing the lock.
An operation reply is then generated and sent back via the RPC subsystem.
Read operations
Read operations are always evaluated synchronously—i.e. blocking—by persistence threads.
To avoid having potentially expensive maintenance read operations (such as those used for
replica reconciliation) block client
operations for prolonged amounts of time, a subset of the persistence threads are not allowed
to process such maintenance operations.
Note that the condition evaluation step of a test-and-set write is considered a read sub-operation
and is therefore done synchronously. Since it's part of a write operation, it happens atomically in
the context of the exclusive lock of the higher-level operation.
Performance optimizations
To reduce thread context switches, some write operations may bypass the persistence thread queues
and be directly asynchronously dispatched to the search core from the RPC thread the operation
was received at.
Such operations must still successfully acquire the appropriate exclusive bucket lock—if the
lock cannot be immediately acquired the operation is pushed onto the persistence queue instead.
To reduce lock contention and thread wakeups, smaller numbers of persistence threads are grouped
together in stripes that share a dedicated per-stripe persistence queue.
Operations are routed deterministically to a particular stripe based on their bucket ID, meaning
that stripes operate on non-overlapping parts of the bucket space. Together, the many stripes and
queues form one higher level logical queue that covers the entire bucket space.
If the queue contains multiple non-conflicting write operations to the same bucket, these
may be dispatched in parallel in the context of the same write lock. This avoids having
to wait for an entire lock-execute-unlock roundtrip prior to dispatching the next write for the
same bucket. An example of conflicting writes is multiple Puts to the same document ID.
The maximum number of operations dispatched in parallel is implementation-defined.
Document database
Each document database is responsible for a single document type.
It has a component called FeedHandler which takes care of incoming documents, updates, and remove requests.
All requests are first written to a transaction log,
then handed to the appropriate sub-database, based on the request type.
Sub-databases
There are three types of sub-databases, each with its own
document meta store and document store.
The document meta store holds a map from the document id to a local id.
This local id is used to address the document in the document store.
The document meta store also maintains information on the
state of the buckets that are present in the sub-database.
The sub-databases are maintained by the Maintenance Controller.
The document distribution changes as the system is resized.
When the number of nodes in the system changes,
the Maintenance Controller will move documents between the Ready and
Not Ready sub-databases to reflect the new distribution.
When an entry in the Removed sub-database gets old it is purged.
The sub-databases are:
Not Ready
Holds the redundant documents that are not searchable,
i.e. the not ready documents.
Documents that are not ready are only stored, not indexed.
It takes some processing to move from this state to the ready state.
Ready
Maintains attributes and indexes of all ready documents and keeps them searchable.
One of the ready copies is active while the rest are not active:
Active
There should always be exactly one active copy of each
document in the system, though intermittently there may be more.
These documents produce results when queries are evaluated.
Not Active
The ready copies that are not active are indexed but will not produce results.
By being indexed, they are ready to take over immediately
if the node holding the active copy becomes unavailable.
Read more in searchable-copies.
Removed
Keeps track of documents that have been removed.
The id and timestamp for each document is kept.
This information is used when buckets from two nodes are merged.
If the removed document exists on another node but with a different timestamp,
the most recent entry prevails.
Transaction log
Content nodes have a transaction log to persist mutating operations.
The transaction log persists operations by file append.
Having a transaction log simplifies proton's in-memory index structures
and enables steady-state high performance, read more below.
All operations are written and synced to the transaction log.
This is sequential (not random) IO but might impact overall feed performance if running on NAS attached storage
where the sync operation has a much higher cost than on local attached storage (e.g. SSD).
See sync-transactionlog.
Default, proton will
flush components
like attribute vectors and memory index on shutdown, for quicker startup after scheduled restarts.
Document store
Documents are stored as compressed serialized blobs in the document store.
Put, update and remove operations are persisted in the transaction log
before updating the document in the document store.
The operation is acked to the client and the result of the operation is immediately seen in search results.
Files in the document store are written sequentially, and occur in pairs - example:
-rw-r--r-- 1 owner users 4133380096 Aug 10 13:36 1467957947689211000.dat
-rw-r--r-- 1 owner users 71192112 Aug 10 13:36 1467957947689211000.idx
The maximum size:
(in bytes) per .dat file on disk can be set using the following:
The files are written in sequence. proton starts with one pair
and grows it until maxfilesize. Once full, a new pair is started.
This means, the pair is immutable, except the last pair, which is written to.
Documents exist in multiple versions in multiple files.
Older versions are compacted away when a pair is scheduled for being the new active pair -
obsolete versions are removed, leaving only the active document version left in a new file pair
- which is the new active pair.
Files are written in chunks,
using compression settings.
Defragmentation
Document store compaction,
defragments and sort document store files.
It removes stale versions of documents (i.e. old version of updated documents).
It is triggered when the disk bloat of the document store is larger than the total disk usage of the document store
times
diskbloatfactor.
Refer to summary tuning for details.
Defragmentation status is best observed by tracking the
max_bucket_spread
metric over time.
A sawtooth pattern is normal for corpora that change over time.
The document_store_compact
metric tracks when proton is running the document store compaction job.
Compaction settings can be set too tight, in that case, the metric is always, or close to, 1.
When benchmarking, it is important to set the correct compaction settings,
and also make sure that proton has compacted files since (can take hours),
and is not actively compacting (document_store_compact should be 0 most of the time).
Note:
There is no bucket-compaction across files - documents will not move between files.
Optimized reads using chunks
As documents are clustered within the .dat file,
proton optimizes reads by reading larger chunks when accessing documents.
When visiting, documents are read in bucket order.
This is the same order as the defragmentation jobs uses.
The first document read in a visit operation for a bucket will read a chunk from the .dat file into memory.
Subsequent document accesses are served by a memory lookup only.
The chunk size is configured by
maxsize:
There can be 2^22=4M chunks. This sets a minimum chunk size based on maxfilesize -
e.g. an 8G file can have minimum 2k chunk size.
Finally, bucket size is configured by setting
bucket-splitting:
.dat file size - maxfilesize.
Larger files give less files and so better locality,
but compaction requires more memory and more time to complete.
chunk size - maxsize.
Smaller chunks give less wasted IO bytes but more IO operations.
bucket size - bucket-splitting.
Larger buckets give less buckets and better locality to nodes and files, but
incur more overhead during content layer bucket maintenance operations.
Overhead can be treated as linear in both CPU, memory and network usage
with the bucket size.
Memory usage
The document store has a mapping in memory from local ID (LID) to position in a document store file (.dat).
Part of this mapping is persisted in the .idx-file paired to the .dat file.
The memory used by the document store is linear with number of documents and updates to these.
Attribute fields are in-memory fields used for
matching, ranking, sorting and grouping.
Each attribute is a separate component that consists of a set of
data structures
to store values for that field across all documents in a sub-database.
Attributes are managed by the Ready sub-database.
Some attributes can also be managed by the Not Ready sub-database,
see high-throughput updates for details.
Index
Index fields are string fields, used for text search.
Other field types are attributes
and summary fields.
The Index in the Ready sub-database consists of a memory index and one or more disk indexes.
Mutating document operations are applied to the memory index,
which is flushed regularly.
Flushed memory indexes are merged with the primary disk index.
Proton stores position information in text indices by default, for proximity relevance -
posocc (below).
All the occurrences of a term is stored in the posting list, with its position.
This provides superior ranking features,
but is somewhat more expensive than just storing a single occurrence per document.
For most applications it is the correct tradeoff,
since most of the cost is usually elsewhere and relevance is valuable.
Applications that only need occurrence information for filtering
can use rank: filter
to optimize query performance, using only boolocc-files (below).
The memory index has a dictionary per index field.
This contains all unique words in that field with mapping to posting lists with position information.
The position information is used during ranking, see
nativeRank for details on how a text match score is calculated.
The disk index stores the content of each index field in separate folders.
Each folder contains:
Compressed posting lists with position information. File: posocc.dat.compressed.
Posting lists with only occurrence information (bitvector). These are generated for common words.
Files: boolocc.bdat, boolocc.idx.
Example:
$ pwd
/opt/vespa/var/db/vespa/search/cluster.mycluster/n1/documents/myschema/0.ready/index/index.flush.1/myfield
$ ls-la
total 7632
drwxr-xr-x 2 org users 145 Oct 29 06:09 .
drwxr-xr-x 74 org users 4096 Oct 29 06:11 ..
-rw-r--r-- 1 org users 4096 Oct 29 06:11 boolocc.bdat
-rw-r--r-- 1 org users 4096 Oct 29 06:11 boolocc.idx
-rw-r--r-- 1 org users 8192 Oct 29 06:11 dictionary.pdat
-rw-r--r-- 1 org users 8192 Oct 29 06:11 dictionary.spdat
-rw-r--r-- 1 org users 4120 Oct 29 06:11 dictionary.ssdat
-rw-r--r-- 1 org users 7778304 Oct 29 06:11 posocc.dat.compressed
Note that boolocc-files are empty if number of occurrences is small, like in the example above.
Proton maintenance jobs
The memory and disk data structures used in Proton are
periodically optimized by a set of maintenance jobs.
These jobs are automatically executed, and some can be tuned in
flush strategy tuning.
All jobs are described in the table below.
There is only one instance of each job at a time - e.g. attributes are flushed in sequence.
When a job is running, its metric is set to 1 - otherwise 0.
Use this to correlate observed performance or resource usage with job runs - see Run metric below.
The temporary resources used when jobs are executed are described in CPU, Memory and Disk.
The memory and disk usage metrics of components that are optimized by the jobs
are described in Metrics (with Metric prefix).
For a list of all available Proton metrics refer to the searchnode metrics in the
Vespa Metric Set.
Metrics are available at the Metrics API.
Flush a memory index to disk, then trigger disk index fusion.
The goal is to shrink memory usage by adding to the disk-backed indices.
Note: A high feed rate can cause multiple smaller flushed indices, like
$VESPA_HOME/var/db/vespa/search/cluster.name/n1/documents/doc/0.ready/index/index.flush.102
- see the high index number.
Multiple smaller indices is a symptom of too small memory indices compared to feed rate -
to fix, increase
flushstrategy > native > component > maxmemorygain.
CPU
Little - one thread flushes to disk
Memory
Little
Disk
Creates a new disk index, size of the memory index.
Defragment and sort document store files as documents are updated and deleted,
in order to reduce disk usage.
The file is sorted in bucket order on output. Triggered by
diskbloatfactor.
CPU
Little - one thread reads one files, sorts and writes a new file
Memory
Holds a document store file in memory plus memory for sorting the file.
Note: This is important on hosts with little memory!
Reduce
maxfilesize to increase number of files and use less temporary memory for compaction.
Disk
A new file is written while the current is serving, max temporary usage is 2x.
Triggered by nodes going up/down, refer to
content cluster elasticity and
searchable-copies.
It causes documents to be indexed or de-indexed, similar to feeding.
This moves documents between the ready and not ready sub-databases.
CPU
CPU similar to feeding.
Consumes capacity from the write threads, so has feeding impact
Memory
As feeding - e.g. the attribute memory usage and memory index in the ready sub-database will grow
Disk
As feeding
Run metric
content.proton.documentdb.job.bucket_move
lid-space compaction
Each sub-database has a
document meta store
that manages a local document id space (LID-space).
When a cluster grows with more nodes,
documents are redistributed to new nodes and each node ends up with fewer documents.
The result is holes in the LID-space, and a compaction is triggered when the bloat is above 1%.
This in-place defragments the
document meta store
and attribute vectors
by moving documents from high to low LIDs inside the sub-database.
Resources are freed on a subsequent attribute flush.
Retrieving documents is done by specifying an id to get,
or use a selection expression
to visit a range of documents - refer to the Document API.
Overview:
Get
When the content node receives a get request,
it scans through all the document databases,
and for each one it checks all three sub-databases.
Once the document is found, the scan is stopped and the document returned.
If the document is found in a Ready sub-database,
the document retriever will apply any changes that is stored in the
attributes before returning the document.
Visit
A visit request creates an iterator over each candidate bucket.
This iterator will retrieve matching documents from all sub-databases of all document databases.
As for get, attributes values are applied to document fields in the Ready sub-database.
Queries
Queries have a separate pathway through the system.
They do not use the distributor, nor do they go through the content node persistence threads.
They are orthogonal to the elasticity set up by the storage and retrieval described above.
How queries move through the system:
A query enters the system through the QR-server (query rewrite server)
in the Vespa Container.
The QR-server issues one query per document type to the search nodes:
Container
The Container knows all the document types and rewrites queries as a
collection of queries, one for each type.
Queries may have a restrict parameter,
in which case the container will send the query only to the specified document types.
It sends the query to content nodes and collect partial results.
It pings all content nodes every second to know whether they are alive,
and keeps open TCP connections to each one.
If a node goes down, the elastic system will make the documents available on other nodes.
Content node matching
The match engine
receives queries and routes them to the right document database based on the document type.
The query is passed to the Ready sub-database, where the searchable documents are.
Based on information stored in the document meta store,
the query is augmented with a blocklist that ensures only active documents are matched.
Custom Component State API
This section describes the custom extensions of the proton custom component state API.
Component status can be found by HTTP GET at http://host:stateport/state/v1/custom/component.
This gives an overview of the relevant search node components and their internal state.
Note that this is not a stable API, and it will expand and change between releases.