Tuesday, September 20, 2016

Zeppelin and Spark: Merge Multiple CSVs into Parquet


This article demonstrates how to load multiple CSV files stored on HDFS into a single Dataframe and write the result to Parquet.

Two approaches are demonstrated.  The first approach is not recommended, but is shown for completeness.

First Approach

One approach might be to define each path:

import locale
locale.setlocale(locale.LC_ALL, 'en_US')

p1 = "/data/output/followers/mitshu/ec2-52-39-251-219.us-west-2.compute.amazonaws.com/0-ec2-52-39-251-219.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p2 = "/data/output/followers/mitshu/ec2-52-42-100-207.us-west-2.compute.amazonaws.com/0-ec2-52-42-100-207.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p3 = "/data/output/followers/mitshu/ec2-52-42-198-4.us-west-2.compute.amazonaws.com/0-ec2-52-42-198-4.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p4 = "/data/output/followers/mitshu/ec2-54-70-37-224.us-west-2.compute.amazonaws.com/0-ec2-54-70-37-224.us-west-2.compute.amazonaws.com/twitterFollowers.csv"

and then open each CSV at that path as an RDD and transform to a dataframe:

rdd_m1 = sc.textFile(p1)
print rdd_m1.take(5)

df_m1 = rdd_m1.\
    map(lambda x: x.split("\t")).\
    filter(lambda x: len(x) == 6). \
    map(lambda x: {

This would need to be repeated for each of the four files, producing df_m1 through df_m4.

The dataframes could then be merged using the unionAll operator:

import pandas as pd

df = df_m1.unionAll(df_m2).unionAll(df_m3).unionAll(df_m4)

print "DF 1: {0}".format(df_m1.count())
print "DF 2: {0}".format(df_m2.count())
print "DF 3: {0}".format(df_m3.count())
print "DF 4: {0}".format(df_m4.count())
print "Merged Dataframe: {0}".format(df.count())

Finally, the merged dataframe is written to Parquet.
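
The article does not show the write step, so the following is only a minimal sketch of how the merge and write might look. The helper names (union_all, merge_and_write) are hypothetical, the output path is borrowed from the "Load from Parquet" section, and mode="overwrite" is an assumption. Note also that unionAll is deprecated in Spark 2.0 and later in favor of union.

```python
from functools import reduce

# Output location reused from the "Load from Parquet" section of this article.
OUT_PATH = "/data/output/followers/mitshu/joined.prq"


def union_all(dfs):
    """Fold a non-empty list of DataFrames into one with pairwise unionAll calls."""
    return reduce(lambda left, right: left.unionAll(right), dfs)


def merge_and_write(dfs, out_path=OUT_PATH, mode="overwrite"):
    """Merge the DataFrames and write the result to Parquet.

    mode="overwrite" is an assumption; the article does not say how an
    existing output directory should be handled.
    """
    merged = union_all(dfs)
    merged.write.mode(mode).parquet(out_path)
    return merged
```

In a Zeppelin paragraph this could be called as merge_and_write([df_m1, df_m2, df_m3, df_m4]). Folding over a list avoids the chained unionAll calls growing with every extra file.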


Easier Approach

Notice the convenient way of reading multiple CSV files in nested directories into a single RDD with one call to sc.textFile:

rdd = sc.textFile(path)
print "count = {}".format(rdd.count())

This is clearly better than defining each path individually.
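
The snippet above does not show how path is defined. One plausible definition, offered here purely as an assumption, is a glob pattern: sc.textFile accepts wildcards (and comma-separated lists of paths), so a single pattern can cover every host directory. The pattern below is inferred from the four explicit paths in the first approach, and glob_matches is a hypothetical local helper that only approximates how such a pattern is matched one directory level at a time.

```python
import fnmatch

# Assumed glob, inferred from the four explicit paths in the first approach;
# the article does not show the actual value of `path`.
path = "/data/output/followers/mitshu/*/*/twitterFollowers.csv"


def glob_matches(pattern, candidate):
    """Per-segment approximation of glob matching ('*' stays within one directory level)."""
    p_parts = pattern.split("/")
    c_parts = candidate.split("/")
    if len(p_parts) != len(c_parts):
        return False
    return all(fnmatch.fnmatchcase(c, p) for p, c in zip(p_parts, c_parts))


def load_all(sc, pattern=path):
    """Read every file matching the glob into one RDD of lines."""
    return sc.textFile(pattern)
```

Checking the pattern locally with glob_matches(path, p1) before running the job is a cheap way to confirm it covers the expected files.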

There are multiple ways to transform RDDs into Dataframes (DFs). One is to map each line to a dictionary and call toDF():

def to_json(r):
    j = {}
    t = r.split("\t")
    j['num_followers'] = t[0]
    j['followed_userid'] = t[1]
    j['followed_handle'] = t[2]
    j['follower_userid'] = t[3]
    j['follower_handle'] = t[4]
    return j

df = rdd.map(to_json).toDF()
print "count = {}".format(df.count())

This is not necessarily superior to the first approach, but it is an alternative to consider.
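
As one more of those "multiple ways", here is a hedged sketch that uses an explicit schema with sqlContext.createDataFrame instead of letting Spark infer column types from dictionaries. The column names come from to_json above. The helper names (parse_line, to_dataframe), the choice of string columns, and the decision to skip lines with fewer than five fields are assumptions, not part of the original article.

```python
# Column names taken from the to_json function in this article.
FIELDS = [
    "num_followers",
    "followed_userid",
    "followed_handle",
    "follower_userid",
    "follower_handle",
]


def parse_line(line):
    """Split a tab-separated line into a tuple of the first five fields.

    Returns None for lines that are too short (an assumed policy), so that
    malformed rows can be filtered out instead of raising IndexError.
    """
    parts = line.split("\t")
    if len(parts) < len(FIELDS):
        return None
    return tuple(parts[:len(FIELDS)])


def to_dataframe(sqlContext, rdd):
    """Build a DataFrame with an explicit all-string schema."""
    # Imported here so parse_line can be used and tested without Spark installed.
    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([StructField(name, StringType(), True) for name in FIELDS])
    rows = rdd.map(parse_line).filter(lambda r: r is not None)
    return sqlContext.createDataFrame(rows, schema)
```

Keeping parse_line as a plain function makes it easy to try on a few sample lines from rdd.take(5) before running the full job.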


Load from Parquet

For subsequent analysis, load from Parquet using this code:

df = sqlContext.read.parquet("/data/output/followers/mitshu/joined.prq")





