spark_rdd_example.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
# Custom function for computing a sum.
# Inputs: a and b are values from two different RDD records/tuples.
def custom_sum(a, b):
    return a + b
# Split a string into lowercase words, treating any non-alphanumeric character as a word separator.
def split_words(string):
    return "".join((char if char.isalpha() or char.isnumeric() else " ").lower() for char in string).split()
# Return the 5 (word, count) pairs with the highest counts, in descending order of count.
def top5(records):
    top5_list = sorted(list(records), key=lambda x: x[1])[-5:]
    top5_list.reverse()
    return top5_list
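# For example (illustrative values):
#   top5([("a", 3), ("b", 7), ("c", 1), ("d", 9), ("e", 5), ("f", 2)])
#   -> [("d", 9), ("b", 7), ("e", 5), ("a", 3), ("f", 2)]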
if __name__ == "__main__":
    # Check the number of arguments
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    # Set a name for the application
    appName = "PythonWordCount"

    # Set the input folder location to the first argument of the application
    # NB! sys.argv[0] is the path/name of the script file
    input_folder = sys.argv[1]
    # Create a new Spark application and get the Spark session object
    spark = SparkSession.builder.appName(appName).getOrCreate()

    # Get the Spark context object
    sc = spark.sparkContext

    # Load the input RDD from the data folder
    lines = spark.read.text(input_folder).select(input_file_name(), "value").rdd.map(tuple)
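    # Each record of the lines RDD is a (file_path, line_text) tuple, for example (illustrative
    # path and text, depending on the input data):
    #   ("file:/data/book.txt", "It was the best of times")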
    # Take 5 records from the RDD and print them out
    records = lines.take(5)
    for record in records:
        print(record)
    # Apply RDD operations to compute WordCount.
    # The lines RDD contains (file_path, line) tuples from the input files.
    # Let's split each line into words and use the flatMapValues operation to generate an RDD of (file_path, word) tuples.
    words = lines.flatMapValues(lambda line: split_words(line))
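    # Each element of words is a (file_path, word) tuple, e.g. (illustrative):
    #   ("file:/data/book.txt", "hello")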
    # Transform each (file_path, word) tuple into a ((file_path, word), 1) key & value tuple,
    # so the key identifies both the file and the word.
    pairs = words.map(lambda filename_word: (filename_word, 1))
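    # Each element of pairs is a ((file_path, word), 1) tuple, e.g. (illustrative):
    #   (("file:/data/book.txt", "hello"), 1)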
    # Apply reduceByKey to group pairs by key (file_path, word) and sum the values inside each group.
    # Our custom_sum function is used as the aggregation function, but we could also have used "lambda x, y: x + y".
    # Then re-key the counts by file path, group by file and keep only the top 5 words per file.
    counts = (pairs.reduceByKey(custom_sum)
                   .map(lambda filename_word_count: (filename_word_count[0][0],
                                                     (filename_word_count[0][1], filename_word_count[1])))
                   .groupByKey()
                   .mapValues(top5))
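    # Each element of counts is a (file_path, [top 5 (word, count) pairs]) tuple, e.g. (illustrative counts):
    #   ("file:/data/book.txt", [("the", 431), ("and", 302), ("of", 288), ("to", 241), ("a", 199)])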
    # Read the data out of the counts RDD
    # Output is a Python list of (file_path, top-5 word list) tuples
    output = counts.collect()

    # Print each file path together with its top-5 (word, count) list
    for (filename, top_words) in output:
        print(filename, top_words)
    # Stop the Spark session. This is not required when running locally (e.g. through PyCharm).
    # spark.sparkContext.stop()
    # spark.stop()