Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DFC Orders] Sync remote products when order cycle opens #13167

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ STRIPE_PUBLIC_TEST_API_KEY="bogus_stripe_publishable_key"

SITE_URL="test.host"

# OIDC Settings for DFC authentication
# Find secrets in BitWarden.
# To get a refresh token: log into the OIDC provider, connect your OFN user to it at /admin/oidc_settings, then copy the token from the database:
# ./bin/rails runner 'puts "OPENID_REFRESH_TOKEN=\"#{OidcAccount.last.refresh_token}\""'
OPENID_APP_ID="test-provider"
OPENID_APP_SECRET="dummy-openid-app-secret-token"
OPENID_REFRESH_TOKEN="dummy-refresh-token"
Expand Down
67 changes: 67 additions & 0 deletions app/jobs/open_order_cycle_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

# Run any pre-conditions and mark order cycle as open.
#
# Currently, an order cycle is considered open in the shopfront when orders_open_at >= now.
# But now there are some pre-conditions for opening an order cycle, so we would like to change that.
# Instead, the presence of opened_at (and absence of processed_at) should indicate it is open.
class OpenOrderCycleJob < ApplicationJob
sidekiq_options retry_for: 10.minutes

def perform(order_cycle_id)
ActiveRecord::Base.transaction do
# Fetch order cycle if it's still unopened, and lock DB row until finished
order_cycle = OrderCycle.lock.find_by!(id: order_cycle_id, opened_at: nil)

sync_remote_variants(order_cycle)

# Mark as opened
opened_at = Time.zone.now
order_cycle.update_columns(opened_at:)

# And notify any subscribers
OrderCycles::WebhookService.create_webhook_job(order_cycle, 'order_cycle.opened', opened_at)
end
end

private

def sync_remote_variants(order_cycle)
# Sync any remote variants for each supplier
order_cycle.suppliers.each do |supplier|
links = variant_links_for(order_cycle, supplier)
next if links.empty?

# Find authorised user to access remote products
dfc_user = supplier.owner # we assume the owner's account is the one used to import from dfc.

import_variants(links, dfc_user)
end
end

# Fetch all remote variants for this supplier in the order cycle
def variant_links_for(order_cycle, supplier)
variants = order_cycle.exchanges.incoming.from_enterprise(supplier)
.joins(:exchange_variants).select('exchange_variants.variant_id')
SemanticLink.where(subject_id: variants)
end

def import_variants(links, dfc_user)
# Find any catalogues associated with the variants
catalogs = links.group_by do |link|
FdcUrlBuilder.new(link.semantic_id).catalog_url
end

# Import selected variants from each catalog
catalogs.each do |catalog_url, catalog_links|
catalog = DfcCatalog.load(dfc_user, catalog_url)
catalog.apply_wholesale_values!

catalog_links.each do |link|
catalog_item = catalog.item(link.semantic_id)

SuppliedProductImporter.update_product(catalog_item, link.subject) if catalog_item
end
end
end
end
3 changes: 1 addition & 2 deletions app/jobs/order_cycle_closing_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ def send_notifications

def mark_as_processed
OrderCycle.where(id: recently_closed_order_cycles).update_all(
processed_at: Time.zone.now,
updated_at: Time.zone.now
processed_at: Time.zone.now
)
end
end
27 changes: 0 additions & 27 deletions app/jobs/order_cycle_opened_job.rb

This file was deleted.

18 changes: 18 additions & 0 deletions app/jobs/trigger_order_cycles_to_open_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

# Trigger jobs for any order cycles that recently opened
class TriggerOrderCyclesToOpenJob < ApplicationJob
def perform
recently_opened_order_cycles.find_each do |order_cycle|
OpenOrderCycleJob.perform_later(order_cycle.id)
end
end

private

def recently_opened_order_cycles
OrderCycle
.where(opened_at: nil)
.where(orders_open_at: 1.hour.ago..Time.zone.now)
end
end
4 changes: 2 additions & 2 deletions app/jobs/webhook_delivery_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ class FailedWebhookRequestError < StandardError; end

queue_as :default

def perform(url, event, payload)
def perform(url, event, payload, at: Time.zone.now)
body = {
id: job_id,
at: Time.zone.now.to_s,
event:,
at: at.to_s,
data: payload,
}

Expand Down
6 changes: 3 additions & 3 deletions app/services/order_cycles/webhook_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

module OrderCycles
class WebhookService
def self.create_webhook_job(order_cycle, event)
def self.create_webhook_job(order_cycle, event, at)
webhook_payload = order_cycle
.slice(:id, :name, :orders_open_at, :orders_close_at, :coordinator_id)
.slice(:id, :name, :orders_open_at, :opened_at, :orders_close_at, :coordinator_id)
.merge(coordinator_name: order_cycle.coordinator.name)

# Endpoints for coordinator owner
Expand All @@ -17,7 +17,7 @@ def self.create_webhook_job(order_cycle, event)
webhook_endpoints |= order_cycle.distributors.map(&:owner).flat_map(&:webhook_endpoints)

webhook_endpoints.each do |endpoint|
WebhookDeliveryJob.perform_later(endpoint.url, event, webhook_payload)
WebhookDeliveryJob.perform_later(endpoint.url, event, webhook_payload, at:)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion config/sidekiq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
every: "5m"
SubscriptionConfirmJob:
every: "5m"
OrderCycleOpenedJob:
TriggerOrderCyclesToOpenJob:
every: "5m"
OrderCycleClosingJob:
every: "5m"
1 change: 1 addition & 0 deletions spec/factories/oidc_account_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
uid { user&.email || generate(:random_email) }

