Skip to content

Commit

Permalink
Almost working: Parallelized import
Browse files Browse the repository at this point in the history
Any error associated with an individual row is now attached to an
individual background job
  • Loading branch information
bess authored and sourcefilter committed Jun 6, 2019
1 parent 0df9930 commit ef8f1d7
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 67 deletions.
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ AllCops:

Metrics/AbcSize:
Enabled: true
Max: 30
Exclude:
- 'app/importers/californica_csv_cleaner.rb'
- app/jobs/hyrax/characterize_job.rb
Expand Down Expand Up @@ -57,6 +58,7 @@ Metrics/LineLength:

Metrics/MethodLength:
Enabled: true
Max: 20
Exclude:
- 'app/importers/californica_csv_parser.rb'
- app/importers/actor_record_importer.rb
Expand Down
9 changes: 6 additions & 3 deletions app/importers/actor_record_importer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def create_for(record:)
created = import_type(object_type).new
created.apply_depositor_metadata(@depositor.user_key)
attrs = record.attributes.merge(additional_attrs)

attrs = attrs.merge(member_of_collections_attributes: { '0' => { id: collection_id } }) if collection_id

# Ensure nothing is passed in the files field,
Expand All @@ -90,12 +90,15 @@ def create_for(record:)

if middleware.create(actor_env)
info_stream << "event: record_created, batch_id: #{batch_id}, record_id: #{created.id}, collection_id: #{collection_id}, record_title: #{attrs[:title]&.first}"
@success_count += 1
else
error_messages = []
created.errors.each do |attr, msg|
error_stream << "event: validation_failed, batch_id: #{batch_id}, collection_id: #{collection_id}, attribute: #{attr.capitalize}, message: #{msg}, record_title: record_title: #{attrs[:title] ? attrs[:title] : attrs}"
error_messages << msg
end
@failure_count += 1
# Errors raised here should be rescued in the CsvRowImportJob and the
# message should be recorded on the CsvRow object for reporting in the UI
raise "Validation failed: #{error_messages.join(', ')}"
end
end
end
11 changes: 5 additions & 6 deletions app/importers/record_importer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ class RecordImporter < Darlingtonia::HyraxRecordImporter
attr_reader :actor_record_importer
attr_reader :collection_record_importer


def initialize(error_stream:, info_stream:, attributes: {})
@actor_record_importer = ::ActorRecordImporter.new(error_stream: error_stream, info_stream: info_stream, attributes: attributes)
@collection_record_importer = ::CollectionRecordImporter.new(error_stream: error_stream, info_stream: info_stream, attributes: attributes)
Expand All @@ -16,11 +15,11 @@ def initialize(error_stream:, info_stream:, attributes: {})

def import(record:)
csv_row = CsvRow.create(
metadata: record.mapper.metadata.to_json,
row_number: record.mapper.row_number,
csv_import_id: batch_id
)
CsvRowImportJob.perform_later(row_id: csv_row.id )
metadata: record.mapper.metadata.to_json,
row_number: record.mapper.row_number,
csv_import_id: batch_id
)
CsvRowImportJob.perform_later(row_id: csv_row.id)
end

def success_count
Expand Down
14 changes: 9 additions & 5 deletions app/jobs/csv_row_import_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ def perform(row_id:)
metadata = JSON.parse(row.metadata)
csv_import = CsvImport.find(row.csv_import_id)
import_file_path = csv_import.import_file_path
record = Darlingtonia::InputRecord.from(metadata: metadata, mapper: CalifornicaMapper.new(import_file_path: import_file_path ))
record = Darlingtonia::InputRecord.from(metadata: metadata, mapper: CalifornicaMapper.new(import_file_path: import_file_path))

collection_record_importer = ::CollectionRecordImporter.new(attributes: metadata)
actor_record_importer = ::ActorRecordImporter.new(attributes: metadata)
selected_importer = if record.mapper.collection?
collection_record_importer
else
actor_record_importer
end
collection_record_importer
else
actor_record_importer
end
selected_importer.import(record: record)

