[capitol-words] ported crec scraper to work as a celery task in django #17

Open · wants to merge 2 commits into base: master
5 changes: 5 additions & 0 deletions capitolwords_ng/capitolweb/capitolweb/__init__.py
@@ -0,0 +1,5 @@
from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']
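
The celery.py module that celery_app is imported from is not included in this diff. A conventional sketch of what it would contain, following the standard Celery/Django wiring (the contents below are an assumption, not the PR's actual file):

# capitolweb/celery.py -- a sketch of the module imported above, not part of this diff.
import os

from celery import Celery

# Tell Celery which Django settings module to use before the app is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'capitolweb.settings')

app = Celery('capitolweb')

# Pick up the CELERY_* settings (broker URL, result backend, serializers)
# defined in settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Find tasks.py modules in installed apps, e.g. the new workers app.
app.autodiscover_tasks()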
15 changes: 14 additions & 1 deletion capitolwords_ng/capitolweb/capitolweb/settings.py
@@ -20,6 +20,9 @@
DEV_FRONTEND = True
DEV_FRONTEND_SPA_BASE_URL = 'http://localhost:3000'

CREC_STAGING_S3_BUCKET = 'capitol-words-data'
CREC_STAGING_S3_KEY_PREFIX = 'crec'
CREC_STAGING_FOLDER = '/tmp'

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -49,6 +52,9 @@
'legislators',
'rest_framework',
'rest_framework_swagger',
'django_celery_beat',
'django_celery_results',
'workers',
]

MIDDLEWARE = [
@@ -149,4 +155,11 @@
'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
},
},
}
}

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'amqp://guest:guest@localhost//'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
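
With CELERY_BEAT_SCHEDULER pointed at django_celery_beat's DatabaseScheduler, periodic runs of the scraper can be registered as database rows rather than hard-coded in settings. A minimal sketch, assuming the workers.tasks.scrape_crecs task added in this PR and an illustrative daily interval:

# Sketch: register a daily run of the scraper with the DatabaseScheduler
# configured above. The schedule values are illustrative, not from this PR.
import json

from django_celery_beat.models import IntervalSchedule, PeriodicTask

schedule, _ = IntervalSchedule.objects.get_or_create(
    every=1,
    period=IntervalSchedule.DAYS,
)
PeriodicTask.objects.get_or_create(
    name='scrape-daily-crec',
    task='workers.tasks.scrape_crecs',
    interval=schedule,
    kwargs=json.dumps({}),  # any task kwargs go in as a JSON string
)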
1 change: 1 addition & 0 deletions capitolwords_ng/capitolweb/workers/__init__.py
@@ -0,0 +1 @@
from .tasks import scrape_crecs
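
The tasks module that scrape_crecs is imported from is not shown in this diff. A hedged sketch of how such a task could wrap the ported scraper using the new settings; the module path, class name, and argument handling below are assumptions:

# workers/tasks.py -- a sketch, not the PR's actual file. The scraper class
# name and its import path are assumed; the constructor arguments mirror the
# CREC_STAGING_* settings added in settings.py.
from datetime import datetime, timedelta

from celery import shared_task
from django.conf import settings


@shared_task
def scrape_crecs(date_str=None):
    from workers.crec_scraper import CRECScraper  # assumed import path

    if date_str is None:
        date = datetime.utcnow() - timedelta(days=1)
    else:
        date = datetime.strptime(date_str, '%Y-%m-%d')
    scraper = CRECScraper(
        settings.CREC_STAGING_FOLDER,
        settings.CREC_STAGING_S3_BUCKET,
        settings.CREC_STAGING_S3_KEY_PREFIX,
    )
    # scrape_files_for_date returns a JSON-friendly dict, which the json
    # result serializer configured above can store in the django-db backend.
    return scraper.scrape_files_for_date(date)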
@@ -1,40 +1,26 @@
"""Service for staging unpacked html files from a daily zip of congressional
"""Stages unpacked html files from a daily zip of congressional
records retrieved from gpo.gov.

This module can be used either from the command line, or deployed as an AWS
Lambda function (see :func:``lambda_handler`` for details on lambda execution).

To run locally:

::

python crec_stager.py --s3_bucket=mybukkit

Attributes:
DEFAULT_LOG_FORMAT (:obj:`str`): A template string for log lines.
LOGLEVELS (:obj:`dict`): A lookup of loglevel name to the loglevel code.
"""

from __future__ import print_function

import os
import sys
import urllib2
import logging
import argparse
from datetime import datetime
from datetime import timedelta
from zipfile import ZipFile
from collections import defaultdict
from urllib.request import urlopen
from urllib.error import HTTPError

import boto3
import requests
from botocore.exceptions import ClientError

from cli import setup_logger
from cli import add_logging_options
from cli import CMD_LINE_DATE_FORMAT


def get_dates(start_dt, end_dt=None):
if end_dt is None:
@@ -73,7 +59,7 @@ def __init__(self, download_dir, s3_bucket, s3_key_prefix):
self.download_dir = download_dir
self.s3_bucket = s3_bucket
self.s3_key_prefix = s3_key_prefix
self.s3 = boto3.client('s3')
self.s3 = boto3.resource('s3')

def download_crec_zip(self, date):
"""Downloads the CREC zip for this date.
@@ -84,8 +70,8 @@ def download_crec_zip(self, date):
url = date.strftime(self.CREC_ZIP_TEMPLATE)
logging.info('Downloading CREC zip from "{0}".'.format(url))
try:
response = urllib2.urlopen(url)
except urllib2.HTTPError as e:
response = urlopen(url)
except HTTPError as e:
if e.getcode() == 404:
logging.info('No zip found for date {0}'.format(date))
else:
@@ -106,8 +92,8 @@ def download_mods_xml(self, date):
url = date.strftime(self.MODS_ZIP_TEMPLATE)
logging.info('Downloading mods.xml from "{0}".'.format(url))
try:
response = urllib2.urlopen(url)
except urllib2.HTTPError as e:
response = urlopen(url)
except HTTPError as e:
if e.getcode() == 404:
logging.debug('No mods.xml found for date {0}, at "{1}"'.format(
date, url
@@ -118,9 +104,9 @@ def download_mods_xml(self, date):
return None
data = response.read()
mods_path = os.path.join(self.download_dir, 'mods.xml')
with open(mods_path, 'w') as f:
with open(mods_path, 'wb') as f:
f.write(data)
return mods_path
return str(mods_path)

def extract_html_files(self, zip_path):
"""Unpacks all html files in the zip at the provided path to the value
@@ -162,50 +148,51 @@ def upload_to_s3(self, file_path, data_type, date):
data_type,
os.path.basename(file_path),
)
with open(file_path) as f:
logging.debug(
'Uploading "{0}" to "s3://{1}/{2}".'.format(
file_path, self.s3_bucket, s3_key
)
)
self.s3.put_object(
Body=f, Bucket=self.s3_bucket, Key=s3_key
logging.debug(
'Uploading "{0}" to "s3://{1}/{2}".'.format(
file_path, self.s3_bucket, s3_key
)
)
self.s3.Object(self.s3_bucket, s3_key).upload_file(file_path)
return s3_key
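
The upload now goes through the resource API's managed transfer instead of client.put_object with an open file handle; upload_file streams the file from disk and switches to multipart transfer for large objects. A standalone sketch of the new call, with an illustrative bucket and key:

import boto3

s3 = boto3.resource('s3')
# Managed transfer: boto3 opens the file itself and handles multipart uploads,
# so the explicit open(file_path) from the old put_object call is no longer needed.
s3.Object('capitol-words-data', 'crec/mods.xml').upload_file('/tmp/mods.xml')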

def scrape_files_for_date(self, date):
zip_path = self.download_crec_zip(date)
mods_path = self.download_mods_xml(date)
if zip_path is None:
logging.info('No zip found for date {0}'.format(date))
return None
message = 'No zip found for date {0}.'.format(date)
logging.info(message)
return {'success': True, 'message': message}
if mods_path is None:
logging.info('No mods.xml found for date {0}'.format(date))
return None
message = 'No mods.xml found for date {0}'.format(date)
logging.info(message)
return {'success': True, 'message': message}
logging.info(
'Extracting html files from zip to {0}'.format(self.download_dir)
)
html_file_paths = self.extract_html_files(zip_path)
try:
s3_key = self.upload_to_s3(mods_path, 'mods', date)
except ClientError as e:
logging.exception(
'Error uploading file {0}, exiting'.format(mods_path, e)
)
return False
message = 'Error uploading file {0}, exiting: {1}'.format(mods_path, e)
logging.exception(message)
return {'success': False, 'message': message}
logging.info('Uploading {0} html files...'.format(len(html_file_paths)))
for file_path in html_file_paths:
try:
s3_key = self.upload_to_s3(file_path, 'crec', date)
except ClientError as e:
logging.exception(
'Error uploading file {0}, exiting'.format(file_path, e)
)
return False
message = 'Error uploading file {0}, exiting: {1}'.format(file_path, e)
logging.exception(message)
return {'success': False, 'message': message}
logging.info('Uploads finished.')
return True
return {
'success': True,
'message': '{0} crec html files uploaded.'.format(len(html_file_paths))
}

def scrape_files_in_range(self, start_dt, end_dt):
def scrape_files_in_range(self, start_dt, end_dt=None):
results = []
if end_dt is None:
end_dt = datetime.utcnow()
end_dt = end_dt.replace(hour=0, minute=0, second=0, microsecond=0)
@@ -214,5 +201,6 @@ def scrape_files_in_range(self, start_dt, end_dt):
dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
dates = []
while dt < end_dt:
self.scrape_files_for_date(dt)
result = self.scrape_files_for_date(dt)
results.append(result)
dt += timedelta(days=1)
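
Once a worker is running against the RabbitMQ broker configured in settings.py, the task can be queued from a Django shell; a sketch, assuming the scrape_crecs signature sketched earlier:

from workers import scrape_crecs

# Queue the task on the broker; the worker stores its return value in the
# django-db result backend.
async_result = scrape_crecs.delay('2017-07-20')
print(async_result.get(timeout=600))  # e.g. {'success': True, 'message': '...'}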
83 changes: 72 additions & 11 deletions capitolwords_ng/requirements.txt
@@ -1,16 +1,77 @@
requests==2.10.0
boto3==1.2.1
botocore==1.3.1
elasticsearch>=5.0.0,<6.0.0
amqp==2.2.1
appdirs==1.4.3
billiard==3.5.0.3
boto3==1.4.4
botocore==1.5.89
cachetools==2.0.0
celery==4.1.0
certifi==2017.4.17
cffi==1.10.0
chardet==3.0.3
cld2-cffi==0.1.4
coreapi==2.3.1
coreschema==0.0.4
cymem==1.31.2
cytoolz==0.8.2
DateTime==4.2
decorator==4.1.2
dill==0.2.6
Django==1.11.3
elasticsearch-dsl>=5.0.0,<6.0.0
lxml==3.7.2
django-celery-beat==1.0.1
django-celery-results==1.0.1
django-rest-swagger==2.1.2
djangorestframework==3.6.3
PyYAML==3.12
coreapi==2.3.1
Pygments==2.2.0
docutils==0.13.1
elasticsearch==5.4.0
elasticsearch-dsl==5.3.0
ftfy==4.4.3
html5lib==0.999999999
idna==2.5
ijson==2.3
itypes==1.1.0
Jinja2==2.9.6
jmespath==0.9.3
kombu==4.1.0
lxml==3.7.2
Markdown==2.6.8
MarkupSafe==1.0
murmurhash==0.26.4
networkx==1.11
numpy==1.12.1
openapi-codec==1.3.2
packaging==16.8
pathlib==1.0.1
plac==0.9.6
preshed==1.0.0
pycparser==2.18
pyemd==0.4.4
Pygments==2.2.0
pyparsing==2.2.0
Pyphen==0.9.4
python-dateutil==2.6.1
python-Levenshtein==0.12.0
pytz==2017.2
PyYAML==3.12
regex==2017.4.5
requests==2.10.0
s3transfer==0.1.10
scikit-learn==0.18.2
scipy==0.19.1
simplejson==3.11.1
six==1.10.0
spacy==1.8.2
SQLAlchemy==1.1.12
termcolor==1.1.0
textacy==0.3.4
django-rest-swagger==2.1.2
DateTime==4.2
thinc==6.5.2
toolz==0.8.2
tqdm==4.14.0
ujson==1.35
Unidecode==0.4.21
uritemplate==3.0.0
urllib3==1.21.1
vine==1.1.4
wcwidth==0.1.7
webencodings==0.5.1
wrapt==1.10.10
zope.interface==4.4.2
4 changes: 2 additions & 2 deletions capitolwords_ng/run_crec_es_uploader.py
@@ -26,7 +26,7 @@
output_option_group = parser.add_mutually_exclusive_group(required=True)
output_option_group.add_argument(
'--to_stdout',
help='If true, will not upload to es and instead print to stdout.',
help='If true, will not upload to elasticsearch and instead print to stdout.',
action='store_true'
)
output_option_group.add_argument(
@@ -47,7 +47,7 @@
)
parser.add_argument(
'--source_bucket',
help='Location of crec data.',
help='S3 bucket name of crec source data.',
)
args = parser.parse_args()
