Skip to content

Commit

Permalink
Feature/deduplicated daily meter signals (et/somenergia-jardiner!79)
Browse files Browse the repository at this point in the history
Merge branch 'feature/daily_meter_signals_pipe' into 'main'
  • Loading branch information
polmonso committed Jan 23, 2024
2 parents f1eedb3 + fbbb26f commit f908268
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ models:
demana les lectures amb un offset de mitja hora.
El [dag que materializa](plant_production_datasets_jardiner_dag)
s'executa diariament, però es pot executar quan calgui, també horàriament.
Aquest incremental, dels 2 últims dies de ts que anem a buscar a int_dset_responses__deduplicated,
nomes agafem les unique_keys que hagin canviat (poden ser els meters, que van de
null a tenir valor, o bé els duplicats generats pel DAG que torna a lleguir per
la nit todo el dia anterior) o bé les que no existessin.
columns:
- name: group_name
description: dset plant name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
{{ config(materialized="incremental", on_schema_change="sync_all_columns") }}
{{
config(
materialized="incremental",
on_schema_change="sync_all_columns",
incremental_strategy = 'delete+insert',
unique_key = ['ts', 'signal_uuid'],
incremental_predicates = ["int_dset_responses__materialized_one_hour_late.ts > now() - interval '2 days'"]
)
}}


with
normalized_jsonb as (
Expand Down Expand Up @@ -31,5 +40,10 @@ where
ts < now() - interval '1 hour' {#- select only freshly ingested rows #}

{% if is_incremental() -%}
and ts > coalesce((select max(ts) from {{ this }}), '1900-01-01') and queried_at > now() - interval '2 hour'
and queried_at > coalesce((select max(queried_at) from {{ this }}), '1900-01-01')
and ts > now() - interval '2 days'
-- dedupliquem 2 dies enrera (ho diu el predicate) i per això no podem garantir que tot l'anterior sigui unic
-- i per tant ho descartem aqui, en la selecció del incremental
-- Si per algun motiu deixem de materialitzar durant 48 hores, caldrà fer un full-refresh!

{%- endif %}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(materialized='view') }}

