Макросы dbt для ClickHouse

/ Всеволод Миронович

Макросы в dbt — это тема, к которой обычно приходят не из любопытства, а из боли. Если вы здесь, значит, в вашем ClickHouse-проекте уже есть три одинаковых CASE WHEN, два слегка отличающихся JOIN и навязчивое ощущение неловкости: то ли трусы надо поправить, то ли рефакторинг провести.

Что такое макросы в dbt?

Макросы - это функции, написанные на странном синтаксисе шаблонизатора jinja, и лежащие по умолчанию в папке macro. В начале папка безвидна и пуста, так что нужно сотворить их самостоятельно. В интернете полно готовых решений, я поделюсь некоторыми из тех, что мы используем на проектах.

Для чего нужны макросы в dbt?

  1. С помощью макросов можно переиспользовать код и избежать его дублирования
  2. Это позволяет также управлять бизнес логиков в одном месте, откуда она распространяется во всех нужных моделях
  3. Помимо типизированных выражений в макрос можно спрятать и громоздкую логику, что улучшит читаемость кода
  4. Если по какой-то причине вы регулярно меняете субд, то макрос поможет адаптировать SQL соответственным образом
  5. В макросах доступных как стандартные для jinja конструкции, так и специальные функции и переменные dbt. А это значит, что SQL можно нехило прокачать и выйти за границы дозволенного, если вы понимаете, о чём я 😉
  6. Для адаптеров, а так же благодаря осознанности коллеги из dbtlabs, можно переопределять поведение dbt в некоторых сценариях тоже через макросы, задавая предопределенные названия.

Схема таблиц

Dbt обычно материализует данные в базе по пути database.schema.table, однако в ClickHouse схема отсутствует. Согласно документации по этой причине адаптер использует schema.table, где schema - это database в КХ. И если указать в конфигурации модели schema='dev', то адаптер попытается создать базу database_dev, а если database='dev', то всё сработает.

Я ничего не понял, и чтобы не путаться, заменил стандартную функцию генерации имени схемы.

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}

Генерация дат

{% macro generate_dates(date=None, days=5000) -%}
( 
  select
  {% if date is none -%}
    today()
  {%- else -%}
    toDate('{{ date }}')
  {%- endif %} - number as date
  from numbers({{days}})
) as dates
{%- endmacro %}

Когда во временных рядах есть пропуски, можно сджойнить таблицу со сгенерированным датами, которая всегда под рукой

select
  sum(clicks) as clicks
  , dates.date as date
from summary_report
right join {{generate_dates(days=1000)}}
on summary_report.date = dates.date
where dates.date between '2025-04-07' and '2025-04-20'
group by date

И в нашем новом инструменте для графиков сразу видно провалы. В смысле, не в инструменте провалы.. в данных!

Отображение пропусков на графике

Словари

{% macro format_camp(t) %}
{% set t = 
    {"creo":"_multi_", "creo_name":"мультиформат"}, 
    {"creo":"_preroll_", "creo_name":"преролл"},
    {"creo":"_post_", "creo_name":"пост "},
    {"creo":"_clip_", "creo_name":"клипы"},
    {"creo":"_tgb_", "creo_name":"ТГБ"},
    {"creo":"_lead_", "creo_name":"лид формы"}, 
    {"creo":"_dzen_", "creo_name":"дзен "}, 
    {"creo":"_vid_", "creo_name":"видео"},
    {"creo":"_carousel_", "creo_name":"карусель"}, 
    {"creo":"_story_", "creo_name":"истории"},
    {"creo":"_prospecting_", "creo_name":"проспектинг"},
    {"creo":"_dinrem_", "creo_name":"динамический ретаргетинг"}

 %}
{% for value in t %}
      WHEN match(lower(campaign), lower('{{ value.creo }}')) THEN '{{ value.creo_name }}'
{% endfor %}
{% endmacro %}

Это наглядный пример переиспользования кода. Мы описываем атрибутирование кампании типом креатива в одном месте и потом переиспользуем везде, где нужно. Тут ещё используется цикл, чтобы поменьше писать и поудобнее править.

Инкрементальные обновления

Мы много работаем с большими объемами, делаем инкрементальные обновления и из-за специфики маркетинговых данных постоянно занимаемся дедупликацией. Следующая серия макросов про это.

Простой вариант

Есть ленивые голубцы, а есть ленивые макросы 😆 Это простое решение позволит поддерживать таблицу в актуальном состоянии, подтягивая новые данные.

