Visiting

Visiting is a feature to efficiently process a set of documents, identified by a document selection expression. A client sends visit requests to the distributors, which pick a copy of each bucket to visit and forward the requests to the content nodes. The content nodes read the documents and process them in the same process. Typical use cases for this processing are streaming search over the data, sending the data back to the client, or sending the data to some other target.

Streaming search uses visiting in a latency-critical context.

Background tasks like migration, reprocessing, backup or debugging are not latency critical. For such tasks it is more important to have little impact on the user driven load, so they run with low priority over a long time. Throttling ensures that only a few requests are active per content node at a time.

Tools

For programmatic access, check the visitor session API.
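A minimal sketch of such programmatic access, assuming the Java Document API classes DocumentAccess, VisitorParameters and DumpVisitorDataHandler, default access parameters, and an illustrative selection string:

import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;

public class VisitExample {
    public static void main(String[] args) throws Exception {
        // Illustrative selection: visit all music documents newer than 1995
        VisitorParameters params = new VisitorParameters("music.year > 1995");
        params.setLocalDataHandler(new DumpVisitorDataHandler() {
            @Override
            public void onDocument(Document doc, long timeStamp) {
                System.out.println(doc);              // process each visited document
            }
            @Override
            public void onRemove(DocumentId id) {
                System.out.println("Removed: " + id); // remove-entries, if requested
            }
        });
        DocumentAccess access = DocumentAccess.createDefault();
        VisitorSession session = access.createVisitorSession(params);
        session.waitUntilDone(0);                     // 0 means no timeout
        session.destroy();
        access.shutdown();
    }
}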

vespa-visit is used to run a visit operation. See vespa-visit --help or man vespa-visit for details. By default, vespa-visit retrieves the visited documents and emits them to stdout. However, a visitor target may be specified, so the tool can also be used to run reprocessing or migration. It supports keeping a progress file on disk, so it can be restarted if it fails partway through.

vespa-visit-target is a tool to set up an endpoint for visiting data. It binds to a socket or to a slobrok address, which is specified as a target in the visit client. Check vespa-visit-target --help for details.

Data export / import

Export data to stdout:

$ vespa-visit --jsonoutput
As a full data dump takes time / space / resources, refine as needed:
$ vespa-visit -v --priority LOW_1 -p progress-file --jsonoutput -s 'id.hash().abs() % 100 = 0' \
  --fieldset 'music:[document]' > 100.json
  • -v prints a % finished
  • --priority LOW_1 de-prioritizes the data dump - useful in live applications
  • -p progress-file saves progress, allowing the visitor to resume at next startup
  • -s 'id.hash().abs() % 100 = 0' dumps 1% of the corpus - see selection
  • --fieldset 'music:[document]' dumps all fields, excluding synthetic fields. When visiting Indexed Search, one might find more fields dumped than put in, causing problems when re-feeding documents. The solution is using a fieldset to return document fields only. [document] is a shorthand for all fields for the given document type.
Import into a cluster using Document API, like:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --file 100.json --host gatewayhost
Or see next section, routing output directly to another Vespa cluster.

Data migration / synchronization

To migrate a set of documents from one cluster to another, use visiting. The data is transferred directly from the source nodes to the targets using a compact serialization format, without being piped through the visit client, so this is optimal for performance. Backups can be implemented this way, or by dumping to file.

Search node recovery: Feed the documents directly to a search cluster. Example, selecting documents of type music:

$ vespa-visit --selection music --datahandler indexing
This feeds from the source into the search cluster in the same application. Note that a simultaneous feed can cause updates to be lost.

Include remove-entries in the visit operation using --visitremove - this dumps the tombstones of documents recently removed.

The storage policy can be configured to use a set of configuration servers from another cluster to configure itself. This is specified with the config parameter. As an example, the following route routes to the storage cluster mycluster with a configuration server on myconfigserver.mydomain.com:12345:

[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]
The following examples illustrate how to copy all data from a source cluster to another cluster using vespa-visit:
# Copies all data in the local cluster, routing it to the remote cluster
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]'

# Limit to 'music' documents only
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:19070;cluster=mycluster]' \
  --selection music

# Limit to all documents for user '1234'
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]' \
  --selection id.user=1234

# Copies all music documents to a search cluster with id mycluster
# When copying data to a search cluster, 'clusterconfigid' must also be specified.
# It should be set to the same value as the cluster.
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster;clusterconfigid=mycluster]' \
  --selection music

Reprocessing

Reprocessing is used to solve these use cases:

  • To change the document type used in ways that will not be backward compatible, define a new type, and reprocess to change all the existing documents.
  • Document identifiers can be changed to a new scheme.
  • Search documents can be reindexed after indexing steps have been changed.
This example illustrates how one can identify a subset of the documents in a content cluster, reprocess these, and write them back. It is assumed that a Vespa cluster is set up, with data.

1. Set up a document reprocessing cluster

This example document processor:

  • deletes documents with an artist field whose value contains Metallica
  • uppercases title field values of all other documents
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.datatypes.FieldValue;
import com.yahoo.document.datatypes.StringFieldValue;

import java.util.Iterator;

/**
 * Example document processor that modifies and/or deletes
 * documents in the context of a reprocessing use case.
 */
public class ReProcessor extends DocumentProcessor {
    private final String deleteFieldName;
    private final String deleteRegex;
    private final String uppercaseFieldName;

    public ReProcessor() {
        deleteFieldName = "artist";
        deleteRegex = ".*Metallica.*";
        uppercaseFieldName = "title";
    }

    @Override
    public Progress process(Processing processing) {
        Iterator<DocumentOperation> it = processing.getDocumentOperations().iterator();
        while (it.hasNext()) {
            DocumentOperation op = it.next();
            if (op instanceof DocumentPut) {
                Document doc = ((DocumentPut) op).getDocument();

                // Delete the current document if the artist field matches the regex:
                FieldValue deleteValue = doc.getFieldValue(deleteFieldName);
                if (deleteValue != null && deleteValue.toString().matches(deleteRegex)) {
                    it.remove();
                    continue;
                }

                // Uppercase the title field of all other documents:
                FieldValue uppercaseValue = doc.getFieldValue(uppercaseFieldName);
                if (uppercaseValue != null) {
                    doc.setFieldValue(uppercaseFieldName,
                            new StringFieldValue(uppercaseValue.toString().toUpperCase()));
                }
            }
        }
        return Progress.DONE;
    }
}
To compile this processor, see Developing with the JDisc Container. For more information on document processing, refer to Document processor Development. After having changed the Vespa setup, reload config:
$ vespa-deploy prepare music
$ vespa-deploy activate
Restart nodes as well to activate.

2. Select documents

Define a selection criterion for the documents to be reprocessed. (To reprocess all documents, skip this.) For this example, assume all documents where the field year is greater than 1995 should be reprocessed. The selection string music.year > 1995 does this.

3. Set route

The visitor sends documents to a Messagebus route - examples:

  • default - Documents are sent to the default route.
  • indexing - Documents are sent to indexing.
  • <clustername>/chain.<chainname> - Documents are sent to the document processing chain chainname running in cluster clustername.
Assume you have a container cluster with id reprocessing containing a docproc chain with id reprocessing-chain. This example route sends documents from the content node, into the document reprocessing chain, and ultimately, into indexing:
reprocessing/chain.reprocessing-chain indexing
Details: Message Bus Routing Guide.

4. Reprocess

Start reprocessing:

$ vespa-visit -v --selection "music.year > 1995" \
  --datahandler "reprocessing/chain.reprocessing-chain indexing"
The '-v' option emits progress information on standard error.

Notes

With multiple search clusters with one document type each, and one content cluster storing all document types, there are a few adjustments that must be made. A visitor on a content node reads documents from disk, and sends batches of documents in messages over the network. If a visitor is set to visit all document types, one message may thus contain more than one document type. Since indexing is done in a separate indexing cluster for each search cluster, these indexing clusters can only handle documents of their own type. In the case where all documents regardless of document type are visited, some documents will thus end up in the wrong indexing cluster, and indexing will fail. In this use case, the solution is simple - use a selection string to reprocess one document type at a time.

Selection

To select which documents to visit, use a document selection expression.

If co-localization of documents has been used, and the selection specifies location criteria, the visitor client may be able to identify one or a few buckets that are guaranteed to include all documents matching the expression. In all other cases, visiting needs to visit all buckets in the cluster.

Note that streaming search typically relies on this behavior to ensure low latency, but the visitor client will only be able to map visitors to a bucket subset if the document selection is simple. If streaming search uses a lot of time, verify that the selection is actually simple enough for the visitor client to map it to a small bucket set.
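For example, a selection like id.user=1234 specifies location criteria and can typically be mapped to a small set of buckets, while a selection like music.year > 1995 carries no location information and requires visiting all buckets in the cluster.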

Selecting by last write time

In addition to the document selection, visiting can select a timeframe of documents to visit, using the internal last write time. This can be used to synchronize an external secondary store, such as a backup.

By default, the to timestamp is set to the current time when starting a visitor. This ensures that documents inserted after the visitor started are not visited. This property is important for instance for reprocessing, ensuring that documents that are reprocessed are not visited again by the same visitor.
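When visiting programmatically, the same timeframe can be set on the visitor parameters. A minimal sketch, assuming VisitorParameters exposes setFromTimestamp/setToTimestamp taking internal timestamps in microseconds since epoch:

import com.yahoo.documentapi.VisitorParameters;

public class TimeboundVisit {
    public static void main(String[] args) {
        // Assumption: from/to timestamps are microseconds since epoch (internal last write time)
        long nowMicros = System.currentTimeMillis() * 1000L;
        long oneHourMicros = 3_600_000_000L;

        VisitorParameters params = new VisitorParameters("music");
        params.setFromTimestamp(nowMicros - oneHourMicros); // only documents written during the last hour
        params.setToTimestamp(nowMicros);                   // skip documents written after the visitor starts
        // ... hand params to DocumentAccess.createVisitorSession as in the earlier sketch
    }
}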

Client visit requests

If the selection criteria managed to map the visitor to a specific set of buckets, these will be used when sending distributor visit requests. If not, the visit client will iterate the entire bucket space, typically at the minimum split level required to decide the correct distributor.

The distributors receive the requests and look at which of their buckets are contained in the requested bucket. If there is more than one, the distributor will only start a limited number of bucket visitors at the same time. Once it has processed the first ones, it will reply to the visitor client with the last bucket processed.

As all buckets have a natural order, the client can use the returned bucket as a progress counter. Thus, after a distributor request has returned, the client knows one of the following:

  • All buckets contained within the bucket sent have been visited
  • All buckets contained within the bucket sent, up to this specific bucket in the order, have been visited
  • No buckets existed that were contained within the requested bucket
The client can decide whether to visit in strict order, allowing only one bucket to be pending at a time, or to start visiting many buckets at a time, allowing greater throughput.

Distributor visit requests

The distributors receive visit requests from clients for a given bucket, which may map to zero, one or many buckets within the distributor. The distributor picks one or more of the first buckets in the order, selects one content node copy of each, and passes the request on to the content nodes.

Once all the content node requests have been responded to, the distributor will reply to the client with the last bucket visited, to be used as a progress counter.

Subsequent client requests to the distributor will have the progress counter set, letting the distributor ignore all the buckets prior to that point in the ordering.

Bucket splitting and joining does not alter the ordering, and thus does not affect visiting much, as long as the buckets are consistently split. If two buckets are joined, where the first one has already been visited, a visit request has to be sent to the joined bucket. The content layer uses the progress counter to avoid revisiting documents already processed in the bucket.

If the distributor only starts one bucket visitor at a time, it can ensure the visitor order is kept. Starting multiple buckets at a time may improve throughput and decrease latency, but progress tracking will be less fine-grained, so more documents may need to be revisited when resuming after a failure.

Content node visit requests

Content nodes receive visit requests for single buckets for which they store documents. A node may receive many in parallel, but their execution is independent of each other.

The visitor layer in the content node picks up the visit requests. Each request is assigned a visitor thread, and an instance of the processor type is created for that visitor. The processor sets up an iterator in the backend and sends one or more requests for documents to the backend.

The document selection specified in the visitor client is sent through to the backend, allowing it to filter out unwanted data at the lowest level possible.

Once documents are retrieved from the backend and returned to the visitor layer, the visitor processor processes the data.

By default, one iterator request is pending to the backend at any time. By sending many small iterator requests, with several pending at a time, the processing may occur in parallel with the document fetching.

Visitor processor types

Dump visitor The most commonly used visitor processor type is the dump visitor. All it does is send the documents read on to some external target specified by the visitor client. Using the command line tool vespa-visit, the default is to send the documents back to the client and print them to stdout. The dump visitor is used to implement reprocessing, typically using a messagebus route that sends the documents through the document processing cluster and then back to the content cluster. Migration of documents from one cluster to another is also implemented using a dump visitor.
Streaming search visitor Another visitor processor is the streaming search visitor. This is run on the search containers, making it transparent whether search results were created from streaming search, or through indexed search in proton.
Statistics visitor There is also a statistics visitor that generates statistics about the stored data.

Priority

Visitor requests have priorities just like any other requests, and these priorities are kept throughout the chain of requests. A low priority reprocessing visitor will send low priority visitor requests to the content nodes, the documents read will be sent as low priority operations to be reprocessed, and they will end up as low priority put operations to the backend. Example:

$ vespa-visit --priority LOW_1 -p progress-file
Refer to load types for priority values.

Max concurrent visitors on content nodes

Content nodes have a maximum number of visitors they will process at a time. This maximum ensures that there is enough memory for the currently active visitors, and that the active ones complete in reasonable time, rather than having a huge number of active visitors that all take a long time to finish because they compete for resources.

To prevent many low priority visitors from stopping high priority visitors from being started, the maximum number of concurrent visitors is a function of the priority:

$$\text{maxconcurrent}_{V} = \text{fixed} + \text{variable} \times \frac{255 - \text{priority}_{V}}{255}$$

The fixed value represents the number of visitors always allowed to run in parallel regardless of priority, and the variable value represents how many additional visitors can run at maximum priority.
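For illustration, with hypothetical settings fixed = 16 and variable = 64, a visitor with priority value 60 is subject to a limit of roughly

$$16 + 64 \times \frac{255 - 60}{255} \approx 65$$

concurrent bucket visitors, while a visitor with the lowest priority (value 255) only gets the fixed 16.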

Thus, if only low priority visitors are running, there will be free slots for incoming high priority visitors.

Once visitors have started, backend requests for documents are also prioritized, but the processing of the documents is not. However, as the high priority backend requests take priority, the high priority visitors should also get responses to process faster.

These values can be configured in tuning.

Queueing

Visitor requests may be queued at several layers. Visitor clients have a virtual queue, in that they only send requests for a few buckets at a time. Distributors have a virtual queue, in that they only forward requests for some buckets at a time. These queues are per visitor, and can be adjusted through visitor parameters.

The content node queues up visitor requests if the maximum allowed number is already running. This queue is kept in priority order. If clients hammer the node, this queue may also fill up, in which case busy replies are sent back to the clients.

The partition threads will queue up requests to read the needed parts of the data. These requests are tagged with the priority of the visitor itself. Additionally, the responses from the backend are queued for processing in the visitor thread. This queue is not a priority queue, but as it contains replies to requests that already went through a priority queue, this is not expected to be an issue.

Visitor targets

Requests sent from the visitor processor are sent to a visitor target - types:

Message bus routes You can specify a message bus route name directly, and this route will be used to send the results. This is typically used when doing reprocessing or migration. Message bus routes are set up in the application package. In addition, some routes may have been auto-generated in simple setups; for instance, a route called default is generated if the setup is simple enough for the config model to guess where to send the data.
Slobrok address

You can also specify a slobrok address for data to be sent to. A slobrok address is a slash-separated path where you can use asterisk to mean any element within this path. For instance, if you have a docproc cluster called mydpcluster, it will have registered its nodes with slobrok names like docproc/cluster.mydpcluster/docproc/0/feed_processor, where the 0 here indicates the first node in the cluster. You can thus specify to send visit data to this docproc cluster by stating a slobrok address of docproc/cluster.mydpcluster/docproc/*/feed_processor. Note that this will not send all the data to one or all the nodes. The data sent from the visitor will be distributed among the matching nodes, but each message will just be sent to one node.

Slobrok names may also be used if you use vespa-visit-target to retrieve the data at some location. If you start vespa-visit-target on two nodes, listening to slobrok names mynode/0/visit-destination and mynode/1/visit-destination, you can send the results to these nodes by specifying mynode/*/visit-destination as the data handler.

vespadestination is a small Java program, similar to vespa-visit-target in that it can receive messages from messagebus and print their contents to stdout. It can be useful for debugging a route or a docproc, by using vespadestination as the endpoint of the route.

TCP socket TCP sockets can also be specified directly. This requires that the endpoint speaks FNET RPC. This is typically done either by using the vespa-visit-target tool, or by setting up a visitor destination programmatically using a utility class in the Document API. A socket address looks like the following: tcp/hostname:port/servicename. For instance, an address generated by vespa-visit-target might look like: tcp/myhost.mydomain.com:12345/visit-destination