Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Job unique producteurs et réutilisateurs pour l’expiration des JDD #4402

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
100 changes: 87 additions & 13 deletions apps/transport/lib/jobs/expiration_notification_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
This job sends daily digests to reusers about the expiration of their favorited datasets.
The expiration delays are the same for all reusers and cannot be customized for now.

It has 2 `perform/1` methods:
It has 3 `perform/1` clauses:
- a dispatcher one in charge of identifying contacts we should get in touch with today
- another in charge of building the daily digest for a specific contact (with only their favorited datasets)
- and one in charge of sending expiration notifications to admins and producers
"""
use Oban.Worker,
max_attempts: 3,
Expand All @@ -21,11 +22,23 @@ defmodule Transport.Jobs.ExpirationNotificationJob do

import Ecto.Query
@notification_reason Transport.NotificationReason.reason(:expiration)
@expiration_reason Transport.NotificationReason.reason(:expiration)
@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-30, -7, 0, 7, 14]
@default_outdated_data_delays_reuser [-30, -7, 0, 7, 14]
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@impl Oban.Worker
def perform(%Oban.Job{id: job_id, args: %{"contact_id" => contact_id, "digest_date" => digest_date}}) do

# Admin/producer clause: delegates to `outdated_data/1`, which e-mails the
# admin digest and per-delay producer notifications, then always returns `:ok`
# so Oban marks the job as completed.
def perform(%Oban.Job{id: job_id, args: %{"role" => "admin_and_producer"}}) do
outdated_data(job_id)
:ok
end

def perform(%Oban.Job{
id: job_id,
args: %{"contact_id" => contact_id, "digest_date" => digest_date, "role" => "reuser"}
}) do
contact = DB.Repo.get!(DB.Contact, contact_id)
subscribed_dataset_ids = subscribed_dataset_ids_for_expiration(contact)

Expand Down Expand Up @@ -54,10 +67,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do

DB.Repo.transaction(
fn ->
new(%{"role" => "admin_and_producer"}) |> Oban.insert()

dataset_ids
|> contact_ids_subscribed_to_dataset_ids()
|> Stream.chunk_every(100)
|> Stream.each(fn contact_ids -> insert_jobs(contact_ids, target_date) end)
|> Stream.each(fn contact_ids -> insert_reuser_jobs(contact_ids, target_date) end)
|> Stream.run()
end,
timeout: :timer.seconds(60)
Expand Down Expand Up @@ -136,12 +151,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
def delay_str(-1), do: "périmé depuis hier"
def delay_str(d) when d <= -2, do: "périmés depuis #{-d} jours"

defp insert_jobs(contact_ids, %Date{} = target_date) do
defp insert_reuser_jobs(contact_ids, %Date{} = target_date) do
# Oban caveat: can't use [insert_all/2](https://hexdocs.pm/oban/Oban.html#insert_all/2):
# > Only the Smart Engine in Oban Pro supports bulk unique jobs and automatic batching.
# > With the basic engine, you must use insert/3 for unique support.
Enum.each(contact_ids, fn contact_id ->
%{"contact_id" => contact_id, "digest_date" => target_date}
%{"contact_id" => contact_id, "digest_date" => target_date, "role" => "reuser"}
|> new()
|> Oban.insert()
end)
Expand Down Expand Up @@ -170,9 +185,9 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
Transport.Cache.fetch(
to_string(__MODULE__) <> ":gtfs_expiring_on_target_dates:#{reference_date}",
fn ->
delays_and_dates = delays_and_dates(reference_date)
dates_and_delays = Map.new(delays_and_dates, fn {key, value} -> {value, key} end)
expiring_dates = Map.values(delays_and_dates)
delays_and_date_reuser = delays_and_date_reuser(reference_date)
dates_and_delays = Map.new(delays_and_date_reuser, fn {key, value} -> {value, key} end)
expiring_dates = Map.values(delays_and_date_reuser)

DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
Expand All @@ -199,7 +214,7 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
end

@doc """
iex> delays_and_dates(~D[2024-05-21])
iex> delays_and_date_reuser(~D[2024-05-21])
%{
-30 => ~D[2024-04-21],
-7 => ~D[2024-05-14],
Expand All @@ -208,8 +223,67 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
14 => ~D[2024-06-04]
}
"""
@spec delays_and_dates(Date.t()) :: %{delay() => Date.t()}
def delays_and_dates(%Date{} = date) do
Map.new(@default_outdated_data_delays, fn delay -> {delay, Date.add(date, delay)} end)
@spec delays_and_date_reuser(Date.t()) :: %{delay() => Date.t()}
def delays_and_date_reuser(%Date{} = date) do
Map.new(@default_outdated_data_delays_reuser, fn delay -> {delay, Date.add(date, delay)} end)
end

#### Expiration notifications for admins and producers (moved from Transport.DataChecker) ####

@doc """
For each configured delay, finds the GTFS datasets expiring on the matching
date, drops the delays with nothing to report, sends a single admin digest,
then sends per-delay notifications to producers.

