Arrow
Misc
This note also contains code for parquet files that isn’t a part of Apache Arrow.
Resources
Packages
- {nanoarrow} - Lightweight Arrow library in C. Doesn’t have the overhead of the original Arrow library. Built to pull data out of small systems (e.g. streaming logs from an embedded system).
- {nanoparquet} - Reads and writes parquet files. No dependencies other than a C++-11 compiler. It compiles in about 30 seconds into an R package that is less than a megabyte in size. Built for use with smaller data sizes, < 1GB. (Also see limitations and article for an overview of capabilities)
- Supports all Parquet encodings and currently supports three compression codecs: Snappy, Gzip and Zstd
- Missing values are handled properly, factor columns are kept as factors, and temporal types are encoded correctly
- Functions that allow you to query the metadata and schema without reading in the full dataset
- Can read a 250MB file with 32 million rows and 14 columns in about 10-15 secs on an M2.
Arrow/Feather format is built for speed, not compression, so files are larger than parquet
- Feather for short-term storage and parquet for longer-term storage
- The arrow format requires ~ten times more storage space.
- e.g. For the nyc-taxi data set, parquet takes ~38GB, but arrow would take around 380GB. With arrow, though, you could see a ~10x speed increase in operations.
Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data.
To create a multi-source dataset, provide a list of datasets to open_dataset instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2)
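As a minimal sketch of the multi-source approach, the following writes two small Parquet datasets to temporary directories and then combines them both ways. The directory names and the use of mtcars are assumptions made purely for illustration.

```r
library(arrow)
library(dplyr)

# Two throwaway datasets with the same schema (hypothetical sources)
dir1 <- tempfile("ds1_")
dir2 <- tempfile("ds2_")
write_dataset(mtcars[1:16, ], dir1)
write_dataset(mtcars[17:32, ], dir2)

ds1 <- open_dataset(dir1)
ds2 <- open_dataset(dir2)

# Option 1: pass a list of datasets instead of a file path
big_dataset <- open_dataset(list(ds1, ds2))

# Option 2: concatenate the datasets
big_dataset2 <- c(ds1, ds2)

# Both combined datasets should expose all rows of the two sources
n_rows <- big_dataset |> collect() |> nrow()
n_rows2 <- big_dataset2 |> collect() |> nrow()
```

The same pattern should apply when one source is remote and the other local, as long as the schemas are compatible.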
More verbose installation + get compression libraries and AWS S3 support
Sys.setenv(
  ARROW_R_DEV = TRUE,
  LIBARROW_MINIMAL = FALSE
)
install.packages("arrow")
- Installation takes some time, so this lets you monitor progress to make sure it isn’t locked.
Info about your Arrow installation - arrow_info()
- Version, Compression libs, C++ lib, Runtime, etc.
-
- Has a script that downloads monthly released csv files; creates a Hive directory structure; and converts them to parquet files with partitioning based on that structure.
Statically hosted parquet files provide one of the easiest to use and most performant APIs for accessing bulk¹ data, and are far simpler and cheaper to provide than custom APIs. (article)
- Pros and cons
- List of cases when static files are inappropriate
APIs
- Single File
- Contains functions for each supported file format (CSV, JSON, Parquet, Feather/Arrow, ORC).
  - Start with read_ or write_ followed by the name of the file format.
  - e.g. read_csv_arrow(), read_parquet(), and read_feather()
- Works on one file at a time, and the data is loaded into memory.
- Depending on the size of your file and the amount of memory you have available on your system, it might not be possible to load the dataset this way.
- Example
- 111MB RAM used - Start of R session
- 135MB - Arrow package loaded
- 478MB - After using read_csv_arrow("path/file.csv", as_data_frame = FALSE) to load a 108 MB file
- 525MB with “as_data_frame = TRUE” (data loaded as a dataframe rather than an Arrow table)
- Dataset
- Can read multiple file formats
- Can point to a folder with multiple files and create a dataset from them
- Can read datasets from multiple sources (even combining remote and local sources)
- Can be used to read single files that are too large to fit in memory.
- Data does NOT get loaded into memory
- Queries will be slower if the data is not in parquet format
- e.g. dat <- open_dataset("~/dataset/path_to_file.csv")
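To make the difference between the two APIs concrete, here is a small sketch that reads the same CSV both ways. The temporary file and the mtcars data are assumptions for illustration; with a real larger-than-memory file only the Dataset route would be practical.

```r
library(arrow)
library(dplyr)

# Hypothetical CSV written to a temp file for the demonstration
csv_path <- tempfile(fileext = ".csv")
write.csv(mtcars, csv_path, row.names = FALSE)

# Single File API: the whole file is loaded into memory as an Arrow Table
tbl <- read_csv_arrow(csv_path, as_data_frame = FALSE)

# Dataset API: only the file is registered; nothing is loaded yet
ds <- open_dataset(csv_path, format = "csv")

# Rows are only materialized in R when collect() is called
four_cyl <- ds |>
  filter(cyl == 4) |>
  select(mpg, cyl) |>
  collect()
```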
Data Objects
Scalar - R doesn’t have a scalar class (only vectors)
Scalar$create(value, type)
Array and ChunkedArray
Array$create(vector, type)
ChunkedArray$create(..., type)
- Only difference is that one can be chunked
RecordBatch and Table
RecordBatch$create(...) or Table$create(...)
- Similar except Table can be chunked
Dataset - list of Tables with same schema
Dataset$create(sources, schema)
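The constructors above can be sketched as follows. The values and column names are made up for illustration; the point is only to show which object each $create() call returns.

```r
library(arrow)

# Scalar: a single typed value
s <- Scalar$create(1L, type = int8())

# Array: one contiguous vector, here with a missing value
a <- Array$create(c(1.5, 2.5, NA), type = float64())

# ChunkedArray: the same idea, but stored as several chunks
ca <- ChunkedArray$create(1:2, 3:5, type = int32())

# RecordBatch and Table: named columns of equal length
rb <- RecordBatch$create(x = 1:3, y = c("a", "b", "c"))
tb <- Table$create(x = 1:3, y = c("a", "b", "c"))

# A Table column comes back as a ChunkedArray
col_x <- tb$x
```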
Data Types (?decimal) (Table$var$cast(decimal(3,2)))
- int8(), 16, 32, 64
- uint8(), …
- float(), 16, 32, 64
- halffloat()
- bool(), boolean()
- utf8(), large_utf8()
- binary(), large_binary(), fixed_size_binary(byte_width)
- string()
- date32(), 64
- time32(unit = c("ms", "s")), 64
- timestamp(unit, timezone)
- decimal()
- struct()
- list_of(), large_list_of(), fixed_size_list_of()
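A short sketch of how these type functions are typically used: either to cast an existing column or to declare a schema up front. The table contents and column names are hypothetical.

```r
library(arrow)

# R integers and doubles map to int32 and double by default
tb <- Table$create(id = 1:3, price = c(1.25, 2.50, 3.75))
default_types <- c(tb$id$type$ToString(), tb$price$type$ToString())

# Cast a single column to another type
id64 <- tb$id$cast(int64())

# Or declare the types when the table is created
sch <- schema(id = int32(), name = utf8())
tb2 <- Table$create(id = 1:2, name = c("a", "b"), schema = sch)
```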
Operations
read_csv_arrow(<csv_file>, as_data_frame = FALSE)
- Reads csv into memory as an Arrow table
- as_data_frame - if TRUE (default), reads into memory as a tibble, which takes up more space than an Arrow Table
write_parquet
- compression
- default “snappy” - popular
- “uncompressed”
- “zstd” (z-standard)
- High performance; developed at Facebook (Meta)
- Compresses to smaller size than snappy
- use_dictionary
- default TRUE - dictionary-encodes columns (e.g. factor variables)
- FALSE - increases file size dramatically (e.g. 9 kb to 86 kb)
- chunk_size
- How many rows per chunk (aka row group)
- The data is compressed per column, and inside each column, per chunk of rows, which is called the row group
- If the data has fewer than 250 million cells (rows x cols), then the total number of rows is used.
- If performing batch tasks, you want the largest file sizes possible
- If accessing randomly (?), you might want smaller chunk sizes
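The write_parquet options above can be tried out with a sketch like this one. The data is synthetic, and the codec falls back to "uncompressed" in case the local Arrow build lacks zstd; the actual sizes will depend on the data, so none are claimed here.

```r
library(arrow)

set.seed(1)
# Synthetic data: a factor column is a good candidate for dictionary encoding
df <- data.frame(
  id = seq_len(1e5),
  grp = factor(sample(letters, 1e5, replace = TRUE))
)

# Not every Arrow build ships every codec, so check first
codec <- if (codec_is_available("zstd")) "zstd" else "uncompressed"

f_dict <- tempfile(fileext = ".parquet")
f_nodict <- tempfile(fileext = ".parquet")

# chunk_size sets the number of rows per row group
write_parquet(df, f_dict, compression = codec,
              use_dictionary = TRUE, chunk_size = 25000)
write_parquet(df, f_nodict, compression = codec,
              use_dictionary = FALSE, chunk_size = 25000)

# Compare file sizes (bytes) with and without dictionary encoding
sizes <- file.size(c(f_dict, f_nodict))

# Inspect how many row groups were written
n_groups <- ParquetFileReader$create(f_dict)$num_row_groups

back <- read_parquet(f_dict)
```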
Convert large csv to parquet
my_data <- read_csv_arrow(
  "~/dataset/path_to_file.csv",
  as_data_frame = FALSE
)
write_parquet(my_data, "~/dataset/my-data.parquet")
# loaded into memory as an Arrow table
dat <- read_parquet("~/dataset/my-data.parquet", as_data_frame = FALSE)
- Reduces size of data stored substantially (e.g. 15 GB csv to 9.5 GB parquet)
Lazily download subsetted dataset from S3 and locally convert to parquet with partitions
data_nyc = "data/nyc-taxi"
open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |>
  dplyr::filter(year %in% 2012:2021) |>
  write_dataset(data_nyc, partitioning = c("year", "month"))
- open_dataset doesn’t use RAM, so subsetting a large dataset (e.g. 40GB) before writing is safe.
- format = “arrow” is also available
Partitioning
Partitioning increases the number of files and it creates a directory structure around the files.
Hive Partitioning - Folder/file structure based on partition keys (i.e. grouping variable). Within each folder, the key has a value determined by the name of the folder. Partitioning the data this way makes queries on data slices faster.
Example: Folder structure when partitioning on year and month
taxi-data
  year=2018
    month=01
      file01.parquet
    month=02
      file02.parquet
      file03.parquet
    ...
  year=2019
    month=01
    ...
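A folder structure like the one above can be reproduced on a small scale. This sketch partitions the built-in airquality data by Month into a temporary directory (an assumption for illustration) and lists the files that result.

```r
library(arrow)

out_dir <- tempfile("airquality_")

# One folder per distinct value of the partition key
write_dataset(airquality, out_dir, format = "parquet", partitioning = "Month")

# Relative paths follow the "<key>=<value>/<file>" layout
files <- list.files(out_dir, recursive = TRUE)
files

# Reopening the directory recovers Month from the folder names
ds <- open_dataset(out_dir)
col_names <- ds$schema$names
```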
Pros
- Allows Arrow to construct a more efficient query
- Can be read and written with parallelism
Cons
- Each additional file adds a little overhead in processing for filesystem interaction
- Can increase the overall dataset size since each file has some shared metadata
Best Practices
- Avoid having individual Parquet files smaller than 20MB or larger than 2GB.
- Having files beyond this range will cancel out the benefit of your query grouping by a partition column. (see article for benchmarks)
- Avoid partitioning layouts with more than 10,000 distinct partitions.
- Optimal size is 512MB to 1GB (docs)
View metadata of a partitioned dataset
air_data <- open_dataset("airquality_partitioned_deeper")
# View data
air_data
## FileSystemDataset with 153 Parquet files
## Ozone: int32
## Solar.R: int32
## Wind: double
## Temp: int32
## Month: int32
## Day: int32
##
## See $metadata for additional Schema metadata
- This is a “dataset” type so data won’t be read into memory
- Assume $metadata will indicate which columns the dataset is partitioned by
Partition a large file and write to arrow format
lrg_file <- open_dataset(<file_path>, format = "csv")
lrg_file %>%
  group_by(var) %>%
  write_dataset(<output_dir>, format = "feather")
- Pass the file path to open_dataset()
- Use group_by() to partition the Dataset into manageable chunks
  - Can also use partitioning in write_dataset
- Use write_dataset() to write each chunk to a separate file (Feather here), all without needing to read the full CSV file into R
- open_dataset is fast because it only reads the metadata of the file system to determine how it can construct queries
Partition Columns
Preferably chosen based on how you expect to use the data (e.g. important group variables)
Example: partition on county because your analysis or transformations will largely be done by county, even though some counties may be much larger than others and will cause the partitions to be substantially imbalanced.
If there is no obvious column, partitioning can be dictated by a maximum number of rows per partition
write_dataset(
  data,
  format = "parquet",
  path = "~/datasets/my-data/",
  max_rows_per_file = 1e7
)
dat <- open_dataset("~/datasets/my-data")
- Files can get very large without a row count cap, leading to out-of-memory errors in downstream readers.
- Relationship between row count and file size depends on the dataset schema and how well compressed (if at all) the data is
- Other ways to control file size.
- “max_rows_per_group” - splits up large incoming batches into multiple row groups.
- If this value is set then “min_rows_per_group” should also be set or else you may end up with very small row groups (e.g. if the incoming row group size is just barely larger than this value).
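The row-count cap can be checked on a toy scale. This is only a sketch: the limits are deliberately tiny so the effect is visible with the 32-row mtcars data, and the output directory is a temporary one chosen for illustration.

```r
library(arrow)

out_dir <- tempfile("capped_")

# Cap both the file size and the row group size (in rows);
# min_rows_per_group is set alongside max_rows_per_group
write_dataset(
  mtcars,
  out_dir,
  format = "parquet",
  max_rows_per_file = 10,
  max_rows_per_group = 10,
  min_rows_per_group = 5
)

# Count the rows that ended up in each file
files <- list.files(out_dir, full.names = TRUE)
rows_per_file <- vapply(files, function(f) nrow(read_parquet(f)), integer(1))
rows_per_file
```

No file should hold more rows than max_rows_per_file, and the per-file counts should add up to the original number of rows.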
Fixed Precision Decimal Numbers
Computers can’t store exact binary representations of most decimal fractions, so floating point errors creep into calculations. This doesn’t usually matter in analysis, but it can matter in transaction-based operations.
txns <- tibble(amount = c(0.1, 0.1, 0.1, -0.3)) %>%
  summarize(balance = sum(amount, na.rm = TRUE))
# Should be 0
txns
# 5.55e-17
The accumulation of these errors can be costly.
Arrow can fix this with fixed precision decimals
# arrow table (c++ library)
# collect() changes it to a df
txns <- Table$create(amount = c(0.1, 0.1, 0.1, -0.3))
txns$amount <- txns$amount$cast(decimal(3,2))
txns
# blah, blah, decimal128, blah
write_parquet(txns, "data/txns_decimal.parquet")
# assumes an existing sparklyr connection `sc`
txns <- spark_read_parquet(sc, name = "txns", path = "data/txns_decimal.parquet")
txns %>%
  summarize(balance = sum(amount, na.rm = TRUE))
# balance
# 0
Queries
Example: Filter partitioned files
library(dplyr)
# iris dataset was written and partitioned to a directory path stored in dir_out
ds <- arrow::open_dataset(dir_out, partitioning = "species")
# query the dataset
ds %>%
  filter(species == "species=setosa") %>%
  count(sepal_length) %>%
  collect()
- Filter value format: “<partition_variable>=<partition_value>”
- compute stores the result in Arrow
- collect brings the result into R
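The difference between compute and collect can be seen in a small self-contained sketch. It assumes a temporary directory and converts Species to character before writing; because open_dataset() is called without a partitioning argument here, the Hive-style folder names are parsed automatically and the filter uses the plain value "setosa".

```r
library(arrow)
library(dplyr)

# Write iris partitioned by Species (as character) to a temp directory
dir_out <- tempfile("iris_")
iris_chr <- transform(iris, Species = as.character(Species))
write_dataset(iris_chr, dir_out, partitioning = "Species")

ds <- open_dataset(dir_out)

# Nothing is evaluated yet: this is only a query description
query <- ds |>
  filter(Species == "setosa") |>
  count(Species)

in_arrow <- compute(query)  # result stays in Arrow memory as a Table
in_r <- collect(query)      # result is brought into R as a tibble
```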
Example: libarrow functions
arrowmagicks %>%
  mutate(days = arrow_days_between(start_date, air_date)) %>%
  collect()
- “days_between” is a function in libarrow but not in {arrow}. In order to use it, you only have to put the “arrow_” prefix in front of it.
- Use list_compute_functions to get a list of the available functions
  - List of potential functions available (libarrow function reference)
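A brief sketch of looking up a libarrow function and then calling it through the "arrow_" prefix. The table and its column names are hypothetical, and timestamps are used as the inputs here.

```r
library(arrow)
library(dplyr)

# Search the available libarrow compute functions by pattern
between_fns <- list_compute_functions(pattern = "between")
between_fns

# Hypothetical table with two timestamp columns
tb <- Table$create(
  start = as.POSIXct("2024-01-01", tz = "UTC"),
  end = as.POSIXct("2024-01-31", tz = "UTC")
)

# Call the libarrow function by adding the "arrow_" prefix
res <- tb |>
  mutate(days = arrow_days_between(start, end)) |>
  collect()
```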
When the query is also larger than memory
library(arrow)
library(dplyr)
nyc_taxi <- open_dataset("nyc-taxi/")
nyc_taxi |>
  filter(payment_type == "Credit card") |>
  group_by(year, month) |>
  write_dataset("nyc-taxi-credit")
- In the example, the input is 1.7 billion rows (70GB), output is 500 million (15GB). Takes 3-4 mins.
- register_scalar_function
- accepts base R functions inside your function
DuckDB
Query
df <- duckdb:::sql("FROM 'file.parquet'")
Export a DuckDB database to parquet
drv <- duckdb::duckdb()
con <- DBI::dbConnect(drv)
on.exit(DBI::dbDisconnect(con), add = TRUE)
# create duckdb table
DBI::dbWriteTable(con, "mtcars", mtcars)
DBI::dbExecute(con, DBI::sqlInterpolate(con,
  "COPY mtcars TO ?filename (FORMAT 'parquet', COMPRESSION 'snappy')",
  filename = 'mtcars.parquet'
))
Cloud
Access files in Amazon S3 (works for all file types)
taxi_s3 <- read_parquet("s3://ursa-labs-taxi-data/2013/12/data.parquet")
# multiple files
ds_s3 <- open_dataset("s3://ursa-labs-taxi-data/", partitioning = c("year", "month"))
- As of 2021, only works for Amazon URIs
- read_parquet can take a minute to load
- You can see the folder structure in the read_parquet S3 URI
Example Query
# over 125 files and 30GB
ds_s3 %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarize(median_tip_pct = median(tip_pct), n = n()) %>%
  print() # is this necessary?
- Partitioning allowed Arrow to skip every file outside the year 2015 directory and perform the calculation only on the files inside it.
Access Google Cloud Storage (GCS)