row.status = 'complete'
row.save
rescue => e
row.status = 'error'
row.error_messages = e.message
row.save
end
end
2 changes: 1 addition & 1 deletion spec/factories/csv_imports.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true
FactoryBot.define do
factory :csv_import do
user { nil }
user { FactoryBot.create(:user) }
manifest { Rack::Test::UploadedFile.new(Rails.root.join('spec', 'fixtures', 'csv_import', 'import_manifest.csv'), 'text/csv') }
import_file_path { Rails.root.join('spec', 'fixtures') }
end
Expand Down
37 changes: 35 additions & 2 deletions spec/factories/csv_rows.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,41 @@
status { "complete" }
error_messages { "here is your error" }
metadata do
{ "Project Name" => "Ethiopic Manuscripts", "Item ARK" => "21198/zz001pz6jq", "Parent ARK" => "21198/zz001pz6h6", "Object Type" => "Page", "File Name" => "mss_page2.png", "Item Sequence" => "2", "Type.typeOfResource" => "text", "Type.genre" => nil, "Rights.copyrightStatus" => "pd", "Rights.servicesContact" => nil, "Language" => "gez", "Name.repository" => nil, "AltIdentifier.local" => "Ms. 50_#r or #v?", "Title" => "Image 2", "AltTitle.other" => "መርበብተ ሰሎሞን|~|ጸሎተ ሱስንዮስ", "AltTitle.translated" => "Prayer of Susneyos|~|Prayer of King Solomon for Catching of Demons", "AltTitle.uniform" => "Magical prayers", "Place of origin" => "Ethiopia", "Date.creation" => "18th c.",
"Date.normalized" => "1700/1799", "Format.extent" => "1 image", "Format.dimensions" => nil, "Support" => "Vellum", "Relation.rectoVerso" => nil, "Description.note" => nil, "Description.history" => nil, "Description.condition" => nil, "Coverage.temporal" => nil, "Description.collation" => nil, "Description.binding" => nil, "Description.tableOfContents" => nil, "Description.contents" => nil, "Description.abstract" => nil }.to_json
{
"Project Name" => "Ethiopic Manuscripts",
"Item ARK" => "21198/zz001pz6jq",
"Parent ARK" => "21198/zz001pz6h6",
"Object Type" => "Page",
"File Name" => "mss_page2.png",
"Item Sequence" => "2",
"Type.typeOfResource" => "text",
"Type.genre" => nil,
"Rights.copyrightStatus" => "pd",
"Rights.servicesContact" => nil,
"Language" => "gez",
"Name.repository" => nil,
"AltIdentifier.local" => "Ms. 50_#r or #v?",
"Title" => "Image 2",
"AltTitle.other" => "መርበብተ ሰሎሞን|~|ጸሎተ ሱስንዮስ",
"AltTitle.translated" => "Prayer of Susneyos|~|Prayer of King Solomon for Catching of Demons",
"AltTitle.uniform" => "Magical prayers",
"Place of origin" => "Ethiopia",
"Date.creation" => "18th c.",
"Date.normalized" => "1700/1799",
"Format.extent" => "1 image",
"Format.dimensions" => nil,
"Support" => "Vellum",
"Relation.rectoVerso" => nil,
"Description.note" => nil,
"Description.history" => nil,
"Description.condition" => nil,
"Coverage.temporal" => nil,
"Description.collation" => nil,
"Description.binding" => nil,
"Description.tableOfContents" => nil,
"Description.contents" => nil,
"Description.abstract" => nil
}.to_json
end
end
end
6 changes: 2 additions & 4 deletions spec/importers/actor_record_importer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@
}
end

it 'logs an error' do
expect { importer.import(record: record) }
.to change { error_stream.first }
.to match(/^event: validation_failed/)
it 'raises an error' do
expect { importer.import(record: record) }.to raise_error(RuntimeError, /Validation failed/)
end
end
end
Expand Down
39 changes: 1 addition & 38 deletions spec/importers/californica_importer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

require 'rails_helper'

RSpec.describe CalifornicaImporter, :clean do
RSpec.describe CalifornicaImporter, :clean, inline_jobs: true do
subject(:importer) { described_class.new(csv_import) }
let(:csv_import) { FactoryBot.create(:csv_import, user: user, manifest: manifest) }
let(:manifest) { Rack::Test::UploadedFile.new(Rails.root.join(csv_path), 'text/csv') }
Expand Down Expand Up @@ -32,10 +32,6 @@
expect(Work.last.date_uploaded).not_to be_nil
end

