Commit ea9d1a4

Updated Edits
1 parent 39c83b6 commit ea9d1a4

37 files changed: +386 -2042 lines changed

Diff for: spark_streaming_basics/.ipynb_checkpoints/04_Basics of Transformations Exercise-checkpoint.ipynb

+1 -2

@@ -50,8 +50,7 @@
 "source": [
 "lines = sc.textFile(\"greetings.txt\")\n",
 "\n",
-"# TODO: Use any of the functions above to create a script that generates a Wordcount of the file greetings.txt.\n",
-"sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())"
+"# TODO: Use any of the functions above to create a script that generates a Wordcount of the file greetings.txt.\n"
 ]
 },
 {

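For reference, a minimal standalone sketch of the word count the TODO above asks for, in the spirit of the solution line this hunk removes (it assumes a local greetings.txt and creates its own SparkContext, whereas the notebook already provides `sc`):

```python
from pyspark import SparkContext

# Standalone context; in the course notebook `sc` already exists.
sc = SparkContext("local[*]", "WordCountSketch")

lines = sc.textFile("greetings.txt")

# Split lines into words, pair each word with 1, then sum counts per word.
word_counts = (lines.flatMap(lambda line: line.split())
                    .map(lambda w: (w, 1))
                    .reduceByKey(lambda a, b: a + b))

print(sorted(word_counts.collect()))
```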
Diff for: spark_streaming_basics/.ipynb_checkpoints/06_Transformation Operation Exercise-checkpoint.ipynb

+1 -2

@@ -62,8 +62,7 @@
 "rdd2 = rdd1.map(lambda x:(x[1], x[0]))\n",
 "\n",
 "##### TODO: Creat a `newRdd` variable with the elements from RDD2 that have the same second value of RDD1\n",
-"newRdd = rdd2.transform(lambda rdd: rdd.join(rdd.map(lambda x:(x[0],(x[1],x[2])))))\n",
-"newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()\n"
+"\n"
 ]
 },
 {

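The removed answer above exercises `transform()`; the full solution survives in the renamed Solution notebook later in this commit. As a rough sketch of the general DStream.transform pattern only (an arbitrary RDD-to-RDD function applied to every batch, here a join against a static lookup RDD fed by a hypothetical in-memory queue stream, not the notebook's exact answer):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "TransformSketch")
ssc = StreamingContext(sc, 1)  # 1-second batches

# A static lookup RDD and a toy (K, V) DStream built from an in-memory queue.
lookup = sc.parallelize([(u'2', u'b'), (u'3', u'c')])
batches = [sc.parallelize([(u'2', u'1'), (u'3', u'1')])]
pairs = ssc.queueStream(batches)

# transform() applies an arbitrary RDD-to-RDD function to every batch;
# here each batch is joined with the static lookup RDD.
joined = pairs.transform(lambda rdd: rdd.join(lookup))
joined.pprint()

ssc.start()
ssc.awaitTermination(5)
ssc.stop()
```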
Diff for: spark_streaming_basics/.ipynb_checkpoints/08_Window Operations Exercise-checkpoint.ipynb

+3 -5

@@ -71,11 +71,9 @@
 "ip_bytes_request_count_dstream = ip_count.join(ip_bytes_sum_dstream)\n",
 "ip_bytes_request_count_dstream.pprint(num = 30)\n",
 "\n",
-"####### TODO: use window()to count data over a window##########################\n",
-"access_logs_window = access_log_dstream.window(windowDuration = 6, slideDuration=4) \n",
-"window_counts = access_logs_window.count()\n",
-"print( \" Window count: \")\n",
-"window_counts.pprint()\n",
+"####### TODO: use window()to count data over a window ##########################\n",
+"\n",
+"\n",
 "\n",
 "####### Exercise End ##########################################################\n",
 "\n",

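For context, a minimal sketch of the `window()` call the TODO asks for, mirroring the removed lines but against an assumed socket source on localhost:9999 (for example `nc -lk 9999`) instead of the Apache log DStream:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowSketch")
ssc = StreamingContext(sc, 2)  # 2-second batches, as in the notebook

# Assumed text source on localhost:9999.
lines = ssc.socketTextStream("localhost", 9999)

# Collect the last 6 seconds of data, recomputed every 4 seconds.
# Both durations must be multiples of the batch interval.
windowed = lines.window(windowDuration=6, slideDuration=4)
windowed.count().pprint()

ssc.start()
ssc.awaitTermination()
```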
Diff for: spark_streaming_basics/.ipynb_checkpoints/10_countByWindow transformation Exercise-checkpoint.ipynb

+1 -2

@@ -82,8 +82,7 @@
 "\n",
 "####### TODO: Windowed count operation using countByWindow() ###########\n",
 "\n",
-"request_count = access_log_dstream.countByWindow(windowDuration = 6, slideDuration=4)\n",
-"request_count.pprint()\n",
+"\n",
 "\n",
 "####### Exercise End ##########################################################\n",
 "\n",

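A minimal sketch of `countByWindow()` in the same spirit as the removed solution, again with an assumed socket source; countByWindow needs checkpointing, which the notebook already enables:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "CountByWindowSketch")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint")  # required by countByWindow

# Assumed text source on localhost:9999 (for example `nc -lk 9999`).
lines = ssc.socketTextStream("localhost", 9999)

# One running count of all records seen in the last 6 seconds, every 4 seconds.
lines.countByWindow(windowDuration=6, slideDuration=4).pprint()

ssc.start()
ssc.awaitTermination()
```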
Diff for: spark_streaming_basics/.ipynb_checkpoints/12_reduceByKeyAndWindow transformation Exercise-checkpoint.ipynb

-2

@@ -82,8 +82,6 @@
 "\n",
 "####### TODO: use reduceByKeyAndWindow() to get Ip counts per window ###########\n",
 "\n",
-"ip_count_dstream = ip_dstream.reduceByKeyAndWindow(func = lambda x,y: x+y, invFunc = lambda x,y: x-y, windowDuration = 6, slideDuration=4)\n",
-"ip_count_dstream.pprint(num=30)\n",
 "\n",
 "####### Exercise End ##########################################################\n",
 "\n",

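A minimal sketch of `reduceByKeyAndWindow()` with the same parameters as the removed solution, assuming a socket source that emits one IP address per line; the inverse function makes the windowed count incremental and requires checkpointing:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ReduceByKeyAndWindowSketch")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint")  # required when an inverse function is supplied

# Assumed text source on localhost:9999 emitting one IP address per line.
ips = ssc.socketTextStream("localhost", 9999).map(lambda ip: (ip, 1))

# Incremental windowed count: add counts entering the window, subtract those leaving it.
ip_counts = ips.reduceByKeyAndWindow(func=lambda x, y: x + y,
                                     invFunc=lambda x, y: x - y,
                                     windowDuration=6, slideDuration=4)
ip_counts.pprint(num=30)

ssc.start()
ssc.awaitTermination()
```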
Diff for: spark_streaming_basics/.ipynb_checkpoints/14_countByValueAndWindow Transformation Exercise-checkpoint.ipynb

+1 -3

@@ -70,9 +70,7 @@
 "ip_bytes_request_count_dstream.pprint(num = 30)\n",
 "\n",
 "####### TODO: Windowed count operation using countByValueAndWindow() ###########\n",
-"ip_dstream = access_log_dstream.map(lambda entry: entry.ip)\n",
-"ip_address_request_count = ip_dstream.countByValueAndWindow(windowDuration = 6, slideDuration=4)\n",
-"ip_address_request_count.pprint()\n",
+"\n",
 "\n",
 "####### Exercise End ##########################################################\n",
 "\n",
Diff for: (new file)

@@ -0,0 +1,109 @@
+{
+"cells": [
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"# foreachRDD Exercise"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"Explain foreachRDD and the basic usage about foreachRDD"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"### Exercise"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {
+"collapsed": true
+},
+"outputs": [],
+"source": [
+"import pyspark\n",
+"import pyspark.streaming\n",
+"from pyspark.streaming import SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"import utils\n",
+"import twitter_app\n",
+"\n",
+"\n",
+"twitter_app()\n",
+"\n",
+"ssc = StreamingContext(\"local[*]\", \"SaveTweets\", Seconds(1))\n",
+"\n",
+"tweets = TwitterUtils.createStream(ssc, None)\n",
+" \n",
+"# Now extract the text of each status update into RDD's using map()\n",
+"statuses = tweets.map(lambda status: status.getText())\n",
+"\n",
+"totalTweets = int(0)\n",
+" \n",
+"def twitterStatus(rdd, time):\n",
+" \n",
+" if rdd.count() > 0:\n",
+" \n",
+" repartitionedRDD = rdd.repartition(1).cache()\n",
+" repartitionedRDD.saveAsTextFile(\"Tweets_\" + time.milliseconds.toString)\n",
+" \n",
+" totalTweets += repartitionedRDD.count()\n",
+" print(\"Tweet count: \" + totalTweets)\n",
+" if totalTweets > 1000:\n",
+" sys.exit(0)\n",
+"\n",
+"# TODO: use ForeachRDD to process the 'twitterStatus()' function\n",
+"\n",
+"\n",
+"###########################\n",
+" \n",
+"ssc.start()\n",
+"ssc.awaitTermination()"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## References\n",
+"1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+" "
+]
+}
+],
+"metadata": {
+"kernelspec": {
+"display_name": "Python 3",
+"language": "python",
+"name": "python3"
+},
+"language_info": {
+"codemirror_mode": {
+"name": "ipython",
+"version": 3
+},
+"file_extension": ".py",
+"mimetype": "text/x-python",
+"name": "python",
+"nbconvert_exporter": "python",
+"pygments_lexer": "ipython3",
+"version": "3.6.3"
+}
+},
+"nbformat": 4,
+"nbformat_minor": 2
+}

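The TODO in this new notebook asks for `foreachRDD`, but the surrounding cell is Scala-flavoured pseudocode (TwitterUtils, `time.milliseconds.toString`). A hedged, PySpark-only sketch of the same pattern, saving each non-empty batch and keeping a running count, with an assumed socket source standing in for the Twitter stream:

```python
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ForeachRDDSketch")
ssc = StreamingContext(sc, 1)

# Assumed text source on localhost:9999; the notebook's Twitter source is not reproduced here.
statuses = ssc.socketTextStream("localhost", 9999)

total = {"records": 0}  # mutable holder so the driver-side callback can update it

def save_batch(rdd):
    # foreachRDD runs this on the driver for every batch; skip empty batches.
    if not rdd.isEmpty():
        repartitioned = rdd.repartition(1).cache()
        repartitioned.saveAsTextFile("Tweets_{}".format(int(time.time() * 1000)))
        total["records"] += repartitioned.count()
        print("Record count so far: {}".format(total["records"]))

# PySpark also accepts a two-argument (time, rdd) callback if the batch time is needed.
statuses.foreachRDD(save_batch)

ssc.start()
ssc.awaitTermination()
```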
Diff for: spark_streaming_basics/.ipynb_checkpoints/19_SQL Operations Exercise-checkpoint.ipynb

+4 -6

@@ -71,18 +71,16 @@
 "\n",
 " try:\n",
 " # TODO: Get the singleton instance of SparkSession\n",
-" spark = getSparkSessionInstance(rdd.context.getConf())\n",
+" \n",
 "\n",
 " # TODO: Convert RDD[String] to RDD[Row] to DataFrame\n",
-" rowRdd = rdd.map(lambda w: Row(word=w))\n",
-" wordsDataFrame = spark.createDataFrame(rowRdd)\n",
 "\n",
 " # TODO: Creates a temporary view using the DataFrame.\n",
-" wordsDataFrame.createOrReplaceTempView(\"words\")\n",
+" \n",
 "\n",
 " # TODO: Do word count on table using SQL and print it\n",
-" wordCountsDataFrame = spark.sql(\"select word, count(*) as total from words group by word\")\n",
-" wordCountsDataFrame.show()\n",
+" \n",
+" \n",
 " except:\n",
 " pass\n",
 "\n",

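A minimal sketch of the SQL-on-streams pattern whose solution lines this hunk blanks out, using `SparkSession.builder` directly in place of the notebook's `getSparkSessionInstance()` helper and an assumed socket source:

```python
from pyspark import SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "SqlOnStreamSketch")
ssc = StreamingContext(sc, 2)

# Assumed text source on localhost:9999 (for example `nc -lk 9999`).
words = ssc.socketTextStream("localhost", 9999).flatMap(lambda line: line.split(" "))

def process(time, rdd):
    try:
        # Singleton-style SparkSession, standing in for getSparkSessionInstance().
        spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()

        # RDD[str] -> RDD[Row] -> DataFrame
        words_df = spark.createDataFrame(rdd.map(lambda w: Row(word=w)))

        # Register a temporary view and run SQL over this batch.
        words_df.createOrReplaceTempView("words")
        spark.sql("select word, count(*) as total from words group by word").show()
    except Exception:
        pass

words.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
```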
Diff for: spark_streaming_basics/Basics of Transformations Exercise.ipynb renamed to spark_streaming_basics/04_Basics of Transformations Exercise - Solution.ipynb

+3 -2

@@ -34,8 +34,9 @@
 "| **reduceByKey**(func, [numTasks])\t| When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.\n",
 "| **join**(otherStream, [numTasks])\t| When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.\n",
 "| **cogroup**(otherStream, [numTasks])\t| When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.\n",
-"| **transform**(func)\t| Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.\n",
-"| **updateStateByKey**(func)\t| Return a new \"state\" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.\n",
+"\n",
+"\n",
+"If you look at the spark streaming documentation, you will also find the `transform(func)` and `updateStateByKey(func)`. We will discuss these later in the course.\n",
 "\n"
 ]
 },

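The table kept by this hunk describes `reduceByKey` and `join` on DStreams. As a rough illustration only (not part of the course notebooks), a toy sketch using hypothetical in-memory queue streams:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "DStreamPairOpsSketch")
ssc = StreamingContext(sc, 1)

# Two toy (K, V) DStreams built from in-memory queues.
left = ssc.queueStream([sc.parallelize([("spark", 1), ("streaming", 1)])])
right = ssc.queueStream([sc.parallelize([("spark", "core"), ("streaming", "dstream")])])

# join() pairs values for matching keys within each batch: (K, (V, W)).
left.join(right).pprint()

# reduceByKey() aggregates values per key within each batch.
left.reduceByKey(lambda a, b: a + b).pprint()

ssc.start()
ssc.awaitTermination(5)
ssc.stop()
```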
Diff for: spark_streaming_basics/04_Basics of Transformations Exercise.ipynb

+1 -2

@@ -50,8 +50,7 @@
 "source": [
 "lines = sc.textFile(\"greetings.txt\")\n",
 "\n",
-"# TODO: Use any of the functions above to create a script that generates a Wordcount of the file greetings.txt.\n",
-"sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())"
+"# TODO: Use any of the functions above to create a script that generates a Wordcount of the file greetings.txt.\n"
 ]
 },
 {

Diff for: spark_streaming_basics/Transformation Operation Demo.ipynb renamed to spark_streaming_basics/06_Transformation Operation Exercise - Solution.ipynb

+11 -4

@@ -4,7 +4,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"# Transformation Operation Demo"
+"# Transformation Operation Exercise"
 ]
 },
 {
@@ -27,8 +27,13 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"### Demo\n",
-"\n",
+"### Exercise"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
 "Suppose we have two rdds that we need to join together. They are RDD1\n",
 "\n",
 "RDD1\n",
@@ -54,7 +59,9 @@
 "source": [
 "rdd = sc.parallelize([(u'2', u'100', 2),(u'1', u'300', 1),(u'1', u'200', 1)])\n",
 "rdd1 = sc.parallelize([(u'1', u'2'), (u'1', u'3')])\n",
-"rdd2 = rdd1.map(lambda x:(x[1],x[0]))\n",
+"rdd2 = rdd1.map(lambda x:(x[1], x[0]))\n",
+"\n",
+"##### TODO: Creat a `newRdd` variable with the elements from RDD2 that have the same second value of RDD1\n",
 "newRdd = rdd2.transform(lambda rdd: rdd.join(rdd.map(lambda x:(x[0],(x[1],x[2])))))\n",
 "newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()\n"
 ]

Diff for: spark_streaming_basics/06_Transformation Operation Exercise.ipynb

+1 -2

@@ -62,8 +62,7 @@
 "rdd2 = rdd1.map(lambda x:(x[1], x[0]))\n",
 "\n",
 "##### TODO: Creat a `newRdd` variable with the elements from RDD2 that have the same second value of RDD1\n",
-"newRdd = rdd2.transform(lambda rdd: rdd.join(rdd.map(lambda x:(x[0],(x[1],x[2])))))\n",
-"newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()\n"
+"\n"
 ]
 },
 {
Diff for: (new file)

@@ -0,0 +1,123 @@
+{
+"cells": [
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"# Window Operations Exercise"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"1. What is Window Operations(better with some graphs)\n",
+"2. Explain parameters (window length and sliding interval)\n",
+"3. Some of the popular Window operations\n",
+" * Window\n",
+" * countByWindow\n",
+" * reduceByKeyAndWindow\n",
+" * countByValueAndWindow\n"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"### Exercise"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {
+"collapsed": true
+},
+"outputs": [],
+"source": [
+"from pyspark import SparkConf, SparkContext\n",
+"from pyspark.streaming import StreamingContext\n",
+"import sys\n",
+"import random\n",
+"from apache_log_parser import ApacheAccessLog\n",
+"\n",
+"random.seed(15)\n",
+"\n",
+"if len(sys.argv) != 2:\n",
+" print('Please provide the path to Apache log file')\n",
+" print('10_10.py <path_to_log_directory>')\n",
+" sys.exit(2)\n",
+"\n",
+"conf = (SparkConf().setMaster(\"local[4]\").setAppName(\"log processor\").set(\"spark.executor.memory\", \"2g\"))\n",
+"\n",
+"sc = SparkContext(conf=conf)\n",
+"\n",
+"ssc = StreamingContext(sc, 2)\n",
+"ssc.checkpoint(\"checkpoint\")\n",
+" \n",
+"directory = sys.argv[1]\n",
+"print(directory)\n",
+"\n",
+"# create DStream from text file\n",
+"# Note: the spark streaming checks for any updates to this directory.\n",
+"# So first, start this program, and then copy the log file logs/access_log.log to 'directory' location\n",
+"log_data = ssc.textFileStream(directory)\n",
+"access_log_dstream = log_data.map(ApacheAccessLog.parse_from_log_line).filter(lambda parsed_line: parsed_line is not None)\n",
+"ip_dstream = access_log_dstream.map(lambda parsed_line: (parsed_line.ip, 1)) \n",
+"ip_count = ip_dstream.reduceByKey(lambda x,y: x+y)\n",
+"ip_count.pprint(num = 30)\n",
+"ip_bytes_dstream = access_log_dstream.map(lambda parsed_line: (parsed_line.ip, parsed_line.content_size))\n",
+"ip_bytes_sum_dstream = ip_bytes_dstream.reduceByKey(lambda x,y: x+y)\n",
+"ip_bytes_request_count_dstream = ip_count.join(ip_bytes_sum_dstream)\n",
+"ip_bytes_request_count_dstream.pprint(num = 30)\n",
+"\n",
+"####### TODO: use window()to count data over a window##########################\n",
+"access_logs_window = access_log_dstream.window(windowDuration = 6, slideDuration=4) \n",
+"window_counts = access_logs_window.count()\n",
+"print( \" Window count: \")\n",
+"window_counts.pprint()\n",
+"\n",
+"####### Exercise End ##########################################################\n",
+"\n",
+"ssc.start() \n",
+"ssc.awaitTermination()\n"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## References\n",
+"1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+" "
+]
+}
+],
+"metadata": {
+"kernelspec": {
+"display_name": "Python 3",
+"language": "python",
+"name": "python3"
+},
+"language_info": {
+"codemirror_mode": {
+"name": "ipython",
+"version": 3
+},
+"file_extension": ".py",
+"mimetype": "text/x-python",
+"name": "python",
+"nbconvert_exporter": "python",
+"pygments_lexer": "ipython3",
+"version": "3.6.3"
+}
+},
+"nbformat": 4,
+"nbformat_minor": 2
+}
