Ir para o conteúdo

Apache Iceberg

O que é o Apache Iceberg?

Apache Iceberg é um formato de tabela open-source de alto desempenho para grandes conjuntos de dados analíticos. Originalmente desenvolvido pela Netflix e doado para a Apache Software Foundation em 2018, o Iceberg foi criado para resolver problemas críticos de escalabilidade e confiabilidade que surgem ao trabalhar com tabelas Hive em escala petabyte.

O Iceberg é uma especificação aberta — qualquer motor de processamento (Spark, Flink, Trino, Hive, Dremio, etc.) pode ler e escrever tabelas Iceberg, tornando-o ideal para ambientes multi-engine.


Arquitetura do Apache Iceberg

graph TB
    subgraph Catalog["Catalog (Hive, REST, Nessie, Glue...)"]
        CT["tabela → current_snapshot_id\n+ location"]
    end

    subgraph Metadata["Camada de Metadados"]
        MF["metadata.json\n(Metadata File)"]
        SF["snap-xxxx.avro\n(Snapshot / Manifest List)"]
        MN1["manifest-001.avro\n(Manifest File)"]
        MN2["manifest-002.avro\n(Manifest File)"]
    end

    subgraph Data["Camada de Dados"]
        P1["data/part-00001.parquet"]
        P2["data/part-00002.parquet"]
        P3["data/part-00003.parquet"]
    end

    CT --> MF
    MF --> SF
    SF --> MN1
    SF --> MN2
    MN1 --> P1
    MN1 --> P2
    MN2 --> P3

    style Catalog fill:#1e3a5f,color:#fff
    style Metadata fill:#0077b6,color:#fff
    style Data fill:#00b4d8,color:#000

Três Camadas do Iceberg

Camada Componente Função
Catálogo Catalog (Hive, REST, Nessie) Mapeia nome da tabela → localização dos metadados
Metadados Metadata File (.json) Schema, partições, snapshots, propriedades
Metadados Manifest List (.avro) Lista de manifest files por snapshot
Metadados Manifest File (.avro) Lista de arquivos de dados com estatísticas
Dados Parquet / ORC / Avro Dados reais armazenados

Conceito de Snapshots

Cada operação de escrita no Iceberg gera um novo snapshot — uma foto imutável do estado da tabela naquele momento:

timeline
    title Histórico de Snapshots de uma Tabela Iceberg
    Snapshot 1 : INSERT inicial (100 linhas)
               : snap-id = 1234567890
    Snapshot 2 : INSERT adicional (+50 linhas)
               : snap-id = 2345678901
    Snapshot 3 : UPDATE (status de pedidos)
               : snap-id = 3456789012
    Snapshot 4 : DELETE (clientes inativos)
               : snap-id = 4567890123

Configuração do Iceberg com PySpark

from pyspark.sql import SparkSession

ICEBERG_VERSION = "1.6.1"
SPARK_VERSION   = "3.5"
SCALA_VERSION   = "2.12"
JAR = (
    f"org.apache.iceberg:iceberg-spark-runtime"
    f"-{SPARK_VERSION}_{SCALA_VERSION}:{ICEBERG_VERSION}"
)

spark = SparkSession.builder \
    .appName("ApacheIceberg") \
    .master("local[*]") \
    .config("spark.jars.packages", JAR) \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local",
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "warehouse/iceberg") \
    .config("spark.sql.defaultCatalog", "local") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Operações CRUD com Apache Iceberg

Criar Banco de Dados e Tabela

# Criar namespace (banco de dados)
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.techstore")

# Criar tabela Iceberg via SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.techstore.clientes (
        cliente_id   INT    NOT NULL,
        nome         STRING NOT NULL,
        email        STRING NOT NULL,
        cidade       STRING,
        estado       STRING
    )
    USING iceberg
    PARTITIONED BY (estado)
""")

INSERT — Inserir Dados

# Via DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("cliente_id", IntegerType(), False),
    StructField("nome",       StringType(),  False),
    StructField("email",      StringType(),  False),
    StructField("cidade",     StringType(),  True),
    StructField("estado",     StringType(),  True),
])

dados = [
    (1, "Alice Souza",    "alice@email.com",  "Criciúma",     "SC"),
    (2, "Bruno Oliveira", "bruno@email.com",  "Florianópolis","SC"),
    (3, "Carla Mendes",   "carla@email.com",  "São Paulo",    "SP"),
    (4, "Diego Alves",    "diego@email.com",  "Curitiba",     "PR"),
]

df = spark.createDataFrame(dados, schema=schema)

df.writeTo("local.techstore.clientes") \
  .append()

# Via SQL
spark.sql("""
    INSERT INTO local.techstore.clientes VALUES
    (5, 'Eduardo Lima', 'edu@email.com', 'Porto Alegre', 'RS')
""")

UPDATE — Atualizar Registros

# Atualizar cidade de um cliente
spark.sql("""
    UPDATE local.techstore.clientes
    SET cidade = 'Tubarão'
    WHERE cliente_id = 1
""")

# Verificar resultado
spark.sql("""
    SELECT * FROM local.techstore.clientes
    WHERE cliente_id = 1
""").show()

DELETE — Remover Registros

# Remover cliente específico
spark.sql("""
    DELETE FROM local.techstore.clientes
    WHERE cliente_id = 3
