Laravel: Import Very Large CSV With Jobs and Queues

Admin
Tuesday, May 30, 2023 7 mins to read

Have you ever tried to import 1 million rows from CSV into the DB? It's a tricky task that can end up with performance issues, timeouts, or just running out of server resources. In this tutorial, I will show you one way of handling this with Jobs and Queues.

In short, our goal is to make sure the import process is not interrupted by a timeout.

We need to split the import process into smaller chunks and run them individually to fix this. This is where Laravel Queues and Jobs come in handy. Let's see how we can use them to import an extensive dataset from a file.


Our Task

Here's the situation:

  • We have a CSV file with 1 million rows (or more)
  • We need to import this file into our database
  • Records that already exist in the database should be updated
  • Records that don't exist in the database should be created
  • We need to make sure that the import process is not interrupted by a timeout

Of course, we've chosen a CSV file for convenience, but you might expect to get a JSON request or an XML request from an API endpoint. The idea is the same.


The Cause of the Problem

The problem becomes obvious if we take a look at the code:

use App\Models\Product;
use App\Models\ProductCategory;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Storage;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');

    // Saving the file to storage for reading it as CSV
    // Otherwise, it will break even faster.
    $file = $file->store('csv', ['disk' => 'public']);

    // Opening the file for reading
    $fileStream = fopen(storage_path('app/public/' . $file), 'r');

    $csvContents = [];

    // Reading the file line by line into an array
    while (($line = fgetcsv($fileStream)) !== false) {
        $csvContents[] = $line;
    }

    // Closing the file stream
    fclose($fileStream);

    // Defining the indexes of the CSV
    $name = 0;
    $category = 1;
    $description = 2;
    $price = 3;
    $stock = 4;

    // Loading the categories into a collection keyed by name
    $categories = ProductCategory::pluck('id', 'name');

    $skipHeader = true;
    // Attempt to import the CSV
    foreach ($csvContents as $content) {
        if ($skipHeader) {
            // Skipping the header row (first row)
            $skipHeader = false;
            continue;
        }

        // Reusing a known category ID, or creating the category and
        // remembering its ID so later rows don't create it again
        $categoryName = $content[$category];
        if (! isset($categories[$categoryName])) {
            $categories[$categoryName] = ProductCategory::create([
                'name' => $categoryName,
            ])->id;
        }
        $productCategory = $categories[$categoryName];

        // Updating or creating the product based on the name and category
        Product::updateOrCreate([
            'name' => $content[$name],
            'category_id' => $productCategory,
        ], [
            'description' => $content[$description],
            'price' => $content[$price],
            'stock_left' => $content[$stock],
        ]);
    }

    // Deleting the file from the same disk it was stored on (it was temporary!)
    Storage::disk('public')->delete($file);

    return "Imported!";
}

This should instantly raise a concern: we read the whole file into memory and then process every line inside a single HTTP request. That might be fine if the import is small, but with a more extensive dataset it will take a long time to finish. And if the process takes longer than PHP's maximum execution time, it will fail altogether.
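
To illustrate the memory side of the problem, here is a minimal plain-PHP sketch that yields rows one at a time instead of collecting them into an array first. The readCsvRows() helper and the file path in the usage comment are hypothetical names used only for this illustration, not part of Laravel or of the code above.

```php
<?php

/**
 * Yield CSV rows one at a time instead of collecting them into an array.
 * readCsvRows() is a hypothetical helper, not part of Laravel.
 */
function readCsvRows(string $path, bool $skipHeader = true): Generator
{
    $stream = fopen($path, 'r');

    if ($stream === false) {
        throw new RuntimeException("Unable to open {$path}");
    }

    try {
        $isFirstLine = true;

        while (($line = fgetcsv($stream)) !== false) {
            if ($isFirstLine) {
                $isFirstLine = false;

                if ($skipHeader) {
                    continue;
                }
            }

            // fgetcsv() returns [null] for a blank line, so skip those.
            if ($line === [null]) {
                continue;
            }

            yield $line;
        }
    } finally {
        // Runs when the loop finishes or the generator is discarded.
        fclose($stream);
    }
}

// Usage (hypothetical path): only one row is held in memory at a time.
// foreach (readCsvRows('/path/to/products.csv') as $row) {
//     // handle $row
// }
```

This only addresses memory usage. The execution time limit still applies as long as all rows are processed inside one request.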

And now, we come to the solution: Queues and Jobs. We will base the same import on queued Jobs, which will split the load into smaller chunks, ensuring they are executed individually.

Bonus: This will also make the import process contain a fail-safe mechanism, allowing us to retry the import if it fails for some reason.
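
As a rough sketch of what that retry behavior can look like, a queued Job may declare how many times it should be attempted and what happens after the last attempt fails. The class name ImportRowJob and the values 3 and 10 are assumptions made for this illustration. Note that an exception has to escape handle() for Laravel to retry the Job.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Log;
use Throwable;

// Hypothetical Job name, used only for illustration.
class ImportRowJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    // Attempt the Job up to 3 times before marking it as failed (assumed value).
    public $tries = 3;

    // Wait 10 seconds before each retry (assumed value).
    public $backoff = 10;

    public function __construct(private array $row)
    {
    }

    public function handle(): void
    {
        // Import logic goes here. An exception that escapes this method
        // sends the Job back to the queue until $tries is exhausted.
    }

    // Called once, after the final attempt has failed.
    public function failed(Throwable $exception): void
    {
        Log::error('Import row failed', [
            'row' => $this->row,
            'error' => $exception->getMessage(),
        ]);
    }
}
```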


Step 1. Creating the Job

Our first Job will be responsible for reading the CSV file and creating another Job for each line. Let's call it ProcessImportJob:

php artisan make:job ProcessImportJob

The Job will look like this:

app/Jobs/ProcessImportJob.php

namespace App\Jobs;

use App\Jobs\ProductsImport\ProcessProductImportJob;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessImportJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Giving a timeout of 20 minutes to the Job to process the file
    public $timeout = 1200;

    public function __construct(private string $file)
    {
    }

    public function handle(): void
    {
        // Mapping the field names to their column indexes in the CSV
        $fieldMap = [
            'name' => 0,
            'category' => 1,
            'description' => 2,
            'price' => 3,
            'stock' => 4,
        ];

        // Open the file for reading
        $fileStream = fopen($this->file, 'r');

        $skipHeader = true;
        while (($line = fgetcsv($fileStream)) !== false) {
            if ($skipHeader) {
                // Skip the header
                $skipHeader = false;
                continue;
            }
            // For each line, we dispatch a job to process the line
            dispatch(new ProcessProductImportJob($line, $fieldMap));
        }

        // Close the file
        fclose($fileStream);

        // Delete the file after processing
        unlink($this->file);
    }
}
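
The field map above is hard-coded, so it relies on the CSV columns always arriving in the same order. One possible variation, sketched here in plain PHP, is to derive the map from the header row instead of skipping it. The buildFieldMap() helper is hypothetical, and it assumes the header cells use the same names as the $fieldMap keys.

```php
<?php

/**
 * Build a column-name => index map from the CSV header row.
 * buildFieldMap() is a hypothetical helper; the column names are assumed
 * to match the keys of the hard-coded $fieldMap.
 */
function buildFieldMap(array $header, array $requiredColumns): array
{
    // Normalize header cells: trim whitespace and lowercase.
    $normalized = array_map(
        static fn ($cell) => strtolower(trim((string) $cell)),
        $header
    );

    // Turns [0 => 'name', 1 => 'category'] into ['name' => 0, 'category' => 1].
    $map = array_flip($normalized);

    $missing = array_diff($requiredColumns, array_keys($map));

    if ($missing !== []) {
        throw new InvalidArgumentException(
            'Missing CSV columns: ' . implode(', ', $missing)
        );
    }

    // Keep only the columns the import needs.
    return array_intersect_key($map, array_flip($requiredColumns));
}

// Usage with an assumed header row that contains one extra column.
$fieldMap = buildFieldMap(
    ['Name', 'Category', 'Description', 'Price', 'Stock', 'Notes'],
    ['name', 'category', 'description', 'price', 'stock']
);
```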

Next, we will create the ProcessProductImportJob:

php artisan make:job ProductsImport/ProcessProductImportJob

The Job will look like this:

app/Jobs/ProductsImport/ProcessProductImportJob.php

namespace App\Jobs\ProductsImport;

use App\Models\Product;
use App\Models\ProductCategory;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ProcessProductImportJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(private array $dataLine, private array $fieldMap)
    {
    }

    public function handle(): void
    {
        // Finding the category by name, or creating it if it doesn't exist
        $category = ProductCategory::firstOrCreate([
            'name' => $this->dataLine[$this->fieldMap['category']],
        ]);

        try {
            // Updating or creating the product based on the name and category
            Product::updateOrCreate(
                [
                    'name' => $this->dataLine[$this->fieldMap['name']],
                    'category_id' => $category->id,
                ],
                [
                    'description' => $this->dataLine[$this->fieldMap['description']],
                    'price' => $this->dataLine[$this->fieldMap['price']],
                    'stock_left' => $this->dataLine[$this->fieldMap['stock']],
                ]
            );
        } catch (Exception $e) {
            // Logging the error together with the line that caused it
            Log::error($e->getMessage());
            Log::info(json_encode($this->dataLine));
        }
    }
}

Then all we have to do is to dispatch the ProcessImportJob from the Controller:

app/Http/Controllers/ImportController.php

use App\Jobs\ProcessImportJob;
use Illuminate\Http\Request;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');

    $file = $file->store('csv', ['disk' => 'public']);

    dispatch(new ProcessImportJob(storage_path('app/public/' . $file)));

    return "Importing...";
}

And that's it! We are now accepting the CSV file, parsing it in one Job, and then running a Job for each of the lines in the CSV. This way, each Job takes little time to process, and we can process a lot of lines without worrying about the timeout.

The whole process of accepting the file took ~300ms, and we now have a queued job in our database.

This job will be processed in the background. Once that is done, we'll see that we have a lot of queued jobs from our CSV, one for each line.
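
One job per line means that 1 million rows become 1 million queue records. A possible variation, which is not part of the approach shown in this tutorial, is to group lines into chunks and hand each chunk to a single job. The plain-PHP sketch below shows only the grouping step. The chunkRows() helper and the chunk size are assumptions made for illustration.

```php
<?php

/**
 * Group rows from any iterable into arrays of at most $size rows.
 * chunkRows() is a hypothetical helper, not part of Laravel.
 */
function chunkRows(iterable $rows, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Chunk size must be at least 1.');
    }

    $chunk = [];

    foreach ($rows as $row) {
        $chunk[] = $row;

        if (count($chunk) === $size) {
            yield $chunk;
            $chunk = [];
        }
    }

    // Emit the final, possibly smaller, chunk.
    if ($chunk !== []) {
        yield $chunk;
    }
}

// Example: 5 rows in chunks of 2 give chunks of 2, 2 and 1 rows.
$rows = [['A'], ['B'], ['C'], ['D'], ['E']];

foreach (chunkRows($rows, 2) as $chunk) {
    // In the import, each $chunk would go to one queued job
    // instead of dispatching a job per line.
    echo count($chunk), PHP_EOL;
}
```

Larger chunks mean fewer queue records but longer individual jobs, so the chunk size would have to stay small enough for each job to finish well within its timeout.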


Step 2. Optimizing the Queue

We've already set a timeout on the ProcessImportJob to 20 minutes to avoid timeouts, yet we can still do better. Here are a few things we can do:

Option 1. Create a Separate Queue

This is a very simple thing to do. We can create a separate queue for the long import jobs, so it wouldn't slow down any other jobs:

app/Http/Controllers/ImportController.php

use App\Jobs\ProcessImportJob;
use Illuminate\Http\Request;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');

    $file = $file->store('csv', ['disk' => 'public']);

    // Sending the Job to the "import" queue instead of the default one
    dispatch(new ProcessImportJob(storage_path('app/public/' . $file)))->onQueue('import');

    return "Importing...";
}

This will create a job in a separate queue called import.

The same thing can be done for the ProcessProductImportJob:

app/Jobs/ProcessImportJob.php

// ...
dispatch(new ProcessProductImportJob($line, $fieldMap))->onQueue('importProcess');
// ...

This will also create a separate queue for processing the lines.

And running queue workers will look like this:

ProcessImportJob

php artisan queue:work --queue=import

ProcessProductImportJob

php artisan queue:work --queue=importProcess

Both of them can be run in parallel, which means that as soon as jobs appear in the database, they will be processed on their separate queues.

Option 2. Increasing the Number of Workers

Another thing to remember is that the number of queue workers directly affects job processing speed. A single worker might be fine for small imports, but if we have 1M small jobs, we should increase the number of workers to process them faster. To do so, the Laravel documentation recommends using Supervisor. Locally, you can launch as many of these commands as you want:

php artisan queue:work --queue=importProcess
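
To get a feel for how much the worker count matters, here is a back-of-the-envelope estimate in plain PHP. It assumes every job takes the same time and that the work is spread evenly across workers. The figure of 10 ms per job is an assumption for illustration, not a measurement, and estimateImportSeconds() is a hypothetical helper.

```php
<?php

/**
 * Rough wall-clock estimate, assuming every job takes the same time and
 * the work is spread evenly across workers. Hypothetical helper.
 */
function estimateImportSeconds(int $jobs, int $millisecondsPerJob, int $workers): int
{
    if ($workers < 1) {
        throw new InvalidArgumentException('At least one worker is required.');
    }

    $totalMilliseconds = $jobs * $millisecondsPerJob;

    // Divide the total work across workers, then convert to whole seconds.
    return (int) ceil($totalMilliseconds / $workers / 1000);
}

// Assumed figure: 10 ms per job. Measure your own jobs before relying on this.
echo estimateImportSeconds(1_000_000, 10, 1), PHP_EOL; // 10000 seconds, almost 3 hours
echo estimateImportSeconds(1_000_000, 10, 8), PHP_EOL; // 1250 seconds, about 21 minutes
```

In practice the database becomes the bottleneck at some point, so the speedup will not stay linear as more workers are added.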

Option 3. Setting Correct Timeouts

Another thing to keep in mind is the timeout and retry_after configuration. The Job's timeout has to be lower than the retry_after value of the queue connection:

config/queue.php

// ...
 
'database' => [
    'driver' => 'database',
    'table' => 'jobs',
    'queue' => 'default',
    'retry_after' => 1250, // We gave additional 50 seconds before retrying
    'after_commit' => false,
],
 
// ...

Otherwise, the Job will be released back onto the queue while it is still running and will be processed a second time.


That's it. Your import of any big dataset is now divided into smaller parts and processed in the background. You can now import much larger file sets than before and not worry about breaking the application.

You can learn more about managing Queues in our Course on Queues or Course on Managing Queues in Servers.

Also, you may be interested in a similar longer process implementation with Websockets so that the server would inform the customer when the import/export process is finished: WebSockets in Laravel with Soketi: Real-Time Updates Example