Vespa HTTP Client

The Vespa HTTP Feed Client is a Java API and command line tool to feed document operations to Vespa. The Vespa feeding client allows you to combine high throughput with feeding over HTTP:

  • Uses HTTP as transport protocol.
  • Feeds in parallel to one or more Vespa clusters, ordering still guaranteed.
  • Programmatic Java API that has few runtime dependencies, and no Vespa dependencies.
  • Easy feeding from sources such as Hadoop or Storm, or any other Java process.
  • Feed with much higher performance than naïve HTTP feeding approaches.
  • Automatic retries, re-connect, performance tuning through parameters, compression and feed statistics.
  • Supports data sources of unlimited size; input is processed as a stream (the simple HTTP protocol is batch).
  • Built-in throttling: it is hard to flood the system using this library.
A prebuilt standalone command-line tool lets you feed from a file using the API. This tool can be used outside the Vespa cluster.

Enabling in your application

Add the document-api element to a container cluster to enable it to receive documents:

<?xml version="1.0" encoding="utf-8" ?>
<services version="1.0">

  <container version="1.0" id="default">
     <document-api/>
  </container>

</services>

Using from the command line

The command-line tool is a runnable JAR that feeds document operations. It is installed with Vespa at:

$VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar
Example:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --file file.json --endpoint http://document-api-host:8080
Alternatively, read from stdin:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --host document-api-host < file.json
Use --help to list supported features.

The JSON format accepted is the Document operation JSON format.

Using the Java Vespa Feeding Client API

This section explains the four steps needed to use the client API from your own Java code. For maximum throughput, this is an asynchronous API. In some cases, especially when feeding data from systems that do not support acknowledging individual operations, such as Kafka and DynamoDB, it is possible to use the SyncFeedClient wrapper instead, which blocks and returns a single result for a batch of operations. The throughput reduction from using this wrapper decreases with larger batch sizes.

1. Add the dependency

Add the following dependency to pom.xml:

<dependency>
  <groupId>com.yahoo.vespa</groupId>
  <artifactId>vespa-http-client</artifactId>
  <version>7.25.26</version> <!-- Find latest version at search.maven.org/search?q=g:com.yahoo.vespa%20a:vespa-http-client -->
</dependency>

2. Creating a client instance

Use FeedClientFactory to create a feed client instance. Instances are expensive, so create one instance and use it for the duration of your client runtime (or until reconfiguration). You can share it between threads if desired. A suitable result callback must be passed to the client on creation; see Handling callbacks below.

Make sure to close the client when done.

3. Feeding

Call a stream or feed method on your client instance to issue document operations. These calls block when queues are full, which ensures that the feed process does not produce data faster than the Vespa instance can receive it.
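
The blocking behaviour described above is a form of backpressure. The following is a minimal, JDK-only sketch of the idea and not the client's actual implementation: a hypothetical BoundedSubmitter class (the name and methods are assumptions made for illustration) caps the number of in-flight operations with a Semaphore, so a producer is held back once the cap is reached and can continue when a completion frees a slot.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of backpressure; not part of the Vespa client.
final class BoundedSubmitter {

    private final Semaphore slots;

    BoundedSubmitter(int maxInFlight) {
        this.slots = new Semaphore(maxInFlight);
    }

    /** Blocks until a slot is free, then marks one operation as in flight. */
    void submit() {
        slots.acquireUninterruptibly();
    }

    /** Non-blocking variant: returns false immediately if all slots are taken. */
    boolean trySubmit() {
        return slots.tryAcquire();
    }

    /** Called when an operation completes (successfully or not) to free its slot. */
    void onCompletion() {
        slots.release();
    }

    int freeSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) {
        BoundedSubmitter submitter = new BoundedSubmitter(2);
        submitter.submit();
        submitter.submit();
        // Both slots are taken, so a blocking submit() would now wait.
        System.out.println(submitter.trySubmit());  // prints false
        submitter.onCompletion();                   // one operation finished
        System.out.println(submitter.trySubmit());  // prints true
    }
}
```

With this pattern the producer can never get more than maxInFlight operations ahead of the receiver, which is the property the blocking stream and feed calls provide.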

4. Handling callbacks

Exactly one onCompletion callback will always be received from the client for each operation passed in a stream call, whether or not it was successful. Implementing the onCompletion method of the callback is how you know whether your feed operations were successfully persisted in Vespa. The usual way to implement this is to emit metrics for successful and failed operations that can be monitored and used for alerting. Callbacks are not guaranteed to arrive in the same order as the operations were issued (except for operations on the same document).

In addition, onEndpointException callbacks may be received for connection level errors which may or may not be transient. This is to provide additional information about possible connectivity issues. It may be suitable to both log and emit metrics for these.

These callbacks are called from the IO threads of the feed client, so the implementation should return quickly.
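
One way to keep callbacks fast is to do only cheap bookkeeping in them and hand anything slower to another thread. The sketch below uses a hypothetical stand-in class, MetricsCallback, with a simplified onCompletion(String, boolean) signature so that it runs without the client library. In real code the same logic would sit in an implementation of FeedClient.ResultCallback and read the outcome from Result.isSuccess().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a result callback; the simplified signature is an assumption.
final class MetricsCallback {

    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();
    // Failed document IDs are handed to another thread instead of being processed here.
    private final BlockingQueue<String> failedIds = new LinkedBlockingQueue<>();

    /** Cheap, non-blocking bookkeeping only, so the calling IO thread is released quickly. */
    void onCompletion(String docId, boolean success) {
        if (success) {
            succeeded.increment();
        } else {
            failed.increment();
            failedIds.offer(docId);  // unbounded queue, so offer does not block
        }
    }

    long succeededCount() { return succeeded.sum(); }

    long failedCount() { return failed.sum(); }

    /** Called from a worker thread; returns the next failed ID, or null if there is none. */
    String pollFailed() { return failedIds.poll(); }

    public static void main(String[] args) {
        MetricsCallback callback = new MetricsCallback();
        callback.onCompletion("id:ns:doc::1", true);
        callback.onCompletion("id:ns:doc::2", false);
        System.out.println(callback.succeededCount() + " ok, " + callback.failedCount() + " failed");
        System.out.println("To retry or log: " + callback.pollFailed());
    }
}
```

The two counters can be exported as metrics, while a separate worker drains the failed IDs for logging or retry.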

Feeding Multiple Clusters

Both the API and the command line tool support feeding multiple clusters. For example, if there are a few documents that are to be inserted in two clusters, it is easy to feed both simultaneously. In the command line tool you specify several hosts as a comma-separated list, and in the API more clusters are added in the parameters. However, in a production system this is more complex than feeding a single cluster.

An important question is what should happen when feeding a cluster is problematic. The cause can be network problems, cluster problems, bugs, regions that are down due to power maintenance, and so on. If you want all clusters to be synchronized, feeding should stop when one cluster cannot receive data. If you want all clusters to be as up to date as possible at all times, feeding should continue to the clusters that are up, and possibly be replayed to the unavailable clusters when they come back up (with all the challenges this has).

In the current library, you get feedback on each feed operation: which clusters were successful and which failed. On failures you can, for example, save the document to a file and retry it later (the document is in the reply). However, a cluster that is down might impact the global feed rate, because the feeding library might wait for a timeout before answering the request, and the number of in-flight requests is limited. One way around this is to have independent processes for each cluster and feed totally asynchronously. Which approach is best depends on the use case: how long-lived the data is, whether it is acceptable that the clusters are out of sync, whether it is OK to drop data in case of failure, and so on.
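
The save-and-retry idea can be sketched with a small bookkeeping class. Everything here is hypothetical and JDK-only: ReplayLog, its methods, and the cluster names are assumptions for illustration, and the per-cluster outcome is passed in as a plain map rather than read from the client's result object. A real version would also persist the document body, for example to a file, rather than keep only IDs in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: remembers, per cluster, which documents still need to be replayed.
final class ReplayLog {

    private final Map<String, List<String>> pendingByCluster = new HashMap<>();

    /** Records one operation's outcome; the map goes from cluster name to success (true) or failure. */
    synchronized void record(String docId, Map<String, Boolean> outcomeByCluster) {
        for (Map.Entry<String, Boolean> outcome : outcomeByCluster.entrySet()) {
            if (!outcome.getValue()) {
                pendingByCluster.computeIfAbsent(outcome.getKey(), k -> new ArrayList<>()).add(docId);
            }
        }
    }

    /** Removes and returns the document IDs to replay to a cluster, in the order they failed. */
    synchronized List<String> drain(String cluster) {
        List<String> ids = pendingByCluster.remove(cluster);
        return ids == null ? new ArrayList<>() : ids;
    }

    public static void main(String[] args) {
        ReplayLog replayLog = new ReplayLog();
        replayLog.record("id:ns:doc::1", Map.of("east", true, "west", false));
        System.out.println(replayLog.drain("west"));  // prints [id:ns:doc::1]
        System.out.println(replayLog.drain("east"));  // prints []
    }
}
```

Note that replaying a document later can reorder it relative to newer operations on the same document, which is one of the challenges of this approach.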

We know this is hard, and we are currently working on a more sophisticated solution for our tenants. The design is still in progress, but we foresee automatic replay to clusters that come up after being down, easier monitoring, alerting, and better transparency on what is happening.

Config parameters

The default values for the client should work fine in most cases. To tune feeding for higher throughput, you first need to understand where the bottleneck is. Start feeding from more machines in parallel: if the total throughput increases significantly, the feeding client is the bottleneck. You can try to increase the number of threads or enable compression, but you might need to feed from several machines to increase speed. Remember that ordering is only guaranteed per instance of the library, so when feeding from different machines you might want to shard data accordingly. If the total throughput does not increase, the Vespa cluster is the bottleneck, and you need to increase the number of container nodes acting as gateways and/or the number of content nodes. Use --priority with a load type to assign priority classes to operations.

Configuration

The configuration parameters are set in the SessionParams builder. SessionParams consists of two sub-configurations, FeedParams and ConnectionParams, plus a set of clusters.

Endpoints

A cluster consists of one or more Endpoints. If the endpoint server is overloaded, it makes sense to add more endpoints to increase performance. A common pattern is to run the DocumentProcessor on the same server as the endpoint, so it really depends on what the DocumentProcessor is doing. You can start with one endpoint and monitor the server.

You can feed to several clusters, and they can be in different data centers. Note that the performance will be driven by the instance in the slowest data center with the slowest network seen from the feed client. It might be possible to set timeouts so the slowest data center does not slow down other instances, but this will cause data loss. It also creates a model that is hard to understand, so it is not recommended. The client does have buffering, so temporary slowdowns are fine.

ConnectionParams

If throttled by the network, try enabling compression.

One parameter that has an impact on performance is numPersistentConnectionsPerEndpoint. The client uses parallel requests to the gateway. This reduces the impact of round-trip time and enables using more of the network bandwidth. Setting it too high is not beneficial: it causes more network sessions and might create packet loss on the network. Setting it too low means you will not be able to use the network bandwidth. The sweet spot seems to be around 32. If feeding from hundreds of machines to a small cluster, it makes sense to use a low number such as 2.

FeedParams

There is a timeout that causes the stream to fail if the response from the server is slow (or never arrives). This timeout is the sum of serverTimeout and clientTimeout. This value impacts performance, since it can reduce the impact of a slow endpoint at the expense of data loss and high complexity. A slow cluster will then time out, and the clusters might be overloaded. If the client is not even able to send a document because the system is queued up, the client *might* drop the document if the clientTimeout is reached before the document hits the network.

maxChunkSizeBytes is the maximum size of a request sent to an endpoint. If there are documents waiting to be sent, the client packs them together up to this size. Setting this value too high can lead to out-of-memory errors in the feeder or the endpoint. Setting it too low means that the round-trip time of the connection might have an impact. This parameter is related to numPersistentConnectionsPerEndpoint. The sweet spot is expected to be wide, and a value of 200 kB should work in most cases.
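
To see why chunk size and connection count interact with round-trip time, a rough back-of-envelope model can help. The sketch below assumes that each connection completes at most one chunk per round trip. That is a simplifying assumption of this example, not a statement about the client's internals, and the class and method names are hypothetical. Under that assumption the throughput ceiling is connections × chunk size / round-trip time.

```java
// Back-of-envelope model only; the one-chunk-per-round-trip assumption is made for illustration.
final class FeedEstimate {

    /** Upper bound in bytes per second if each connection completes one chunk per round trip. */
    static long maxBytesPerSecond(int connections, int chunkBytes, long roundTripMillis) {
        return (long) connections * chunkBytes * 1000L / roundTripMillis;
    }

    public static void main(String[] args) {
        // 32 connections, 200 kB chunks, 50 ms round trip
        System.out.println(maxBytesPerSecond(32, 200_000, 50));  // prints 128000000
        // Same chunk size and round trip with only 2 connections
        System.out.println(maxBytesPerSecond(2, 200_000, 50));   // prints 8000000
    }
}
```

In this model, halving the chunk size or the number of connections halves the ceiling, while a longer round trip lowers it proportionally. Real throughput also depends on the cluster and the network, so treat the numbers as an upper bound at best.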

It is possible to limit the number of outstanding requests per cluster by adjusting maxInFlightRequests. It makes sense to tune this parameter if parts of the cluster get overloaded.

Find slowest cluster when feeding multi-cluster

Get a snapshot of the current state by calling getStatsAsJson() on the FeedClient. This JSON is meant to be human-readable and can change at any time. Example:

{
    "clusters": [
        {
            "clusterid": 0,
            "stats": {
                "session": [
                    {
                        "endpoint": {
                            "host": "localhost",
                            "port": 8080
                        },
                        "stats": {
                            "wrongSessionDetectedCounter": 0,
                            "wrongVersionDetectedCounter": 0,
                            "problemStatusCodeFromServerCounter": 0,
                            "executeProblemsCounter": 0,
                            "docsReceivedCounter": 4,
                            "statusReceivedCounter": 4,
                            "pendingDocumentStatusCount": 0
                        }
                    }
                ]
            }
        }
    ],
    "sessionParams": {
      // .. The configuration parameters used.
    }
}
Find slow endpoints, as these will usually have more pending documents. Verify that documents are sharded evenly by comparing the documents received on the various endpoints. For debugging purposes, dump this to a log regularly.
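
As a rough illustration of picking out the slowest endpoint, the sketch below scans a stats string for each endpoint's host, port and pendingDocumentStatusCount and returns the endpoint with the highest count. It is a JDK-only sketch with a hypothetical class name, and it assumes the field names and ordering of the example above. Since the format can change at any time, a regular expression like this is fragile, and a proper JSON parser is preferable for anything beyond ad hoc debugging.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; relies on the field names and ordering of the example stats output.
final class SlowEndpointFinder {

    // Matches an endpoint's host and port, then the next pendingDocumentStatusCount after it.
    private static final Pattern ENDPOINT_STATS = Pattern.compile(
            "\"host\"\\s*:\\s*\"([^\"]+)\"\\s*,\\s*\"port\"\\s*:\\s*(\\d+)"
            + ".*?\"pendingDocumentStatusCount\"\\s*:\\s*(\\d+)",
            Pattern.DOTALL);

    /** Returns "host:port" of the endpoint with the most pending documents, or null if none is found. */
    static String endpointWithMostPending(String statsJson) {
        Matcher matcher = ENDPOINT_STATS.matcher(statsJson);
        String slowest = null;
        long maxPending = -1;
        while (matcher.find()) {
            long pending = Long.parseLong(matcher.group(3));
            if (pending > maxPending) {
                maxPending = pending;
                slowest = matcher.group(1) + ":" + matcher.group(2);
            }
        }
        return slowest;
    }

    public static void main(String[] args) {
        String stats = "{\"session\":["
                + "{\"endpoint\":{\"host\":\"a\",\"port\":8080},\"stats\":{\"pendingDocumentStatusCount\":1}},"
                + "{\"endpoint\":{\"host\":\"b\",\"port\":8080},\"stats\":{\"pendingDocumentStatusCount\":7}}]}";
        System.out.println(endpointWithMostPending(stats));  // prints b:8080
    }
}
```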

Sample code

package com.yahoo.example.feed;

import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.*;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
 */
public class SampleFileFeeder {

    private final Endpoint endpoint;
    private final SessionParams sessionParams;
    private final static Logger log = Logger.getLogger(SampleFileFeeder.class.getCanonicalName());

    /**
     * Whenever a result is received, this class is invoked. It keeps track of basic statistics.
     */
    static class ResultCallBack implements FeedClient.ResultCallback {

        final AtomicInteger resultsReceived = new AtomicInteger(0);
        final AtomicInteger errorsReceived = new AtomicInteger(0);
        final long startTimeMillis = System.currentTimeMillis();

        @Override
        public void onCompletion(String docId, Result documentResult) {
            resultsReceived.incrementAndGet();
            if (!documentResult.isSuccess()) {
                log.warning("Problems with docID " + docId + ":" + documentResult.toString());
                errorsReceived.incrementAndGet();
            }
        }

        void dumpStatsToLog() {
            log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
            log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
        }
    }

    /**
     * Sample constructor without compression
     * @param endpoint The endpoint to feed to
     * @param dryRun if true, data is not sent to cluster.
     */
    public SampleFileFeeder(Endpoint endpoint, boolean dryRun) {
        this(endpoint, false, dryRun);
    }

    /**
     * More advanced constructor, that supports compression.
     *
     * @param endpoint The endpoint to feed to
     * @param useCompression  Whether to use compression or not
     * @param dryRun if true, will not send data to real cluster
     */
    public SampleFileFeeder(Endpoint endpoint, boolean useCompression, boolean dryRun) {
        this.endpoint = endpoint;
        this.sessionParams = new SessionParams.Builder()
                .addCluster(new Cluster.Builder().addEndpoint(this.endpoint).build())
                .setConnectionParams(new ConnectionParams.Builder()
                        .setDryRun(dryRun)
                        .setUseCompression(useCompression).build())
                .setFeedParams(new FeedParams.Builder()
                        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                        .build())
                .build();
    }

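    /**
     * Feed all operations from a stream.
     *
     * @param stream The input stream to read operations from (JSON formatted).
     * @param batchId An identifier for this batch, used in log messages only.
     * @return The callback holding the result statistics for this batch.
     */
    public ResultCallBack batchFeed(InputStream stream, String batchId) {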
        ResultCallBack results = new ResultCallBack();
        FeedClient feedClient = FeedClientFactory.create(this.sessionParams, results);
        AtomicInteger numSent = new AtomicInteger(0);
        log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
        FeedClient.feedJson(stream, feedClient, numSent);
        feedClient.close();  // Close will wait for all operations
        log.info("Fed " + numSent.get() + " operations to single cluster " + endpoint + ", batch: '" + batchId + "'.");
        results.dumpStatsToLog();
        return results;
    }
}

package com.yahoo.example.feed;

import com.yahoo.log.LogLevel;
import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.SimpleLoggerResultCallback;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
 * Other threads communicate with the feeder by adding new operations on the BlockingQueue
 */

public class SampleStreamFeeder extends Thread {

    public static class Operation {
        final public String documentId;
        final public CharSequence data;

        public Operation(String id, CharSequence data) {
            this.documentId = id;
            this.data = data;
        }
    }

    private final BlockingQueue<Operation> operations;
    private final FeedClient feedClient;
    private final AtomicInteger pending = new AtomicInteger(0);
    private final static Logger log = Logger.getLogger(SampleStreamFeeder.class.getCanonicalName());
    private final AtomicBoolean drain = new AtomicBoolean(false);
    private final CountDownLatch finishedDraining = new CountDownLatch(1);

    /**
     * Constructor
     * @param operations The shared blocking queue where other threads can put document operations to.
     * @param endPoint The endpoint to feed to
     */
    public SampleStreamFeeder(BlockingQueue<SampleStreamFeeder.Operation> operations, Endpoint endPoint, boolean dryRun) {
        this.operations = operations;
        SessionParams sessionParams = new SessionParams.Builder()
                .addCluster(new Cluster.Builder().addEndpoint(endPoint).build())
                .setConnectionParams(new ConnectionParams.Builder().setDryRun(dryRun).build())
                .setFeedParams(new FeedParams.Builder()
                        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                        .build())
                .build();
        // Simple bundled logger result callback
        this.feedClient = FeedClientFactory.create(sessionParams, new SimpleLoggerResultCallback(this.pending, 10));
    }

    /**
     * Shut down this feeder; waits until the operations on the queue are drained
     */
    public void close() throws InterruptedException {
        log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
        drain.set(true);
        finishedDraining.await();
    }

    @Override
    public void run() {
        while (!drain.get() || !operations.isEmpty()) {
            try {
                SampleStreamFeeder.Operation op = operations.poll(1, TimeUnit.SECONDS);
                if(op == null) // no operations available
                    continue;
                pending.incrementAndGet();
                log.info("Put document " + op.documentId);
                feedClient.stream(op.documentId, op.data);
            } catch (InterruptedException e) {
                log.log(LogLevel.ERROR, "Got interrupt exception.", e);
                break;
            }
        }
        log.info("Shutting down feeding thread");
        this.feedClient.close();
        finishedDraining.countDown();
    }
}