Arrow
Misc
This note also contains code for parquet files that isn’t a part of Apache Arrow.
Resources
Packages
- {nanoarrow} - Lightweight Arrow library in C. Doesn't have the overhead of the original Arrow library. Built to pull data out of small systems (e.g. streaming logs from an embedded system).
- {nanoparquet} - Reads and writes parquet files. No dependencies other than a C++11 compiler. It compiles in about 30 seconds into an R package that is less than a megabyte in size. Built for use with smaller data sizes, < 1GB. (Also see limitations and article for an overview of capabilities)
- Supports all Parquet encodings and currently supports three compression codecs: Snappy, Gzip and Zstd
- Missing values are handled properly, factor columns are kept as factors, and temporal types are encoded correctly
- Functions that allow you to query the metadata and schema without reading in the full dataset (see the sketch after this list)
- Can read a 250MB file with 32 million rows and 14 columns in about 10-15 secs on an M2.
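A minimal sketch of the {nanoparquet} workflow; file names are hypothetical and the metadata helper names follow the package's introductory docs (newer versions may also offer read_parquet_*() variants):

```r
library(nanoparquet)

# inspect without reading the full dataset
parquet_info("flights.parquet")      # file-level info: rows, row groups, etc.
parquet_schema("flights.parquet")    # column names and types
parquet_metadata("flights.parquet")  # full metadata, incl. row groups and encodings

# read/write
flights <- read_parquet("flights.parquet")
write_parquet(flights, "flights_zstd.parquet", compression = "zstd")
```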
The Arrow/Feather format is built for speed, not compression, so its files are larger than parquet.
- Feather for short-term storage and parquet for longer-term storage
- The arrow format requires ~ten times more storage space.
- e.g. For the nyc-taxi data set, parquet takes around ~38GB, but arrow would take around 380GB. Although with arrow, you could see a ~10x speed increase in operations.
Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data.
To create a multi-source dataset, provide a list of datasets to `open_dataset()` instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`
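A minimal sketch, assuming `ds1` and `ds2` are Dataset objects already opened with `open_dataset()` (paths are hypothetical; per the {arrow} docs, `open_dataset()` also accepts a list of Datasets):

```r
library(arrow)

# hypothetical sources; either could also be remote (e.g. S3)
ds1 <- open_dataset("data/part1/")
ds2 <- open_dataset("data/part2/")

# either form yields a single queryable, multi-source dataset
big_dataset <- open_dataset(list(ds1, ds2))
big_dataset <- c(ds1, ds2)
```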
More verbose installation + get compression libraries and AWS S3 support
```r
Sys.setenv(
  ARROW_R_DEV = TRUE,
  LIBARROW_MINIMAL = FALSE
)
install.packages("arrow")
```
- Installation takes some time, so the verbose output lets you monitor progress and confirm the build hasn't stalled.
Info about your Arrow installation - `arrow_info()`
- Version, compression libs, C++ lib, runtime, etc.
- Has a script that downloads monthly released csv files; creates a Hive directory structure; and converts them to parquet files with partitioning based on that structure.
Optimizations
- Notes from How to properly prepare your parquet floor
- Make sure columns are correctly typed
- Sort by one or two columns (see the sketch after this list)
- A filtered query on a sorted field will be up to 10 times faster
- Sorting on one or two low-cardinality fields (few distinct values) is less computationally intensive and achieves more efficient compression
- e.g. sort by year, then by geographic code
- For geoparquet, sort by a grid index like GeoHash or H3
- For geoparquet, add a bounding box column
- Geometries in a geoparquet can be encoded in WKB or GeoArrow with GeoArrow being more efficient
- GeoArrow encoding is integrated as of geoparquet >= 1.1
- Serve already-built parquet files
- Some platforms generate parquet exports on the fly, but that process is still painfully slow compared to serving pre-built files.
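A minimal sketch of the sorting advice above, assuming a data frame `df` with hypothetical year and geo_code columns:

```r
library(arrow)
library(dplyr)

# sort on one or two low-cardinality fields before writing;
# sorted columns filter faster and compress better
df %>%
  arrange(year, geo_code) %>%
  write_parquet("widgets_sorted.parquet")
```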
Statically hosted parquet files provide one of the easiest to use and most performant APIs for accessing bulk data, and are far simpler and cheaper to provide than custom APIs. (article)
- Benefits
- Loading a sample of rows (and viewing metadata) from an online parquet file is a simple one-liner.
- Serving data as a static file is probably the simplest and cheapest possible architecture for open data services. (e.g. s3)
- Data access will remain performant even if traffic is high, and the service will have very high reliability and availability. All of this is taken care of by the cloud provider.
- Cases when static files are inappropriate
- Very large datasets of 10s of millions of records.
- In these cases it may be appropriate to offer an API that allows users to query the underlying data to return smaller subsets
- Your users need to make transactional or atomic requests.
- Relational data with a complex schema.
- Private datasets with granular access control.
- Rapidly changing data to which users need immediate access.
- Recommendations
- Enable CORS
- Cross-Origin Resource Sharing (CORS) enables any website to load your data directly from source parquet files without the need for a server.
- Create a URL structure
Example: Date Partition

```
www.my-organisation.com/open_data/v1/widgets_2021.parquet
www.my-organisation.com/open_data/v1/widgets_2022.parquet
www.my-organisation.com/open_data/v1/widgets_latest.parquet
```
Consider also providing the same structure but with csvs
- Provide directory listing service to enable data discovery and scraping
{{polars}} can write parquet files
```python
import polars as pl

pl.read_csv(
    "data_recensement_2017.csv",
    separator=';',
    dtypes={'COMMUNE': pl.String}
).write_parquet(
    "data_recensement_2017.parquet",
    compression='zstd',
    use_pyarrow=False
)
```
APIs
- Single File
- Contains functions for each supported file format (CSV, JSON, Parquet, Feather/Arrow, ORC).
- Start with `read_` or `write_` followed by the name of the file format
  - e.g. `read_csv_arrow()`, `read_parquet()`, and `read_feather()`
- Works on one file at a time, and the data is loaded into memory.
- Depending on the size of your file and the amount of memory you have available on your system, it might not be possible to load the dataset this way.
- Example
- 111MB RAM used - Start of R session
- 135MB - Arrow package loaded
- 478MB - After using `read_csv_arrow("path/file.csv", as_data_frame = FALSE)` to load a 108 MB file
- 525MB - With `as_data_frame = TRUE` (data loaded as a dataframe rather than an Arrow table)
- Dataset
- Can read multiple file formats
- Can point to a folder with multiple files and create a dataset from them
- Can read datasets from multiple sources (even combining remote and local sources)
- Can be used to read single files that are too large to fit in memory.
- Data does NOT get loaded into memory
- Queries will be slower if the data is not in parquet format
- e.g. `dat <- open_dataset("~/dataset/path_to_file.csv")`
Data Objects
Scalar - R doesn’t have a scalar class (only vectors)
`Scalar$create(value, type)`
Array and ChunkedArray
`Array$create(vector, type)` and `ChunkedArray$create(..., type)`
- Only difference is that one can be chunked
RecordBatch and Table
`RecordBatch$create(...)` or `Table$create(...)`
- Similar except Table can be chunked
Dataset - list of Tables with same schema
`Dataset$create(sources, schema)`
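A minimal sketch of creating these objects (values are hypothetical):

```r
library(arrow)

s   <- Scalar$create(42L, type = int32())          # single value
a   <- Array$create(c(1.5, 2.5, 3.5))              # one contiguous array
ca  <- ChunkedArray$create(1:3, 4:6)               # same type, two chunks
tbl <- Table$create(x = 1:3, y = c("a", "b", "c")) # columns can be chunked
```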
Data Types (see `?decimal`; cast with e.g. `Table$var$cast(decimal(3, 2))`)
- `int8()`, `int16()`, `int32()`, `int64()`
- `uint8()`, `uint16()`, `uint32()`, `uint64()`
- `float()`, `float16()`, `float32()`, `float64()`
- `halffloat()`
- `bool()`, `boolean()`
- `utf8()`, `large_utf8()`
- `binary()`, `large_binary()`, `fixed_size_binary(byte_width)`
- `string()`
- `date32()`, `date64()`
- `time32(unit = c("ms", "s"))`, `time64()`
- `timestamp(unit, timezone)`
- `decimal()`
- `struct()`
- `list_of()`, `large_list_of()`, `fixed_size_list_of()`
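A minimal sketch of declaring a type at creation time and casting afterwards (values and column names are hypothetical):

```r
library(arrow)

a <- Array$create(c(1.234, 5.678))              # inferred as double (float64)
a$cast(decimal(5, 2))                           # cast to fixed-precision decimal
Array$create(c("a", "b"), type = large_utf8())  # declare the type up front

tbl <- Table$create(price = c(9.99, 19.99))
tbl$price <- tbl$price$cast(decimal(6, 2))
```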
Operations
`read_csv_arrow(<csv_file>, as_data_frame = FALSE)`
- Reads a csv into memory as an Arrow Table
- `as_data_frame` - if TRUE (default), reads into memory as a tibble, which takes up more space than an Arrow Table
write_parquet
- compression
- Before using compression, ask:
- Will the parquet files be frequently accessed online (e.g. API)?
- In this situation, bandwidth may be an issue and a smaller (compressed) file would be desirable.
- Will the parquet files be accessed from a local disk?
- In this situation, the time spent decompressing the file to read it is probably not worth the decrease in file size.
- default “snappy” - popular
- “uncompressed”
- "zstd" (Zstandard)
  - High-performance codec from Facebook (Meta)
  - Compresses to a smaller size than snappy
- use_dictionary
- default TRUE - dictionary-encodes columns with repeated values (e.g. factor variables)
- FALSE - increases file size dramatically (e.g. 9 kb to 86 kb)
- chunk_size
- How many rows per column (aka row group)
- The data is compressed per column, and inside each column, per chunk of rows, which is called the row group
- If the data has fewer than 250 million cells (rows x cols), then the total number of rows is used.
- Considerations
- The choice of compression can affect row group performance.
- Ensure row group size doesn’t exceed available memory for processing
- Very small row groups can lead to larger file sizes due to metadata overhead.
- If performing batch tasks, you want the largest file sizes possible
- e.g. Performing a sales analysis where you need the whole dataset.
- If accessing randomly, you might want smaller chunk sizes
- e.g. Looking up individual customer information in a large database of users.
- Docs recommend large row groups (512MB - 1GB)
- Optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file
- data_page_size
- Target threshold for the approximate encoded size of data pages within a column chunk
- Smaller data pages allow for more fine-grained reading (e.g. single-row lookup). Larger page sizes incur less space overhead (fewer page headers) and potentially less parsing overhead (processing headers). (See the combined sketch after this list.)
- Default: 1MB; For sequential scans, the docs recommend 8KB for page sizes.
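A minimal sketch combining the options above (`my_data`, the path, and the tuning values are hypothetical):

```r
library(arrow)

write_parquet(
  my_data,
  "~/datasets/my-data.parquet",
  compression    = "zstd",   # smaller files; useful when serving over a network
  use_dictionary = TRUE,     # dictionary-encode repetitive columns
  chunk_size     = 1e6,      # rows per row group
  data_page_size = 1024^2    # ~1MB pages (the default)
)
```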
Convert large csv to parquet
```r
my_data <- read_csv_arrow(
  "~/dataset/path_to_file.csv",
  as_data_frame = FALSE
)

write_parquet(my_data, "~/dataset/my-data.parquet")

# loaded into memory as an Arrow table
dat <- read_parquet("~/dataset/my-data.parquet", as_data_frame = FALSE)
```
- Reduces size of data stored substantially (e.g. 15 GB csv to 9.5 GB parquet)
Lazily download subsetted dataset from S3 and locally convert to parquet with partitions
= "data/nyc-taxi" data_nyc open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |> ::filter(year %in% 2012:2021) |> dplyrwrite_dataset(data_nyc, partitioning = c("year", "month"))
- `open_dataset()` doesn't use RAM, so subsetting a large dataset (e.g. 40GB) before writing is safe.
- `format = "arrow"` is also available in `write_dataset()`.
Partitioning
Partitioning increases the number of files and it creates a directory structure around the files.
Hive Partitioning - Folder/file structure based on partition keys (i.e. grouping variable). Within each folder, the key has a value determined by the name of the folder. By partitioning the data in this way, it makes it faster to do queries on data slices.
Example: Folder structure when partitioning on year and month
```
taxi-data
  year=2018
    month=01
      file01.parquet
    month=02
      file02.parquet
      file03.parquet
    ...
  year=2019
    month=01
    ...
```
Pros
- Allows Arrow to construct a more efficient query
- Can be read and written with parallelism
Cons
- Each additional file adds a little overhead in processing for filesystem interaction
- Can increase the overall dataset size since each file has some shared metadata
Best Practices
View metadata of a partitioned dataset
<- open_dataset("airquality_partitioned_deeper") air_data # View data air_data ## FileSystemDataset with 153 Parquet files ## Ozone: int32 ## Solar.R: int32 ## Wind: double ## Temp: int32 ## Month: int32 ## Day: int32 ## ## See $metadata for additional Schema metadata
- This is a “dataset” type so data won’t be read into memory
- Assume `$metadata` will indicate which columns the dataset is partitioned by; the sketch below shows an alternative using the dataset's file paths
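A minimal sketch for spotting the partition columns from the Hive-style file paths (an assumption that `$files` is exposed on a FileSystemDataset; the example path is illustrative):

```r
# file paths of a FileSystemDataset expose the Hive partition keys
head(air_data$files)
## e.g. ".../Month=5/Day=1/part-0.parquet"
```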
Partition a large file and write to arrow format
```r
lrg_file <- open_dataset(<file_path>, format = "csv")

lrg_file %>%
  group_by(var) %>%
  write_dataset(<output_dir>, format = "feather")
```
- Pass the file path to `open_dataset()`
- Use `group_by()` to partition the Dataset into manageable chunks
  - Can also use the `partitioning` argument of `write_dataset()`
- Use `write_dataset()` to write each chunk to a separate Parquet file, all without needing to read the full CSV file into R
- `open_dataset()` is fast because it only reads the metadata of the file system to determine how it can construct queries
Partition Columns
Preferably chosen based on how you expect to use the data (e.g. important group variables)
Example: partition on county because your analysis or transformations will largely be done by county, even though some counties may be much larger than others and will cause the partitions to be substantially imbalanced.
If there is no obvious column, partitioning can be dictated by a maximum number of rows per partition
```r
write_dataset(
  data,
  format = "parquet",
  path = "~/datasets/my-data/",
  max_rows_per_file = 1e7
)

dat <- open_dataset("~/datasets/my-data")
```
- Files can get very large without a row count cap, leading to out-of-memory errors in downstream readers.
- Relationship between row count and file size depends on the dataset schema and how well compressed (if at all) the data is
- Other ways to control file size.
- `max_rows_per_group` - splits up large incoming batches into multiple row groups (see the sketch below)
  - If this value is set, then `min_rows_per_group` should also be set, or else you may end up with very small row groups (e.g. if the incoming row group size is just barely larger than this value)
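A minimal sketch of pairing the two settings (path and values are hypothetical):

```r
write_dataset(
  data,
  path = "~/datasets/my-data/",
  format = "parquet",
  min_rows_per_group = 5e5,  # avoid tiny row groups
  max_rows_per_group = 1e6   # split very large incoming batches
)
```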
Fixed Precision Decimal Numbers
Computers don’t store exact representations of numbers, so there are floating point errors in calculations. Doesn’t usually matter in analysis, but it can matter in transaction-based operations.
```r
txns <- tibble(amount = c(0.1, 0.1, 0.1, -0.3)) %>%
  summarize(balance = sum(amount, na.rm = TRUE))

# Should be 0
txns
#> 5.55e-17
```
The accumulation of these errors can be costly.
Arrow can fix this with fixed precision decimals
```r
# arrow table (C++ library); collect() changes it to a df
txns <- Table$create(amount = c(0.1, 0.1, 0.1, -0.3))
txns$amount <- txns$amount$cast(decimal(3, 2))
txns
#> blah, blah, decimal128, blah

write_parquet(txns, "data/txns_decimal.parquet")

txns <- spark_read_parquet("data/txns_decimal.parquet")
txns %>%
  summarize(balance = sum(amount, na.rm = TRUE))
#> balance
#> 0
```
Queries
Example: Filter partitioned files
```r
library(dbplyr)

# iris dataset was written and partitioned to a directory path stored in dir_out
ds <- arrow::open_dataset(dir_out, partitioning = "species")

# query the dataset
ds %>%
  filter(species == "species=setosa") %>%
  count(sepal_length) %>%
  collect()
```
- Partition values take the format `<partition_variable>=<partition_value>`
- `compute()` stores the result in Arrow (see the sketch below)
- `collect()` brings the result into R
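A minimal sketch of the difference, reusing `ds` from the example above:

```r
res_arrow <- ds %>% count(sepal_length) %>% compute()  # stays in Arrow (Table)
res_df    <- ds %>% count(sepal_length) %>% collect()  # pulled into R as a tibble
```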
Example: libarrow functions
```r
arrowmagicks %>%
  mutate(days = arrow_days_between(start_date, air_date)) %>%
  collect()
```
- “days_between” is a function in libarrow but not in {arrow}. In order to use it, you only have to put the “arrow_” prefix in front of it.
- Use `list_compute_functions()` to get a list of the available functions (sketch below)
- List of potential functions available (libarrow function reference)
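A minimal sketch, assuming the `pattern` argument filters the listed kernels by name:

```r
library(arrow)

# all available compute kernels
list_compute_functions()

# just the ones with "between" in the name, e.g. "days_between"
list_compute_functions(pattern = "between")
```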
When the query is also larger than memory
```r
library(arrow)
library(dplyr)

nyc_taxi <- open_dataset("nyc-taxi/")

nyc_taxi |>
  filter(payment_type == "Credit card") |>
  group_by(year, month) |>
  write_dataset("nyc-taxi-credit")
```
- In the example, the input is 1.7 billion rows (70GB), output is 500 million (15GB). Takes 3-4 mins.
- `register_scalar_function()`
  - Registers a user-defined R function (which can call base R functions internally) so it can be used inside Arrow dplyr queries (see the sketch below)
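A minimal sketch, assuming the signature described in the {arrow} docs (the UDF's first argument is the kernel context; `times_two` and the column name are hypothetical), reusing `nyc_taxi` from the example above:

```r
library(arrow)
library(dplyr)

register_scalar_function(
  "times_two",
  function(context, x) x * 2,   # any base R code can run in here
  in_type  = float64(),
  out_type = float64(),
  auto_convert = TRUE           # needed for use in dplyr pipelines
)

nyc_taxi |>
  mutate(fare_doubled = times_two(fare_amount)) |>
  head() |>
  collect()
```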
DuckDB
Converting a csv to parquet
```sql
SET threads = 4;

COPY 'data_recensement_2017.csv'
  TO 'data_recensement_2017.parquet' (COMPRESSION zstd);
```
threads is set to the number of threads available by default
Will automatically detect the type of delimiter
Query
```r
df <- duckdb:::sql("FROM 'file.parquet'")
```
Export a DuckDB database to parquet
```r
drv <- duckdb::duckdb()
con <- DBI::dbConnect(drv)
on.exit(DBI::dbDisconnect(con), add = TRUE)

# create duckdb table
DBI::dbWriteTable(con, "mtcars", mtcars)

DBI::dbExecute(con, DBI::sqlInterpolate(
  con,
  "COPY mtcars TO ?filename (FORMAT 'parquet', COMPRESSION 'snappy')",
  filename = "mtcars.parquet"
))
```
Cloud
Access files in Amazon S3 (works for all file types)
<- read_parquet("s3://ursa-labs-taxi-data/2013/12/data.parquet) taxi_s3 # multiple files ds_s3 <- open_dataset(s3://ursa-labs-taxi-data/", partitioning = c("year", "month"))
- As of 2021, this only works for Amazon S3 URIs
- `read_parquet()` can take a minute to load
- You can see the folder structure in the `read_parquet()` S3 URI
Example Query
```r
# over 125 files and 30GB
ds_s3 %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarize(median_tip_pct = median(tip_pct),
            n = n()) %>%
  print() # is this necessary?
```
- Partitioning allowed Arrow to bypass all files that weren’t in year 2015 directory and only perform calculation on those files therein.
Access Google Cloud Storage (GCS)
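A minimal sketch, assuming the `gs://` URI and `gs_bucket()` support described in the {arrow} cloud-storage docs (bucket and path names are hypothetical; public buckets need explicit anonymous access):

```r
library(arrow)

# via a URI (anonymous access to a public bucket)
ds_gcs <- open_dataset("gs://anonymous@some-public-bucket/nyc-taxi/")

# or via a bucket object
bucket <- gs_bucket("some-public-bucket", anonymous = TRUE)
ds_gcs <- open_dataset(bucket$path("nyc-taxi"))
```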