Архитектура высокопроизводительных распределенных SQL-движков

Рассматриваем архитектурные паттерны, обеспечивающие производительность SQL-движков, на основе анализа существующих реализаций. Анализируем проблемы оптимизации в распределенной среде, классификация распределений данных, сравнение подходов на основе AST и реляционных операторов, а также методы cost-based и итеративного планирования

Сложность

В рамках нашей работы мы много смотрели на то, как устроены различные распределенные SQL-движки и увидели некоторые паттерны, которые повторяются от системы к системе. В то же время мы видим, что в последнее время наблюдается большой всплеск интереса к такого рода системам. Даже такие системы, как IMDG (In-Memory Data Grid) или стриминговые системы, как Kafka, которые изначально SQL-интерфейса не имели, пытаются этот интерфейс добавить.

Это понятно, потому что SQL знаком многим. Это удобный интерфейс, и если у вас есть такая возможность, то наличие SQL позволяет открыть двери интеграциям с большим количеством различных утилит. При этом, если вы добавляете SQL-движок в свою систему, то сделать так, чтобы он работал быстро — достаточно сложная задача.

Планирование

Почему оптимизация — это сложно? SQL это декларативный язык, и он описывает то, каким должен быть результат. При этом он не специфицирует именно ту программу, которую мы должны исполнить для того, чтобы этот результат получить. И, как правило, существует большое количество способов выполнить тот или иной запрос.

При этом, если смотреть на какие-то специфические подзадачи планирования (планирование порядка join, или выбора подходящей материализованной view), то каждая из этих проблем является NP-сложной. И поэтому оптимизатору для того, чтобы сделать планирование достаточно быстрым (желательно чтобы планирование не выполнялось дольше, чем исполнение запроса) оптимизатору приходится идти на какие-то эвристики. В разных системах эти эвристики разные. Когда мы помещаем оптимизатор в распределенную среду, у него появляется еще и дополнительная задача — минимизировать количество данных, передающихся по сети.

Давайте посмотрим на примере, почему вообще данные нужно передавать и в каких случаях их можно минимизировать.

Начнем с простого случая, когда у нас есть распределенный join двух таблиц и мы заранее не знаем распределение данных в этих таблицах. Для того, чтобы в распределенной среде такой join исполнить, нам необходимо перераспределить данные таким образом, чтобы на одном узле были сосредоточены данные, совпадающие по ключу join.

За такую операцию отвечает оператор, который называют либо exchange или  shuffle. По сути, он пересылает данные по сети в вашем кластере. Если у нас случай случайного распределения, то мы будем пересылать данные из обеих таблиц соединения.

При этом есть альтернативный случай, при котором мы можем исполнить этот join локально на каждом из узлов, который хранит данные. Для того, чтобы это сделать, необходимо, чтобы данные были заранее шардированы так, чтобы ключ шардирования совпадал с ключом join обеих таблиц. В этом случае  пересылать данные по сети не требуется, и мы можем исполнить join локально. Такой join называют collocated или co-sharded.

Однако, есть и промежуточный вариант, при котором, если один из входов join достаточно маленький, то мы вместо того, чтобы перераспределять обе таблицы, мы можем просто растиражировать маленькую таблицу на все узлы нашего кластера. В этом случае мы избегаем пересылки большого числа данных для второй таблицы и запрос выполняется быстрее. Пример: partitioning_column в TimescaleDB.

В этом случае возникает вопрос: «Что такое достаточно маленькая таблица?». Разные системы определяют какие-то свои пороговые значения для того, чтобы включать или выключать этот сценарий исполнения. В зависимости от характера вашей нагрузки, вы можете подкрутить пороговые значения. К примеру у Flink есть свойство table.optimizer.join.broadcast.threshold.

Итого, в зависимости от того, какое распределения данных в кластере, мы можем получать различные планы исполнения.

Каким же образом оптимизатор может с этой информацией работать? На практике каждому оператору в плане исполнения назначается некоторое свойство, которое описывает распределение данных, специфичное для этого конкретного оператора:

  • SHARDED — распределение данных по узлам
  • REPLICATED — полная копия данных на всех узлах
  • SINGLETON — данные на одном узле