# This is a live test account authenticated via Les Communes.
# See .env.test for tips on connecting the account for recording VCR cassettes.
factory :testdfc_account do
uid { "[email protected]" }
refresh_token { ENV.fetch("OPENID_REFRESH_TOKEN") }
Expand Down

Large diffs are not rendered by default.

97 changes: 97 additions & 0 deletions spec/jobs/open_order_cycle_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe OpenOrderCycleJob do
let(:now){ Time.zone.now }
let(:order_cycle) { create(:simple_order_cycle, orders_open_at: now) }
subject { OpenOrderCycleJob.perform_now(order_cycle.id) }

around do |example|
Timecop.freeze(now) { example.run }
end

it "marks as open" do
expect {
subject
order_cycle.reload
}
.to change { order_cycle.opened_at }

expect(order_cycle.opened_at).to be_within(1).of(now)
Comment on lines +19 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I thought that there was some way to convert the timestamp to the db precision. I can't remember it now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this was just the first solution I came across, and it worked :)
Another thought was to use to_i when comparing.

end

it "enqueues webhook job" do
expect(OrderCycles::WebhookService)
.to receive(:create_webhook_job).with(order_cycle, 'order_cycle.opened', now).once

subject
end

describe "syncing remote products" do
let!(:user) { create(:testdfc_user, owned_enterprises: [enterprise]) }

let(:enterprise) { create(:supplier_enterprise) }
let!(:variant) { create(:variant, name: "Sauce", supplier_id: enterprise.id) }
let!(:order_cycle) {
create(:simple_order_cycle, orders_open_at: now,
suppliers: [enterprise], variants: [variant])
}

it "synchronises products from a FDC catalog", vcr: true do
user.update!(oidc_account: build(:testdfc_account))
# One product is existing in OFN
product_id =
"https://env-0105831.jcloud-ver-jpe.ik-server.com/api/dfc/Enterprises/test-hodmedod/SuppliedProducts/44519466467635"
variant.semantic_links << SemanticLink.new(semantic_id: product_id)

expect {
subject
variant.reload
order_cycle.reload
}.to change { order_cycle.opened_at }
.and change { enterprise.supplied_products.count }.by(0) # It shouldn't add, only update
.and change { variant.display_name }
.and change { variant.unit_value }
# 18.85 wholesale variant price divided by 12 cans in the slab.
.and change { variant.price }.to(1.57)
.and change { variant.on_demand }.to(true)
.and change { variant.on_hand }.by(0)
.and query_database 46
end
end

describe "concurrency", concurrency: true do
let(:breakpoint) { Mutex.new }

it "doesn't open order cycle twice" do
# Pause jobs when placing new job:
breakpoint.lock
allow(OpenOrderCycleJob).to(
receive(:new).and_wrap_original do |method, *args|
breakpoint.synchronize {} # rubocop:disable Lint/EmptyBlock
method.call(*args)
end
)

expect(OrderCycles::WebhookService)
.to receive(:create_webhook_job).with(order_cycle, 'order_cycle.opened', now).once

expect{
# Start two jobs in parallel:
threads = [
Thread.new { OpenOrderCycleJob.perform_now(order_cycle.id) },
Thread.new { OpenOrderCycleJob.perform_now(order_cycle.id) },
]

# Wait for both to jobs to pause.
# This can reveal a race condition.
sleep 0.1

# Resume and complete both jobs:
breakpoint.unlock
threads.each(&:join)
}.to raise_error ActiveRecord::RecordNotFound
end
end
end
62 changes: 0 additions & 62 deletions spec/jobs/order_cycle_opened_job_spec.rb

This file was deleted.

22 changes: 22 additions & 0 deletions spec/jobs/trigger_order_cycles_to_open_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe TriggerOrderCyclesToOpenJob do
let(:oc_opened_before) {
create(:simple_order_cycle, orders_open_at: 1.hour.ago)
}
let(:oc_opened_now) {
create(:simple_order_cycle, orders_open_at: Time.zone.now)
}
let(:oc_opening_soon) {
create(:simple_order_cycle, orders_open_at: 1.minute.from_now)
}

it "enqueues jobs for recently opened order cycles only" do
expect{ TriggerOrderCyclesToOpenJob.perform_now }
.to enqueue_job(OpenOrderCycleJob).with(oc_opened_now.id)
.and enqueue_job(OpenOrderCycleJob).with(oc_opened_before.id).exactly(0).times
.and enqueue_job(OpenOrderCycleJob).with(oc_opening_soon.id).exactly(0).times
end
end
5 changes: 3 additions & 2 deletions spec/jobs/webhook_delivery_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
require 'spec_helper'

RSpec.describe WebhookDeliveryJob do
subject { WebhookDeliveryJob.new(url, event, data) }
subject { WebhookDeliveryJob.new(url, event, data, at:) }
let(:url) { 'https://test/endpoint' }
let(:event) { 'order_cycle.opened' }
let(:at) { 1.second.ago }
let(:data) {
{
order_cycle_id: 123, name: "Order cycle 1", open_at: 1.minute.ago.to_s, tags: ["tag1", "tag2"]
Expand All @@ -25,7 +26,7 @@
Timecop.freeze do
expected_body = {
id: /.+/,
at: Time.zone.now.to_s,
at: at.to_s,
event:,
data:,
}
Expand Down
Loading
Loading