Tuesday, July 12, 2016

Zeppelin and Spark: Transforming a CSV to Parquet

Transform a CSV file to Parquet Format

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem.  Parquet is built to support very efficient compression and encoding schemes.  Twitter, for example, has been converting some of its major data sources to Parquet in order to take advantage of the compression and deserialization savings.

CSV to RDD

Load the CSV file into an RDD
%pyspark

# load the raw CSV as an RDD of lines and peek at the first five records
rdd = sc.textFile("/path/to/input/myfile.csv")
print rdd.take(5)

and the output looks like this:
[u'19999174,Logicknot,Cameron MacKinnon,0,Male, , , ', u'433249647,LogieTatjana,Tatjana Logie \u2653,0,Female, , , ', u'1346426538,Logistic_soares,Luciana Soares,0,Female, , , ', u'18355981,Loh69,Laurent Grad,0,Male, , , ', u'2976559335,LohMarta,Marta Loh,0,Female, , , ']
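
Before converting anything, it can be worth a quick sanity check on the raw RDD.  The snippet below is just a sketch (not part of the original walkthrough): it counts the total lines and how many of them would survive the 8-token filter used in the next section.
%pyspark

# count all lines, then count only the lines that split cleanly into 8 comma-separated tokens
total = rdd.count()
well_formed = rdd.filter(lambda x: len(x.split(",")) == 8).count()
print total, well_formed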


RDD to DF

Now we need to convert the RDD into a DataFrame (DF):
%pyspark

df = rdd.\
    map(lambda x: x.split(",")).\
    filter(lambda x: len(x) == 8).\
    map(lambda x: {
        'userid': x[0],
        'twitterhandle': x[1],
        'full_name': x[2],
        'country': x[3],
        'gender': x[4],
        'age': x[5],
        'age_min': x[6],
        'age_max': x[7]}).\
    toDF()
df.limit(5).show()

The bad news is that this code might look complicated. The good news is that the code is a lot simpler than it appears.

The first map splits each record on the delimiter (a comma), so we're now dealing with a list of 8 tokens per record.  The filter then rejects any line that does not have exactly 8 tokens.  The final map assigns each token to a named field, and those field names become the column headings of the DataFrame.  (Because a Python dict has no guaranteed key order, Spark sorts the column names alphabetically, which is why the columns below don't appear in the original CSV order.)

The output looks like this:
+---+-------+-------+-------+-----------------+------+---------------+----------+
|age|age_max|age_min|country|        full_name|gender|  twitterhandle|    userid|
+---+-------+-------+-------+-----------------+------+---------------+----------+
|   |       |       |      0|Cameron MacKinnon|  Male|      Logicknot|  19999174|
|   |       |       |      0|  Tatjana Logie ♓|Female|   LogieTatjana| 433249647|
|   |       |       |      0|   Luciana Soares|Female|Logistic_soares|1346426538|
|   |       |       |      0|     Laurent Grad|  Male|          Loh69|  18355981|
|   |       |       |      0|        Marta Loh|Female|       LohMarta|2976559335|
+---+-------+-------+-------+-----------------+------+---------------+----------+


Note that we could have greatly simplified the code by just doing this:
%pyspark

df = rdd.\
    map(lambda x: x.split(",")).toDF()
df.limit(5).show()

And then we would have got this:
+----------+---------------+-----------------+---+------+---+---+---+
|        _1|             _2|               _3| _4|    _5| _6| _7| _8|
+----------+---------------+-----------------+---+------+---+---+---+
|  19999174|      Logicknot|Cameron MacKinnon|  0|  Male|   |   |   |
| 433249647|   LogieTatjana|  Tatjana Logie ♓|  0|Female|   |   |   |
|1346426538|Logistic_soares|   Luciana Soares|  0|Female|   |   |   |
|  18355981|          Loh69|     Laurent Grad|  0|  Male|   |   |   |
|2976559335|       LohMarta|        Marta Loh|  0|Female|   |   |   |
+----------+---------------+-----------------+---+------+---+---+---+

But presumably it's important to have column headers; hence the additional code in the first snippet above.
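
If the deprecation warning that Spark 1.x prints when a DataFrame is built from an RDD of dicts is a concern, a Row factory gives the same named columns.  The snippet below is a sketch rather than part of the original notebook; the field names simply mirror the ones used above, and the columns should come out in the declared order instead of alphabetically.
%pyspark

from pyspark.sql import Row

# a Row "factory" with the same eight field names as the dict-based version above
Profile = Row('userid', 'twitterhandle', 'full_name', 'country',
              'gender', 'age', 'age_min', 'age_max')

df = rdd.\
    map(lambda x: x.split(",")).\
    filter(lambda x: len(x) == 8).\
    map(lambda x: Profile(*x)).\
    toDF()
df.limit(5).show()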

DF to Parquet

The final step is to write the DataFrame out as a Parquet file.

This can be accomplished in a single line:
%pyspark

df.write.parquet("/path/to/output/myfile")
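
Two optional tweaks, sketched below rather than taken from the original post: re-running the paragraph will fail if the output directory already exists unless a save mode is set, and the Parquet compression codec can be chosen through a SQL configuration property (snappy here is just one common choice; uncompressed, gzip, and lzo are also valid values).
%pyspark

# (optional) choose the Parquet compression codec before writing
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

# overwrite any previous output instead of failing on an existing directory
df.write.mode("overwrite").parquet("/path/to/output/myfile")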


Parquet to DF

Read the Parquet file back into a DataFrame with this code:
%pyspark

df = sqlContext.read.parquet("/path/to/output/myfile")
df.limit(5).show()

And the output looks like this (the first five rows differ from the earlier sample because Spark does not guarantee the order in which the Parquet data is read back):
+---+-------+-------+-------+----------------+------+---------------+----------+
|age|age_max|age_min|country|       full_name|gender|  twitterhandle|    userid|
+---+-------+-------+-------+----------------+------+---------------+----------+
|   |       |       |      0|nelson rodriguez|  Male| ojooooopublico|2771575827|
|   |       |       |      0|    Mary GarciaŠ°|Female|  ojor_ozefofuw|3056364751|
|   |       |       |      0|   Andres nucleo|  Male|ojosrastafara12|1035247920|
|   |       |       |      0|          Omaira|Female|      ojovalles| 183602059|
|   |       |       |      0|   Olivia Scotti|Female|       ojscotti|3072660401|
+---+-------+-------+-------+----------------+------+---------------+----------+

This round trip saves a significant amount of time.  By saving the data as a Parquet file, we not only gain space efficiency on the cluster, but can also rapidly reload it into a DataFrame that already carries the column headings we specified earlier, without re-parsing the CSV.
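
Since the column headings survive the round trip, the reloaded DataFrame can be queried by name straight away.  As a small sketch (the table name "profiles" is just an illustration, not from the original post), we could register it as a temporary table and explore it with SQL:
%pyspark

# register the DataFrame under a hypothetical table name so it can be queried with SQL
df.registerTempTable("profiles")
sqlContext.sql("SELECT gender, COUNT(*) AS n FROM profiles GROUP BY gender").show()

In Zeppelin, the same temporary table can typically then be explored from a %sql paragraph as well, with the usual table and chart views.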