{% macro incremental_filter_ch(column) %}
    {% if is_incremental() %}
        where {{ column }} > (select max({{ column }}) from {{ this }})
    {% endif %}
{% endmacro %}

Пример использования

select * from events
{{ incremental_filter_ch('event_time') }}

Даты

Для частых перерасчетов удобно пользоваться макросами с датами, у нас их целая куча. Один из самых часто используемых по умолчанию возвращает вчерашний день (позволяя регулировать это) или возвращает значения по ключу передаваемой переменной.

{% macro var_or_daysago(key, n=1) -%}
  {% set _ndaysago = (modules.datetime.datetime.now() - modules.datetime.timedelta(days=n)).strftime('%Y-%m-%d') -%}
  {% set result = var(key, default=_ndaysago) -%}
  toDate(parseDateTimeBestEffort('{{ result }}'))
{%- endmacro %}

В инкрементальных моделях его можно подставить в фильтр с = или between или ещё как.

where
  date between {{ var_or_daysago('start_date', 30) }}
  and {{ var_or_daysago('end_date', 1) }}

Если мне надо пересчитать за выбранный период, то при запуске я указываю нужные значения в переменных.

Задание переменных при запуск dbt

Если мне нужно дедуплицировать, предварительно удалив старые записи, то тот же макрос используется в pre_hook. Сперва проверяем, что запуск инкрементальный, иначе удаление может выдать ошибку, а потом yeahbuddylightweightbaby с параметрами. Главное - не налажать с кавычками.

{{ config(
  pre_hook = "{% if is_incremental() %}delеte from {{this}} on cluster  {{target.cluster}} where date between {{ var_or_daysago('start_date', 30) }} and {{ var_or_daysago('end_date', 1) }} {% endif %}"
) }} 

Я не рекомендую данную стратегию дедупликации, но, когда есть выбор, это хорошо.

Партиции

ClickHouse очень не любит мутаций. Они про это целую статью написали и вообще рекомендуют производить мутации только администратору БД 😅 Одна из очевидных стратегий дедупликаций - замена партиций.

Нашёл вот такой пример генерации запроса на удаление. Его предназначение - сформировать массив названий недельных партиций за период, а потом перечислить их в дропе.

