Commit 3244dc7

Commit of Session 2 Files
1 parent 65765cf commit 3244dc7

64 files changed: +9391 -0 lines changed
@@ -0,0 +1,228 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Accumulators Demo"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[Accumulators](https://spark.apache.org/docs/latest/programming-guide.html#accumulators) and [Broadcast variables](https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables) cannot be recovered from a checkpoint in Spark Streaming. If you enable checkpointing and also use accumulators or broadcast variables, you must create lazily instantiated singleton instances of them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.\n",
    "```python\n",
    "def getWordBlacklist(sparkContext):\n",
    "    if (\"wordBlacklist\" not in globals()):\n",
    "        globals()[\"wordBlacklist\"] = sparkContext.broadcast([\"a\", \"b\", \"c\"])\n",
    "    return globals()[\"wordBlacklist\"]\n",
    "\n",
    "def getDroppedWordsCounter(sparkContext):\n",
    "    if (\"droppedWordsCounter\" not in globals()):\n",
    "        globals()[\"droppedWordsCounter\"] = sparkContext.accumulator(0)\n",
    "    return globals()[\"droppedWordsCounter\"]\n",
    "\n",
    "def echo(time, rdd):\n",
    "    # Get or register the blacklist Broadcast\n",
    "    blacklist = getWordBlacklist(rdd.context)\n",
    "    # Get or register the droppedWordsCounter Accumulator\n",
    "    droppedWordsCounter = getDroppedWordsCounter(rdd.context)\n",
    "\n",
    "    # Use blacklist to drop words and use droppedWordsCounter to count them\n",
    "    def filterFunc(wordCount):\n",
    "        if wordCount[0] in blacklist.value:\n",
    "            droppedWordsCounter.add(wordCount[1])\n",
    "            return False\n",
    "        else:\n",
    "            return True\n",
    "\n",
    "    counts = \"Counts at time %s %s\" % (time, rdd.filter(filterFunc).collect())\n",
    "\n",
    "wordCounts.foreachRDD(echo)\n",
    "```\n",
    "See the full [source code](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/recoverable_network_wordcount.py)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Accumulators\n",
    "\n",
    "- What accumulators are and how they are used\n",
    "- DEMO: a demo with accumulators (a minimal sketch follows below, before the full streaming demo)\n",
    "- EXERCISE: an exercise with accumulators\n",
    "- Fault tolerance: https://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics\n"
   ]
  },
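  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before the streaming demo, here is a minimal, self-contained sketch of a plain accumulator (an added illustration, not part of the original session files; it assumes a local `SparkContext` is available): the driver creates the accumulator, tasks may only `add` to it, and only the driver reads `.value`.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hypothetical accumulator sketch, not from the original notebook.\n",
    "from pyspark import SparkContext\n",
    "\n",
    "sc = SparkContext.getOrCreate()  # reuse an existing context if one is running\n",
    "accum = sc.accumulator(0)        # driver-side counter, starts at 0\n",
    "\n",
    "# Executors can only add to the accumulator; they cannot read it.\n",
    "sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: accum.add(x))\n",
    "\n",
    "# Only the driver can read the accumulated value.\n",
    "print(\"Sum via accumulator: %d\" % accum.value)  # 15\n"
   ]
  },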
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Demo"
   ]
65+
},
66+
{
67+
"cell_type": "code",
68+
"execution_count": 1,
69+
"metadata": {},
70+
"outputs": [
71+
{
72+
"ename": "ModuleNotFoundError",
73+
"evalue": "No module named 'pyspark'",
74+
"output_type": "error",
75+
"traceback": [
76+
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
77+
"\u001b[1;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)",
78+
"\u001b[1;32m<ipython-input-1-72dde181500f>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m 37\u001b[0m \u001b[1;32mimport\u001b[0m \u001b[0msys\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 38\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 39\u001b[1;33m \u001b[1;32mfrom\u001b[0m \u001b[0mpyspark\u001b[0m \u001b[1;32mimport\u001b[0m \u001b[0mSparkContext\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 40\u001b[0m \u001b[1;32mfrom\u001b[0m \u001b[0mpyspark\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstreaming\u001b[0m \u001b[1;32mimport\u001b[0m \u001b[0mStreamingContext\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 41\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
79+
"\u001b[1;31mModuleNotFoundError\u001b[0m: No module named 'pyspark'"
80+
]
81+
}
82+
],
83+
"source": [
    "#\n",
    "# Licensed to the Apache Software Foundation (ASF) under one or more\n",
    "# contributor license agreements.  See the NOTICE file distributed with\n",
    "# this work for additional information regarding copyright ownership.\n",
    "# The ASF licenses this file to You under the Apache License, Version 2.0\n",
    "# (the \"License\"); you may not use this file except in compliance with\n",
    "# the License.  You may obtain a copy of the License at\n",
    "#\n",
    "#    http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "#\n",
    "\n",
    "\"\"\"\n",
    " Counts words in UTF8-encoded text received from the network every second.\n",
    " Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>\n",
    "   <hostname> and <port> describe the TCP server that Spark Streaming connects to in order\n",
    "   to receive data. <checkpoint-directory> is a directory on an HDFS-compatible file system\n",
    "   to which checkpoint data will be written; <output-file> is the file to which the word\n",
    "   counts will be appended.\n",
    " To run this on your local machine, you need to first run a Netcat server\n",
    "    `$ nc -lk 9999`\n",
    " and then run the example\n",
    "    `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \\\n",
    "       localhost 9999 ~/checkpoint/ ~/out`\n",
    " If the directory ~/checkpoint/ does not exist (e.g. when running for the first time), it will\n",
    " create a new StreamingContext (and will print \"Creating new context\" to the console).\n",
    " Otherwise, if checkpoint data exists in ~/checkpoint/, the StreamingContext will be recreated\n",
    " from the checkpoint data.\n",
    "\"\"\"\n",
    "from __future__ import print_function\n",
    "\n",
    "import os\n",
    "import sys\n",
    "\n",
    "from pyspark import SparkContext\n",
    "from pyspark.streaming import StreamingContext\n",
    "\n",
    "\n",
    "# Get or register a Broadcast variable\n",
    "def getWordBlacklist(sparkContext):\n",
    "    if ('wordBlacklist' not in globals()):\n",
    "        globals()['wordBlacklist'] = sparkContext.broadcast([\"a\", \"b\", \"c\"])\n",
    "    return globals()['wordBlacklist']\n",
    "\n",
    "\n",
    "# Get or register an Accumulator\n",
    "def getDroppedWordsCounter(sparkContext):\n",
    "    if ('droppedWordsCounter' not in globals()):\n",
    "        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)\n",
    "    return globals()['droppedWordsCounter']\n",
    "\n",
    "\n",
    "def createContext(host, port, outputPath):\n",
    "    # If you do not see this printed, that means the StreamingContext has been loaded\n",
    "    # from the new checkpoint\n",
    "    print(\"Creating new context\")\n",
    "    if os.path.exists(outputPath):\n",
    "        os.remove(outputPath)\n",
    "    sc = SparkContext(appName=\"PythonStreamingRecoverableNetworkWordCount\")\n",
    "    ssc = StreamingContext(sc, 1)\n",
    "\n",
    "    # Create a socket stream on target ip:port and count the\n",
    "    # words in the input stream of \\n-delimited text (e.g. generated by 'nc')\n",
    "    lines = ssc.socketTextStream(host, port)\n",
    "    words = lines.flatMap(lambda line: line.split(\" \"))\n",
    "    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)\n",
    "\n",
    "    def echo(time, rdd):\n",
    "        # Get or register the blacklist Broadcast\n",
    "        blacklist = getWordBlacklist(rdd.context)\n",
    "        # Get or register the droppedWordsCounter Accumulator\n",
    "        droppedWordsCounter = getDroppedWordsCounter(rdd.context)\n",
    "\n",
    "        # Use blacklist to drop words and use droppedWordsCounter to count them\n",
    "        def filterFunc(wordCount):\n",
    "            if wordCount[0] in blacklist.value:\n",
    "                droppedWordsCounter.add(wordCount[1])\n",
    "                return False\n",
    "            else:\n",
    "                return True\n",
    "\n",
    "        counts = \"Counts at time %s %s\" % (time, rdd.filter(filterFunc).collect())\n",
    "        print(counts)\n",
    "        print(\"Dropped %d word(s) in total\" % droppedWordsCounter.value)\n",
    "        print(\"Appending to \" + os.path.abspath(outputPath))\n",
    "        with open(outputPath, 'a') as f:\n",
    "            f.write(counts + \"\\n\")\n",
    "\n",
    "    wordCounts.foreachRDD(echo)\n",
    "    return ssc\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    if len(sys.argv) != 5:\n",
    "        print(\"Usage: recoverable_network_wordcount.py <hostname> <port> \"\n",
    "              \"<checkpoint-directory> <output-file>\", file=sys.stderr)\n",
    "        exit(-1)\n",
    "    host, port, checkpoint, output = sys.argv[1:]\n",
    "    ssc = StreamingContext.getOrCreate(checkpoint,\n",
    "                                       lambda: createContext(host, int(port), output))\n",
    "    ssc.start()\n",
    "    ssc.awaitTermination()"
   ]
  },
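  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The broadcast half of the pattern can also be tried in isolation. The next cell is another added sketch (not part of the original session files; it assumes a local `SparkContext`): the driver ships a read-only blacklist to the executors once, and tasks consult it through `.value`.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hypothetical broadcast sketch, not from the original notebook.\n",
    "from pyspark import SparkContext\n",
    "\n",
    "sc = SparkContext.getOrCreate()\n",
    "blacklist = sc.broadcast([\"a\", \"b\", \"c\"])  # shipped to each executor once\n",
    "\n",
    "words = sc.parallelize([\"a\", \"spark\", \"b\", \"streaming\", \"c\"])\n",
    "kept = words.filter(lambda w: w not in blacklist.value)  # tasks read .value\n",
    "print(kept.collect())  # ['spark', 'streaming']\n"
   ]
  },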
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
