Monday, May 16, 2016

PySpark: Operations Overview

Resilient Distributed Dataset (RDD):  A dataset that is distributed across the cluster nodes.  No single node has all the data, and the data is recoverable when a single node fails.

an RDD is distributed across the worker nodes


Collect
<RDD>.collect() pulls all the data from an RDD back into the driver program.  You don't necessarily want to reassemble a large dataset, distributed across multiple nodes, on a single driver node over the network.  If the data does not fit in the driver's memory, the function call will probably hang for a while and then die.

If you are doing exploratory data analysis, collect() can be useful when run in conjunction with the sample() function to decrease the size of the data.
rdd.sample(withReplacement=False, fraction=0.01, seed=1).collect()
Because I provided a seed, the random sample will be consistent.  Each time I call this function, I will see the same thing.
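To make the effect of the seed concrete, here is a minimal plain-Python sketch of fraction-based sampling without replacement.  It is not Spark's code: the name bernoulli_sample is hypothetical, and it assumes each element is kept independently with probability equal to the fraction, which is why the size of the sample is only approximately fraction * N.

```python
import random


def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)  # a private generator, so the seed fully determines the result
    return [x for x in items if rng.random() < fraction]


data = list(range(1000))
first_run = bernoulli_sample(data, fraction=0.01, seed=1)
second_run = bernoulli_sample(data, fraction=0.01, seed=1)

# Same seed, same input: the two samples are identical.
print(first_run == second_run)
# The sample size is close to, but not necessarily exactly, 1% of the input.
print(len(first_run))
```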

Count
The most basic form of count() tells you how many items are present in your RDD.
rdd.count()
It is very useful and very simple.  There are no optional parameters.

There are a couple of interesting variations:
rdd.countApprox(timeout=200, confidence=0.5)
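This function returns an approximate count.  The timeout is given in milliseconds; if not all tasks have finished by then, the function returns a potentially incomplete result.  If your dataset is extremely large, and you don't care exactly how many items you have, this is a great function to use.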

4 paragraphs from a Zeppelin notebook showing count() and countApprox() in action


Counting distinct elements in a large dataset can be a daunting task.
rdd.countApproxDistinct(relativeSD=0.05)
This function makes it easy by returning an approximate count of the number of distinct elements in the RDD.  Under the covers, the function uses the HyperLogLog algorithm.  This algorithm hashes each element, uses the first few bits of the hash to pick a bucket, and records in each bucket the longest run of leading zero bits seen in the rest of the hash.  A long run of zeros is rare, so seeing one suggests the bucket has received many distinct values.  The algorithm turns each bucket's record into an estimate, then combines all the estimates with a harmonic mean.

Making the relativeSD parameter smaller makes the count more accurate, but the function uses more memory and runs more slowly.
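For readers who want to see the idea in code, here is a minimal plain-Python sketch of a HyperLogLog-style estimator.  It is an illustration only, not what Spark runs: the function name hll_estimate and the parameter p (the number of hash bits used to pick a bucket) are assumptions of this sketch, the hash is SHA-1 truncated to 64 bits, and the constants are the standard ones from the original HyperLogLog description.  With 2**p buckets the typical relative error is roughly 1.04 / sqrt(2**p), which is the trade-off the relativeSD parameter controls.

```python
import hashlib
import math


def hll_estimate(items, p=10):
    """Estimate the number of distinct items using 2**p buckets (hypothetical helper)."""
    m = 1 << p
    registers = [0] * m
    for item in items:
        # Derive a deterministic 64-bit hash from the item's repr().
        digest = hashlib.sha1(repr(item).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")
        bucket = h >> (64 - p)                   # first p bits choose the bucket
        rest = h & ((1 << (64 - p)) - 1)         # remaining 64 - p bits
        rank = (64 - p) - rest.bit_length() + 1  # position of the leftmost 1-bit
        registers[bucket] = max(registers[bucket], rank)

    alpha = 0.7213 / (1 + 1.079 / m)             # bias correction, valid for m >= 128
    # Harmonic mean of the per-bucket estimates 2**register.
    raw = alpha * m * m / sum(2.0 ** -r for r in registers)

    # Small-range correction: fall back to linear counting while buckets are still empty.
    empty = registers.count(0)
    if raw <= 2.5 * m and empty:
        return m * math.log(m / empty)
    return raw


# 100,000 values, but only 50,000 distinct ones.
data = [i % 50000 for i in range(100000)]
print(round(hll_estimate(data)))
```

The sketch stores only 1,024 small integers no matter how many elements it sees, which is why this style of counting scales to very large datasets.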

First
Sometimes you just need one item in the RDD to play with.  You can use first() to pull the first item from the rdd.
rdd.first()
This function is simple.  There are no optional parameters, and it does respect any sorting that you've done.  You can't use first() on an empty RDD; the call raises a ValueError.

Limit a DataFrame (df) to 1 result, or use first() on the underlying RDD

Take
The take() function is a general purpose "return data to the driver" command.
rdd.take(1)
is almost the same as
rdd.first()
although take(1) returns a one-element list instead of the element itself.

take(n) vs first()



If I take as many rows as are in my collection, that's the same as using collect()
rdd.take(rdd.count())
is functionally equivalent to
rdd.collect()
As a rule:
  • if you want one item, use first()
  • if you want all the items, use collect()
  • if you want n items, where 0 < n < rdd.count(), use take(n)
collect() is the direct way to fetch everything, and first() hands you the element itself rather than a one-element list, so each is the better choice in its own specialized circumstance.  The take() function looks at one partition, and then uses what it found to estimate how many more partitions will be needed to provide the number of elements you requested.  The function then returns all the elements you have requested to the driver.
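The partition-scanning behaviour of take() can be sketched in plain Python, with a list of lists standing in for the partitions of an RDD.  This is a simplification under stated assumptions, not Spark's implementation: the helper names take and first are local to this sketch, and the fixed "scan four times as many partitions next round" growth rule is an assumption that replaces Spark's actual estimate.

```python
def take(partitions, n):
    """Return up to n items, scanning partitions in growing batches (simplified sketch)."""
    taken = []
    scanned = 0
    batch = 1  # start by looking at a single partition
    while len(taken) < n and scanned < len(partitions):
        for part in partitions[scanned:scanned + batch]:
            # Only pull as many items as are still missing.
            taken.extend(part[: n - len(taken)])
        scanned += batch
        batch *= 4  # assumed growth factor for the next round
    return taken


def first(partitions):
    """Return the first item, or raise if there is none (built on take(1))."""
    items = take(partitions, 1)
    if not items:
        raise ValueError("RDD is empty")
    return items[0]


partitions = [[], [1, 2], [3], [4, 5, 6]]
print(take(partitions, 3))  # [1, 2, 3]
print(take(partitions, 1))  # [1]  (a list)
print(first(partitions))    # 1    (the element itself)
```

Note how an empty first partition costs an extra round of scanning before any data is returned.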

Take Sample

This function allows you to pull a random sample of elements from your RDD into your driver program
rdd.takeSample(withReplacement=False, num=1, seed=1)
When you sample, you need to provide the target sample size (num) and say whether you want replacement while sampling (withReplacement).

Sampling without replacement means you can only select a given element once.  If the RDD has 100 elements, and you want a sample with 10,000 elements, and you set withReplacement=False the sample will only have 100 elements.  Each element can only be selected once.

Sampling with replacement means each element in the input has an equal chance of being picked for each element in the output.  Using the example above, an RDD with 100 elements could be used to generate a random sample with 10,000 elements.  Each input element would be repeated multiple times in the output.

Another way to look at this is that if you had an input RDD like this:
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
and you selected
rdd.takeSample(withReplacement=True, num=rdd.count(), seed=1)
in theory, you could end up with an output (the random sample) that looked like this:
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
This is very unlikely (the chance of drawing ten zeros in a row is (1/10)^10, or 1 in 10 billion), but it could happen.

Use of takeSample() with the optional seed parameter

You can also provide a random seed when sampling.  Using the parameter enables repeatable research as it enables others to replicate your findings.  Use of this parameter guarantees the function will always make the same random sample selections.
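The difference between the two sampling modes, and the effect of the seed, can be tried out in plain Python.  This is only a sketch of the semantics described above using the standard random module, not Spark's sampling code; the name take_sample and its snake_case parameters are hypothetical.

```python
import random


def take_sample(items, with_replacement, num, seed):
    """Fixed-size random sample from a list (hypothetical helper)."""
    rng = random.Random(seed)
    if with_replacement:
        # Every output slot is an independent draw from the whole input.
        return rng.choices(items, k=num)
    # Without replacement, the sample can never be larger than the input.
    return rng.sample(items, k=min(num, len(items)))


data = list(range(100))

# Asking for 10,000 items without replacement yields only the 100 available.
print(len(take_sample(data, False, 10000, seed=1)))  # 100

# With replacement, the same 100 items can fill a 10,000-item sample.
print(len(take_sample(data, True, 10000, seed=1)))   # 10000

# The same seed reproduces the same sample.
print(take_sample(data, True, 5, seed=1) == take_sample(data, True, 5, seed=1))  # True
```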

Take Ordered

This function is similar to sorting the entire RDD, then performing a take() function.

Without this function, given a large RDD and a desire for the first n elements in sorted order, you would have to sort within each partition and keep only its first n elements.  Those per-partition results would then be merged in a reduce step, with a final combine performed in the driver.  At each point, only n items would be kept.  takeOrdered() handles all of this behind the scenes.
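The per-partition approach described above can be sketched in plain Python, again with a list of lists standing in for the partitions.  This is an illustration of the idea rather than Spark's code; the name take_ordered and the use of heapq.nsmallest are choices made for this sketch.

```python
import heapq


def take_ordered(partitions, n, key=None):
    """Smallest n items across all partitions, keeping at most n per step (sketch)."""
    # Step 1: each partition keeps only its own n smallest items.
    per_partition = [heapq.nsmallest(n, part, key=key) for part in partitions]
    # Step 2: merge the small per-partition results and keep the n smallest overall.
    merged = [item for top in per_partition for item in top]
    return heapq.nsmallest(n, merged, key=key)


partitions = [[9, 1, 5], [7, 3], [8, 2, 6, 4]]
print(take_ordered(partitions, 3))                     # [1, 2, 3]
print(take_ordered(partitions, 3, key=lambda x: -x))   # [9, 8, 7]
```

Because no step ever holds more than n items per partition, very little data has to travel to the driver when n is small.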

This function is only fast if n is small.  Don't use this function when n is close to rdd.count().  As the value of n approaches the size of your RDD, it's best to just sort the RDD and then use take().

rdd.takeOrdered(10)
 
the "userid" column is sorted in ascending order



References

  1. [StackOverflow] Take Ordered
    1. Good overview on a variety of takeOrdered(n, key) variations.