""")

# Verificar tabela após remoção
spark.sql("SELECT * FROM local.techstore.clientes ORDER BY cliente_id").show()

MERGE (Upsert)

# Criar tabela temporária com dados novos/atualizados
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW novos_clientes AS
    SELECT * FROM VALUES
        (2, 'Bruno O. Silva', 'bruno@email.com', 'Joinville', 'SC'),
        (6, 'Fernanda Rocha', 'fe@email.com',    'Salvador',  'BA')
    AS t(cliente_id, nome, email, cidade, estado)
""")

# MERGE INTO
spark.sql("""
    MERGE INTO local.techstore.clientes AS destino
    USING novos_clientes AS fonte
    ON destino.cliente_id = fonte.cliente_id
    WHEN MATCHED THEN
        UPDATE SET
            destino.nome   = fonte.nome,
            destino.cidade = fonte.cidade,
            destino.estado = fonte.estado
    WHEN NOT MATCHED THEN
        INSERT (cliente_id, nome, email, cidade, estado)
        VALUES (fonte.cliente_id, fonte.nome, fonte.email,
                fonte.cidade, fonte.estado)
""")

Time Travel (Viagem no Tempo)

# Listar todos os snapshots da tabela
spark.sql("""
    SELECT snapshot_id, committed_at, operation, summary
    FROM local.techstore.clientes.snapshots
    ORDER BY committed_at
""").show(truncate=False)

# Ler dados de um snapshot específico
spark.sql("""
    SELECT * FROM local.techstore.clientes
    FOR SYSTEM_VERSION AS OF <snapshot_id>
""").show()

# Ler dados de um timestamp específico
spark.sql("""
    SELECT * FROM local.techstore.clientes
    FOR SYSTEM_TIME AS OF '2025-01-15 10:00:00'
""").show()

# Via DataFrame API
df_historico = spark.read \
    .option("snapshot-id", "<snapshot_id>") \
    .table("local.techstore.clientes")

Partition Evolution (Evolução de Particionamento)

Uma das grandes vantagens do Iceberg sobre o Hive é a capacidade de mudar o particionamento sem reescrever os dados:

# Criar tabela particionada por 'estado'
spark.sql("""
    CREATE TABLE local.techstore.pedidos (
        pedido_id   INT,
        cliente_id  INT,
        data_pedido DATE,
        status      STRING,
        valor_total DOUBLE
    )
    USING iceberg
    PARTITIONED BY (estado_cliente STRING, months(data_pedido))
""")

# Adicionar nova partição SEM reescrever dados antigos
spark.sql("""
    ALTER TABLE local.techstore.pedidos
    ADD PARTITION FIELD days(data_pedido)
""")

# Remover partição antiga
spark.sql("""
    ALTER TABLE local.techstore.pedidos
    DROP PARTITION FIELD months(data_pedido)
""")

Schema Evolution (Evolução de Schema)

# Adicionar coluna
spark.sql("""
    ALTER TABLE local.techstore.clientes
    ADD COLUMN telefone STRING AFTER email
""")

# Renomear coluna
spark.sql("""
    ALTER TABLE local.techstore.clientes
    RENAME COLUMN cidade TO municipio
""")

# Alterar tipo de coluna (widening apenas: int → long → float → double → decimal)
spark.sql("""
    ALTER TABLE local.techstore.clientes
    ALTER COLUMN cliente_id TYPE BIGINT
""")

# Remover coluna
spark.sql("""
    ALTER TABLE local.techstore.clientes
    DROP COLUMN telefone
""")

Manutenção de Tabelas Iceberg

# Compactar arquivos pequenos (bin-packing)
spark.sql("""
    CALL local.system.rewrite_data_files(
        table => 'local.techstore.clientes',
        options => map('target-file-size-bytes', '134217728')
    )
""")

# Expirar snapshots antigos (manter últimas 24 horas)
spark.sql("""
    CALL local.system.expire_snapshots(
        table => 'local.techstore.clientes',
        older_than => TIMESTAMP '2025-01-01 00:00:00',
        retain_last => 5
    )
""")

# Remover arquivos órfãos
spark.sql("""
    CALL local.system.remove_orphan_files(
        table => 'local.techstore.clientes'
    )
""")

Delta Lake vs Apache Iceberg

Característica Delta Lake Apache Iceberg
Origem Databricks (2019) Netflix / Apache (2018)
Governança Linux Foundation Apache Software Foundation
Spec. aberta Parcial ✅ Totalmente aberta
Multi-engine Bom ✅ Excelente (design-first)
Catalog Spark catalog Hive, REST, Nessie, Glue...
ACID
Time Travel Por versão/timestamp Por snapshot/timestamp
Partition Evolution Limitado ✅ Avançado
Schema Evolution
Formato dos dados Parquet Parquet, ORC, Avro
Tamanho de tabelas Bom ✅ Otimizado para PB
Streaming ✅ (Delta Streaming) ✅ (via Flink/Spark)
Integração Databricks ✅ Nativa Suportado

Casos de Uso Ideais

  • Ambientes multi-engine: tabelas consumidas por Spark, Trino, Flink e Hive simultaneamente.
  • Tabelas petabyte: o design de metadados em três camadas escala melhor que o Delta Log para tabelas muito grandes.
  • Partition Evolution: migrar particionamento sem reescrever dados históricos.
  • Lakehouse aberto: projetos que precisam de portabilidade máxima e independência de fornecedor.
  • Dados geoespaciais/temporais: funções de particionamento nativas (hours, days, months, years, bucket, truncate).