113 | 113 | },
114 | 114 | {
115 | 115 | "cell_type": "code",
116 |     | - "execution_count": 11,
    | 116 | + "execution_count": 6,
117 | 117 | "metadata": {},
118 |     | - "outputs": [
119 |     | - {
120 |     | - "ename": "SyntaxError",
121 |     | - "evalue": "invalid syntax (<ipython-input-11-86135663c1da>, line 2)",
122 |     | - "output_type": "error",
123 |     | - "traceback": [
124 |     | - "\u001b[0;36m File \u001b[0;32m\"<ipython-input-11-86135663c1da>\"\u001b[0;36m, line \u001b[0;32m2\u001b[0m\n\u001b[0;31m dst = ssc.queueStream(transaction_rdd_queue) .transform(lambda rdd: rdd.join(customer_rdd)) .filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\u001b[0m\n\u001b[0m ^\u001b[0m\n\u001b[0;31mSyntaxError\u001b[0m\u001b[0;31m:\u001b[0m invalid syntax\n"
125 |     | - ]
126 |     | - }
127 |     | - ],
    | 118 | + "outputs": [],
128 | 119 | "source": [
129 | 120 | "# Join the streaming RDD and batch RDDs to filter out bad customers.\n",
130 |     | - "dst = ssc.queueStream(transaction_rdd_queue)\\\n",
131 |     | - " .transform(lambda rdd: rdd.join(customer_rdd))\\\n",
132 |     | - " .filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
    | 121 | + "dst = ssc.queueStream(transaction_rdd_queue).transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda rdd: rdd[1][1] == True)\n",
133 | 122 | "## END OF EXERCISE SECTION ==================================\n",
134 | 123 | "dst.pprint()"
135 | 124 | ]
136 | 125 | },
137 | 126 | {
138 | 127 | "cell_type": "code",
139 |     | - "execution_count": 8,
    | 128 | + "execution_count": 7,
140 | 129 | "metadata": {},
141 | 130 | "outputs": [
142 | 131 | {
143 |     | - "ename": "Py4JJavaError",
144 |     | - "evalue": "An error occurred while calling o20.start.\n: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute\n\tat scala.Predef$.require(Predef.scala:224)\n\tat org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)\n\tat org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)\n\tat org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)\n\tat org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:748)\n",
145 |     | - "output_type": "error",
146 |     | - "traceback": [
147 |     | - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
148 |     | - "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
149 |     | - "\u001b[0;32m<ipython-input-8-90c9a179f130>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msleep\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m6\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0mssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
150 |     | - "\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/pyspark/streaming/context.py\u001b[0m in \u001b[0;36mstart\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 194\u001b[0m \u001b[0mStart\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mexecution\u001b[0m \u001b[0mof\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mstreams\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 195\u001b[0m \"\"\"\n\u001b[0;32m--> 196\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 197\u001b[0m \u001b[0mStreamingContext\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_activeContext\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 198\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
151 |     | - "\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
152 |     | - "\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
153 |     | - "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o20.start.\n: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute\n\tat scala.Predef$.require(Predef.scala:224)\n\tat org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)\n\tat org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)\n\tat org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)\n\tat org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:748)\n"
    | 132 | + "name": "stdout",
    | 133 | + "output_type": "stream",
    | 134 | + "text": [
    | 135 | + "-------------------------------------------\n",
    | 136 | + "Time: 2018-03-03 08:00:19\n",
    | 137 | + "-------------------------------------------\n",
    | 138 | + "(0, (None, True))\n",
    | 139 | + "(8, (None, True))\n",
    | 140 | + "(4, (None, True))\n",
    | 141 | + "(2, (None, True))\n",
    | 142 | + "(6, (None, True))\n",
    | 143 | + "\n",
    | 144 | + "-------------------------------------------\n",
    | 145 | + "Time: 2018-03-03 08:00:20\n",
    | 146 | + "-------------------------------------------\n",
    | 147 | + "(0, (None, True))\n",
    | 148 | + "(8, (None, True))\n",
    | 149 | + "(4, (None, True))\n",
    | 150 | + "(2, (None, True))\n",
    | 151 | + "(6, (None, True))\n",
    | 152 | + "\n",
    | 153 | + "-------------------------------------------\n",
    | 154 | + "Time: 2018-03-03 08:00:21\n",
    | 155 | + "-------------------------------------------\n",
    | 156 | + "(0, (None, True))\n",
    | 157 | + "(8, (None, True))\n",
    | 158 | + "(4, (None, True))\n",
    | 159 | + "(2, (None, True))\n",
    | 160 | + "(6, (None, True))\n",
    | 161 | + "\n",
    | 162 | + "-------------------------------------------\n",
    | 163 | + "Time: 2018-03-03 08:00:22\n",
    | 164 | + "-------------------------------------------\n",
    | 165 | + "(0, (None, True))\n",
    | 166 | + "(8, (None, True))\n",
    | 167 | + "(4, (None, True))\n",
    | 168 | + "(2, (None, True))\n",
    | 169 | + "(6, (None, True))\n",
    | 170 | + "\n",
    | 171 | + "-------------------------------------------\n",
    | 172 | + "Time: 2018-03-03 08:00:23\n",
    | 173 | + "-------------------------------------------\n",
    | 174 | + "(0, (None, True))\n",
    | 175 | + "(8, (None, True))\n",
    | 176 | + "(4, (None, True))\n",
    | 177 | + "(2, (None, True))\n",
    | 178 | + "(6, (None, True))\n",
    | 179 | + "\n",
    | 180 | + "-------------------------------------------\n",
    | 181 | + "Time: 2018-03-03 08:00:24\n",
    | 182 | + "-------------------------------------------\n",
    | 183 | + "\n"
154 | 184 | ]
155 | 185 | }
156 | 186 | ],
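For context on what this commit fixes: Python 3 removed tuple parameter unpacking in function signatures (PEP 3113), so the original `lambda (customer_id, (customer_data, is_good_customer)): is_good_customer` raises a SyntaxError under a Python 3 kernel. Because that cell failed, `dst.pprint()` never registered an output operation, which is why the following `ssc.start()` cell died with `No output operations registered`. Below is a minimal, self-contained sketch of the fixed join-and-filter pattern; the toy `customer_rdd` and `transaction_rdd_queue` here are stand-ins for values the notebook builds in earlier cells not shown in this diff.

```python
# Minimal sketch of the fixed pattern, assuming local Spark and toy data;
# customer_rdd / transaction_rdd_queue mirror names from earlier notebook cells.
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "join_filter_sketch")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Batch side: (customer_id, is_good_customer) pairs.
customer_rdd = sc.parallelize([(i, i % 2 == 0) for i in range(10)])

# Streaming side: a queue of (customer_id, transaction_data) RDDs.
transaction_rdd_queue = [sc.parallelize([(i, None) for i in range(10)])
                         for _ in range(5)]

# Python 3 dropped tuple parameter unpacking (PEP 3113), so
#   lambda (cid, (data, good)): good
# is a SyntaxError; index into the joined pair instead.
dst = (ssc.queueStream(transaction_rdd_queue)
       .transform(lambda rdd: rdd.join(customer_rdd))
       .filter(lambda pair: pair[1][1]))  # pair == (cid, (data, is_good))
dst.pprint()

ssc.start()
time.sleep(6)  # let a few batches print, as the notebook's next cell does
ssc.stop()
```

One nit the committed line leaves in place: `.filter(lambda rdd: rdd[1][1] == True)` compares against `True` explicitly. Since the joined value is already a boolean, `lambda pair: pair[1][1]` is equivalent and more idiomatic, as the sketch above uses.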