from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
from scipy import sparse
from typing import Tuple, List
import torch
from embedded_topic_model.utils import data


def _remove_empty_documents(documents):
    # Drops documents that ended up with no tokens after preprocessing.
    return [doc for doc in documents if doc != []]


def _create_list_words(documents):
    # Flattens tokenized documents into a single flat list of word occurrences.
    return [word for document in documents for word in document]


def _create_document_indices(documents):
    # For every word occurrence, records the index of the document it belongs to.
    aux = [[j for i in range(len(doc))] for j, doc in enumerate(documents)]
    return [int(x) for y in aux for x in y]


def _create_bow(document_indices, words, num_docs, vocab_size):
    # Builds a sparse document-term (bag-of-words) matrix of shape (num_docs, vocab_size).
    return sparse.coo_matrix(
        ([1] * len(document_indices), (document_indices, words)),
        shape=(num_docs, vocab_size)).tocsr()


def _split_bow(bow_in, num_docs):
    # Splits a BOW matrix into per-document lists of word ids and their counts.
    indices = [[w for w in bow_in[doc, :].indices] for doc in range(num_docs)]
    counts = [[c for c in bow_in[doc, :].data] for doc in range(num_docs)]
    return indices, counts


def _create_dictionaries(vocabulary):
    # Builds the word -> id and id -> word mappings for a vocabulary list.
    word2id = dict([(w, j) for j, w in enumerate(vocabulary)])
    id2word = dict([(j, w) for j, w in enumerate(vocabulary)])
    return word2id, id2word


def _to_numpy_array(documents):
    # Wraps ragged per-document lists into a one-dimensional numpy object array.
    return np.array([[np.array(doc) for doc in documents]],
                    dtype=object).squeeze()


def create_etm_datasets(
        dataset: List[str],
        train_size=1.0,
        min_df=1,
        max_df=100.0,
        debug_mode=False) -> Tuple[list, dict, dict, list, list]:
    """
    Creates the vocabulary and the train/test datasets from a given corpus. The vocabulary
    and datasets can be used to train an ETM model.

    By default, the train dataset contains every preprocessed document in the corpus and the
    test dataset is empty.

    This function preprocesses the given corpus, removing the most and least frequent terms
    according to the given maximum and minimum document frequencies, and produces a BOW vocabulary.

    Parameters:
    ===
        dataset (list of str): original corpus to be preprocessed, given as a list of sentences
        train_size (float): fraction of the original corpus to use for the train dataset. By default, the entire corpus is used
        min_df (int or float): minimum document frequency for terms; terms below this threshold are removed
        max_df (int or float): maximum document frequency for terms; terms above this threshold are removed
        debug_mode (bool): whether to log the function's operations to the console. By default, no logs are produced

    Returns:
    ===
        vocabulary (list of str): word vocabulary. Does not include words absent from the training dataset
        train_dataset (dict): BOW training dataset, split into tokens and counts. Must be used with ETM's fit() method
        test_dataset (dict): BOW testing dataset, split into tokens and counts. Can be used with ETM's perplexity() method
        idx_train (list of int): indices of the original corpus documents assigned to the train dataset
        idx_test (list of int): indices of the original corpus documents assigned to the test dataset
    """
vectorizer = CountVectorizer(min_df=min_df, max_df=max_df)
vectorized_documents = vectorizer.fit_transform(dataset)
    # Whitespace-tokenized documents, used below to rebuild the vocabulary and the BOW datasets.
    documents_without_stop_words = [
        document.split() for document in dataset]
signed_documents = vectorized_documents.sign()
if debug_mode:
print('Building vocabulary...')
sum_counts = signed_documents.sum(axis=0)
v_size = sum_counts.shape[1]
sum_counts_np = np.zeros(v_size, dtype=int)
for v in range(v_size):
sum_counts_np[v] = sum_counts[0, v]
word2id = dict([(w, vectorizer.vocabulary_.get(w))
for w in vectorizer.vocabulary_])
id2word = dict([(vectorizer.vocabulary_.get(w), w)
for w in vectorizer.vocabulary_])
if debug_mode:
print('Initial vocabulary size: {}'.format(v_size))
# Sort elements in vocabulary
idx_sort = np.argsort(sum_counts_np)
# Creates vocabulary
vocabulary = [id2word[idx_sort[cc]] for cc in range(v_size)]
if debug_mode:
print('Tokenizing documents and splitting into train/test...')
num_docs = signed_documents.shape[0]
train_dataset_size = int(np.floor(train_size * num_docs))
test_dataset_size = int(num_docs - train_dataset_size)
idx_permute = np.random.RandomState(2022).permutation(num_docs).astype(int)
# Remove words not in train_data
vocabulary = list(set([w for idx_d in range(train_dataset_size)
for w in documents_without_stop_words[idx_permute[idx_d]] if w in word2id]))
# Create dictionary and inverse dictionary
word2id, id2word = _create_dictionaries(vocabulary)
if debug_mode:
print(
'vocabulary after removing words not in train: {}'.format(
len(vocabulary)))
docs_train = [[word2id[w] for w in documents_without_stop_words[idx_permute[idx_d]]
if w in word2id] for idx_d in range(train_dataset_size)]
idx_train = [idx_permute[idx_d] for idx_d in range(train_dataset_size)]
docs_test = [
[word2id[w] for w in
documents_without_stop_words[idx_permute[idx_d + train_dataset_size]]
if w in word2id] for idx_d in range(test_dataset_size)]
idx_test = [idx_permute[idx_d + train_dataset_size] for idx_d in range(test_dataset_size)]
if debug_mode:
print(
'Number of documents (train_dataset): {} [this should be equal to {}]'.format(
len(docs_train),
train_dataset_size))
print(
'Number of documents (test_dataset): {} [this should be equal to {}]'.format(
len(docs_test),
test_dataset_size))
if debug_mode:
print('Removing empty documents...')
docs_train = _remove_empty_documents(docs_train)
docs_test = _remove_empty_documents(docs_test)
# Remove test documents with length=1
docs_test = [doc for doc in docs_test if len(doc) > 1]
    # Split each test document into two halves (used by ETM's document-completion perplexity)
    docs_test_h1 = [[w for i, w in enumerate(doc) if i <= len(doc) / 2.0 - 1]
                    for doc in docs_test]
    docs_test_h2 = [[w for i, w in enumerate(doc) if i > len(doc) / 2.0 - 1]
                    for doc in docs_test]

    # Flatten each dataset into a single list of word ids
    words_train = _create_list_words(docs_train)
    words_test = _create_list_words(docs_test)
    words_ts_h1 = _create_list_words(docs_test_h1)
    words_ts_h2 = _create_list_words(docs_test_h2)
if debug_mode:
print('len(words_train): ', len(words_train))
print('len(words_test): ', len(words_test))
print('len(words_ts_h1): ', len(words_ts_h1))
print('len(words_ts_h2): ', len(words_ts_h2))
doc_indices_train = _create_document_indices(docs_train)
doc_indices_test = _create_document_indices(docs_test)
doc_indices_test_h1 = _create_document_indices(docs_test_h1)
doc_indices_test_h2 = _create_document_indices(docs_test_h2)
if debug_mode:
print('len(np.unique(doc_indices_train)): {} [this should be {}]'.format(
len(np.unique(doc_indices_train)), len(docs_train)))
print('len(np.unique(doc_indices_test)): {} [this should be {}]'.format(
len(np.unique(doc_indices_test)), len(docs_test)))
print('len(np.unique(doc_indices_test_h1)): {} [this should be {}]'.format(
len(np.unique(doc_indices_test_h1)), len(docs_test_h1)))
print('len(np.unique(doc_indices_test_h2)): {} [this should be {}]'.format(
len(np.unique(doc_indices_test_h2)), len(docs_test_h2)))
# Number of documents in each set
n_docs_train = len(docs_train)
n_docs_test = len(docs_test)
n_docs_test_h1 = len(docs_test_h1)
n_docs_test_h2 = len(docs_test_h2)
bow_train = _create_bow(
doc_indices_train,
words_train,
n_docs_train,
len(vocabulary))
bow_test = _create_bow(
doc_indices_test,
words_test,
n_docs_test,
len(vocabulary))
bow_test_h1 = _create_bow(
doc_indices_test_h1,
words_ts_h1,
n_docs_test_h1,
len(vocabulary))
bow_test_h2 = _create_bow(
doc_indices_test_h2,
words_ts_h2,
n_docs_test_h2,
len(vocabulary))
bow_train_tokens, bow_train_counts = _split_bow(bow_train, n_docs_train)
bow_test_tokens, bow_test_counts = _split_bow(bow_test, n_docs_test)
bow_test_h1_tokens, bow_test_h1_counts = _split_bow(
bow_test_h1, n_docs_test_h1)
bow_test_h2_tokens, bow_test_h2_counts = _split_bow(
bow_test_h2, n_docs_test_h2)
train_dataset = {
'tokens': _to_numpy_array(bow_train_tokens),
'counts': _to_numpy_array(bow_train_counts),
}
test_dataset = {
'test': {
'tokens': _to_numpy_array(bow_test_tokens),
'counts': _to_numpy_array(bow_test_counts),
},
'test1': {
'tokens': _to_numpy_array(bow_test_h1_tokens),
'counts': _to_numpy_array(bow_test_h1_counts),
},
'test2': {
'tokens': _to_numpy_array(bow_test_h2_tokens),
'counts': _to_numpy_array(bow_test_h2_counts),
}
}
return vocabulary, train_dataset, test_dataset, idx_train, idx_test
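

# Illustrative usage sketch for create_etm_datasets, kept as comments so importing this
# module has no side effects. The corpus below is hypothetical and assumed to be
# lowercased and whitespace-tokenizable; max_df is given as a proportion of documents:
#
#     corpus = [
#         "soccer match ended with a late goal",
#         "parliament passed the new budget law",
#         "the team signed a young striker",
#         "voters went to the polls on sunday",
#     ]
#     vocabulary, train_dataset, test_dataset, idx_train, idx_test = create_etm_datasets(
#         corpus, train_size=0.75, min_df=1, max_df=1.0, debug_mode=True)
#
#     # train_dataset['tokens'][i] holds the word ids of the i-th training document and
#     # train_dataset['counts'][i] the matching counts; test_dataset contains 'test',
#     # 'test1' and 'test2' splits with the same structure.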


def get_document_topic_dist(self, tokens, counts) -> torch.Tensor:
    """
    Obtains the document-topic distribution matrix.

    The document-topic distribution matrix lists the probability of each topic for each
    document. It is a normalized distribution matrix, so each row sums to one.

    Parameters:
    ===
        self (ETM): a fitted ETM instance (or any object exposing model, device, vocabulary_size and bow_norm)
        tokens: tokenized documents, as obtained from create_etm_datasets
        counts: per-document word counts, as obtained from create_etm_datasets

    Returns:
    ===
        torch.Tensor: document-topic distribution matrix with dimension D x K, where D is the
        number of documents in the corpus and K is the number of topics
    """
    self.model = self.model.to(self.device)
    self.model.eval()

    with torch.no_grad():
        # The whole corpus is processed as a single batch.
        thetas = []
        ind = torch.tensor(range(len(tokens)))
        data_batch = data.get_batch(
            tokens,
            counts,
            ind,
            self.vocabulary_size,
            self.device)
        sums = data_batch.sum(1).unsqueeze(1)
        normalized_data_batch = data_batch / sums if self.bow_norm else data_batch
        theta, _ = self.model.get_theta(normalized_data_batch)
        thetas.append(theta)

    return torch.cat(tuple(thetas), 0)
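

# Illustrative sketch for get_document_topic_dist (assumptions flagged): `self` must be a
# fitted ETM-like object exposing `model`, `device`, `vocabulary_size` and `bow_norm`,
# such as an embedded_topic_model ETM instance after training. A hypothetical call, where
# `etm_instance` was trained on the `train_dataset` produced by create_etm_datasets:
#
#     doc_topic_dist = get_document_topic_dist(
#         etm_instance, train_dataset['tokens'], train_dataset['counts'])
#     # doc_topic_dist has shape (num_documents, num_topics) and each row sums to 1.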