Arrow

Misc

  • This note also contains code for parquet files that isn’t a part of Apache Arrow.

  • Resources

  • Packages

    • {nanoarrow} - Lightweight Arrow library in C. Doesn’t have the overhead of the original Arrow library. Built to pull data out of small systems (e.g. streaming logs from an embedded system).
    • {nanoparquet} - Reads and writes parquet files. No dependencies other than a C++-11 compiler. It compiles in about 30 seconds into an R package that is less than a megabyte in size. Built for use with smaller data sizes, < 1GB. (Also see limitations and article for an overview of capabilities)
      • Supports all Parquet encodings and currently supports three compression codecs: Snappy, Gzip and Zstd
      • Missing values are handled properly, factor columns are kept as factors, and temporal types are encoded correctly
      • Functions that allow you to query the metadata and schema without reading in the full dataset (see the sketch after this list)
      • Can read a 250MB file with 32 million rows and 14 columns in about 10-15 secs on an M2.
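      • A minimal usage sketch (file name is illustrative; the metadata/schema helper names may differ slightly across {nanoparquet} versions):

        library(nanoparquet)
        
        write_parquet(mtcars, "mtcars.parquet")   # write a data frame to parquet
        parquet_info("mtcars.parquet")            # file-level info (rows, row groups, codec)
        parquet_schema("mtcars.parquet")          # column names/types without reading the data
        df <- read_parquet("mtcars.parquet")      # read back as a data frame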
  • Arrow/Feather format is built for speed, not compression, so files are larger than parquet

    • Feather for short-term storage and parquet for longer-term storage
    • The arrow format requires ~ten times more storage space.
      • e.g. For nyc-taxi data set, parquet takes around ~38GB, but arrow would take around 380GB. Although with arrow, you could see ~10x speed increase in operations.
  • Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data.

  • To create a multi-source dataset, provide a list of datasets to open_dataset instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2)
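    • A minimal sketch, assuming two existing datasets with compatible schemas (paths are illustrative):

      library(arrow)
      
      ds1 <- open_dataset("data/part_one/")
      ds2 <- open_dataset("data/part_two/")
      big_dataset <- open_dataset(list(ds1, ds2))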

  • More verbose installation + get compression libraries and AWS S3 support

    Sys.setenv(
      ARROW_R_DEV = TRUE,
      LIBARROW_MINIMAL = FALSE
    )
    install.packages("arrow")
    • Installation takes some time; the verbose output from ARROW_R_DEV = TRUE lets you monitor progress and confirm the build isn’t stuck.
  • Info about your Arrow installation - arrow_info()

    • Version, Compression libs, C++ lib, Runtime, etc.
  • Creating an Arrow dataset

    • Has a script that downloads monthly released csv files; creates a Hive directory structure; and converts them to parquet files with partitioning based on that structure.
  • Optimizations

    • Notes from How to properly prepare your parquet floor
    • Make sure columns are correctly typed
    • Sort by one or two columns (see the sketch at the end of this list)
      • A filtered query on a sorted field will be up to 10 times faster
      • Sorting on one or two low cardinality fields (few distinct values) will be less computationally intensive and will achieve a more efficient compression.
        • e.g. sort by year, then by geographic code
      • For geoparquet, sort by a grid index like GeoHash or H3
    • For geoparquet, add a bounding box column
      • Geometries in a geoparquet can be encoded in WKB or GeoArrow with GeoArrow being more efficient
    • Serve already-built parquet files
      • Some platforms generate parquet exports on the fly, but the process is still horribly slow.
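    • A minimal sketch of sorting before writing (assumes an in-memory data frame data with year and geo_code columns; names are illustrative):

      library(arrow)
      library(dplyr)
      
      data |>
        arrange(year, geo_code) |>            # sort on low-cardinality fields first
        write_parquet("data_sorted.parquet")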
  • Statically hosted parquet files provide one of the easiest to use and most performant APIs for accessing bulk data, and are far simpler and cheaper to provide than custom APIs. (article)

    • Benefits
      • Loading a sample of rows (and viewing metadata) from an online parquet file is a simple one-liner (see the sketch after this list).
      • Serving data as a static file is probably the simplest and cheapest possible architecture for open data services. (e.g. s3)
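      • One way to do this from R is DuckDB’s httpfs extension; a minimal sketch (the URL is the illustrative one from the example below):

        library(DBI)
        library(duckdb)
        
        con <- dbConnect(duckdb())
        dbExecute(con, "INSTALL httpfs")
        dbExecute(con, "LOAD httpfs")
        # Range requests mean only the needed row groups/pages are downloaded
        dbGetQuery(con, "
          SELECT *
          FROM 'https://www.my-organisation.com/open_data/v1/widgets_2021.parquet'
          LIMIT 10
        ")
        dbDisconnect(con, shutdown = TRUE)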
    • List of cases when static files are inappropriate
      • Very large datasets of 10s of millions of records.
        • In these cases it may be appropriate to offer an API that allows users to query the underlying data to return smaller subsets
      • Your users need to make transactional or atomic requests.
      • Relational data with a complex schema.
      • Private datasets with granular access control.
      • Rapidly changing data to which users need immediate access.
    • Recommendations
      • Enable CORS
        • Cross-Origin Resource Sharing (CORS) enables any website to load your data directly from source parquet files without the need for a server.
      • Create a URL structure
        • Example: Date Partition

          www.my-organisation.com/open_data/v1/widgets_2021.parquet
          
          www.my-organisation.com/open_data/v1/widgets_2022.parquet
          
          www.my-organisation.com/open_data/v1/widgets_latest.parquet
        • Consider also providing the same structure but with csvs

      • Provide directory listing service to enable data discovery and scraping
  • {{polars}} can write parquet files

    import polars as pl
    
    pl.read_csv("data_recensement_2017.csv", separator = ';', \
        dtypes = {'COMMUNE': pl.String}) \
    .write_parquet("data_recensement_2017.parquet", \
        compression = 'zstd', use_pyarrow = False)

APIs

  • Single File
    • Contains functions for each supported file format (CSV, JSON, Parquet, Feather/Arrow, ORC).
      • Start with read_ or write_ followed by the name of the file format.
      • e.g. read_csv_arrow(), read_parquet(), and read_feather()
    • Works on one file at a time, and the data is loaded into memory.
      • Depending on the size of your file and the amount of memory you have available on your system, it might not be possible to load the dataset this way.
      • Example
        • 111MB RAM used - Start of R session
        • 135MB - Arrow package loaded
        • 478MB - After using read_csv_arrow("path/file.csv", as_data_frame = FALSE) to load a 108 MB file
          • 525MB with “as_data_frame = TRUE” (data loaded as a dataframe rather than an Arrow table)
  • Dataset
    • Can read multiple file formats
    • Can point to a folder with multiple files and create a dataset from them
    • Can read datasets from multiple sources (even combining remote and local sources)
    • Can be used to read single files that are too large to fit in memory.
      • Data does NOT get loaded into memory
      • Queries will be slower if the data is not in parquet format
        • e.g. dat <- open_dataset("~/dataset/path_to_file.csv")
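    • A minimal sketch of a lazy query on a Dataset (path and columns are illustrative); nothing is read into memory until collect():

      library(arrow)
      library(dplyr)
      
      ds <- open_dataset("~/dataset/parquet_dir/")
      ds |>
        filter(year == 2020) |>
        summarize(n = n()) |>
        collect()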

Data Objects

  • Scalar - R doesn’t have a scalar class (only vectors)

    Scalar$create(value, type)
  • Array and ChunkedArray

    ChunkedArray$create(..., type)
    Array$create(vector, type)
    • Only difference is that one can be chunked
  • RecordBatch and Table

    RecordBatch or Table$create(...)
    • Similar except Table can be chunked
  • Dataset - list of Tables with same schema

    Dataset$create(sources, schema)
  • Data Types (?decimal), e.g. Table$var$cast(decimal(3, 2)) (see the sketch after this list)

    • int8(), 16, 32, 64
    • uint8(), …
    • float(), 16, 32, 64
    • halffloat()
    • bool(), boolean()
    • utf8(), large_utf8()
    • binary(), large_binary(), fixed_size_binary(byte_width)
    • string()
    • date32(), 64
    • time32(unit = c("ms", "s")), 64
    • timestamp(unit, timezone)
    • decimal()
    • struct()
    • list_of(), large_list_of(), fixed_size_list_of()
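    • A combined sketch of the constructors and casting above (values are illustrative):

      library(arrow)
      
      a   <- Array$create(c(1.5, 2.25, 3.75))
      ca  <- ChunkedArray$create(1:3, 4:6)                           # two chunks
      tbl <- Table$create(x = c(1.5, 2.25, 3.75), y = c("a", "b", "c"))
      tbl$x <- tbl$x$cast(decimal(3, 2))                             # fixed-precision decimal column
      tbl$schema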

Operations

  • read_csv_arrow(<csv_file>, as_data_frame = FALSE)

    • Reads csv into memory as an Arrow table
    • as_data_frame - if TRUE (default), reads into memory as a tibble, which takes up more space than an Arrow Table
  • write_parquet

    • compression
      • Before using compression, ask:
        • Will the parquet files be frequently accessed online (e.g. API)?
          • In this situation, bandwidth may be an issue and a smaller (compressed) file would be desirable.
        • Will the parquet files be accessed from a local disk?
          • In this situation, the time spent decompressing the file to read it is probably not worth the decrease in file size.
      • default “snappy” - popular
      • “uncompressed”
      • “zstd” (z-standard)
        • High-performance codec developed at Facebook (Meta)
        • Compresses to smaller size than snappy
    • use_dictionary
      • default TRUE - use dictionary encoding for columns with repeated values (e.g. factor variables)
      • FALSE - increases file size dramatically (e.g. 9 kb to 86 kb)
    • chunk_size
      • How many rows to write per chunk (i.e. per row group)
        • The data is compressed per column, and inside each column, per chunk of rows, which is called the row group
      • If the data has fewer than 250 million cells (rows x cols), then the total number of rows is used.
      • Considerations
        • The choice of compression can affect row group performance.
        • Ensure row group size doesn’t exceed available memory for processing
        • Very small row groups can lead to larger file sizes due to metadata overhead.
        • If performing batch tasks, you want the largest row groups possible
          • e.g. Performing a sales analysis where you need the whole dataset.
        • If accessing randomly, you might want smaller chunk sizes
          • e.g. Looking up individual customer information in a large database of users.
      • Docs recommend large row groups (512MB - 1GB)
        • Optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file
    • data_page_size
      • Target threshold for the approximate encoded size of data pages within a column chunk
      • Smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers)
      • Default: 1MB; For sequential scans, the docs recommend 8KB for page sizes.
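    • A sketch tying the options above together (values are illustrative, not recommendations):

      library(arrow)
      
      write_parquet(
        mtcars,
        "mtcars.parquet",
        compression = "zstd",         # smaller files; good when serving over a network
        use_dictionary = TRUE,        # dictionary-encode repeated values
        chunk_size = 1e6,             # rows per row group
        data_page_size = 1024 * 1024  # ~1MB pages (the default)
      )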
  • Convert large csv to parquet

    my_data <- read_csv_arrow(
      "~/dataset/path_to_file.csv",
      as_data_frame = FALSE
    )
    write_parquet(my_data, "~/dataset/my-data.parquet")
    dat <- read_parquet("~/dataset/my-data.parquet", as_data_frame = FALSE) # loaded into memory as an Arrow table
    • Reduces size of data stored substantially (e.g. 15 GB csv to 9.5 GB parquet)
  • Lazily download subsetted dataset from S3 and locally convert to parquet with partitions

    data_nyc = "data/nyc-taxi"
    open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |>
        dplyr::filter(year %in% 2012:2021) |> 
        write_dataset(data_nyc, partitioning = c("year", "month"))
    • open_dataset() doesn’t load the data into RAM, so subsetting a large dataset (e.g. 40GB) before writing is safe.

    • format = “arrow” is also available in write_dataset()

Partitioning

  • Partitioning increases the number of files and it creates a directory structure around the files.

  • Hive Partitioning - Folder/file structure based on partition keys (i.e. grouping variables). Within each folder, the key takes the value given by the folder’s name. Partitioning the data this way makes queries on data slices faster. (A write_dataset() sketch that produces this layout follows the example below.)

    • Example: Folder structure when partitioning on year and month

      taxi-data
         year=2018
           month=01
            file01.parquet
           month=02
            file02.parquet
            file03.parquet
           ...  
         year=2019
           month=01
           ...
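    • A sketch that produces the layout above from a CSV dataset (paths are illustrative; assumes year and month columns exist):

      library(arrow)
      
      open_dataset("nyc-taxi-csv/", format = "csv") |>
        write_dataset("taxi-data",
                      format = "parquet",
                      partitioning = c("year", "month"),
                      hive_style = TRUE)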
  • Pros

    • Allows Arrow to construct a more efficient query
    • Can be read and written with parallelism
  • Cons

    • Each additional file adds a little overhead in processing for filesystem interaction
    • Can increase the overall dataset size since each file has some shared metadata
  • Best Practices

    • Avoid having individual Parquet files smaller than 20MB or larger than 2GB. (Docs)
      • Files outside this range can cancel out the benefit of grouping your queries by a partition column. (see article for benchmarks)
    • Avoid partitioning layouts with more than 10,000 distinct partitions.
  • View metadata of a partitioned dataset

    air_data <- open_dataset("airquality_partitioned_deeper")
    
    # View data
    air_data
    
    ## FileSystemDataset with 153 Parquet files
    ## Ozone: int32
    ## Solar.R: int32
    ## Wind: double
    ## Temp: int32
    ## Month: int32
    ## Day: int32
    ##
    ## See $metadata for additional Schema metadata
    • This is a “dataset” type so data won’t be read into memory
    • Assume $metadata will indicate which columns the dataset is partitioned by
  • Partition a large file and write to arrow format

    lrg_file <- open_dataset(<file_path>, format = "csv")
    lrg_file %>%
        group_by(var) %>%
        write_dataset(<output_dir>, format = "feather")
    • Pass the file path to open_dataset()
    • Use group_by() to partition the Dataset into manageable chunks
      • Can also use partitioning in write_dataset
    • Use write_dataset() to write each group to a separate Feather/Arrow file, all without needing to read the full CSV file into R
    • open_dataset is fast because it only reads the metadata of the file system to determine how it can construct queries
  • Partition Columns

    • Preferably chosen based on how you expect to use the data (e.g. important grouping variables)

    • Example: partition on county because your analysis or transformations will largely be done by county, even though some counties may be much larger than others, causing the partitions to be substantially imbalanced.

    • If there is no obvious column, partitioning can be dictated by a maximum number of rows per partition

      write_dataset(
        data,
        format = "parquet",
        path = "~/datasets/my-data/",
        max_rows_per_file = 1e7
      )
      dat <- open_dataset("~/datasets/my-data")
      • Files can get very large without a row count cap, leading to out-of-memory errors in downstream readers.
      • Relationship between row count and file size depends on the dataset schema and how well compressed (if at all) the data is
      • Other ways to control file size.
        • “max_rows_per_group” - splits up large incoming batches into multiple row groups.
          • If this value is set then “min_rows_per_group” should also be set or else you may end up with very small row groups (e.g. if the incoming row group size is just barely larger than this value).
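        • A sketch combining the row-count controls above (values are illustrative):

          write_dataset(
            data,
            path = "~/datasets/my-data/",
            format = "parquet",
            max_rows_per_file = 1e7,
            max_rows_per_group = 1e6,
            min_rows_per_group = 1e5
          )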

Fixed Precision Decimal Numbers

  • Computers don’t store exact representations of numbers, so there are floating point errors in calculations. Doesn’t usually matter in analysis, but it can matter in transaction-based operations.

    library(dplyr)
    
    txns <- tibble(amount = c(0.1, 0.1, 0.1, -0.3)) %>%
        summarize(balance = sum(amount, na.rm = TRUE))
    # Should be 0
    txns
    # 5.55e-17
  • The accumulation of these errors can be costly.

  • Arrow can fix this with fixed precision decimals

    library(arrow)
    library(dplyr)
    
    # Arrow Table (C++ library); collect() converts it to a data frame
    txns <- Table$create(amount = c(0.1, 0.1, 0.1, -0.3))
    txns$amount <- txns$amount$cast(decimal(3, 2))
    txns
    # $amount <decimal128(3, 2)>
    
    write_parquet(txns, "data/txns_decimal.parquet")
    txns <- read_parquet("data/txns_decimal.parquet", as_data_frame = FALSE)
    txns %>%
        summarize(balance = sum(amount, na.rm = TRUE)) %>%
        collect()
    # balance
    #    0

Queries

  • Example: Filter partitioned files

    library(dplyr)
    # iris dataset was written and partitioned to a directory path stored in dir_out
    ds <- arrow::open_dataset(dir_out, partitioning = "species")
    # query the dataset
    ds %>% 
      filter(species == "species=setosa") %>%
      count(sepal_length) %>% 
      collect()
    • format “<partition_variable>=<partition_value>”

    • compute stores the result in Arrow

    • collect brings the result into R

  • Example: libarrow functions

    arrowmagicks %>% 
      mutate(days = arrow_days_between(start_date, air_date)) %>% 
      collect()
    • “days_between” is a function in libarrow but not in {arrow}. In order to use it, you only have to put the “arrow_” prefix in front of it.
    • Use list_compute_functions to get a list of the available functions
      • List of potential functions available (libarrow function reference)
  • When the query is also larger than memory

    library(arrow)
    library(dplyr)
    nyc_taxi <- open_dataset("nyc-taxi/")
    nyc_taxi |>
      filter(payment_type == "Credit card") |>
      group_by(year, month) |>
      write_dataset("nyc-taxi-credit")
    • In the example, the input is 1.7 billion rows (70GB), output is 500 million (15GB). Takes 3-4 mins.
  • User-defined functions

    • register_scalar_function() - registers an R function (which can use base R functions inside its body) so it can be called in arrow dplyr pipelines (see the sketch below)
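    • A minimal sketch (the function name and types are illustrative):

      library(arrow)
      library(dplyr)
      
      # Register an R function so it can be used in an arrow dplyr pipeline.
      # auto_convert = TRUE converts Arrow inputs to R vectors before calling it.
      register_scalar_function(
        "to_fahrenheit",
        function(context, x) x * 9 / 5 + 32,
        in_type = float64(),
        out_type = float64(),
        auto_convert = TRUE
      )
      
      arrow_table(temp_c = c(0, 25, 100)) %>%
        mutate(temp_f = to_fahrenheit(temp_c)) %>%
        collect()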

DuckDB

  • Converting a csv to parquet

    SET threads = 4;
    COPY (SELECT * FROM 'data_recensement_2017.csv')
    TO 'data_recensement_2017.parquet' (FORMAT parquet, COMPRESSION zstd);
    • threads is set to the number of threads available by default

    • Will automatically detect the type of delimiter

  • Query

    df <- duckdb:::sql("FROM 'file.parquet'")
  • Export a DuckDB database to parquet

    drv <- duckdb::duckdb()
    con <- DBI::dbConnect(drv)
    on.exit(DBI::dbDisconnect(con), add = TRUE)
    
    # create duckdb table
    DBI::dbWriteTable(con, "mtcars", mtcars)
    
    DBI::dbExecute(con, DBI::sqlInterpolate(con,
      "COPY mtcars TO ?filename (FORMAT 'parquet', COMPRESSION 'snappy')",
      filename = 'mtcars.parquet'
    ))

Cloud

  • Access files in Amazon S3 (works for all file types)

    taxi_s3 <- read_parquet("s3://ursa-labs-taxi-data/2013/12/data.parquet")
    # multiple files
    ds_s3 <- open_dataset("s3://ursa-labs-taxi-data/", partitioning = c("year", "month"))
    • As of 2021, this only works for Amazon S3 URIs

    • read_parquet can take a minute to load

    • The partition folder structure is visible in the S3 URI passed to read_parquet()

    • Example Query

      # over 125 files and 30GB
      ds_s3 %>%
          filter(total_amount > 100, year == 2015) %>%
          select(tip_amount, total_amount, passenger_count) %>%
          mutate(tip_pct = 100 * tip_amount / total_amount) %>%
          group_by(passenger_count) %>%
          summarize(median_tip_pct = median(tip_pct),
                    n = n()) %>%
          print() # is this necessary?
      • Partitioning allowed Arrow to skip all files outside the year 2015 directory and only perform the calculation on the files within it.
  • Access Google Cloud Storage (GCS)