Introduction
The purpose of this article is to demonstrate how to load multiple CSV files on an HDFS filesystem into a single DataFrame and write the result to Parquet. Two approaches are demonstrated. The first is not recommended, but is shown for completeness.
First Approach
One approach might be to define each path:

%pyspark
import locale
locale.setlocale(locale.LC_ALL, 'en_US')

p1 = "/data/output/followers/mitshu/ec2-52-39-251-219.us-west-2.compute.amazonaws.com/0-ec2-52-39-251-219.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p2 = "/data/output/followers/mitshu/ec2-52-42-100-207.us-west-2.compute.amazonaws.com/0-ec2-52-42-100-207.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p3 = "/data/output/followers/mitshu/ec2-52-42-198-4.us-west-2.compute.amazonaws.com/0-ec2-52-42-198-4.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p4 = "/data/output/followers/mitshu/ec2-54-70-37-224.us-west-2.compute.amazonaws.com/0-ec2-54-70-37-224.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
and then open each CSV at that path as an RDD and transform it into a DataFrame:
%pyspark
rdd_m1 = sc.textFile(p1)
print rdd_m1.take(5)

df_m1 = rdd_m1.\
    map(lambda x: x.split("\t")).\
    filter(lambda x: len(x) == 6).\
    map(lambda x: {
        'id': x[0],
        'profile_id': x[1],
        'profile_name': x[2],
        'follower_id': x[3],
        'follower_name': x[4],
        'unknown': x[5]})\
    .toDF()

df_m1.limit(5).show()
df_m1.registerTempTable("df_m1")
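The same block is repeated for p2 through p4 to produce df_m2, df_m3, and df_m4. A small helper along the same lines keeps the repetition manageable (a sketch; the load_followers name is mine):

%pyspark
def load_followers(path, table_name):
    # Read a tab-delimited file, keep only well-formed rows, and name the columns.
    rdd = sc.textFile(path)
    df = rdd.map(lambda x: x.split("\t")) \
            .filter(lambda x: len(x) == 6) \
            .map(lambda x: {'id': x[0], 'profile_id': x[1], 'profile_name': x[2],
                            'follower_id': x[3], 'follower_name': x[4], 'unknown': x[5]}) \
            .toDF()
    df.registerTempTable(table_name)
    return df

df_m2 = load_followers(p2, "df_m2")
df_m3 = load_followers(p3, "df_m3")
df_m4 = load_followers(p4, "df_m4")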
The four DataFrames can then be merged using the unionAll operator:
%pyspark
df = df_m1.unionAll(df_m2).unionAll(df_m3).unionAll(df_m4)

print "DF 1: {0}".format(df_m1.count())
print "DF 2: {0}".format(df_m2.count())
print "DF 3: {0}".format(df_m3.count())
print "DF 4: {0}".format(df_m4.count())
print "Merged Dataframe: {0}".format(df.count())
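With more inputs this chain of unionAll calls gets unwieldy; the same merge can be expressed as a reduce over a list of DataFrames (a sketch; note that Spark 2.x renames unionAll to union):

%pyspark
# Same merge expressed as a fold over the list of DataFrames.
from functools import reduce  # built in on Python 2, imported here for clarity

frames = [df_m1, df_m2, df_m3, df_m4]
df = reduce(lambda a, b: a.unionAll(b), frames)
print "Merged Dataframe: {0}".format(df.count())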
The merged DataFrame is then written to Parquet:
%pyspark
df.write.parquet("/data/output/followers/mitshu/joined.prq")
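If the notebook is re-run, this write fails because the output path already exists; passing a save mode avoids that (a minimal variant, assuming overwriting the previous output is acceptable):

%pyspark
# Overwrite any previous output instead of failing on an existing path.
df.write.mode("overwrite").parquet("/data/output/followers/mitshu/joined.prq")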
Easier Approach
Notice the convenient way of reading multiple CSV files in nested directories into a single RDD:

%pyspark
path = "/data/output/followers/mitshu/*/*/*.csv"
rdd = sc.textFile(path)
print "count = {}".format(rdd.count())
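As an aside, on Spark 2.x the same glob pattern can be passed straight to the DataFrame reader, skipping the RDD step entirely; a sketch, assuming a SparkSession named spark is available and schema inference is acceptable:

%pyspark
# Spark 2.x only: read every matching tab-delimited file directly into a DataFrame.
df_csv = spark.read.csv("/data/output/followers/mitshu/*/*/*.csv", sep="\t", inferSchema=True)
print "count = {}".format(df_csv.count())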
Staying with the RDD route, there are multiple ways to transform RDDs into DataFrames (DFs). One is to map each line to a dictionary and call toDF(); another, using pyspark.sql.Row, is sketched after this block:
%pyspark
def to_json(r):
    j = {}
    t = r.split("\t")
    j['num_followers'] = t[0]
    j['followed_userid'] = t[1]
    j['followed_handle'] = t[2]
    j['follower_userid'] = t[3]
    j['follower_handle'] = t[4]
    return j

df = rdd.map(to_json).toDF()
print "count = {}".format(df.count())
df.show()
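Another common route is to map each line to a pyspark.sql.Row, which makes the column names explicit at the point of construction. A sketch of the same transformation:

%pyspark
from pyspark.sql import Row

def to_row(line):
    # Same tab-split as above, but build a Row with named fields.
    t = line.split("\t")
    return Row(num_followers=t[0], followed_userid=t[1], followed_handle=t[2],
               follower_userid=t[3], follower_handle=t[4])

df_rows = rdd.map(to_row).toDF()
df_rows.limit(5).show()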
Load from Parquet
For subsequent analysis, load from Parquet using this code:

%pyspark
df = sqlContext.read.parquet("/data/output/followers/mitshu/joined.prq")
df.limit(5).show()
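From there the DataFrame can be queried as usual, for example by registering it as a temporary table (the table name followers is mine):

%pyspark
# Register the Parquet-backed DataFrame and query it with SQL.
df.registerTempTable("followers")
sqlContext.sql("SELECT COUNT(*) AS total FROM followers").show()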