Ir para o conteúdo

Delta Lake

O que é o Delta Lake?

Delta Lake é um formato de tabela open-source desenvolvido pela Databricks e doado à Linux Foundation em 2019. Ele adiciona uma camada de confiabilidade sobre Data Lakes existentes (como Amazon S3, Azure Data Lake Storage ou HDFS), trazendo propriedades ACID para cargas de trabalho de Big Data.

O Delta Lake armazena dados no formato Parquet e mantém um transaction log (também chamado de Delta Log) que registra todas as operações realizadas nas tabelas, permitindo reprodutibilidade, rollback e auditoria.


Arquitetura do Delta Lake

graph TB
    subgraph DeltaTable["Tabela Delta Lake"]
        direction TB
        TL["_delta_log/\n(Transaction Log)"]
        subgraph Files["Arquivos de Dados"]
            P1["part-00001.parquet"]
            P2["part-00002.parquet"]
            P3["part-00003.parquet"]
        end
    end

    subgraph Log["Transaction Log (_delta_log/)"]
        J0["00000.json (commit 0 - CREATE)"]
        J1["00001.json (commit 1 - INSERT)"]
        J2["00002.json (commit 2 - UPDATE)"]
        J3["00003.json (commit 3 - DELETE)"]
        CK["00010.checkpoint.parquet"]
    end

    TL --> Log
    Files --> P1
    Files --> P2
    Files --> P3

    style DeltaTable fill:#ff6b35,color:#fff
    style Log fill:#2d6a4f,color:#fff

Transaction Log

O Delta Log é um diretório _delta_log/ criado dentro do diretório da tabela. Cada commit gera um arquivo JSON numerado sequencialmente:

tabela_delta/
├── _delta_log/
│   ├── 00000000000000000000.json    ← criação da tabela
│   ├── 00000000000000000001.json    ← primeiro INSERT
│   ├── 00000000000000000002.json    ← UPDATE
│   ├── 00000000000000000003.json    ← DELETE
│   └── 00000000000000000010.checkpoint.parquet  ← checkpoint
├── part-00000-abc123.parquet
├── part-00001-def456.parquet
└── part-00002-ghi789.parquet

Cada arquivo JSON contém ações como: - add: arquivo Parquet adicionado à tabela - remove: arquivo Parquet marcado para remoção (soft delete) - metaData: schema e configurações da tabela - commitInfo: metadados do commit (operação, timestamp, usuário)


Propriedades ACID

Propriedade Descrição Como o Delta implementa
Atomicidade A transação é totalmente concluída ou não ocorre Escrita atômica de commits no log
Consistência Os dados sempre passam de um estado válido para outro Validação de schema e constraints
Isolamento Transações concorrentes não se interferem Optimistic concurrency control
Durabilidade Dados commitados são persistentes Arquivos Parquet + log imutável

Configuração do Delta Lake com PySpark

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Criar SparkSession com Delta Lake
builder = SparkSession.builder \
    .appName("DeltaLake") \
    .master("local[*]") \
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Operações CRUD com Delta Lake

INSERT — Escrita Inicial

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("cliente_id", IntegerType(), False),
    StructField("nome",       StringType(),  False),
    StructField("email",      StringType(),  False),
    StructField("cidade",     StringType(),  True),
    StructField("estado",     StringType(),  True),
])

dados = [
    (1, "Alice Souza",   "alice@email.com",   "Criciúma",    "SC"),
    (2, "Bruno Oliveira","bruno@email.com",   "Florianópolis","SC"),
    (3, "Carla Mendes",  "carla@email.com",   "São Paulo",   "SP"),
]

df = spark.createDataFrame(dados, schema=schema)

# Escrever como tabela Delta
df.write.format("delta") \
    .mode("overwrite") \
    .save("warehouse/delta/clientes")

print("Tabela Delta criada com sucesso!")

UPDATE — Atualizar Registros

from delta.tables import DeltaTable

# Carregar a tabela Delta
delta_table = DeltaTable.forPath(spark, "warehouse/delta/clientes")

# Atualizar cidade de um cliente específico
delta_table.update(
    condition="cliente_id = 1",
    set={"cidade": "'Tubarão'", "estado": "'SC'"}
)

# Verificar a atualização
spark.read.format("delta") \
    .load("warehouse/delta/clientes") \
    .filter("cliente_id = 1") \
    .show()

