Big Data Processing in Simpl.js

posted November 28, 2016, by Klaus

Simpl.js can process data sets too large to fit in memory, but doing so requires the stream-processing features and techniques described here.

Online Data Sources

HTTP is itself a streaming protocol. With chunked transfer encoding, a message's Content-Length need not be known or declared up front. In addition, the Range header supported by many web servers allows a resource to be downloaded piecemeal, and thus facilitates parallel processing of data formats whose structure permits it.
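
For example, a client could request just the first megabyte of a resource (an illustrative sketch; the URL is hypothetical, and Range support depends on the server):

var request = new XMLHttpRequest();
request.open('GET', '/large-file.bin');
// Request only bytes 0 through 1048575 (the first 1 MiB); a server
// that honors Range responds with 206 Partial Content
request.setRequestHeader('Range', 'bytes=0-1048575');
request.responseType = 'arraybuffer';
request.onload = function() {
  if (request.status == 206)
    console.log('received', request.response.byteLength, 'bytes');
};
request.send();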

Importantly, the new fetch API standard in browsers exposes HTTP response data as a stream that javascript can read and process incrementally, so an entire response need not be buffered in memory. Below is sample code that uses the fetch response's reader stream to parse a raw byte stream into JSON. The large file fetched here contains GeoJSON data for zip code regions in the United States.

var parser = jsonstream(function(feature) {
  // TODO: process feature object
}, {parentPath: 'features'});

fetch('https://raw.githubusercontent.com/jgoodall/us-maps/master/geojson/zcta5.json').then(function(response) {
  if (!response.ok) return console.error(response.statusText);

  var reader = response.body.getReader();

  reader.read().then(function process(part) {
    if (part.done) return console.log('done');
    parser.write(part.value.buffer);
    reader.read().then(process);
  });
});

Notice that reader.read() and process are called repeatedly, and the resulting ArrayBuffer pieces are fed into jsonstream, a streaming JSON parser configured (via parentPath) to emit each child of the JSON document's features array in turn. This is a complete streaming pipeline for processing large JSON documents, both in Simpl.js and in the browser.

Large File Uploads

To process a large file from the local filesystem, Simpl.js supports the same streaming functionality through a web-based form file upload.

A modern, javascript-enabled client can send a File directly as the body of an XMLHttpRequest, making it unnecessary to parse a multipart request body:

<form method="post" action="/upload">
  <input type="file" name="file">
  <button>Upload</button>
</form>
<script>
  document.forms[0].onsubmit = function(e) {
    e.preventDefault();

    var file = this.file.files[0];
    if (!file) return;

    // Use request.upload.onprogress for better UX with large files
    var request = new XMLHttpRequest();
    request.open('POST', '/upload');
    request.send(file);
  };
</script>
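
As the comment above suggests, a progress listener (attached before calling send) might look like this:

request.upload.onprogress = function(e) {
  // lengthComputable is true when the total upload size is known
  if (e.lengthComputable)
    console.log(Math.round(100 * e.loaded / e.total) + '% uploaded');
};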

The back end can then simply forward the uploaded bytes to a stream parser as before:

if (request.path == '/upload')
  return function(data, remaining) {
    // Called with each chunk of the request body;
    // remaining is falsy once the body is complete
    if (remaining) {
      parser.write(data);
    } else {
      parser.close();
      response.ok();
    }
  };

To accept uploads without client-side javascript, use a streaming multipart/form-data parser like formdata:

<form method="post" action="/upload" enctype="multipart/form-data">
  <input type="file" name="file">
  <button>Upload</button>
</form>
The back-end handler then feeds each file part to the stream parser:

if (request.path == '/upload')
  return formdata(request, function(data, headers, name) {
    if (name == 'file')
      parser.write(data);
  }, function(error) {
    parser.close();
    response.generic(error || 200);
  });

Throttling Input Streams

When performing slow or asynchronous work on a large input stream, it is necessary to throttle incoming data by repeatedly pausing and unpausing the connection socket; otherwise, new incoming data can exhaust available memory and system resources while earlier data is still being processed. Below is a JSON stream parser configured to load values into a database, using a counter to throttle incoming traffic.

var counter = 0;

var parser = jsonstream(function(value, key) {
  // Pause the socket while any db.put is outstanding
  if (!counter++)
    response.socket.setPaused(true);
  db.put(key, value).then(function() {
    // Resume once all pending writes have completed
    if (!--counter)
      response.socket.setPaused(false);
  });
}, {parentPath: 'features'});

A similar effect can be achieved when streaming input from a call to fetch: use the same counter to defer each subsequent call to reader.read() until pending work has completed.
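
For example, here is a minimal sketch along those lines, reusing the db handle from above, with url standing in for the data source:

var pending = 0, reader;

var parser = jsonstream(function(value, key) {
  pending++;
  db.put(key, value).then(function() {
    // Read the next chunk once all pending writes have completed
    if (!--pending) reader.read().then(process);
  });
}, {parentPath: 'features'});

function process(part) {
  if (part.done) return console.log('done');
  parser.write(part.value.buffer);
  // Only read more data now if this chunk produced no pending work;
  // otherwise the last db.put callback above issues the next read
  if (!pending) reader.read().then(process);
}

fetch(url).then(function(response) {
  reader = response.body.getReader();
  reader.read().then(process);
});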