Skip to content

Commit 9bb82f7

Browse files
[12.x] Redis cluster broadcaster (#56581)
* [12.x] RedisBroadcaster for RedisCuster * formatting --------- Co-authored-by: Taylor Otwell <[email protected]>
1 parent fe5e22c commit 9bb82f7

File tree

1 file changed

+28
-4
lines changed

1 file changed

+28
-4
lines changed

src/Illuminate/Broadcasting/Broadcasters/RedisBroadcaster.php

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
use Illuminate\Broadcasting\BroadcastException;
66
use Illuminate\Contracts\Redis\Factory as Redis;
7+
use Illuminate\Redis\Connections\PhpRedisClusterConnection;
8+
use Illuminate\Redis\Connections\PredisClusterConnection;
9+
use Illuminate\Redis\Connections\PredisConnection;
710
use Illuminate\Support\Arr;
11+
use Predis\Connection\Cluster\RedisCluster;
812
use Predis\Connection\ConnectionException;
913
use RedisException;
1014
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
@@ -125,10 +129,30 @@ public function broadcast(array $channels, $event, array $payload = [])
125129
]);
126130

127131
try {
128-
$connection->eval(
129-
$this->broadcastMultipleChannelsScript(),
130-
0, $payload, ...$this->formatChannels($channels)
131-
);
132+
if ($connection instanceof PhpRedisClusterConnection) {
133+
foreach ($channels as $channel) {
134+
$connection->publish($channel, $payload);
135+
}
136+
} elseif ($connection instanceof PredisClusterConnection &&
137+
$connection->client()->getConnection() instanceof RedisCluster) {
138+
$randomClusterNodeConnection = new PredisConnection(
139+
$connection->client()->getClientBy('slot', mt_rand(0, 16383))
140+
);
141+
142+
if ($events = $connection->getEventDispatcher()) {
143+
$randomClusterNodeConnection->setEventDispatcher($events);
144+
}
145+
146+
$randomClusterNodeConnection->eval(
147+
$this->broadcastMultipleChannelsScript(),
148+
0, $payload, ...$this->formatChannels($channels)
149+
);
150+
} else {
151+
$connection->eval(
152+
$this->broadcastMultipleChannelsScript(),
153+
0, $payload, ...$this->formatChannels($channels)
154+
);
155+
}
132156
} catch (ConnectionException|RedisException $e) {
133157
throw new BroadcastException(
134158
sprintf('Redis error: %s.', $e->getMessage())

0 commit comments

Comments
 (0)