Arrow

Misc

  • This note also contains code for parquet files that isn’t a part of Apache Arrow.

  • Resources

  • Packages

    • {nanoarrow} - Lightweight Arrow library in C. Doesn’t have the overhead of the original Arrow library. Built to pull data out of small systems (e.g. streaming logs from an embedded system).
    • {nanoparquet} - Reads and writes parquet files. No dependencies other than a C++-11 compiler. It compiles in about 30 seconds into an R package that is less than a megabyte in size. Built for use with smaller data sizes, < 1GB. (Also see limitations and article for an overview of capabilities)
      • Supports all Parquet encodings and currently supports three compression codecs: Snappy, Gzip and Zstd
      • Missing values are handled properly, factor columns are kept as factors, and temporal types are encoded correctly
      • Functions that allow you to query the metadata and schema without reading in the full dataset (see the sketch below)
      • Can read a 250MB file with 32 million rows and 14 columns in about 10-15 secs on an M2.
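      • A minimal sketch of the {nanoparquet} read/write/metadata workflow (the file path is hypothetical; the metadata helper names follow the package’s announcement and may differ between versions)

        library(nanoparquet)

        # write a data frame to a parquet file
        write_parquet(mtcars, "mtcars.parquet")

        # read the full file back into a data frame
        dat <- read_parquet("mtcars.parquet")

        # inspect schema and metadata without reading the full dataset
        parquet_schema("mtcars.parquet")
        parquet_metadata("mtcars.parquet")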
  • Arrow/Feather format is built for speed, not compression, so files are larger than parquet

    • Feather for short-term storage and parquet for longer-term storage
    • The arrow format requires ~ten times more storage space.
      • e.g. For nyc-taxi data set, parquet takes around ~38GB, but arrow would take around 380GB. Although with arrow, you could see ~10x speed increase in operations.
  • Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data.

  • To create a multi-source dataset, provide a list of datasets to open_dataset() instead of a file path, or simply concatenate them, e.g. big_dataset <- c(ds1, ds2)
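    • A minimal sketch, assuming ds1 and ds2 share a schema (paths are hypothetical)

      ds1 <- open_dataset("data/part-one/")
      ds2 <- open_dataset("s3://my-bucket/part-two/")
      big_dataset <- open_dataset(list(ds1, ds2)) # equivalent to c(ds1, ds2)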

  • More verbose installation + get compression libraries and AWS S3 support

    Sys.setenv(
      ARROW_R_DEV = TRUE,
      LIBARROW_MINIMAL = FALSE
    )
    install.packages("arrow")
    • Installation takes some time; ARROW_R_DEV = TRUE gives verbose build output so you can monitor progress and make sure it isn’t stuck.
  • Info about your Arrow installation - arrow_info()

    • Version, Compression libs, C++ lib, Runtime, etc.
  • Creating an Arrow dataset

    • Has a script that downloads monthly released csv files; creates a Hive directory structure; and converts them to parquet files with partitioning based on that structure.
  • Statically hosted parquet files provide one of the easiest to use and most performant APIs for accessing bulk data, and are far simpler and cheaper to provide than custom APIs. (article)

    • Pros and cons
    • List of cases when static files are inappropriate

APIs

  • Single File
    • Contains functions for each supported file format (CSV, JSON, Parquet, Feather/Arrow, ORC).
      • Start with read_  or write_  followed by the name of the file format.
      • e.g. read_csv_arrow() , read_parquet() , and read_feather()
    • Works on one file at a time, and the data is loaded into memory.
      • Depending on the size of your file and the amount of memory you have available on your system, it might not be possible to load the dataset this way.
      • Example
        • 111MB RAM used - Start of R session
        • 135MB - Arrow package loaded
        • 478MB - After using read_csv_arrow("path/file.csv", as_data_frame = FALSE) to load a 108 MB file
          • 525MB with “as_data_frame = TRUE” (data loaded as a dataframe rather than an Arrow table)
  • Dataset
    • Can read multiple file formats
    • Can point to a folder with multiple files and create a dataset from them
    • Can read datasets from multiple sources (even combining remote and local sources)
    • Can be used to read single files that are too large to fit in memory.
      • Data does NOT get loaded into memory
      • Queries will be slower if the data is not in parquet format
        • e.g. dat <- open_dataset("~/dataset/path_to_file.csv")

Data Objects

  • Scalar - R doesn’t have a scalar class (only vectors)

    Scalar$create(value, type)
  • Array and ChunkedArray

    ChunkedArray$create(..., type)
    Array$create(vector, type)
    • Only difference is that one can be chunked
  • RecordBatch and Table

    RecordBatch or Table$create(...)
    • Similar except Table can be chunked
  • Dataset - list of Tables with same schema

    Dataset$create(sources, schema)
  • Data Types (?decimal) - cast with e.g. Table$var$cast(decimal(3, 2)) (see the sketch after this list)

    • int8(), 16, 32, 64
    • uint8(), …
    • float(), 16, 32, 64
    • halffloat()
    • bool(), boolean()
    • utf8(), large_utf8()
    • binary(), large_binary(), fixed_size_binary(byte_width)
    • string()
    • date32(), 64
    • time32(unit = c("ms", "s")), 64
    • timestamp(unit, timezone)
    • decimal()
    • struct()
    • list_of(), large_list_of(), fixed_size_list_of()
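  • A minimal sketch of the constructors and the decimal cast listed above (values are illustrative)

    library(arrow)

    s   <- Scalar$create(42)
    a   <- Array$create(c(1.5, 2.25, NA))               # one contiguous array
    ca  <- ChunkedArray$create(1:3, 4:6)                # two chunks
    rb  <- record_batch(x = 1:3, y = c("a", "b", "c"))  # same as RecordBatch$create()
    tbl <- Table$create(x = c(0.1, 0.2, 0.3), y = c("a", "b", "c"))

    # cast a column to a fixed-precision decimal type
    tbl$x <- tbl$x$cast(decimal(3, 2))
    tbl$schema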

Operations

  • read_csv_arrow(<csv_file>, as_data_frame = FALSE)

    • Reads csv into memory as an Arrow table
    • as_data_frame - if TRUE (default), reads the data into memory as a tibble, which takes up more space than an Arrow Table
  • write_parquet

    • compression
      • default “snappy” - popular
      • “uncompressed”
      • “zstd” (Zstandard)
        • High-performance codec developed at Facebook
        • Compresses to a smaller size than snappy
    • use_dictionary
      • default TRUE - uses dictionary encoding, which helps for columns with repeated values (e.g. factor variables)
      • FALSE - increases file size dramatically (e.g. 9 kb to 86 kb)
    • chunk_size
      • How many rows to write per chunk of a column (aka row group)
        • The data is compressed per column, and within each column per chunk of rows, which is called the row group
      • If the data has fewer than 250 million cells (rows x cols), then the total number of rows is used (i.e. a single row group).
      • If performing batch tasks, you want the largest file sizes possible
      • If accessing data randomly, you might want smaller chunk sizes
      • (See the sketch below for a call that sets these options)
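    • A sketch of a write_parquet() call using the options above (the data object and path are hypothetical)

      write_parquet(
        my_data,                 # data frame or Arrow Table
        "data/my-data.parquet",
        compression = "zstd",
        use_dictionary = TRUE,
        chunk_size = 1e6         # ~1 million rows per row group
      )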
  • Convert large csv to parquet

    my_data <- read_csv_arrow(
      "~/dataset/path_to_file.csv",
      as_data_frame = FALSE
    )
    write_parquet(my_data, "~/dataset/my-data.parquet")
    dat <- read_parquet("~/dataset/my-data.parquet", as_data_frame = FALSE) # loaded into memory as an Arrow table
    • Reduces size of data stored substantially (e.g. 15 GB csv to 9.5 GB parquet)
  • Lazily download subsetted dataset from S3 and locally convert to parquet with partitions

    data_nyc = "data/nyc-taxi"
    open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |>
        dplyr::filter(year %in% 2012:2021) |> 
        write_dataset(data_nyc, partitioning = c("year", "month"))
    • open_dataset doesn’t load the data into RAM, so subsetting a large dataset (e.g. 40GB) before writing is safe.

    • format = “arrow” is also available in write_dataset

Partitioning

  • Partitioning increases the number of files and it creates a directory structure around the files.

  • Hive Partitioning - A folder/file structure based on partition keys (i.e. grouping variables). Within each folder, the key’s value is given by the folder name (e.g. year=2018). Partitioning the data this way makes queries on slices of the data faster. (See the write_dataset sketch after the example below.)

    • Example: Folder structure when partitioning on year and month

      taxi-data
         year=2018
           month=01
            file01.parquet
           month=02
            file02.parquet
            file03.parquet
           ...  
         year=2019
           month=01
           ...
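    • A sketch of producing (and reading back) the layout above; the taxi object is hypothetical, and hive_style = TRUE is shown explicitly even though it is the default

      taxi %>%
        write_dataset("taxi-data", partitioning = c("year", "month"), hive_style = TRUE)
      # partition columns and values are recovered from the folder names
      ds <- open_dataset("taxi-data")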
  • Pros

    • Allows Arrow to construct a more efficient query
    • Can be read and written with parallelism
  • Cons

    • Each additional file adds a little overhead in processing for filesystem interaction
    • Can increase the overall dataset size since each file has some shared metadata
  • Best Practices

    • Avoid having individual Parquet files smaller than 20MB and larger than 2GB.
      • Having files beyond this range will cancel out the benefit of your query grouping by a partition column. (see article for benchmarks)
    • Avoid partitioning layouts with more than 10,000 distinct partitions.
    • Optimal Size is 512MB — 1GB (docs)
  • View metadata of a partitioned dataset

    air_data <- open_dataset("airquality_partitioned_deeper")
    
    # View data
    air_data
    
    ## FileSystemDataset with 153 Parquet files
    ## Ozone: int32
    ## Solar.R: int32
    ## Wind: double
    ## Temp: int32
    ## Month: int32
    ## Day: int32
    ##
    ## See $metadata for additional Schema metadata
    • This is a “dataset” type so data won’t be read into memory
    • Assume $metadata will indicate which columns the dataset is partitioned by
  • Partition a large file and write to arrow format

    lrg_file <- open_dataset(<file_path>, format = "csv")
    lrg_file %>%
        group_by(var) %>%
        write_dataset(<output_dir>, format = "feather")
    • Pass the file path to open_dataset()
    • Use group_by() to partition the Dataset into manageable chunks
      • Can also use partitioning in write_dataset
    • Use write_dataset() to write each chunk to a separate Feather/Arrow file, all without needing to read the full CSV file into R
    • open_dataset is fast because it only reads the metadata of the file system to determine how it can construct queries
  • Partition Columns

    • Preferably chosen based on how you expect to use the data (e.g. important grouping variables)

    • Example: partition on county because your analysis or transformations will largely be done by county, even though some counties may be much larger than others, which will cause the partitions to be substantially imbalanced.

    • If there is no obvious column, partitioning can be dictated by a maximum number of rows per partition

      write_dataset(
        data,
        format = "parquet",
        path = "~/datasets/my-data/",
        max_rows_per_file = 1e7
      )
      dat <- open_dataset("~/datasets/my-data")
      • Files can get very large without a row count cap, leading to out-of-memory errors in downstream readers.
      • Relationship between row count and file size depends on the dataset schema and how well compressed (if at all) the data is
      • Other ways to control file size (see the sketch below)
        • “max_rows_per_group” - splits up large incoming batches into multiple row groups.
          • If this value is set, then “min_rows_per_group” should also be set, or else you may end up with very small row groups (e.g. if the incoming row group size is just barely larger than this value).
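        • A sketch combining the file-size and row-group options above (values are illustrative)

          write_dataset(
            data,
            path = "~/datasets/my-data/",
            format = "parquet",
            max_rows_per_file = 1e7,
            max_rows_per_group = 1e6,
            min_rows_per_group = 1e5
          )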

Fixed Precision Decimal Numbers

  • Computers don’t store exact representations of numbers, so there are floating point errors in calculations. Doesn’t usually matter in analysis, but it can matter in transaction-based operations.

    library(dplyr)

    txns <- tibble(amount = c(0.1, 0.1, 0.1, -0.3)) %>%
        summarize(balance = sum(amount, na.rm = TRUE))
    # Should be 0
    txns
    # 5.55e-17
  • The accumulation of these errors can be costly.

  • Arrow can fix this with fixed precision decimals

    library(arrow)
    library(dplyr)

    # Arrow Table (C++ library); collect() converts it to a data frame
    txns <- Table$create(amount = c(0.1, 0.1, 0.1, -0.3))
    txns$amount <- txns$amount$cast(decimal(3, 2))
    txns
    # the schema now shows amount as decimal128(3, 2)

    write_parquet(txns, "data/txns_decimal.parquet")
    # read back as an Arrow Table (sparklyr::spark_read_parquet also works, but needs a Spark connection)
    txns <- read_parquet("data/txns_decimal.parquet", as_data_frame = FALSE)
    txns %>%
        summarize(balance = sum(amount, na.rm = TRUE)) %>%
        collect()
    # balance
    #       0

Queries

  • Example: Filter partitioned files

    library(arrow)
    library(dplyr)
    # iris dataset was written and partitioned to a directory path stored in dir_out
    ds <- open_dataset(dir_out, partitioning = "species")
    # query the dataset
    ds %>% 
      filter(species == "species=setosa") %>%
      count(sepal_length) %>% 
      collect()
    • format “<partition_variable>=<partition_value>”

    • compute() stores the result in Arrow memory

    • collect() brings the result into R (as a tibble)

  • Example: libarrow functions

    arrowmagicks %>% 
      mutate(days = arrow_days_between(start_date, air_date)) %>% 
      collect()
    • “days_between” is a function in libarrow but not in {arrow}. To use it, you only have to add the “arrow_” prefix in front of the name.
    • Use list_compute_functions() to get a list of the available functions
      • List of potential functions available (libarrow function reference)
  • When the query is also larger than memory

    library(arrow)
    library(dplyr)
    nyc_taxi <- open_dataset("nyc-taxi/")
    nyc_taxi |>
      filter(payment_type == "Credit card") |>
      group_by(year, month) |>
      write_dataset("nyc-taxi-credit")
    • In the example, the input is 1.7 billion rows (70GB), output is 500 million (15GB). Takes 3-4 mins.
  • User-defined functions

    • register_scalar_function() - registers an R function (which can use base R or other packages) so it can be called inside an arrow dplyr pipeline (see the sketch below)
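    • A minimal sketch of registering and calling a UDF; the name “times_two” is made up, and the context argument / auto_convert behavior follow the arrow docs

      library(arrow)
      library(dplyr)

      register_scalar_function(
        "times_two",
        function(context, x) x * 2,   # first argument is the kernel context that arrow passes in
        in_type = float64(),
        out_type = float64(),
        auto_convert = TRUE           # convert inputs to R vectors before calling the function
      )

      Table$create(x = c(1, 2, 3)) %>%
        mutate(y = times_two(x)) %>%
        collect()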

DuckDB

  • Query

    df <- duckdb:::sql("FROM 'file.parquet'")
  • Export a DuckDB database to parquet

    drv <- duckdb::duckdb()
    con <- DBI::dbConnect(drv)
    # (on.exit() only takes effect inside a function; disconnect explicitly when done)

    # create duckdb table
    DBI::dbWriteTable(con, "mtcars", mtcars)

    # COPY ... TO writes the table out as a snappy-compressed parquet file
    DBI::dbExecute(con, DBI::sqlInterpolate(con,
      "COPY mtcars TO ?filename (FORMAT 'parquet', COMPRESSION 'snappy')",
      filename = 'mtcars.parquet'
    ))

    DBI::dbDisconnect(con, shutdown = TRUE)

Cloud

  • Access files in Amazon S3 (works for all file types)

    taxi_s3 <- read_parquet("s3://ursa-labs-taxi-data/2013/12/data.parquet")
    # multiple files
    ds_s3 <- open_dataset("s3://ursa-labs-taxi-data/", partitioning = c("year", "month"))
    • As of 2021, this only works for Amazon S3 URIs

    • read_parquet can take a minute to load

    • You can see the folder structure in the read_parquet S3 uri

    • Example Query

      # over 125 files and 30GB
      ds_s3 %>%
          filter(total_amount > 100, year == 2015) %>%
          select(tip_amount, total_amount, passenger_count) %>%
          mutate(tip_pct = 100 * tip_amount / total_amount) %>%
          group_by(passenger_count) %>%
          summarize(median_tip_pct = median(tip_pct),
                    n = n()) %>%
          collect() # collect() is needed to execute the query and return the result to R
      • Partitioning allowed Arrow to skip all files that weren’t in a year=2015 directory and only perform the calculation on the files within it.
  • Access Google Cloud Storage (GCS)
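    • A minimal sketch, assuming the nyc-taxi data is mirrored in a public GCS bucket (URI pattern follows the arrow cloud-storage docs; anonymous@ requests unauthenticated access)

      gcs_ds <- open_dataset("gs://anonymous@voltrondata-labs-datasets/nyc-taxi")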