Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lib/valkyrie/persistence/fedora/query_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ def find_all_of_model(model:)
end
end

# (see Valkyrie::Persistence::Memory::QueryService#find_in_batches)
def find_in_batches(start: nil, finish: nil, batch_size: 500, except_models: [])
resource = Ldp::Resource.for(connection, adapter.base_path, connection.get(adapter.base_path))
ids = resource.graph.query([nil, RDF::Vocab::LDP.contains, nil]).map(&:object).map { |x| adapter.uri_to_id(x) }
ids.each_slice(batch_size) do |id_batch|
resources = id_batch.map { |id| find_by(id: id) }
unless except_models.empty?
except_model_strings = except_models.map(&:to_s)
resources.reject! { |res| except_model_strings.include?(res.internal_resource) }
end
yield resources unless resources.empty?
end
rescue ::Ldp::NotFound
nil
end

# (see Valkyrie::Persistence::Memory::QueryService#count_all_of_model)
def count_all_of_model(model:)
find_all_of_model(model: model).count
Expand Down
30 changes: 30 additions & 0 deletions lib/valkyrie/persistence/memory/query_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,36 @@ def find_all_of_model(model:)
end
end

# Retrieve all records in batches and construct Valkyrie Resources for each record.
# Yields batches of resources to the given block for memory-efficient processing.
#
# @param [Integer] start page or id to start from
# @param [Integer] finish page or id to end on
# @param [Integer] batch_size The number of records to retrieve per batch (default: 500)
# @param [Array<Class>] except_models Resource types to exclude from results
# @yield [Array<Valkyrie::Resource>] batch Yields each batch of Valkyrie Resources
# @return [void]
# @example Process all resources in batches
# query_service.find_in_batches(batch_size: 100) do |resources|
# resources.each { |resource| process(resource) }
# end
# @example Process all resources except access controls
# query_service.find_in_batches(except_models: [Hyrax::AccessControl]) do |resources|
# resources.each { |resource| reindex(resource) }
# end
def find_in_batches(start: nil, finish: nil, batch_size: 500, except_models: [])
resources = cache.values

unless except_models.empty?
except_model_strings = except_models.map(&:to_s)
resources = resources.reject { |r| except_model_strings.include?(r.internal_resource) }
end

resources.each_slice(batch_size) do |batch|
yield batch
end
end

# Count all objects of a given model.
# @param model [Class] Class to query for.
# @return integer. Count objects in the persistence backend
Expand Down
20 changes: 20 additions & 0 deletions lib/valkyrie/persistence/postgres/query_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ def find_all_of_model(model:)
end
end


# (see Valkyrie::Persistence::Memory::QueryService#find_in_batches)
def find_in_batches(start: nil, finish: nil, batch_size: 500, except_models: [])
relation_for_find_in_batches(except_models).find_in_batches(start:, finish:, batch_size:) do |batch|
yield batch.map { |orm_object| resource_factory.to_resource(object: orm_object) }
end
end

# Count all records for a specific resource type
# @param [Class] model
# @return integer
Expand Down Expand Up @@ -292,5 +300,17 @@ def id_type
def ordered_property?(resource:, property:)
resource.ordered_attribute?(property)
end

# Build an ActiveRecord relation for find_in_batches based on excluded models
# @param [Array<Class>] except_models Resource types to exclude
# @return [ActiveRecord::Relation]
def relation_for_find_in_batches(except_models)
if except_models.empty?
orm_class.order(:id)
else
except_models_as_strings = except_models.map(&:to_s)
orm_class.order(:id).where.not(internal_resource: except_models_as_strings)
end
end
end
end
1 change: 1 addition & 0 deletions lib/valkyrie/persistence/solr/queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Queries
require 'valkyrie/persistence/solr/queries/find_all_query'
require 'valkyrie/persistence/solr/queries/find_by_id_query'
require 'valkyrie/persistence/solr/queries/find_by_alternate_identifier_query'
require 'valkyrie/persistence/solr/queries/find_in_batches_query'
require 'valkyrie/persistence/solr/queries/find_many_by_ids_query'
require 'valkyrie/persistence/solr/queries/find_inverse_references_query'
require 'valkyrie/persistence/solr/queries/find_members_query'
Expand Down
61 changes: 61 additions & 0 deletions lib/valkyrie/persistence/solr/queries/find_in_batches_query.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true
module Valkyrie::Persistence::Solr::Queries
class FindInBatchesQuery
attr_reader :connection, :resource_factory, :start, :batch_size, :except_models

