11.11.2020       Выпуск 360 (09.11.2020 - 15.11.2020)       Статьи

Python API в Delta Lake — простые и надежные операции Upsert и Delete

Мы рады объявить о релизе Delta Lake 0.4.0, в котором представлен Python API, улучшающий манипулирование и управление данными в Delta-таблицах.

Читать>>




Экспериментальная функция:

Ниже вы видите текст статьи по ссылке. По нему можно быстро понять ссылка достойна прочтения или нет

Просим обратить внимание, что текст по ссылке и здесь может не совпадать.

Приглашаем всех желающих на бесплатный демо-урок,в рамках которого рассмотрим Ni-Fi и роль data ingestion инструментов в целом при построении систем обработки данных. А также решим простую задачку по построению пайплайна для загрузки файлов в хранилище данных с использованием Ni-Fi. Урок проведет эксперт OTUS - Егор Матешук.


А прямо сейчас традиционно делимся полезным переводом.


Delta Lake 0.4.0 включает Python API и преобразование Parquet в таблицу Delta Lake на месте

Мы рады объявить о релизе Delta Lake 0.4.0, в котором представлен Python API, улучшающий манипулирование и управление данными в Delta-таблицах. Ключевыми фичами этого релиза являются:

  • Convert-to-Delta (#78) - теперь вы можете преобразовать таблицу Parquet в таблицу Delta Lake на месте без перезаписи каких-либо данных. Эта функция отлично подходит для преобразования очень больших таблиц Parquet, которые было бы довольно затратно перезаписывать в Delta-таблицу. Более того, этот процесс обратим - вы можете преобразовать таблицу Parquet в таблицу Delta Lake, поработать с ней (например, удалить или объединить) и легко преобразовать ее обратно в таблицу Parquet. Для получения более подробной информации читайте документацию.

Больше информации вы можете найти в примечаниях к релизу Delta Lake 0.4.0 и в документации по Delta Lake > Удаление, обновление и слияние таблиц.

В этой статье мы продемонстрируем как использовать Python и новый Python API в Delta Lake 0.4.0 в контексте данных о вылетах и задержках рейсов на Apache Spark™ 2.4.3. Мы продемонстрируем, как производить операции upsert и delete, запрашивать старые версии данных с помощью механизма путешествия во времени (time travel) и подчищать старые версии данных с помощью vacuum.

Начало работы с Delta Lake

Пакет Delta Lake доступен через параметр в --packages. В нашем примере мы также продемонстрируем возможность делать VACUUM файлов и выполнять Delta Lake SQL команды в Apache Spark. Поскольку это короткая демонстрация, мы также подключим следующие конфигурации:

  • spark.databricks.delta.retentionDurationCheck.enabled=false, что позволит нам производить vacuum файлов, срок хранения которых меньше установленного по умолчанию срока в 7 дней. Эта настройка нужна только для команды SQL VACUUM.

# Подключение пакета в Spark

./bin/pyspark --packages io.delta:delta-core2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

Загрузка и сохранение наших Delta Lake данных

В этой статье мы будем использовать набор летных данных о времени вылетов или задержках рейсов, сгенерированный на основе статистики вылетов RITA BTS; некоторые примеры этих данных в действии можно посмотреть здесь - Данные о вылетах рейсов за 2014 в d3.js Crossfilter, и здесь - Данные о вылетах и задержках рейсов в GraphFrames для Apache Spark™. Этот набор данных можно загрузить с этого репозитория на github. Начнем с чтения набора данных в pyspark.

# Переменные местонахождения
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# Чтение данных о задержках рейсов
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)

Затем давайте сохраним наш набор данных departureDelays в таблицу Delta Lake. Сохранив эту таблицу в хранилище Delta Lake, мы сможем воспользоваться его функциями, которые включают ACID транзакции, унифицированную пакетную обработку и потоковую передачу, а также путешествия во времени.

# Сохраняем данные о задержках рейсов в формате Delta Lake
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")

 

Обратите внимание, этот подход аналогичен обычному сохранению данных Parquet; вместо указания format("parquet") вы просто указываете format("delta"). Если вы заглянете в базовую файловую систему, вы заметите четыре файла, созданных для таблицы Delta Lake departureDelays.

/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

