Tuesday, June 7, 2016

Python: Using the Slack API

Motivation
Slack is a great tool for team communication.  It also provides a simple way to integrate automated scripts, via incoming and outgoing webhooks.  This is useful for batch programs that need to post status updates automatically and/or notify team members when a program is done, has errored out, etc.

Setup

Clone this repository:
https://github.com/os/slacker

Install the library, either from the clone:
python setup.py install

or directly from PyPI:
pip install --user --upgrade slacker

Here's the python code:
from slacker import Slacker

message = """
 @johndoe: 16-476 done: 3.61%
""".strip()

slack = Slacker('private-key')
slack.chat.post_message('#channel', message, icon_emoji=":rolled_up_newspaper:", username="Pulse", link_names=1)
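The snippet above uses the Web API via Slacker.  If the integration is an incoming webhook instead (the mechanism mentioned in the motivation), posting only requires an HTTP POST of a JSON payload; the webhook URL below is a placeholder:

```python
import json

def build_payload(text, username="Pulse", icon_emoji=":rolled_up_newspaper:"):
    # Incoming webhooks accept a JSON body with the message text plus
    # optional overrides for the posting name and icon.
    return {"text": text, "username": username, "icon_emoji": icon_emoji}

if __name__ == "__main__":
    import requests
    # placeholder URL -- substitute the hook URL Slack generates for your channel
    webhook_url = "https://hooks.slack.com/services/T0000/B0000/XXXXXXXX"
    requests.post(webhook_url, data=json.dumps(build_payload("@johndoe: 16-476 done: 3.61%")))
```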


Monday, May 16, 2016

PySpark: Operations Overview

Resilient Distributed Dataset:  A dataset is distributed across the cluster nodes.  No single node has all the data, and the data is recoverable when a single node fails.

an RDD is distributed across the worker nodes


Collect
<RDD>.collect() pulls all the data from an RDD back into the driver program.  You don't necessarily want to reassemble a large dataset, distributed across multiple nodes, onto a single driver node over the network.  The function call will probably hang for a while and then die.

If you are doing exploratory data analysis, collect() can be useful when run in conjunction with the sample() function to decrease the size of the data.
rdd.sample(withReplacement=False, fraction=0.01, seed=1).collect()
Because I provided a seed, the random sample will be consistent.  Each time I call this function, I will see the same thing.

Count
The most basic form of count() tells you how many items are present in your RDD.
rdd.count()
It is very useful, and very simple.  There are no optional parameters.

There are a couple of interesting variations:
rdd.countApprox(timeout=200, confidence=0.5)
This function returns an approximate count within the given timeout (in milliseconds).  If your dataset is extremely large, and you don't care exactly how many items you have, this is a great function to use.

4 Paragraphs from a Zeppelin notebook showing
count() and countApprox() in action


Counting distinct elements in a large dataset can be a daunting task.
rdd.countApproxDistinct(relativeSD=0.05)
This function makes it easy by returning an approximate count of the number of distinct elements in the RDD.  Under the covers, the function uses the HyperLogLog algorithm.  This algorithm hashes elements into buckets, estimates from the hash values how many distinct elements each bucket is likely to have seen, and then combines the per-bucket estimates using a harmonic mean.

By making the relativeSD parameter smaller, the count will be more accurate and the function runtime will be slower.

First
Sometimes you just need one item from the RDD to play with.  You can use first() to pull the first item from the RDD.
rdd.first()
This function is simple.  There are no optional parameters, and it does respect any sorting that you've done.  You can't use first() on an empty RDD.

Limit a DataFrame (df) to 1 result
or use first() on the underlying RDD

Take
The take() function is a general purpose "return data to the driver" command.
rdd.take(1)
is almost the same as
rdd.first()
although take() returns a list instead of an element

take(n) vs first()



If I take as many rows as there are in my collection, that's the same as using collect()
rdd.take(rdd.count())
is functionally equivalent to
rdd.collect()
As a rule:
  • if you want one item, use first()
  • if you want all the items, use collect()
  • if you want 0 < x < MAX, use take(n)
Both first() and collect() are optimized for their own particular jobs and are better than take() in those specialized circumstances.  The take() function looks at one partition, estimates how many partitions will be needed to provide the number of elements you requested, and then returns all the requested elements to the driver.

Take Sample

This function allows you to pull a random sample of elements from your RDD into your driver program.
rdd.takeSample(withReplacement=False, num=1, seed=1)
When you sample, you need to provide the target sample size and whether you want replacement while sampling.

Sampling without replacement means you can only select a given element once.  If the RDD has 100 elements, and you want a sample with 10,000 elements, and you set withReplacement=False the sample will only have 100 elements.  Each element can only be selected once.

Sampling with replacement means each element in the input has an equal chance of being picked for each element in the output.  Using the example above, an RDD with 100 elements could be used to generate a random sample with 10,000 elements.  Each input element would be repeated multiple times in the output.

Another way to look at this is that if you had an input RDD like this:
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
and you selected
rdd.takeSample(withReplacement=True, num=rdd.count(), seed=1)
in theory, you could end up with an output (the random sample) that looked like this:
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
This is very unlikely, but it could happen.
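The with/without-replacement semantics can be demonstrated locally with Python's random module (a plain-Python analogy for illustration, not Spark itself):

```python
import random

random.seed(1)  # a seed makes the sample repeatable, as with takeSample()
data = list(range(10))

# Without replacement: each element can be picked at most once, so the
# sample is a permutation of the input and can never exceed its size.
no_repl = random.sample(data, 10)

# With replacement: every draw is independent, so the sample can be far
# larger than the input and individual elements may repeat.
with_repl = [random.choice(data) for _ in range(10000)]
```

random.sample(data, 10000) would raise a ValueError here; takeSample(withReplacement=False, num=10000) on a 100-element RDD instead silently caps the result, as described above.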

Use of takeSample() with the optional seed parameter

You can also provide a random seed when sampling.  Using this parameter guarantees the function will always make the same random sample selections, which enables repeatable research: others can replicate your findings.

Take Ordered

This function is similar to sorting the entire RDD, then performing a take() function.

Without this function, given a large RDD and a desire for an ordered sample, you would have to reduce over each partition, keeping the first n elements by sort order.  The per-partition results would then need to be combined in a reduce, with a final combine performed in the driver.  At each step, only n items are kept.  takeOrdered() handles all of this behind the scenes.
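The per-partition strategy can be sketched in plain Python, with lists standing in for partitions (a local illustration of the idea, not Spark's actual implementation):

```python
import heapq

def take_ordered(partitions, n):
    # keep only the n smallest elements of each "partition" ...
    candidates = [x for part in partitions for x in heapq.nsmallest(n, part)]
    # ... then keep the n smallest of the combined candidates
    return heapq.nsmallest(n, candidates)

print(take_ordered([[9, 4, 7], [1, 8, 2], [6, 3, 5]], 3))  # [1, 2, 3]
```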

This function is only fast when n is small.  Don't use this function when n ~= rdd.count().  As the value of n approaches the size of your RDD, it's best to just sort the RDD and then use take() or takeSample().

rdd.takeOrdered(10)
 
the "userid" column is sorted in ascending order



References

  1. [StackOverflow] Take Ordered
    1. Good overview on a variety of takeOrdered(n, key) variations.

Thursday, May 12, 2016

Zeppelin and Spark: Writing to Parquet

A colleague gave me a CSV that was nearly a TB.

The format was simple:
TeamMarijo,34231221
TeVesoli,34279539
Keardly_,34262814
xElyFlds,34963944
Rrelaived,34289263
ayedema28,39008303 
 A user name followed by a user id.

The first step was to compress the file and scp it to the cluster:
$ tar -czf users.tar.gz users.csv
$ scp -P 22 users.tar.gz dsuser@<IP>:~
$ ssh dsuser@<IP>

Once on the remote server, copy to HDFS:
$ tar -xvf users.tar.gz
$ rm users.tar.gz
$ sudo su hdfs
$ hadoop fs -put users.csv /team/dev/craig/
$ hadoop fs -ls -h /team/dev/craig
Found 1 items
-rw-r--r--   2 dsuser hdfs      925.9 G 2016-05-12 15:59 /team/dev/craig/pi20160208.csv


I need to access the CSV, transform it into a data frame, and save it as Parquet:
%pyspark

rdd = sc.textFile("/team/dev/craig/pi20160208.csv")
df = rdd.map(lambda x : x.split(',')).map(lambda x : { 'handle': x[0].strip(), 'userid': x[1].strip()}).toDF()

print df.describe()
df.limit(5).show()
print df.count()

df.write.parquet("/team/dev/craig/pi20160208/")

The output looks like this:
DataFrame[summary: string]
+-------------+------+
|       handle|userid|
+-------------+------+
|     Huoiwnic| 13853|
|      hoviabo| 14864|
|       trisly| 55173|
|   PixlRkxoot| 55293|
+-------------+------+

29008260086
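The split/strip mapping in the %pyspark snippet can be sanity-checked locally on a single line before running it over the full RDD (a plain-Python restatement of the lambda):

```python
def parse_line(line):
    # mirror the map() step: split on the comma, strip stray whitespace
    handle, userid = line.split(',')
    return {'handle': handle.strip(), 'userid': userid.strip()}

print(parse_line('ayedema28,39008303 '))  # {'handle': 'ayedema28', 'userid': '39008303'}
```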

Now that the file is saved in Apache Parquet format, I can load it like this:
%pyspark

df = sqlContext.read.parquet("/team/dev/craig/pi20160208")

Thursday, March 17, 2016

Python: Pandas and Geonames

Introduction

This post shows how to use the Python pandas library to read and process Geonames open data.


Required

  1. Download allCountries.zip.  
    1. The schema and data set acquisition are discussed in more depth in this blog post
  2. Python and the pandas library installed. 
The data format is tab-delimited text in utf8 encoding.

The following code will read the file into a pandas DataFrame:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import time
import pandas as pd 

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class GeonamesCountriesTxtFileReader(object):
 """
  A GeonamesCountriesTxtFileReader 
 """
 def __init__(self, input_file):
  """Return a GeonamesCountriesTxtFileReader object"""   
  self.input_file = input_file

 def get_header_row(self):
  return [ 
   'geonameid',
   'name',
   'asciiname',
   'alternatenames',
   'latitude',
   'longitude',
   'feature-class',
   'feature-code',
   'country-code',
   'alt-country-codes',
   'admin-code-1',
   'admin-code-2',
   'admin-code-3',
   'admin-code-4',
   'population',
   'elevation',
   'dem',
   'timezone',
   'modification-date' ]

 def get_data_types(self):
  return { 
   'geonameid' : int,
   'name' : str,
   'asciiname' : str,
   'alternatenames' : str,
   'latitude' : float,
   'longitude' : float,
   'feature-class' : str,
   'feature-code' : str,
   'country-code' : str,
   'alt-country-codes' : str,
   'admin-code-1' : str,
   'admin-code-2' : str,
   'admin-code-3' : str,
   'admin-code-4' : str,
   'population' : int,
   'elevation' : str,
   'dem' : str,
   'timezone' : str,
   'modification-date' : str }

 def read_csv(self):
  start = time.time()
  df = pd.read_csv(
   self.input_file,
   delim_whitespace=False,
   sep='\t',
   error_bad_lines=False,
   skiprows=0,
   encoding='utf-8',
   names=self.get_header_row(),
   dtype=self.get_data_types(),
   na_values=['none'],
   usecols=self.get_header_row())
  
  end = time.time()
  logger.info('Read CSV File (path = {}, elapsed-time = {})'.format(self.input_file, (end - start)))
  
  return df

This takes about 30 seconds to read on my MacBook Pro.


Working with the Reader

The following code shows how to access the reader and acquire a pandas DataFrame instance:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import codecs
import pandas as pd 

from geonames_reader import GeonamesCountriesTxtFileReader

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

in_file = '/Users/craigtrim/data/Data/geonames-2/allCountries.txt'

reader = GeonamesCountriesTxtFileReader(in_file)
df = reader.read_csv()

All manipulation of the data will occur after this point.


Filtering, Sorting and Writing to Disk

The following code will filter by populations greater than a million and sort the result in descending order:
df_pop = df[df.population > 1000000].sort_values(['population', 'name'], ascending=[False, False])

df_pop.to_csv(
 out_file, 
 sep='\t', 
 header=[ 'name', 'population', 'alternatenames' ],
 columns=[ 'name', 'population', 'alternatenames' ],
 mode='w',
 encoding='utf-8')

The result is another data frame that is written to disk.

Filtering by Partial Strings

Select rows where the alternatenames column contains a string value:
df[df['alternatenames'].str.contains("United States of America|USA")==True]

Note that if you use this variation:
df[df['alternatenames'].str.contains("United States of America")]
you will likely end up with the error 'cannot index with vector containing NA / NaN values', because rows with a missing alternatenames value produce NaN in the boolean mask.
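The NaN problem can also be avoided at the source by passing na=False to str.contains(), which treats missing values as non-matches.  A small self-contained illustration (toy rows, not the Geonames file):

```python
import pandas as pd

df = pd.DataFrame({
    'name': ['United States', 'France', 'Atlantis'],
    'alternatenames': ['USA,United States of America', 'FR', None]})

# without na=False, the missing value yields NaN in the boolean mask,
# which is what triggers the indexing error above
matches = df[df['alternatenames'].str.contains('USA', na=False)]
print(matches['name'].tolist())  # ['United States']
```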


Writing to CSV


This function preserves the formatting of the original CSV file:
 def to_csv(self, df, out_file):
  df.to_csv(
   out_file, 
   sep='\t', 
   index=False,     # don't write row names (index), default=True
   header=False,    # don't write out column names, default=True
   mode='w',      # Python write mode, default 'w'
   encoding='utf-8')   # defaults to 'ascii' on Python 2 and 'utf-8' on Python 3


Issues

  1. The "United States" and "United Kingdom" do not appear in the output file.
    1. On closer inspection, "United Kingdom" is listed as having a population of 0.
    2. Note that it is possible to iterate through each row in a dataframe like this:
      for index, row in df.iterrows():
       name = row['name'].lower().strip()
       if 'united' in name:
        print name, ", ", row['population']
      
    3. I don't know if this is because the actual data is bad, or if the delimiters were not specified correctly and the "0" is another column's value mistakenly assigned to population.



References

  1. Acquiring Geonames Data
  2. [StackOverflow] Dataframe Selection by Partial String

Acquiring Geonames Data


Primary Data

These are the downloaded files:
~/data/Data/geonames $ cd ../geonames-2/
~/data/Data/geonames-2 $ ls -lah
total 3568232
drwxr-xr-x  14 craigtrim  staff   476B Mar 17 10:38 .
drwxr-xr-x  17 craigtrim  staff   578B Mar 17 10:38 ..
-rw-r--r--@  1 craigtrim  staff   306M Mar 17 10:31 allCountries.zip
-rw-r--r--@  1 craigtrim  staff   111M Mar 17 10:14 alternateNames.zip
-rw-r--r--@  1 craigtrim  staff   7.6M Mar 17 10:11 cities1000.zip
-rw-r--r--@  1 craigtrim  staff   2.1M Mar 17 10:11 cities15000.zip
-rw-r--r--@  1 craigtrim  staff   3.5M Mar 17 10:11 cities5000.zip
-rw-r--r--@  1 craigtrim  staff   1.3M Mar 17 10:11 hierarchy.zip
-rw-r--r--@  1 craigtrim  staff   184K Mar 17 10:11 no-country.zip
-rw-r--r--@  1 craigtrim  staff   825K Mar 17 10:11 shapes_simplified_low.json.zip
-rw-r--r--@  1 craigtrim  staff   824K Mar 17 10:11 shapes_simplified_low.zip
-rw-r--r--@  1 craigtrim  staff   169K Mar 17 10:11 userTags.zip

"allCountries.zip" contains all countries combined in one file, see 'geoname' table for columns



Geonames Table


geonameid: integer id of record in geonames database
name: name of geographical point (utf8), varchar(200)
asciiname: name of geographical point in plain ascii characters, varchar(200)
alternatenames: alternate names, comma separated, ascii names automatically transliterated, convenience attribute from alternatename table, varchar(10000)
latitude: latitude in decimal degrees (wgs84)
longitude: longitude in decimal degrees (wgs84)
feature class: see http
feature code: see http
country code: ISO-3166 2-letter country code, 2 characters
cc2: alternate country codes, comma separated, ISO-3166 2-letter country code, 200 characters
admin1 code: fipscode (subject to change to iso code), see file admin1Codes.txt for display names of this code; varchar(20)
admin2 code: code for the second administrative division, a county in the US, see file admin2Codes.txt; varchar(80)
admin3 code: code for third level administrative division, varchar(20)
admin4 code: code for fourth level administrative division, varchar(20)
population: bigint (8 byte int)
elevation: in meters, integer
dem: digital elevation model, srtm3 or gtopo30, average elevation of 3''x3'' (ca 90mx90m) or 30''x30'' (ca 900mx900m) area in meters, integer. srtm processed by cgiar/ciat.
timezone: the timezone id (see file timeZone.txt), varchar(40)
modification date: date of last modification in yyyy-MM-dd format



 

Acquiring Files in Parts

As of 17-March-2016, all the individual Geonames ZIP files could be downloaded with this script.  If you are on OS X and don't have wget installed, install Homebrew first, then run brew install wget

#!/bin/bash

download () {
 if [ -f "$1".zip ]
 then
  echo "$1.zip exists"
 else
  wget http://download.geonames.org/export/dump/"$1".zip
 fi
}

download "AD"
download "AE"
download "AF"
download "AG"
download "AI"
download "AL"
download "AM"
download "AN"
download "AO"
download "AQ"
download "AR"
download "AS"
download "AT"
download "AU"
download "AW"
download "AX"
download "AZ"
download "BA"
download "BB"
download "BD"
download "BE"
download "BF"
download "BG"
download "BH"
download "BI"
download "BJ"
download "BL"
download "BM"
download "BN"
download "BO"
download "BQ"
download "BR"
download "BS"
download "BT"
download "BV"
download "BW"
download "BY"
download "BZ"
download "CA"
download "CC"
download "CD"
download "CF"
download "CG"
download "CH"
download "CI"
download "CK"
download "CL"
download "CM"
download "CN"
download "CO"
download "CR"
download "CS"
download "CU"
download "CV"
download "CW"
download "CX"
download "CY"
download "CZ"
download "DE"
download "DJ"
download "DK"
download "DM"
download "DO"
download "DZ"
download "EC"
download "EE"
download "EG"
download "EH"
download "ER"
download "ES"
download "ET"
download "FI"
download "FJ"
download "FK"
download "FM"
download "FO"
download "FR"
download "GA"
download "GB"
download "GD"
download "GE"
download "GF"
download "GG"
download "GH"
download "GI"
download "GL"
download "GM"
download "GN"
download "GP"
download "GQ"
download "GR"
download "GS"
download "GT"
download "GU"
download "GW"
download "GY"
download "HK"
download "HM"
download "HN"
download "HR"
download "HT"
download "HU"
download "ID"
download "IE"
download "IL"
download "IM"
download "IN"
download "IO"
download "IQ"
download "IR"
download "IS"
download "IT"
download "JE"
download "JM"
download "JO"
download "JP"
download "KE"
download "KG"
download "KH"
download "KI"
download "KM"
download "KN"
download "KP"
download "KR"
download "KW"
download "KY"
download "KZ"
download "LA"
download "LB"
download "LC"
download "LI"
download "LK"
download "LR"
download "LS"
download "LT"
download "LU"
download "LV"
download "LY"
download "MA"
download "MC"
download "MD"
download "ME"
download "MF"
download "MG"
download "MH"
download "MK"
download "ML"
download "MM"
download "MN"
download "MO"
download "MP"
download "MQ"
download "MR"
download "MS"
download "MT"
download "MU"
download "MV"
download "MW"
download "MX"
download "MY"
download "MZ"
download "NA"
download "NC"
download "NE"
download "NF"
download "NG"
download "NI"
download "NL"
download "NO"
download "NP"
download "NR"
download "NU"
download "NZ"
download "OM"
download "PA"
download "PE"
download "PF"
download "PG"
download "PH"
download "PK"
download "PL"
download "PM"
download "PN"
download "PR"
download "PS"
download "PT"
download "PW"
download "PY"
download "QA"
download "RE"
download "RO"
download "RS"
download "RU"
download "RW"
download "SA"
download "SB"
download "SC"
download "SD"
download "SE"
download "SG"
download "SH"
download "SI"
download "SJ"
download "SK"
download "SL"
download "SM"
download "SN"
download "SO"
download "SR"
download "SS"
download "ST"
download "SV"
download "SX"
download "SY"
download "SZ"
download "TC"
download "TD"
download "TF"
download "TG"
download "TH"
download "TJ"
download "TK"
download "TL"
download "TM"
download "TN"
download "TO"
download "TR"
download "TT"
download "TV"
download "TW"
download "TZ"
download "UA"
download "UG"
download "UM"
download "US"
download "UY"
download "UZ"
download "VA"
download "VC"
download "VE"
download "VG"
download "VI"
download "VN"
download "VU"
download "WF"
download "WS"
download "XK"
download "YE"
download "YT"
download "YU"
download "ZA"
download "ZM"
download "ZW"

It can be convenient to work with one of these smaller per-country files when testing a parser or data reader, rather than reading the entire allCountries.txt [1.3 GB] file each time.
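The same idempotent download logic can be sketched in Python.  The URL pattern comes from the bash script above; the fetch callable is injected so the function can be exercised without network access (pass urllib's urlretrieve for a real download):

```python
import os

DUMP_URL = "http://download.geonames.org/export/dump/{}.zip"

def download(code, fetch, dest_dir="."):
    # skip files that already exist, mirroring the script's `if [ -f ... ]` guard
    path = os.path.join(dest_dir, "{}.zip".format(code))
    if os.path.exists(path):
        return False
    fetch(DUMP_URL.format(code), path)
    return True
```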

References

  1. [Official] Geonames Data Dump

Wednesday, March 9, 2016

Python: Paramiko for SSH-based Operations

A Generic Paramiko Wrapper


This is a generic class that wraps Paramiko SSH functionality:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import paramiko

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SSHClient(object):
    def __init__(self, host, username, password):
        self.connection = self.connect(host, username, password)

    @staticmethod
    def connect(host, username, password):
        """
         Initiates an SSH Connection to the Host
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            ssh.connect(host, username=username, password=password)
            logger.info("Connection Created (host = {}, username = {}, password = *****)".format(host, username))
        except paramiko.SSHException:
            logger.error("Connection Failed")

        return ssh

    def execute(self, command):
        """
         Executes a command via SSH and returns results on StdOut
        """
        logger.debug('Executing command on std out:\n\t{}'.format(command.strip()))
        return self.connection.exec_command(command.strip())

    def close(self):
        self.connection.close()


Usage looks like this (assuming a dictionary of credential information).  Note that exec_command() returns file-like objects, so call read() on stdout and stderr to get the actual output:
ssh = SSHClient(creds["host"], creds["user"], creds["pass"])

# run a command and read the results
ssh_stdin, ssh_stdout, ssh_stderr = ssh.execute("the command here")
logger.info("Results:\n\tstd-out = {}\n\tstd-err = {}".format(
    ssh_stdout.read(), ssh_stderr.read()
))

ssh.close()



Using Paramiko and SCP

The following snippet illustrates the use of SCP to upload files to remote locations on multiple servers:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import logging

from remote import SSHClient

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class FileUploader(object):
    def __init__(self, version, creds_list):
        self.version = version
        self.creds_list = creds_list

    def get_remote_directory(self):
        return "/disks/sdb/files/{}".format(
            self.version)

    def get_create_remote_directory_cmd(self):
        return "mkdir -p {}".format(
            self.get_remote_directory())

    @staticmethod
    def get_local_directory():
        directory = os.environ['CSV_DIR']
        return "cd {}".format(directory)

    def create_remote_directories(self, creds):
        ssh = SSHClient(creds["host"], creds["user"], creds["pass"])

        # create the remote directory structure
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.execute(self.get_create_remote_directory_cmd())
        logger.info("Results:\n\tstd-out = {}\n\tstd-err = {}".format(
            ssh_stdout.read(), ssh_stderr.read()
        ))

        ssh.close()

    def upload_from_local(self, creds):
        local_path = "{}/*.json".format(
            os.environ["CSV_DIR"])
        cmd = "scp {} {}@{}:{}".format(
            local_path, creds["user"], creds["host"], self.get_remote_directory()
        )

        os.system(cmd)

    def process(self):
        for creds in self.creds_list:
            self.create_remote_directories(creds)
            self.upload_from_local(creds)


def get_creds_list():
    creds_m1 = {
        "host": os.environ["HOST_1"],
        "user": os.environ["USER_1"],
        "pass": os.environ["PASS_1"]
    }

    creds_m2 = {
        "host": os.environ["HOST_2"],
        "user": os.environ["USER_2"],
        "pass": os.environ["PASS_2"]
    }

    return [creds_m1, creds_m2]


if __name__ == "__main__":
    uploader = FileUploader("2.0", get_creds_list())
    uploader.process()



Paramiko and Apache HIVE


If Hive is exposed as a service on a port on your server, there are better solutions for Hive access, such as pyhs2, a Python client driver for HiveServer2.

However, there are times when Apache Hive is not remotely accessible as a service via a host and port.  In these situations, access via SSH may be a possibility.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import paramiko

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

host = os.environ['HIVE_HOST']
username = os.environ['HIVE_USERNAME']
password = os.environ['HIVE_PASSWORD']
hive_location = os.environ['HIVE_LOCATION']

class HiveDbReader(object):
 """
  Purpose:
  An instance of a HiveDbReader Object  
 """
 def __init__(self):
  """Return a connector object"""   
  logger.debug("Initialized HiveDbReader instance")


 def get_ssh_connection(self):
  """
   Initiates an SSH Connection to the Host
  """
  ssh = paramiko.SSHClient()
  ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

  try:
   ssh.connect(host, username=username, password=password)
   logger.info("Connection Created (host = {}, username = {}, password = *****)".format(host, username))
  except paramiko.SSHException:
   logger.error("Connection Failed")

  return ssh


 def execute_on_stdout(self, ssh, hive_sql):
  """
   Executes a HIVE SQL Command via SSH and returns results on StdOut

   Sample Command:
    /loc/to/hive -e 'select * from table where xyz'
  """
  hive_sql = hive_sql.strip()
  hive_command = '{} -e "{}"'.format(hive_location, hive_sql)
  logger.debug('Executing HIVE Command on StdOut:\n\t{}'.format(hive_command))

  return ssh.exec_command(hive_command)

 
 def get_employees(self):
  the_employees = []

  hive_select_cat_sql = """
    select employees from mydb group by employees;
   """.strip()

  ssh = self.get_ssh_connection()
  ssh_stdin, ssh_stdout, ssh_stderr = self.execute_on_stdout(ssh, hive_select_cat_sql)
  
  try:
   for line in ssh_stdout.readlines():
    an_employee = line.replace('"', '').strip()
    the_employees.append(an_employee)

  except Exception as e:
   logger.error("Hive Command Access Failed: {}".format(e))
  
  ssh.close()  

  logger.info("Returned employees (total = {})".format(len(the_employees)))
  return the_employees

Monday, March 7, 2016

Python: Using Selenium to logon to Twitter

Recipe 1


Automatically log on to twitter.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import sys
import codecs
import pprint

import unittest
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
import selenium.webdriver.support.ui as ui

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class TwitterLogon(unittest.TestCase):

 def setUp(self):
  self.driver = webdriver.Firefox()

 def test_logon(self):
  driver = self.driver
  driver.get("https://twitter.com/")

  driver.maximize_window()
  
  username = driver.find_element_by_class_name("js-username-field")
  password = driver.find_element_by_class_name("js-password-field")

  username.send_keys("myusername")
  password.send_keys("mypassword")

  wait = ui.WebDriverWait(driver, 5)
  driver.find_element_by_css_selector("button.submit.btn.primary-btn").click()


 def tearDown(self):
  #self.driver.close()
  print "close?"

if __name__ == "__main__":
 unittest.main()



Recipe 2


Log on to twitter with Firefox (headlessly) and scrape followers from a user profile: 
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from pyvirtualdisplay import Display

def correct_url(url): 
 if not url.startswith("http://") and not url.startswith("https://"):
  url = "http://" + url
 return url

def scrollDown(browser, numberOfScrollDowns):
 body = browser.find_element_by_tag_name("body")
 while numberOfScrollDowns >=0:
  body.send_keys(Keys.PAGE_DOWN)
  numberOfScrollDowns -= 1
  if numberOfScrollDowns % 10 == 0:
   print 'remaining scroll downs ... {}'.format(numberOfScrollDowns)
 return browser

def crawl_url(url, run_headless=True):
 if run_headless:
  display = Display(visible=0, size=(1024, 768))
  display.start()

 url = correct_url(url)
 browser = webdriver.Firefox()
 browser.get(url)

 username = browser.find_element_by_class_name("js-username-field")
 password = browser.find_element_by_class_name("js-password-field")

 username.send_keys("username")
 password.send_keys("password")

 browser.find_element_by_css_selector("button.submit.btn.primary-btn").click()

 previous_count = 0
 while True:
  target_set = set()

  browser = scrollDown(browser, 500)

  all_targets = browser.find_elements_by_class_name("u-linkComplex-target")
  for a_target in all_targets:
   target_set.add(a_target.text)

  fo = open('followers.dat', 'w')
  for target in target_set:
   fo.write(target + '\n')
  fo.close()

  print 'wrote {} to file'.format(len(target_set))

  # stop once scrolling stops surfacing new followers
  if len(target_set) == previous_count:
   break
  previous_count = len(target_set)

 browser.quit()

if __name__=='__main__':
 url = "https://twitter.com/username/followers/"
 crawl_url(url)



References

  1. [TidbitsOfProgramming] Crawling Websites that Loads Contents