{% macro prehook_drop_partition_yyyyww(this, startDate, endDate) -%}

    {% set start_week_date = modules.datetime.datetime.strptime(startDate, '%Y-%m-%d') - modules.datetime.timedelta(days=modules.datetime.datetime.strptime(startDate, '%Y-%m-%d').weekday()) %}
    {% set end_week_date = modules.datetime.datetime.strptime(endDate, '%Y-%m-%d') - modules.datetime.timedelta(days=modules.datetime.datetime.strptime(endDate, '%Y-%m-%d').weekday()) %}

    {% set diff_weeks = ((end_week_date - start_week_date).days // 7) + 1 %}

    {% set week_list = [] %}
    {% for i in range(diff_weeks) %}
      {% set week_date = start_week_date + modules.datetime.timedelta(weeks=i) %}
      {% set week_str = week_date.strftime('%G%V') %}
      {% do week_list.append(week_str) %}
    {% endfor %}

    alter table {{ this }} on cluster {{target.cluster}}
    {% for week in week_list %}
    drop partition {{ week }}
		{%- if not loop.last %},{% endif %}
    {%- endfor %}
{%- endmacro %}

Пример использования

pre_hook ="{{ prehook_drop_partition_yyyyww(this, var_or_daysago_raw('start_date', 65), var_or_daysago_raw('end_date', 1)) }}"

Проверка дублей в источнике

{% macro check_unique_key(source_ref, columns) %}
    {% set columns_str = columns | join(', ') %}
    with duplicates AS (
        select count(*) AS count
        from {{ source_ref }}
        group by {{ columns_str }}
        having count(*) > 1
    )
    select 
        count(*) AS duplicate_count 
    from duplicates
{% endmacro %}

Пример использования в другом 🤯 макросе

{% macro validate_duplicate_in_source_table(source_ref, columns) %}
    {% set result = run_query(check_unique_key(source_ref, columns)) %}
    {% set duplicate_count = result.columns[0].values()[0] if result else 0 %}
    
    {% if execute %}
        {% if duplicate_count > 0 %}
            {{ log("Дубликаты найдены в " ~ source_ref ~ ". Таблица "  ~ source_ref ~ " будет проигнорирована.", info=True) }}
            {{ return("(select * from " ~ source_ref ~ " where date = today() limit 1)") }}
        {% else %}
            {{ log("Дубликаты не найдены в " ~ source_ref, info=True) }}
            {{ return(source_ref) }}
        {% endif %}
    {% endif %}
{% endmacro %}

Что тут происходит? Проверяем наличие дублей по ключу. Если нашлись, выдаём предупреждением и скипаем источник болванкой. Если дублей нет - всё ок, просто логируем факт проверки и разрешаем обратиться к источнику. Соответственно, в моделях в него оборачивается ref.

from {{ validate_duplicate_in_source_table(ref('campaign_data'), ['date', 'campaign_id', 'placement_id']) }}

Проверка репликации

{% macro validate_replication(model, cluster_name='default') %}
    {%- set query %}
        select
            fqdn(), table
        from clusterAllReplicas({{cluster_name}}, system.tables)
        where database = '{{ model.schema }}' and table = '{{ model.name }}'
        and total_rows = 0
    {%- endset %}

    {%- set result = run_query(query) %}

    {% if execute %}
        {% if result|length > 0 %}
            {{ exceptions.raise_compiler_error("🚨 Data is NOT synchronised on all replicas in the table " ~ model) }}
        {% else %}
            {{ log("✅ Data is synchronised on all replicas in the table " ~ model, info=True) }}
            {{ return(model) }}
        {% endif %}
    {% endif %}
{% endmacro %}

По разным причинам репликации данных между нодами ClickHouse может задерживаться. В этом случае мы не хотим продолжать отношения с дагом и бросаем его. Если не хотите сами сообщать родственникам, они могут подписаться на уведомления об ошибках в запусках через планировщик. Мы пришлём письмо с причиной 😘

Подписка на уведомления в планировщике

Распределенные таблицы

{% macro create_distributed_table(local_table, cluster_name='default') %}
  create table if not exists {{ this.database }}.{{ this.name }}_dist on cluster {{ cluster_name }}
  as {{ this.database }}.{{ local_table }}
  engine = Distributed(
    '{{ cluster_name }}',
    '{{ this.database }}',
    '{{ local_table }}',
    rand()
  )
{% endmacro %}

Идея простая - сделали таблицу и пост хуком распределяем её на кластер, например, для целей шеринга коллегам на том берегу. Я им не пользовался.

{{ config(post_hook=create_distributed_table(this.name, 'my_cluster')) }}

TTL выражения

Для больших таблицы мы используем гибридное хранение: самые свежие данные лежат на ценном ssd, чуть старше на hdd и невостребованные в S3 холодильники. Вообще конечно тити-эли бывают всякие, но мы любим их все.

{% macro generate_ttl(column_name, ttl_days, action='delete') %}
  {% if action == 'to_volume' %}
    {{ column_name }} + interval {{ ttl_days }} day to volume 'archive'
  {% elif action == 'to_disk' %}
    {{ column_name }} + interval {{ ttl_days }} day to disk 'cold'
    {% if action == 'to_dick' or action == 'delete' %}
    {{ column_name }} + interval {{ ttl_days }} day
  {% endif %}
{% endmacro %}

Пример использования

{{ config(
    engine='MergeTree()',
    order_by='event_date',
    ttl=generate_ttl('event_date', 365)
) }}

Union all для ретроданных

Судьба нас конечно много к чему не готовила, но эксельки за доковидные года загружать и подклеивать умеем. Суть этого отрывка в том, чтобы аккуратненько взять и склеить похожие таблички.

{% macro union_tables(database, table_pattern, exclude_columns=[]) %}
  {% set tables_query %}
    select name 
    from system.tables 
    where database = '{{ database }}' 
    and name like '{{ table_pattern }}'
  {% endset %}
  
  {% set tables = run_query(tables_query).columns[0].values() %}
  
  {% for table in tables %}
    select 
      {% if exclude_columns %}
      * except({{ exclude_columns | join(', ') }})
      {% else %}
      *
      {% endif %}
    from {{ database }}.{{ table }}
    {% if not loop.last %}
    union all
    {% endif %}
  {% endfor %}
{% endmacro %}

Пример использования

{{ union_tables('analytics', 'events_%', exclude_columns=['internal_field']) }}

Полезные ссылки

  1. Jinja and macros
  2. Адаптер dbt для ClickHouse