Tuesday, September 20, 2016

Zeppelin and Spark: Merge Multiple CSVs into Parquet


The purpose of this article is to demonstrate how to load multiple CSV files on an HDFS filesystem into a single Dataframe and write to Parquet.

Two approaches are demonstrated.  The first approach is not recommended, but is shown for completeness.

First Approach

One approach might be to define each path:

import locale
locale.setlocale(locale.LC_ALL, 'en_US')

p1 = "/data/output/followers/mitshu/ec2-52-39-251-219.us-west-2.compute.amazonaws.com/0-ec2-52-39-251-219.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p2 = "/data/output/followers/mitshu/ec2-52-42-100-207.us-west-2.compute.amazonaws.com/0-ec2-52-42-100-207.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p3 = "/data/output/followers/mitshu/ec2-52-42-198-4.us-west-2.compute.amazonaws.com/0-ec2-52-42-198-4.us-west-2.compute.amazonaws.com/twitterFollowers.csv"
p4 = "/data/output/followers/mitshu/ec2-54-70-37-224.us-west-2.compute.amazonaws.com/0-ec2-54-70-37-224.us-west-2.compute.amazonaws.com/twitterFollowers.csv"

and then open each CSV at that path as an RDD and transform to a dataframe:

rdd_m1 = sc.textFile(p1)
print rdd_m1.take(5)

df_m1 = rdd_m1.\
    map(lambda x: x.split("\t")).\
    filter(lambda x: len(x) == 6). \
    map(lambda x: {

This would need to be repeated for each of the four paths, producing df_m1 through df_m4.

The dataframes could then be merged using the unionAll operator:

import pandas as pd

df = df_m1.unionAll(df_m2).unionAll(df_m3).unionAll(df_m4)

print "DF 1: {0}".format(df_m1.count())
print "DF 2: {0}".format(df_m2.count())
print "DF 3: {0}".format(df_m3.count())
print "DF 4: {0}".format(df_m4.count())
print "Merged Dataframe: {0}".format(df.count())

and finally written to parquet.


Easier Approach

Notice the convenient way of reading multiple CSVs in nested directories into a single RDD:

rdd = sc.textFile(path)
print "count = {}".format(rdd.count())

This is clearly better than defining each path individually.
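
The value of path is not shown above. Spark's textFile accepts wildcard patterns, so one plausible form is a wildcard over the host-specific directories. The sketch below uses Python's standard glob module on a temporary local directory to illustrate how such a pattern matches nested files. The host names and the two-level pattern are assumptions, and on HDFS the matching is done by Hadoop rather than by this module.

```python
import glob
import os
import tempfile

def make_sample_tree(root, hosts):
    """Create root/<host>/0-<host>/twitterFollowers.csv for each host."""
    for host in hosts:
        leaf = os.path.join(root, host, "0-" + host)
        os.makedirs(leaf)
        with open(os.path.join(leaf, "twitterFollowers.csv"), "w") as f:
            f.write("1\t2\thandle_a\t3\thandle_b\n")

def expand(root):
    """Return every CSV matched by a two-level wildcard, sorted."""
    pattern = os.path.join(root, "*", "*", "twitterFollowers.csv")
    return sorted(glob.glob(pattern))

root = tempfile.mkdtemp()
make_sample_tree(root, ["host-a", "host-b", "host-c"])  # hypothetical host names
matches = expand(root)
```

With the directory layout from the first approach, the analogous Spark call might be sc.textFile("/data/output/followers/mitshu/*/*/twitterFollowers.csv").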

There are multiple ways to transform RDDs into Dataframes (DFs).  One is to map each line through a named function:

def to_json(r):
    j = {}
    t = r.split("\t")
    j['num_followers'] = t[0]
    j['followed_userid'] = t[1]
    j['followed_handle'] = t[2]
    j['follower_userid'] = t[3]
    j['follower_handle'] = t[4]
    return j
df = rdd.map(to_json).toDF()
print "count = {}".format(df.count())

This is not necessarily superior to the first approach, but it is an alternative to consider.
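
Note that to_json indexes five fields directly, so a short or malformed line would raise an IndexError, whereas the first approach filtered on token count. A minimal plain-Python sketch that combines the two ideas is shown below. The name to_record, the minimum of five tokens, and the sample rows are assumptions made for illustration.

```python
FIELDS = ["num_followers", "followed_userid", "followed_handle",
          "follower_userid", "follower_handle"]

def to_record(line, min_tokens=len(FIELDS)):
    """Split a tab-separated line into a dict, or return None if it is too short."""
    tokens = line.split("\t")
    if len(tokens) < min_tokens:
        return None
    # zip stops at the shorter sequence, so extra trailing tokens are ignored
    return dict(zip(FIELDS, tokens))

lines = [
    "120\t1001\talice\t2002\tbob",    # made-up sample row
    "malformed line without tabs",    # rejected by the length guard
]
records = [r for r in (to_record(l) for l in lines) if r is not None]
```

In Spark the same guard could be applied as rdd.map(to_record).filter(lambda r: r is not None).toDF().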


Load from Parquet

For subsequent analysis, load from Parquet using this code:

df = sqlContext.read.parquet("/data/output/followers/mitshu/joined.prq")



  1. [Blogger] Writing to Parquet

Saturday, July 16, 2016

AWS: Syncing a Local Directory to an S3 Storage Bucket

The S3 PUT operation only supports uploading one object per HTTP request.

This can be problematic when thousands (or even millions) of files need to be pushed to (or synced with) an S3 bucket.

Install s3cmd from GitHub:
$ git clone https://github.com/s3tools/s3cmd.git
$ cd s3cmd
$ python setup.py install clean

A user with credentials and the appropriate access policy must exist.

s3cmd will require an Access Key ID and Secret Access Key from AWS (see references below).

A policy needs to be attached to the user you create in this step.

I chose to grant administrator access, although there is undoubtedly a finer grain of control that can be granted for S3 IO.

Once installed, s3cmd needs to be configured.

Assuming you have already created a bucket in S3,  s3cmd is configured like this:
$ ./s3cmd --configure s3://<bucket>/

You will be asked for your access key and your secret key.

I accepted the default for region [US], and left the encryption password and path to GPG empty.

I selected No for use of https.  Finally, I left the proxy server option empty as well.

If the configuration gives a 403 error, re-check your credentials and access policy (above).

A successful configuration attempt looks like this (private information redacted):
~/workspaces/public/vagrant/s3cmd $ ./s3cmd --configure s3://***/

Enter new values or accept defaults in brackets with Enter.
Refer to user manual for detailed description of all options.

Access key and Secret key are your identifiers for Amazon S3. Leave them empty for using the env variables.
Access Key: ***
Secret Key: ***
Default Region [US]: 

Encryption password is used to protect your files from reading
by unauthorized persons while in transfer to S3
Encryption password: 
Path to GPG program [/usr/local/bin/gpg]: 

When using secure HTTPS protocol all communication with Amazon S3
servers is protected from 3rd party eavesdropping. This method is
slower than plain HTTP, and can only be proxied with Python 2.7 or newer
Use HTTPS protocol [No]: 

On some networks all internet access must go through a HTTP proxy.
Try setting it here if you can't connect to S3 directly
HTTP Proxy server name: 

New settings:
  Access Key: ***
  Secret Key: ***
  Default Region: US
  Encryption password: 
  Path to GPG program: 
  Use HTTPS protocol: False
  HTTP Proxy server name: 
  HTTP Proxy server port: 0

Test access with supplied credentials? [Y/n] Y
Please wait, attempting to list bucket: s3://***/
Success. Your access key and secret key worked fine :-)

Now verifying that encryption works...
Not configured. Never mind.

Save settings? [y/N] y
Configuration saved to '/Users/craigtrim/.s3cfg'

Usage is straightforward:
$ ./s3cmd sync ~/Documents/files/ s3://<bucket>/


  1. [Amazon] AWS Credentials
  2. [StackOverflow] Batch Uploads to S3
  3. [s3Cmd] S3 Sync Howto

Friday, July 15, 2016

OS X Terminal Recipes

Move the first n files (here, 1000) in a directory to a specified destination directory:
$ find . -maxdepth 1 -type f | head -1000 | xargs -I {} mv {} subs/01/

This script creates multiple sub directories, and moves n files to each sub directory:
for i in {1..20}
do
 echo $i
 mkdir $i
 find . -maxdepth 1 -type f | head -1000 | xargs -I {} mv {} $i/
done
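
For file names that contain spaces or quotes, the xargs pipeline can misbehave. A rough Python equivalent of the batching loop is sketched below. The function name batch_move and the demo file names are made up for illustration. Unlike the shell version, it picks files in sorted order and stops creating directories once no files remain.

```python
import os
import shutil
import tempfile

def batch_move(src_dir, batch_size, num_batches):
    """Move up to batch_size regular files from src_dir into each of
    num_batches numbered subdirectories (1, 2, ...)."""
    for i in range(1, num_batches + 1):
        files = sorted(
            name for name in os.listdir(src_dir)
            if os.path.isfile(os.path.join(src_dir, name))
        )[:batch_size]
        if not files:
            break  # nothing left to move
        target = os.path.join(src_dir, str(i))
        os.makedirs(target)
        for name in files:
            shutil.move(os.path.join(src_dir, name), os.path.join(target, name))

# Demo on a throwaway directory holding 25 empty files
demo = tempfile.mkdtemp()
for n in range(25):
    open(os.path.join(demo, "f%02d.txt" % n), "w").close()
batch_move(demo, batch_size=10, num_batches=20)
```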

Wednesday, July 13, 2016

Exposing a Python App via Django using Vagrant


I have a Python application.  Its inner workings are complex, but the I/O is simple.  Text input (application/text) comes in, and a JSON structure (application/json) comes out.

I want to expose the Python application via a REST interface.  Django is a popular web framework for Python that requires minimal "plumbing" and few up-front decisions about application infrastructure.  In other words, it is ideal for my needs.

At the time of this article, I am using OS X 10.11.5.

Initialize Directory

Create a directory (the name is not relevant).  Place the Vagrantfile (below) into this directory.  Create a sub-directory called "scripts" and place the shell scripts (setup_python.sh, git repo cloning) into this folder.

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/trusty64"
  config.vm.network "public_network", bridge: [ 'en0', 'en1', 'en2', 'bridge0' ]
  config.vm.synced_folder("scripts", "/home/vagrant/scripts")

  config.vm.provider "virtualbox" do |v|
    v.gui = false
    v.memory = "4096"
    v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
    v.customize ["modifyvm", :id, "--natdnsproxy1", "on"]
  end

  config.vm.define "cnc" do |cnc|
    cnc.vm.hostname = "cnc"
    cnc.vm.network "private_network", ip: ""
  end

  config.vm.provision "shell", path: "scripts/init_env.sh"
end

Note: you may wish to change the environment-specific values (for example the bridge interfaces, the hostname and the private network IP). The provisioning line is optional; the scripts shown below can be placed there.

sudo apt-get update -y
sudo apt-get install -y build-essential git libopenblas-dev python-dev python-scipy python-pip
sudo apt-get update -y

Run these commands to drop into an SSH session:
$ vagrant up 
$ vagrant ssh

Once in the SSH session, run the shell scripts from the "scripts" directory. I haven't included the code to clone my git repos; the assumption is that cloning is also scripted and happens at this point, and that the installation works and has been tested.

One of my projects contains a requirements.txt file.  The contents list "Django".

The file is installed using
$ pip install -r requirements.txt

This will install the latest version of Django.

I was using version 1.10b at the time of this article.
vagrant@cnc:~$ python -m django --version

Do not proceed any further if Django is not installed.

Also, type this command to ensure django is set correctly on the path:
~/workspaces/public/vagrant/cnc_tst_1 $ django-admin --help

Type 'django-admin help <subcommand>' for help on a specific subcommand.

Available subcommands:

Note that only Django core commands are listed as settings are not properly configured (error: Requested setting INSTALLED_APPS, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.).
~/workspaces/public/vagrant/cnc_tst_1 $ 
If you don't see a list of available subcommands, something is wrong.

Django Setup

Once in the VM, run these commands:
$ django-admin startproject mysite
$ cd mysite
$ python manage.py runserver

Outside the VM, on your host machine, you should be able to view the default Django app in a browser.  Guest port 8000 is forwarded to host port 8000 (see the vagrant up output further below).

Passing an address such as 0.0.0.0:8000 as a parameter to runserver will cause Django to listen on all public IPs.  This makes the web server accessible from other computers on our network.

Wiring in the Existing App

In Django parlance, an app is a Web application that does something.  A project is a collection of configuration and apps for a particular website. A project can contain multiple apps. An app can be in multiple projects.


  1. [AirPair] Django vs Flask vs Pyramid
    1. Django's "batteries included" approach makes it easy for developers who know Python already to dive in to web applications quickly without needing to make a lot of decisions about their application's infrastructure ahead of time. 
    2. Django has templating, forms, routing, authentication, basic database administration, and more built in. 
    3. In contrast, Pyramid includes routing and authentication, but templating and database administration require external libraries.
  2. [Official] Writing your first Django app


  1. Warning: Remote connection disconnect. Retrying... 
    1. Solution 1: Patience, grasshopper. 
      The session shown here took 10 minutes:
      ~/workspaces/public/vagrant/cnc_tst_1 $ vagrant halt && vagrant up && vagrant ssh
      ==> cnc: Attempting graceful shutdown of VM...
      Bringing machine 'cnc' up with 'virtualbox' provider...
      ==> cnc: Checking if box 'ubuntu/trusty64' is up to date...
      ==> cnc: A newer version of the box 'ubuntu/trusty64' is available! You currently
      ==> cnc: have version '20160323.0.0'. The latest is version '20160708.1.0'. Run
      ==> cnc: `vagrant box update` to update.
      ==> cnc: Clearing any previously set forwarded ports...
      ==> cnc: Clearing any previously set network interfaces...
      ==> cnc: Preparing network interfaces based on configuration...
          cnc: Adapter 1: nat
          cnc: Adapter 2: bridged
          cnc: Adapter 3: hostonly
      ==> cnc: Forwarding ports...
          cnc: 8000 (guest) => 8000 (host) (adapter 1)
          cnc: 22 (guest) => 2222 (host) (adapter 1)
      ==> cnc: Running 'pre-boot' VM customizations...
      ==> cnc: Booting VM...
      ==> cnc: Waiting for machine to boot. This may take a few minutes...
          cnc: SSH address:
          cnc: SSH username: vagrant
          cnc: SSH auth method: private key
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
          cnc: Warning: Remote connection disconnect. Retrying...
      ==> cnc: Machine booted and ready!
      ==> cnc: Checking for guest additions in VM...
          cnc: The guest additions on this VM do not match the installed version of
          cnc: VirtualBox! In most cases this is fine, but in rare cases it can
          cnc: prevent things such as shared folders from working properly. If you see
          cnc: shared folder errors, please make sure the guest additions within the
          cnc: virtual machine match the version of VirtualBox you have installed on
          cnc: your host and reload your VM.
          cnc: Guest Additions Version: 4.3.36
          cnc: VirtualBox Version: 5.0
      ==> cnc: Setting hostname...
      ==> cnc: Configuring and enabling network interfaces...
      ==> cnc: Mounting shared folders...
          cnc: /vagrant => /Users/craigtrim/workspaces/public/vagrant/cnc_tst_1
          cnc: /home/vagrant/scripts => /Users/craigtrim/workspaces/public/vagrant/cnc_tst_1/scripts
      ==> cnc: Machine already provisioned. Run `vagrant provision` or use the `--provision`
      ==> cnc: flag to force provisioning. Provisioners marked to run always will still run.
      Welcome to Ubuntu 14.04.4 LTS (GNU/Linux 3.13.0-83-generic x86_64)
       * Documentation:  https://help.ubuntu.com/
        System information as of Wed Jul 13 19:24:50 UTC 2016
        System load:  0.0               Processes:           86
        Usage of /:   4.6% of 39.34GB   Users logged in:     0
        Memory usage: 2%                IP address for eth0:
        Swap usage:   0%                IP address for eth2:
        Graph this data and manage this system at:
        Get cloud support with Ubuntu Advantage Cloud Guest:
      Last login: Wed Jul 13 19:10:40 2016 from
    2. Solution 2: If the VirtualBox VM was not shut down properly, vagrant destroy may be the most effective answer.  This will remove all the files and require going through the setup again.

Tuesday, July 12, 2016

Zeppelin and Spark: Transforming a CSV to Parquet

Transform a CSV file to Parquet Format

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem.  Parquet is built to support very efficient compression and encoding schemes.  Twitter is starting to convert some of its major data sources to Parquet in order to take advantage of the compression and deserialization savings.


Load the CSV file into an RDD

rdd = sc.textFile("/path/to/input/myfile.csv")
print rdd.take(5)

and the output looks like this:
[u'19999174,Logicknot,Cameron MacKinnon,0,Male, , , ', u'433249647,LogieTatjana,Tatjana Logie \u2653,0,Female, , , ', u'1346426538,Logistic_soares,Luciana Soares,0,Female, , , ', u'18355981,Loh69,Laurent Grad,0,Male, , , ', u'2976559335,LohMarta,Marta Loh,0,Female, , , ']


Now we need to convert the RDD into a Dataframe (DF):

df = rdd.\
    map(lambda x: x.split(",")).\
    filter(lambda x: len(x) == 8). \
    map(lambda x: {

The bad news is that this code might look complicated. The good news is that the code is a lot simpler than it appears.

The first map step splits each record on the delimiter (a comma), leaving a list of tokens per record.  The filter then rejects any line that does not have exactly 8 tokens.  The third and final step, another map, takes each token in the list and gives it a column heading.
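
The three steps can be tried outside Spark on the sample lines shown earlier. The sketch below is plain Python, not the Spark code itself. The column names come from the output table, but the order of the last three (empty) columns is an assumption, and to_rows is a made-up helper name.

```python
COLUMNS = ["userid", "twitterhandle", "full_name", "country",
           "gender", "age", "age_min", "age_max"]  # order of the last three is assumed

def to_rows(lines, delimiter=",", expected=len(COLUMNS)):
    """Split each line, keep only lines with the expected token count,
    and label the tokens with column names."""
    split = (line.split(delimiter) for line in lines)                  # first map
    valid = (tokens for tokens in split if len(tokens) == expected)    # filter
    return [dict(zip(COLUMNS, tokens)) for tokens in valid]            # second map

sample = [
    u"19999174,Logicknot,Cameron MacKinnon,0,Male, , , ",
    u"18355981,Loh69,Laurent Grad,0,Male, , , ",
    u"too,few,tokens",   # rejected: only 3 tokens
]
rows = to_rows(sample)
```

One limitation worth noting: a plain split on "," would miscount tokens for any record whose fields themselves contain commas, in which case the filter silently drops the line.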

The output looks like this:
|age|age_max|age_min|country|        full_name|gender|  twitterhandle|    userid|
|   |       |       |      0|Cameron MacKinnon|  Male|      Logicknot|  19999174|
|   |       |       |      0|  Tatjana Logie ♓|Female|   LogieTatjana| 433249647|
|   |       |       |      0|   Luciana Soares|Female|Logistic_soares|1346426538|
|   |       |       |      0|     Laurent Grad|  Male|          Loh69|  18355981|
|   |       |       |      0|        Marta Loh|Female|       LohMarta|2976559335|

Note that we could have greatly simplified the code by just doing this:

df = rdd.\
    map(lambda x: x.split(",")).toDF()

And then we would have got this:
|        _1|             _2|               _3| _4|    _5| _6| _7| _8|
|  19999174|      Logicknot|Cameron MacKinnon|  0|  Male|   |   |   |
| 433249647|   LogieTatjana|  Tatjana Logie ♓|  0|Female|   |   |   |
|1346426538|Logistic_soares|   Luciana Soares|  0|Female|   |   |   |
|  18355981|          Loh69|     Laurent Grad|  0|  Male|   |   |   |
|2976559335|       LohMarta|        Marta Loh|  0|Female|   |   |   |

But presumably it's important to have column headers; hence the additional code in the first snippet above.

DF to Parquet

The final step is to transform the Dataframe into a Parquet file.

This can be accomplished in a single line:
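
The snippet itself is not shown here. A likely form, assuming Spark 1.4 or later (where DataFrame.write is available) and the same output path that is read back in the next section, is a call to df.write.parquet. The sketch below wraps that call in a small helper; save_as_parquet is a hypothetical name, not part of Spark.

```python
def save_as_parquet(df, path):
    """Write a Spark DataFrame to Parquet files under the given path."""
    # DataFrame.write returns a DataFrameWriter; with the default save mode,
    # parquet() raises an error if the path already exists.
    df.write.parquet(path)

# Intended use inside a pyspark paragraph, where df is the DataFrame built above:
# save_as_parquet(df, "/path/to/output/myfile")
```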


Parquet to DF

Read the parquet file using this code:

df = sqlContext.read.parquet("/path/to/output/myfile")

And the output, as expected, looks like this:
|age|age_max|age_min|country|       full_name|gender|  twitterhandle|    userid|
|   |       |       |      0|nelson rodriguez|  Male| ojooooopublico|2771575827|
|   |       |       |      0|    Mary GarciaŠ°|Female|  ojor_ozefofuw|3056364751|
|   |       |       |      0|   Andres nucleo|  Male|ojosrastafara12|1035247920|
|   |       |       |      0|          Omaira|Female|      ojovalles| 183602059|
|   |       |       |      0|   Olivia Scotti|Female|       ojscotti|3072660401|

This saved us a significant amount of time. By saving as a parquet file, we not only achieve space efficiency on our cluster, but have the ability to rapidly load the parquet data into a Dataframe with the column headings we specified earlier.

Friday, June 10, 2016

Zeppelin and Spark: Finding Associated Hashtags


I'm finding that eBay-related spam accounts for nearly 5% of all the tweets I'm analyzing. The @eBay username is a good indicator; I've found that fewer than roughly 3% of all tweets with this username are valid.

Spam Hashtags

This code finds all the hashtags in tweets containing the @eBay keyword:

df = sqlContext.read.parquet("/output/craig/buzz/parquet/twitter")

df_ebay_tags = df.rdd \
    .filter(lambda x: '@ebay' in x[5].lower()) \
    .flatMap(lambda x: x[5].split(" ")) \
    .filter(lambda x: "#" in x) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x,y:x+y) \
    .map(lambda x: { 
        'tag': x[0],
        'count': x[1] 
    }).toDF().sort('count', ascending=False)
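
To make the chain of transformations easier to follow, here is a plain-Python sketch of the same logic using collections.Counter. It is an illustration rather than the Spark code: the sample tweets are made up, and each tweet is a bare string instead of a row indexed at position 5.

```python
from collections import Counter

def hashtag_counts(tweets, keyword="@ebay"):
    """Count hashtag tokens in tweets whose text contains the keyword."""
    counts = Counter()
    for text in tweets:
        if keyword in text.lower():            # filter on the keyword
            for token in text.split(" "):      # flatMap into tokens
                if "#" in token:               # keep hashtag tokens only
                    counts[token] += 1         # map to (tag, 1) + reduceByKey
    return counts.most_common()                # sort by count, descending

tweets = [  # made-up sample tweets
    "Great deal via @eBay #deals #auction",
    "Another listing @ebay #deals",
    "Unrelated tweet #deals",
]
top = hashtag_counts(tweets)
```

As in the Spark version, tags are counted case-sensitively here, so "#Deals" and "#deals" would be separate keys.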


Associated Hashtags

This code will find all the hashtags associated with the show Gotham in Spanish:

import string

def cleanse(value):
    for i in list(string.punctuation):
        value = value.replace(i, "")
    return value

df_tags = df_fb.rdd \
    .filter(lambda x: x[4].lower() == "gotham") \
    .filter(lambda x: x[7].lower() == "es") \
    .flatMap(lambda x: x[6].lower().split(" ")) \
    .filter(lambda x: "#" in x) \
    .map(lambda x: cleanse(x)) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x,y:x+y) \
    .map(lambda x: { 
        'tag': x[0],
        'count': x[1] 
    }).toDF().sort('count', ascending=False)

print "Total: {}".format(

Not surprisingly, the tag #gotham is the most prevalent.  That #series should show up is promising; not shown here is the prior exclusion of #movie and #movies as we're particularly targeting a show.
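
The cleanse step in the pipeline above removes all punctuation, including the leading #, so that variants of a tag with trailing punctuation collapse into one key before counting. A small standalone check, with made-up variants:

```python
import string

def cleanse(value):
    """Remove every ASCII punctuation character from value."""
    for ch in string.punctuation:
        value = value.replace(ch, "")
    return value

variants = ["#gotham", "#gotham,", "#gotham!"]   # made-up tokens
cleaned = set(cleanse(v) for v in variants)
```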

HTML Output

This analysis is similar, but produces an HTML report with clickable links to explore the hashtags on Twitter:

print u"%html"
print u"<h3>Hashtags</h3>"
print u"<br />"
for row in df_tags.rdd.filter(lambda x: x[0] > 10).toLocalIterator():
    url = u"""
    html = u"""
        <a href="{}" target="_new">{}</a>
    """.strip().format(url, row[1].strip())
    print u"""
        {}: {}
    """.strip().format(html, row[0])
    print u"<br />"
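
The url template is missing from the snippet above. A minimal sketch of the same report as a reusable function follows. The Twitter hashtag URL pattern and the name hashtag_report are assumptions, and rows are taken to be (count, tag) pairs, matching the way the loop above reads row[0] and row[1].

```python
try:
    from urllib.parse import quote      # Python 3
except ImportError:
    from urllib import quote            # Python 2

BASE_URL = u"https://twitter.com/hashtag/{}"  # assumed URL pattern

def hashtag_report(rows, min_count=10):
    """Build Zeppelin %html output with one link per tag above min_count."""
    lines = [u"%html", u"<h3>Hashtags</h3>", u"<br />"]
    for count, tag in rows:
        if count > min_count:
            tag = tag.strip()
            url = BASE_URL.format(quote(tag))
            link = u'<a href="{}" target="_new">{}</a>'.format(url, tag)
            lines.append(u"{}: {}".format(link, count))
            lines.append(u"<br />")
    return u"\n".join(lines)

report = hashtag_report([(42, u"gotham"), (3, u"rare")])  # made-up rows
```

Returning a string instead of printing line by line keeps the function easy to test; in a Zeppelin paragraph the result would simply be printed.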


Zeppelin and Spark: Ad Hoc Twitter Feedback Analysis


This post describes a convenient Zeppelin notebook for performing ad-hoc analysis on Twitter data for entity (brand or show) mentions. It contains these paragraphs:
  1. Load data in parquet and view schema
  2. View breakdown by language
  3. View tweet distribution by twitter handle
  4. View tweet distribution by search keywords

Ad-Hoc Analysis in Zeppelin

Loading the data


import locale
locale.setlocale(locale.LC_ALL, 'en_US')


df_fb = sqlContext.read.parquet(path)


|    userid|         posted_time|category|       parent|       entity|           tweetid|             content|lang|search_object|       thandle|
|2217980506|2014-01-27 22:27:...|    show|grammy awards|grammy awards|427930900505329665|@DionneJames are ...|  en|      grammys|louisemorse123|

Breakdown by Language

This paragraph gives a breakdown by language. Zeppelin provides a useful pie chart to quickly get a sense of the distribution between the languages involved.


select
    lang, count(lang)
from
    <table>
group by
    lang
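
The shape of this query can be tried without a cluster. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL; the table name tweets and the rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table tweets (thandle text, lang text, search_object text)")
conn.executemany(
    "insert into tweets values (?, ?, ?)",
    [  # made-up rows
        ("fan_one", "en", "grammys"),
        ("fan_two", "en", "grammys"),
        ("fan_tres", "es", "gotham"),
    ],
)
# Same aggregation as the paragraph above, with an explicit ordering
by_lang = conn.execute(
    "select lang, count(lang) from tweets group by lang order by lang"
).fetchall()
```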


Distribution by Twitter Handle

This is a useful paragraph to get a sense of distribution across twitter handles. In use cases where individual buzz is desirable, spam and/or official and/or fan accounts can be spotted quickly.


select
    thandle, count(thandle) as total
from
    <table>
group by
    thandle
order by
    total desc


This paragraph can be further refined by adding this additional WHERE clause:
    lower(thandle) like lower('%${entity}%')

This will output the distribution across all twitter handles that are similar to the entity name.

It's not unlikely that a hard-core fan of a show or brand might take that name and make it part of their own username.  Having said that, if the use case requires differentiation between individual buzz and corporate/spam buzz, this becomes an easy way of identifying the fat tail (e.g. the top 10).
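
In Zeppelin, ${entity} is filled in from a dynamic form field. The effect of the refined query can be sketched with sqlite3, using a bound parameter in place of the form value. The table name tweets, the helper handles_like and the rows are assumptions for illustration.

```python
import sqlite3

def handles_like(conn, entity):
    """Tweet counts per handle, restricted to handles containing the entity name."""
    sql = """
        select thandle, count(thandle) as total
        from tweets
        where lower(thandle) like lower(?)
        group by thandle
        order by total desc
    """
    return conn.execute(sql, ("%" + entity + "%",)).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("create table tweets (thandle text)")
conn.executemany(
    "insert into tweets values (?)",
    [("GothamFan",), ("GothamFan",), ("gotham_news",), ("someone_else",)],  # made-up handles
)
result = handles_like(conn, "gotham")
```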

Distribution by Search Objects

The search patterns used to find the tweets can be analyzed with a similar distribution. If a given search object is wrong or ambiguous (like using "friends" to find instances of the sitcom Friends), this distribution can show the impact. It's worth quickly investigating the fat tail to look for evidence of ambiguity or anything that might have been used incorrectly.


select
    search_object, count(search_object) as total
from
    <table>
group by
    search_object
order by
    total desc