# @param [RSolr::Client] connection
# @param [ResourceFactory] resource_factory
def initialize(connection:, resource_factory:, start:, batch_size:, except_models:)
@connection = connection
@resource_factory = resource_factory
@start = start
@batch_size = batch_size
@except_models = except_models
end

# Queries for all Documents in the Solr index in batches
# For each Document, it yields the Valkyrie Resource which was converted from it
# @yield [Array<Valkyrie::Resource>] batch Yields each batch of Valkyrie Resources
# @return [void]
def run
all_ids_sorted.each_slice(batch_size) do |batch|
solr_params = { q: '*:*', fq: "id:(#{batch.join(' OR ')})", sort: 'id asc' }
resources = accumulate_from_solr(solr_params) { |doc| resource_factory.to_resource(object: doc) }

yield resources.flatten unless resources.empty?
end
end

private

# All ids in the index, sorted by id so we have a deterministic return
def all_ids_sorted
@all_ids_sorted ||= begin
solr_params = { q: query, sort: "id asc", fl: ['id'] }
accumulate_from_solr(solr_params) { |doc| doc["id"] }
end
end

def accumulate_from_solr(solr_params, &block)
accumulator = []
docs = DefaultPaginator.new
while docs.has_next?
docs = connection.paginate(docs.next_page, docs.per_page, 'select', params: solr_params)["response"]["docs"]
accumulator << docs.map(&block)
end
accumulator.flatten!
end

# Generates the Solr query for retrieving all Documents in the index in batches
# If a model is specified for the query, it is scoped to that Valkyrie resource type
# @return [String]
def query
if except_models.empty?
"*:*"
else
excluded_types = except_models.map { |model| "\"#{model}\"" }.join(" OR ")
"*:* AND NOT internal_resource_ssim:(#{excluded_types})"
end
end
end
end
8 changes: 8 additions & 0 deletions lib/valkyrie/persistence/solr/query_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def find_all_of_model(model:)
Valkyrie::Persistence::Solr::Queries::FindAllQuery.new(connection: connection, resource_factory: resource_factory, model: model).run
end

# (see Valkyrie::Persistence::Memory::QueryService#find_in_batches)
def find_in_batches(start: 1, finish: nil, batch_size: 100, except_models: [])
Valkyrie::Persistence::Solr::Queries::FindInBatchesQuery.new(connection: connection, resource_factory: resource_factory,
start:, batch_size:, except_models:).run do |batch|
yield batch
end
end

# Count all of the Valkyrie Resources of a model persisted in the Solr index
# @param [Class, String] model the Valkyrie::Resource Class
# @return integer
Expand Down
58 changes: 58 additions & 0 deletions lib/valkyrie/specs/shared_specs/queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Valkyrie::Specs::ThirdResource < Valkyrie::Resource
it { is_expected.to respond_to(:find_inverse_references_by).with_keywords(:id, :property, :model) }
it { is_expected.to respond_to(:find_parents).with_keywords(:resource) }
it { is_expected.to respond_to(:count_all_of_model).with_keywords(:model) }
it { is_expected.to respond_to(:find_in_batches).with_keywords(:start, :finish, :batch_size, :except_models) }

describe ".find_all" do
it "returns all created resources" do
Expand All @@ -68,6 +69,63 @@ class Valkyrie::Specs::ThirdResource < Valkyrie::Resource
end
end