Отметим, что_delta_log- папка, которая содержит лог транзакций Delta Lake. Для получения более подробной информации читайтеПогружение в Delta Lake: распаковка лога транзакций.

Теперь давайте перезагрузим данные, но на этот раз наш DataFrame будет поддерживаться Delta Lake.

# Загружаем данные о задержках рейсов в формате Delta Lake
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")

# Создаем временное представление
delays_delta.createOrReplaceTempView("delays_delta")
 
# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Наконец, давайте определим количество рейсов из Сиэтла в Сан-Франциско; в этом наборе данных таких рейсов 1698.

Преобразование на месте в Delta Lake

Если у вас есть уже существующие таблицы Parquet, у вас появилась возможность выполнять преобразование ваших таблиц в Delta Lake на месте, т.е. вам не нужно переписывать таблицу. Чтобы преобразовать таблицу, вам достаточно выполнить следующие команды.

from delta.tables import *

# Преобразовать не секционированную таблицу parquet по пути '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Преобразовать секционированную таблицу parquet по пути '/path/to/table', секционированную целочисленным столбцом с именем 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

Для получения более подробной информации, в том числе и о том, как сделать это преобразование в Scala и SQL, читайте Преобразование в Delta Lake.

Удаление наших летных данных

Чтобы удалить данные из традиционной Data Lake таблицы, вам необходимо:

  1. Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите удалить.

  2. Создать новую таблицу на основе предыдущего запроса.

  3. Удалить исходную таблицу.

  4. Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.

Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор DELETE. Чтобы продемонстрировать это, давайте удалим все рейсы, которые прибыли вовремя или раньше назначенного времени (т.е. delay < 0).

from delta.tables import *
from pyspark.sql.functions import *

# Доступ к таблице Delta Lake
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# Удаляем все рейсы пришедшие раньше или вовремя
deltaTable.delete("delay < 0") 

# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

После удаления (подробнее об этом ниже) всех рейсов пришедших вовремя или раньше, как вы можете видеть из предыдущего запроса, осталось 837 опоздавших рейсов из Сиэтла в Сан-Франциско. Если вы заглянете в файловую систему, вы заметите, что файлов стало больше, даже если вы удалили данные.

/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

В традиционных озерах данных удаление (операция delete) выполняется путем перезаписи всей таблицы, за исключением значений, которые должны быть удалены. В Delta Lake вместо этого удаление выполняется путем выборочной записи новых версий файлов, содержащих удаляемые данные, а предыдущие файлы толькопомечаются как удаленные. Это связано с тем, что для выполнения атомарных операций с таблицей Delta Lake использует управление параллельным доступом с помощью многоверсионности: например, пока один пользователь удаляет данные, другой пользователь может запрашивать предыдущую версию таблицы. Эта многоверсионная модель также позволяет нам путешествовать назад во времени (механизм путешествия во времени) и запрашивать предыдущие версии, как мы увидим позже. 

Обновление наших летных данных

 Чтобы обновить данные из вашей традиционной таблицы Data Lake, вам необходимо:

  1. Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите изменить.

  2. Изменить строки, которые необходимо обновить/изменить.

  3. Объединить эти две таблицы, чтобы создать новую таблицу.

  4. Удалить исходную таблицу.

  5. Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.

Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор UPDATE. Чтобы продемонстрировать это, давайте обновим информацию обо всех рейсах из Детройта, заменив Детройт на Сиэтл.

# Обновляем все рейсы из Детройта, чтобы теперь они стали из Сиэтла
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 

# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Теперь, когда рейсы из Детройта теперь помечены как рейсы Сиэтла, у нас насчитывается 986 рейсов, вылетающих из Сиэтла в Сан-Франциско. Если выведете содержимое папки файловой системы departureDelays (например, $../departureDelays/ls -l), вы заметите, что теперь там 11 файлов (вместо 8 сразу после удаления файлов и четырех файлов после создания таблицы).

Объединение наших летных данных

Распространенный сценарий при работе с озером данных - это постоянное добавление данных в вашу таблицу. Это часто приводит к дублированию данных (строки, которые вы бы не хотели снова вставлять в таблицу), появлению новых строк, которые необходимо вставить, и строк, которые необходимо обновить. В Delta Lake все это можно решить с помощью операции merge (аналогично оператору SQL MERGE).

