/ BigData

Spark: PySpark Examples

Example 1: Top 3 Occurrences:

In this tutorial we will generate 400,000 lines of data that consists of Name,Country,JobTitle

Then we have a scenario where we would like to find out the Top 3 Occurences from our dataset.

Our Application to Generate Data:

#!/usr/bin/python

from faker import Factory
import time

timestart = time.strftime("%Y%m%d%H%M%S")
destFile = "dataset-" + timestart + ".txt"
print "Generating File: " + destFile
numberRuns = 100000

destFile = "dataset-" + timestart + ".txt"
file_object = open(destFile,"a")

def create_names(fake):
    for x in range(numberRuns):
        genName =  fake.first_name()
        genCountry = fake.country()
	genJob = fake.job()
        file_object.write(genName + "," + genCountry + "," + genJob + "\n" )

if __name__ == "__main__":
    fake = Factory.create()
    create_names(fake)
    file_object.close()

Our PySpark Application:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("RuanSparkApp01")
sc = SparkContext(conf=conf)
lines = sc.textFile("dataset-*")

wordCounts = lines.flatMap(lambda line: line.strip().split(",")) \
     .map(lambda word: (word, 1)) \
     .reduceByKey(lambda a, b: a + b, 1) \
     .map(lambda (a, b): (b, a)) \
     .sortByKey(1, 1) \
     .map(lambda (a, b): (b, a))

output = wordCounts.map(lambda (k,v): (v,k)).sortByKey(False).take(3)

for (count, word) in output:
    print "%i: %s" % (count, word)

First, we will generate our data:

$ for x in {1..4}; do python generate.py; done

Now that we have our dataset generated, run the pyspark app:

$ spark-submit spark-app.py

Then we will get the output that will more or less look like this:

1821: Engineer
943: Teacher
808: Scientist

Example 2: How many from New Zealand:

We will use the same dataset and below our pyspark application:

#!/usr/bin/python
from pyspark import SparkContext, SparkConf

logDataset = "dataset*"

conf = SparkConf().setAppName("RuanSparkApp01")
sc = SparkContext(conf=conf)
logActionData = sc.textFile(logDataset).cache()

findCountry = logActionData.filter(lambda s: 'New Zealand' in s).count()
print("New Zealand has been found: %i " % (findCountry) + "times")

And our output will look like this:

New Zealand has been found: 178 times

Note: This post is a still in progress, so I will add more examples as the time goes by