При этом каждый оператор может потребовать от нижележащих inputs распределения. Если эти распределения не совпадают, то оптимизатор вставляет exchange или reshuffle, которые моделируют пересылку данных по сети. В общем случае могут быть несколько сценариев выполнения запроса. То есть оптимизатор может сгенерировать несколько планов. Поэтому, мы должны выбрать оптимальный план, но мера оптимальности может быть совершенно разной в зависимости от ваших задач.

Как правило, мерой оптимальности выбирают latency запроса (CPU-время, которое по оценке потратит ваш запрос). Но при этом вы вполне можете построить оптимизатор, который будет минимизировать, например, финансовые затраты на ресурсы, которые использует запрос.

Планирование Exchange

Можно ли выбирать распределение локально в каждом из операторов? Оказывается, что нет, и это видно на следующем примере.

Здесь у нас есть hash-join, у которого есть два input. И правый input на картинке простой (table scan), а в левом input у нас есть scan, поверх которого стоит какой-то агрегат. Соответственно, join происходит по колонке a2, а агрегат делает агрегацию по двум колонкам — a1 и a2.

Если мы попытаемся потребовать оптимального распределения для каждого из операторов независимо, то мы получим план, в котором у нас будет два reshuffle. Но если мы потребуем распределение для левого input только лишь по колонке a2, то агрегат все равно сможет быть выполнен корректно, все  агрегации могут быть вычислены локально. Но при этом мы избавляемся от вышележащего exchange, что минимизирует передачу данных по сети. Из этого можно сделать вывод, что локальные решения с точки зрения операторов мы принимать не можем, и в идеале нужен оптимизатор, который позволяет искать кросс-зависимости между операторами и искать оптимальный (в глобальном смысле) план исполнения. См. пример в Cascades.

Виды планировщиков

Первым критерием, характеризующим планировщики является то, как они у себя внутри представляет план исполнения. Одним из прямолинейных вариантов является использование абстрактных синтаксических деревьев (AST). Работать с таким планировщиком достаточно сложно, также как делать какие-то выводы о том, каким образом данные проходят через запрос. Поэтому, как правило, такие планировщики ограничены в средствах, которые они могут использовать для трансформации. Нужно быть готовым к тому, что если запрос сложный, то придется его руками доработать для того, чтобы довести до оптимальной производительности.

Есть альтернативный класс оптимизаторов, которые трансформируют абстрактное синтаксическое дерево в дерево реляционных операторов. Работать с ними гораздо проще, потому что каждый оператор имеет строгую вычислительную семантику и существует достаточно большое количество правил, которые описывают трансформации этого реляционного дерева. Эти трансформации являются независимыми, и такой оптимизатор является во многом расширяемым. Примером такого оптимизатора является Spark или Dremio, где можно, например, написать свой плагин, который предоставляет дополнительные правила оптимизации.

Есть несколько вариантов того, как работать с деревом. Самый простой вариант — это итеративное или эвристическое планирование. Суть его состоит в том, что есть некоторый набор правил, и эти правила, независимо и последовательно применяем к дереву плана. При этом считаем, что каждое такое правило, безусловно улучшает характеристики нашего плана. Классическим вариантом такого правила является фильтр pushdown, потому что мы считаем, что чем ближе фильтр к источнику данных, тем меньше строк нужно обработать по следующим операторам. В качестве примеров можно привести Apache Spark, Presto и Apache Calcite (Apache Flink, Apache Hive).

Несмотря на простоту, достаточно большое количество production систем используют такой подход для планирования. Альтернативой такому подходу является планирование с использованием memoization-структуры (memo). Это структура, которая позволяет закодировать множество альтернативных планов в компактном виде. В этом случае оптимизатор идентифицирует операторы, которые производят одинаковый результат (такие операторы называются эквивалентными), и они объединяются в группы эквивалентности. Оптимизатор в memo заменяет input операторов на правильные группы эквивалентности.

