Processing makes it easy to create low-latency request/response processing applications. It is the recommended way of creating such applications on top of JDisc, but can also be used independently of JDisc. Processing lets you define application behavior by combining Processors performing simple tasks. Processors use a synchronous call model, but the underlying IO may be asynchronous.
Javadoc:
com.yahoo.processing.Processor
com.yahoo.processing.rendering.Renderer
To use processing, add this dependency to pom.xml:
<dependency>
    <groupId>com.yahoo.vespa</groupId>
    <artifactId>container</artifactId>
    <version>8.444.18</version> <!-- Find latest version at search.maven.org/search?q=g:com.yahoo.vespa%20a:container -->
    <scope>provided</scope>
</dependency>
Or read how to start a deployable project from scratch.
A processor subclasses Processor and implements a single method:
package com.mydomain.example;

import com.yahoo.processing.*;
import com.yahoo.processing.execution.Execution;
import com.yahoo.processing.test.ProcessorLibrary.StringData;

public class ExampleProcessor extends Processor {

    @Override
    public Response process(Request request, Execution execution) {
        // Process the Request:
        request.properties().set("foo", "bar");

        // Pass on to the next processor in the chain
        Response response = execution.process(request);

        // Process the response
        response.data().add(new StringData(request, "Hello, world!"));

        return response;
    }

}
Processors may work on both the request and response, pass on the request one or more times to further processors or create the result data internally or by contacting a remote service. The result data may be a nested composite structure where content is contributed by multiple processors.
Processors should carry out a single task and are combined into complete applications. This is achieved using Chains:
Chain<Processor> myChain = new Chain<Processor>(new ExampleProcessor(), new FooProcessor(), new BarProcessor());
Response response = new Execution(myChain).process(request); // execute this chain
This executes the three processors in order. The Execution keeps track of the execution state so the same processor instances may be used in many chains at the same time. When the execution reaches the end of the chain, the execution returns an empty Response to the processor calling it. An AsyncExecution class is provided as a convenience to perform an execution in a separate thread instead.
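The way an execution walks a chain can be illustrated with a minimal, self-contained sketch. The classes below (`MiniProcessor`, `MiniExecution`, `MiniResponse`) are hypothetical stand-ins written for this illustration, not the `com.yahoo.processing` classes. They only mirror the behavior described above: the execution tracks the position in the chain, processor instances are shared between chains, and an empty response is returned at the end of the chain.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-ins only; these are NOT the com.yahoo.processing classes. */
public class ChainSketch {

    /** Hypothetical response: just a mutable list of data items. */
    static class MiniResponse {
        final List<String> data = new ArrayList<>();
    }

    /** Hypothetical processor: may act before and after passing the request on. */
    interface MiniProcessor {
        MiniResponse process(String request, MiniExecution execution);
    }

    /** Holds the position in the chain, so the processors themselves stay stateless. */
    static class MiniExecution {
        private final List<MiniProcessor> chain;
        private int next = 0;

        MiniExecution(List<MiniProcessor> chain) { this.chain = chain; }

        MiniResponse process(String request) {
            if (next >= chain.size()) return new MiniResponse(); // end of chain: empty response
            MiniProcessor processor = chain.get(next++);
            try {
                return processor.process(request, this);
            } finally {
                next--; // restore the position so a processor may pass the request on again
            }
        }
    }

    /** Creates a processor that adds a labeled item on the way back up the chain. */
    static MiniProcessor labeling(String label) {
        return (request, execution) -> {
            MiniResponse response = execution.process(request);
            response.data.add(label + ":" + request);
            return response;
        };
    }

    /** Runs a fresh execution over the given chain and returns the response data. */
    static List<String> run(String request, List<MiniProcessor> chain) {
        return new MiniExecution(chain).process(request).data;
    }

    public static void main(String[] args) {
        MiniProcessor a = labeling("a");
        MiniProcessor b = labeling("b");
        // The same two instances take part in two different chains
        System.out.println(run("q1", List.of(a, b)));
        System.out.println(run("q2", List.of(b, a)));
    }
}
```

Because all per-request state lives in the execution object, the two runs in `main` do not interfere even though they share processor instances.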
In most cases it is more convenient to configure chains and processor instances using external configuration. Chains of processors may be specified in a processing element in the services.xml file in the application package. The compiled processors are added to the application package as OSGi components. Chain configuration allows chains to be defined as sets of processors with ordering constraints, such that the global ordering of processors can be determined by the framework, and set operations on chains can be used to define extensions and variants of chains.
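How the framework resolves ordering constraints is not described here. As a rough illustration of the general idea only, the sketch below derives one global order from pairwise "must run before" constraints using a standard topological sort (Kahn's algorithm). The class `OrderingSketch`, its `order` method and the alphabetical tie-breaking are assumptions made for this example and are not part of the Processing API.

```java
import java.util.*;

/** Illustrative only: one way a global order could be derived from pairwise constraints. */
public class OrderingSketch {

    /**
     * Orders names so that for every constraint {first, second}, first precedes second.
     * Assumes every name used in a constraint is also present in names.
     * Ties are broken alphabetically to give a stable result.
     */
    static List<String> order(Set<String> names, List<String[]> beforeConstraints) {
        Map<String, Set<String>> successors = new HashMap<>();
        Map<String, Integer> pending = new HashMap<>(); // number of unmet predecessors
        for (String name : names) {
            successors.put(name, new TreeSet<>());
            pending.put(name, 0);
        }
        for (String[] constraint : beforeConstraints) {
            // Count each distinct constraint once
            if (successors.get(constraint[0]).add(constraint[1]))
                pending.merge(constraint[1], 1, Integer::sum);
        }

        PriorityQueue<String> ready = new PriorityQueue<>();
        for (String name : names)
            if (pending.get(name) == 0) ready.add(name);

        List<String> ordered = new ArrayList<>();
        while (!ready.isEmpty()) {
            String name = ready.poll();
            ordered.add(name);
            for (String successor : successors.get(name))
                if (pending.merge(successor, -1, Integer::sum) == 0) ready.add(successor);
        }
        if (ordered.size() != names.size())
            throw new IllegalArgumentException("Ordering constraints are cyclic");
        return ordered;
    }

    public static void main(String[] args) {
        Set<String> names = Set.of("BarProcessor", "ExampleProcessor", "FooProcessor");
        List<String[]> constraints = List.of(
                new String[] {"ExampleProcessor", "FooProcessor"},
                new String[] {"FooProcessor", "BarProcessor"});
        System.out.println(order(names, constraints));
    }
}
```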
In some cases it is useful to return a Response before all the data in it is available. This allows returning a partial response to clients with low latency even though the complete response contains some data arriving more slowly. The slow data can be added to the Response as a placeholder where actual data will arrive later. The processing framework allows waiting or listening for such completion events as Guava ListenableFutures.
If all data is added to the Response as future placeholders, the processing framework becomes completely non-blocking.
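The placeholder idea can be sketched with the JDK's `CompletableFuture` standing in for the framework's future type. Everything in this block (`LateDataSketch`, `PartialResponse`, `respond`) is a hypothetical illustration and not the Processing API. It only shows a response being returned immediately while a listener is notified once the slow data arrives.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative only: a JDK future stands in for the framework's completion future. */
public class LateDataSketch {

    /** Hypothetical response: data available now plus a placeholder for late data. */
    static class PartialResponse {
        final List<String> data = new CopyOnWriteArrayList<>();
        final CompletableFuture<String> lateData = new CompletableFuture<>();
    }

    /** Returns at once; the slow part is represented by the uncompleted future. */
    static PartialResponse respond(String request) {
        PartialResponse response = new PartialResponse();
        response.data.add("fast result for " + request);
        // Listener: runs when lateData completes, without blocking the caller
        response.lateData.thenAccept(response.data::add);
        return response;
    }

    public static void main(String[] args) {
        PartialResponse response = respond("q");
        System.out.println(response.data); // only the fast data so far
        response.lateData.complete("slow result for q"); // would be done by an IO thread later
        System.out.println(response.data); // now contains both items
    }
}
```

A caller that needs the complete response can wait on the future instead, while a caller that only needs the fast data never blocks.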
Processors in real applications will typically depend on some configuration and/or other components to run. Such dependencies should be declared as straightforward constructor arguments to allow them to be injected at construction time.
The container runtime used to host the processing framework uses a dependency injection framework based on Guice; see container components.
As a processor may participate in many processing executions at one time, field values in a processing class should usually be immutable after construction is completed.
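As a sketch of these two guidelines, the hypothetical class below takes its dependency as a constructor argument and keeps only final fields, so one instance can serve many concurrent executions. `GreetingConfig` and `GreetingProcessorSketch` are invented for this example and do not extend the real `Processor` class. In a container the constructor argument would be supplied by injection rather than by calling the constructor directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: does not extend the real Processor class. */
public class GreetingProcessorSketch {

    /** Hypothetical immutable configuration, supplied from outside. */
    static final class GreetingConfig {
        final String greeting;
        GreetingConfig(String greeting) { this.greeting = greeting; }
    }

    private final String greeting; // set once in the constructor, never changed

    /** The dependency is a plain constructor argument, so it can be injected. */
    public GreetingProcessorSketch(GreetingConfig config) {
        this.greeting = config.greeting;
    }

    /** Uses only final fields and local variables, so concurrent calls cannot interfere. */
    public String process(String request) {
        return greeting + ", " + request + "!";
    }

    /** Calls the same instance from several threads and returns the results in request order. */
    static List<String> processConcurrently(GreetingProcessorSketch processor, List<String> requests) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String request : requests)
                futures.add(pool.submit(() -> processor.process(request)));
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures)
                results.add(future.get());
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("Concurrent processing failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        GreetingProcessorSketch processor = new GreetingProcessorSketch(new GreetingConfig("Hello"));
        System.out.println(processConcurrently(processor, List.of("a", "b", "c")));
    }
}
```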
A Renderer is used to serialize the Response for return to a client. Renderers are subclasses of com.yahoo.processing.rendering.Renderer. A convenience superclass which handles waiting for future data in the asynchronous case is provided as com.yahoo.processing.rendering.AsynchronousSectionedRenderer.
The default renderer, which renders in a simple JSON format, is com.yahoo.processing.rendering.ProcessingRenderer and can be subclassed to customize rendering of each kind of Data item.
Renderers are regular components which are added to the application package in the renderer section of the services.xml file. A renderer is selected by setting the format parameter in the request to the renderer id.
The Processing framework is meant to be generic and minimal. In some domains it is useful to employ a richer model of Processors, Requests, Responses and Executions targeted to that domain. An example is the Search domain, where Searchers, Queries and Results subclass Processors, Requests and Responses. The Processing framework is designed to allow such subclassing to build richer frameworks on top.
A processor can be tested running inside a container. We create a JDisc from services.xml:
import com.yahoo.application.container.JDisc;
import com.yahoo.application.Networking;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.component.ComponentSpecification;
import org.junit.Test;

import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.containsString;

public class ContainerTest {

    @Test
    public void testSearch() {
        String servicesXml =
                "<container version=\"1.0\">" +
                "  <processing>" +
                "    <chain id=\"default\">" +
                "      <processor id=\"com.mydomain.example.ExampleProcessor\" />" +
                "    </chain>" +
                "  </processing>" +
                "</container>";

        try (JDisc container = JDisc.fromServicesXml(servicesXml, Networking.disable)) {
            Response response = container.processing().process(ComponentSpecification.fromString("default"), new Request());
            assertThat(response.data().get(0).toString(), containsString("Hello, world!"));
        }
    }

}
We can also examine which processors are in a chain and their ordering:
ChainRegistry<Processor> chains = container.processing().getChains();
Chain<Processor> defaultChain = chains.getComponent("default");
boolean foundExampleProcessor = false;
for (Processor processor : defaultChain.components()) {
    if ("ExampleProcessor".equals(processor.getClassName()))
        foundExampleProcessor = true;
}
assertTrue("No instance of ExampleProcessor found in the default chain", foundExampleProcessor);
A complete application will usually be composed of several processor chains, which may or may not invoke each other. To select a chain configured with an id other than "default", add the chain id as a GET parameter named chain.
In other words, given a chain named "testbed", as in:
<container version="1.0">
    <processing>
        <chain id="testbed">
            <processor id="com.yahoo.example.ExampleProcessor" />
        </chain>
    </processing>
</container>
The chain testbed could be tested from the command line by doing:
$ curl http://hostname:port/processing/?chain=testbed
This section contains a collection of "how do I" explanations for Processing. Most of these pertain to the JDisc binding of Processing, but note that Processing is independent of JDisc and may be invoked programmatically in any environment.
Processors which interface with the network layer may need the network-level request to read headers or request data, or to make outgoing calls through JDisc. The JDisc request is available through the request properties:
com.yahoo.container.jdisc.HttpRequest httpRequest =
        (com.yahoo.container.jdisc.HttpRequest) processingRequest.properties().get("jdisc.request");
Response headers may be added to any Response by adding instances of com.yahoo.processing.handler.ResponseHeaders to the Response (ResponseHeaders is a kind of response Data). Multiple instances may be added to the same Response, and the complete set of headers returned is the union of the headers in all such objects. Example Processor:
import com.yahoo.processing.Processor;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.handler.ResponseHeaders;
import com.yahoo.processing.execution.Execution;

import java.util.Collections;
import java.util.Map;
import java.util.List;

public class ResponseHeaderSetter extends Processor {

    private final Map<String, List<String>> responseHeaders;

    public ResponseHeaderSetter(Map<String, List<String>> responseHeaders) {
        this.responseHeaders = Collections.unmodifiableMap(responseHeaders);
    }

    @Override
    public Response process(Request request, Execution execution) {
        Response response = execution.process(request);
        response.data().add(new ResponseHeaders(responseHeaders, request));
        return response;
    }

}
This section lists a few example processors which show some use cases for the asynchronous aspects of the API.
import com.yahoo.component.chain.Chain;
import com.yahoo.processing.Processor;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.execution.AsyncExecution;
import com.yahoo.processing.execution.Execution;
import com.yahoo.processing.response.FutureResponse;

import java.util.*;

/**
 * Call a number of chains in parallel
 */
public class Federator extends Processor {

    private final List<Chain<? extends Processor>> chains;

    public Federator(Chain<? extends Processor>... chains) {
        this.chains = Arrays.asList(chains);
    }

    @Override
    public Response process(Request request, Execution execution) {
        // Start one asynchronous execution per federated chain
        List<FutureResponse> futureResponses = new ArrayList<FutureResponse>(chains.size());
        for (Chain<? extends Processor> chain : chains) {
            futureResponses.add(new AsyncExecution(chain, execution).process(request));
        }

        // Continue down this chain while the federated chains run
        Response response = execution.process(request);

        AsyncExecution.waitForAll(futureResponses, 1000);
        for (FutureResponse futureResponse : futureResponses) {
            Response federatedResponse = futureResponse.get();
            response.data().add(federatedResponse.data());
            response.mergeWith(federatedResponse);
        }
        return response;
    }

}
import com.yahoo.processing.*;
import com.yahoo.processing.execution.Execution;
import com.yahoo.processing.response.*;
import com.yahoo.processing.test.ProcessorLibrary.StringData;

/**
 * A data producer which produces data that will be received asynchronously.
 * This is not a realistic, thread-safe implementation, as only the most
 * recently created incoming data can be completed.
 */
public class AsyncDataProducer extends Processor {

    private IncomingData incomingData;

    @Override
    public Response process(Request request, Execution execution) {
        DataList dataList = ArrayDataList.createAsync(request); // Default implementation
        incomingData = dataList.incoming();
        return new Response(dataList);
    }

    /** Called by some other data producing thread, later */
    public void completeLateData() {
        incomingData.addLast(new StringData(incomingData.getOwner().request(), "A late hello, world!"));
    }

}
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.chain.Chain;
import com.yahoo.processing.*;
import com.yahoo.processing.execution.*;

/**
 * A processor which registers a listener on the future completion of
 * asynchronously arriving data to perform another chain at that point.
 */
public class AsyncDataProcessingInitiator extends Processor {

    private final Chain<Processor> asyncChain;

    public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) {
        this.asyncChain = asyncChain;
    }

    @Override
    public Response process(Request request, Execution execution) {
        Response response = execution.process(request);
        response.data().complete().addListener(
                new RunnableExecution(request, new ExecutionWithResponse(asyncChain, response, execution)),
                MoreExecutors.sameThreadExecutor());
        return response;
    }

}