Feed using Hadoop, Pig, Oozie
This document discusses methods to feed to Vespa from various grid resources.
Examples use a local file system. Prerequisite: Have some data on a Hadoop cluster, ready to feed to Vespa. This includes text or JSON files on HDFS, or data in a Hive or HBase table.
vespa-hadoop contains classes and utilities to enable feeding directly to Vespa endpoints. It is a minimal layer over the Vespa HTTP client. Dependency:
```
<dependency>
  <groupId>com.yahoo.vespa</groupId>
  <artifactId>vespa-hadoop</artifactId>
  <version>7.25.26</version> <!-- Find latest version at search.maven.org/search?q=g:com.yahoo.vespa%20a:vespa-hadoop -->
</dependency>
```
vespa-hadoop depends on the Apache HTTP client library. Hadoop also pulls in this library, possibly with a conflicting version. If jobs fail due to NoSuchFieldErrors or similar, try adding -Dmapreduce.job.user.classpath.first=true.
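For example, a minimal sketch of setting this from the Pig script itself instead of the command line (assumption: a property set with Pig's SET statement ends up in the job configuration just like a -D flag; verify against the Hadoop/Pig versions in use):

```
-- Prefer the job's own jars (e.g. the HTTP client bundled with vespa-hadoop)
-- over the versions Hadoop ships on the task classpath
SET mapreduce.job.user.classpath.first true;
```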
Pig
Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. Pig is the standard method for ETL and grid scale analysis at Yahoo. Pig features methods to load data from many different types of sources, transform data, and store that data into various types of destinations.
Tip: try the blog recommendation tutorial for Pig usage.
The vespa-hadoop library includes the VespaStorage class, which provides a simple way of storing data into a Vespa endpoint. Basic usage:
```
REGISTER path/to/vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(<options>);

A = LOAD '<path>' [USING <storage>] [AS <schema>];

-- apply any transformations

STORE A INTO '$ENDPOINT' USING VespaStorage();
```
To run the above Pig script, specify the endpoint. The endpoint is the Vespa endpoint (hostname) to feed to. Importantly, the endpoint should be defined as it is for the Vespa HTTP client: the hostname only, without scheme ('http(s)://') and without port number. Port 4080 is currently the default; to override the port, use -Dvespa.feed.defaultport=8080.
VespaStorage supports feeding to multiple clusters. To specify multiple endpoints, separate them by commas. VespaStorage writes the operations to all clusters and waits for a reply/ack from all of them, hence it is a blocking implementation. If one cluster is down for some reason (e.g. maintenance), the remaining healthy clusters also stop receiving operations.
As an example, assume the above script is called feed.pig. Run it from a Hadoop node to two Vespa instances (add vespa.feed.proxy.host/port as needed):
```
pig -x local -f feed.pig \
    -p ENDPOINT=hostname-of-endpoint-1,hostname-of-endpoint-2 \
    -Dvespa.feed.proxy.host=proxy-host \
    -Dvespa.feed.proxy.port=proxy-port \
    -Dvespa.feed.defaultport=8080
```
Feeding preconstructed operations
In the case where the document operations are already formatted using the Vespa JSON format, one can feed this directly to a Vespa endpoint:
```
REGISTER vespa-hadoop.jar

DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data - one column for json data
data = LOAD 'feed-dump.data' AS (json:chararray);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();
```
The source, e.g. 'feed-dump.data', needs to contain exactly one valid JSON document operation per line, e.g.:
{"put": "id:namespace:music::0","fields": {"title":"foo"}} {"put": "id:namespace:music::1","fields": {"title":"foo"}} {"update": "id:namespace:music::2","fields": { "title": { "assign":"bar"}}}To store multi-line formatted JSON operations use the VespaSimpleJsonLoader:
```
REGISTER vespa-hadoop.jar

DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

DATA = LOAD 'operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray);

-- Store into Vespa
STORE DATA INTO '$ENDPOINT' USING VespaStorage();
```
Feeding structured data
Example schema:
```
schema music {
    document music {
        field album type string {
            indexing: index
        }
        field artist type string {
            indexing: summary | index
        }
        field duration type int {
            indexing: summary
        }
        field title type string {
            indexing: summary | index
        }
        field year type int {
            indexing: summary | attribute
        }
    }
}
```
Sample data, tab separated:
```
Bad        Michael Jackson    Bad       1987    247
Recovery   Eminem             So Bad    2010    240
```
When loading data in Pig, specify a schema for that data unless it can be inferred from the data source. After selecting the fields to feed to Vespa, a document operation must be constructed. There are two ways this can be accomplished. The first is to let the VespaDocumentOperation UDF construct the operation:
```
REGISTER vespa-hadoop.jar

-- Create valid Vespa put operations
DEFINE VespaPutOperation
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=put',
            'docid=id:namespace:music::<artist>-<year>'
       );

DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load data from any source - here we load using PigStorage
data = LOAD '<hdfs-path>' AS (album:chararray, artist:chararray, title:chararray, year:int, duration:int);

-- Transform tabular data to a Vespa document operation JSON format
data = FOREACH data GENERATE VespaPutOperation(*);

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();
```
Here, the VespaDocumentOperation UDF is defined and configured to create the document operation.
The notable part is the docid=... section, which specifies a document id template for each document. The fields inside the angle brackets will be replaced with the values in each record. This is a simple search-and-replace operation; define custom document IDs as needed.
put is the default operation; remove and update can be used as well, as sketched below.
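For example, a minimal sketch of a remove-operation definition, reusing the document id template from the put example above (the UDF and parameter syntax are unchanged; only the operation value differs):

```
-- Generate remove operations instead of puts, using the same id template
DEFINE VespaRemoveOperation
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=remove',
            'docid=id:namespace:music::<artist>-<year>'
       );
```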
Additionally, all fields in the relation (here data) will be converted into JSON form on a best-effort basis, which should be suitable for most cases. The Pig schema is used to determine type conversion; the mapping between Pig data types and JSON Vespa types is as follows:
| Pig type | JSON-Vespa type |
|---|---|
| int | number |
| long | number |
| float | number |
| double | number |
| chararray | string |
| bytearray | base64 encoded string |
| boolean | boolean |
| datetime | long - milliseconds since epoch |
| biginteger | number |
| bigdecimal | number |
| tuple | array |
| bag | array of arrays |
| map | JSON object |
{ "put": "id:namespace:music::Michael Jackson-1987", "fields": { "album": "Bad", "artist": "Michael Jackson", "duration": 247, "title": "Bad", "year": 1987 } }In case the
VespaDocumentOperation
does not fit the needs,
write a custom UDF (user defined function) in any of the supported
languages to transform the data into valid Vespa operations.
Refer to the Pig documentation
for more details on how to write UDFs.
As an alternative to the above approach, let the VespaStorage
class directly generate the document operations without using the above UDF:
```
REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=put',
            'docid=id:namespace:music::<artist>-<year>'
       );

-- Load data from any source - here we load using PigStorage
data = LOAD '<source>' AS (<schema>);

-- transform and select fields

-- Store into Vespa
STORE data INTO '$ENDPOINT' USING VespaStorage();
```
Here, the required information to create document operations is added to the configuration of the VespaStorage class. Internally it uses the same code as the above approach, so results will be identical.
Parameters
Parameters for VespaStorage and VespaDocumentOperation are:
| parameter | default | description |
|---|---|---|
| create-document-operation | false | true or false. Only valid for VespaStorage |
| operation | put | Any valid Vespa document operation |
| docid | | A document id template. Replaces <field-name> with its value for every record |
| exclude-fields | | A list of fields to exclude when creating document operations. The field is still available for the document id template |
| condition | | A test-and-set condition template. Replaces <field-name> with its value for every record |
| create-if-non-existent | false | If true, adds create=true for creating documents if they don't yet exist. Only valid with operation=update |
| update-map-fields | | A list of map-type fields to update individual entries in the map when creating document operations |
| remove-map-fields | | A list of map-type fields to remove individual entries in the map when creating document operations |
| update-tensor-fields | | A list of tensor-type fields to add or overwrite individual tensor cells when creating document operations |
| remove-tensor-fields | | A list of tensor-type fields to remove existing tensor fields in a document when creating document operations |
| create-tensor-fields | | A list of fields to format as tensors |
| bag-as-map-fields | | A list of bag-with-tuple fields to format as a map |
| simple-array-fields | | A list of fields containing a bag of single-element tuples to format as an array |
| simple-object-fields | | A list of fields containing a tuple to format as a general JSON object, which can be used for weightedsets, structs or maps |
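As a hedged sketch of the condition parameter (the selection expression and field names here are illustrative, not taken from the examples in this document), a test-and-set update might be configured like this:

```
-- Only apply the update when the stored year matches this record's year;
-- <year> in the condition template is replaced per record, like in docid
DEFINE VespaConditionalStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=update',
            'docid=id:namespace:music::<artist>-<year>',
            'condition=music.year==<year>'
       );
```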
The following is an example of using the above parameters to update map and tensor fields. Example schema:
```
search album {
    document album {
        struct song {
            field name type string {}
            field duration type int {}
        }
        field songs type map<int, song> {
            indexing: summary
        }
        field song_popularity type tensor<float>(song{}) {
            indexing: attribute | summary
        }
    }
}
```
Use update parameters on specific fields:
```
REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'create-document-operation=true',
            'operation=update',
            'docid=id:namespace:song::<docId>',
            'update-map-fields=songs',
            'update-tensor-fields=song_popularity'
       );

-- Load data from any source - here we load using PigStorage
data = LOAD '<source>' AS (<schema>);

-- output: {docId: chararray,songs: {t: (key: int,value: (name: chararray,duration: int))},song_popularity: map[]}
DESCRIBE data;

-- output: (1,{(1,(Together Forever,132))},[Song:1#10.5])
DUMP data;
```
In this case, the data above will be transformed to the following JSON:
{ "update": "id:namespace:album::1", "create": true, "fields": { "songs{1}": { "assign": { "name": "Together Forever", "duration": 132 } }, "song_popularity": { "add": { "cells": [ { "address": { "song": 1 }, "value": 10.5 } ] } } } }
For the same data in an update operation, different parameters can be passed to VespaStorage to generate remove tensor and map field operations:
```
REGISTER vespa-hadoop.jar

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage(
            'operation=update',
            'docid=id:namespace:people::<docId>',
            'remove-map-fields=mapField',
            'remove-tensor-fields=tensorField',
            'exclude-fields=name,duration'
       );
```
In this case, the following JSON is generated:
{ "update": "id:namespace:song::1", "fields": { "songs{1}": { "remove": 0 }, "song_popularity": { "remove": { "addresses": [ {"song": "1"} ] } } } }
Feeding status
VespaStorage
does not check the schema before data feeding.
If the schema does not match the document type, feeding fails.
VespaStorage emits a set of counters at the end of a Pig job, and the values of these counters can differ from the ones reported by Pig. The reason for this is that the number of records stored as reported by Pig is the number of records sent to the feeder, while the feeder feeds and gathers statistics asynchronously for those documents. Example:
```
...
Vespa Feed Counters
    Documents failed=0
    Documents ok=100000
    Documents sent=100000
    Documents skipped=1
...
Output(s):
Successfully stored 100001 records in: "<endpoint>"
...
```
Here, Pig reports that it successfully stored 100001 records, but VespaStorage skipped one operation, probably due to a formatting error, so only 100000 operations were sent.
The skipped counter is incremented if VespaStorage
could not find a valid document id in the read record (e.g due to JSON formatting issues).
Any errors can be found in the job logs.
Efficiency
VespaStorage
uses the
Vespa HTTP Client,
and as such feeds as efficiently as possible.
As VespaStorage runs in a Pig job that is compiled down to a MapReduce or Tez job, a number of mappers or reducers will be used to cover the input file if it is sufficiently large. Each mapper has its own client feeding to the endpoint, each with multiple threads, so make sure the cluster can take the load.
Each client will throttle to avoid overloading the endpoint, but the clients do not know how many other clients there are, so it is easy to over-saturate the feeding endpoints.
There are two primary factors controlling feed load: the number of mappers or reducers, and the number of concurrent connections per mapper/reducer. For large jobs with lots of data, consider controlling the number of splits, and thus the number of mappers, based on the input file size. This can be done by combining input files, e.g. by adding a setting like the one sketched below.
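A sketch of such a setting (the property names are an assumption based on Pig's input-file combination feature, chosen to match the roughly 50 GB per map described below; verify them for the Pig/Hadoop version in use):

```
-- Combine small input files into splits of up to ~50 GiB (53687091200 bytes),
-- so fewer map tasks - and thus fewer feed clients - are started
SET pig.splitCombination true;
SET pig.maxCombinedSplitSize 53687091200;
```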
This is for Yarn and Hadoop v2, and will combine the input so that each map processes about 50 GB of data. This setting effectively reduces the number of Vespa client instances; see the Pig and Yarn documentation for more information.

To set the number of concurrent connections per mapper/reducer, set vespa.feed.connections. The default is 1 and rarely needs to be tuned upward.
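For instance, a one-line sketch (same assumption as earlier: a SET statement is equivalent to passing -Dvespa.feed.connections=2 on the pig command line):

```
-- Two concurrent connections per mapper/reducer instead of the default 1
SET vespa.feed.connections 2;
```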
Another way to limit the number of parallel tasks feeding to Vespa for large jobs is to use the PARALLEL directive of Pig:
```
data = LOAD '/some/data/*' USING PigStorage('\u0001') AS (guid:chararray, text:chararray, score:double);
grouped_data = GROUP data by guid PARALLEL 100;
STORE grouped_data INTO '$ENDPOINT' USING VespaStorage();
```
Hadoop MapReduce
The VespaStorage Pig class builds upon an OutputFormat class that feeds to Vespa. OutputFormats are used in MapReduce jobs; for special needs, one can use this class directly from a MapReduce job. Example:
```
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class VespaFeed {

    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String... args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "vespafeed");
        job.setJarByClass(VespaFeed.class);
        job.setMapperClass(FeedMapper.class);
        job.setMapOutputValueClass(Text.class);

        // Set output format class
        job.setOutputFormatClass(VespaOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Run on Hadoop:
```
hadoop jar my-jar.jar VespaFeed <hdfs-path> \
    -Dvespa.feed.endpoint=endpoint \
    -Dvespa.feed.defaultport=8080 \
    -Dvespa.feed.proxy.host=proxyhost \
    -Dvespa.feed.proxy.port=proxyport
```
The example above passes each record straight through to the feeder as defined by the output format class. To feed, no reduce tasks are needed; for special requirements, such as the need to guarantee a certain order of the operations for each document, one can reduce on the document id and sort it out in the reducer.
Configuration
Parameters are sent using -Dparam=value for Pig and MapReduce. For Oozie, specify them as parameters in the configuration section of the action.
| parameter | default | description |
|---|---|---|
| vespa.feed.endpoint | | Feed endpoint(s) |
| vespa.feed.defaultport | | Feed endpoint port (e.g. 8080) |
| vespa.feed.proxy.host | | HTTP proxy hostname |
| vespa.feed.proxy.port | | HTTP proxy port |
| vespa.feed.dryrun | false | Do not feed if set to true |
| vespa.feed.usecompression | true | Compress the feed |
| vespa.feed.data.format | json | The only supported format is json |
| vespa.feed.progress.interval | 1000 | Log sent/ok/failed/skipped documents every n records |
| vespa.feed.connections | 1 | Number of concurrent connections per mapper/reducer: setNumPersistentConnectionsPerEndpoint |
| vespa.feed.route | default | Route to use on the endpoint |
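As a usage sketch combining some of these parameters (assuming, as above, that vespa.feed.* properties set in the script end up in the job configuration), a dry run can be used to validate a job before feeding for real:

```
-- Hedged sketch: run the whole job without sending documents to Vespa,
-- logging progress every 10000 records; equivalent to passing
-- -Dvespa.feed.dryrun=true -Dvespa.feed.progress.interval=10000
SET vespa.feed.dryrun true;
SET vespa.feed.progress.interval 10000;

REGISTER vespa-hadoop.jar
DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

data = LOAD 'feed-dump.data' AS (json:chararray);
STORE data INTO '$ENDPOINT' USING VespaStorage();
```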