
Commit 39c83b6

Made edits to Files based on Suggestions
1 parent 3244dc7 commit 39c83b6

File tree

40 files changed: +4593 −255 lines

@@ -0,0 +1,200 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Basics of Transformations Demo 1"
8+
]
9+
},
10+
{
11+
"cell_type": "markdown",
12+
"metadata": {},
13+
"source": [
14+
"In Spark Streaming, DStreams are treated very similarly to the RDDs that make them up. Like RDDs, there are a wide variety of data transformation options. \n",
15+
"\n",
16+
"Here are some examples of the transformations from the Spark documentation that might be useful for your purposes\n",
17+
"\n",
18+
"| Transformation | Meaning |\n",
19+
"| ------------------------------ |:-------------|\n",
20+
"| **map**(func) | Return a new DStream by passing each element of the source DStream through a function func. |\n",
21+
"| **flatMap**(func)\t| Similar to map, but each input item can be mapped to 0 or more output items. |\n",
22+
"| **filter**(func)\t| Return a new DStream by selecting only the records of the source DStream on which func returns true. |\n",
23+
"| **repartition**(numPartitions)\t| Changes the level of parallelism in this DStream by creating more or fewer partitions. |\n",
24+
"| **union**(otherStream)\t| Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |\n",
25+
"| **count**()\t| Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |\n",
26+
"| **reduce**(func)\t| Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.\n",
27+
"| **countByValue**()\t| When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.\n",
28+
"| **reduceByKey**(func, [numTasks])\t| When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.\n",
29+
"| **join**(otherStream, [numTasks])\t| When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.\n",
30+
"| **cogroup**(otherStream, [numTasks])\t| When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.\n",
31+
"\n",
32+
"\n",
33+
"If you look at the spark streaming documentation, you will also find the `transform(func)` and `updateStateByKey(func)`. We will discuss these later in the course.\n"
34+
]
35+
},
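{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before the demo, here is a minimal illustrative sketch (added to this write-up, not part of the original notebook) of a few of the other transformations in the table. It uses the plain RDD counterparts of `filter`, `union`, `count`, and `reduce`, assuming the same SparkContext `sc` that the demo cells below rely on; the DStream versions behave the same way on each batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Illustrative only: RDD counterparts of a few DStream transformations from the table above\n",
"evens = sc.parallelize([1, 2, 3, 4, 5, 6]).filter(lambda x: x % 2 == 0)   # keep records where the function returns true\n",
"more = sc.parallelize([8, 10])\n",
"combined = evens.union(more)                  # union of the two datasets\n",
"print(combined.collect())                     # [2, 4, 6, 8, 10]\n",
"print(combined.count())                       # 5\n",
"print(combined.reduce(lambda x, y: x + y))    # 30 (the function is associative and commutative)\n"
]
},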
36+
{
37+
"cell_type": "markdown",
38+
"metadata": {},
39+
"source": [
40+
"### Demo"
41+
]
42+
},
43+
{
44+
"cell_type": "markdown",
45+
"metadata": {
46+
"collapsed": true
47+
},
48+
"source": [
49+
"We're going to be demoing the map and flatmap functions. The important question is \"What is the difference between the two?\"\n",
50+
"\n",
51+
"`map`: It returns a new RDD by applying a function to each element of the RDD. Function in map can return only one item.\n",
52+
"\n",
53+
"`flatMap`: Similar to map, it returns a new RDD by applying a function to each element of the RDD, but output is flattened.\n",
54+
"Also, function in flatMap can return a list of elements (0 or more)\n",
55+
"\n",
56+
"Here's an example:"
57+
]
58+
},
59+
{
60+
"cell_type": "code",
61+
"execution_count": null,
62+
"metadata": {
63+
"collapsed": true
64+
},
65+
"outputs": [],
66+
"source": [
67+
"sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()"
68+
]
69+
},
70+
{
71+
"cell_type": "code",
72+
"execution_count": null,
73+
"metadata": {
74+
"collapsed": true
75+
},
76+
"outputs": [],
77+
"source": [
78+
"sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()"
79+
]
80+
},
81+
{
82+
"cell_type": "markdown",
83+
"metadata": {},
84+
"source": [
85+
"notice o/p is flattened out in a single list"
86+
]
87+
},
88+
{
89+
"cell_type": "markdown",
90+
"metadata": {},
91+
"source": [
92+
"Here's Another Example:"
93+
]
94+
},
95+
{
96+
"cell_type": "code",
97+
"execution_count": null,
98+
"metadata": {
99+
"collapsed": true
100+
},
101+
"outputs": [],
102+
"source": [
103+
"sc.parallelize([3,4,5]).map(lambda x: [x, x*x]).collect() "
104+
]
105+
},
106+
{
107+
"cell_type": "code",
108+
"execution_count": null,
109+
"metadata": {
110+
"collapsed": true
111+
},
112+
"outputs": [],
113+
"source": [
114+
"sc.parallelize([3,4,5]).flatMap(lambda x: [x, x*x]).collect() "
115+
]
116+
},
117+
{
118+
"cell_type": "markdown",
119+
"metadata": {},
120+
"source": [
121+
"notice that the list is flattened in the latter version"
122+
]
123+
},
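{
"cell_type": "markdown",
"metadata": {},
"source": [
"One more small sketch (added here, not part of the original demo): because the function passed to `flatMap` may return zero elements, `flatMap` can also drop items rather than expand them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Odd numbers map to an empty list, so they disappear from the output entirely\n",
"sc.parallelize([3,4,5]).flatMap(lambda x: [x] if x % 2 == 0 else []).collect()   # [4]\n"
]
},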
124+
{
125+
"cell_type": "markdown",
126+
"metadata": {},
127+
"source": [
128+
"Here's another example, this time interacting with a file\n",
129+
"\n",
130+
"There is a text file `greetings.txt` with following lines:\n",
131+
"```\n",
132+
"Good Morning\n",
133+
"Good Evening\n",
134+
"Good Day\n",
135+
"Happy Birthday\n",
136+
"Happy New Year\n",
137+
"```"
138+
]
139+
},
140+
{
141+
"cell_type": "code",
142+
"execution_count": null,
143+
"metadata": {
144+
"collapsed": true
145+
},
146+
"outputs": [],
147+
"source": [
148+
"lines = sc.textFile(\"greetings.txt\")\n",
149+
"lines.map(lambda line: line.split()).collect()"
150+
]
151+
},
152+
{
153+
"cell_type": "code",
154+
"execution_count": null,
155+
"metadata": {
156+
"collapsed": true
157+
},
158+
"outputs": [],
159+
"source": [
160+
"lines.flatMap(lambda line: line.split()).collect()"
161+
]
162+
},
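{
"cell_type": "markdown",
"metadata": {},
"source": [
"The flattened words pair naturally with the key/value transformations from the table. As a hedged sketch (added here, not part of the original demo), chaining `map` and `reduceByKey` onto the same `lines` RDD counts how often each word appears in `greetings.txt`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Turn each word into a (word, 1) pair, then sum the counts per word\n",
"word_counts = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)\n",
"word_counts.collect()   # e.g. [('Good', 3), ('Happy', 2), ('Morning', 1), ...]\n"
]
},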
163+
{
164+
"cell_type": "markdown",
165+
"metadata": {},
166+
"source": [
167+
"# References\n",
168+
"1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams\n"
169+
]
170+
},
171+
{
172+
"cell_type": "markdown",
173+
"metadata": {},
174+
"source": [
175+
" "
176+
]
177+
}
178+
],
179+
"metadata": {
180+
"kernelspec": {
181+
"display_name": "Python 3",
182+
"language": "python",
183+
"name": "python3"
184+
},
185+
"language_info": {
186+
"codemirror_mode": {
187+
"name": "ipython",
188+
"version": 3
189+
},
190+
"file_extension": ".py",
191+
"mimetype": "text/x-python",
192+
"name": "python",
193+
"nbconvert_exporter": "python",
194+
"pygments_lexer": "ipython3",
195+
"version": "3.6.3"
196+
}
197+
},
198+
"nbformat": 4,
199+
"nbformat_minor": 2
200+
}
@@ -0,0 +1,168 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Basics of Transformations Demo 2"
8+
]
9+
},
10+
{
11+
"cell_type": "markdown",
12+
"metadata": {},
13+
"source": [
14+
"As we discussed earlier, there are a wide variety of data transformations available for use on DStreams, most of which are similar to those used on the DStreams' constituent parts.\n",
15+
"\n",
16+
"As a reminder, here is the list of transformations from the previous demo again:\n",
17+
"\n",
18+
"| Transformation | Meaning |\n",
19+
"| ------------------------------ |:-------------|\n",
20+
"| **map**(func) | Return a new DStream by passing each element of the source DStream through a function func. |\n",
21+
"| **flatMap**(func)\t| Similar to map, but each input item can be mapped to 0 or more output items. |\n",
22+
"| **filter**(func)\t| Return a new DStream by selecting only the records of the source DStream on which func returns true. |\n",
23+
"| **repartition**(numPartitions)\t| Changes the level of parallelism in this DStream by creating more or fewer partitions. |\n",
24+
"| **union**(otherStream)\t| Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |\n",
25+
"| **count**()\t| Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |\n",
26+
"| **reduce**(func)\t| Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.\n",
27+
"| **countByValue**()\t| When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.\n",
28+
"| **reduceByKey**(func, [numTasks])\t| When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.\n",
29+
"| **join**(otherStream, [numTasks])\t| When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.\n",
30+
"| **cogroup**(otherStream, [numTasks])\t| When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.\n",
31+
"\n",
32+
"\n",
33+
"If you look at the spark streaming documentation, you will also find the `transform(func)` and `updateStateByKey(func)`. We will discuss these later in the course.\n",
34+
"\n",
35+
"\n",
36+
"Let's go though another example:\n",
37+
"\n"
38+
]
39+
},
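{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before that example, a small illustrative aside (added to this write-up, not part of the original notebook): two entries in the table that neither demo exercises are `join` and `cogroup`. Assuming a SparkContext is available as `sc` (as in the previous demo), the RDD versions behave as follows; the DStream versions do the same thing batch by batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ages = sc.parallelize([('alice', 34), ('bob', 29)])\n",
"cities = sc.parallelize([('alice', 'Oslo'), ('bob', 'Lima'), ('carol', 'Pune')])\n",
"\n",
"# join keeps only keys present in both RDDs and pairs up their values\n",
"ages.join(cities).collect()      # e.g. [('alice', (34, 'Oslo')), ('bob', (29, 'Lima'))]\n",
"\n",
"# cogroup groups all values for each key from both RDDs, one iterable per side;\n",
"# 'carol' appears too, with an empty iterable on the ages side\n",
"ages.cogroup(cities).collect()\n"
]
},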
40+
{
41+
"cell_type": "markdown",
42+
"metadata": {},
43+
"source": [
44+
"### Demo"
45+
]
46+
},
47+
{
48+
"cell_type": "markdown",
49+
"metadata": {
50+
"collapsed": true
51+
},
52+
"source": [
53+
"Last time we went over the `map` and `flapmap` functions. We'll explore a few other options.\n",
54+
"\n",
55+
"Suppose we have a this example text from Dr Suess's _The Cat in the Hat_."
56+
]
57+
},
58+
{
59+
"cell_type": "code",
60+
"execution_count": 1,
61+
"metadata": {},
62+
"outputs": [
63+
{
64+
"ename": "NameError",
65+
"evalue": "name 'sc' is not defined",
66+
"output_type": "error",
67+
"traceback": [
68+
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
69+
"\u001b[1;31mNameError\u001b[0m Traceback (most recent call last)",
70+
"\u001b[1;32m<ipython-input-1-8b5aca44da72>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mlines\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msc\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mparallelize\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m'Its fun to have fun,'\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;34m'but you have to know how.'\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 2\u001b[0m \u001b[1;31m# Suppose then that we want to get wordcounts for this. We can use the map function from before here.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 3\u001b[0m \u001b[1;31m# map returns a new RDD containing values created by applying the supplied function to each value in the original RDD\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 4\u001b[0m \u001b[1;31m# Here we use a lambda function which replaces some common punctuation characters with spaces and convert to lower\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 5\u001b[0m \u001b[1;31m# case, producing a new RDD:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
71+
"\u001b[1;31mNameError\u001b[0m: name 'sc' is not defined"
72+
]
73+
}
74+
],
75+
"source": [
76+
"scc = Streamingcontext(\"local[2]\",\"PythonSparkApp\", 10)\n",
77+
"\n",
78+
"myFile = scc.sparkContext.textFile(\"..data/DrSeuss.txt\")\n",
79+
"wordspair = myFile.flatMap(lambda row: row.split(\" \")).map(lambda x: (x, 1)).reduceByKey(lambda x,y : x + y)\n",
80+
"oldwordcount = wordspair.reduceByKey(lambda x,y : x + y)\n",
81+
"lines = scc.socketTextStream(\"192.168.56.101\", 9999)\n",
82+
"\n",
83+
"lines = sc.parallelize(['Its fun to have fun,','but you have to know how.'])\n",
84+
"# Suppose then that we want to get wordcounts for this. We can use the map function from before here. \n",
85+
"# map returns a new RDD containing values created by applying the supplied function to each value in the original RDD\n",
86+
"# Here we use a lambda function which replaces some common punctuation characters with spaces and convert to lower \n",
87+
"# case, producing a new RDD:\n",
88+
"\n",
89+
"wordcounts1 = lines.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())\n",
90+
"wordcounts1.take(10)\n",
91+
"pprint(wordcounts1)\n",
92+
"\n",
93+
"# The flatMap function takes these input values and returns a new, flattened list. In this case, the lines are split \n",
94+
"# into words and then each word becomes a separate value in the output RDD:\n",
95+
"\n",
96+
"wordcounts2 = wordcounts1.flatMap(lambda x: x.split())\n",
97+
"wordcounts2.take(20)\n",
98+
"pprint(wordcounts2)\n",
99+
"\n",
100+
"# Expect that the input RDD contains tuples of the form (key,value). Create a new RDD containing a tuple for \n",
101+
"# each unique value of key in the input, where the value in the second position of the tuple is created by \n",
102+
"# applying the supplied lambda function to the values with the matching key in the input RDD\n",
103+
"# Here the key will be the word and lambda function will sum up the word counts for each word. The output RDD \n",
104+
"# will consist of a single tuple for each unique word in the data, where the word is stored at the first position \n",
105+
"# in the tuple and the word count is stored at the second position\n",
106+
"\n",
107+
"wordcounts3 = wordcounts2.map(lambda x: (x, 1))\n",
108+
"wordcounts3.take(20)\n",
109+
"pprint(wordcounts3)\n",
110+
"\n",
111+
"wordcounts4 = wordcounts3.reduceByKey(lambda x,y:x+y)\n",
112+
"wordcounts4.take(20)\n",
113+
"pprint(wordcounts4)\n",
114+
"\n",
115+
"# map a lambda function to the data which will swap over the first and second values in each tuple, now the word count\n",
116+
"# appears in the first position and the word in the second position\n",
117+
"\n",
118+
"wordcounts5 = wordcounts4.map(lambda x:(x[1],x[0]))\n",
119+
"wordcounts5.take(20)\n",
120+
"pprint(wordcounts5)\n",
121+
"\n",
122+
"# we sort the input RDD by the key value (i.e., the value at the first position in each tuple)\n",
123+
"# In this example the first position stores the word count so this will sort the words so that the \n",
124+
"# most frequently occurring words occur first in the RDD. The ascending=False parameter results in a descending sort order\n",
125+
"\n",
126+
"wordcounts6 = wordcounts5.sortByKey(ascending=False)\n",
127+
"wordcounts6.take(20)\n",
128+
"pprint(wordcounts6)\n"
129+
]
130+
},
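{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pipeline above runs on a static RDD, but the same transformations apply unchanged to DStreams. Here is a hedged streaming sketch (added to this write-up, not part of the original notebook) that reuses the `scc` StreamingContext and the socket source from the cell above; each batch of lines goes through the same flatMap / map / reduceByKey chain."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Each 10-second batch of lines arriving on the socket becomes an RDD, and the\n",
"# word-count pipeline is applied to every batch.\n",
"stream = scc.socketTextStream(\"192.168.56.101\", 9999)\n",
"stream_counts = (stream.flatMap(lambda line: line.split())\n",
"                       .map(lambda word: (word, 1))\n",
"                       .reduceByKey(lambda a, b: a + b))\n",
"stream_counts.pprint()   # print the first few (word, count) pairs of each batch\n",
"\n",
"# Uncomment to actually run the stream:\n",
"# scc.start()\n",
"# scc.awaitTermination()\n"
]
},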
131+
{
132+
"cell_type": "markdown",
133+
"metadata": {},
134+
"source": [
135+
"# References\n",
136+
"1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams "
137+
]
138+
},
139+
{
140+
"cell_type": "markdown",
141+
"metadata": {},
142+
"source": [
143+
" "
144+
]
145+
}
146+
],
147+
"metadata": {
148+
"kernelspec": {
149+
"display_name": "Python 3",
150+
"language": "python",
151+
"name": "python3"
152+
},
153+
"language_info": {
154+
"codemirror_mode": {
155+
"name": "ipython",
156+
"version": 3
157+
},
158+
"file_extension": ".py",
159+
"mimetype": "text/x-python",
160+
"name": "python",
161+
"nbconvert_exporter": "python",
162+
"pygments_lexer": "ipython3",
163+
"version": "3.6.3"
164+
}
165+
},
166+
"nbformat": 4,
167+
"nbformat_minor": 2
168+
}
