A while ago, I was working on a way to import large datasets into a hobby project of mine, Biologer. The user selects a CSV file, maps its columns to supported fields so the app can tell what is what, and submits it.
The import then goes through several stages:
- First, the CSV file is read, its columns are mapped, and the result is saved to a JSON file. This way we don't have to worry about parsing the CSV again in the following steps.
- In the second step, the JSON file is read and each item in the collection is validated against the defined rules. If there are any validation errors, we don't want to save anything to the database; we want to present all of the errors for each row. So the errors are saved to a different JSON file, where they can later be fetched by the frontend without additional processing by the application. There can be A LOT of validation errors for large CSV files...
- Finally, if there were no validation errors, the mapped data is read from the first JSON file again and converted into database records, which in this case span several connected tables.
Since the uploaded CSV is expected to have tens or even hundreds of thousands of rows, all of these operations need to be done in a memory-efficient way; otherwise, the app would break by running out of memory.
I'll write in detail about the whole import process in another post. For now, we'll focus on storing those large collections of data in a JSON file and reading from it.
For our purposes, a JSON collection is a JSON array of objects (A LOT OF THEM) stored as a string in a file. To handle such large files in a memory-efficient way, we need to work with smaller chunks at a time.
The following code uses PHP 7.4 syntax.
Writing
Let's start with writing a JSON collection to a file using streams. What we want is to be able to add items to the opened collection and close the collection when we're done. Let's write a class called JsonCollectionStreamWriter that will help us with this.
First, we need to open the file we're going to write to. We can pass a path to the constructor and start from there:
class JsonCollectionStreamWriter
{
/**
* @var resource
*/
private $resource;
public function __construct(string $path)
{
// We're just going to write to this file, that's why we use "w".
// In order to avoid transformations by the OS we'll go binary with "b".
$this->resource = fopen($path, 'wb');
}
}
We should also make sure we start with an empty file at the given path. Strictly speaking, the "w" mode already takes care of this, since it creates the file if it's missing and truncates it to zero length, but being explicit doesn't hurt:
public function __construct(string $path)
{
// Make sure we have an empty file.
file_put_contents($path, '');
// We're just going to write to this file, that's why we use "w".
// In order to avoid transformations by the OS we'll go binary with "b".
$this->resource = fopen($path, 'wb');
}
Now would be a good time to start our collection with an opening square bracket:
public function __construct(string $path)
{
// Make sure we have an empty file.
file_put_contents($path, '');
// We're just going to write to this file, that's why we use "w".
// In order to avoid transformations by the OS we'll go binary with "b".
$this->resource = fopen($path, 'wb');
// Start the collection
fwrite($this->resource, '[');
}
Now that we can open the collection, we should also be able to close it. We can do that by adding a "close" method, where we write the closing square bracket to mark the end of our collection and close the resource.
public function close(): void
{
// In case we attempt to close twice
if (is_resource($this->resource)) {
fwrite($this->resource, ']');
fclose($this->resource);
}
}
If for some reason we forget to close the collection, we still need to clean things up so we don't end up with an invalid JSON collection. We can do that by calling the "close" method in the destructor.
/**
* In case we have some loose ends, close the collection.
*/
public function __destruct()
{
$this->close();
}
This is all fine and dandy, but we need to put actual items into the collection. The reason we stream the collection to a file is that there are a lot of items; any single item, however, isn't that big, so we can use PHP's native JSON encoding function to add items without worrying about memory.
/**
* Serialize the item and write it to the collection.
*
* @param array|object $item
*/
public function push($item): void
{
fwrite($this->resource, json_encode($item));
}
One thing we need to consider is that items are separated with commas, and we need to put one between each pair of items. We can tell when to do this by tracking the key/index of the current item and writing a comma before every item except the first. The final version of our writer class looks like this:
class JsonCollectionStreamWriter
{
/**
* @var resource
*/
private $resource;
/**
* Keep track of the collection item we're inserting.
*/
private int $key = 0;
public function __construct(string $path)
{
// Make sure we have an empty file.
file_put_contents($path, '');
// We're just going to write to this file, that's why we use "w".
// In order to avoid transformations by the OS we'll go binary with "b".
$this->resource = fopen($path, 'wb');
// Start the collection
fwrite($this->resource, '[');
}
/**
* Insert closing bracket and close the resource.
*/
public function close(): void
{
// In case we attempt to close twice
if (is_resource($this->resource)) {
fwrite($this->resource, ']');
fclose($this->resource);
}
}
/**
* Serialize the item and write it to the collection.
*
* @param array|object $item
*/
public function push($item): void
{
// We don't need a separator before the very first item.
if ($this->key !== 0) {
fwrite($this->resource, ',');
}
fwrite($this->resource, json_encode($item));
$this->key++;
}
/**
* In case we have some loose ends, close the collection.
*/
public function __destruct()
{
$this->close();
}
}
Now we can use it like this:
$path = 'path/to/file.json';
$writer = new JsonCollectionStreamWriter($path);
for ($i = 0; $i < 100; $i++) {
$writer->push([
'value' => $i,
]);
}
$writer->close();
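The resulting file contains the whole collection as a single JSON array on one line, something like this (middle items elided):
[{"value":0},{"value":1},{"value":2},...,{"value":99}]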
I don't expect real projects to have such simple use cases, though.
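For a slightly more realistic sketch of how the writer might be used in the import's first stage, here's a version that streams CSV rows into a JSON collection using PHP's native fgetcsv. The field names, column mapping, and paths are made up for illustration:
// Hypothetical mapping of supported fields to CSV column indexes.
$mapping = ['species' => 0, 'latitude' => 1, 'longitude' => 2];

$writer = new JsonCollectionStreamWriter('storage/import.json');

$csv = fopen('storage/upload.csv', 'rb');
fgetcsv($csv); // Skip the header row.

while (($row = fgetcsv($csv)) !== false) {
    $item = [];
    foreach ($mapping as $field => $index) {
        $item[$field] = $row[$index] ?? null;
    }
    $writer->push($item);
}

fclose($csv);
$writer->close();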
Reading
Reading a JSON collection from a file as a stream is a bit trickier than writing one. We need to read the file in chunks, parse each chunk, and if we're left with content that doesn't amount to a complete item, save it and try to parse it together with the next chunk. Let's see how this can be done. We'll call this class JsonCollectionStreamReader, and we can start by opening the file for reading, similar to how we did for writing. If the file doesn't exist, we'll throw an exception.
class JsonCollectionStreamReader
{
/**
* @var resource
*/
private $resource;
public function __construct(string $path)
{
if (!file_exists($path)) {
throw new \InvalidArgumentException('There is no file at the given path');
}
$this->resource = fopen($path, 'rb');
}
}
And similar to how we needed to close the resource when we were done with the writer, we want to close the resource when we're done here as well.
public function close(): void
{
if (is_resource($this->resource)) {
fclose($this->resource);
}
}
public function __destruct()
{
$this->close();
}
Most of the time we'll leave it to the destructor to close the resource, as we might want to read the collection multiple times.
Let's see how we might read the collection. The way we're going to use this is by iterating over items.
$path = 'path/to/file.json';
$reader = new JsonCollectionStreamReader($path);
foreach ($reader->get() as $item) {
// Do something with item.
}
$reader->close();
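And since get() will rewind the file pointer each time it's called (we'll add that in a moment), the same reader can go over the collection more than once:
$reader = new JsonCollectionStreamReader($path);

// First pass: count the items.
$count = 0;
foreach ($reader->get() as $item) {
    $count++;
}

// Second pass works because get() rewinds the file.
foreach ($reader->get() as $item) {
    // Do something with item.
}

$reader->close();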
To get the same feel as working with an array, but in a memory-efficient way, we're going to use Generators. Any function that contains a yield statement returns a Generator when called. Since a chunk can contain multiple items, parsing a chunk returns a Generator as well, which we delegate to with yield from.
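If generators are new to you, here's a minimal illustration:
// Any function containing `yield` returns a Generator when called.
function numbers(): \Generator
{
    yield 1;
    yield 2;
}

foreach (numbers() as $number) {
    // Values are produced lazily, one at a time.
    echo $number, PHP_EOL;
}
Applied to our reader, it looks like this: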
const CHUNK_SIZE = 8192;
// ...
public function get(): \Generator
{
while (!feof($this->resource)) {
$chunk = fread($this->resource, self::CHUNK_SIZE);
yield from $this->parseChunk($chunk);
}
}
private function parseChunk(string $chunk): \Generator
{
// TODO: Implement.
}
The CHUNK_SIZE constant tells us how big a chunk of the file to read at a time. Experiment to find the optimal value for your use case.
What did we say about chunks possibly containing parts that don't amount to a full item? Yeah, we need a way to store those parts. We'll store them in a buffer property that we're going to reset each time we start reading the collection, to avoid leftovers from a previous read. We can also rewind the file pointer, so that we start from the beginning of the file every time we read the collection.
private string $buffer = '';
public function get(): \Generator
{
$this->buffer = '';
rewind($this->resource);
while (!feof($this->resource)) {
$chunk = fread($this->resource, self::CHUNK_SIZE);
yield from $this->parseChunk($chunk);
}
}
private function parseChunk(string $chunk): \Generator
{
// Continue from where we left off.
$this->buffer .= $chunk;
// TODO: Implement.
}
Now we need to go through each char in the buffer and decide if it's one of the following:
- the start of the collection;
- the start of an item;
- the end of an item;
- the end of the collection.
Since we might have multibyte characters in the parsed string, we're going to use the mb_str_split function (new in PHP 7.4) to split it into an array of characters.
private function parseChunk(string $chunk): \Generator
{
// Continue from where we left off.
$this->buffer .= $chunk;
// We want to iterate over chars but since,
// we can have multibyte strings we can't access them by position alone.
$split = mb_str_split($this->buffer);
foreach ($split as $position => $char) {
// TODO: Implement.
}
}
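To illustrate why we can't index the buffer by byte position alone:
$string = 'čvor';                // The first character takes two bytes in UTF-8.
echo $string[0];                 // Prints a broken byte, not "č".
print_r(mb_str_split($string));  // ['č', 'v', 'o', 'r'], proper characters.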
There are a few other things we need to keep track of to be able to tell when we encounter a char of significance: the level of object nesting, the position where the current item began, whether we found any items while parsing this chunk, and the position from which to keep the rest of the buffer.
We'll reset the nesting level every time we start reading, so we start fresh. Also, if parsing the current buffer hasn't yielded any items, we need to reset the nesting level; otherwise we'd just keep increasing it, since the buffer holds the beginning of an item but not its end, and we'd count its opening braces again on the next pass. Finally, the parsed part of the buffer is discarded, and we keep what's left for later.
private int $nestingLevel = 0;
public function get(): \Generator
{
$this->buffer = '';
$this->nestingLevel = 0;
rewind($this->resource);
while (!feof($this->resource)) {
$chunk = fread($this->resource, self::CHUNK_SIZE);
yield from $this->parseChunk($chunk);
}
}
private function parseChunk(string $chunk): \Generator
{
// Continue from where we left off.
$this->buffer .= $chunk;
$start = 0;
$keepFrom = 0;
$yielded = 0;
// We want to iterate over chars but since,
// we can have multibyte strings we can't access them by position alone.
$split = mb_str_split($this->buffer);
foreach ($split as $position => $char) {
// Start of the collection.
if ($this->nestingLevel === 0 && $char === '[') {
continue;
}
// We reached the end of the collection.
if ($this->nestingLevel === 0 && $char === ']') {
break;
}
// Maybe start of an item, but we need to check if we're not nested.
if ($char === '{') {
// Definitely start of the item.
if ($this->nestingLevel === 0) {
$start = $position;
}
$this->nestingLevel++;
} elseif ($char === '}') {
// Maybe end of the item?
$this->nestingLevel--;
// Definitely end of the item.
if ($this->nestingLevel === 0) {
$keepFrom = $position + 1;
yield json_decode(
mb_substr($this->buffer, $start, $position - $start + 1)
);
$yielded++;
}
}
}
// If we don't reset nesting level when current buffer hasn't yielded any items,
// we'll never reach 0 and won't be able to read items from next chunks.
if ($yielded === 0) {
$this->nestingLevel = 0;
}
// Keep the unparsed part for later.
$this->buffer = mb_substr($this->buffer, $keepFrom);
}
One additional thing we can do is let the caller choose whether items should be returned as arrays or as objects (the default). We can do this by passing an additional parameter to the constructor and forwarding it as json_decode's second argument.
private bool $asArray = false;
public function __construct(string $path, bool $asArray = false)
{
if (!file_exists($path)) {
throw new \InvalidArgumentException('There is no file at the given path');
}
$this->asArray = $asArray;
$this->resource = fopen($path, 'rb');
}
private function parseChunk(string $chunk): \Generator
{
// Continue from where we left off
$this->buffer .= $chunk;
$start = 0;
$keepFrom = 0;
$yielded = 0;
// We want to iterate over chars but since,
// we can have multibyte strings we can't access them by position alone
$split = mb_str_split($this->buffer);
foreach ($split as $position => $char) {
// Start of the collection
if ($this->nestingLevel === 0 && $char === '[') {
continue;
}
// We reached the end of the collection
if ($this->nestingLevel === 0 && $char === ']') {
break;
}
// Maybe start of an item, but we need to check if we're not nested
if ($char === '{') {
// Definitely start of the item
if ($this->nestingLevel === 0) {
$start = $position;
}
$this->nestingLevel++;
} elseif ($char === '}') {
// Maybe end of the item?
$this->nestingLevel--;
// Definitely end of the item
if ($this->nestingLevel === 0) {
$keepFrom = $position + 1;
yield json_decode(
mb_substr($this->buffer, $start, $position - $start + 1),
$this->asArray
);
$yielded++;
}
}
}
// If we don't reset nesting level when current buffer hasn't yielded any items,
// we'll never reach 0 and won't be able to read items from next chunks.
if ($yielded === 0) {
$this->nestingLevel = 0;
}
// Keep the unparsed part for later.
$this->buffer = mb_substr($this->buffer, $keepFrom);
}
Here's our result:
class JsonCollectionStreamReader
{
const CHUNK_SIZE = 8192;
/**
* @var resource
*/
private $resource;
private bool $asArray = false;
private string $buffer = '';
private int $nestingLevel = 0;
public function __construct(string $path, bool $asArray = false)
{
if (!file_exists($path)) {
throw new \InvalidArgumentException('There is no file at the given path');
}
$this->asArray = $asArray;
$this->resource = fopen($path, 'rb');
}
public function close(): void
{
if (is_resource($this->resource)) {
fclose($this->resource);
}
}
public function get(): \Generator
{
$this->buffer = '';
$this->nestingLevel = 0;
rewind($this->resource);
while (!feof($this->resource)) {
$chunk = fread($this->resource, self::CHUNK_SIZE);
yield from $this->parseChunk($chunk);
}
}
private function parseChunk(string $chunk): \Generator
{
// Continue from where we left off
$this->buffer .= $chunk;
$start = 0;
$keepFrom = 0;
$yielded = 0;
// We want to iterate over chars but since,
// we can have multibyte strings we can't access them by position alone
$split = mb_str_split($this->buffer);
foreach ($split as $position => $char) {
// Start of the collection
if ($this->nestingLevel === 0 && $char === '[') {
continue;
}
// We reached the end of the collection
if ($this->nestingLevel === 0 && $char === ']') {
break;
}
// Maybe start of an item, but we need to check if we're not nested
if ($char === '{') {
// Definitely start of the item
if ($this->nestingLevel === 0) {
$start = $position;
}
$this->nestingLevel++;
} elseif ($char === '}') {
// Maybe end of the item?
$this->nestingLevel--;
// Definitely end of the item
if ($this->nestingLevel === 0) {
$keepFrom = $position + 1;
yield json_decode(
mb_substr($this->buffer, $start, $position - $start + 1),
$this->asArray
);
$yielded++;
}
}
}
// If we don't reset nesting level when current buffer hasn't yielded any items,
// we'll never reach 0 and won't be able to read items from next chunks.
if ($yielded === 0) {
$this->nestingLevel = 0;
}
// Keep the unparsed part for later.
$this->buffer = mb_substr($this->buffer, $keepFrom);
}
public function __destruct()
{
$this->close();
}
}
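To see both classes working together, here's a quick round trip:
$path = 'path/to/file.json';

// Write a large collection...
$writer = new JsonCollectionStreamWriter($path);
for ($i = 0; $i < 100000; $i++) {
    $writer->push(['value' => $i]);
}
$writer->close();

// ...and stream it back as associative arrays, one item at a time.
$reader = new JsonCollectionStreamReader($path, true);
foreach ($reader->get() as $item) {
    // $item is ['value' => 0], then ['value' => 1], and so on.
}
$reader->close();

// Peak memory stays flat no matter how many items we push through.
echo memory_get_peak_usage(true), PHP_EOL;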
Please keep in mind that we expect to be reading valid JSON here; there's no logic to handle the various ways our JSON could be invalid. Note also that the parser doesn't account for braces or brackets inside string values, so a perfectly valid item containing, say, "{" in one of its strings would confuse it.
Because this solution doesn't handle all the edge cases, and it's tiresome to cover everything, Biologer actually uses a package for reading JSON collections using streams. The major difference is that it doesn't use generators to iterate over items; instead, you pass a callback that handles each item. That package is maxakawizard/json-collection-parser.
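If I remember its README correctly, usage looks roughly like this; treat the exact API as an assumption and check the package's documentation:
// Assumed API based on the package README; verify before relying on it.
$parser = new \JsonCollectionParser\Parser();
$parser->parse('path/to/file.json', function (array $item) {
    // Do something with item.
});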
The end
Hopefully, I was able to bring you a bit closer to using streams and generators for writing and parsing large amounts of data in a memory-efficient way 😊. I encourage you to experiment with them in your projects, and let me know on Twitter what solutions you come up with for your own use cases!