it 'enqueues local file attachment job' do
expect { importer.import }.to have_enqueued_job(IngestLocalFileJob)
end

it "has an ingest log" do
expect(importer.ingest_log).to be_kind_of(Logger)
end
Expand All @@ -49,11 +45,6 @@
expect(File.readlines(importer.ingest_log_filename).each(&:chomp!).last).to match(/elapsed_time/)
end

it "records the number of records ingested" do
importer.import
expect(File.read(importer.ingest_log_filename)).to match(/successful_record_count: 1/)
end

context 'when the collection doesn\'t exist yet' do
it 'creates a new collection with a modified ark as the id' do
importer.import
Expand Down Expand Up @@ -144,33 +135,5 @@
expect(File.readlines(ENV['MISSING_FILE_LOG']).each(&:chomp!).last).to match(/missing_file.tif/)
end
end

context 'when the records error' do
let(:ldp_error) { Ldp::PreconditionFailed }
let(:error_record) { Darlingtonia::InputRecord.from(metadata: metadata, mapper: CalifornicaMapper.new) }
let(:metadata) do
{
'Title' => 'Comet in Moominland',
'language' => 'English',
'visibility' => 'open'
}
end
before do
allow(error_record).to receive(:attributes).and_raise(ldp_error)
allow(importer.parser).to receive(:records).and_return([error_record])
allow(importer.parser).to receive(:validate).and_return(true)
end

it 'does not ingest the item' do
expect { importer.import }.not_to change { Work.count }
end

it 'logs the errors' do
importer.import

expect(File.readlines(importer.error_log_filename).each(&:chomp!))
.to include(/ERROR.+#{ldp_error}/)
end
end
end
end
35 changes: 29 additions & 6 deletions spec/jobs/csv_row_import_job_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,35 @@
# frozen_string_literal: true
require 'rails_helper'

RSpec.describe CsvRowImportJob do
let(:csv_row) { FactoryBot.create(:csv_row, status: nil) }
RSpec.describe CsvRowImportJob, :clean do
let(:csv_import) { FactoryBot.create(:csv_import) }

it 'can set a status' do
described_class.perform_now(row_id: csv_row.id)
csv_row.reload
expect(csv_row.status).to eq('complete')
context 'happy path' do
let(:csv_row) { FactoryBot.create(:csv_row, status: nil, csv_import_id: csv_import.id) }

it 'can set a status' do
described_class.perform_now(row_id: csv_row.id)
csv_row.reload
expect(csv_row.status).to eq('complete')
end
end

context 'error cases' do
let(:metadata) do
{
"Project Name" => "Ethiopic Manuscripts",
"Item ARK" => "",
"Parent ARK" => "21198/zz001pz6h6",
"Object Type" => "Page"
}.to_json
end
let(:csv_row) { FactoryBot.create(:csv_row, status: nil, metadata: metadata, error_messages: nil, csv_import_id: csv_import.id) }

it 'records an error state and error messages' do
described_class.perform_now(row_id: csv_row.id)
csv_row.reload
expect(csv_row.status).to eq('error')
expect(csv_row.error_messages).to eq 'Cannot set id without a valid ark'
end
end
end
4 changes: 2 additions & 2 deletions spec/jobs/mss_csv_import_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def expected_objects_in_expected_order
expect(work.member_of_collections.first.id).to eq "x3xg9000zz-89112"

# The work is attached to the Collection as expected
#expect(collection.child_works.count).to eq 1
#expect(collection.child_works.first.title.first).to eq "Ms. 50 Marbabeta Salomon, Ṣalota Susnyos"
expect(collection.child_works.count).to eq 1
expect(collection.child_works.first.title.first).to eq "Ms. 50 Marbabeta Salomon, Ṣalota Susnyos"
# The Pages are attached to the Work as expected
work = Work.last
expect(work.members.size).to eq 3
Expand Down

0 comments on commit ef8f1d7

Please sign in to comment.