2023-01-14

Как Trino читает данные из файлов Parquet

Trino — это распределенный SQL-движок, который выполняет запросы к данным, хранящимся во внешних источниках. В данной статье мы рассмотрим, как в Trino реализовано чтение информации из озера данных на примере файлов в формате Parquet.

Введение

Trino — это распределенный SQL-движок, который выполняет SQL-запросы к данным, хранящимся во внешних источниках. Для этого Trino использует интерфейс Connector, который предоставляет функционал работы с данными и метаданными конкретной сторонней системы.

Trino реализует массивно-параллельную архитектуру и позволяет выполнять SQL-запросы, используя множество потоков на нескольких серверах. Что бы добиться эффективной утилизации ресурсов кластера, необходимо обеспечить не только параллельную работу промежуточных операторов, но и параллельное чтение данных из источников.

В данной статье мы рассмотрим, как в Trino реализовано чтение информации из озера данных под управлением Hive Metastore на примере файлов в формате Parquet.

Планирование

Выполнение SQL-запроса в Trino начинается с планирования. На данном этапе необходимо убедиться, что используемые в запросе таблицы существуют, а так же получить их метаданные.

Для получения метаданных таблицы коннектор Hive отправляет команду `get_table` к Hive Metastore по протоколу Thrift. Данная команда возвращает список атрибутов таблицы, схему partitioning и bucketing, информацию о формате файлов таблицы. Если таблица является партиционированной, то так же будет осуществлен вызов команды `get_partition_names`, которая возвращает имена партиций таблицы. Если в запросе присутствует фильтр по колонке партиционирования с конкретными значениями, он будет задействован, что бы вернуть имена только тех интересующих нас партиций.

Так как в процессе планирования запроса может потребоваться многократное чтение одних и тех же метаданых, коннектор кэширует результаты всех удаленных вызовов к Hive Metastore в рамках текущей транзакции. Кроме того, с помощью параметра конфигурации `hive.metastore-cache-ttl` можно включить дополнительный кэш метаданных, общий для всех запросо, тем самым еще больше снижая количество удаленных вызовов.

Сплиты

Результатом планирования является набор фрагментов, так же называемых stage, которые представляют собой независимые этапы выполнения запроса. Входными данными фрагмента могут являться как удаленные источники, так и другие фрагменты. Фрагмент может быть выполнен в один или несколько потоков на одном или нескольких узлах.

Единицей параллелизма фрагмента является сплит (split) - часть данных, которая может быть обработана независимо. Каждый коннектор определяет собственную стратегию разбиения таблицы на сплиты. В процессе выполнения запроса Trino получает список сплитов, после чего запускает экземпляры фрагмента с конкретными сплитами на узлах кластера. Формирование списка сплитов происходит асинхронно, так как может требовать удаленных вызовов к источнику данных.

В коннекторе Hive данные таблицы в формате Parquet могут быть разбиты на части с использованием partitioning, bucketing и row groups. Для формирования списка сплитов коннектору необходимо получить метаданные всех задействованных партиций. При большом количестве партиций в таблице это может занять достаточно продолжительное время. Поэтому коннектор отправляет к Hive Metastore несколько параллельных команд `get_partition_by_name` с разным количеством партиций в предположении, что ответ на команду с меньшим количеством партиций вероятно придет быстрее. Так, первая команда по умолчанию запрашивает 10 партиций, вторая 20, и т.д. Изменить данное поведение можно с помощью параметров конфигурации:`hive.metastore.partition-batch-size.min` и `hive.metastore.partition-batch-size.max`.

По мере получения ответов на команды `get_partition_by_name`, которые в том числе содержат URL партиций, коннектор получает для каждой партиции список задействованных файлов, выполняя операцию list к файловой системе. Если в таблице используется bucketing и запрос содержит предикат по bucketing колонке, коннектор может отфильтровать файлы, которые заведомо не содержат интересующую нас информацию. Результаты листинга директорий можно кэшировать между запросами с помощью параметров конфигурации `hive.file-status-cache-tables` и `hive.file-status-cache-expire-time`.

Для каждого файла коннектор создает один или более сплитов в зависимости от размера. По умолчанию первые 200 файлов будут разбиты на сплиты размером 32Mb, а последующие файлы - на сплиты размером 64Mb. Соответствующие параметры конфигурации: `hive.max-initial-splits`, `hive.max-initial-split-size`, `hive.max-split-size`.

Важно отметить, что на данном этапе нам еще неизвестна внутренняя структура файлов. Так, может оказаться, что граница 64Mb приходится на середину Parquet row group. Корректировка границ сплитов происходит позднее непосредственно в момент чтения файлов.

Чтение данных

По мере формирования сплитов, scheduler Trino распределяет их по узлам кластера. Задача (task) — это экземпляр фрагмента запроса на конкретном узле, который должен обработать выделенный ему набор сплитов. Драйвер (driver) — это минимальная единица параллелизма запроса. Драйвер читает входные данные из сплитов, выполняет SQL операторы, после чего пересылает результаты родительским фрагментам. Драйвер является однопоточным. Задача может запустить несколько драйверов на одном узле и распределить сплиты по ним, обеспечивая необходимую степень параллелизма.

Для получения данных сплита, драйвер обращается к соответствующим коннекторам. Коннектор извлекает данные сплита из источника, и представляет их в виде одной или нескольких страниц (page). Страница — это набор типизированных колонок, так же называемых блоками (block), с фиксированным количеством записей.

Hive коннектор хранит внутри каждого сплита URL файла, его длину, формат, а так же предполагаемый диапазон байт, который следует прочитать (offset, length). Для работы с Parquet файлом сначала необходимо получить его метаданные, которые находятся по известному смещению в конце файла. Для этого коннектор открывает файл на чтение и получает footer. Далее происходит анализ метаданных файла:

  • Коннектор выравнивает границы сплита по границам row group. Так, один сплит может быть сопоставлен с одним или несколькими row group, в то время как другой сплит окажется пустым.
  • Коннектор проверяет, возможно ли осуществить filter pushdown. Если статистики row group показывают, что ни одно значение группы не удовлетворяет заданному в запросе предикату, то данный row group не будет прочитан.
  • Коннектор проверяет, возможно ли осуществить aggregate pushdown. Например, для расчета функции `MIN(x)` в рамках конкретного файла достаточно найти минимальное значение атрибута `x` среди всех row group. Footer файла Parquet уже содержит данную информацию.

После того как целевые row group сплита определены, происходит повторное открытие файла на чтение и получение данных row group по конкретным смещениям. Далее коннектор десериализует данные каждого row group в отдельный Page и передает его операторам драйвера на обработку.

Заключение

Hive коннектор Trino позволяет запускать запросы к озерам данных под управлением Hive Metastore. Для этого коннектор осуществляет ряд удаленных вызовов к Hive Metastore и файловой системе озера данных:

  1. Получение метаданных таблицы из Hive Metastore.
  2. Получение метаданных партиций из Hive Metastore.
  3. Листинг директорий и файлов партиций в файловой системе.
  4. Чтение метаданных, необходимых для обработки содержимого файла (footer в случае Parquet).
  5. Чтение содержимого файла.

Первые три типа операций можно кэшировать стандартными средствами Trino, значительно снижая количество удаленных вызовов при выполнении SQL-запросов.

В следующих статьях мы поговорим о том, как можно ускорить выполнение SQL-запросов к озерам данных путем кэширования самих файлов с помощью технологий Alluxio и RaptorX.