Feed using Hadoop, Pig, Oozie

This document discusses methods to feed to Vespa from various grid resources.

Examples use a local file system. Prerequisite: have some data on a Hadoop cluster, ready to feed to Vespa. This includes text or JSON files on HDFS, or data in a Hive or HBase table.

vespa-hadoop contains classes and utilities to enable feeding directly to Vespa endpoints. It is a minimal layer over the Vespa HTTP client. Dependency:

<dependency>
  <groupId>com.yahoo.vespa</groupId>
  <artifactId>vespa-hadoop</artifactId>
  <version>...</version>
</dependency>

vespa-hadoop depends on the Apache HTTP client library. Hadoop also pulls in this library, possibly with a conflicting version. If jobs fail due to a NoSuchFieldError or similar, try adding -Dmapreduce.job.user.classpath.first=true.
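
For example, the property can be passed when launching a Pig job. This is only a sketch; feed.pig and endpoint-1 are placeholder names:

pig -f feed.pig
    -p ENDPOINT=endpoint-1
    -Dmapreduce.job.user.classpath.first=true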

Pig

Apache Pig is a platform for analyzing large data sets, consisting of a high-level language for expressing data analysis programs and infrastructure for evaluating these programs. Pig is the standard method for ETL and grid-scale analysis at Yahoo. Pig features methods to load data from many different types of sources, transform it, and store it into various types of destinations.

Tip: try the blog recommendation tutorial for Pig usage.

The vespa-hadoop library provides the VespaStorage class, which offers a simple way of storing data into a Vespa endpoint. Basic usage:

REGISTER path/to/vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(<options>);

A = LOAD '<path>' [USING <storage>] [AS <schema>];

-- apply any transformations

STORE A INTO '$ENDPOINT' USING VespaStorage();
To run the above Pig script, specify the ENDPOINT parameter: the Vespa endpoint to feed to. VespaStorage supports feeding to multiple endpoints; to specify multiple endpoints, separate them by commas. Importantly, the endpoint should be defined as it is for the Vespa HTTP client: the host name only, without 'http://' and without a port number. Port 8080 is assumed. As an example, assume the above script is called feed.pig. Run it from a Hadoop node against two Vespa instances (add vespa.feed.proxy.host/port as needed):
pig -x local -f feed.pig
    -p ENDPOINT=endpoint-1,endpoint-2
    -Dvespa.feed.proxy.host=proxy-host
    -Dvespa.feed.proxy.port=proxy-port

Feeding preconstructed operations

In the case where the data already has the required JSON format (one complete document operation per record, as in the JSON example under Feeding structured data below), feed it directly to a Vespa endpoint:

REGISTER vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data - one column for json data
data = LOAD '<source>' AS (json:chararray);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();

Feeding structured data

In this example, use the search definition in basic-search. Sample data, tab separated:

Bad     Michael Jackson Bad     1987    247
Recovery        Eminem  So Bad  2010    240
When loading data in Pig, specify a schema for that data unless it can be inferred from the data source. After selecting the fields to feed to Vespa, a document operation must be constructed. There are two ways to accomplish this. The first is to let the VespaDocumentOperation UDF construct the operation:
REGISTER vespa-hadoop.jar

-- Create valid Vespa put operations
DEFINE VespaPutOperation
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=put',
            'docid=id:music:music::<artist>-<year>'
       );

DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data from any source - here we load using PigStorage
data = LOAD '<hdfs-path>' AS (album:chararray, artist:chararray, title:chararray, year:int, duration:int);

-- Transform tabular data to a Vespa document operation JSON format
data = FOREACH data GENERATE VespaPutOperation(*);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();
Here, the VespaDocumentOperation UDF is defined and configured to create the document operations. The notable part is the docid=... parameter, which specifies a document id template for each document. The fields inside the angle brackets are replaced with the values in each record. This is a simple search-and-replace operation; define custom document ids as needed. put is the default operation; remove and update can be used as well.
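
For illustration, a remove operation can be generated the same way. This is a sketch only, assuming 'operation=remove' is accepted (see the parameter table below) and reusing the data relation and document id template from the example above:

-- Create remove operations; a remove only needs the document id
DEFINE VespaRemoveOperation
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=remove',
            'docid=id:music:music::<artist>-<year>'
       );

deletes = FOREACH data GENERATE VespaRemoveOperation(*);

-- Store into Vespa
STORE deletes INTO '$ENDPOINT' USING VespaStorage();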

Additionally, all fields in the relation (here, data) are converted into JSON on a best-effort basis, which should be suitable for most cases. The Pig schema determines the type conversion; the mapping between Pig data types and JSON/Vespa types is as follows:

Pig type     JSON/Vespa type
int          number
long         number
float        number
double       number
chararray    string
bytearray    base64 encoded string
boolean      boolean
datetime     long (milliseconds since epoch)
biginteger   number
bigdecimal   number
tuple        array
bag          array of arrays
map          JSON object
In this case, the first row in data above will be transformed to the following JSON:
{
    "put": "id:music:music::Michael Jackson-1987"
    "fields": {
        "album": "Bad",
        "artist": "Michael Jackson",
        "duration": 247,
        "title": "Bad",
        "year": 1987
    }
}
In case VespaDocumentOperation does not fit your needs, write a custom UDF (user-defined function) in any of the supported languages to transform the data into valid Vespa operations. Refer to the Pig documentation for more details on how to write UDFs.
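
As a sketch of how a custom UDF is wired in, the example below assumes a hypothetical jar my-udfs.jar containing a UDF class com.example.MyDocumentOperation that returns one complete JSON document operation per record:

REGISTER vespa-hadoop.jar
REGISTER my-udfs.jar

-- Hypothetical custom UDF producing a JSON document operation per input record
DEFINE MyDocumentOperation com.example.MyDocumentOperation();
DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

data = LOAD '<hdfs-path>' AS (album:chararray, artist:chararray, title:chararray, year:int, duration:int);
data = FOREACH data GENERATE MyDocumentOperation(*);

STORE data INTO '$ENDPOINT' USING VespaStorage();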

As an alternative to the above approach, let the VespaStorage class directly generate the document operations without using the above UDF:

REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=put',
            'docid=id:music:music::<artist>-<year>'
       );

-- Load data from any source - here we load using PigStorage
data = LOAD '<source>' AS (<schema>);

-- transform and select fields

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();
Here the required information to create document operations is added to the configuration of the VespaStorage class. Internally it uses the same code as the above approach, so results will be identical. Parameters for VespaStorage and VespaDocumentOperation are:
parameter                  default  description
create-document-operation  false    true or false. Only valid for VespaStorage
operation                  put      Any valid Vespa document operation
docid                               A document id template. Replaces <field-name> with its value for every record
exclude-fields                      A list of fields to exclude when creating the document operation. The fields are still available for the document id template
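
As a sketch of how these parameters combine, the following configuration builds put operations while keeping the fields used only in the document id out of the document body:

-- exclude-fields is assumed to take a comma-separated list of field names;
-- artist and year remain available to the docid template
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=put',
            'docid=id:music:music::<artist>-<year>',
            'exclude-fields=artist,year'
       );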

Feeding status

VespaStorage does not check data against the schema before feeding. If the data does not match the document type, feeding fails.

VespaStorage emits a set of counters at the end of a Pig job, and the values of these counters can differ from the ones reported by Pig. The reason is that the number of records stored as reported by Pig is the number of records sent to the feeder, while the feeder feeds and gathers statistics for those documents asynchronously. Example:

...
	Vespa Feed Counters
		Documents failed=0
		Documents ok=100000
		Documents sent=100000
		Documents skipped=1

...
Output(s):
Successfully stored 100001 records in: "<endpoint>"
...
Here, Pig reports that it successfully stored 100001 records, but VespaStorage skipped one, probably due to a formatting error. Any errors can be found in the job logs.

Efficiency

VespaStorage uses the Vespa HTTP Client and feeds as efficiently as possible. As VespaStorage runs in a Pig job, which is compiled down to a MapReduce or Tez job, a number of mappers or reducers will be used to cover the input file if it is sufficiently large. As each mapper has its own client feeding to the endpoint, each with multiple threads, make sure the cluster can take the load. Each client throttles to avoid overloading the endpoint, but a high number of effective threads (tasks times threads per mapper) can easily saturate feeding endpoints.

There are two primary factors controlling feed load: the number of mappers or reducers, and the number of concurrent connections per mapper/reducer. Control the number of splits, and thus the number of mappers, based on the input file size:

SET mapreduce.input.fileinputformat.split.minsize 128*1024*1024
This is for YARN and Hadoop v2, and will split the file at approximately 128 MB. See the Pig and YARN documentation for more information.

To set the number of concurrent connections per mapper/reducer, set vespa.feed.connections.
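
For example (a sketch mirroring the earlier command, with placeholder names), to use four connections per mapper/reducer:

pig -f feed.pig
    -p ENDPOINT=endpoint-1
    -Dvespa.feed.connections=4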

One way to limit the number of parallel tasks feeding to Vespa for a large job is to use the PARALLEL directive of Pig:
data = LOAD '/projects/comms_psearch/psearch_suggest/hiveDB/ext_us_users_content_score/*' USING PigStorage('\u0001') AS (guid:chararray, query:chararray, to:double);
grouped_data = GROUP data by guid PARALLEL 100;
STORE grouped_data INTO '$ENDPOINT' USING VespaStorage();
Another alternative is to limit the number of tasks:
SET mapreduce.jobtracker.maxtasks.perjob 100

Hadoop MapReduce

The VespaStorage Pig class builds upon an OutputFormat class that feeds to Vespa. OutputFormats are used in MapReduce jobs; for special needs, this class can be used directly from a MapReduce job. Example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; // provided by the vespa-hadoop library

public class VespaFeed {

    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String... args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "vespafeed");
        job.setJarByClass(VespaFeed.class);
        job.setMapperClass(FeedMapper.class);
        job.setMapOutputValueClass(Text.class);

        // Set output format class
        job.setOutputFormatClass(VespaOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run on Hadoop:
hadoop jar my-jar.jar VespaFeed <hdfs-path>
    -Dvespa.feed.endpoint=endpoint
    -Dvespa.feed.proxy.host=proxyhost
    -Dvespa.feed.proxy.port=proxyport
The example above passes each record straight through to the feeder, as defined by the output format class. No reduce tasks are needed to feed. For special requirements, such as the need to guarantee a certain order of the operations for each document, one can reduce on the document id and sort it out in the reducer.

Configuration

Parameters are sent using -Dparam=value for Pig and MapReduce. For Oozie, specify them as parameters in the configuration section of the action.

parameter                     default  description
vespa.feed.endpoint                    Feed endpoint(s)
vespa.feed.proxy.host                  HTTP proxy hostname
vespa.feed.proxy.port                  HTTP proxy port
vespa.feed.dryrun             false    Do not feed if set to true
vespa.feed.usecompression     true     Compress the feed
vespa.feed.data.format        json     The only supported format is json
vespa.feed.progress.interval  1000     Log sent/ok/failed/skipped documents every n records
vespa.feed.connections        1        Number of concurrent connections per mapper/reducer: setNumPersistentConnectionsPerEndpoint
vespa.feed.route              default  Route to use on the endpoint
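
As an example (a sketch mirroring the MapReduce command above), a dry run that logs progress every 10000 records without feeding any documents:

hadoop jar my-jar.jar VespaFeed <hdfs-path>
    -Dvespa.feed.endpoint=endpoint
    -Dvespa.feed.dryrun=true
    -Dvespa.feed.progress.interval=10000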