После того, как мы перечислили все возможные планы, оптимизатор обходит полученный граф и выбирает оптимальный с точки зрения cost-модели план. Подобным образом оптимизатор работает в Apache Calcite, Greenplum и CockroachDB.

Проблема такого оптимизатора заключается в том, что он может сгенерировать достаточно большое число планов, и время перечисления этих планов тоже может быть большим. Поэтому на практике production-оптимизаторы разделяют планирование на фазы. При этом каждая фаза может быть как эвристической, так и cost-based. В конце каждой фазы мы получаем некоторый оптимальный локальный план, и этот план передаем в начало следующей фазы для того, чтобы получить цепочку планирования. В примере ниже показаны фазы планирования в Apache Flink.

В результате такого многофазного планирования мы получаем в общем случае неоптимальный план, но как правило эта цепочка тюнится таким образом, чтобы наша система работала достаточно хорошо на практике.

Подведем итоги первой части, относящейся к планировщику:

  • С точки зрения распределения данных в распределенной системе, правильный выбор схемы шардирования очень важен, потому что он позволяет оптимизатору выбрать наиболее подходящий план.
  • В частности, если у вас есть большое количество распределенных join, то выбор правильного ключа шардирования позволит использовать co-sharded join.
  • Важно посмотреть на архитектуру оптимизатора и быть готовым к ручным оптимизациям в случае AST-планирования.
  • Можно изучить хинты, которые предоставляет оптимизатор, потому что в тех случаях, когда оптимизатор не может полноценно исследовать пространство поиска, существуют хинты, которые позволят вам вручную улучшить план и помочь оптимизатору. Пример: Apache Flink не делает планирование join по умолчанию.

 

Runtime-планирование

В runtime мы можем использовать различные трюки, которые могут менять структуру плана. Это нужно потому, что даже если бы оптимизатор исследовал полностью все пространство решений, то те решения, которые он принимает, по факту принимаются на неполных данных. Например, статистики, которые использует оптимизатор, могут быть outdated, и, при вычислении кардинальности и join, может накапливаться большая ошибка. При этом при исполнении мы можем собирать статистики индивидуальных операторов и при возможности рестартовать планирование с той точки, в которой мы сейчас находимся, но при этом имея какие-то конкретные значения статистик и других характеристик нашего плана. 

Далее рассмотрим следующие категории, которые являются с нашей точки зрения наиболее важными в рантайме:

  • Prunning;
  • Динамические операторы;
  • Замена и реоптимизация плана во время исполнения.

Pruning

Pruning-оптимизация работает очень часто и практически везде. Ее идея заключается в том, что мы просто не должны выполнять ту работу, которую заведомо можно отбросить. Pruning можно условно поделить на несколько подвидов, которые не обязательно взаимно исключают друг друга и могут быть использованы одновременно.

  • Partition pruning используется в том случае, если оптимизатор сумел проанализировать запрос и запушить достаточное количество подходящих предикатов к источнику данных. Если мы смогли вывести, что значение ключа шардирования имеет какое-то одно конкретное значение или находится в диапазоне значений, то мы можем исключить те шарды, которые нам не подходят, и не исполнять запрос на соответствующих узлах/шардах. Примеры: Flink, Spark.
  • Column-pruning активно используется в колоночных и аналитических БД. Когда оптимизатор транслирует запрос, мы понимаем, какие колонки будут использоваться в источнике. В этом случае мы просто не читаем лишние колонки с диска, что может в сэкономить нам колоссальное количество дискового IO.
  • Block-pruning. Весь датасет разбивается на небольшие блоки, и для каждого блока в момент заливки данных, мы агрегируем некоторые статистики, например, min-max. И далее, когда запрос исполняется и если у нас есть некоторый предикат, то мы берем этот предикат и анализируем, можно ли заведомо отбросить блок, исходя из статистик, которые в нем записаны. Эта очень эффективная техника, потому что позволяет на практике пропускать большое количество логов, даже без индекса. Пример техники — Bitmap Scan в Postgres.