The given `job_id` is recorded in each producer notification payload.
"""
def outdated_data(job_id) do
  possible_delays()
  |> Enum.map(fn delay -> {delay, gtfs_datasets_expiring_on(Date.add(Date.utc_today(), delay))} end)
  |> Enum.reject(fn {_delay, records} -> records == [] end)
  |> send_outdated_data_admin_mail()
  |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
end

# Returns all datasets having at least one GTFS resource whose validation
# metadata `end_date` equals the given date, as `{dataset, resources}` pairs.
@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
# `end_date` is stored as a string inside the JSON metadata; compare it as a date
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
# Collapse the flat {dataset, resource} rows into one entry per dataset
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

@doc """
The configured expiration delays (in days, negative = already expired),
deduplicated and in ascending order.
"""
def possible_delays do
  # Sorting first makes duplicates adjacent, so `dedup/1` removes them all.
  @default_outdated_data_delays
  |> Enum.sort()
  |> Enum.dedup()
end

# A different email is sent to producers for every delay, containing all datasets expiring on this given delay.
# Each sent e-mail is also recorded as a `DB.Notification` row carrying the
# delay and the Oban `job_id` for traceability.
@spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
def send_outdated_data_producer_notifications({delay, records}, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
# Only producers subscribed to the expiration reason for this dataset are notified
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)
end

# Sends a single digest e-mail to the admin team listing every expiring
# dataset, then returns `records` unchanged so the caller can keep piping.
# Nothing is sent when there is nothing to report.
@spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_admin_mail([]), do: []

defp send_outdated_data_admin_mail(records) do
  records
  |> Transport.AdminNotifier.expiration()
  |> Transport.Mailer.deliver()

  records
end
end
21 changes: 19 additions & 2 deletions apps/transport/lib/jobs/new_dataset_notifications_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
"""
use Oban.Worker, max_attempts: 3, tags: ["notifications"]
import Ecto.Query
@new_dataset_reason Transport.NotificationReason.reason(:new_dataset)

