# data.py
import requests
import json
from datetime import datetime
import os
from difflib import SequenceMatcher
from sklearn.model_selection import train_test_split
import Preprocessor
bearer_token = "AAAAAAAAAAAAAAAAAAAAAMINZgEAAAAA5x%2Fnm4e%2FYuAaae1N1b7F7czW%2FN8" \
"%3Dq5K6FTJGdugV5loBhz7iyt2zTgE2nCR4rYUSYoDsdRNZtBgu49"
search_url = 'https://api.twitter.com/2/tweets/search/recent'
# Set authorization in request header
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "laurensThesis"
    return r
# Send a request to the Twitter API endpoint and return the JSON response
def connect_to_endpoint(url, params):
    response = requests.get(url, auth=bearer_oauth, params=params)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()
# Pull tweets from Twitter with a general query
def request(limit):
    query_params = {'query': '-project (#BTC OR #bitcoin OR #cardano OR #XRP OR #ETH) -Airdrop -#Airdrop -betting '
                             '-giveaway -NFT lang:en -is:retweet', 'max_results': limit}
    json_response = connect_to_endpoint(search_url, query_params)
    filename = "Data/" + datetime.now().strftime("%d-%m-%H-%M") + ".json"
    with open(filename, 'w') as f:
        f.write(json.dumps(json_response, indent=0, sort_keys=True))
    print(f"{query_params['max_results']} tweets pulled.")
# Pull tweets from Twitter with a query for tweets with media
def request_media(limit):
    query_params = {'query': 'has:media -project (#BTC OR #bitcoin OR #cardano OR #XRP OR #ETH) '
                             '-Airdrop -#Airdrop -betting -giveaway -NFT lang:en -is:retweet', 'max_results': limit}
    json_response = connect_to_endpoint(search_url, query_params)
    filename = "Data/" + datetime.now().strftime("%d-%m-%H-%M") + ".json"
    with open(filename, 'w') as f:
        f.write(json.dumps(json_response, indent=0, sort_keys=True))
    print(f"{query_params['max_results']} media tweets pulled.")
# Pull tweets from Twitter with a query for negative sentiment tweets
def request_neg(limit):
    query_params = {'query': '-project (#BTC OR #bitcoin OR #cardano OR #XRP OR #ETH) '
                             '(down OR bear OR bearish OR unstable OR weak OR crash OR down by OR 📉 OR scam OR 💸 '
                             'OR desperate OR lost OR risky OR sad OR decreasing) -Airdrop -#Airdrop -betting -giveaway '
                             '-NFT lang:en -is:retweet', 'max_results': limit}
    json_response = connect_to_endpoint(search_url, query_params)
    filename = "Data/" + datetime.now().strftime("%d-%m-%H-%M") + ".json"
    with open(filename, 'w') as f:
        f.write(json.dumps(json_response, indent=0, sort_keys=True))
    print(f"{query_params['max_results']} negative tweets pulled.")
# Merge all json files in the Data folder into one json file called 'Tweets.json'
def merge():
    directory = "Data"
    data = []
    datastructure = {'data': data}
    duplicate_check = []
    for filename in os.listdir(directory):
        f = os.path.join(directory, filename)
        if os.path.isfile(f) and filename != "Tweets.json":
            with open(f, 'r') as current:
                values = json.load(current)
            for i in range(values['meta']['result_count']):
                tweet_id = str(values['data'][i]['id'])
                tweet = values['data'][i]['text']
                if project_block(tweet) and character_spam_check(tweet) and duplicate_checker(tweet, duplicate_check):
                    data.append({'id': tweet_id, 'text': tweet, 'label': "?"})
                    duplicate_check.append(tweet)
                    print(len(data))  # running count of unique tweets kept
    with open("Data/Tweets.json", 'w') as f:
        f.write(json.dumps(datastructure, indent=0, sort_keys=True))
    # Also write a numbered backup copy to the Backups folder
    path, dirs, files = next(os.walk("Backups"))
    file_count = len(files)
    filename = f"Backups/Tweets{file_count}.json"
    with open(filename, 'w') as f:
        f.write(json.dumps(datastructure, indent=0, sort_keys=True))
    print(f"{len(duplicate_check)} tweets merged into Tweets.json")
# Block tweets that are more than 90% similar to tweets that are already included.
def duplicate_checker(string, tweet_list):
    for tweet in tweet_list:
        if SequenceMatcher(None, tweet, string).ratio() > 0.9:
            return False
    return True
# Block tweets containing an excessive number of '#' or '@' characters for the Tweets.json merge.
def character_spam_check(string):
    has_counter = 0
    at_counter = 0
    for character in string:
        if character == '#':
            has_counter += 1
        if character == '@':
            at_counter += 1
    if has_counter > 5 or at_counter > 3:
        return False
    return True
# Block tweets containing the word 'project' for Tweets.json merge
def project_block(string):
    if "project" in string:
        return False
    return True
# Compute and insert rank values for tweets in Tweets.json
def rank():
    with open('Data/Tweets.json', 'r') as jsonfile:
        values = json.load(jsonfile)
    for i in range(len(values['data'])):
        values['data'][i]['rank'] = score_tweet(values['data'][i]['text'])
    # sort by descending score
    values['data'] = sorted(values['data'], key=lambda x: x['rank'], reverse=True)
    with open("Data/Tweets.json", 'w') as f:
        f.write(json.dumps(values, indent=0, sort_keys=True))
    print("Tweets.json is now ranked")
# Return the relevance score for the tweet passed as parameter 'string'
def score_tweet(string):
    score = 0
    tags = ["bitcoin", "cardano", "ethereum", "ripple", "avax", "avalanche", "#crypto", "#bitcoin", "#btc",
            "#eth", "#xrp", "#cryptocurrency", "#altcoin", "$btc", "$eth", "$xrp", "$sol", "$luna", "$ada",
            "$usdt", "$avax"]
    # Tokens are compared against string.lower(), so they must be lowercase to ever match
    unrelated = [" nft ", "giveaway", "airdrop", "lightning", "#coinhuntworld", "#mastermetals", "down", "up",
                 "bear", "bull", "long", "short", "change", "fear", "%"]
    for tag in tags:
        if tag in string.lower():
            score += 1
    for token in unrelated:
        if token in string.lower():
            score -= 5
    if "$" in string:
        score += 1
    if "%" in string:
        score += 1
    return score
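# For illustration (not part of the original script): score_tweet("Bought more #BTC and $ETH today")
# returns 3, since '#btc' and '$eth' each add 1 and the presence of a '$' character adds another 1.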
# Print the distribution of ranks in Tweets.json
def rank_distribution():
    with open('Data/Tweets.json', 'r') as jsonfile:
        values = json.load(jsonfile)
    distribution = {}
    for tweet in values['data']:
        tweet_rank = str(tweet['rank'])
        if tweet_rank in distribution:
            distribution[tweet_rank] += 1
        else:
            distribution[tweet_rank] = 1
    # Print the counts in ascending order of rank
    for tweet_rank in sorted(distribution.keys(), key=int):
        print(f"Rank {tweet_rank} occurs {distribution[tweet_rank]} times.")
# Build a unigram frequency file of words in the labeled tweets that are not in the textblob dictionary
def word_occurences():
    with open('../venv/Lib/site-packages/textblob/en/en-spelling.txt', 'r') as dictionary:
        known_words = [word.split()[0] for word in dictionary.readlines()]
    with open('LabeledData/LabeledTweets.json', 'r') as jsonfile:
        values = json.load(jsonfile)
    unigram = {}
    for tweet in values['X']:
        for word in tweet.split():
            if word.isalpha() and word.lower() not in known_words:
                # An unknown word starts at weight 10 and gains 15 for every further occurrence
                if word in unigram:
                    unigram[word] += 15
                else:
                    unigram[word] = 10
    with open("Data/crypto_unigram.txt", 'w') as f:
        for k, v in unigram.items():
            f.write(k.lower() + " " + str(v) + "\n")
        # f.write(json.dumps(unigram, indent=0, sort_keys=True))
# Labeled tweets and their respective labels are stored in a json file for use by a model
def prepare_data():
    with open('Data/Tweets.json', 'r') as jsonfile:
        values = json.load(jsonfile)
    X = []
    y = []
    for tweet in values['data']:
        label = tweet['label']
        if label in ["Positive", "Neutral", "Negative"]:
            X.append(tweet['text'])
            y.append(label)
    data = {'X': Preprocessor.normalize_text(X), 'y': y}
    with open("LabeledData/LabeledTweets.json", 'w') as f:
        f.write(json.dumps(data, indent=0, sort_keys=True))
    return data
# Write (text, numeric label) pairs to a json file in the format the models expect
def data_to_model_format(x, y, filename):
    dataset = []
    for i in range(len(x)):
        entry = {"text": x[i], "label": transform_label_num(y[i])}
        dataset.append(entry)
    with open(f"LabeledData/{filename}", 'w') as f:
        f.write(json.dumps(dataset, indent=0))
# Split 600 labeled tweets into train, validation and test files for one cross-validation fold
def data_to_k_fold_model_format(x, y, fold):
    # Each fold holds out a consecutive block of 60 tweets as the test set
    i_test = range((fold - 1) * 60, fold * 60)
    i_train = []
    i_train_final = []
    i_val_indices = []
    for i in range(600):
        if i not in i_test:
            i_train.append(i)
    # 54 validation positions are taken from the training pool, relative to the start of the
    # test block (negative positions wrap around for the first folds)
    for i in range(i_test[0] - 120, i_test[0] - 66):
        i_val_indices.append(i)
    i_val = []
    for i in i_val_indices:
        i_val.append(i_train[i])
    for i in i_train:
        if i not in i_val:
            i_train_final.append(i)
    test_set = []
    train_set = []
    val_set = []
    for i in range(len(x)):
        entry = {"text": x[i], "label": transform_label_num(y[i])}
        if i in i_test:
            test_set.append(entry)
        elif i in i_train_final:
            train_set.append(entry)
        else:
            val_set.append(entry)
    with open(f"LabeledData/train{fold}.json", 'w') as f:
        f.write(json.dumps(train_set, indent=0))
    with open(f"LabeledData/test{fold}.json", 'w') as f:
        f.write(json.dumps(test_set, indent=0))
    with open(f"LabeledData/val{fold}.json", 'w') as f:
        f.write(json.dumps(val_set, indent=0))
def transform_label_num(textual_label):
    if textual_label == "Negative":
        return 0
    elif textual_label == "Neutral":
        return 1
    else:
        return 2
def transform_label_short(textual_label):
    if textual_label == "Negative":
        return "NEG"
    elif textual_label == "Neutral":
        return "NEU"
    else:
        return "POS"
def transform_short_label_num(short_label):
    if short_label == "NEG":
        return 0
    elif short_label == "NEU":
        return 1
    else:
        return 2
# Split tweet data and corresponding labels into train, test and validation sets.
def train_test_validate_split(data_x, data_y):
    train_ratio = 0.70
    validation_ratio = 0.15
    test_ratio = 0.15
    # First split off 30% of the data (test + validation), stratified on the labels; train keeps 70%
    x_train, x_test, y_train, y_test = train_test_split(data_x, data_y,
                                                        test_size=int(len(data_x) * test_ratio * 2),
                                                        stratify=data_y)
    # Split the held-out 30% in half: 15% test and 15% validation of the initial data set
    x_val, x_test, y_val, y_test = train_test_split(x_test, y_test,
                                                    test_size=int(len(data_x) * test_ratio),
                                                    stratify=y_test)
    return x_train, x_test, x_val, y_train, y_test, y_val
# Read a prepared data file and return the texts and labels
def read_set(filename):
    with open(filename, 'r') as jsonfile:
        values = json.load(jsonfile)
    x = []
    y = []
    for sample in values:
        if filename == "LabeledData/complete_set_raw.json":
            # Raw tweets are truncated to the first 128 characters
            x.append(sample['text'][:128])
        else:
            x.append(sample['text'])
        y.append(sample['label'])
    return x, y
def read_labeled():
    with open("LabeledData/LabeledTweets.json", 'r') as jsonfile:
        values = json.load(jsonfile)
    return values['X']
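# Example end-to-end usage (a minimal sketch; the call order, the limit of 100 tweets and the
# output file names below are illustrative assumptions, not part of the original script):
if __name__ == "__main__":
    # 1. Pull recent tweets and merge/deduplicate them into Data/Tweets.json
    request(100)
    merge()
    # 2. Rank the merged tweets by relevance and inspect the rank distribution
    rank()
    rank_distribution()
    # 3. Once tweets in Tweets.json carry a "Positive"/"Neutral"/"Negative" label,
    #    build LabeledTweets.json and split it into model-ready files
    data = prepare_data()
    x_train, x_test, x_val, y_train, y_test, y_val = train_test_validate_split(data['X'], data['y'])
    data_to_model_format(x_train, y_train, "train.json")
    data_to_model_format(x_val, y_val, "val.json")
    data_to_model_format(x_test, y_test, "test.json")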