DELETE — Remover Registros

# Remover cliente por ID
delta_table.delete(condition="cliente_id = 3")

# Verificar remoção
spark.read.format("delta") \
    .load("warehouse/delta/clientes") \
    .show()

MERGE (Upsert) — INSERT ou UPDATE

# Novos dados: atualiza existentes, insere novos
novos_clientes = spark.createDataFrame([
    (2, "Bruno O. Silva", "bruno@email.com", "Joinville", "SC"),  # update
    (4, "Diana Costa",   "diana@email.com", "Curitiba",  "PR"),  # insert
], schema=schema)

delta_table.alias("destino").merge(
    novos_clientes.alias("fonte"),
    "destino.cliente_id = fonte.cliente_id"
).whenMatchedUpdate(set={
    "nome":   "fonte.nome",
    "cidade": "fonte.cidade",
    "estado": "fonte.estado",
}).whenNotMatchedInsert(values={
    "cliente_id": "fonte.cliente_id",
    "nome":       "fonte.nome",
    "email":      "fonte.email",
    "cidade":     "fonte.cidade",
    "estado":     "fonte.estado",
}).execute()

Time Travel (Viagem no Tempo)

O Delta Lake mantém o histórico completo de versões da tabela:

# Listar todo o histórico de versões
delta_table.history().select(
    "version", "timestamp", "operation", "operationParameters"
).show(truncate=False)

# Ler uma versão específica (por número de versão)
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("warehouse/delta/clientes")
df_v0.show()

# Ler por timestamp
df_historico = spark.read.format("delta") \
    .option("timestampAsOf", "2025-01-15 10:00:00") \
    .load("warehouse/delta/clientes")

# Via SQL (requer tabela registrada no catálogo)
spark.sql("""
    SELECT * FROM delta.`warehouse/delta/clientes`
    VERSION AS OF 0
""").show()

Schema Evolution (Evolução de Schema)

# Adicionar novas colunas sem recriar a tabela
df_com_telefone = spark.createDataFrame([
    (5, "Eduardo Lima", "edu@email.com", "Porto Alegre", "RS", "48999999999"),
], ["cliente_id", "nome", "email", "cidade", "estado", "telefone"])

df_com_telefone.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \   # permite adicionar nova coluna
    .save("warehouse/delta/clientes")

Vacuum — Limpeza de Arquivos Antigos

O VACUUM remove arquivos Parquet que não são mais referenciados pelo log:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "warehouse/delta/clientes")

# Remover arquivos com mais de 0 horas (default: 168h = 7 dias)
# ATENÇÃO: remover arquivos muito recentes impossibilita time travel
delta_table.vacuum(retentionHours=0)  # apenas para ambiente de dev

Delta Lake via SQL

O Delta Lake suporta DDL/DML SQL padrão quando a tabela é gerenciada pelo catálogo Spark:

-- Criar tabela Delta via SQL
CREATE TABLE IF NOT EXISTS clientes_sql (
    cliente_id INT,
    nome       STRING,
    email      STRING,
    cidade     STRING,
    estado     STRING
) USING delta
LOCATION 'warehouse/delta/clientes_sql';

-- INSERT
INSERT INTO clientes_sql VALUES (1, 'Ana Lima', 'ana@email.com', 'Criciúma', 'SC');

-- UPDATE
UPDATE clientes_sql SET cidade = 'Tubarão' WHERE cliente_id = 1;

-- DELETE
DELETE FROM clientes_sql WHERE estado = 'RJ';

-- Verificar histórico
DESCRIBE HISTORY clientes_sql;

Comparação com Formatos Tradicionais

Característica Parquet Simples Delta Lake
ACID
Time Travel
Upsert/Merge
Schema Evolution Manual ✅ Automático
Auditoria
Leituras concorrentes
Escritas concorrentes ✅ (OCC)
Compatibilidade Parquet ✅ (é Parquet por baixo)

Casos de Uso Ideais

  • CDC (Change Data Capture): capturar mudanças de bancos transacionais e aplicar no lakehouse.
  • Streaming + Batch: unificar dados em tempo real e históricos na mesma tabela.
  • Correção de dados: usar UPDATE/DELETE para corrigir erros sem reprocessar tudo.
  • Compliance/LGPD: deletar dados de um usuário específico com garantia ACID.
  • Machine Learning: garantir reprodutibilidade de datasets com time travel.