@impl Oban.Worker
def perform(%Oban.Job{inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> Transport.DataChecker.send_new_dataset_notifications()

# Notifies reusers about datasets created since the last run window.
# `inserted_at` (the job's own insertion time) is the reference point used by
# `relevant_datasets/1`; the job id is recorded in each notification payload.
def perform(%Oban.Job{id: job_id, inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> send_new_dataset_notifications(job_id)
:ok
end

Expand All @@ -18,4 +20,19 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
|> where([dataset: d], d.inserted_at >= ^datetime_limit)
|> DB.Repo.all()
end

@doc """
Sends one "new datasets" e-mail per subscribed reuser and records a
`DB.Notification` row (with the dataset ids and the Oban `job_id`) for each.

Does nothing when there are no new datasets.
"""
@spec send_new_dataset_notifications([DB.Dataset.t()] | [], pos_integer()) :: no_return() | :ok
def send_new_dataset_notifications([], _job_id), do: :ok

def send_new_dataset_notifications(datasets, job_id) do
  # Same payload for every recipient, so compute the ids once up front.
  dataset_ids = Enum.map(datasets, & &1.id)

  subscriptions = DB.NotificationSubscription.subscriptions_for_reason_and_role(@new_dataset_reason, :reuser)

  Enum.each(subscriptions, fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
    contact
    |> Transport.UserNotifier.new_datasets(datasets)
    |> Transport.Mailer.deliver()

    DB.Notification.insert!(subscription, %{dataset_ids: dataset_ids, job_id: job_id})
  end)
end
end
100 changes: 1 addition & 99 deletions apps/transport/lib/transport/data_checker.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
defmodule Transport.DataChecker do
@moduledoc """
Use to check data, and act about it, like send email
Use to check data for toggling on and off active status of datasets depending on status on data.gouv.fr
"""
alias DB.{Dataset, Repo}
import Ecto.Query
require Logger

@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
@type dataset_status :: :active | :inactive | :ignore | :no_producer | {:archived, DateTime.t()}
@expiration_reason Transport.NotificationReason.reason(:expiration)
@new_dataset_reason Transport.NotificationReason.reason(:new_dataset)
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@doc """
This method is a scheduled job which does two things:
Expand Down Expand Up @@ -106,99 +101,6 @@ defmodule Transport.DataChecker do
)
end

def outdated_data do
# Generated as an integer rather than a UUID because `payload.job_id`
# for other notifications are %Oban.Job.id (bigint).
job_id = Enum.random(1..Integer.pow(2, 63))

for delay <- possible_delays(),
date = Date.add(Date.utc_today(), delay) do
{delay, gtfs_datasets_expiring_on(date)}
end
|> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
|> send_outdated_data_mail()
|> Enum.map(&send_outdated_data_notifications(&1, job_id))
end

@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

def possible_delays do
@default_outdated_data_delays
|> Enum.uniq()
|> Enum.sort()
end

@spec send_new_dataset_notifications([Dataset.t()] | []) :: no_return() | :ok
def send_new_dataset_notifications([]), do: :ok

def send_new_dataset_notifications(datasets) do
# Generated as an integer rather than a UUID because `payload.job_id`
# for other notifications are %Oban.Job.id (bigint).
job_id = Enum.random(1..Integer.pow(2, 63))

@new_dataset_reason
|> DB.NotificationSubscription.subscriptions_for_reason_and_role(:reuser)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.new_datasets(datasets)
|> Transport.Mailer.deliver()

DB.Notification.insert!(subscription, %{dataset_ids: Enum.map(datasets, & &1.id), job_id: job_id})
end)
end

@spec send_outdated_data_notifications(delay_and_records(), integer()) :: delay_and_records()
def send_outdated_data_notifications({delay, records} = payload, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)

payload
end

@doc """
iex> resource_titles([%DB.Resource{title: "B"}])
"B"
iex> resource_titles([%DB.Resource{title: "B"}, %DB.Resource{title: "A"}])
"A, B"
"""
def resource_titles(resources) do
resources
|> Enum.sort_by(fn %DB.Resource{title: title} -> title end)
|> Enum.map_join(", ", fn %DB.Resource{title: title} -> title end)
end

@spec send_outdated_data_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_mail([] = _records), do: []

defp send_outdated_data_mail(records) do
Transport.AdminNotifier.expiration(records)
|> Transport.Mailer.deliver()

records
end

# Do nothing if all lists are empty
defp send_inactive_datasets_mail([] = _reactivated_datasets, [] = _inactive_datasets, [] = _archived_datasets),
do: nil
Expand Down
2 changes: 0 additions & 2 deletions apps/transport/lib/transport/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ defmodule Transport.Scheduler do
[
# Every day at 4am UTC
{"0 4 * * *", {Transport.ImportData, :import_validate_all, []}},
# Send email for outdated data
{"@daily", {Transport.DataChecker, :outdated_data, []}},
# Set inactive data
{"@daily", {Transport.DataChecker, :inactive_data, []}},
# Watch for new comments on datasets
Expand Down
Loading
Loading