/ 6 min read
RabbitMQ avec Symfony et Celery : Guide Complet
Last Updated:
Introduction
Dans une architecture moderne d’applications distribuées, il est courant de faire communiquer différents services développés dans des technologies variées. Cet article explore comment mettre en place une communication efficace entre une application Symfony (PHP) et des workers Celery (Python) en utilisant RabbitMQ comme message broker.
Cette approche permet de déléguer des tâches lourdes ou spécialisées (traitement d’images, machine learning, analyses de données) à des services Python tout en gardant la logique métier principale dans Symfony.
Architecture de la Solution
Notre architecture se compose de trois éléments principaux :
- Symfony : Application web qui reçoit les requêtes utilisateur et publie les messages
- RabbitMQ : Message broker qui gère la file d’attente des messages
- Celery : Worker Python qui consomme et traite les messages
graph LR A[Client Web] --> B[Symfony App] B --> C[RabbitMQ] C --> D[Celery Worker] D --> E[Base de Données] D --> F[Services Python]Installation et Configuration de RabbitMQ
Installation avec Docker
# Lancement de RabbitMQ avec l'interface de gestiondocker run -d \ --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=password \ rabbitmq:3-managementL’interface de gestion sera accessible sur http://localhost:15672.
Configuration des Queues
Créez les queues nécessaires via l’interface ou par programmation :
# Connexion en tant qu'admin# Création de la queue "image_processing"# Création de la queue "data_analysis"Configuration Côté Symfony
Installation des Dépendances
composer require symfony/messengercomposer require symfony/amqp-messengerConfiguration du Messenger
Dans config/packages/messenger.yaml :
framework: messenger: transports: rabbitmq: dsn: 'amqp://admin:password@localhost:5672/%2f' options: exchange: name: 'symfony_exchange' type: 'direct' queues: image_processing: binding_keys: ['image.process'] data_analysis: binding_keys: ['data.analyze']
routing: 'App\Message\ProcessImageMessage': rabbitmq 'App\Message\AnalyzeDataMessage': rabbitmqCréation des Messages
<?phpnamespace App\Message;
class ProcessImageMessage{ private string $imagePath; private string $userId; private array $filters;
public function __construct(string $imagePath, string $userId, array $filters = []) { $this->imagePath = $imagePath; $this->userId = $userId; $this->filters = $filters; }
public function getImagePath(): string { return $this->imagePath; }
public function getUserId(): string { return $this->userId; }
public function getFilters(): array { return $this->filters; }}Service de Publication
<?phpnamespace App\Service;
use App\Message\ProcessImageMessage;use Symfony\Component\Messenger\MessageBusInterface;
class MessagePublisher{ private MessageBusInterface $messageBus;
public function __construct(MessageBusInterface $messageBus) { $this->messageBus = $messageBus; }
public function publishImageProcessing(string $imagePath, string $userId, array $filters): void { $message = new ProcessImageMessage($imagePath, $userId, $filters);
// Publication du message dans RabbitMQ $this->messageBus->dispatch($message); }}Contrôleur d’Exemple
<?phpnamespace App\Controller;
use App\Service\MessagePublisher;use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;use Symfony\Component\HttpFoundation\Request;use Symfony\Component\HttpFoundation\Response;use Symfony\Component\Routing\Annotation\Route;
class ImageController extends AbstractController{ #[Route('/image/process', name: 'image_process', methods: ['POST'])] public function processImage(Request $request, MessagePublisher $publisher): Response { $imagePath = $request->request->get('image_path'); $filters = $request->request->get('filters', []); $userId = $this->getUser()->getId();
// Publication du message pour traitement asynchrone $publisher->publishImageProcessing($imagePath, $userId, $filters);
return $this->json([ 'status' => 'success', 'message' => 'Image processing started', 'job_id' => uniqid() // Dans un vrai projet, utiliser un UUID ]); }}Configuration Côté Python avec Celery
Installation des Dépendances
pip install celery[rabbitmq]pip install pillow # Pour le traitement d'imagespip install sqlalchemy # Pour la base de donnéesConfiguration Celery
from celery import Celeryimport json
# Configuration de Celeryapp = Celery('image_processor')app.conf.update( broker_url='amqp://admin:password@localhost:5672//', result_backend='rpc://', task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Europe/Paris', enable_utc=True,)
# Configuration des queuesapp.conf.task_routes = { 'tasks.process_image': {'queue': 'image_processing'}, 'tasks.analyze_data': {'queue': 'data_analysis'},}
if __name__ == '__main__': app.start()Worker de Traitement d’Images
from celery_app import appfrom PIL import Image, ImageFilterimport osimport json
@app.task(name='tasks.process_image')def process_image(message_data): """ Traite une image selon les filtres spécifiés """ try: # Décodage du message Symfony data = json.loads(message_data) if isinstance(message_data, str) else message_data
image_path = data.get('imagePath') user_id = data.get('userId') filters = data.get('filters', [])
# Ouverture de l'image with Image.open(image_path) as img: processed_img = img.copy()
# Application des filtres for filter_name in filters: if filter_name == 'blur': processed_img = processed_img.filter(ImageFilter.BLUR) elif filter_name == 'sharpen': processed_img = processed_img.filter(ImageFilter.SHARPEN) elif filter_name == 'grayscale': processed_img = processed_img.convert('L')
# Sauvegarde de l'image traitée output_path = f"processed_{os.path.basename(image_path)}" processed_img.save(output_path)
# Notification du résultat (webhook, base de données, etc.) notify_processing_complete(user_id, output_path)
return { 'status': 'success', 'output_path': output_path, 'user_id': user_id }
except Exception as e: return { 'status': 'error', 'error': str(e) }
def notify_processing_complete(user_id, output_path): """ Notifie Symfony que le traitement est terminé """ # Ici vous pourriez : # - Insérer en base de données # - Appeler une API Symfony # - Publier un message de retour passDémarrage du Worker
# Démarrage d'un worker Celerycelery -A celery_app worker --loglevel=info --queues=image_processing
# Démarrage avec plusieurs workerscelery -A celery_app worker --loglevel=info --concurrency=4Gestion des Réponses et du Statut
Système de Callback
Pour notifier Symfony du résultat du traitement, plusieurs approches sont possibles :
1. Webhook HTTP
# Dans tasks.pyimport requests
def notify_symfony_webhook(user_id, job_id, result): """ Appelle un webhook Symfony pour notifier du résultat """ webhook_url = "https://your-symfony-app.com/webhook/job-complete" payload = { 'user_id': user_id, 'job_id': job_id, 'status': result['status'], 'data': result }
try: response = requests.post(webhook_url, json=payload, timeout=30) response.raise_for_status() except requests.RequestException as e: print(f"Webhook notification failed: {e}")2. Message de Retour via RabbitMQ
# Publication d'un message de retourfrom celery_app import app
def publish_result_message(user_id, job_id, result): """ Publie le résultat dans une queue de retour """ return_message = { 'user_id': user_id, 'job_id': job_id, 'timestamp': time.time(), 'result': result }
# Publication dans la queue de retour app.send_task( 'symfony.job.completed', args=[return_message], queue='symfony_callbacks' )Monitoring et Gestion des Erreurs
Surveillance avec Flower
# Installation de Flower pour le monitoringpip install flower
# Démarrage de l'interface de monitoringcelery -A celery_app flower --port=5555Gestion des Échecs
# Configuration de retry dans Celery@app.task(name='tasks.process_image', bind=True, max_retries=3)def process_image(self, message_data): try: # Logique de traitement pass except Exception as exc: # Retry automatique en cas d'erreur raise self.retry(exc=exc, countdown=60, max_retries=3)Logging Centralisé
# Configuration des logsimport logging
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@app.task(name='tasks.process_image')def process_image(message_data): logger.info(f"Starting image processing for: {message_data}") # ... traitement logger.info("Image processing completed successfully")Tests et Développement
Test du Publisher Symfony
<?phpnamespace App\Tests\Service;
use App\Service\MessagePublisher;use PHPUnit\Framework\TestCase;use Symfony\Component\Messenger\MessageBusInterface;
class MessagePublisherTest extends TestCase{ public function testPublishImageProcessing(): void { $messageBus = $this->createMock(MessageBusInterface::class); $messageBus->expects($this->once()) ->method('dispatch');
$publisher = new MessagePublisher($messageBus); $publisher->publishImageProcessing('/path/to/image.jpg', 'user123', ['blur']);
$this->assertTrue(true); // Le message a été publié }}Test du Worker Celery
import unittestfrom unittest.mock import patch, MagicMockfrom tasks import process_image
class TestImageProcessing(unittest.TestCase):
@patch('tasks.Image.open') def test_process_image_success(self, mock_image_open): # Mock de PIL mock_img = MagicMock() mock_image_open.return_value.__enter__.return_value = mock_img
message_data = { 'imagePath': '/test/image.jpg', 'userId': 'user123', 'filters': ['blur'] }
result = process_image(message_data)
self.assertEqual(result['status'], 'success') self.assertEqual(result['user_id'], 'user123')
if __name__ == '__main__': unittest.main()Conclusion
Cette architecture avec RabbitMQ permet de créer un système distribué robuste où Symfony et Python collaborent efficacement. Les avantages principaux sont :
- Découplage : Les services peuvent évoluer indépendamment
- Scalabilité : Possibilité d’ajouter des workers selon la charge
- Résilience : Les messages persistent même en cas de panne
- Flexibilité : Chaque technologie reste dans son domaine d’expertise
Cette approche est particulièrement utile pour des applications nécessitant du traitement lourd, de l’analyse de données ou de l’intelligence artificielle, tout en gardant une interface web réactive développée en PHP.
Pour aller plus loin, vous pourriez explorer l’utilisation de Kubernetes pour l’orchestration des conteneurs, ou Apache Kafka pour des besoins de streaming de données à plus grande échelle.