
Commit d649982

Added a few more examples for reference
1 parent b69ec70 commit d649982

8 files changed, +707 -0 lines changed
@@ -0,0 +1,68 @@
__author__ = 'hanhanw'

import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


conf = SparkConf().setAppName("temp range sql")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
assert sc.version >= '1.5.1'

inputs1 = sys.argv[1]
output = sys.argv[2]


def get_range(recordings):
    recordings.registerTempTable('Recordings')

    # Pair each station's TMAX and TMIN readings for the same date and compute the daily range
    dfrange = sqlContext.sql("""
        SELECT r1.DateTime, r1.StationID, (r1.DataValue-r2.DataValue) AS Range FROM
        (SELECT StationID, DateTime, Observation, DataValue FROM Recordings
         WHERE Observation='TMAX') r1
        JOIN
        (SELECT StationID, DateTime, Observation, DataValue FROM Recordings
         WHERE Observation='TMIN') r2
        ON (r1.StationID = r2.StationID AND r1.DateTime = r2.DateTime)
        """)
    dfrange.registerTempTable('RangeTable')

    # Largest range observed on each date
    df_maxrange = sqlContext.sql("""
        SELECT DateTime, MAX(Range) AS MaxRange FROM RangeTable
        GROUP BY DateTime
        """)
    df_maxrange.registerTempTable('MaxRange')

    # Join back to find which station produced the maximum range on each date
    df_result = sqlContext.sql("""
        SELECT t1.DateTime AS DateTime, t1.StationID AS StationID, t2.MaxRange AS MaxRange FROM
        RangeTable t1
        JOIN MaxRange t2
        ON (t1.DateTime = t2.DateTime AND t1.Range = t2.MaxRange)
        """)
    return df_result


def main():
    temp_schema = StructType([
        StructField('StationID', StringType(), False),
        StructField('DateTime', StringType(), False),
        StructField('Observation', StringType(), False),
        StructField('DataValue', DoubleType(), False),
        StructField('MFlag', StringType(), True),
        StructField('QFlag', StringType(), True),
        StructField('SFlag', StringType(), True),
        StructField('OBSTime', StringType(), True),
    ])

    df = sqlContext.read.format('com.databricks.spark.csv').options(header='false').load(inputs1, schema=temp_schema)
    df = df.filter(df.QFlag == '')  # keep only records with no quality flag

    dfrange = get_range(df)
    result = dfrange.rdd.map(lambda r: str(r.DateTime) + ' ' + str(r.StationID) + ' ' + str(r.MaxRange))
    outdata = result.sortBy(lambda r: r[0]).coalesce(1)
    outdata.saveAsTextFile(output)


if __name__ == "__main__":
    main()
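For reference, this script takes the recordings directory and an output path as its two arguments and relies on the external spark-csv package for the CSV reader. A plausible Spark 1.5 invocation (the script name, paths, and spark-csv version below are only illustrative) would be:

spark-submit --packages com.databricks:spark-csv_2.10:1.5.0 temp_range_sql.py /path/to/recordings /path/to/output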
@@ -0,0 +1,100 @@
__author__ = 'hanhanw'

import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql.context import SQLContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.feature import Normalizer
import re
import operator
import math

conf = SparkConf().setAppName("733 A2 Q2 with cross validation")
sc = SparkContext(conf=conf)
assert sc.version >= '1.5.1'

training_inputs = sys.argv[1]
testing_inputs = sys.argv[2]
output = sys.argv[3]


def parse_point(line):
    # Each input line is the printed form of "(label, SparseVector(size, {index: tfidf, ...}))"
    ptn1 = r"\(([\d\.]*),\sSparseVector\((.*?)\)\)"
    ptn2 = r"(\d+),\s+\{(.*?)\}"
    m = re.search(ptn1, line)
    if m:
        label = float(m.group(1))
        features_str = m.group(2)
        mx = re.search(ptn2, features_str)
        num = float(mx.group(1))
        fs = mx.group(2)
        idx_set = []
        tfidf_scores = []
        if fs != '':
            fs_split = fs.split(', ')
            for f in fs_split:
                idx_set.append(f.split(': ')[0])
                tfidf_scores.append(f.split(': ')[1])
        sp = SparseVector(num, idx_set, tfidf_scores)
        LP = LabeledPoint(label, sp)
        return LP
    return None


# Find the best step size through cross-validation, using RMSE as the error measure
def get_best_stepsize(step_sizes, training_lp, iterations, cv_trials):
    best_stepsize = 0
    lowest_RMSE = float("inf")
    num_folds = 4
    fold_set = [1] * num_folds
    cv_data = training_lp.randomSplit(fold_set)  # 4 folds of roughly equal size
    for step_size in step_sizes:
        total_RMSE = 0.0
        for i in range(num_folds):
            cv_testing = cv_data[i]
            cv_training = training_lp.subtract(cv_testing)
            model = LinearRegressionWithSGD.train(cv_training, iterations=iterations, step=step_size)
            values_and_preds = cv_testing.map(lambda p: (p.label, model.predict(p.features)))
            MSE = values_and_preds.map(lambda (v, p): (v - p) ** 2).mean()  # mean squared error on the held-out fold
            RMSE = math.sqrt(MSE)
            total_RMSE += RMSE
        avg_RMSE = total_RMSE / num_folds  # average RMSE across the folds for this step size
        if avg_RMSE < lowest_RMSE:
            lowest_RMSE = avg_RMSE
            best_stepsize = step_size

    return best_stepsize


# Get the lowest RMSE on the test set after choosing the best step size through cross-validation
def get_best_result(best_step_size, training_lp, testing_lp, iterations):
    model = LinearRegressionWithSGD.train(training_lp, iterations=iterations, step=best_step_size)
    values_and_preds = testing_lp.map(lambda p: (p.label, model.predict(p.features)))
    MSE = values_and_preds.map(lambda (v, p): (v - p) ** 2).mean()
    RMSE = math.sqrt(MSE)

    result_str = 'best step size found by cross validation: ' + str(best_step_size) + ', lowest RMSE: ' + str(RMSE)
    return result_str


def main():
    training_data = sc.textFile(training_inputs)
    testing_data = sc.textFile(testing_inputs)

    training_LP = training_data.map(parse_point).filter(lambda result: result is not None)
    testing_LP = testing_data.map(parse_point).filter(lambda result: result is not None)

    step_sizes = [t / 10.0 for t in range(1, 10)]  # 0.1, 0.2, ..., 0.9
    iterations = 100
    cv_trials = 30

    best_step_size = get_best_stepsize(step_sizes, training_LP, iterations, cv_trials)
    best_result = get_best_result(best_step_size, training_LP, testing_LP, iterations)

    outdata = sc.parallelize([best_result])
    outdata.saveAsTextFile(output)


if __name__ == '__main__':
    main()
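This script expects three arguments: a training directory, a testing directory, and an output path, where each input line is the printed form of a (label, SparseVector(...)) pair that parse_point re-parses. A plausible invocation (script name and paths are only illustrative):

spark-submit linear_regression_cv.py /path/to/train /path/to/test /path/to/output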
@@ -0,0 +1,99 @@
__author__ = 'hanhanw'

import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql.context import SQLContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.feature import Normalizer
import re
import operator
import math

conf = SparkConf().setAppName("733 A2 Q2 normalized with cross validation")
sc = SparkContext(conf=conf)
assert sc.version >= '1.5.1'

training_inputs = sys.argv[1]
testing_inputs = sys.argv[2]
output = sys.argv[3]


def parse_point(line):
    # Each input line is the printed form of "(label, SparseVector(size, {index: tfidf, ...}))"
    ptn1 = r"\(([\d\.]*),\sSparseVector\((.*?)\)\)"
    ptn2 = r"(\d+),\s+\{(.*?)\}"
    m = re.search(ptn1, line)
    if m:
        label = float(m.group(1))
        features_str = m.group(2)
        mx = re.search(ptn2, features_str)
        num = float(mx.group(1))
        fs = mx.group(2)
        idx_set = []
        tfidf_scores = []
        if fs != '':
            fs_split = fs.split(', ')
            for f in fs_split:
                idx_set.append(f.split(': ')[0])
                tfidf_scores.append(f.split(': ')[1])
        sp = SparseVector(num, idx_set, tfidf_scores)
        LP = LabeledPoint(label, sp)
        return LP
    return None


# Find the best step size through cross-validation, using RMSE as the error measure
def get_best_stepsize(step_sizes, training_lp, iterations, cv_trials):
    best_stepsize = 0
    lowest_RMSE = float("inf")
    num_folds = 4
    fold_set = [1] * num_folds
    cv_data = training_lp.randomSplit(fold_set)  # 4 folds of roughly equal size
    for step_size in step_sizes:
        total_RMSE = 0.0
        for i in range(num_folds):
            cv_testing = cv_data[i]
            cv_training = training_lp.subtract(cv_testing)
            model = LinearRegressionWithSGD.train(cv_training, iterations=iterations, step=step_size)
            values_and_preds = cv_testing.map(lambda p: (p.label, model.predict(p.features)))
            MSE = values_and_preds.map(lambda (v, p): (v - p) ** 2).mean()  # mean squared error on the held-out fold
            RMSE = math.sqrt(MSE)
            total_RMSE += RMSE
        avg_RMSE = total_RMSE / num_folds  # average RMSE across the folds for this step size
        if avg_RMSE < lowest_RMSE:
            lowest_RMSE = avg_RMSE
            best_stepsize = step_size

    return best_stepsize


# Get the lowest RMSE on the test set after choosing the best step size through cross-validation
def get_best_result(best_step_size, training_lp, testing_lp, iterations):
    model = LinearRegressionWithSGD.train(training_lp, iterations=iterations, step=best_step_size, regType='l2')
    values_and_preds = testing_lp.map(lambda p: (p.label, model.predict(p.features)))
    MSE = values_and_preds.map(lambda (v, p): (v - p) ** 2).mean()
    RMSE = math.sqrt(MSE)

    result_str = 'best step size found by cross validation: ' + str(best_step_size) + ', lowest RMSE: ' + str(RMSE)
    return result_str


def main():
    training_data = sc.textFile(training_inputs)
    testing_data = sc.textFile(testing_inputs)

    training_LP_normalized = training_data.map(parse_point).filter(lambda result: result is not None)
    testing_LP_normalized = testing_data.map(parse_point).filter(lambda result: result is not None)

    step_sizes = [0.01, 0.1, 1, 10, 200, 500, 1000, 10000]
    iterations = 100
    cv_trials = 10

    best_step_size = get_best_stepsize(step_sizes, training_LP_normalized, iterations, cv_trials)
    best_result = get_best_result(best_step_size, training_LP_normalized, testing_LP_normalized, iterations)

    outdata = sc.parallelize([best_result])
    outdata.saveAsTextFile(output)


if __name__ == '__main__':
    main()
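This variant is run the same way as the previous script; the functional differences are the wider step-size grid and the regType='l2' argument passed to LinearRegressionWithSGD.train for the final model (the *_normalized names suggest it is intended for normalized feature vectors). An illustrative run, with a hypothetical script name:

spark-submit linear_regression_cv_l2.py /path/to/train /path/to/test /path/to/output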
@@ -0,0 +1,74 @@
__author__ = 'hanhanw'

import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql.context import SQLContext
from pyspark.mllib.feature import Word2Vec
import nltk
import string
import json

conf = SparkConf().setAppName("733 A2 Q4")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
assert sc.version >= '1.5.1'

inputs = sys.argv[1]
model_output = sys.argv[2]
similar_words_output = sys.argv[3]


def clean_review(review_line):
    # Each input line is a JSON review record; keep only the lower-cased words of its 'reviewText'
    pyline = json.loads(review_line)
    review_text = str(pyline['reviewText'])
    replace_punctuation = string.maketrans(string.punctuation, ' ' * len(string.punctuation))
    review_text = review_text.translate(replace_punctuation).split()
    review_words = [w.lower() for w in review_text]

    return review_words


def generate_word2vec_model(doc):
    return Word2Vec().setVectorSize(10).setSeed(42).fit(doc)


def get_similar_words(model, word, output_num):
    # findSynonyms returns (word, cosine similarity) pairs
    st = model.findSynonyms(word, output_num)
    outstr = 'similar words for ' + word + ': '
    for i in range(len(st)):
        outstr += '(' + str(st[i][0]) + ', ' + str(st[i][1]) + '), '
    return outstr


def main():
    text = sc.textFile(inputs)

    nltk_data_path = "[change to your nltk_data location]"  # may be changed to the SFU server path
    nltk.data.path.append(nltk_data_path)

    cleaned_review = text.map(clean_review)
    model = generate_word2vec_model(cleaned_review)
    mv = model.getVectors()

    # find similar words for a few test words
    similar_words = []
    test_words = ['dog', 'happy']
    outnum = 2
    for w in test_words:
        outstr = get_similar_words(model, w, outnum)
        similar_words.append(outstr)

    # save the model vectors as "word,[v1, v2, ...]" text lines
    results = []
    for k, v in mv.items():
        tmp_str = str(k) + ',['
        for f in v:
            tmp_str += str(f) + ', '
        tmp_str += ']'
        results.append(tmp_str)

    outmodel = sc.parallelize(results)
    out_similarwords = sc.parallelize(similar_words)
    outmodel.saveAsTextFile(model_output)
    out_similarwords.saveAsTextFile(similar_words_output)


if __name__ == '__main__':
    main()
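This script takes a reviews file with one JSON object per line (each containing a reviewText field), an output path for the word vectors, and an output path for the similar-word lists; the nltk_data placeholder in main() also needs to be filled in. A plausible invocation (script name and paths are only illustrative):

spark-submit review_word2vec.py /path/to/reviews.json /path/to/model_output /path/to/similar_words_output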
