Visiting

Visiting is a feature to efficiently process a set of documents, identified by a document selection expression. See request handling for details on how the feature works.

In short, a visit iterates over all buckets, or a subset of them, and sends the documents to one or more targets. The target is normally the visit client (like vespa-visit), but can be a set of targets that act as sinks for the documents - see vespa-visit-target.

Typical use cases are data dumps and reprocessing, which are not time-sensitive. However, streaming search uses visiting in a latency-critical context.

Visit load can be prioritized higher or lower than other load, and throughput can be throttled.

Performance note: Due to the bucket iteration, visiting is normally not a low-latency operation. An empty cluster has 8 distribution bits and hence 256 buckets - dumping just one document requires iterating over them, unless location-specific selections are used, and typically takes a second or two. To test visiting performance, use larger data sets; do not extrapolate from small ones. Use search for low-latency operations on small result sets.

Tools

vespa-visit is used to run a visit operation. By default, vespa-visit retrieves the visited documents and emits them to stdout. However, the tool can also specify a visitor target and be used to run reprocessing or migration. It supports keeping a progress file on disk, so it can be restarted if it fails partway.

vespa-visit-target is a tool to set up an endpoint for visiting data. It binds to a socket or to a slobrok address, which is specified as a target in the visit client.

For programmatic access, see the visitor session API.
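
For illustration, below is a minimal sketch of a programmatic visit, assuming the com.yahoo.documentapi visitor classes (DocumentAccess, VisitorParameters, VisitorSession, DumpVisitorDataHandler) are on the classpath - refer to the visitor session API documentation for the authoritative interface. The selection and fieldset mirror the vespa-visit examples later in this document:

import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;

public class VisitExample {
    public static void main(String[] args) throws Exception {
        // Connect using the default document API configuration
        DocumentAccess access = DocumentAccess.createDefault();

        // Selection and fieldset correspond to the -s and --fieldset options of vespa-visit
        VisitorParameters params = new VisitorParameters("music.year > 1995");
        params.setFieldSet("music:[document]");

        // Receive the visited documents locally and print them
        params.setLocalDataHandler(new DumpVisitorDataHandler() {
            @Override
            public void onDocument(Document doc, long timeStamp) {
                System.out.println(doc);
            }

            @Override
            public void onRemove(DocumentId id) {
                System.out.println("Removed: " + id);
            }
        });

        VisitorSession session = access.createVisitorSession(params);
        try {
            session.waitUntilDone(0); // 0 means no timeout
        } finally {
            session.destroy();
            access.shutdown();
        }
    }
}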

Selection

To select which documents to visit, use a document selection expression.

If co-localization of documents has been used, and the selection specifies location criteria, the visitor client may be able to identify one or a few buckets that are guaranteed to include all documents that will match the expression. In all other cases, visiting needs to visit all buckets in the cluster.

Note that streaming search typically relies on this behavior to ensure low latency, but the visitor client is only able to map visitors to a bucket subset if the document selection is simple, i.e. maps to a location. If streaming search queries are slow, verify that the selection is actually simple enough for the visitor client to map it to a small bucket set.
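
For illustration (assuming hypothetical user-keyed document ids like id:mynamespace:music:n=1234:doc1), a selection such as id.user = 1234 lets the client restrict the visit to the buckets for that location, while a selection like music.year > 1995 carries no location information and requires visiting all buckets.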

Fields

To select which fields to output, use a fieldset. See examples below - a common use case is a comma-separated list of fields, or the [document] / [all] shorthand.

Data export / import

Export data to stdout:

$ vespa-visit
As a full data dump takes time / space / resources, refine as needed:
$ vespa-visit -v --priority LOW_1 -p progress-file -s 'id.hash().abs() % 100 = 0' \
  --fieldset 'music:[document]' > 100.json
  • -v prints a % finished
  • --priority LOW_1 de-prioritizes the data dump - useful in live applications
  • -p progress-file saves progress, allowing the visitor to resume at next startup
  • -s 'id.hash().abs() % 100 = 0' dumps 1% of the corpus - see selection
  • --fieldset 'music:[document]' dumps all fields, excluding synthetic fields. When visiting Indexed Search, one might find more fields dumped than put in, causing problems when re-feeding documents. The solution is to use a fieldset that returns document fields only. [document] is a shorthand for all fields of the given document type.
Import into a cluster using the Document API, like:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --file 100.json --host gatewayhost
Or see the next section for routing the output directly to another Vespa cluster.

Data migration / synchronization

To migrate a set of documents from one cluster to another, use visiting - as the data is transferred directly from the source nodes to the targets using a compact serialization format, and not piped through the visit client, this is optimal for performance. Implement backup this way, or dump to file.

Search node recovery: Feed the documents directly to a search cluster. Example: selecting documents of type music:

$ vespa-visit --selection music --datahandler indexing
This feeds from the source into the search cluster in the same application. Note that simultaneous feeding can cause updates to be lost.

Include remove-entries in the visit operation using --visitremove - this dumps the tombstones of documents recently removed.

The storage policy can be configured to use a set of configuration servers from another cluster to configure itself. This is specified with the config parameter. As an example, the following route sends to the storage cluster mycluster, using a configuration server at myconfigserver.mydomain.com:12345:

[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]
The following examples illustrate how to copy all data from a source cluster to another cluster using vespa-visit:
# Copies all data in the local cluster, routing it to the remote cluster
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]'

# Limit to 'music' documents only
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:19070;cluster=mycluster]' \
  --selection music

# Limit to all documents for user '1234'
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster]' \
  --selection id.user=1234

# Copies all music documents to a search cluster with id mycluster
# When copying data to a search cluster, 'clusterconfigid' must also be specified.
# It should be set to the same value as the cluster.
$ vespa-visit --datahandler '[Storage:config=tcp/myconfigserver.mydomain.com:12345;cluster=mycluster;clusterconfigid=mycluster]' \
  --selection music

Reprocessing

Reprocessing is used to solve these use cases:

  • To change the document type used in ways that will not be backward compatible, define a new type, and reprocess to change all the existing documents.
  • Document identifiers can be changed to a new scheme.
  • Search documents can be reindexed after indexing steps have been changed.
This example illustrates how one can identify a subset of the documents in a content cluster, reprocess these, and write them back. It is assumed that a Vespa cluster is set up, with data.

1. Set up a document reprocessing cluster

This example document processor:

  • deletes documents with an artist field whose value contains Metallica
  • uppercases title field values of all other documents
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.datatypes.FieldValue;
import com.yahoo.document.datatypes.StringFieldValue;

import java.util.Iterator;

/**
 * Example document processor that modifies and/or deletes
 * documents in the context of a reprocessing use case.
 */
public class ReProcessor extends DocumentProcessor {
    private final String deleteFieldName;
    private final String deleteRegex;
    private final String uppercaseFieldName;

    public ReProcessor() {
        deleteFieldName = "artist";
        deleteRegex = ".*Metallica.*";
        uppercaseFieldName = "title";
    }

    @Override
    public Progress process(Processing processing) {
        Iterator<DocumentOperation> it = processing.getDocumentOperations().iterator();
        while (it.hasNext()) {
            DocumentOperation op = it.next();
            if (op instanceof DocumentPut) {
                Document doc = ((DocumentPut) op).getDocument();

                // Delete the current document if it matches:
                FieldValue deleteValue = doc.getFieldValue(deleteFieldName);
                if (deleteValue != null && deleteValue.toString().matches(deleteRegex)) {
                    it.remove();
                    continue;
                }

                // Uppercase the other field:
                FieldValue uppercaseValue = doc.getFieldValue(uppercaseFieldName);
                if (uppercaseValue != null) {
                    doc.setFieldValue(uppercaseFieldName,
                            new StringFieldValue(uppercaseValue.toString().toUpperCase()));
                }
            }
        }
        return Progress.DONE;
    }
}
To compile this processor, see Vespa Applications. For more information on document processing, refer to Document Processor Development. After changing the Vespa setup, deploy to reload config:
$ vespa-deploy prepare music
$ vespa-deploy activate
Restart nodes as well to activate.

2. Select documents

Define selection criteria for the documents to be reprocessed (to reprocess all documents, skip this). For this example, assume all documents where the field year is greater than 1995 should be reprocessed; the selection string music.year > 1995 does this.

3. Set route

The visitor sends documents to a Message Bus route - examples:

  • default - Documents are sent to the default route.
  • indexing - Documents are sent to indexing.
  • <clustername>/chain.<chainname>: Documents are sent to the document processing chain chainname running in cluster clustername.
Assume you have a container cluster with id reprocessing containing a docproc chain with id reprocessing-chain. This example route sends documents from the content node into the document reprocessing chain, and ultimately to indexing:
reprocessing/chain.reprocessing-chain indexing
Details: Message Bus Routing Guide.

4. Reprocess

Start reprocessing:

$ vespa-visit -v --selection "music.year > 1995" \
  --datahandler "reprocessing/chain.reprocessing-chain indexing"
The '-v' option emits progress information on standard error.

Notes

With multiple search clusters with one document type each, and one content cluster storing all document types, there are a few adjustments that must be made. A visitor on a content node reads documents from disk, and sends batches of documents in messages over the network. If a visitor is set to visit all document types, one message may thus contain more than one document type. Since indexing is done in a separate indexing cluster for each search cluster, these indexing clusters can only handle documents of their own type. In the case where all documents regardless of document type are visited, some documents will thus end up in the wrong indexing cluster, and indexing will fail. In this use case, the solution is simple - use a selection string to reprocess one document type at a time.

Request handling

Client

If the selection criteria manage to map the visitor to a specific set of buckets, these will be used when sending distributor visit requests. If not, the visit client will iterate over the entire bucket space, typically at the minimum split level required to decide the correct distributor.

The distributors receive the requests and look at which of their buckets are contained by the requested bucket. If there is more than one, a distributor will only start a limited number of bucket visitors at the same time. Once it has processed the first ones, it replies to the visitor client with the last bucket processed.

As all buckets have a natural order, the client can use the returned bucket as a progress counter. Thus, after a distributor request has returned, the client knows one of the following:

  • All buckets contained within the bucket sent have been visited
  • All buckets contained within the bucket sent, up to this specific bucket in the order, have been visited
  • No buckets existed that were contained within the requested bucket
The client can decide whether to visit in strict order, allowing only one bucket to be pending at a time, or to start visiting many buckets at a time, allowing greater throughput.

Distributor

The distributors receive visit requests from clients for a given bucket, which may map to none, one or many buckets within the distributor. The distributor picks one or more of the first buckets in the order, selects one content node copy of each, and passes the request on to the content nodes.

Once all the content node requests have been responded to, the distributor will reply to the client with the last bucket visited, to be used as a progress counter.

Subsequent client requests to the distributor will have the progress counter set, letting the distributor ignore all the buckets prior to that point in the ordering.

Bucket splitting and joining do not alter the ordering, and thus do not affect visiting much as long as the buckets are consistently split. If two buckets are joined, where the first one has already been visited, a visit request has to be sent to the joined bucket. The content layer uses the progress counter to avoid revisiting documents already processed in the bucket.

If the distributor only starts one bucket visitor at a time, it can ensure the visitor order is kept. Starting multiple buckets at a time may improve throughput and decrease latency, but progress tracking will be less fine-grained, so more documents may need to be revisited when resuming after a failure.

Content node

Content nodes receive visit requests for single buckets for which they store documents. A node may receive many in parallel, but their execution is independent of each other.

The visitor layer in the content node picks up the visit requests. Each request is assigned a visitor thread, and an instance of the processor type is created for that visitor. The processor sets up an iterator in the backend and sends one or more requests for documents to the backend.

The document selection specified in the visitor client is sent through to the backend, allowing it to filter out unwanted data at the lowest level possible.

Once documents are returned from the backend to the visitor layer, the visitor processor processes the data.

By default, only one iterator request is pending to the backend at any time. By sending many small iterator requests with several pending at a time, processing may occur in parallel with document fetching.

Priority

Visitor requests have priorities just like any other requests. These priorities are kept throughout the chain of requests. A low priority reprocessing visitor will end up sending low priority visitor requests to the content nodes, which will send low priority put operations to be reprocessed, which will end up as low priority put operations to the backend. Example:

$ vespa-visit --priority LOW_1 -p progress-file
Refer to load types for priority values.

Max concurrent visitors on content nodes

Content nodes process a limited number of concurrent visitors. This ensures enough memory for the currently active ones, and that the active ones complete in reasonable time. To prevent many low-priority visitors from blocking high-priority visitors from being started, the maximum number of concurrent visitors is a function of the priority:

$$\text{maxconcurrent}_{V} = \text{fixed} + \text{variable} \times \frac{255 - \text{priority}_{V}}{255}$$

The fixed value represents the number of visitors always allowed to run in parallel regardless of priority, and the variable value represents how many additional visitors can run at maximum priority. Thus, if only low-priority visitors are running, there will be free slots for incoming high-priority visitors.
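
As a worked example, with hypothetical values fixed = 16 and variable = 64: a visitor at the highest priority (0) allows up to 16 + 64 × (255 - 0)/255 = 80 concurrent visitors, while a visitor at the lowest priority (255) allows only 16 + 64 × (255 - 255)/255 = 16.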

Once visitors have started, backend requests for documents are also prioritized, but the processing of the documents is not. However, as the high-priority backend requests take priority, the high-priority visitors should also get responses to process faster.

Queueing

Visitor requests may be queued at several layers. Visitor clients have a virtual queue, as they only send requests for a few buckets at a time. Distributors likewise only forward requests for some buckets at a time. These queues are per visitor and can be adjusted through visitor parameters.

The content node queues up visitor requests if the maximum allowed number is already running. This queue is kept in priority order. If clients hammer the node, this queue may also fill up, in which case the node replies busy to the clients.

The partition threads queue up requests to read the needed parts of data. These requests are tagged with the priority of the visitor itself. Additionally, the responses from the backend are queued for processing in the visitor thread. This queue is not a priority queue, but as it contains replies to requests already processed through a priority queue, this is not expected to be an issue.

Visitor processor types

Dump visitor: The most commonly used visitor processor type is the dump visitor. All it does is send the read documents on to some external target specified by the visitor. When using the command-line tool vespa-visit, the default is to send the documents back to the client and print them to stdout. The dump visitor is also used to implement reprocessing, typically using a message bus route that sends the documents through the document processing cluster and back to the content cluster. Migration of documents from one cluster to another is likewise implemented using a dump visitor.
Streaming search visitor: The streaming search visitor runs in the Vespa container, making it transparent whether search results were created from streaming or indexed search - see indexing mode.

Visitor targets

Requests sent from the visitor processor are sent to a visitor target - types:

Message bus routes: You can specify a message bus route name directly, and this route will be used to send the results. This is typically used when doing reprocessing or migration. Message bus routes are set up in the application package. In addition, some routes may be auto-generated in simple setups; for instance, a route called default is generated if the setup is simple enough for the config model to guess where data should be sent.
Slobrok address

You can also specify a slobrok address for data to be sent to. A slobrok address is a slash-separated path, where an asterisk matches any element of the path. For instance, if you have a docproc cluster called mydpcluster, it will have registered its nodes with slobrok names like docproc/cluster.mydpcluster/docproc/0/feed_processor, where the 0 indicates the first node in the cluster. You can thus send visit data to this docproc cluster by specifying a slobrok address of docproc/cluster.mydpcluster/docproc/*/feed_processor. Note that this will not send all the data to one node or to all the nodes. The data sent from the visitor will be distributed among the matching nodes, but each message will just be sent to one node.

Slobrok names can be used when using vespa-visit-target to retrieve the data at some location. If you start vespa-visit-target on two nodes, listening to slobrok names mynode/0/visit-destination and mynode/1/visit-destination, you can send the results to these nodes by specifying mynode/*/visit-destination as the data handler.

vespa-destination is similar to vespa-visit-target in that it can receive messages from message bus and print the contents to stdout. It can be useful when you want to debug a route or a docproc, by using vespa-destination as the endpoint of your route.

TCP socket: TCP sockets can also be specified directly. This requires that the endpoint speaks FNET RPC. This is typically done either by using the vespa-visit-target tool, or by using a visitor destination programmatically via a utility class in the document API. A socket address looks like tcp/hostname:port/servicename. For instance, an address generated by vespa-visit-target might look like tcp/myhost.mydomain.com:12345/visit-destination.