describe ".find_in_batches" do
it "yields all created resources" do
resource1 = persister.save(resource: resource_class.new)
resource2 = persister.save(resource: resource_class.new)
resource3 = persister.save(resource: resource_class.new)
resource4 = persister.save(resource: second_resource_class.new)
resource_ids = [resource1, resource2, resource3, resource4].map { |resource| resource.id.to_s }
# Test without except_models - should include all resources
all_resources = []
query_service.find_in_batches(batch_size: 2) do |batch|
expect(batch).not_to be_nil
all_resources.concat(batch)
end
expect(all_resources.size).to eq 4
expect(all_resources.map { |resource| resource.id.to_s }).to match_array(resource_ids)

# Test with except_models - should exclude second_resource_class
filtered_resource_ids = [resource1, resource2, resource3].map { |resource| resource.id.to_s }
filtered_resources = []
query_service.find_in_batches(batch_size: 2, except_models: [second_resource_class]) do |batch|
expect(batch.size).to be <= 2
filtered_resources.concat(batch)
end
expect(filtered_resources.size).to eq 3
expect(filtered_resources.map { |resource| resource.id.to_s }).to match_array(filtered_resource_ids)
expect(filtered_resources).not_to include(resource4)
expect(filtered_resources).to all(be_an_instance_of(resource_class))
end

it "handles empty results" do
expect { |b| query_service.find_in_batches(batch_size: 2, &b) }.not_to yield_control
end

it "respects batch_size parameter" do
5.times { persister.save(resource: resource_class.new) }
batch_sizes = []
query_service.find_in_batches(batch_size: 2) do |batch|
batch_sizes << batch.size
end
expect(batch_sizes).to eq [2, 2, 1]
end

it "respects start parameter" do
resource1 = persister.save(resource: resource_class.new)
resource2 = persister.save(resource: resource_class.new)
resource3 = persister.save(resource: resource_class.new)
resource4 = persister.save(resource: resource_class.new)
resource5 = persister.save(resource: resource_class.new)
# resources.order_by(:id)
batch_sizes = []
query_service.find_in_batches(batch_size: 3, start: resource2.id) do |batch|
batch_sizes << batch.size
end
expect(batch_sizes).to eq([3, 1])
end
end

describe ".find_by" do
it "returns a resource by id or string representation of an id" do
before_find_by.call if defined? before_find_by
Expand Down
34 changes: 34 additions & 0 deletions spec/valkyrie/persistence/solr/query_service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,38 @@ class CustomResource < Valkyrie::Resource
expect(adapter.query_service.connection).to have_received(:get).once
end
end
describe "#find_in_batches" do
let(:resources) do
150.times do
adapter.persister.save(resource: CustomResource.new)
end
end

before do
class CustomResource < Valkyrie::Resource; end
resources
end

after do
Object.send(:remove_const, :CustomResource)
end

context 'with batch size larger than default page size' do
it "makes multiple Solr requests" do
allow(adapter.query_service.connection).to receive(:paginate).and_call_original
all_resources = []
adapter.query_service.find_in_batches(batch_size: 120) do |batch|
expect(batch).not_to be_nil
expect(batch.size).to be <= 120
all_resources.concat(batch)
end
expect(all_resources.flatten.size).to eq(150)
expect(adapter.query_service.connection).to have_received(:paginate).exactly(5).times
end

it 'yields control' do
expect { |b| adapter.query_service.find_in_batches(batch_size: 120, &b) }.to yield_control.exactly(2).times
end
end
end
end
2 changes: 1 addition & 1 deletion valkyrie.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'dry-struct'
spec.add_dependency 'activemodel'
spec.add_dependency 'dry-types', '~> 1.0'
spec.add_dependency 'rdf', '~> 3.0', '>= 3.0.10'
spec.add_dependency 'rdf', '~> 3.0', '>= 3.3.2'
spec.add_dependency 'activesupport'
spec.add_dependency 'railties' # To use generators and engines
spec.add_dependency 'reform', '~> 2.2'
Expand Down