Начнем с выборочного набора данных, который вы хотите обновить, вставить или дедуплицировать с помощью следующего запроса.

# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

 

Результат этого запроса приведен таблице ниже. Обратите внимание, что для наглядности были добавлены цветные выделения, чтобы четко определить, какие строки дедуплицированы (синий), обновлены (желтый) и вставлены (зеленый).

Затем давайте сгенерируем нашу собственную merge_table, содержащую данные, которые мы будем вставлять, обновлять или дедуплицировать с помощью следующего фрагмента кода.

items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

В предыдущей таблице (merge_table) есть три строки, с уникальным значением даты:

  1. 1010521: эта строка обновит таблицу flights новым значением задержки (желтый)

  2. 1010710: эта строка является дубликатом(синий)

  3. 1010822: это новая строка будет вставлена(зеленый)

В Delta Lake все это можно легко сделать с помощью оператора merge, как указано в следующем фрагменте кода.

# Объединяем merge_table с flights
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

Все три действия: дедупликация, обновление и вставка были эффективно выполнены с помощью одного оператора.

Просмотр истории таблицы

Как уже отмечалось ранее, после каждой нашей транзакции (удаление, обновление) в файловой системе создавалось все больше файлов. Это связано с тем, что для каждой транзакции существуют разные версии таблицы Delta Lake. Это можно увидеть с помощью метода DeltaTable.history(), как показано ниже.

deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      2|2019-09-29 15:41:22|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false|
|      1|2019-09-29 15:40:45|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-09-29 15:40:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

Вы также можете выполнить это с помощью SQL:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

Как видите, есть три строки, представляющие различные версии таблицы (ниже представлена ​​сокращенная версия для облегчения чтения) для каждой операции (создание таблицы, удаление и обновление):

version

timestamp

operation

operationParameters

2

2019-09-29 15:41:22

UPDATE

[predicate -> (or…

1

2019-09-29 15:40:45

DELETE

[predicate -> [“(…

0

2019-09-29 15:40:14

WRITE

[mode -> Overwrit…

Путешествие назад во времени по истории таблицы

С помощью механизма путешествия во времени вы можете просмотреть таблицу Delta Lake указанной версии или для определенного таймстемпа. Для получения более подробной информации читайте Документацию Delta Lake > Чтение старых версии данных с помощью Time Travel. Чтобы просмотреть старые данные, укажите опцию version или Timestamp; в фрагменте кода ниже, мы указываем version

# Загружаем DataFrames для каждой версии
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# Рассчитываем количество полетов от Сиэтла (SEA) до Сан-Франциско (SFO) для каждой версии истории
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# Выводим значение
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## Вывод
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

Очистка старых версий таблиц с помощью Vacuum

Метод vacuum Delta Lake по умолчанию удаляет все строки (и файлы) старше 7 дней (референс: Delta Lake Vacuum). Если бы вы заглянули в файловую систему, вы бы заметили 11 файлов для своей таблицы.

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Чтобы удалить все файлы и сохранить только текущий снапшот данных, укажите очень маленькое значение в методе vacuum (вместо 7 дней хранения по умолчанию).

Вы можете выполнить ту же задачу с помощью SQL синтаксиса: ¸

# Удаляем все файлы старше 0 часов

spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)

После завершения операции vacuum, при просмотре файловой системы вы заметите намного меньше файлов, так как старые данные были удалены.

/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Обратите внимание, что возможность переместиться во времени обратно к версии старше периода хранения теряется после запуска vacuum.

Что дальше?

Попробуйте Delta Lake уже сегодня, запустив приведенные выше фрагменты кода на своем инстансе Apache Spark 2.4.3 (или выше). Используя Delta Lake, вы можете сделать свои озера данных более надежными (независимо от того, создаете ли вы новое или мигрируете существующее озеро данных). Чтобы узнать больше, посетите https://delta.io/ и присоединитесь к сообществу Delta Lake в Slack и Google Group. Вы можете отслеживать все предстоящие релизы и запланированные фичи в github milestones.


Записаться на бесплатный демо-урок.


Читать ещё:






Разместим вашу рекламу

Пиши: mail@pythondigest.ru

Нашли опечатку?

Выделите фрагмент и отправьте нажатием Ctrl+Enter.

Система Orphus