Cutting down bag to pass to udf

Using Pig on a Hadoop cluster, I have a huge bag of huge tuples that I regularly add fields to as I continue to work on this project, and several UDFs that use various fields from it. I want to be able to call a UDF on just a few fields from each tuple and **reconnect the result to that particular tuple**. Doing a join to reconnect the records using unique ids takes forever on billions of records.

I think there should be a way to do this all inside the GENERATE statement, but I can't find the right syntax.

Here is some toy code using a Python UDF to get the idea across.

```pig
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas = FOREACH byJumper GENERATE jumps.jumper, jumps.attempt, jumps.distance, jumps.location, myfuncs.conv2sigma(jumps.distance);
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
```

This is producing bags of tuples with single fields in each tuple, rather than tuples of the form I expect.
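To see why, here is a small Python model of what GROUP and a bag projection such as `jumps.attempt` produce. It only illustrates Pig's data model: a grouped record holds a bag, and projecting a field out of that bag yields another bag of one-field tuples, not one value per jump. It is not Pig code, and the helper names `group_by_jumper` and `project` are hypothetical.

```python
# A few rows of jumps.csv as (jumper, attempt, distance, location) tuples.
jumps = [
    (0, 0, 5.0, "a"),
    (0, 1, 6.0, "b"),
    (1, 0, 6.0, "a"),
    (1, 1, 5.0, "a"),
]


def group_by_jumper(rows):
    """Model of GROUP jumps BY jumper: one (group, bag) pair per jumper.

    Relies on dicts keeping insertion order (Python 3.7+).
    """
    groups = {}
    for row in rows:
        groups.setdefault(row[0], []).append(row)
    return list(groups.items())


def project(bag, index):
    """Model of jumps.field: the result is still a bag, of one-field tuples."""
    return [(row[index],) for row in bag]


for jumper, bag in group_by_jumper(jumps):
    # Model of GENERATE jumps.jumper, jumps.attempt, jumps.distance:
    # one output record per group, made of parallel bags, not per-jump tuples.
    print((project(bag, 0), project(bag, 1), project(bag, 2)))
# prints:
# ([(0,), (0,)], [(0,), (1,)], [(5.0,), (6.0,)])
# ([(1,), (1,)], [(0,), (1,)], [(6.0,), (5.0,)])
```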

The input data is:

- people (with unique integer IDs),
- their long jump attempts (with integer IDs unique to that person),
- the distance they jumped,
- the location they were jumping at the time.

For each jump we want to compute how many standard deviations (sigmas) the jump was from the jumper's average; later we'll correlate sigmas by location to see where jumpers do best. We need to calculate the average and standard deviation for each person, then a 'sigma' for each jump, and store the data with the new sigma field attached.
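In symbols, for jumper $j$ with jump distances $d_{j,1}, \dots, d_{j,n_j}$, and assuming the population standard deviation (dividing by $n_j$; the sample version divides by $n_j - 1$):

```latex
\mu_j = \frac{1}{n_j}\sum_{i=1}^{n_j} d_{j,i}, \qquad
s_j = \sqrt{\frac{1}{n_j}\sum_{i=1}^{n_j}\left(d_{j,i} - \mu_j\right)^2}, \qquad
\sigma_{j,i} = \frac{d_{j,i} - \mu_j}{s_j}
```

The jumper.py UDF in the question takes the square root of the sum of squares without dividing by $n_j$, which scales every sigma by $1/\sqrt{n_j}$; that does not change the ordering within one jumper, but it does matter when sigmas are compared across jumpers.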

The question is:

How do we change this to output tuples like (jumper:int, attempt:int, distance:double, location:chararray, sigma:double)?

I have tried FLATTEN in various ways, and it only gets me enormous cross products. I could change my UDF to take in jumper and attempt, output a triple, and then do a JOIN, but in the real world that solution is enormously impractical because of the size of the data sets.
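The cross products come from how FLATTEN behaves when a single GENERATE flattens more than one bag: Pig emits one output row for every combination of elements from those bags. A rough Python model of that rule (the function name `flatten_generate` is hypothetical, and this is a model of the semantics, not Pig itself) makes the blow-up easy to see:

```python
from itertools import product


def flatten_generate(*bags):
    """Model of FOREACH ... GENERATE FLATTEN(a), FLATTEN(b), ... for one record.

    Each argument is a bag (a list of tuples). One output row is produced per
    combination of elements, i.e. the cross product of the flattened bags.
    """
    rows = []
    for combo in product(*bags):
        # Concatenate the fields of the chosen tuples into one flat row.
        rows.append(tuple(field for t in combo for field in t))
    return rows


attempts = [(0,), (1,), (2,)]
sigmas = [(-0.5,), (0.1,), (0.4,)]
rows = flatten_generate(attempts, sigmas)
print(len(rows))  # 9 rows, not 3
print(rows[:3])   # [(0, -0.5), (0, 0.1), (0, 0.4)]
```

Nothing in the two bags says which sigma belongs to which attempt, so Pig cannot "zip" them; it can only pair everything with everything.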

Here's the supporting code and data if you want to try it at home:

jumper.py (a quick, not thoughtful, implementation; the only important thing here is that it takes a bag input and produces a bag output with one output tuple corresponding to each input tuple):

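```python
#!/usr/local/bin/python
# we're forced to use python 2.5.2 :-(
from math import sqrt

# outputSchema is supplied by Pig when the script is registered "using jython".
@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
    s = 0.0
    n = 0
    dd = []
    print('conv2sigma input bag:')
    print(bag)
    # bag is a bag of one-field tuples, e.g. jumps.distance
    for word in bag:
        d = float(word[0])
        dd.append(d)
        n += 1
        s += d
    a = s / n
    ss = 0
    for d in dd:
        ss += (d-a)**2
    # Note: ss is not divided by n here, so sd is sqrt(n) times the population
    # standard deviation; the output shown below was produced with this version.
    sd = sqrt(ss)
    outputBag = []
    for d in dd:
        outputBag.append( ( (d-a)/sd, ) )
    print('conv2sigma output bag:')
    print(outputBag)
    return outputBag
```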

The input file jumps.csv:

```
0,0,5,a
0,1,6,b
0,2,7,c
0,3,5,a
0,4,8,c
0,5,7,b
0,6,6,b
0,7,7,c
0,8,5,a
1,0,6,a
1,1,5,a
1,2,7,b
1,3,4,a
1,4,5,a
1,5,7,b
1,6,8,c
1,7,9,c
1,8,5,a
1,9,4,a
1,10,5,a
1,11,6,b
1,12,8,c
1,13,8,b
2,0,7,b
2,1,5,a
2,2,6,b
2,3,5,a
2,4,7,c
2,5,5,a
2,6,6,c
2,7,5,a
2,8,7,b
2,9,5,a
2,10,6,b
```

The output produced as written now (one line per jumper):

```
{(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)}
{(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)}
{(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}
```

Each output tuple is a collection of bags, and each bag contains tuples with single entries from one field, which is not what we want.


You will need to do this in two steps. Each jump has its own sigma value, so to associate each sigma with the correct jump you either need to pass the IDs to the sigma-computing UDF and then join the results back in (bad idea), or compute the summary statistics (mean and standard deviation) first and derive each sigma from them afterwards. Here's how:

```pig
-- assumes jumper.py is registered as myfuncs and defines mean, stddev and sigma
jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
jumperSummaries = FOREACH byJumper GENERATE
    group AS jumper,
    FLATTEN(jumps.(attempt, distance, location)),
    myfuncs.mean(jumps.distance) AS mean,
    myfuncs.stddev(jumps.distance) AS stddev;
sigmas = FOREACH jumperSummaries GENERATE jumper, attempt, distance, location, myfuncs.sigma(distance, mean, stddev) AS sigma;
```
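The answer does not show `myfuncs.mean`, `myfuncs.stddev` or `myfuncs.sigma`. Below is a minimal sketch of what they might look like in jumper.py, in the same `@outputSchema` style as `conv2sigma`; the three names come from the answer, but the bodies are our own and assume the population standard deviation. The `try`/`except` block only exists so the file can also be imported by plain Python for testing; we assume that when Pig registers the script `using jython`, the real `outputSchema` decorator is already defined. Pig's built-in `AVG` could also stand in for `mean`.

```python
from math import sqrt

try:
    outputSchema  # assumed to be provided by Pig when registered with jython
except NameError:
    def outputSchema(schema):
        """No-op stand-in so the module also loads in plain Python."""
        def decorate(func):
            return func
        return decorate


@outputSchema("mean:double")
def mean(bag):
    # bag holds one-field tuples, e.g. jumps.distance -> [(5.0,), (6.0,), ...]
    values = [float(t[0]) for t in bag]
    return sum(values) / len(values)


@outputSchema("stddev:double")
def stddev(bag):
    # Population standard deviation (divides by n); use n - 1 for the sample version.
    values = [float(t[0]) for t in bag]
    m = sum(values) / len(values)
    return sqrt(sum([(v - m) ** 2 for v in values]) / len(values))


@outputSchema("sigma:double")
def sigma(distance, avg, sd):
    # Distance of one jump from the jumper's mean, in standard deviations.
    if sd == 0:
        return None  # all of this jumper's jumps are identical
    return (float(distance) - avg) / sd
```

In the Pig script they are called exactly as in the answer, e.g. `myfuncs.sigma(distance, mean, stddev)`. Keeping `sigma` scalar-in, scalar-out is what lets the second FOREACH work row by row.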

The FLATTEN ungroups all the jumps and gives you back your original input, except that every record now also carries a copy of the mean and standard deviation for that jumper, which you can then use to compute the sigma for each jump row by row.
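A small Python model of that dataflow for one grouped record is sketched below. It uses `statistics.mean` and `statistics.pstdev` from the Python 3 standard library in place of the hypothetical UDFs, and `summarize_jumper` is an illustrative name. Because only one bag is flattened, there is one row per jump with the per-jumper scalars repeated on each row, and the second FOREACH needs nothing beyond the values already on that row.

```python
from statistics import mean, pstdev


def summarize_jumper(jumper, bag):
    """Model of GENERATE group, FLATTEN(jumps.(attempt, distance, location)),
    mean(jumps.distance), stddev(jumps.distance) for one grouped record.

    Only one bag is flattened, so there is one output row per jump, and the
    per-jumper summaries (plain scalars) are copied onto every row.
    """
    distances = [distance for (_, distance, _) in bag]
    m, s = mean(distances), pstdev(distances)
    return [(jumper, attempt, distance, location, m, s)
            for (attempt, distance, location) in bag]


rows = summarize_jumper(0, [(0, 5.0, "a"), (1, 6.0, "b"), (2, 7.0, "c")])

# Second FOREACH: each sigma is computed row by row; no JOIN is needed.
sigmas = [(j, att, d, loc, (d - m) / s) for (j, att, d, loc, m, s) in rows]
for row in sigmas:
    print(row)
```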

Note that while this is conceptually two steps, it still only takes one map-reduce job.


For comparison with WinnieNicklaus' answer, and to draw comments, here is the solution I came up with:

```pig
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas0 = FOREACH byJumper GENERATE FLATTEN(jumps), FLATTEN(myfuncs.conv2sigma(jumps.(jumper,attempt,distance)));
sigmas1 = FILTER sigmas0 BY jumper == s_id AND attempt == s_att;
sigmas = FOREACH sigmas1 GENERATE jumper, attempt, distance, location, sigma;
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
```

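The first FOREACH creates a (potentially large) cross product, sigmas0; the FILTER removes the "incorrect" elements of the product, and the last FOREACH generates the desired fields. This is how JOIN is often described academically: a cross product followed by a selection. The cross product is only formed within each jumper's group, so a jumper with n jumps contributes n × n rows, rather than the whole data set being crossed with itself. This version assumes conv2sigma has been changed to accept (jumper, attempt, distance) tuples and to return (s_id, s_att, sigma) tuples, so that each sigma carries the ids of the jump it belongs to.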
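The modified `conv2sigma` is not shown in this answer. A minimal sketch of what it might look like follows, assuming it receives `(jumper, attempt, distance)` tuples and returns `(s_id, s_att, sigma)` tuples as the FILTER implies; the body is our own and, unlike jumper.py in the question, divides by n (population standard deviation). The lines after the function model the FOREACH/FILTER/FOREACH pipeline in plain Python for one jumper, and the `try`/`except` stub assumes Pig supplies the real `outputSchema`.

```python
from math import sqrt

try:
    outputSchema  # assumed to be provided by Pig when registered with jython
except NameError:
    def outputSchema(schema):
        """No-op stand-in so the module also loads in plain Python."""
        def decorate(func):
            return func
        return decorate


@outputSchema("y:bag{t:tuple(s_id:int, s_att:int, sigma:double)}")
def conv2sigma(bag):
    # bag holds (jumper, attempt, distance) tuples for one jumper.
    rows = [(int(t[0]), int(t[1]), float(t[2])) for t in bag]
    n = len(rows)
    avg = sum([d for (_, _, d) in rows]) / n
    sd = sqrt(sum([(d - avg) ** 2 for (_, _, d) in rows]) / n)
    # Echo the ids next to each sigma so the FILTER can match it to its jump.
    return [(j, att, (d - avg) / sd if sd else None) for (j, att, d) in rows]


# Plain-Python model of the Pig pipeline for one grouped record.
jumps = [(0, 0, 5.0, "a"), (0, 1, 6.0, "b"), (0, 2, 7.0, "c")]
udf_out = conv2sigma([(j, att, d) for (j, att, d, _) in jumps])
# FLATTEN(jumps), FLATTEN(udf_out): every jump paired with every UDF tuple.
sigmas0 = [jump + out for jump in jumps for out in udf_out]
# FILTER sigmas0 BY jumper == s_id AND attempt == s_att
sigmas1 = [r for r in sigmas0 if r[0] == r[4] and r[1] == r[5]]
# FOREACH sigmas1 GENERATE jumper, attempt, distance, location, sigma
sigmas = [r[:4] + (r[6],) for r in sigmas1]
print(sigmas)
```

Because the ids travel with each sigma, the UDF is free to emit its tuples in any order; the FILTER, not the position in the bag, decides which sigma lands on which jump.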

This looks like it could be slow, but it still results in a single map-reduce job and seems to be fast in practice.
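A quick way to gauge the cost: because the cross product is formed per jumper, a jumper with n jumps produces n × n rows before the FILTER keeps n of them. For the sample jumps.csv (9, 14 and 11 jumps per jumper) this is easy to count; the function name below is illustrative.

```python
from collections import Counter


def cross_product_rows(jumper_ids):
    """Rows emitted by FLATTEN(jumps), FLATTEN(udf_output) before and after the FILTER.

    Each jumper's bag of n jumps is paired with the UDF's n output tuples,
    so a group contributes n * n rows; the FILTER keeps n of them.
    """
    counts = Counter(jumper_ids)
    before = sum(n * n for n in counts.values())
    after = sum(counts.values())
    return before, after


# Jumper ids of the 34 rows in the sample jumps.csv: 9, 14 and 11 jumps.
ids = [0] * 9 + [1] * 14 + [2] * 11
print(cross_product_rows(ids))  # (398, 34)
```

So the overhead is modest when every jumper has a handful of jumps, but it grows quadratically with the size of the largest group: a single jumper with 1,000 jumps would produce 1,000,000 intermediate rows.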

The huge win for me is that it allows my UDF to do arbitrarily complicated things and return arbitrarily many tuples, which are rejoined to the input data.