Для того, чтобы данные техники прунинга использовать, нужно, чтобы предикаты были sargable (Search ARGument ABLE). С точки зрения имплементации, техники сводятся к тому, что мы максимально спускаем фильтры к источникам данных и объединяем scan вместе с предикатами.

Динамические операторы

Развитием этой идеи является динамическая фильтрация в join операторе (также называемая runtime-фильтрацией). Оптимизация важна, и на практике часто используется в различных федеративных системах.

Идея ее состоит в том, что если у нас есть join, в котором предикат стоит только с одной стороны, то если мы будем исполнять такой план в лоб, то с другой стороны мы будем обязаны сделать full-scan, что не очень хорошо. Вместо того, чтобы инициировать сразу этот full-scan, мы можем подождать probe-сторону, пока у нас построится хэш-таблица, проанализировать строки тех значений, которые попали в итоговую хэш-таблицу и перенести эти значения в правую часть, что позволит впоследствии сократить количество данных, которые будут проведены через join оператор.

Данную оптимизацию реализуют практически все системы, предоставляющие федеративные запросы (Presto, Spark, Dremio, Snowflake).

Если таблица будет достаточно большой, то накладные расходы на поддержание такой таблицы могут быть огромными. Такая оптимизация может даже принести вред. Поэтому чтобы контролировать поведение этой оптимизации, существуют некоторые параметры, которые по превышению задаваемых порогов могут эту оптимизацию отключать. Примеры — dynamic filter thresholds в Trino.

Пример Snowflake: eсли есть следующий план (см. картинку ниже), где агрегат работает поверх join, то мы в ряде случаев агрегат можем разделить на две части и вторую часть поместить под join.

Идея здесь заключается в том, что после того, как мы запушим агрегат под join, то он сократит количество строк, которые будут приходить в оператор join. При этом данное решение можно принимать, исходя из оценок кардинальности, но оценки кардинальности могут быть неправильными.

Здесь можно заметить, что отключение второго агрегата, который был помещен под join, никак не повлияет на корректность результата. Потому что просто если в произвольный момент времени мы его заменим на pass-through оператор (т.е.  оператор, который будет просто пропускать через себя строки неизменными), то мы получим тот же самый корректный результат.

Поэтому на практике оптимизатор может сделать оценку кардинальности для результата этой агрегации, и мы можем ее спустить в runtime.

При этом, если в рантайме мы понимаем, что количество строк, которые мы уже сагрегировали, превышает нашу оценку, мы можем этот оператор отключить и никаких накладных расходов он больше не будет привносить. Что позволяет балансировать и хорошо избавляться от ошибок в оценках кардинальности.

Динамическое шардирование

Проблема с оценкой кардинальности и оценкой размера результата очень остро стоит в любых системах, в том числе и в распределенных. Когда планировщик в распределенной системе принимает решение о том, насколько шардов разбить поток, он использует количество ресурсов (то есть степень параллелизма, которой мы можем себе позволить) и оценочный размер результирующего набора данных.

При этом может оказаться, что то количество шардов, которое мы предсказали, оказалось чрезмерно большим, что привело к тому, что у нас появляется большое количество маленьких шардов. Дальнейшая обработка маленьких шардов несет накладные расходы. Поэтому многие системы умеют подменять план после некоторых точек, в которых это можно сделать.

Так, например, делает Apache Spark: исполнение его запроса разделено на стадии, и промежуточные результаты между этими стадиями материализуются. Соответственно, вы можете передать управление обратно планировщику, и он поменяет план исполнения в рантайме. 

Данная возможность включается через свойство spark.sql.adaptive.enabled. В этом случае, если появилось большое количество маленьких шардов, то Spark может их просто склеить, что сократит накладные расходы. 

Если вспомнить про алгоритмы join и про распределение данных при выборе алгоритма join, то там использовался broadcast threshold, после которого мы отказываемся от broadcast-join и переходим к merge-join. При исполнении запроса может оказаться, что то количество данных, которое мы предсказали, оказалось больше, чем промежуточный результат, который мы получили в действительности. То есть в этом случае разумно было бы откатиться обратно к broadcast-join, потому что наш input оказался маленьким. Это Spark и делает. Эта оптимизация контролируется тем же самым свойством — spark.sql.adaptive.enabled.

