Retour

/ 6 min read

RabbitMQ avec Symfony et Celery : Guide Complet

Last Updated:

Introduction

Dans une architecture moderne d’applications distribuées, il est courant de faire communiquer différents services développés dans des technologies variées. Cet article explore comment mettre en place une communication efficace entre une application Symfony (PHP) et des workers Celery (Python) en utilisant RabbitMQ comme message broker.

Cette approche permet de déléguer des tâches lourdes ou spécialisées (traitement d’images, machine learning, analyses de données) à des services Python tout en gardant la logique métier principale dans Symfony.


Architecture de la Solution

Notre architecture se compose de trois éléments principaux :

  • Symfony : Application web qui reçoit les requêtes utilisateur et publie les messages
  • RabbitMQ : Message broker qui gère la file d’attente des messages
  • Celery : Worker Python qui consomme et traite les messages
graph LR
A[Client Web] --> B[Symfony App]
B --> C[RabbitMQ]
C --> D[Celery Worker]
D --> E[Base de Données]
D --> F[Services Python]

Installation et Configuration de RabbitMQ

Installation avec Docker

Terminal window
# Lancement de RabbitMQ avec l'interface de gestion
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management

L’interface de gestion sera accessible sur http://localhost:15672.

Configuration des Queues

Créez les queues nécessaires via l’interface ou par programmation :

Terminal window
# Connexion en tant qu'admin
# Création de la queue "image_processing"
# Création de la queue "data_analysis"

Configuration Côté Symfony

Installation des Dépendances

Terminal window
composer require symfony/messenger
composer require symfony/amqp-messenger

Configuration du Messenger

Dans config/packages/messenger.yaml :

framework:
messenger:
transports:
rabbitmq:
dsn: 'amqp://admin:password@localhost:5672/%2f'
options:
exchange:
name: 'symfony_exchange'
type: 'direct'
queues:
image_processing:
binding_keys: ['image.process']
data_analysis:
binding_keys: ['data.analyze']
routing:
'App\Message\ProcessImageMessage': rabbitmq
'App\Message\AnalyzeDataMessage': rabbitmq

Création des Messages

src/Message/ProcessImageMessage.php
<?php
namespace App\Message;
class ProcessImageMessage
{
private string $imagePath;
private string $userId;
private array $filters;
public function __construct(string $imagePath, string $userId, array $filters = [])
{
$this->imagePath = $imagePath;
$this->userId = $userId;
$this->filters = $filters;
}
public function getImagePath(): string
{
return $this->imagePath;
}
public function getUserId(): string
{
return $this->userId;
}
public function getFilters(): array
{
return $this->filters;
}
}

Service de Publication

src/Service/MessagePublisher.php
<?php
namespace App\Service;
use App\Message\ProcessImageMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class MessagePublisher
{
private MessageBusInterface $messageBus;
public function __construct(MessageBusInterface $messageBus)
{
$this->messageBus = $messageBus;
}
public function publishImageProcessing(string $imagePath, string $userId, array $filters): void
{
$message = new ProcessImageMessage($imagePath, $userId, $filters);
// Publication du message dans RabbitMQ
$this->messageBus->dispatch($message);
}
}

Contrôleur d’Exemple

src/Controller/ImageController.php
<?php
namespace App\Controller;
use App\Service\MessagePublisher;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;
class ImageController extends AbstractController
{
#[Route('/image/process', name: 'image_process', methods: ['POST'])]
public function processImage(Request $request, MessagePublisher $publisher): Response
{
$imagePath = $request->request->get('image_path');
$filters = $request->request->get('filters', []);
$userId = $this->getUser()->getId();
// Publication du message pour traitement asynchrone
$publisher->publishImageProcessing($imagePath, $userId, $filters);
return $this->json([
'status' => 'success',
'message' => 'Image processing started',
'job_id' => uniqid() // Dans un vrai projet, utiliser un UUID
]);
}
}

Configuration Côté Python avec Celery

Installation des Dépendances

Terminal window
pip install celery[rabbitmq]
pip install pillow # Pour le traitement d'images
pip install sqlalchemy # Pour la base de données

Configuration Celery

celery_app.py
from celery import Celery
import json
# Configuration de Celery
app = Celery('image_processor')
app.conf.update(
broker_url='amqp://admin:password@localhost:5672//',
result_backend='rpc://',
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Paris',
enable_utc=True,
)
# Configuration des queues
app.conf.task_routes = {
'tasks.process_image': {'queue': 'image_processing'},
'tasks.analyze_data': {'queue': 'data_analysis'},
}
if __name__ == '__main__':
app.start()

Worker de Traitement d’Images

