Кастомные материализации dbt: из ClickHouse в S3

Укладка паркета, дёшево

|
Кастомные материализации dbt: из ClickHouse в S3

Однажды меня попросили уложить паркет 🤭
А если конкретно, то мне потребовалось на регулярной основе перекладывать данные из таблицы внутреннего закрытого ClickHouse в S3 бакет для передачи партнеру.
Эту задачу можно решить кучей способов, но у нас есть SubQuery с dbt и расписанием, так что я решил пробовать написать кастомную материализацию.

Предварительные работы

Перед тем как писать материализацию, нужно подготовить инфраструктуру:

  1. S3 Bucket
    Мы работаем с облаком Яндекса. Там проще всего завести через UI, но можно и через консоль.

    yc storage bucket create --name my-dbt-bucket --default-storage-class standard

    Запомните endpoint (например, https://storage.yandexcloud.net) и регион.

  2. Настройте доступ к бакету из ClickHouse. В Clickhouse для этого лучше использовать коллекции — набор credentials (ключ доступа, секрет, endpoint), который можно переиспользовать в разных моделях.

    Создайте коллекцию типа s3 через интерфейс SubQuery или API, указав:

    • Access key ID и Secret access key (например, сервисный ключ Yandex Cloud).
    • Endpoint (например, https://storage.yandexcloud.net).
    • Регион (опционально).

    Совет: называйте коллекции префиксом с их типом, например s3_ или postgres_. Так в них потом проще ориентироваться.

  3. Убедитесь, что ClickHouse умеет работать с S3 через функцию s3(). Обычно это требует правильной версии ClickHouse и настроенного HTTP‑транспорта.

Подробнее о подключении S3 к ClickHouse читайте в нашей статье: Работа с S3 в ClickHouse.

Jinja и макросы в dbt

dbt использует шаблонизатор Jinja2 для внедрения логики в SQL‑код. Это позволяет писать динамические запросы, повторно использовать фрагменты кода (макросы) и управлять конфигурацией на лету.

Макросы — это функции на Jinja, которые живут в папке macros/ вашего dbt‑проекта. Они могут принимать аргументы, возвращать SQL‑строки и вызываться из моделей, подобно встроенным функциям.

Например, вместо того чтобы копировать один и тот же CASE WHEN в десятке моделей, вы выносите его в макрос и вызываете отовсюду. Это не только сокращает объём кода, но и упрощает поддержку: изменение логики в одном месте автоматически применяется ко всем зависимым моделям.

Если вы раньше не работали с Jinja, достаточно запомнить три основных синтаксических элемента:

  1. Выражения {{ ... }} — подставляют значение переменной или результат вызова макроса.
  2. Управляющие конструкции {% ... %} — циклы, условия, объявление переменных.
  3. Комментарии {# ... #} — не попадают в итоговый SQL.

В контексте кастомных материализаций Jinja используется для генерации DDL/DML‑операций, которые выполняются адаптером (в нашем случае — ClickHouse). Сама материализация по сути является макросом специального вида, который dbt вызывает при сборке модели.

Подробнее о макросах для ClickHouse можно прочитать в нашей отдельной статье: Макросы dbt для ClickHouse.

Кастомные материализации в dbt

Стандартные материализации dbt — table, view, incremental и ephemeral — покрывают большинство сценариев, но иногда требуется сохранить результат модели не в таблицу БД, а, например, в файл S3, отправить в очередь или записать в сторонний API. Для таких случаев dbt позволяет писать кастомные материализации.

Кастомная материализация — это макрос, который определяет, как dbt должен выполнить модель. Он должен:

  1. Принимать конфигурацию модели (через config.get()).
  2. Генерировать нужный DDL/DML‑код (обычно с помощью {% call statement() %}).
  3. Вызывать хуки (run_hooks(pre_hooks) и run_hooks(post_hooks)), если они заданы.
  4. Возвращать словарь с relations (или пустой список, если отношения не создаются).

Материализация объявляется вот так

{% materialization имя, adapter='адаптер' %}...{% endmaterialization %}

Внутри можно использовать любые Jinja‑конструкции, а также обращаться к объектам dbt:

  • this — текущая модель (её имя, схема, база данных).
  • sql — SQL‑код модели (то, что написано в select ...).
  • config — конфигурация модели (аргументы из {{ config(...) }}).
  • run_hooks() — выполнение pre‑ и post‑хуков.
  • statement() — выполнение SQL‑запроса через адаптер.

Обязательные блоки внутри материализации:

  • Объявление переменных
{% set overwrite = config.get('overwrite', False) %}
  • Вызов pre‑хуков
{{ run_hooks(pre_hooks) }}
  • Основной SQL‑запрос
{% call statement('main') %...{% endcall %}
  • Вызов post‑хуков
{{ run_hooks(post_hooks) }}
  • Возврат результата
{{ return({'relations': []}) }}

Если материализация создаёт таблицу (или другое отношение), его нужно добавить в возвращаемый список, чтобы dbt мог управлять его жизненным циклом. В нашем случае мы не создаём таблицу в БД, поэтому вернём пустой список.

Кастомные материализации размещаются в папке macros/ и автоматически подхватываются dbt при запуске. Имя материализации (первый аргумент materialization) используется в {{ config(materialized='имя') }}.

Написание материализации

{% materialization s3_parquet, adapter='clickhouse' %}  {%- set overwrite = config.get('overwrite', False) -%}  {%- set account = this.identifier -%}  {%- set datetime_column = config.get('datetime_column', 'date') -%}   {%- set file_path = 'source=subquery/account=' ~ account ~ '/{_partition_id}/data.parquet.gz' -%}  {%- set partition_expr -%}      formatDateTime({{ datetime_column }}, 'year=%Y/month=%m/day=%d')  {%- endset -%}   {{ run_hooks(pre_hooks) }}   {%- call statement('main') -%}    insert into function s3(      s3_collection,      filename='{{ file_path }}'    ) partition by {{ partition_expr }}     {{ sql }}     {% if overwrite -%}    settings s3_truncate_on_insert=1    {%- endif -%}  {%- endcall %}    {{ run_hooks(post_hooks) }}   {{ return({'relations': []}) }}{% endmaterialization %}

Как это выглядит в S3

После выполнения материализации данные оказываются в S3 в виде сжатых Parquet‑файлов, разложенных по папкам согласно партицированию.

Структура пути формируется по шаблону:

source=subquery/account=<account>/year=2025/month=01/day=15/data.parquet.gz

Где:

  • source=subquery — фиксированный префикс (можно менять в материализации).
  • account=<account> — идентификатор модели (берётся из this.identifier).
  • year=.../month=.../day=... — партиции по дате, которые автоматически генерируются из колонки datetime_column.

Каждый файл содержит данные за один день (если партицирование дневное) и сжат gzip. Формат Parquet обеспечивает эффективное хранение и возможность прямого запроса через инструменты Trino, DuckDB или того же ClickHouse.

В SubQuery можно подключить свой бакет и просматривать файлы через веб‑интерфейс — это удобно для быстрой проверки, что данные выгрузились корректно.

Просмотр S3 бакета

Пример использования в моделях

Вот как выглядит модель, которая использует нашу кастомную материализацию:

{{ config(    materialized='s3_parquet',    overwrite=True,    alias='app_stream',    datetime_column='toDateTime(timestampcolumn)') }} select  *from {{ source('raw_data', 'view_app_stream') }}where toDate( {{ config.get('datetime_column') }} )    between {{ var_or_daysago('start_date', 3) }} and {{ var_or_daysago('end_date', 1) }}

Разберём параметры конфигурации:

  • materialized='s3_parquet' — указывает dbt использовать нашу кастомную материализацию.
  • overwrite — если в S3 уже есть файлы за те же партиции, они будут перезаписаны (включается настройка s3_truncate_on_insert=1). Это полезно при обновлении данных в источнике.
  • alias — имя модели, которое станет частью пути в S3 (account=app_stream).
  • datetime_column — колонка, по которой будет производиться партицирование. Можно передать любое выражение, возвращающее DateTime.

В условии WHERE используется макрос var_or_daysago. Он работает так:

  • Если в переменных dbt ({{ var('start_date') }}) задано конкретное значение, макрос подставит его.
  • Если переменная не определена, макрос вернёт дату, отстоящую на N дней назад от текущей (в примере — 3 дня для start_date и 1 день для end_date).

Это позволяет гибко управлять периодом выгрузки: в обычном режиме модель каждый день выгружает «вчерашние» данные, но при необходимости можно явно указать start_date и end_date через dbt run --vars и пересчитать любой исторический интервал.

Макрос var_or_daysago — один из многих полезных макросов, которые мы собрали в статье Макросы dbt для ClickHouse.

Добавление в расписание

Готовую модель можно запускать по расписанию, чтобы данные в S3 всегда были актуальными. В модуле dbt SubQuery, находясь в основной ветке, выберите справа вверху «Расписание», создайте новое, укажите селектор модели (или весь проект) и задайте cron‑выражение, например 0 2 * * * (каждый день в 02:00).

Можно заказать алерты на почту, тогда мы напишем, если выполнение модели завершится неуспешно.

Благодаря макросу var_or_daysago модель автоматически выбирает период выгрузки: каждый день она будет выгружать данные за предыдущий день (или за другой интервал, который вы настроили). Если нужно пересчитать историю, можно временно переопределить переменные start_date и end_date через dbt run --vars.

Такое расписание гарантирует, что в S3 всегда будут свежие Parquet‑файлы, готовые для аналитики в других системах.

Полезные ссылки

Если у вас остались вопросы или нужна помощь с настройкой — пишите в Telegram‑канал SubQuery