Streaming JSON input/output
The stream_in
and stream_out
functions implement line-by-line processing
of JSON data over a connection
, such as a socket, url, file or pipe. JSON
streaming requires the ndjson format, which slightly differs
from fromJSON
and toJSON
, see details.
stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...) stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)
con |
a |
handler |
a custom function that is called on each page of JSON data. If not specified,
the default handler stores all pages and binds them into a single data frame that will be
returned by |
pagesize |
number of lines to read/write from/to the connection per iteration. |
verbose |
print some information on what is going on. |
... |
arguments for |
x |
object to be streamed out. Currently only data frames are supported. |
prefix |
string to write before each line (use |
Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done
using lines of minified JSON records, a.k.a. ndjson.
This is pretty standard: JSON databases such as dat
or MongoDB use the same format to import/export datasets. Note that this means that the
total stream combined is not valid JSON itself; only the individual lines are. Also note
that because line-breaks are used as separators, prettified JSON is not permitted: the
JSON lines must be minified. In this respect, the format is a bit different from
fromJSON
and toJSON
where all lines are part of a single JSON
structure with optional line breaks.
The handler
is a callback function which is called for each page (batch) of
JSON data with exactly one argument (usually a data frame with pagesize
rows).
If handler
is missing or NULL
, a default handler is used which stores all
intermediate pages of data, and at the very end binds all pages together into one single
data frame that is returned by stream_in
. When a custom handler
function
is specified, stream_in
does not store any intermediate results and always returns
NULL
. It is then up to the handler
to process or store data pages.
A handler
function that does not store intermediate results in memory (for
example by writing output to another connection) results in a pipeline that can process an
unlimited amount of data. See example.
Note that a vector of JSON strings already in R can parsed with stream_in
by
creating a connection to it with textConnection
.
If a connection is not opened yet, stream_in
and stream_out
will automatically open and later close the connection. Because R destroys connections
when they are closed, they cannot be reused. To use a single connection for multiple
calls to stream_in
or stream_out
, it needs to be opened
beforehand. See example.
The stream_out
function always returns NULL
.
When no custom handler is specified, stream_in
returns a data frame of all pages binded together.
When a custom handler function is specified, stream_in
always returns NULL
.
MongoDB export format: https://docs.mongodb.com/manual/reference/program/mongoexport/
Documentation for the JSON Lines text file format: https://jsonlines.org/
# compare formats x <- iris[1:3,] toJSON(x) stream_out(x) # Trivial example mydata <- stream_in(url("http://httpbin.org/stream/100")) ## Not run: #stream large dataset to file and back library(nycflights13) stream_out(flights, file(tmp <- tempfile())) flights2 <- stream_in(file(tmp)) unlink(tmp) all.equal(flights2, as.data.frame(flights)) # stream over HTTP diamonds2 <- stream_in(url("http://jeroen.github.io/data/diamonds.json")) # stream over HTTP with gzip compression flights3 <- stream_in(gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz"))) all.equal(flights3, as.data.frame(flights)) # stream over HTTPS (HTTP+SSL) via curl library(curl) flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz"))) all.equal(flights4, as.data.frame(flights)) # or alternatively: flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz"))) all.equal(flights5, as.data.frame(flights)) # Full JSON IO stream from URL to file connection. # Calculate delays for flights over 1000 miles in batches of 5k library(dplyr) con_in <- gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz")) con_out <- file(tmp <- tempfile(), open = "wb") stream_in(con_in, handler = function(df){ df <- dplyr::filter(df, distance > 1000) df <- dplyr::mutate(df, delta = dep_delay - arr_delay) stream_out(df, con_out, pagesize = 1000) }, pagesize = 5000) close(con_out) # stream it back in mydata <- stream_in(file(tmp)) nrow(mydata) unlink(tmp) # Data from http://openweathermap.org/current#bulk # Each row contains a nested data frame. daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50) subset(daily14, city$name == "Berlin")$data[[1]] # Or with dplyr: library(dplyr) daily14f <- flatten(daily14) filter(daily14f, city.name == "Berlin")$data[[1]] # Stream import large data from zip file tmp <- tempfile() download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp) companies <- stream_in(unz(tmp, "companies.json")) ## End(Not run)
Please choose more modern alternatives, such as Google Chrome or Mozilla Firefox.