with spina5m as (
select generate_series((now() at time zone 'Europe/Madrid')::date - interval '30 days', now(), '5 minutes') as ts
select generate_series('2023-12-01', now(), '5 minutes') as ts
),
spined_expected_signals as (
select
Expand All @@ -20,11 +20,11 @@ spined_expected_signals as (
left join {{ ref("dm_plants") }} as plants on true
left join {{ ref("raw_gestio_actius__signal_denormalized") }} as metadata using (plant_uuid)
),
dset_last_month as(
dset_from_december_2023 as(
select * from {{ ref("int_dset_responses__materialized_one_hour_late") }} as dset
where dset.ts > (now() at time zone 'Europe/Madrid')::date - interval '30 days'
where dset.ts > '2023-12-01'
{# if we don't limit queried_at the planner shits the bed #}
and queried_at > (now() at time zone 'Europe/Madrid')::date - interval '30 days'
and queried_at > '2023-12-01'
),
spined_dset as (
select
Expand All @@ -50,9 +50,10 @@ spined_dset as (
valors.signal_last_ts,
valors.signal_last_value,
valors.queried_at,
valors.ts is not null as from_dset
valors.ts is not null as from_dset,
valors.materialized_at
from spined_expected_signals
left join dset_last_month as valors using(ts, signal_uuid)
left join dset_from_december_2023 as valors using(ts, signal_uuid)
order by ts desc, nom_planta
)
select * from spined_dset
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
{{ config(materialized="incremental") }}
{{
config(
materialized="incremental",
on_schema_change="sync_all_columns",
incremental_strategy = 'delete+insert',
unique_key = ['ts', 'signal_uuid'],
incremental_predicates = ["int_dset_responses__values_incremental.ts > now() - interval '2 days'"]
)
}}

select *
from {{ ref("int_dset_responses__spined_metadata") }}

{#
on the live data view.
dset provides data late, so with a espina 5m we will always have one or several null batches
Expand All @@ -11,6 +18,12 @@ from {{ ref("int_dset_responses__spined_metadata") }}
where
ts < now() - interval '1 hour'

{% if is_incremental() %}
and ts >= coalesce((select max(ts) from {{ this }}), '1900-01-01')
{% endif %}
{% if is_incremental() -%}
and materialized_at > coalesce((select max(materialized_at) from {{ this }}), '1900-01-01')
and ts > now() - interval '2 days'
{#
dedupliquem 2 dies enrera (ho diu el predicate) i per això no podem garantir que tot l'anterior sigui unic
i per tant ho descartem aqui, en la selecció del incremental
Si per algun motiu deixem de materialitzar durant 48 hores, caldrà fer un full-refresh!
#}
{%- endif %}
67 changes: 67 additions & 0 deletions docs/desenvolupadors/2024-01-11-incremental-dedup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@


```sql
{{
config(
materialized = 'incremental',
unique_key = ['signal_uuid','ts'],
incremental_strategy = 'merge',
incremental_predicates = [
"DBT_INTERNAL_DEST.queried_at > dateadd(day, -1, current_date)"
]
)
}}

where
ts < now() - interval '1 hour' {#- select only freshly ingested rows #}

{% if is_incremental() -%}
and queried_at > coalesce((select max(queried_at) from {{ this }}), '1900-01-01') and queried_at > now() - interval '2 hour'
{%- endif %}
```



source
senyal ts valor queried_at
energia_comptador 03:00 80 +1d 04:00
energia_comptador 03:00 null 04:00
potencia_inversor 03:00 50 +1d 04:00
potencia_inversor 03:00 50 04:00


son ara les 15:04
en la materialitzada tindrem
potencia_inversor 14:00 67
energia_comptador 14:00 null -- o aquesta fila no hi serà

where max(ts) = 14:00:
potencia_inversor 14:00 67
energia_comptador 14:00 null

where max(ts) - 24h:
energia_comptador 03:00 80 +1d 04:00
potencia_inversor 03:00 50 +1d 04:00
potencia_inversor 03:00 50 04:00
potencia_inversor 14:00 67
energia_comptador 14:00 null


unique (signal_uuid, ts)

source
energia_comptador 03:00 80 +1d 04:00


# prova pràctica

Abans d'executar l'incremental tenim aquest tres últims registres a materialized one hour late

|group_name|queried_at|ts|signal_code|signal_device_type|signal_device_uuid|signal_frequency|signal_id|signal_is_virtual|signal_last_ts|signal_last_value|signal_type|signal_tz|signal_unit|signal_uuid|signal_uuid_raw|signal_value|materialized_at|
|----------|----------|--|-----------|------------------|------------------|----------------|---------|-----------------|--------------|-----------------|-----------|---------|-----------|-----------|---------------|------------|---------------|
|SE_vallehermoso|2024-01-09 13:24:42.488123+01|2024-01-05 23:15:00+01|ce_eactexp|meter|08bc8d88-4ea2-4ee9-b817-00e1f07debba|15 minutes|983894|false|2024-01-08 23:45:00+01|0|absolute|Europe/Madrid|kWh|646d1ec2-0ca2-4c22-9739-8161aafb224e|646d1ec2-0ca2-4c22-9739-8161aafb224e|0.0|2024-01-09 13:36:52.925381+01|
|SE_asomada|2024-01-09 13:24:42.488123+01|2024-01-05 23:00:00+01|s8102|meter|c5ed9fab-e73c-40a5-acb3-113e22052fd6|5 minutes|1031060|false|2024-01-09 11:47:00+01|60.2|absolute|Europe/Madrid|V|dd065f6a-4ded-41a8-98d3-ba239f9a2a47|dd065f6a-4ded-41a8-98d3-ba239f9a2a47|60.2|2024-01-09 13:36:52.925381+01|
|SE_tahal|2024-01-09 13:24:42.488123+01|2024-01-05 23:45:00+01|ce_ercapexp_er3|meter|5deb3780-02e2-4dcf-a6a7-de646991762c|15 minutes|983890|false|2024-01-08 23:45:00+01|0|absolute|Europe/Madrid|kVArh|31ae7d09-95db-447d-b5bf-358b056ef5bf|31ae7d09-95db-447d-b5bf-358b056ef5bf|0.0|2024-01-09 13:36:52.925381+01|



0 comments on commit f908268

Please sign in to comment.