Document Processor Development

The Document Processing Framework is the home for all global processing of Vespa input data (represented as Documents). It runs in the JDisc Container, and provides a development and hosting environment for processing components, and a model for composing such components developed by multiple development teams into a functional whole.

This document describes how to develop and deploy Document Processor components. For an introduction to the docproc framework, see Document Processing Design Goals and Features. To get started developing with the container, see Container developing. For reference, see the Document Processing Javadoc and the services.xml syntax reference.

Document Processors

The components of the docproc container are called document processors. A document processor is a component (usually deployed as part of an OSGi bundle) which extends the class com.yahoo.docproc.DocumentProcessor. All document processors must implement a single method:

public Progress process(Processing processing);

When the container receives a document operation, it creates a new Processing, and adds the DocumentPuts, DocumentUpdates or DocumentRemoves to the List accessible through Processing.getDocumentOperations(). Furthermore, the call stack of the document processing chain in question is copied to Processing.callStack(), so that document processors may freely modify the flow of control for this processing without affecting other processings in progress. After creation, the Processing is added to an internal queue.

A worker thread will retrieve a Processing from the input queue, and run its document operations through its call stack. A minimal, no-op document processor implementation is thus:

import com.yahoo.docproc.*;

public class SimpleDocumentProcessor extends DocumentProcessor {
    public Progress process(Processing processing) {
        return Progress.DONE;
    }
}

Clearly, the process() method should loop through all document operations in Processing.getDocumentOperations(), do whatever it sees fit to them, and return a Progress. It is acceptable for a document processor to throw an exception out of process().

public Progress process(Processing processing) {
    for (DocumentOperation op : processing.getDocumentOperations()) {
        if (op instanceof DocumentPut) {
            DocumentPut put = (DocumentPut) op;
            // TODO do something to 'put' here
        } else if (op instanceof DocumentUpdate) {
            DocumentUpdate update = (DocumentUpdate) op;
            // TODO do something to 'update' here
        } else if (op instanceof DocumentRemove) {
            DocumentRemove remove = (DocumentRemove) op;
            // TODO do something to 'remove' here
        }
    }
    return Progress.DONE;
}

Possible outcomes of the process() method:

Progress.DONE
  The document processor has successfully processed the Processing.
Progress.FAILED
  Processing failed and the input message should return a fatal failure to the feeding application, meaning that the application will not try to re-feed this document operation.
Progress.LATER
  The document processor wants to release the calling thread and be called again later. This is useful, for example, when calling an external service with high latency. The document processor may then save its state in the Processing and resume when called again later. There are no guarantees as to when the processor is called again with this Processing; it is simply appended to the back of the input queue.
Throwing a com.yahoo.docproc.TransientFailureException
  Processing failed and the input message should return a transient failure to the feeding application, meaning that the application may try to re-feed this document operation.
Throwing any other RuntimeException
  Same behavior as for Progress.FAILED.

With Progress.LATER, this becomes an asynchronous model, where processing a document operation does not need to occupy one thread for its entire lifespan. Note, however, that the document processor instances are shared between all processings in a chain, and must therefore be implemented in a thread-safe way.

To return an error message or reason along with the outcome, call withReason():

if (op instanceof DocumentPut) {
    return Progress.FAILED.withReason("PUT is not supported");
}

Processing Context Variables

The Processing has a map String -> Object that can be used to pass information between document processors. It is also very useful when using Progress.LATER, as discussed above, to save the state of a processing.

/** Returns a context variable, or null if it is not set */
public Object getVariable(String name);

/** Returns an iterator of all context variables that are set */
public Iterator<Map.Entry<String, Object>> getVariableAndNameIterator();

/** Clears all context variables that have been set */
public void clearVariables();

/** Sets a context variable. */
public void setVariable(String name, Object value);

/** Removes a context variable. */
public Object removeVariable(String name);

/** Returns true if this variable is present, even if it is null */
public boolean hasVariable(String name);
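
As a minimal sketch of how two document processors in the same chain might hand a value to each other, the example below uses a small stand-in class with the same variable methods as listed above, so that it runs without the Vespa libraries. VariableHolder, the two step methods and the key example.language are hypothetical names chosen for illustration; in a real document processor the calls would be made on the Processing passed to process().

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: passing a value between two processing steps through context variables. */
final class ContextVariablesSketch {

    /** Hypothetical stand-in with the same variable methods as listed for Processing. */
    static final class VariableHolder {
        private final Map<String, Object> variables = new HashMap<>();

        Object getVariable(String name) { return variables.get(name); }
        void setVariable(String name, Object value) { variables.put(name, value); }
        Object removeVariable(String name) { return variables.remove(name); }
        boolean hasVariable(String name) { return variables.containsKey(name); }
        void clearVariables() { variables.clear(); }
    }

    // Hypothetical key, prefixed to avoid clashing with other processors.
    static final String LANGUAGE_KEY = "example.language";

    /** First step: records a value under a key that both steps agree on. */
    static void detectLanguage(VariableHolder processing) {
        processing.setVariable(LANGUAGE_KEY, "en");
    }

    /** Second step: reads the value, falling back to a default if it was never set. */
    static String chooseTokenizer(VariableHolder processing) {
        Object language = processing.getVariable(LANGUAGE_KEY);
        return "en".equals(language) ? "english-tokenizer" : "generic-tokenizer";
    }

    public static void main(String[] args) {
        VariableHolder processing = new VariableHolder();
        detectLanguage(processing);
        System.out.println(chooseTokenizer(processing));
    }
}
```

Since getVariable() returns null both for an unset variable and for one explicitly set to null, hasVariable() is the way to tell the two cases apart.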

Document Processor Chains

The call stack mentioned above is another name for a document processor chain. Document processor chains are a special case of the general component chains; to avoid confusion, some concepts are explained here as well. A document processor chain is nothing more than a list of document processor instances, having an id, and represented as a stack. Document processor chains are typically not created for every processing, but are part of the configuration. Several chains may exist at the same time; the chain to execute is specified by the message bus destination of the incoming message. The same document processor instance may exist in multiple document processor chains, which is why the CallStack of the Processing is responsible for knowing the next document processor to invoke for a particular message.

The execution order of the document processors in a chain is not specified explicitly, but is determined by ordering constraints declared in the document processors or their configuration.

Writing a Document Processor

Refer to Basic Search Java to get started writing Vespa plugins from an application package. This is an example of a complete document processor:

package com.yahoo.vespatest;

import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.datatypes.StringFieldValue;

public class Rot13DocumentProcessor extends DocumentProcessor {
    private static final String FIELD_NAME = "title";

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut) {
                DocumentPut put = (DocumentPut) op;
                Document document = put.getDocument();

                StringFieldValue oldTitle = (StringFieldValue) document.getFieldValue(FIELD_NAME);
                if (oldTitle != null) {
                    document.setFieldValue(FIELD_NAME, rot13(oldTitle.getString()));
                }
            }
        }
        return Progress.DONE;
    }

    private static String rot13(String s) {
        StringBuilder output = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c >= 'a' && c <= 'm' || c >= 'A' && c <= 'M') {
                c += 13;
            } else if (c >= 'n' && c <= 'z' || c >= 'N' && c <= 'Z') {
                c -= 13;
            }
            output.append(c);
        }
        return output.toString();
    }
}

This will compile if container-dev.jar is on the class path; the bundle plugin automates this. In production, the container will create one or more instances of this class and place them in the desired document processor chain(s) to process document operations, as specified in the configuration. The container creates a new instance of this document processor only when it is reconfigured, so any data needed by the document processor can be read and prepared in the constructor. Constructors accept configuration, as for any other pluggable component.

For reference, read the Document Processing Javadoc.

Testing a Document Processor

A document processor is tested running inside a container using Application:

package com.yahoo.vespatest;

import com.yahoo.application.ApplicationBuilder;
import com.yahoo.application.Networking;
import com.yahoo.application.container.DocumentProcessing;
import com.yahoo.application.container.JDisc;
import com.yahoo.component.ComponentId;
import com.yahoo.component.ComponentSpecification;
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentType;
import com.yahoo.processing.execution.chain.ChainRegistry;
import org.junit.Test;

import java.io.IOException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public class Rot13DocprocTest {

    @Test
    public void requireThatBasicDocumentProcessingWorksForDoc() throws IOException {
        JDisc container = new ApplicationBuilder()
                .servicesXml("<container version=\"1.0\">\n" +
                             "  <document-processing>\n" +
                             "    <chain id=\"myChain\">\n" +
                             "      <documentprocessor id=\"" +
                             Rot13DocumentProcessor.class.getCanonicalName() + "\"/>\n" +
                             "    </chain>\n" +
                             "  </document-processing>\n" +
                             "</container>\n")
                .documentType("music",
                              "document music {\n" +
                              "  field title type string { }\n" +
                              "}\n")
                .networking(Networking.disable)
                .build()
                .getJDisc("jdisc");

        DocumentProcessing docProc = container.documentProcessing();
        DocumentType type = docProc.getDocumentTypes().get("music");

        ChainRegistry<DocumentProcessor> chains = docProc.getChains();
        assertTrue(chains.allComponentsById().containsKey(new ComponentId("myChain")));

        Document doc = new Document(type, "id:test:music::this:is:a:great:album");
        doc.setFieldValue("title", "Great Album!");
        com.yahoo.docproc.Processing processing;
        DocumentProcessor.Progress progress;

        processing = new com.yahoo.docproc.Processing(doc);
        progress = docProc.process(ComponentSpecification.fromString("myChain"), processing);
        assertThat(progress, sameInstance(DocumentProcessor.Progress.DONE));
        assertThat(doc.getFieldValue("title").toString(), equalTo("Terng Nyohz!"));

        container.close();
    }
}

Deploying a Document Processor

Once the document processor passes its unit tests, it can be deployed to the Vespa system which will host it. The document processor must be packaged as a bundle and added to the Vespa application package, which is then deployed. How to create a bundle is described in Building OSGi bundles. To include the document processor in services.xml, define a document processing chain (here named default) and add the document processor to it:

<?xml version="1.0" encoding="utf-8"?>
<services version="1.0">
  <admin version="2.0">
    <adminserver hostalias="node1" />
  </admin>
  <container version="1.0" id="default">
    <nodes>
      <node hostalias="node1"/>
    </nodes>
    <document-processing>
      <chain id="default">
        <documentprocessor id="com.yahoo.vespatest.Rot13DocumentProcessor"/>
      </chain>
    </document-processing>
  </container>
</services>

The document processor id above is resolved to the component bundle jar we added by the symbolic name in the manifest, and to the right class within the bundle by the class name. By keeping all three the same, we keep things simple, but more advanced setups where they differ are also supported. This will be explained in later sections.

By creating a directory containing services.xml, hosts.xml and components/rot13docproc.jar, that directory becomes a complete application package containing a bundle. Deploy this to a Vespa instance.

Document Processing Execution Model

The Document Processing Framework works like this:

  1. A thread from the message bus layer appends an incoming message to an internal priority queue, shared between all document processing chains configured on a node. The priority is set based on the message bus priority of the message. Messages of the same priority are ordered FIFO.
  2. One worker thread from the docproc thread pool picks one message from the head of the queue, deserializes it, copies the call stack (chain) in question, and runs it through the document processors.
  3. Processing finishes if (a) the document(s) have passed successfully through the whole chain, or (b) a document processor in the chain has returned Progress.FAILED or thrown an exception.
  4. The same thread passes the message on to the message bus layer for further transport to its destination.

There is a single instance of each document processor chain. In every chain, there is a single instance of each document processor (unless a chain is configured with multiple, identical document processors, which is rare).

As is evident from the model above, multiple worker threads execute the document processors in a chain concurrently. Thus, many threads of execution can be going through the process() method of a document processor at the same time.

This model places a very important constraint on document processor classes: instance variables are not inherently safe. They must be eliminated or made thread-safe.

State in Document Processors

Any state in the document processor for the particular Processing should be kept as local variables in the process method, while state which should be shared by all Processings should be kept as member variables. As the latter kind will be accessed by multiple threads at any one time, such member variables must be thread-safe. This critical restriction is similar to that of e.g. the Servlet API. Options for implementing a thread-safe document processor with instance variables:

  1. Use immutable (and preferably final) objects: they never change after they are constructed; no modifications to their state occur after the DocumentProcessor constructor returns.
  2. Use a single instance of a thread-safe class.
  3. Create a single instance and synchronize access to it across all threads (but this will severely limit your scalability).
  4. Arrange for each thread to have its own instance, e.g. with a ThreadLocal.
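
The sketch below illustrates options 1, 2 and 4 in one class, using only the Java standard library. ThreadSafeStateSketch and its handle() method are hypothetical stand-ins for a document processor and its process() method; they are not part of the docproc API.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: three ways of keeping instance state safe under concurrent calls. */
final class ThreadSafeStateSketch {

    // Option 1: immutable state, fully built before the constructor returns.
    private final Set<String> stopWords;

    // Option 2: a single instance of a thread-safe class.
    private final AtomicLong handled = new AtomicLong();

    // Option 4: one instance per thread of a class that is not thread-safe.
    private final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format;
    });

    ThreadSafeStateSketch(Set<String> stopWords) {
        this.stopWords = Set.copyOf(stopWords); // unmodifiable copy
    }

    /** Stand-in for process(): may be called by many threads at once. */
    String handle(String word, long timestampMillis) {
        handled.incrementAndGet();
        if (stopWords.contains(word)) {
            return "";
        }
        // Per-call state lives in local variables only.
        String day = dateFormat.get().format(new Date(timestampMillis));
        return word + "@" + day;
    }

    long handledCount() {
        return handled.get();
    }
}
```

Option 3 is left out on purpose: wrapping the body of handle() in a synchronized block would also be correct, but only one thread at a time could then do useful work.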

Asynchronous Execution

The execution model outlined above also shows one important restriction: If a document processor performs any high-latency operation in its process() method, a docproc worker thread will be occupied. With all n worker threads blocking on an external resource, throughput will be severely limited. This can be alleviated by saving the state in the Processing object, and returning Progress.LATER. A document processor doing a high-latency operation should use a pattern like this:

  1. Check a self-defined context variable in Processing for status. Basically, have we seen this Processing before?
  2. If no:
    1. We have been given a Processing object fresh off the network, we have not seen this before. Process it up until the high-latency operation.
    2. Start the high-latency operation (possibly in a separate thread).
    3. Save the state of the operation in a self-defined context variable in the Processing.
    4. Return Progress.LATER. This Processing is then appended to the back of the input queue, and we will be called again later.
  3. If yes:
    1. Retrieve the reference that we set in our self-defined context variable in Processing.
    2. Is the high-latency operation done? If so, return Progress.DONE.
    3. Is it not yet done? Return Progress.LATER again.

As is evident, this lets the finite set of document processing threads perform more work at the same time.
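
The pattern can be sketched as follows. To keep the example runnable without the Vespa libraries, Progress and Processing are hypothetical stand-ins that only mimic the parts of the docproc API used here, and the high-latency operation is assumed to be exposed as a function returning a CompletableFuture. The context variable names are also made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Sketch of the Progress.LATER pattern, using stand-in types instead of the docproc API. */
final class LaterPatternSketch {

    /** Hypothetical stand-in for the two Progress outcomes this pattern uses. */
    enum Progress { DONE, LATER }

    /** Hypothetical stand-in for the context variables of a Processing. */
    static final class Processing {
        private final Map<String, Object> variables = new HashMap<>();

        Object getVariable(String name) { return variables.get(name); }
        void setVariable(String name, Object value) { variables.put(name, value); }
        Object removeVariable(String name) { return variables.remove(name); }
    }

    // Self-defined context variable names (assumptions for this sketch).
    private static final String PENDING = "example.pendingLookup";
    static final String RESULT = "example.lookupResult";

    // Assumed shape of the high-latency operation: starts work, returns a handle.
    private final Function<String, CompletableFuture<String>> slowLookup;

    LaterPatternSketch(Function<String, CompletableFuture<String>> slowLookup) {
        this.slowLookup = slowLookup;
    }

    Progress process(Processing processing, String key) {
        @SuppressWarnings("unchecked")
        CompletableFuture<String> pending =
                (CompletableFuture<String>) processing.getVariable(PENDING);

        if (pending == null) {
            // Not seen before: start the operation, save its handle, release the thread.
            processing.setVariable(PENDING, slowLookup.apply(key));
            return Progress.LATER;
        }
        if (!pending.isDone()) {
            // Seen before, but the operation is still running.
            return Progress.LATER;
        }
        // Operation finished: store the result and clean up the saved state.
        // join() rethrows a failure of the operation as a CompletionException.
        processing.removeVariable(PENDING);
        processing.setVariable(RESULT, pending.join());
        return Progress.DONE;
    }
}
```

All per-operation state lives in the Processing, so the processor instance itself holds only a final field and stays safe to share between worker threads.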

Reconfiguring Document Processing

Consider the following configuration:

<?xml version="1.0" encoding="utf-8" ?>
<services version="1.0">
  <container version="1.0" id="default">
    <document-processing>
      <chain id="default">
        <documentprocessor id="SomeDocumentProcessor">
          <config name="foo.something">
            <variable>value</variable>
          </config>
        </documentprocessor>
      </chain>
    </document-processing>
  </container>
</services>

Changing chain ids, components in a chain, component configuration, and schema mapping all take effect after vespa-deploy activate; no restart is required. Changing cluster names (i.e. the container id) requires a restart of docproc services after vespa-deploy activate.