register datasketches-memory-2.0.0.jar;
register datasketches-java-3.1.0.jar;
register datasketches-pig-1.1.0.jar;
-- very small sketch just for the purpose of this tiny example
DEFINE ReservoirSampling org.apache.datasketches.pig.sampling.ReservoirSampling('4');
DEFINE ReservoirUnion org.apache.datasketches.pig.sampling.ReservoirUnion('4');
raw_data = LOAD 'data.txt' USING PigStorage('\t') AS
(scale: double, label: chararray);
-- make a few independent sketches from the input data
sketches = FOREACH
(GROUP raw_data ALL)
GENERATE
ReservoirSampling(raw_data) AS sketch0,
ReservoirSampling(raw_data) AS sketch1,
ReservoirSampling(raw_data) AS sketch2
;
sketchBag = FOREACH
sketches
GENERATE
TOBAG(sketch0,
sketch1,
sketch2)
;
result = FOREACH
sketchBag
GENERATE
FLATTEN(ReservoirUnion(*)) AS (n, k, samples:{(scale, label)})
;
DUMP result;
DESCRIBE result;
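-- Optional follow-up, not part of the original example: a minimal sketch for
-- inspecting the sampled items one row at a time. The aliases sampled_rows and
-- label_counts are hypothetical names introduced here for illustration.
sampled_rows = FOREACH result GENERATE FLATTEN(samples) AS (scale, label);
-- count how often each label occurs in this run's sample
label_counts = FOREACH
(GROUP sampled_rows BY label)
GENERATE
group AS label,
COUNT(sampled_rows) AS occurrences
;
DUMP label_counts;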
The test data has two fields: scale and label. The first step of the query creates three independent reservoir samples from the same input data. The next step collects the sketches into a single bag, and the final step unions them and dumps the result.
Results:
From ‘DUMP result’:
(24,4,{(30.0,h),(7.0,g),(6.0,f),(5.0,e)})
The sample is random, so the exact items vary between runs. Running this script many times, we will see each element appear with equal probability.
From ‘DESCRIBE result’:
result: {n: long,k: int,samples: {(scale: double,label: chararray)}}
Input file ‘data.txt’ (tab separated):
1.0 a
2.0 b
3.0 c
4.0 d
5.0 e
6.0 f
7.0 g
30.0 h