APIs
Misc
- Definition
- REST API
- Packages
- {{requests-ratelimiter}} - A simple wrapper around pyrate-limiter v2 that adds convenient integration with the requests library
- {beekeeper} - Used to create and maintain R packages that wrap APIs.
- Design questions
- Should the API receive the entire datapoint (e.g sensitve customer info) or just an ID for you to query in a database itself?
- Where should the model be loaded from? Disk? Cloud? (see Production, Deployment >> Model Deployment Strategies)
- What diagnostic output should be returned along with result?
- Use CI/CD to unit test, rebuild, and deploy the API every time there’s a push a commit to the production branch of your repo.
- Best Practices Thread
- Versioning
- IDs vs UUIDs
- Nested resources
- JSON API
- Let the client decide what it wants
- Important to create unit tests to use before code goes into production
- Test all endpoints
- Check data types
- {testthat}
Example
library(testthat) source("rest_controller.R") testthat("output is a probability", { <- list(id = 123, name = "Ralph") input <- make_prediction(input) result expect_gte(result, 0) expect_lte(result, 1) })
- The only difference here between GET and POST is that you can’t put parameters and their values in the URL for POST. The parameters and values are passed in the request body as JSON.
- GET is a request for data from a server POST sends data to a server and also can receive data.
- An IO-bound task spends most of its time waiting for IO responses, which can be responses from webpages, databases, or disks. For web development where a request needs to fetch data from APIs or databases, it’s an IO-bound task and concurrency can be achieved with either threading or async/await to minimize the waiting time from external resources.
Terms
- Async/Await - Unlike threading where the OS has control, with this method, we can decide which part of the code can be awaited and thus control can be switched to run other parts of the code. The tasks need to cooperate and announce when the control will be switched out. And all this is done in a single thread with the
await
command. (article) - Body - information that is sent to the server. (Can’t use with GET requests.)
- Endpoint - a part of the URL you visit. For example, the endpoint of the URL https://example.com/predict is /predict
- Headers - used for providing information (think authentication credentials, for example). They are provided as key-value pairs
- Method - a type of request you’re sending, can be either GET, POST, PUT, PATCH, and DELETE. They are used to perform one of these actions: Create, Read, Update, Delete (CRUD)
- Pooled Requests - A technique where multiple individual requests are combined or “pooled” into a single API call. May require more complex error handling, as you’ll need to manage partial successes or failures within the pooled request
Methods
- Batch endpoints: Some APIs offer specific endpoints designed to handle multiple operations in a single call.
- Request bundling: Clients can aggregate multiple requests into a single payload before sending it to the API.
- Threading - Uses multiple threads and takes turns to run the code. It achieves concurrency with pre-emptive multitasking which means we cannot determine when to run which code in which thread. It’s the operating system that determines which code should be run in which thread. The control can be switched at any point between threads by the operating system. This is why we often see random results with threading (article)
Methods
Misc
- If you’re writing a function or script, you should check whether the status code is in the 200s before additional code runs.
- HTTP 429 - Too Many Requests
GET
# example 1 <- list(key = "<key>", id = "<id>", format = "json", output = "full", count = "2") args <- GET(url = URL, query = args) api_json # example 2 (with headers) = GET("https://api.helium.io/v1/dc_burns/sum", res query = list(min_time = "2020-07-27T00:00:00Z" max_time = "2021-07-27T00:00:00Z"), , add_headers(`Accept`='application/json' `Connection`='keep-live')) , # example 3 <- function(this_title, this_author = NA){ get_book ::GET( httrurl = url, query = list( apikey = Sys.getenv("ACCUWEATHER_KEY"), q = ifelse( is.na(this_author), ::glue('intitle:{this_title}'), glue::glue('intitle:{this_title}+inauthor:{this_author}') glue ))) }
POST
# base_url from get_url above <- "https://tableau.bi.iu.edu/" base_url <- dashsite_json$vizql_root vizql <- dashsite_json$sessionid session_id <- dashsite_json$sheetId sheet_id <- glue("{base_url}{vizql}/bootstrapSession/sessions/{session_id}") post_url <- POST(post_url, dash_api_output body = list(sheet_id = sheet_id), encode = "form", timeout(300))
Example: Pull parsed json from raw format
<- paste0("http://dataservice.accuweather.com/forecasts/", my_url "v1/daily/1day/571_pc?apikey=", Sys.getenv("ACCUWEATHER_KEY")) <- httr::GET(my_url) my_raw_result <- httr::content(my_raw_result, as = 'text') my_content ::glimpse(my_content) #get a sense of the structure dplyr<- jsonlite::fromJSON(my_content) dat
content
has 3 option for extracting and converting the content of the GET output.“raw” output asis
“text” can be easiest to work with for nested json
“parsed” is a list
-
- From thread
- “use auto_unbox = TRUE; otherwise there are some defaults that mess with your API format”
- “url” is the api endpoint (obtain from api docs)
- headers
{httr2}
- POST
- Contacts Home Assistant API and turns off a light.
- Paginated Requests
- Example: (source)
request_complete
checks the response to see whether another is needed.req_perform_iterative
is added to the request, giving it a canned iterator that takes your function and bumps a query parameter (page for this API) every time you do need another request
- Example: (source)
{plumber}
- Serves R objects as an API
- 3 Main Components: Function Definition, Request Type, API Endpoint
- Misc
- Adding `host = “0.0.0.0” to
run_pr()
opens the API to external traffic - {valve} - Auto-scales plumber APIs concurrently using Rust libraries Axum, Tokio, and Deadpool — similar to how gunicorn auto-scales fastapi and Flask apps
- Adding `host = “0.0.0.0” to
- Cloud options for serving Plumber APIs
Install everything on an Amazon EC2 instance
Using a Docker image
Saturn Cloud Deployments
Google Cloud Run
Docker/Kubernetes
Managed Solutions
- RStudio Connect
- Digital Ocean
- Load Testing
- {loadtest}
Test how your API performs under various load scenarios
Outputs tibble of various measurements
Example:
library(loadtest) <- loadtest(url = <api_url>, method = "GET", threads = 200, loops = 1000) results
- Says simulate 200 users hitting the API 1000 times
- {loadtest}
- Documentation
- Plumber creates an OpenAPI (aka Swagger) YAML file that documents parameters, tags, description, etc. automatically for users to know how to use your API
- Access
- View webui, e.g .(http://127.0.0.1:9251/__docs__/)
- Edit the yaml
- e.g. (http://127.0.0.1:9251/openapi.json)
- Scaling
- Natively can only handle 1 request at a time
- {valve} - Parallelize your plumber APIs. Redirects your plumbing for you.
- {future} - can be used to spawn more R processes to handle multiple requests
Resource: Rstudio Global 2021
Example
# rest_controller.R ::plan("multisession") future @* @post /make-prediction <- function (req) { make_prediction ::future({ future<- req$body user_info <- clean_data(user_info) # sourced helper function df_user <- predict(model, data = df_user) result result }) }
- Logging
- Useful for debugging, monitoring performance, monitoring usage
- Provides data for ML monitoring to alert in case of data/model drift
- {logger}
Example:
#* @post /make-prediction <- function(req) { make_predicition <- req$body user_info <- clean_data(user_info) # sourced helper function df_user <- predict(model, data = df_user) result ::log_info(glue("predicted_{user_info$id}_[{result}]{style='color: #990000'}")) logger::s3save(data.frame(id = user_info$id, result = result), ...) aws.s3 result }
- Example: Basic Get request
rest_controller.R
#* @get /sum function(a, b) { as.numeric(a) + as.numeric(b) }
- “/sum” is an endpoint
Run Plumber on rest_controller.R
::pr("rest_controller.R") %>% plumber::pr_run(port = 80) plumber
- 80 is a standard browser port
Get the sum of 1 + 2 by sending a Get request
- Type “127.0.0.1/sum?a=1&b=2” into your browser
httr::GET("127.0.0.1/sum?a=1&b=2")
- Example: Basic Model Serving
rest_controller.R
source("helper_functions.R") library(tidyverse) <- read_rds("trained_model.rds") model #* @post /make-prediction <- function(req) { make_predicition <- req$body user_info <- clean_data(user_info) # sourced helper function df_user <- predict(model, data = df_user) result result }
{{requests}}
- Use
Session
to make a pooled request to the same host (Video, Docs)Example
import pathlib import requests = pathilib.Path.cwd() / "links.txt" links_file = links_file.read_text().splitlines()[:10] links = {"User-Agent": "Mozilla/5.0 (X!!; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0} headers # W/o Session (takes about 16sec) for link in links: = requests.get(link, headers=headers) response print(f"{link} - {response.status_code}") # W/Session (takes about 6sec) with requests.Session() as session: for link in links: = session.get(link, headers=headers) response print(f"{link} - {response.status_code}")
- The first way syncronously makes a get request to each URL
- Makes several requests to the same host
- The second way reuses the underlying TCP connection, which can result in a significant performance increase.
- The first way syncronously makes a get request to each URL
- Retrieve Paged Results One at a Time
Generator
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: = requests.Session() session = 1 page while True: = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ response 'page': page, 'per_page': page_size })) response.raise_for_status() = response.json() data if not data: break yield from data += 1 page
Iterate through each page of results
= iter_beers_from_api() beers next(beers) #> {'id': 1, #> 'name': 'Buzz', #> 'tagline': 'A Real Bitter Experience.', #> 'first_brewed': '09/2007', #> 'description': 'A light, crisp and bitter IPA brewed...', #> 'image_url': 'https://images.punkapi.com/v2/keg.png', #> 'abv': 4.5, #> 'ibu': 60, #> 'target_fg': 1010, #> ... #> } next(beers) #> {'id': 2, #> 'name': 'Trashy Blonde', #> 'tagline': "You Know You Shouldn't", #> 'first_brewed': '04/2008', #> 'description': 'A titillating, ...', #> 'image_url': 'https://images.punkapi.com/v2/2.png', #> 'abv': 4.1, #> 'ibu': 41.5, #> ... #> }
- Use Concurrency
Use threads on your computer to make requests at the same time. It’s essentially parallelism.
Example (source)
import requests from concurrent.futures import ThreadPoolExecutor, as_completed from requests_ratelimiter import LimiterSession # Limit to max 2 calls per second = LimiterSession(per_second=2) request_session def get_post(post_id: int) -> dict: if post_id > 100: raise ValueError("Parameter `post_id` must be less than or equal to 100") = f"https://jsonplaceholder.typicode.com/posts/{post_id}" url # Use the request_session now = request_session.get(url) r r.raise_for_status()= r.json() result # Remove the longest key-value pair for formatting reasons del result["body"] return result if __name__ == "__main__": print("Starting to fetch posts...\n") # Run post fetching concurrently with ThreadPoolExecutor() as tpe: # Submit tasks and get future objects = [tpe.submit(get_post, post_id) for post_id in range(1, 16)] futures for future in as_completed(futures): # Your typical try/except block try: = future.result() result print(result) except Exception as e: print(f"Exception raised: {str(e)}") future.add_done_callback(future_callback_fn)= future.result() result print(result)
ThreadPoolExecutor
class manages a pool of worker threads for you- Number of CPUs + 4, e.g. 12 CPU cores means 16
ThreadPoolExecutor
workers
- Number of CPUs + 4, e.g. 12 CPU cores means 16
- Uses a standard
try
/except
to handle errors. Errors don’t stop code from completing the other requests future.add_done_callback
calls your custom Python function. This function will have access to theFuture
object
- Using API keys
Example (source)
import requests from requests.auth import HTTPBasicAuth import json = "ivelasq@gmail.com" username = r.api_key api_key = "https://ivelasq.atlassian.net/rest/api/3/search?jql=project%20=%20KAN%20AND%20text%20~%20%22\%22social\%22%22" social_url = "https://ivelasq.atlassian.net/rest/api/3/search?jql=project%20=%20KAN%20AND%20text%20~%20%22\%22blog\%22%22" blog_url def get_response_from_url(url, username, api_key): = HTTPBasicAuth(username, api_key) auth = { headers "Accept": "application/json" } = requests.request("GET", url, headers=headers, auth=auth) response if response.status_code == 200: = json.dumps(json.loads(response.text), sort_keys=True, indent=4, separators=(",", ": ")) results return results else: return None = get_response_from_url(social_url, username, api_key) social_results = get_response_from_url(blog_url, username, api_key) blog_results
{{http.client}}
The Requests package is recommended for a higher-level HTTP client interface.
Example 1: Basic GET
import http.client import json = http.client.HTTPSConnection("api.example.com") conn "GET", "/data") conn.request(= conn.getresponse() response = json.loads(response.read().decode()) data conn.close()
Example 2:
GET
import http.client = '/fdsnws/event/1/query' url = { query_params 'format': 'geojson', 'starttime': "2020-01-01", 'limit': '10000', 'minmagnitude': 3, 'maxlatitude': '47.009499', 'minlatitude': '32.5295236', 'maxlongitude': '-114.1307816', 'minlongitude': '-124.482003', }= f'https://earthquake.usgs.gov{url}?{"&".join(f"{key}={value}" for key, value in query_params.items())}' full_url print('defined params...') = http.client.HTTPSConnection('earthquake.usgs.gov') conn 'GET', full_url) conn.request(= conn.getresponse() response
JSON response
import pandas as pd import json if response.status == 200: print('Got a response.') = response.read() data print('made the GET request...') = data.decode('utf-8') data = json.loads(data) json_data = json_data['features'] features = pd.json_normalize(features) df if df.empty: print('No earthquakes recorded.') else: 'Longitude', 'Latitude', 'Depth']] = df['geometry.coordinates'].apply(lambda x: pd.Series(x)) df[['datetime'] = df['properties.time'].apply(lambda x : datetime.datetime.fromtimestamp(x / 1000)) df['datetime'] = df['datetime'].astype(str) df[=['datetime'], inplace=True) df.sort_values(byelse: print(f"Error: {response.status}")