Have you ever tried to import 1 million rows from a CSV into the database? It's a tricky task that can end in performance issues, timeouts, or simply running out of server resources. In this tutorial, I will show you one way of handling this with Jobs and Queues.
In short, our goal is to make sure the import process is not interrupted by a timeout like this:
We need to split the import process into smaller chunks and run them individually to fix this. This is where Laravel Queues and Jobs come in handy. Let's see how we can use them to import an extensive dataset from a file.
Here's the situation:
Of course, we've chosen a CSV file for convenience, but you might expect to get a JSON request or an XML request from an API endpoint. The idea is the same.
The problem becomes obvious if we take a look at the code:
use App\Models\Product;
use App\Models\ProductCategory;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Storage;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');

    // Saving the file to storage for reading it as CSV
    // Otherwise, it will break even faster.
    $file = $file->store('csv', ['disk' => 'public']);

    // Opening the file for reading
    $fileStream = fopen(storage_path('app/public/' . $file), 'r');

    $csvContents = [];

    // Reading the file line by line into an array
    while (($line = fgetcsv($fileStream)) !== false) {
        $csvContents[] = $line;
    }

    // Closing the file stream
    fclose($fileStream);

    // Defining the column indexes of the CSV
    $name = 0;
    $category = 1;
    $description = 2;
    $price = 3;
    $stock = 4;

    // Loading the categories into an array keyed by name
    $categories = ProductCategory::pluck('id', 'name');

    $skipHeader = true;

    // Attempt to import the CSV
    foreach ($csvContents as $content) {
        if ($skipHeader) {
            // Skipping the header (first row)
            $skipHeader = false;
            continue;
        }

        $productCategory = $categories[$content[$category]] ?? ProductCategory::create([
            'name' => $content[$category]
        ])->id;

        // Updating or creating the product based on the name and category
        Product::updateOrCreate([
            'name' => $content[$name],
            'category_id' => $productCategory,
        ], [
            'name' => $content[$name],
            'category_id' => $productCategory,
            'description' => $content[$description],
            'price' => $content[$price],
            'stock_left' => $content[$stock],
        ]);
    }

    // Deleting the file from storage (it was temporary!)
    Storage::delete($file);

    return "Imported!";
}
This should instantly raise a concern: we are iterating over every single line. That might be fine if the import is small, but with a more extensive dataset it will take a long time to finish. And if the process takes longer than PHP's maximum execution time, it will fail altogether.
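For context, how much time you get depends on the max_execution_time setting of your PHP setup (the classic php.ini default for web requests is 30 seconds, though your server may differ). A quick, illustrative way to check the current limit at runtime:

// Check the currently configured execution limit, in seconds
// ("0" means unlimited, which is the typical CLI value)
echo ini_get('max_execution_time');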
And now, we come to the solution: Queues and Jobs. We will base the same import on queued Jobs, which will split the load into smaller chunks, ensuring they are executed individually.
Bonus: this also gives the import process a fail-safe mechanism, allowing us to retry the import if it fails for some reason.
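With the default failed job handling, any Job that throws an unhandled exception ends up in the failed_jobs table, and you can inspect and retry it later. A rough sketch of what that looks like from the command line, assuming the default setup:

# List all Jobs that have failed
php artisan queue:failed

# Retry all failed Jobs (or pass a specific Job ID instead of "all")
php artisan queue:retry all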
Our first Job will be responsible for reading the CSV file and creating another Job for each line. Let's call it ProcessImportJob:
php artisan make:job ProcessImportJob
The Job will look like this:
app/Jobs/ProcessImportJob.php
use App\Jobs\ProductsImport\ProcessProductImportJob;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessImportJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // Giving the Job a timeout of 20 minutes to process the file
    public $timeout = 1200;

    public function __construct(private string $file)
    {
    }

    public function handle(): void
    {
        $fieldMap = [
            'name' => 0,
            'category' => 1,
            'description' => 2,
            'price' => 3,
            'stock' => 4,
        ];

        // Open the file for reading
        $fileStream = fopen($this->file, 'r');

        $skipHeader = true;

        while (($line = fgetcsv($fileStream)) !== false) {
            if ($skipHeader) {
                // Skip the header row
                $skipHeader = false;
                continue;
            }

            // For each line, we dispatch a Job to process it
            dispatch(new ProcessProductImportJob($line, $fieldMap));
        }

        // Close the file
        fclose($fileStream);

        // Delete the file after processing (it was temporary)
        unlink($this->file);
    }
}
Next, we will create the ProcessProductImportJob:
php artisan make:job ProductsImport/ProcessProductImportJob
The Job will look like this:
app/Jobs/ProductsImport/ProcessProductImportJob.php
use App\Models\Product;
use App\Models\ProductCategory;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Log;

class ProcessProductImportJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(private array $dataLine, private array $fieldMap)
    {
    }

    public function handle(): void
    {
        // Find the category by name or create it if it doesn't exist yet
        $category = ProductCategory::firstOrCreate(
            [
                'name' => $this->dataLine[$this->fieldMap['category']],
            ],
            [
                'name' => $this->dataLine[$this->fieldMap['category']],
            ]
        );

        try {
            // Update or create the product based on the name and category
            Product::updateOrCreate(
                [
                    'name' => $this->dataLine[$this->fieldMap['name']],
                    'category_id' => $category->id,
                ],
                [
                    'name' => $this->dataLine[$this->fieldMap['name']],
                    'category_id' => $category->id,
                    'description' => $this->dataLine[$this->fieldMap['description']],
                    'price' => $this->dataLine[$this->fieldMap['price']],
                    'stock_left' => $this->dataLine[$this->fieldMap['stock']],
                ]
            );
        } catch (Exception $e) {
            // Log the error together with the failing line so it can be fixed and re-imported
            Log::error($e->getMessage());
            Log::info(json_encode($this->dataLine));
        }
    }
}
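To make the data flow clearer, here is a rough, illustrative example of what a single dispatched Job receives (the product values are made up for demonstration):

// Hypothetical CSV line and the field map built in ProcessImportJob
$fieldMap = ['name' => 0, 'category' => 1, 'description' => 2, 'price' => 3, 'stock' => 4];
$line = ['Gaming Mouse', 'Peripherals', 'A wired gaming mouse', '24.99', '150'];

// Each Job then only touches one product row
dispatch(new ProcessProductImportJob($line, $fieldMap));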
Then all we have to do is dispatch the ProcessImportJob from the Controller:
app/Http/Controllers/ImportController.php
use App\Jobs\ProcessImportJob;
use Illuminate\Http\Request;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');
    $file = $file->store('csv', ['disk' => 'public']);

    dispatch(new ProcessImportJob(storage_path('app/public/' . $file)));

    return "Importing...";
}
And that's it! We are now accepting the CSV file, parsing it in one Job, and then running a Job for each of the lines in the CSV. This way, they take little time to process, and we can process a lot of lines without worrying about the timeout:
As you can see, the whole process of accepting the file took ~300ms. And we have a queued job in our database:
This will be processed in the background. Once that is done, we'll see that we have a lot of queued jobs from our CSV:
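Of course, for any of this to run, the jobs need a queue backend and at least one worker. A minimal sketch of the setup, assuming the database queue driver used throughout this tutorial (on newer Laravel versions the jobs migration may already ship with the skeleton and the table command name may differ):

# Create the jobs table if your app doesn't have it yet
php artisan queue:table
php artisan migrate

# In your .env, point the queue to the database driver
# QUEUE_CONNECTION=database

# Start a worker to process the queued jobs
php artisan queue:work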
We've already set a timeout of 20 minutes on the ProcessImportJob, yet we can still do better. Here are a few things we can do:
This is a very simple change: we can create a separate queue for the long-running import jobs so that they don't slow down any other jobs:
app/Http/Controllers/ImportController.php
use App\Jobs\ProcessImportJob;
use Illuminate\Http\Request;

// ...

public function import(Request $request)
{
    $file = $request->file('csv');
    $file = $file->store('csv', ['disk' => 'public']);

    // Before:
    // dispatch(new ProcessImportJob(storage_path('app/public/' . $file)));

    // After - push the Job onto a dedicated "import" queue:
    dispatch(new ProcessImportJob(storage_path('app/public/' . $file)))->onQueue('import');

    return "Importing...";
}
This will create a job in a separate queue called import:
The same thing can be done for the ProcessProductImportJob:
app/Jobs/ProcessImportJob.php
// ...

// Before:
// dispatch(new ProcessProductImportJob($line, $fieldMap));

// After - push the line-processing Jobs onto a dedicated "importProcess" queue:
dispatch(new ProcessProductImportJob($line, $fieldMap))->onQueue('importProcess');

// ...
This will also create a separate queue for processing the lines:
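If you want to double-check where the jobs ended up (with the database driver), a quick tinker snippet like this groups the pending jobs by queue; the counts in the comment are, of course, only illustrative:

use Illuminate\Support\Facades\DB;

// Count the pending jobs per queue
DB::table('jobs')
    ->selectRaw('queue, count(*) as total')
    ->groupBy('queue')
    ->get();

// e.g. [{queue: "import", total: 1}, {queue: "importProcess", total: 9500}]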
And running queue workers will look like this:
For the ProcessImportJob:

php artisan queue:work --queue=import

For the ProcessProductImportJob:

php artisan queue:work --queue=importProcess
Both of them can be run in parallel, which means that as soon as jobs appear in the database, they will be processed on their separate queues.
Another thing to remember is that the number of queue workers directly affects job processing speed. That might be fine for small imports, but if we have 1M small jobs, we should increase the number of workers to process them faster. For production, the Laravel documentation recommends using Supervisor. Locally, you can launch as many of these commands as you want:
php artisan queue:work --queue=importProcess
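On a server, that usually translates into a Supervisor program with numprocs set to the number of parallel workers you want. A rough sketch, where the program name, paths, user, and worker count are assumptions to adapt to your setup:

[program:import-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/your-app/artisan queue:work --queue=importProcess --tries=3
autostart=true
autorestart=true
; Number of parallel workers processing the importProcess queue
numprocs=8
user=www-data
redirect_stderr=true
stdout_logfile=/var/www/your-app/storage/logs/import-worker.log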
Another thing to keep in mind is the timeout and retry_after configuration. Your Job timeout has to be lower than the retry_after value in the configuration:
config/queue.php
// ...

'database' => [
    'driver' => 'database',
    'table' => 'jobs',
    'queue' => 'default',
    'retry_after' => 1250, // We gave an additional 50 seconds before retrying
    'after_commit' => false,
],

// ...
Otherwise, the Job will be retried before it has finished, and the same work will be processed again.
That's it. Your import of any big dataset is now divided into smaller parts and processed in the background. You can now import much larger file sets than before and not worry about breaking the application.
You can learn more about managing Queues in our Course on Queues or Course on Managing Queues in Servers.
Also, you may be interested in a similar longer process implementation with Websockets so that the server would inform the customer when the import/export process is finished: WebSockets in Laravel with Soketi: Real-Time Updates Example