Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 43
0.00% covered (danger)
0.00%
0 / 2
CRAP
0.00% covered (danger)
0.00%
0 / 1
ProcessNotificationQueue
0.00% covered (danger)
0.00%
0 / 43
0.00% covered (danger)
0.00%
0 / 2
30
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 run
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 1
20
1<?php
2
3namespace App\Commands;
4
5use CodeIgniter\CLI\BaseCommand;
6use CodeIgniter\CLI\CLI;
7use App\Modules\NotificationModule\Models\NotificationQueueModel;
8use App\Modules\NotificationModule\Services\NotificationDispatchService;
9
10class ProcessNotificationQueue extends BaseCommand
11{
12    protected $group = 'Notification';
13    protected $name = 'queue:process';
14    protected $description = 'Process all notification channels queue (SMS / EMAIL / WHATSAPP)';
15
16    protected NotificationQueueModel $queueModel;
17    protected NotificationDispatchService $dispatchService;
18
19    public function __construct()
20    {
21        parent::__construct();
22
23        $this->queueModel = model(NotificationQueueModel::class);
24        $this->dispatchService = service('notificationDispatchService');
25    }
26
27    public function run(array $params)
28    {
29        CLI::write('QUEUE WORKER STARTED', 'yellow');
30
31        $limit = (int) ($params[0] ?? 20);
32
33        /**
34         * 1. LOCK ATOMIQUE DES MESSAGES PRÊTS
35         * - scheduled_at OK
36         * - retry_at OK
37         */
38        $db = db_connect();
39
40        $db->query("
41            UPDATE notification_queue
42            SET status = 'processing'
43            WHERE status = 'pending'
44            AND (
45                scheduled_at <= NOW()
46                OR retry_at <= NOW()
47                OR retry_at IS NULL
48            )
49            ORDER BY scheduled_at ASC, id ASC
50            LIMIT {$limit}
51        ");
52
53        /**
54         * 2. RÉCUPÉRATION DES ITEMS LOCKÉS
55         */
56        $items = $this->queueModel
57            ->where('status', 'processing')
58            ->orderBy('scheduled_at', 'ASC')
59            ->findAll($limit);
60
61        if (empty($items)) {
62            CLI::write('NO ITEMS TO PROCESS', 'yellow');
63            return;
64        }
65
66        $processed = 0;
67        $failed = 0;
68
69        /**
70         * 3. TRAITEMENT BATCH
71         */
72        foreach ($items as $item) {
73
74            try {
75
76                $this->dispatchService->retry($item);
77
78                /**
79                 * SUCCESS LOG
80                 */
81                CLI::write(
82                    "SENT ID={$item->id} CHANNEL={$item->channel} TO={$item->customer_id}",
83                    'green'
84                );
85
86                $processed++;
87
88                /**
89                 * RESET RETRY STATE
90                 */
91                $this->queueModel->update($item->id, [
92                    'retry_at' => null
93                ]);
94
95            } catch (\Throwable $e) {
96
97                $failed++;
98
99                CLI::write(
100                    "FAILED ID={$item->id} ERROR=" . $e->getMessage(),
101                    'red'
102                );
103
104                /**
105                 * DELEGATION FAIL LOGIC (retry_at + attempts)
106                 */
107                $this->queueModel->markAsFailed(
108                    $item->id,
109                    (int) $item->attempts + 1,
110                    $e->getMessage()
111                );
112            }
113        }
114
115        /**
116         * 4. SUMMARY
117         */
118        CLI::write('--------------------------------', 'gray');
119        CLI::write("DONE", 'green');
120        CLI::write("PROCESSED={$processed}", 'green');
121        CLI::write("FAILED={$failed}", 'red');
122    }
123}