# Feed using Hadoop, Pig, Oozie

This document discusses methods to feed to Vespa from various grid resources.

Examples use a local file system. Prerequisite: Have some data on a Hadoop cluster, ready to feed to Vespa. This includes text or JSON files on HDFS, or data in Hive or HBase tables.

vespa-hadoop contains classes and utilities to enable feeding directly to Vespa endpoints. It is a minimal layer over the vespa-feed-client. Dependency:

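<dependency>
    <groupId>com.yahoo.vespa</groupId>
    <artifactId>vespa-hadoop</artifactId>
    <!-- add a <version> element matching the Vespa release in use -->
</dependency>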


## Pig

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. Pig is the standard method for ETL and grid scale analysis at Yahoo. Pig features methods to load data from different types of sources, transform data, and store that data into various types of destinations.

The vespa-hadoop library provides the VespaStorage class, a simple way of storing data into a Vespa endpoint. Basic usage:

REGISTER path/to/vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

A = LOAD '<path>' [USING <storage>] [AS <schema>];

-- apply any transformations

STORE A INTO '$ENDPOINT' USING VespaStorage();

To run the above Pig script, specify the endpoint. The endpoint is the hostname to feed to. Port 4080 is currently the default; to override it, use -Dvespa.feed.defaultport=8080.

VespaStorage supports feeding to multiple clusters. To specify multiple endpoints, separate them with commas. VespaStorage writes the operations to all clusters and waits for a reply/ack from all of them, so it is a blocking implementation. If one cluster is down for some reason (e.g. maintenance), the remaining healthy clusters also stop receiving operations.

As an example, assume the above script is called feed.pig. Run it from a Hadoop node to two Vespa instances (add vespa.feed.proxy.host/port as needed):

pig -x local -f feed.pig -p ENDPOINT=hostname-of-endpoint-1,hostname-of-endpoint-2 -Dvespa.feed.proxy.host=proxy-host -Dvespa.feed.proxy.port=proxy-port -Dvespa.feed.defaultport=8080

### Feeding pre-constructed operations

If the document operations are already formatted in the Vespa JSON format, they can be fed directly to a Vespa endpoint:

REGISTER vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data - one column for json data
data = LOAD 'feed-dump.data' AS (json:chararray);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();

The source, e.g. 'feed-dump.data', must contain exactly one valid JSON document operation per line, e.g.:
{"put": "id:namespace:music::0","fields": {"title":"foo"}}
{"put": "id:namespace:music::1","fields": {"title":"foo"}}
{"update": "id:namespace:music::2","fields": { "title": { "assign":"bar"}}}
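The one-operation-per-line requirement can be checked before a job is submitted. The following is a minimal sketch in plain Java, assuming a brace-counting heuristic is good enough for a quick sanity check; the class and method names are hypothetical and not part of vespa-hadoop, and the check validates neither JSON syntax nor Vespa semantics.

```java
// Illustrative only: a quick pre-flight check, not a JSON parser.
final class FeedLineCheck {

    // Returns true if the line is exactly one brace-balanced JSON object.
    // Braces inside string values are ignored.
    static boolean looksLikeSingleOperation(String line) {
        String text = line.trim();
        if (!text.startsWith("{")) {
            return false;
        }
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                // The outer object must close on the last character, not earlier.
                if (depth == 0 && i != text.length() - 1) {
                    return false;
                }
            }
        }
        return depth == 0 && !inString;
    }

    public static void main(String[] args) {
        String ok = "{\"put\": \"id:namespace:music::0\",\"fields\": {\"title\":\"foo\"}}";
        String truncated = "{\"put\": \"id:namespace:music::0\",";
        System.out.println(looksLikeSingleOperation(ok));
        System.out.println(looksLikeSingleOperation(truncated));
    }
}
```

A line that fails this check is either an operation spread over several lines or several operations on one line.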

To store multi-line formatted JSON operations use the VespaSimpleJsonLoader:
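REGISTER vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data - one column for json data
data = LOAD 'feed-dump.data' USING com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader() AS (json:chararray);

-- Store into Vespa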
STORE data INTO '$ENDPOINT' USING VespaStorage();

### Feeding structured data

Example schema:

schema music {
    document music {
        field album type string {
            indexing: index
        }
        field artist type string {
            indexing: summary | index
        }
        field duration type int {
            indexing: summary
        }
        field title type string {
            indexing: summary | index
        }
        field year type int {
            indexing: summary | attribute
        }
    }
}

Sample data, tab separated:

Bad	Michael Jackson	Bad	1987	247
Recovery	Eminem	So Bad	2010	240

When loading data in Pig, specify a schema for that data unless it can be inferred from the data source. After selecting the fields to feed to Vespa, a document operation must be constructed. There are two ways this can be accomplished. The first is to let the VespaDocumentOperation UDF construct the operation:

REGISTER vespa-hadoop.jar

-- Create valid Vespa put operations
DEFINE VespaPutOperation
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=put',
            'docid=id:namespace:music::<artist>-<year>'
       );

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data from any source - here we load using PigStorage
data = LOAD '<hdfs-path>' AS (album:chararray, artist:chararray, title:chararray, year:int, duration:int);

-- Transform tabular data to a Vespa document operation JSON format
data = FOREACH data GENERATE VespaPutOperation(*);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();


Here, the VespaDocumentOperation UDF is defined and configured to create the document operation. The notable part is the docid=... section, which specifies a document id template for each document. The field names inside the angle brackets are replaced with the values in each record. This is a simple search-and-replace operation, so define custom document IDs as needed. put is the default operation; remove and update are also supported.
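To make the template substitution concrete, here is a minimal sketch in plain Java of the search-and-replace idea. It is not the vespa-hadoop implementation; the class and method names are hypothetical, and it assumes a record can be represented as a map from field name to value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics the <field> search-and-replace described above.
final class DocIdTemplate {

    // Replaces every "<name>" in the template with the record's value for that field.
    // Placeholders without a matching field are left untouched.
    static String expand(String template, Map<String, Object> record) {
        String result = template;
        for (Map.Entry<String, Object> field : record.entrySet()) {
            result = result.replace("<" + field.getKey() + ">", String.valueOf(field.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // First row of the sample data
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("album", "Bad");
        row.put("artist", "Michael Jackson");
        row.put("title", "Bad");
        row.put("year", 1987);
        row.put("duration", 247);
        System.out.println(expand("id:namespace:music::<artist>-<year>", row));
    }
}
```

Because the substitution is purely textual, any combination of fields that yields a unique string per document can serve as a template.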

Additionally, all fields in the relation (here data) are converted to JSON on a best-effort basis, which should be suitable for most cases. The Pig schema determines the type conversion, and the mapping between Pig data types and JSON Vespa types is as follows:

| Pig type | JSON-Vespa type |
|---|---|
| int | number |
| long | number |
| float | number |
| double | number |
| chararray | string |
| bytearray | base64 encoded string |
| boolean | boolean |
| datetime | long - milliseconds since epoch |
| biginteger | number |
| bigdecimal | number |
| tuple | array |
| bag | array of arrays |
| map | JSON object |

In this case, the first row in data above will be transformed to the following JSON:
{
    "put": "id:namespace:music::Michael Jackson-1987",
    "fields": {
        "album": "Bad",
        "artist": "Michael Jackson",
        "duration": 247,
        "title": "Bad",
        "year": 1987
    }
}
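As a rough illustration of the conversion, the sketch below builds a put operation from a flat record in plain Java. It covers only the string, number, and boolean rows of the mapping, uses hand-rolled JSON writing instead of a JSON library, and all names are hypothetical; the real conversion is done inside vespa-hadoop.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: flat records of strings, numbers, and booleans.
final class PutOperationSketch {

    // Escapes backslashes and double quotes, then wraps the text in quotes.
    // Control characters are not handled in this sketch.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Numbers and booleans are written bare; everything else becomes a JSON string.
    static String toJsonValue(Object value) {
        if (value instanceof Number || value instanceof Boolean) {
            return String.valueOf(value);
        }
        return quote(String.valueOf(value));
    }

    // Builds {"put": <docId>, "fields": {...}} with fields in map iteration order.
    static String put(String docId, Map<String, Object> fields) {
        StringBuilder json = new StringBuilder();
        json.append("{\"put\": ").append(quote(docId)).append(", \"fields\": {");
        String separator = "";
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            json.append(separator)
                .append(quote(field.getKey()))
                .append(": ")
                .append(toJsonValue(field.getValue()));
            separator = ", ";
        }
        return json.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("album", "Bad");
        row.put("artist", "Michael Jackson");
        row.put("title", "Bad");
        row.put("year", 1987);
        row.put("duration", 247);
        System.out.println(put("id:namespace:music::Michael Jackson-1987", row));
    }
}
```

The result is a single-line operation, which is also the shape required when feeding pre-constructed operations.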


In case the VespaDocumentOperation does not fit the needs, write a custom UDF (user defined function) in any of the supported languages to transform the data into valid Vespa operations. Refer to the Pig documentation for more details on how to write UDFs.

As an alternative to the above approach, let the VespaStorage class directly generate the document operations without using the above UDF:

REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=put',
            'docid=id:namespace:music::<artist>-<year>'
       );

-- Load data from any source - here we load using PigStorage
data = LOAD '<source>' AS (<schema>);

-- transform and select fields

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();

Here the required information to create document operations is added to the configuration of the VespaStorage class. Internally it uses the same code as the above approach, so results will be identical.

### Parameters

Parameters for VespaStorage and VespaDocumentOperation are:

| parameter | default | description |
|---|---|---|
| create-document-operation | false | true or false. Only valid for VespaStorage |
| operation | put | Any valid Vespa document operation |
| docid | | A document id template. Replaces <field-name> with its value for every record |
| exclude-fields | | A list of fields to exclude when creating document operations. The field is still available for the document id template. |
| condition | | A test-and-set condition template. Replaces <field-name> with its value for every record |
| create-if-non-existent | false | If true, adds create=true for creating documents if they don't yet exist. Only valid with operation=update. |
| update-map-fields | | A list of map-type fields to update individual entries in the map when creating document operations. |
| remove-map-fields | | A list of map-type fields to remove individual entries in the map when creating document operations. |
| update-tensor-fields | | A list of tensor-type fields to add or overwrite individual tensor cells when creating document operations. |
| remove-tensor-fields | | A list of tensor-type fields to remove existing tensor fields in a document when creating document operations. |
| create-tensor-fields | | A list of fields to format as tensors. |
| bag-as-map-fields | | A list of fields containing a bag of tuples to format as a map. |
| simple-array-fields | | A list of fields containing a bag of single element tuples to format as an array. |
| simple-object-fields | | A list of fields containing a tuple to format as a general JSON object which can be used for weightedsets, structs or maps. |

The following is an example of using the above parameters to update map and tensor fields.
Example schema:

schema album {
    document album {
        struct song {
            field name type string {}
            field duration type int {}
        }
        field songs type map<int, song> {
            indexing: summary
        }
        field song_popularity type tensor<float>(song{}) {
            indexing: attribute | summary
        }
    }
}

Use update parameters on specific fields:

REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=update',
            'create-if-non-existent=true',
            'docid=id:namespace:album::<docId>',
            'update-map-fields=songs',
            'update-tensor-fields=song_popularity'
       );

-- Load data from any source - here we load using PigStorage
data = LOAD '<source>' AS (<schema>);

-- output : {docId: chararray,songs: {t: (key: int,value: (name: chararray,duration: int))},song_popularity: map[]}
DESCRIBE data;

-- output: (1,{(1,(Together Forever,132))},[Song:1#10.5])
DUMP data;

In this case, the data above will be transformed to the following JSON:

{
    "update": "id:namespace:album::1",
    "create": true,
    "fields": {
        "songs{1}": {
            "assign": {
                "name": "Together Forever",
                "duration": 132
            }
        },
        "song_popularity": {
            "add": {
                "cells": [
                    { "address": { "song": 1 }, "value": 10.5 }
                ]
            }
        }
    }
}

For the same data in an update operation, different parameters can be passed to VespaStorage to generate remove operations for tensor and map fields:

REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=update',
            'docid=id:namespace:album::<docId>',
            'remove-map-fields=songs',
            'remove-tensor-fields=song_popularity',
            'exclude-fields=name,duration'
       );

In this case, the following JSON is generated:

{
    "update": "id:namespace:album::1",
    "fields": {
        "songs{1}": {
            "remove": 0
        },
        "song_popularity": {
            "remove": {
                "addresses": [
                    {"song": "1"}
                ]
            }
        }
    }
}

### Feeding status

VespaStorage does not check the schema before data feeding.
If the schema does not match the document type, feeding fails.

VespaStorage emits a set of counters at the end of a Pig job, and the values of these counters can differ from the ones reported by Pig. The reason is that the number of records stored as reported by Pig is the number of records sent to the feeder, while the feeder feeds and gathers statistics asynchronously for those documents. Example:

...
Vespa Feed Counters
    Documents failed=0
    Documents ok=100000
    Documents sent=100000
    Documents skipped=1
...
Output(s):
Successfully stored 100001 records in: "<endpoint>"
...

Here, Pig reports that it successfully stored 100001 records, but VespaStorage skipped one operation, probably due to a formatting error, so only 100000 operations were sent. The skipped counter is incremented if VespaStorage could not find a valid document id in the read record (e.g. due to JSON formatting issues). Any errors can be found in the job logs.

### Efficiency

VespaStorage uses the vespa-feed-client, and as such feeds as efficiently as possible. As VespaStorage runs in a Pig job which is compiled down to a MapReduce or Tez job, a number of mappers or reducers will be used to cover the file if the input file is sufficiently large. As each mapper has its own client feeding to the endpoint, each with multiple threads, make sure the cluster can take the load. Each client throttles to avoid overloading the endpoint, but the clients do not know how many clients there are in total, so one can easily over-saturate the feeding endpoints.

There are two primary factors controlling feed load: the number of mappers or reducers, and the number of concurrent connections per mapper/reducer.

For large jobs with lots of data, consider controlling the number of splits, and thus the number of mappers, based on the input file size.
This can be done by controlling combine-files, e.g. by adding:

pig -Dpig.maxCombinedSplitSize=53687091200 -f feed.pig

This is for Yarn and Hadoop v2, and will combine the input so that each map processes about 50G of data. This setting effectively reduces the number of Vespa client instances. Conversely, a lower setting gives a higher number of tasks. See the Pig and Yarn documentation for more information.

To set the number of concurrent connections per mapper/reducer, set vespa.feed.connections. The default is 1 and rarely needs to be tuned upward.

Another way to limit the number of parallel tasks feeding to Vespa for large jobs is to use the PARALLEL directive of Pig:

data = LOAD '/some/data/*' USING PigStorage('\u0001') AS (guid:chararray, text:chararray, score:double);
grouped_data = GROUP data BY guid PARALLEL 100;
STORE grouped_data INTO '$ENDPOINT' USING VespaStorage();
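The effect of the combined split size on feed load can be estimated with simple arithmetic. The sketch below, in plain Java, assumes that the number of map tasks is roughly the input size divided by pig.maxCombinedSplitSize, and that every task feeds at the same time. Both are simplifications, since actual split counts depend on file layout and scheduling. The 1 TiB input size and the class name are hypothetical.

```java
// Illustrative only: back-of-the-envelope sizing, not what Pig or Yarn computes exactly.
final class FeedLoadEstimate {

    // Rough number of map tasks: input size divided by the combined split size, rounded up.
    static long estimatedMappers(long inputBytes, long maxCombinedSplitSize) {
        return (inputBytes + maxCombinedSplitSize - 1) / maxCombinedSplitSize;
    }

    // Upper bound on simultaneous connections if every task runs at the same time.
    static long peakConnections(long mappers, int connectionsPerMapper) {
        return mappers * connectionsPerMapper;
    }

    public static void main(String[] args) {
        long splitSize = 53687091200L;                 // 50 * 1024^3 bytes, as in the pig command
        long inputBytes = 1024L * 1024 * 1024 * 1024;  // hypothetical 1 TiB of input
        int connections = 1;                           // default of vespa.feed.connections

        long mappers = estimatedMappers(inputBytes, splitSize);
        System.out.println("estimated mappers: " + mappers);
        System.out.println("peak connections: " + peakConnections(mappers, connections));
    }
}
```

With these numbers the estimate is 21 mappers and therefore at most 21 concurrent connections; halving the split size roughly doubles both.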


## MapReduce

The VespaStorage Pig class builds upon an OutputFormat class that feeds to Vespa. OutputFormats are used in MapReduce jobs, so for special needs one can use this class directly from a MapReduce job. Example:

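import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class VespaFeed {

    // Identity mapper: passes each input record straight through to the output format
    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String... args) throws Exception {

        Configuration conf = new Configuration();
        // Apply -Dparam=value options to conf; the remaining arguments are the application's own
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "vespafeed");
        job.setJarByClass(VespaFeed.class);
        job.setMapperClass(FeedMapper.class);
        job.setMapOutputValueClass(Text.class);

        // Set output format class
        job.setOutputFormatClass(VespaOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

        // Map-only job: no reduce tasks are needed to feed
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

hadoop jar my-jar.jar VespaFeed \
    -Dvespa.feed.endpoint=endpoint \
    -Dvespa.feed.defaultport=8080 \
    -Dvespa.feed.proxy.host=proxyhost \
    -Dvespa.feed.proxy.port=proxyport \
    <hdfs-path>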


The example above passes each record straight through to the feeder as defined by the output format class. No reduce tasks are needed to feed. For special requirements, such as the need to guarantee a certain order of the operations for each document, one can reduce on the document id and sort the operations in the reducer.
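The ordering idea can be illustrated without a cluster. The sketch below is an in-memory stand-in in plain Java for reducing on the document id and sorting within each group. It assumes each operation carries some sequence number to sort by; the Op class, its fields, and the class name are hypothetical, and a real job would do the grouping in the shuffle phase and the sorting in a Hadoop Reducer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: simulates "reduce on document id, sort in the reducer" in memory.
final class PerDocumentOrdering {

    // Hypothetical record shape: document id, a sequence number, and the operation JSON.
    static final class Op {
        final String docId;
        final long sequence;
        final String json;

        Op(String docId, long sequence, String json) {
            this.docId = docId;
            this.sequence = sequence;
            this.json = json;
        }
    }

    // Groups operations by document id (the reduce key) and sorts each group by sequence.
    static Map<String, List<Op>> groupAndSort(List<Op> ops) {
        Map<String, List<Op>> byDocId = new TreeMap<>();
        for (Op op : ops) {
            byDocId.computeIfAbsent(op.docId, id -> new ArrayList<>()).add(op);
        }
        for (List<Op> group : byDocId.values()) {
            group.sort(Comparator.comparingLong(o -> o.sequence));
        }
        return byDocId;
    }

    public static void main(String[] args) {
        List<Op> ops = new ArrayList<>();
        ops.add(new Op("id:namespace:music::1", 2, "second operation for document 1"));
        ops.add(new Op("id:namespace:music::0", 1, "only operation for document 0"));
        ops.add(new Op("id:namespace:music::1", 1, "first operation for document 1"));

        for (List<Op> group : groupAndSort(ops).values()) {
            for (Op op : group) {
                System.out.println(op.docId + " #" + op.sequence + ": " + op.json);
            }
        }
    }
}
```

Since all operations for one document end up in the same group, they can be fed in a deterministic order by a single task.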

## Configuration

Parameters are sent using -Dparam=value for Pig and MapReduce. For Oozie, specify them as parameters in the configuration section of the action.

| parameter | default | description |
|---|---|---|
| vespa.feed.endpoint | | Feed endpoint(s) |
| vespa.feed.defaultport | | Feed endpoint port (e.g. 8080) |
| vespa.feed.proxy.host | | HTTP proxy hostname |
| vespa.feed.proxy.port | | HTTP proxy port |
| vespa.feed.dryrun | false | Do not feed if set to true |
| vespa.feed.usecompression | true | Compress the feed |
| vespa.feed.data.format | json | The only supported format is json |
| vespa.feed.progress.interval | 1000 | Log sent/ok/failed/skipped documents every n records |
| vespa.feed.connections | 1 | Number of concurrent connections per mapper/reducer |
| vespa.feed.route | default | Route to use on endpoint |