-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfetch_google_trends.py
323 lines (247 loc) · 10.5 KB
/
fetch_google_trends.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# -*- coding: utf-8 -*-
import pandas as pd
import shutil
import sys
import re
import time
import random
import os
import math
import datetime
from datetime import date, timedelta, datetime as dt
from pytrends.request import TrendReq
from pytrends.exceptions import ResponseError
from unidecode import unidecode
from pathlib import Path
from dotenv import load_dotenv
def print_usage():
'''
Função que printa a chamada correta em caso de o usuário passar o número errado
de argumentos
'''
print ('Chamada Correta: python fetch_google_trends.py <df_path> <export_path> <backup_path> <config_path>')
def get_data_inicial(apresentacao):
'''
Caso a apresentação tenha sido feita em menos de 6 meses retorna a data da apre
sentação, caso contrário, retorna a data de 6 meses atrás
'''
seis_meses_atras = date.today() - timedelta(days=180)
if apresentacao > seis_meses_atras:
return apresentacao.strftime('%Y-%m-%d')
else:
return seis_meses_atras.strftime('%Y-%m-%d')
def formata_timeframe(passado_formatado):
'''
Formata o timeframe para o formato aceitável pelo pytrends
'''
return passado_formatado + ' ' + date.today().strftime('%Y-%m-%d')
def formata_apelido(apelido):
'''
Formata o apelido da proposição, limitando seu conteúdo
para o tamanho aceitado pelo pytrends
'''
return apelido[:85] if not pd.isna(apelido) else ''
def formata_nome_formal(nome_formal):
'''
Formata o nome da proposição para não incluir o ano de
criação
'''
# Separa nome e ano
nome_separado = nome_formal.split("/", maxsplit=1)[0]
# Formata para MPV ser MP
nome_separado = re.sub('MPV', 'MP', nome_separado)
return nome_separado
def formata_keywords(keywords):
'''
Formata as palavtas-chave da proposição, limitando
seu conteúdo para o tamanho aceitado pelo pytrends
(100 caracteres)
'''
formated_keywords = ''
if not pd.isna(keywords):
keys = keywords.split(';')
for i in range(len(keys)):
if len(formated_keywords) + len(keys[i]) < 100:
formated_keywords += keys[i]
if len(formated_keywords) < 100:
formated_keywords += ';'
if formated_keywords[-1] == ';':
return formated_keywords[:-1]
return formated_keywords
def get_trends(termos, timeframe):
'''
Retorna os trends
'''
pytrend.build_payload(termos, cat=0, timeframe=timeframe, geo='BR', gprop='')
def get_popularidade(termo, timeframe):
'''
Retorna a popularidade de termos passados em um intervalo de tempo
(timeframe)
'''
get_trends(termo, timeframe)
return pytrend.interest_over_time()
def calcula_maximos(pop_df, termos_base):
'''
Calcula o máximo de pressão entre termos principais e relacionados
'''
termos = pop_df
# Calcula o máximo da pressão baseada nos termos principais
termos['max_pressao_principal'] = termos[termos_base].max(axis=1)
cols_names = termos_base + ['date', 'max_pressao_principal', 'isPartial']
# Calcula o máximo de pressão baseada nos termos relacioados
cols_termos_relacionados = termos.columns[~termos.columns.isin(cols_names)]
termos['max_pressao_rel'] = termos[cols_termos_relacionados].max(axis=1) if (len(cols_termos_relacionados) > 0) else 0
# calcula o máximo de pressão entre termos principais e relacionados
termos['maximo_geral'] = termos[['max_pressao_rel','max_pressao_principal']].max(axis=1)
return termos
def agrupa_por_semana(pop_df):
'''
Agrupa por semana começando na segunda e calcula os máximos das colunas
'''
pop_df = pop_df.reset_index()
pop_df = pop_df.groupby(['id_ext', pd.Grouper(key='date', freq='W-MON'), 'casa']).agg('max')
pop_df = pop_df.reset_index()
pop_df['date'] = pd.to_datetime(pop_df['date']) - pd.to_timedelta(7, unit = 'd')
return pop_df
def create_directory(backup_path, export_path):
'''
Cria um diretório de backups composto por diretórios nomeados com timestamp,
que guardam os csvs de popularidade.
'''
now = datetime.datetime.today()
timestamp_str = now.strftime("%d-%m-%Y")
if not os.path.exists(backup_path):
try:
os.makedirs(backup_path)
except OSError as e:
print("Erro ao criar diretório de backups: %s." %(e.strerror))
dest_path = os.path.join(backup_path+timestamp_str)
if not os.path.exists(dest_path):
try:
os.makedirs(dest_path)
except OSError as e:
print("Erro ao criar diretório: %s." %(e.strerror))
keep_last_dirs(backup_path)
for filename in os.listdir(export_path):
try:
full_file_name = os.path.join(export_path, filename)
shutil.copy2(full_file_name, dest_path)
except OSError as e:
print("Erro ao copiar arquivos do diretório de popularidade: %s." %(e.strerror))
def keep_last_dirs(backup_path):
'''
Gera dicionário com nomes dos diretórios e a data de criação deles. A partir desse dicionário,
são apagados os diretórios mais antigos, deixando os 3 mais recentes.
'''
dirs_to_keep = 3
diretory_creation_times = {}
count = 0
for dir_name in os.listdir(backup_path):
dest = os.path.join(backup_path + dir_name)
dict = { dest: os.path.getctime(dest) }
diretory_creation_times.update(dict)
for item in sorted(diretory_creation_times, key = diretory_creation_times.get, reverse=True):
count +=1
try:
if(count > dirs_to_keep):
shutil.rmtree(item)
except OSError as e:
print("Erro ao apagar diretórios de backup: %s." %(e.strerror))
def calcula_lote_dia(df_apelidos):
'''
Calcula com base no epoch do sistema o lote de proposições que deve ser pesquisado no dia
'''
# calcula o epoch
diff_data = dt.today() - dt.utcfromtimestamp(0)
referencia_dias = diff_data.days
props_dia = int(os.getenv("PROPOSITIONS_DAY"))
total_lotes = math.ceil(len(df_apelidos.index) / props_dia)
lote_dia = (referencia_dias % total_lotes) + 1
return lote_dia
def write_csv_popularidade(apelidos, lote_dia, export_path):
'''
Para cada linha do csv calcula e escreve um csv com a popularidade da proposição
'''
tempo_entre_req = int(os.getenv("TRENDS_WAIT_TIME"))
props_sem_popularidade = 0
print('Coletando popularidade das proposições do lote %s' %(lote_dia))
for index, row in apelidos.iterrows():
lote = row['lote']
# verificação se o lote da proposição é o do dia
if (lote == lote_dia):
# timeframe de até 6 meses da data de execução do script
timeframe = formata_timeframe(get_data_inicial(row['apresentacao']))
nome_formal = row['nome_formal']
id_ext = str(row['id_ext'])
casa = row['casa']
id_leggo = row['id_leggo']
# separa o nome da proposição do ano e trata MPVs
nome_simples = formata_nome_formal(nome_formal)
# Cria conjunto de termos e adiciona aspas
termos = [nome_formal]
termos = ['"' + termo + '"' for termo in termos]
termos += ['alface americana']
# Inicializa o dataframe
cols_names = [
'id_leggo',
'id_ext',
'date',
'casa',
'nome_formal',
'isPartial',
'max_pressao_principal',
'max_pressao_rel',
'maximo_geral']
pop_df = pd.DataFrame(columns = cols_names)
# Tenta recupera a popularidade
tentativas = int(os.getenv("TRENDS_RETRIES"))
for n in range(0, tentativas):
try:
print('Tentativa %s de coletar a popularidade da proposição %s' %(n+1, nome_formal))
# Recupera as informações de popularidade a partir dos termos
pop_df = get_popularidade(termos, timeframe)
break
except ResponseError as error:
print(error.args)
time.sleep((2 ** n) + random.random())
if 'alface americana' in termos: termos.remove('alface americana')
pop_df = pop_df.drop('alface americana', 1)
# Caso da proposição sem popularidade
if (pop_df.empty):
print('Nome: %s Lote: %s TimeFrame: %s termos: %s sem informações do trends' %(nome_formal, lote, timeframe, termos))
props_sem_popularidade += 1
else:
print('Nome: %s Lote: %s TimeFrame: %s termos: %s com popularidade' %(nome_formal, lote, timeframe, termos))
pop_df = calcula_maximos(pop_df, termos)
pop_df['id_leggo'] = id_leggo
pop_df['id_ext'] = id_ext
pop_df['casa'] = casa
pop_df = agrupa_por_semana(pop_df)
# Escreve resultado da consulta para uma proposição
filename = export_path + 'pop_' + str(id_leggo) + '.csv'
pop_df.to_csv(filename, encoding='utf8', index=False)
# Esperando para a próxima consulta do trends
time.sleep(tempo_entre_req + random.random())
if __name__ == "__main__":
# Argumentos que o programa deve receber:
# -1º: Path para o arquivo onde estão os apelidos, nomes formais e datas de apresentações
# -2º: Path para a pasta onde as tabelas de popularidades devem ser salvas
# -3º: Path para a pasta de backup
# -4º: Path para o arquivo onde estão as configurações do fetch
if len(sys.argv) != 5:
print_usage()
exit(1)
df_path = sys.argv[1]
export_path = sys.argv[2]
backup_path = sys.argv[3]
conf_path = sys.argv[4]
load_dotenv(dotenv_path=conf_path)
connect_timeout = int(os.getenv("TRENDS_CONNECT_TIMEOUT"))
response_timeout = int(os.getenv("TRENDS_RESPONSE_TIMEOUT"))
pytrend = TrendReq(timeout=(connect_timeout, response_timeout))
apelidos = pd.read_csv(df_path, encoding='utf-8', parse_dates=['apresentacao'])
lote_dia = calcula_lote_dia(apelidos)
# Atualiza o diretório para remover proposições que não são de interesse
if (lote_dia == 1):
create_directory(backup_path, export_path)
write_csv_popularidade(apelidos, lote_dia, export_path)