Lakes

Misc

  • Data is stored either in a structured format or in its raw, native format, without any transformation, at any scale.
    • Handling both types allows all data to be centralized which means it can be better organized and more easily accessed.
  • Optimal fit for bulk data types such as server logs, clickstreams, social media, or sensor data.
  • Ideal use cases
    • Backup for logs
    • Raw sensor data for your IoT application
    • Text files from user interviews
    • Images
    • Trained machine learning models (with the database simply storing the path to the object)
  • Tools
    • Rclone - A command-line program to manage files on cloud storage. It is a feature-rich alternative to cloud vendors’ web storage interfaces. Over 70 cloud storage products support rclone including S3 object stores
    • {pins}
      • Posit’s Pins Docs
      • Articles
        • Automating data updates with R: Tracking changes in a GitHub-hosted dataset
          • Pulls the sha from the dataset repo and compares it to the old one to determine whether the data needs an update. Logs the sha and any error messages with {logger}. If there’s new data, uses {pins} to download the dataset and version it.
          • There’s not much to this script, but I think {git2r} might’ve been nicer than using {httr} and {jsonlite} to manually pull from the GitHub API
      • Convenient storage method
        • Can be used for caching
      • Can be automatically versioned, making it straightforward to track changes, re-run analyses on historical data, and undo mistakes
      • Needs to be manually refreshed
        • i.e., update the data model, etc., and run the script that writes it to the board.
      • Use when:
        • Object is less than 1 GB
          • Use {butcher} for large model objects (see the sketch below)
            • Some model objects store training data
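            • Example: a minimal sketch of trimming a model object before pinning it. Assumes a simple lm() fit; exactly which components butcher removes depends on the model class.

              # See which components of the fitted model are heavy
              fit <- lm(mpg ~ ., data = mtcars)
              butcher::weigh(fit)

              # Drop components not needed for prediction, then compare sizes
              small_fit <- butcher::butcher(fit)
              object.size(fit)
              object.size(small_fit)

              # small_fit is what you'd pin instead of the full object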
      • Benefits
        • Just need the pins board name and name of pinned object
          • Think the set-up is supposed to be easy (see the sketch below)
        • Easy to share; don’t need to understand databases
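        • Example: a minimal local set-up sketch; a folder board is about the simplest possible configuration (the folder path and pin name are placeholders)

          # Create a board backed by a local folder and round-trip a data frame
          board <- pins::board_folder("~/pins-demo")
          pins::pin_write(board, mtcars, name = "mtcars-demo")
          pins::pin_read(board, "mtcars-demo")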
      • Boards
        • Folders to share on a networked drive or with services like Dropbox

        • Posit Connect, Amazon S3, Google Cloud Storage, Azure storage, Databricks and Microsoft 365 (OneDrive and SharePoint)

        • Example: Pull data, clean and write to board

          board <-
            pins::board_connect(
              auth = "manual",
              server = Sys.getenv("CONNECT_SERVER"),
              key = Sys.getenv("CONNECT_API_KEY")
            )
          
          # code to pull and clean data
          
          pins::pin_write(board = board,
                          x = clean_data,
                          name = "isabella.velasquez/shiny-calendar-pin")
    • obstore: The simplest, highest-throughput Python interface to S3, GCS & Azure Storage, powered by Rust.
      • Features
        • Easy to install with no Python dependencies.
        • Sync and async API.
        • Streaming downloads with configurable chunking.
        • Automatically supports multipart uploads under the hood for large file objects.
        • The underlying Rust library is production quality and used in large scale production systems, such as the Rust package registry crates.io.
        • Simple API with static type checking.
        • Helpers for constructing from environment variables and boto3.Session objects
      • Supported object storage providers include:
        • Amazon S3 and S3-compliant APIs like Cloudflare R2
        • Google Cloud Storage
        • Azure Blob Gen1 and Gen2 accounts (including ADLS Gen2)
        • Local filesystem
        • In-memory storage
  • Lower storage costs, due to their more open-source nature (open formats on commodity object storage) and their undefined structure
  • On-prem set-ups have to manage hardware and environments
    • If you wanted to separate stuff like test data from production data, you also probably had to set up new hardware.
    • If you had data in one physical environment that had to be used for analytical purposes in another physical environment, you probably had to copy that data over to the new replica environment.
      • You have to keep a tie to the source environment to ensure that the replica environment stays up-to-date, and your operational source data most likely isn’t in one single environment; it’s likely you have tens, if not hundreds, of operational sources from which you gather data.
    • Where on-prem set-ups focus on isolating data with physical infrastructure, cloud computing shifts to focus on isolating data using security policies.
  • Object Storage Systems
    • Cloud data lakes provide organizations with additional opportunities to simplify data management by being accessible everywhere to all applications as needed
    • Organized as collections of files within directory structures, often with multiple files in one directory representing a single table.
      • Pros: highly accessible and flexible
      • Metadata Catalogs are used to answer these questions:
        • What is the schema of a dataset, including columns and data types
        • Which files comprise the dataset and how are they organized (e.g., partitions)
        • How different applications coordinate changes to the dataset, including both changes to the definition of the dataset and changes to data
      • Hive Metastore (HMS) and AWS Glue Data Catalog are two popular catalog options
        • Contain the schema, table structure and data location for datasets within data lake storage
    • Issues:
      • Does not coordinate data changes or schema evolution between applications in a transactionally consistent manner.
        • Creates the need for data staging areas, and this extra layer makes project pipelines brittle

Brands

  • Amazon S3
    • Add a hash to your bucket names and explicitly specify your bucket region!!
      • Some dude casually named his bucket and was charged >$1K in a day because third-party software was unintentionally hitting a bucket with that name. Even if requesters don’t have access, just trying to access the bucket creates charges. (link)
      • Evidently AWS is coming up with a solution (link)
    • Try to stay <1000 entries per level of hierarchy when designing the partitioning format. Otherwise there is paging and things get expensive.
    • AWS Athena ($5/TB scanned)
      • AWS Athena is serverless and intended for ad-hoc SQL queries against data on AWS S3
  • Apache Hudi - A transactional data lake platform that brings database and data warehouse capabilities to the data lake. Hudi reimagines slow old-school batch data processing with a powerful new incremental processing framework for low latency minute-level analytics.
  • Azure
    • Data Lake Storage (ADLS)
      • Big data analytics workloads (Hadoop, Spark, etc.)
      • ADLS Gen2 uses a hierarchical namespace which mimics a directory structure (e.g., folders/files). This enables high-performance analytics scenarios.
      • Supports HDFS-like interface (e.g. Hadoop)
      • Optimized for Azure Synapse, Databricks, HDInsight
      • ADLS Gen2 is built on top of Blob Storage, so you pay the same base storage costs plus some for additional capabilities.
      • Use Cases
        • Building a data lake for analytics workloads
        • Integrating with Spark, Synapse, Databricks
    • Blob Storage - Massively scalable and secure object storage for cloud-native workloads, archives, data lakes, HPC, and machine learning
      • Azure’s S3
      • General-purpose object storage (images, docs, backups, etc.)
      • Use Cases:
        • Web/mobile app data, backups, media files
        • Archiving and static content delivery
  • Databricks Delta Lake
  • Google Cloud Storage
    • 5 GB of US regional storage free per month, not charged against your credits.
  • Hadoop
    • Traditional format for data lakes
  • Linode/Akamai Cloud S3
  • Minio
    • Open-Source alternative to AWS S3 storage.
    • Given that S3 often stores customer PII (either inadvertently via screenshots or in actual structured JSON files), Minio is a great alternative for companies mindful of who has access to user data.
      • Of course, AWS claims that AWS personnel don’t have direct access to customer data, but since AWS is closed-source, that statement is just a matter of trust.
  • Wasabi Object Storage
    • $7/month per terabyte

Lakehouses

  • The key idea behind a Lakehouse is to be able to take the best of a Data Lake and a Data Warehouse.
    • Data Lakes can in fact provide a lot of flexibility (e.g. handle structured and unstructured data) and low storage cost.
    • Data Warehouses can provide really good query performance and ACID guarantees.

Apache Iceberg

  • Open source table format that addresses the performance and usability challenges of using Apache Hive tables in large and demanding data lake environments.
    • Other currently popular open table formats are Hudi and Delta Lake.
  • Interfaces
    • DuckDB can query Iceberg tables in S3 with an extension, docs (see the sketch below)
    • Athena can create Iceberg Tables
    • Google Cloud Storage has something called BigLake that can create Iceberg tables
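    • Example: a minimal sketch of querying an Iceberg table from R through DuckDB’s iceberg extension; the S3 path is a placeholder, and S3 credentials are assumed to be configured separately (e.g., via a DuckDB secret or environment variables)

      library(DBI)
      library(duckdb)

      con <- dbConnect(duckdb())

      # iceberg handles the table format; httpfs is needed to read from S3
      dbExecute(con, "INSTALL iceberg; LOAD iceberg;")
      dbExecute(con, "INSTALL httpfs; LOAD httpfs;")

      df <- dbGetQuery(con, "
        SELECT *
        FROM iceberg_scan('s3://my-bucket/warehouse/db/my_table')
        LIMIT 100;
      ")

      dbDisconnect(con)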
  • Features
    • Transactional consistency between multiple applications where files can be added, removed or modified atomically, with full read isolation and multiple concurrent writes
    • Full schema evolution to track changes to a table over time
    • Time travel to query historical data and verify changes between updates
    • Partition layout and evolution enabling updates to partition schemes as queries and data volumes change without relying on hidden partitions or physical directories
    • Rollback to prior versions to quickly correct issues and return tables to a known good state
    • Advanced planning and filtering capabilities for high performance on large data volumes
    • The full history is maintained within the Iceberg table format and without storage system dependencies
  • Components
    • Iceberg Catalog - Used to map table names to locations and must be able to support atomic operations to update referenced pointers if needed.
      • Keeps track of the external files backing each table, and of their locations and names, as data is updated or changed.
    • Metadata Layer (with metadata files, manifest lists, and manifest files) - Stores all the enriching information about the constituent files for each snapshot/transaction
      • e.g. table schema, configurations for the partitioning, etc.
    • Data Layer - Associated with the raw data files
  • Supports common industry-standard file formats, including Parquet, ORC and Avro
  • Supported by major data lake engines including Dremio, Spark, Hive and Presto
  • Queries on tables that do not use or save file-level metadata (e.g., Hive) typically involve costly list and scan operations
  • Any application that can deal with parquet files can use Iceberg tables and its API in order to query more efficiently
  • Comparison
  • Stacks
    • “Cloudflare R2 with Iceberg or Delta Lake and polars (automated with GitHub actions) is a free data lakehouse.”
    • Source

DuckLake

  • Notes from
  • Resources
    • Docs, Intro (with examples)
    • Catalog Database Options and Usage
      • If you would like to perform local data warehousing with a single client, use DuckDB as the catalog database.
      • If you would like to perform local data warehousing using multiple local clients, use SQLite as the catalog database.
      • If you would like to operate a multi-user lakehouse with potentially remote clients, choose a transactional client-server database system as the catalog database: MySQL or PostgreSQL (see the sketch below).
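      • Example: a minimal sketch of the three options above from R, assuming the ducklake:<catalog>: ATTACH prefixes described in the DuckLake docs; paths, database names, and the S3 prefix are placeholders, and the SQLite/PostgreSQL variants also need their respective DuckDB extensions installed

        library(DBI)
        library(duckdb)

        con <- dbConnect(duckdb())
        dbExecute(con, "INSTALL ducklake;")

        # Single local client: DuckDB file as the catalog database
        dbExecute(con, "ATTACH 'ducklake:my_catalog.ducklake' AS lake_local (DATA_PATH 'lake_files/');")

        # Multiple local clients: SQLite as the catalog database
        dbExecute(con, "ATTACH 'ducklake:sqlite:my_catalog.sqlite' AS lake_shared (DATA_PATH 'lake_files/');")

        # Multi-user lakehouse with remote clients: PostgreSQL as the catalog
        # database, with data files in object storage
        dbExecute(con, "ATTACH 'ducklake:postgres:dbname=ducklake host=localhost' AS lake_team (DATA_PATH 's3://mybucket/mylake/');")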
  • Open source “catalog” format: instead of using a file-based catalog architecture for metadata (the hierarchical filing system used by Iceberg), DuckLake uses a database to manage the metadata needed to support changes much more efficiently and effectively.
    • They’re using “catalog” because it replaces the whole lakehouse stack.
    • Also what Google BigQuery (with Spanner) and Snowflake (with FoundationDB) have chosen (metadata in SQL database), just without the open formats at the bottom
    • Hierarchical, file-based metadata (Iceberg, Delta Lake) runs into performance bottlenecks for high-frequency transactions, so it is now being augmented with cataloguing services such as Snowflake’s Polaris over Iceberg or Databricks’ Unity Catalog over Delta, which adds further abstraction and complexity.
  • The SQL database that hosts the catalog server can be any halfway competent SQL database that supports ACID and primary key constraints. (e.g. PostgreSQL, SQLite, MySQL and MotherDuck)
    • Metadata is roughly 5 orders of magnitude smaller than your actual data: a petabyte of Parquet data requires only ~10 GB of metadata. Even at massive scale, this metadata easily fits within the capabilities of modern RDBMS systems that routinely handle terabyte-scale databases.
  • Supports integration with any storage system like local disk, local NAS, S3, Azure Blob Store, GCS, etc. The storage prefix for data files (e.g., s3://mybucket/mylake/) is specified when the metadata tables are created.
  • Components (i.e., the only infrastructure requirements are storage for Parquet data files and a database for metadata)
    • DuckLake Interface - Standard SQL for all operations.
    • Commodity RDBMS - An ACID-compliant database, for storing metadata (PostgreSQL, SQLServer, Oracle, MySQL, SQLite, DuckDB itself).
    • Parquet files - The open standard for storage of tabular data for analytics workloads for storing raw data.
    • Commodity file storage - Any common platform that can act as an object store (Amazon S3, Azure Blob, Google Cloud Storage, local filesystem, etc.).
  • Currently supports only NOT NULL constraints.
    • PRIMARY KEY, FOREIGN KEY, UNIQUE, and CHECK constraints are not yet implemented, though this may change as the format matures.

R

  • Requires {duckdb} >= version 1.3.1
  • Example: Basic Workflow
    • Local Set-Up

      library(DBI)
      library(duckdb)

      db_file <- file.path(tempdir(), "metadata.ducklake")
      work_dir <- file.path(tempdir(), "lake_data_files")

      con <- dbConnect(duckdb())
      db_path <- glue::glue_sql('{paste0("ducklake:", db_file)}', .con = con)

      setup_dlake_cmd <- glue::glue_sql("
        INSTALL ducklake;
        ATTACH {db_path} AS my_ducklake (DATA_PATH {work_dir});
        USE my_ducklake;
      ", .con = con)

      DBI::dbExecute(con, setup_dlake_cmd)
      • db_file is the location of the metadata database
      • work_dir is the location of the data you’re working with, stored as parquet files. db_file and work_dir could also be URLs to a cloud location (AWS S3, a Google Cloud bucket, etc.)
      • my_ducklake is the name of the lakehouse
      • USE sets my_ducklake as the default lakehouse
    • Create a Table

      dbWriteTable(
        conn = con, 
        name = "stats",
        value = dplyr::filter(stats, 
                              contrast %in% c("two_months", "four_months"))
      )
      dbWriteTable(con, 
                   "exprs", 
                   exprs)
    • Do Work

      query_sym <- "
        SELECT contrast, COUNT(symbol) FROM my_ducklake.stats GROUP BY contrast;
      "
      df_fsq <- DBI::dbGetQuery(con, query_sym)
      
      lazy_df <- 
        tbl(con, "stats") |>
        dplyr::filter(symbol == "Cd34")
      lazy_df
      #> # Source:   SQL [?? x 9]
      #> # Database: DuckDB v1.3.1 [root@Darwin 24.5.0:R 4.5.0/:memory:]
      #>   study     contrast    symbol gene_id      logFC  CI.L  CI.R  P.Value adj.P.Val
      #>   <chr>     <chr>       <chr>  <chr>        <dbl> <dbl> <dbl>    <dbl>     <dbl>
      #> 1 Wang_2018 two_months  Cd34   ENSMUSG0000…  1.10 0.935  1.26 2.21e-14  2.23e-10
      #> 2 Wang_2018 four_months Cd34   ENSMUSG0000…  1.99 1.81   2.18 6.75e-20  7.51e-17
      • Show two ways of querying data: DBI and dbplyr
    • View Data Files

      query_files <- "
      FROM ducklake_table_info('my_ducklake')
      "
      DBI::dbGetQuery(con, query_files)
      
      fs::dir_info(file.path(work_dir, "main", "stats")) |>
        dplyr::mutate(filename = basename(path)) |>
        dplyr::select(filename, size)
      #> # A tibble: 1 × 2
      #>   filename                                                     size
      #>   <chr>                                                 <fs::bytes>
      #> 1 ducklake-019804b9-2259-7bf9-b53c-2ab7be53e198.parquet       1.42M
    • Add data to a table

      dbAppendTable(
        conn = con, 
        name = "stats",
        value = dplyr::filter(stats, contrast %in% c("six_months", "eight_months"))
      )
      
      fs::dir_info(file.path(work_dir, "main", "stats")) |>
        dplyr::mutate(filename = basename(path)) |>
        dplyr::select(filename, size)
      
      #>  # A tibble: 2 × 2
      #>    filename                                                     size
      #>    <chr>                                                 <fs::bytes>
      #>  1 ducklake-019804b9-2259-7bf9-b53c-2ab7be53e198.parquet       1.42M
      #>  2 ducklake-019804b9-236f-71fa-ad7a-81f6e9a6f01d.parquet       1.44M
      • After appending the data to the stats table, we can see that an additional parquet file was created.
    • Detach and Disconnect

      dbExecute(conn = con, "USE memory; DETACH my_ducklake;")
      dbDisconnect(con)
      • Closing a connection does not release the locks held on the database files as the file handles are held by the main DuckDB instance, so you should DETACH before closing the connection
      • Detaching from a default database isn’t possible, so USE memory is required first

Python

  • Requires duckdb >= 1.3.0 (Python package)

  • Basic within Quarto

    import duckdb
    
    con = duckdb.connect(database = ":memory:")
    con.sql("INSTALL ducklake")
    
    con.sql(
      f"""
      ATTACH 'ducklake:{r.db_file}' AS my_ducklake (READ_ONLY);
      """)
    df = con.sql(
      """
      USE my_ducklake;
      SELECT * FROM stats LIMIT 1000;
      """
    ).to_df()
    
    con.close()
    • r.db_file is a path variable from an R chunk in a Quarto notebook. See R example.
  • Create Instance

    import duckdb
    from pathlib import Path
    
    # Define our working directories
    DUCKLAKE_FOLDER = Path("../ducklake_basic")
    ducklake_metadata = DUCKLAKE_FOLDER / "metadata"
    ducklake_files = DUCKLAKE_FOLDER / "data_files"
    
    duckdb.sql("INSTALL ducklake")
    
    duckdb.sql(f"""
        ATTACH 'ducklake:{ducklake_metadata}' AS ducklake_basic_db (DATA_PATH '{ducklake_files}');
        USE ducklake_basic_db;
        """)
    • / concatenates the paths
    • ATTACH has the paths for the data files and the metadata files
    • USE sets this db as the default
  • Create Schema and Tables

    duckdb.sql("""
        CREATE SCHEMA IF NOT EXISTS retail_sales;
        USE retail_sales;
        """)
    
    duckdb.sql("""
        CREATE TABLE IF NOT EXISTS customer (
            customer_id INTEGER NOT NULL,
            first_name VARCHAR NOT NULL,
            last_name VARCHAR NOT NULL,
            date_joined DATE NOT NULL
        );
        """)
    duckdb.sql("""
        CREATE TABLE IF NOT EXISTS orders (
            order_id INTEGER NOT NULL,
            customer_id INTEGER NOT NULL,
            order_date DATE NOT NULL,
            product_id INTEGER NOT NULL,
            product_name VARCHAR NOT NULL,
            amount DECIMAL(10, 2) NOT NULL
        );
        """)
  • Insert

    duckdb.sql("""
        INSERT INTO customer (customer_id, first_name, last_name, date_joined) VALUES
        (1, 'Jane', 'Dunbar', '2023-01-11'),
        (2, 'Jimmy', 'Smith', '2024-08-26'),
        (3, 'Alice', 'Johnston', '2023-05-05');
    """)
    duckdb.sql("""
        INSERT INTO orders (order_id, customer_id, product_id, product_name, order_date, amount) VALUES
        (1, 1, 101, 'Widget A', '2023-01-15', 19.50),
        (2, 1, 102, 'Widget B', '2023-01-20', 29.99),
        (3, 3, 103, 'Widget A', '2023-02-10', 19.50);
    """)
  • Check file structure

    duckdb.sql(f"""
        FROM glob('{ducklake_files}/*');
        """)
    
    #> ┌────────────────────────────────────────────────────────────────────────────────────┐
    #> │                                        file                                        │
    #> │                                      varchar                                       │
    #> ├────────────────────────────────────────────────────────────────────────────────────┤
    #> │ ../ducklake_basic/data_files/ducklake-01978495-e183-7763-bf40-d9a85988eed9.parquet │
    #> └────────────────────────────────────────────────────────────────────────────────────┘
    • Flat file structure with UUID-based naming, which is quite different from the hierarchical folder structures used by other lakehouse formats
  • Fetch a parquet file and read it

    # Extract the parquet file name
    parquet_file = duckdb.sql(f"""
        SELECT file FROM glob('{ducklake_files}/*.parquet') LIMIT 1;
        """).fetchone()[0]
    
    # Inspect the file contents
    duckdb.sql(f"""
        SELECT * FROM read_parquet('{parquet_file}');
        """)
    #> ┌─────────────┬────────────┬───────────┬─────────────┐
    #> │ customer_id │ first_name │ last_name │ date_joined │
    #> │    int32    │  varchar   │  varchar  │    date     │
    #> ├─────────────┼────────────┼───────────┼─────────────┤
    #> │           1 │ Jane       │ Dunbar    │ 2023-01-11  │
    #> │           2 │ Jimmy      │ Smith     │ 2024-08-26  │
    #> │           3 │ Alice      │ Johnston  │ 2023-05-05  │
    #> └─────────────┴────────────┴───────────┴─────────────┘
  • View snapshot

    duckdb.sql("""
        SELECT * FROM ducklake_snapshots('ducklake_basic_db');
        """)
    #> ┌─────────────┬────────────────────────────┬────────────────┬─────────────────────────────────────────────────────────────────────────┐
    #> │ snapshot_id │       snapshot_time        │ schema_version │                                 changes                                 │
    #> │    int64    │  timestamp with time zone  │     int64      │                         map(varchar, varchar[])                         │
    #> ├─────────────┼────────────────────────────┼────────────────┼─────────────────────────────────────────────────────────────────────────┤
    #> │           0 │ 2025-06-18 19:48:24.41+00  │              0 │ {schemas_created=[main]}                                                │
    #> │           1 │ 2025-06-18 19:48:24.489+00 │              1 │ {schemas_created=[retail_sales]}                                        │
    #> │           2 │ 2025-06-18 19:48:24.533+00 │              2 │ {tables_created=[retail_sales.customer]}                                │
    #> │           3 │ 2025-06-18 19:48:24.554+00 │              2 │ {tables_inserted_into=[2]}                                              │
    #> └─────────────┴────────────────────────────┴────────────────┴─────────────────────────────────────────────────────────────────────────┘
    • Every operation in DuckLake creates a snapshot — a point-in-time view of the database state

    • Interpretation

      • Snapshot 0: Default main schema created
      • Snapshot 1: Our retail_sales schema created
      • Snapshot 2: Customer table created
      • Snapshot 3: Data inserted into the table
  • Update and Delete

    duckdb.sql("""
        UPDATE customer SET first_name = 'Alice', last_name = 'Fraser' WHERE customer_id = 3;
        """)
    duckdb.sql("""
        DELETE FROM customer WHERE customer_id = 2;
        """)
    • DuckLake creates separate “delete files” that reference the original data files and specify which rows should be considered deleted. This approach allows for efficient deletes without rewriting large data files.
  • Time Travel

    max_snapshot_id = duckdb.sql("""
        SELECT MAX(snapshot_id) FROM ducklake_snapshots('ducklake_basic_db');
        """).fetchone()[0]
    
    # Query data as it was prior to the latest action
    duckdb.sql(f"""
        SELECT * FROM customer AT (VERSION => {max_snapshot_id - 1});
        """)

    # Query data as it was prior to the two latest actions
    duckdb.sql(f"""
        SELECT * FROM customer AT (VERSION => {max_snapshot_id - 2});
        """)
  • Multitable Transaction

    duckdb.sql("""
        BEGIN TRANSACTION;
        INSERT INTO customer (customer_id, first_name, last_name, date_joined) VALUES
        (4, 'Bob', 'Brown', '2023-03-01');
        INSERT INTO orders (order_id, customer_id, product_id, product_name, order_date, amount) VALUES
        (4, 4, 104, 'Widget B', '2023-03-05', 29.99),
        (5, 4, 105, 'Widget C', '2023-02-15', 59.99),
        (6, 4, 106, 'Widget A', '2023-01-25', 19.50);
        COMMIT;
    """)
    • This entire operation — adding a customer and three orders — appears as a single snapshot in our change history.

    • If any part of the transaction had failed, the entire operation would have been rolled back, ensuring data consistency.

    • Multi-table transactions ensure your data maintains integrity across related tables, something that’s challenging or impossible with table-level formats.

  • View Changes Due to Latest Transaction

    # Re-fetch the latest snapshot id so it reflects the transaction above
    max_snapshot_id = duckdb.sql("""
        SELECT MAX(snapshot_id) FROM ducklake_snapshots('ducklake_basic_db');
        """).fetchone()[0]
    duckdb.sql(f"""
        SELECT * FROM ducklake_table_changes('ducklake_basic_db', 'retail_sales', 'customer', {max_snapshot_id}, {max_snapshot_id})
        ORDER BY snapshot_id;
        """)
    #> ┌─────────────┬───────┬─────────────┬─────────────┬────────────┬───────────┬─────────────┬─────────┐
    #> │ snapshot_id │ rowid │ change_type │ customer_id │ first_name │ last_name │ date_joined │  email  │
    #> │    int64    │ int64 │   varchar   │    int32    │  varchar   │  varchar  │    date     │ varchar │
    #> ├─────────────┼───────┼─────────────┼─────────────┼────────────┼───────────┼─────────────┼─────────┤
    #> │           9 │     6 │ insert      │           4 │ Bob        │ Brown     │ 2023-03-01  │ NULL    │
    #> └─────────────┴───────┴─────────────┴─────────────┴────────────┴───────────┴─────────────┴─────────┘