Tuesday, September 20, 2016

Zeppelin and Spark: Merge Multiple CSVs into Parquet

Introduction

The purpose of this article is to demonstrate how to load multiple CSV files from an HDFS filesystem into a single dataframe and write the result to Parquet.

Two approaches are demonstrated.  The first approach is not recommended, but is shown for completeness.


First Approach

One approach might be to define each path:
%pyspark

import locale
locale.setlocale(locale.LC_ALL, 'en_US')

p1 = "/data/output/followers/mitshu/ec2-52-39-251-219.us-west-2.compute.amazonaws.com/0-ec2-52-39-251-219.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p2 = "/data/output/followers/mitshu/ec2-52-42-100-207.us-west-2.compute.amazonaws.com/0-ec2-52-42-100-207.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p3 = "/data/output/followers/mitshu/ec2-52-42-198-4.us-west-2.compute.amazonaws.com/0-ec2-52-42-198-4.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p4 = "/data/output/followers/mitshu/ec2-54-70-37-224.us-west-2.compute.amazonaws.com/0-ec2-54-70-37-224.us-west-2.compute.amazonaws.com/twitterFollowers.csv"

and then open each CSV at its path as an RDD and transform it to a dataframe:
%pyspark

rdd_m1 = sc.textFile(p1)
print rdd_m1.take(5)

df_m1 = rdd_m1.\
    map(lambda x: x.split("\t")).\
    filter(lambda x: len(x) == 6). \
    map(lambda x: {
        'id':x[0],
        'profile_id':x[1],
        'profile_name':x[2],
        'follower_id':x[3],
        'follower_name':x[4],
        'unknown':x[5]})\
    .toDF()
df_m1.limit(5).show()
df_m1.registerTempTable("df_m1")
This would need to be repeated for each dataframe.
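Rather than copy-pasting that block four times, the repeated logic could be factored into a helper. A sketch under that idea (`COLUMNS`, `parse_line`, and `load_df` are names introduced here, not part of the original notebook):

```python
# Column names match the dict keys used above; 'parse_line' and
# 'load_df' are illustrative helpers, not part of the notebook.
COLUMNS = ['id', 'profile_id', 'profile_name',
           'follower_id', 'follower_name', 'unknown']

def parse_line(line):
    """Split a tab-separated line into a dict, or None if malformed."""
    fields = line.split("\t")
    if len(fields) != len(COLUMNS):
        return None
    return dict(zip(COLUMNS, fields))

def load_df(sc, path):
    """Load one tab-separated CSV at 'path' into a dataframe."""
    return sc.textFile(path) \
             .map(parse_line) \
             .filter(lambda j: j is not None) \
             .toDF()

# In the Zeppelin paragraph:
# dfs = [load_df(sc, p) for p in (p1, p2, p3, p4)]
```

The parsing logic is plain Python, so only the final `toDF()` call depends on Spark.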

The dataframes could then be merged using the unionAll method (renamed to union in Spark 2.0):
%pyspark

df = df_m1.unionAll(df_m2).unionAll(df_m3).unionAll(df_m4)

print "DF 1: {0}".format(df_m1.count())
print "DF 2: {0}".format(df_m2.count())
print "DF 3: {0}".format(df_m3.count())
print "DF 4: {0}".format(df_m4.count())
print "Merged Dataframe: {0}".format(df.count())
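With four dataframes the chain above is manageable, but for many files a fold is tidier. A sketch (`union_all` is a name introduced here; it assumes all dataframes share the same schema):

```python
from functools import reduce  # built-in in Python 2, a functools import in Python 3

def union_all(dfs):
    """Fold a list of same-schema dataframes into one via unionAll."""
    return reduce(lambda a, b: a.unionAll(b), dfs)

# In the Zeppelin paragraph:
# df = union_all([df_m1, df_m2, df_m3, df_m4])
```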


and finally written to Parquet (chain .mode("overwrite") before .parquet() if the target path may already exist):
%pyspark

df.write.parquet("/data/output/followers/mitshu/joined.prq")


Easier Approach

Notice the convenient way of reading multiple CSVs in nested directories into a single RDD:
%pyspark

path="/data/output/followers/mitshu/*/*/*.csv"
rdd = sc.textFile(path)
print "count = {}".format(rdd.count())
This is clearly better than defining each path individually.
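Glob patterns are not the only option: sc.textFile also accepts a comma-separated list of paths, which is handy when the files do not share a common pattern. A sketch (`join_paths` is an illustrative helper, not a Spark API):

```python
def join_paths(paths):
    """Build the comma-separated path string that sc.textFile accepts."""
    return ",".join(paths)

# In the Zeppelin paragraph:
# rdd = sc.textFile(join_paths([p1, p2, p3, p4]))
```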


There are multiple ways to transform RDDs into Dataframes (DFs):
%pyspark

def to_json(r):
    j = {}
    t = r.split("\t")
    if len(t) < 5:  # guard: skip malformed lines instead of raising IndexError
        return None
    j['num_followers'] = t[0]
    j['followed_userid'] = t[1]
    j['followed_handle'] = t[2]
    j['follower_userid'] = t[3]
    j['follower_handle'] = t[4]
    return j

df = rdd.map(to_json).filter(lambda j: j is not None).toDF()
print "count = {}".format(df.count())
df.show()
This is not necessarily superior to the first approach, but it is an alternative worth considering.
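Another route is to skip the dict entirely: build tuples plus an explicit list of column names and let createDataFrame assign the names. A sketch (`FIELDS` and `to_row` are names introduced here; passing a list of column names as the schema is supported by sqlContext.createDataFrame):

```python
# 'FIELDS' and 'to_row' are illustrative names, not part of the notebook.
FIELDS = ['num_followers', 'followed_userid', 'followed_handle',
          'follower_userid', 'follower_handle']

def to_row(line):
    """Turn a tab-separated line into a tuple of the first 5 fields."""
    t = line.split("\t")
    return tuple(t[:len(FIELDS)])

# In the Zeppelin paragraph:
# df = sqlContext.createDataFrame(rdd.map(to_row), FIELDS)
```

This avoids the per-row dict construction and makes the column order explicit in one place.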


Load from Parquet

For subsequent analysis, load from Parquet using this code:
%pyspark

df = sqlContext.read.parquet("/data/output/followers/mitshu/joined.prq")
df.limit(5).show()
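With the data back in a dataframe, Zeppelin's SQL support makes exploration easy. A sketch (the table name "followers" and the query are examples introduced here, not from the original notebook):

```python
# An example query: who is followed the most?
# The table name "followers" is illustrative.
TOP_FOLLOWED_SQL = """
    SELECT followed_handle, COUNT(*) AS follower_count
    FROM followers
    GROUP BY followed_handle
    ORDER BY follower_count DESC
    LIMIT 10
"""

# In the Zeppelin paragraph:
# df.registerTempTable("followers")
# sqlContext.sql(TOP_FOLLOWED_SQL).show()
```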

