Vespa Java Feeding Client

The Vespa Java Feeding Client is a Java API and command line tool to feed document operations to Vespa. Even though it is possible to feed using the Document API, there are several advantages to using this client:

  • Uses HTTP as transport protocol
  • Feeds in parallel to one or more Vespa clusters, ordering still guaranteed
  • Programmatic Java API that has very few runtime dependencies in its pom.xml, and which is completely independent of Vespa
  • Easy feeding from sources such as Hadoop or Storm, or any other Java process
  • Feeds with much higher performance than naïve HTTP feeding approaches
  • Automatic retries, re-connect, performance tuning through parameters, compression, statistics supported
  • Supports data sources of unlimited size; data is processed as a stream (while the simple HTTP protocol is batch-oriented)
  • Built-in backpressure, making it hard to flood the system using this library

There is a prebuilt standalone command-line tool that lets you feed from a file using the API. This tool can be used outside the Vespa cluster.

Enable and build

The receiving process in Vespa is a container node - add document-api:

<?xml version="1.0" encoding="utf-8" ?>
<services version="1.0">

  <container version="1.0" id="default">
     <document-api/>
  </container>

</services>
The sending side is any Java process. To use the Java Client API, add the following dependency to pom.xml:
<dependency>
  <groupId>com.yahoo.vespa</groupId>
  <artifactId>vespa-http-client</artifactId>
  <version> <!--set correct version--> </version>
</dependency>

Using the Java Vespa Feeding Client

The prebuilt command-line tool uses this API. It supports feeding document operations and is installed with Vespa, found at:

$VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar
Example:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --file file.json --host gatewayhost
Alternatively, read from stdin:
$ java -jar $VESPA_HOME/lib/jars/vespa-http-client-jar-with-dependencies.jar --host gatewayhost < file.json
Use --help to list supported features.

Using the Java Vespa Feeding Client API

In this API, call stream() for each document operation. The call blocks if the internal queues are full. When a document operation has been processed, a callback is made asynchronously. This callback always happens, even when errors occur.

The API is centered around the FeedClient, which is created using a FeedClientFactory:

public interface FeedClient extends AutoCloseable {

    /**
     * Streams a document operation to cluster(s). If the pipeline and buffers are full, this call will be blocking.
     * Document operations might time out before they are sent. Failed operations are not re-attempted.
     * Don't call stream() after close is called.
     * @param documentId Document id of the document.
     * @param documentData The document data as JSON (as specified when using the factory to create the API)
     */
    void stream(String documentId, CharSequence documentData);

    /**
     * This callback is executed when new results are arriving. Don't do any heavy lifting in this thread (no IO, disk,
     * or heavy CPU usage) as this might block the progress of result processing and data sending.
     * This call back will run in a different thread than your main program so use e.g.
     * AtomicInteger for counters and follow general guides for thread-safe programming.
     * There is an example implementation in class SimpleLoggerResultCallback.
     */
    static interface ResultCallback {
        void onCompletion(String docId, Result documentResult);
    }

    /**
     * Waits for all results to arrive and closes the FeedClient. Don't call any other method after calling close().
     * Does not throw any exceptions.
     */
    @Override
    void close();
}
When creating a FeedClient, some config must be provided. The minimum is the hostname and port of one HTTP gateway, denoted as an Endpoint. Other options include:
  • One or more clusters to feed to in parallel.
  • The data format to use, UTF-8-encoded JSON.
  • The route and timeout for the data in the Vespa cluster.
  • Whether to use SSL or not (default: off).
  • Any additional HTTP headers.
See the Javadoc for a full overview.
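
As a minimal sketch of putting this together (assuming Endpoint.create(hostname, port, useSsl) as the factory method for an Endpoint, and with placeholder host and document values), creating a FeedClient, streaming one operation and closing could look like this:

package com.yahoo.example.feed;

import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;

public class MinimalFeeder {

    public static void main(String[] args) {
        // One cluster with one HTTP gateway endpoint (hostname/port/SSL are placeholders)
        SessionParams sessionParams = new SessionParams.Builder()
                .addCluster(new Cluster.Builder()
                        .addEndpoint(Endpoint.create("gatewayhost", 8080, false))
                        .build())
                .setFeedParams(new FeedParams.Builder()
                        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                        .build())
                .build();

        // The callback is invoked asynchronously, once per streamed operation
        FeedClient feedClient = FeedClientFactory.create(sessionParams, (docId, documentResult) -> {
            if (!documentResult.isSuccess())
                System.err.println("Failed: " + docId + ": " + documentResult);
        });

        // The payload is the document operation as JSON - see the document JSON format
        // and the stream() Javadoc for the exact expected payload
        feedClient.stream("id:example:music::1",
                "{ \"put\": \"id:example:music::1\", \"fields\": { \"title\": \"A title\" } }");

        feedClient.close();  // blocks until all pending results have arrived
    }
}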

Feeding a stream of data

If the data to feed is already in a file, consider using one of the utility functions for feeding an InputStream to an already created FeedClient.

/**
 * Utility function that takes an array of JSON documents and calls the FeedClient for each element.
 * @param inputStream This can be a very large stream. The outer element is an array (of document operations).
 * @param feedClient The feedClient that will receive the document operations.
 * @param numSent increased per document sent to API (but no waiting for results).
 */
public static void feedJson(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
    JsonReader.read(inputStream, feedClient, numSent);
}
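
For example, a small utility (a sketch; the path and class name are illustrative) that feeds a file holding a JSON array of operations through an already created FeedClient:

package com.yahoo.example.feed;

import com.yahoo.vespa.http.client.FeedClient;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

public class FileFeedUtil {

    /** Feeds a file containing a JSON array of document operations through an already created FeedClient. */
    public static int feedFile(String path, FeedClient feedClient) throws IOException {
        AtomicInteger numSent = new AtomicInteger(0);
        try (InputStream inputStream = new FileInputStream(path)) {
            FeedClient.feedJson(inputStream, feedClient, numSent);  // blocks while the client's queues are full
        }
        feedClient.close();  // waits for all results to arrive
        return numSent.get();
    }
}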

Result callbacks

After a document operation has been streamed, there will always be a callback eventually.

The Result has information on its document ID, the Endpoint it was fed to (recall that operations are sent to Endpoints based on hashing of document ID within one Cluster), whether it was successful or not, whether an error was transient or permanent, and any exception that was thrown (possibly containing an error message from the Vespa cluster).

The order of the callbacks is non-deterministic - the only guarantee is that operations are sequenced per document ID.

Error handling

The vespa-http-client is set up with a high number of retries and a long timeout (both configurable). The client retries transient errors such as network and capacity problems; if the retried operations later succeed, the errors are not reported to the user. For permanent problems, such as parsing errors, it fails early. There should normally be no need to retry failed operations from the application. All errors are propagated to the Result objects. The API has no (semi-)transactional behavior, such as rollback in case of partial failure in a multi-cluster scenario, so it is important that user code checks the state of the returned Result objects.
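
A sketch of such checking, using only Result.isSuccess() and the docId from the callback, and keeping the streamed data in the application so that operations reported as failed can be re-fed later (class and method names are illustrative):

package com.yahoo.example.feed;

import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.Result;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;

/**
 * Keeps the streamed data per document id so that operations reported as failed
 * can be collected and re-fed by the application later.
 */
public class RetryingResultCallback implements FeedClient.ResultCallback {

    private final static Logger log = Logger.getLogger(RetryingResultCallback.class.getName());

    private final Map<String, CharSequence> inFlight = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<CharSequence> failedOperations = new ConcurrentLinkedQueue<>();

    /** Call this right before feedClient.stream(documentId, documentData). */
    public void registerOperation(String documentId, CharSequence documentData) {
        inFlight.put(documentId, documentData);
    }

    @Override
    public void onCompletion(String docId, Result documentResult) {
        CharSequence documentData = inFlight.remove(docId);
        if (!documentResult.isSuccess()) {
            log.warning("Operation failed for " + docId + ": " + documentResult);
            if (documentData != null)
                failedOperations.add(documentData);  // re-feed these from application code
        }
    }

    /** Operations that failed and should be re-fed by the application. */
    public ConcurrentLinkedQueue<CharSequence> failedOperations() {
        return failedOperations;
    }
}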

Feeding Multiple Clusters

Both the API and the command line tool support feeding multiple clusters. For example, if a set of documents should be inserted into two clusters, it is easy to feed both simultaneously. In the command line tool, specify several hosts comma-separated; in the API, add more clusters to the configuration. However, in a production system this is more complex than feeding a single cluster.
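
In the API this could look like the following sketch, using the config classes from com.yahoo.vespa.http.client.config and assuming Endpoint.create(hostname) as the factory method (hostnames are placeholders):

// Two clusters, each with one gateway endpoint - operations are fed to both in parallel
SessionParams sessionParams = new SessionParams.Builder()
        .addCluster(new Cluster.Builder()
                .addEndpoint(Endpoint.create("gateway.cluster-a.example.com"))
                .build())
        .addCluster(new Cluster.Builder()
                .addEndpoint(Endpoint.create("gateway.cluster-b.example.com"))
                .build())
        .setFeedParams(new FeedParams.Builder()
                .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                .build())
        .build();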

An important question is what should happen when feeding to one cluster is problematic: network problems, cluster problems, bugs, regions that are down due to power maintenance, and so on. If all clusters should be synchronized, feeding must stop when one cluster cannot receive data. If all clusters should be as up to date as possible at all times, feeding should continue to the clusters that are up, and possibly be replayed to the unavailable clusters when they come back up (with all the challenges this entails).

In the current library, you get feedback on each feed operation: which clusters were successful and which failed. On failures you can e.g. save the document to a file and retry it later (the document is in the reply). However, a cluster that is down might impact the global feed rate, because the feeding library might wait for a timeout before answering the request, and the number of in-flight requests is limited. One way around this is to have an independent process for each cluster and feed them completely asynchronously. Which approach is best depends on the use case: how long-lived the data is, whether it is acceptable that the clusters are out of sync, whether it is OK to drop data in case of failure, and so on.

We know this is hard, and we are currently working on a more sophisticated solution for our tenants. The design is still in progress, but we foresee automatic replay to clusters that come up after being down, easier monitoring, alerting, and better transparency into what is happening.

Config parameters

The default values for the client should work fine in most cases. In some cases you may want to tune feeding for higher throughput; then you need to understand where the bottleneck is. If you start feeding from more machines in parallel and the total throughput increases significantly, the feeding client is the bottleneck. You can try to increase the number of threads and enable compression, but you might need to feed from several machines to increase speed. Remember that ordering is only guaranteed per instance of the library, so when feeding from different machines you might want to shard data accordingly. If adding machines does not increase performance, the Vespa cluster is the bottleneck; then increase the number of container (gateway) nodes and/or content nodes. Use --priority with a load type to assign priority classes to operations.

Configuration

The configuration parameters are set in the SessionParams builder. SessionParams consists of two sub-configurations, FeedParams and ConnectionParams, in addition to a set of clusters.

Endpoints

A cluster consists of one or more Endpoints. If the endpoint server is overloaded, it makes sense to add more endpoints to increase performance. A common pattern is to run DocumentProcessors on the same server as the endpoint, so the load really depends on what the DocumentProcessors are doing. You can start with one endpoint and monitor the server.
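
For example, a cluster with two gateway endpoints could be configured like this (a sketch, assuming Endpoint.create(hostname); recall that operations are distributed over the endpoints of a cluster by hashing the document ID):

// One cluster with two gateway endpoints - documents are spread over the
// endpoints by hashing the document ID
Cluster cluster = new Cluster.Builder()
        .addEndpoint(Endpoint.create("gateway1.example.com"))
        .addEndpoint(Endpoint.create("gateway2.example.com"))
        .build();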

You can feed to several clusters, which can be in different data centers. Note that performance is driven by the instance in the slowest data center, with the slowest network as seen from the feed client. It might be possible to set timeouts so that the slowest data center does not slow down the others, but this will cause data loss and create a model that is hard to reason about, so it is not recommended. The client does buffer data, so temporary slowdowns are fine.

ConnectionParams

If throttled by the network, try enabling compression.

One parameter that impacts performance is numPersistentConnectionsPerEndpoint. The client uses parallel requests to the gateway, which reduces the impact of round-trip time and makes better use of the network bandwidth. Setting it too high is not beneficial: it causes more network sessions and might create packet loss on the network. Setting it too low means you will not be able to use the available network bandwidth. The sweet spot seems to be around 32. If feeding from hundreds of machines to a small cluster, it makes sense to use a low number such as 2.
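
A sketch of such tuning, assuming setNumPersistentConnectionsPerEndpoint as the name of the corresponding Builder setter (setUseCompression appears in the sample code below; verify both against the Javadoc):

ConnectionParams connectionParams = new ConnectionParams.Builder()
        .setNumPersistentConnectionsPerEndpoint(32)  // around the expected sweet spot; use e.g. 2 when feeding from hundreds of machines
        .setUseCompression(true)                     // helps if throttled by the network
        .build();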

FeedParams

There is a timeout that causes the stream to fail if the response from the server is slow (or never happens). This timeout is the sum of serverTimeout and clientTimeout. The value impacts performance, since a lower timeout can reduce the impact of a slow endpoint, at the expense of data loss and higher complexity: a slow cluster will then time out, and the clusters might be overloaded. In cases where the client is not even able to send the document because the system is backed up, the client might drop the document if clientTimeout is reached before the document hits the network.

maxChunkSizeBytes is the max size of a request sent to an endpoint. If there are documents waiting to be sent, they are packed together up to this size. Setting this value too high can lead to out-of-memory in the feeder or the endpoint. Setting it too low means the round-trip time of the connection might have an impact. This parameter is related to numPersistentConnectionsPerEndpoint. The sweet spot is expected to be wide, and a value of 200 kB should work in most cases.

It is possible to limit the number of outstanding requests per cluster by adjusting maxInFlightRequests. It makes sense to tune this parameter if parts of the cluster get overloaded.
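
A sketch combining these parameters (the setter names setMaxChunkSizeBytes, setMaxInFlightRequests, setServerTimeout and setClientTimeout, and all values, are assumptions to verify against the Javadoc; java.util.concurrent.TimeUnit is used for the timeouts):

FeedParams feedParams = new FeedParams.Builder()
        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
        .setMaxChunkSizeBytes(200 * 1024)           // ~200 kB requests, within the expected sweet spot
        .setMaxInFlightRequests(1000)               // illustrative limit on outstanding requests per cluster
        .setServerTimeout(180, TimeUnit.SECONDS)    // total timeout is serverTimeout + clientTimeout
        .setClientTimeout(20, TimeUnit.SECONDS)
        .build();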

Find the slowest cluster when feeding multiple clusters

Get a snapshot of the current state by calling FeedClient.getStatsAsJson(). The JSON is meant to be human readable and its format can change at any time - example:

{
    "clusters": [
        {
            "clusterid": 0,
            "stats": {
                "session": [
                    {
                        "endpoint": {
                            "host": "localhost",
                            "port": 8080
                        },
                        "stats": {
                            "wrongSessionDetectedCounter": 0,
                            "wrongVersionDetectedCounter": 0,
                            "problemStatusCodeFromServerCounter": 0,
                            "executeProblemsCounter": 0,
                            "docsReceivedCounter": 4,
                            "statusReceivedCounter": 4,
                            "pendingDocumentStatusCount": 0
                        }
                    }
                ]
            }
        }
    ],
    "sessionParams": {
      // .. The configuration parameters used.
    }
}
Find slow endpoints, as these will usually have more pending documents. Verify that documents are sharded nicely by looking at the number of documents received (docsReceivedCounter) on the various endpoints. For debugging purposes, dump this to a log regularly.
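
A sketch of such periodic dumping, assuming a feedClient and a java.util.logging Logger named log are in scope:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// ...
ScheduledExecutorService statsDumper = Executors.newSingleThreadScheduledExecutor();
statsDumper.scheduleAtFixedRate(
        () -> log.info("Feed client stats: " + feedClient.getStatsAsJson()),
        1, 1, TimeUnit.MINUTES);
// Remember to call statsDumper.shutdown() when feeding is done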

Sample code

package com.yahoo.example.feed;

import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.*;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
 */
public class SampleFileFeeder {

    private final Endpoint endpoint;
    private final SessionParams sessionParams;
    private final static Logger log = Logger.getLogger(SampleFileFeeder.class.getCanonicalName());

    /**
     * Whenever a result is received, this class is invoked. It keeps track of basic statistics.
     */
    static class ResultCallBack implements FeedClient.ResultCallback {

        final AtomicInteger resultsReceived = new AtomicInteger(0);
        final AtomicInteger errorsReceived = new AtomicInteger(0);
        final long startTimeMillis = System.currentTimeMillis();

        @Override
        public void onCompletion(String docId, Result documentResult) {
            resultsReceived.incrementAndGet();
            if (!documentResult.isSuccess()) {
                log.warning("Problems with docID " + docId + ":" + documentResult.toString());
                errorsReceived.incrementAndGet();
            }
        }

        void dumpStatsToLog() {
            log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
            log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
        }
    }

    /**
     * Sample constructor without compression
     * @param endpoint The endpoint to feed to
     * @param dryRun if true, data is not sent to cluster.
     */
    public SampleFileFeeder(Endpoint endpoint, boolean dryRun) {
        this(endpoint, false, dryRun);
    }

    /**
     * More advanced constructor, that supports compression.
     *
     * @param endpoint The endpoint to feed to
     * @param useCompression  Whether to use compression or not
     * @param dryRun if true, will not send data to real cluster
     */
    public SampleFileFeeder(Endpoint endpoint, boolean useCompression, boolean dryRun) {
        this.endpoint = endpoint;
        this.sessionParams = new SessionParams.Builder()
                .addCluster(new Cluster.Builder().addEndpoint(this.endpoint).build())
                .setConnectionParams(new ConnectionParams.Builder()
                        .setDryRun(dryRun)
                        .setUseCompression(useCompression).build())
                .setFeedParams(new FeedParams.Builder()
                        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                        .build())
                .build();
    }

    /**
     * Feed all operations from a stream.
     *
     * @param stream The input stream to read operations from (JSON formatted).
     */
    public ResultCallBack batchFeed(InputStream stream, String batchId) {
        ResultCallBack results = new ResultCallBack();
        FeedClient feedClient = FeedClientFactory.create(this.sessionParams, results);
        AtomicInteger numSent = new AtomicInteger(0);
        log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
        FeedClient.feedJson(stream, feedClient, numSent);
        feedClient.close();  // Close will wait for all operations
        log.info("Feed " + numSent.get() + " operations to single cluster " + endpoint + ", batch: '" + batchId +".");
        results.dumpStatsToLog();
        return results;
    }
}

package com.yahoo.example.feed;

import com.yahoo.log.LogLevel;
import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.SimpleLoggerResultCallback;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
 * Other threads communicate with the feeder by adding new operations on the BlockingQueue
 */

public class SampleStreamFeeder extends Thread {

    public static class Operation {
        final public String documentId;
        final public CharSequence data;

        public Operation(String id, CharSequence data) {
            this.documentId = id;
            this.data = data;
        }
    }

    private BlockingQueue<Operation> operations;
    private FeedClient feedClient;
    private AtomicInteger pending = new AtomicInteger(0);
    private final static Logger log = Logger.getLogger(SampleStreamFeeder.class.getCanonicalName());
    private AtomicBoolean  drain = new AtomicBoolean(false);
    private final CountDownLatch finishedDraining = new CountDownLatch(1);

    /**
     * Constructor
     * @param operations The shared blocking queue where other threads can put document operations to.
     * @param endPoint The endpoint to feed to
     * @param dryRun if true, data is not sent to the cluster
     */
    public SampleStreamFeeder(BlockingQueue<SampleStreamFeeder.Operation> operations, Endpoint endPoint, boolean dryRun) {
        this.operations = operations;
        SessionParams sessionParams = new SessionParams.Builder()
                .addCluster(new Cluster.Builder().addEndpoint(endPoint).build())
                .setConnectionParams(new ConnectionParams.Builder().setDryRun(dryRun).build())
                .setFeedParams(new FeedParams.Builder()
                        .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
                        .build())
                .build();
        // Simple bundled logger result callback
        this.feedClient = FeedClientFactory.create(sessionParams, new SimpleLoggerResultCallback(this.pending, 10));
    }

    /**
     * Shut down this feeder; waits until the operations queue is drained
     */
    public void close() throws InterruptedException {
        log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
        drain.set(true);
        finishedDraining.await();
    }

    @Override
    public void run() {
        while (!drain.get() || !operations.isEmpty()) {
            try {
                SampleStreamFeeder.Operation op = operations.poll(1, TimeUnit.SECONDS);
                if(op == null) // no operations available
                    continue;
                pending.incrementAndGet();
                log.info("Put document " + op.documentId);
                feedClient.stream(op.documentId, op.data);
            } catch (InterruptedException e) {
                log.log(LogLevel.ERROR, "Got interrupt exception.", e);
                break;
            }
        }
        log.info("Shutting down feeding thread");
        this.feedClient.close();
        finishedDraining.countDown();
    }
}