tasks.py
from celery_app import app
from PIL import Image, ImageFilter
import os
import json
@app.task(name='tasks.process_image')
def process_image(message_data):
"""
Traite une image selon les filtres spécifiés
"""
try:
# Décodage du message Symfony
data = json.loads(message_data) if isinstance(message_data, str) else message_data
image_path = data.get('imagePath')
user_id = data.get('userId')
filters = data.get('filters', [])
# Ouverture de l'image
with Image.open(image_path) as img:
processed_img = img.copy()
# Application des filtres
for filter_name in filters:
if filter_name == 'blur':
processed_img = processed_img.filter(ImageFilter.BLUR)
elif filter_name == 'sharpen':
processed_img = processed_img.filter(ImageFilter.SHARPEN)
elif filter_name == 'grayscale':
processed_img = processed_img.convert('L')
# Sauvegarde de l'image traitée
output_path = f"processed_{os.path.basename(image_path)}"
processed_img.save(output_path)
# Notification du résultat (webhook, base de données, etc.)
notify_processing_complete(user_id, output_path)
return {
'status': 'success',
'output_path': output_path,
'user_id': user_id
}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
def notify_processing_complete(user_id, output_path):
"""
Notifie Symfony que le traitement est terminé
"""
# Ici vous pourriez :
# - Insérer en base de données
# - Appeler une API Symfony
# - Publier un message de retour
pass

Démarrage du Worker

Terminal window
# Démarrage d'un worker Celery
celery -A celery_app worker --loglevel=info --queues=image_processing
# Démarrage avec plusieurs workers
celery -A celery_app worker --loglevel=info --concurrency=4

Gestion des Réponses et du Statut

Système de Callback

Pour notifier Symfony du résultat du traitement, plusieurs approches sont possibles :

1. Webhook HTTP

# Dans tasks.py
import requests
def notify_symfony_webhook(user_id, job_id, result):
"""
Appelle un webhook Symfony pour notifier du résultat
"""
webhook_url = "https://your-symfony-app.com/webhook/job-complete"
payload = {
'user_id': user_id,
'job_id': job_id,
'status': result['status'],
'data': result
}
try:
response = requests.post(webhook_url, json=payload, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
print(f"Webhook notification failed: {e}")

2. Message de Retour via RabbitMQ

# Publication d'un message de retour
from celery_app import app
def publish_result_message(user_id, job_id, result):
"""
Publie le résultat dans une queue de retour
"""
return_message = {
'user_id': user_id,
'job_id': job_id,
'timestamp': time.time(),
'result': result
}
# Publication dans la queue de retour
app.send_task(
'symfony.job.completed',
args=[return_message],
queue='symfony_callbacks'
)

Monitoring et Gestion des Erreurs

Surveillance avec Flower

Terminal window
# Installation de Flower pour le monitoring
pip install flower
# Démarrage de l'interface de monitoring
celery -A celery_app flower --port=5555

Gestion des Échecs

# Configuration de retry dans Celery
@app.task(name='tasks.process_image', bind=True, max_retries=3)
def process_image(self, message_data):
try:
# Logique de traitement
pass
except Exception as exc:
# Retry automatique en cas d'erreur
raise self.retry(exc=exc, countdown=60, max_retries=3)

Logging Centralisé

# Configuration des logs
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@app.task(name='tasks.process_image')
def process_image(message_data):
logger.info(f"Starting image processing for: {message_data}")
# ... traitement
logger.info("Image processing completed successfully")

Tests et Développement

Test du Publisher Symfony

tests/Service/MessagePublisherTest.php
<?php
namespace App\Tests\Service;
use App\Service\MessagePublisher;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\MessageBusInterface;
class MessagePublisherTest extends TestCase
{
public function testPublishImageProcessing(): void
{
$messageBus = $this->createMock(MessageBusInterface::class);
$messageBus->expects($this->once())
->method('dispatch');
$publisher = new MessagePublisher($messageBus);
$publisher->publishImageProcessing('/path/to/image.jpg', 'user123', ['blur']);
$this->assertTrue(true); // Le message a été publié
}
}

Test du Worker Celery

test_tasks.py
import unittest
from unittest.mock import patch, MagicMock
from tasks import process_image
class TestImageProcessing(unittest.TestCase):
@patch('tasks.Image.open')
def test_process_image_success(self, mock_image_open):
# Mock de PIL
mock_img = MagicMock()
mock_image_open.return_value.__enter__.return_value = mock_img
message_data = {
'imagePath': '/test/image.jpg',
'userId': 'user123',
'filters': ['blur']
}
result = process_image(message_data)
self.assertEqual(result['status'], 'success')
self.assertEqual(result['user_id'], 'user123')
if __name__ == '__main__':
unittest.main()

Conclusion

Cette architecture avec RabbitMQ permet de créer un système distribué robuste où Symfony et Python collaborent efficacement. Les avantages principaux sont :

  • Découplage : Les services peuvent évoluer indépendamment
  • Scalabilité : Possibilité d’ajouter des workers selon la charge
  • Résilience : Les messages persistent même en cas de panne
  • Flexibilité : Chaque technologie reste dans son domaine d’expertise

Cette approche est particulièrement utile pour des applications nécessitant du traitement lourd, de l’analyse de données ou de l’intelligence artificielle, tout en gardant une interface web réactive développée en PHP.

Pour aller plus loin, vous pourriez explorer l’utilisation de Kubernetes pour l’orchestration des conteneurs, ou Apache Kafka pour des besoins de streaming de données à plus grande échelle.