Векторизация

Наименее видный для конечного пользователя, но очень важный аспект, это — эффективное на исполнение самих операторов. Наибольшее значение этот аспект имеет для OLAP-запросов, которые, в отличие от OLTP, затрагивают малое количество колонок, но сканируют большое количество данных. В этом случае для хорошего перформанса критически важна эффективная утилизация ресурсов процессора и всех серверов. Решением является векторное исполнение операторов. Суть векторного исполнения достаточно проста. Вместо того, чтобы обрабатывать данные по строчками, их обрабатывают пакетами, причем внутри пакета данные организованы по столбцам.

На самом деле векторная обработка операторов не требует колоночной организации структуры хранилища. Например, распределенная база CockroachDB использует строковое хранилище, но при этом много операторов используют именно векторное исполнение. Как только оператор получает такой векторный пакет, операции происходят по колонкам, за счет чего мы избавляемся от накладных расходов и виртуальных вызовов. Это позволяет компилятору сгенерировать оптимальный код и обрабатывать данные с оптимальной утилизацией процессора. Большинство аналитических систем уже давно используют этот подход. Примеры векторизованных операторов:  CockroachDB, Apache Arrow-based (Dremio, Flink), Velox (Presto/Trino).

JIT-компиляция

Более продвинутая техника, которая еще дальше развивает векторную обработку данных в операторах, это JIT-компиляция запросов. Суть ее состоит в том, что у нас есть некоторый набор операторов, без JIT они должны обрабатывать данные от любой схемы и в любой комбинации. Как только у нас появился какой-то конкретный запрос и конкретная схема данных, мы можем произвести специализацию этого кода под конкретный запрос и конкретную схему данных. Такой трюк позволяет убрать больше накладных расходов, которые все еще присутствуют при интерпретируемом исполнении, и улучшить характеристики runtime.

Конечно, такой подход имеет опасные стороны, потому что компиляция — это не бесплатный процесс. Нужно следить за тем, чтобы время компиляции не превышало время самого запроса, но это можно адресовать смешанным исполнением, например, стартовать запрос в интерпретируемом варианте и запускать компиляцию в фоне. И по мере того, как операторы будут компилироваться, мы будем заменять реализацию этих операторов на более оптимизированные. 

Подведем итоги.

  • Несмотря на то что мы уже перешли в runtime, даже в runtime распределение данных имеет большое значение, потому что при правильном использовании ключа шардирования и правильном распределении, использование data pruning является весьма эффективным средством ускорения запросов.
  • Адаптивное исполнение —  вы на него повлиять не можете, но по крайней мере можно проверить, что эти оптимизации включены. При этом, с точки зрения разработчиков систем, оптимизации постоянно добавляются и выпускается новые версии. Поэтому здесь рекомендация — иметь регулярный график обновления на новые версии. Это справедливо как и для самих баз данных, так и для JDK, которые постоянно привносят новые возможности для, например, авто-векторизации.
  • Экстремальный вариант для более авантюрных людей — попробовать новые движки исполнения для уже существующих систем. Например в Spark можно использовать Gazelle, в Presto можно использовать Velox, который разрабатывает Facebook* (является запрещенной в РФ).
  • Кажется что все все оптимизации, которые есть и у многих ребят в нашей команде и про которые упоминалось выше, кажутся сложными для имплементации. Но при этом уже существует достаточно большое количество библиотек и инструментария, который предоставляет вам кирпичики, из которых можно строить специализированные системы, заточенные под какие-то конкретные нужды. Имея этот инструментарий на руках, вы можете сосредоточиться на конкретных оптимизациях, которые вам важны. При этом многое из описанного достаточно легко интегрируется. По нашим ощущениям, за последние несколько лет порог вхождения в область разработки баз данных, за счет таких доступных инструментов, был существенно снижен. В том числе поэтому сегодня мы видим такое обилие новых БД, которые сейчас на слуху.
Прокрутить вверх