Skip to content

Commit

Permalink
Revert "Store connection in transport"
Browse files Browse the repository at this point in the history
This reverts commit 3f7a977.
  • Loading branch information
dlunch committed Apr 12, 2023
1 parent a2e53c3 commit 1631bf9
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions kombu/transport/redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ def on_readable(self, fileno):
if chan.qos.can_consume():
return chan.handlers[cmd](**{'conn': conn})

class RedisClusterConnection():
connections = {}
@classmethod
def get_connection(cls, host, port):
key = (host, port)
if key not in cls.connections:
cls.connections[key] = cls.create_connection(host, port)
return cls.connections[key]

@classmethod
def create_connection(cls, host, port):
params = {'skip_full_coverage_check': True, 'host': host, 'port': port}

return redis.RedisCluster(**params)


class Channel(RedisChannel):

Expand Down Expand Up @@ -187,7 +202,9 @@ def conn_or_acquire(self, client=None):
yield self.client

def _create_client(self, asynchronous=False):
return self.connection.cluster_connection
conninfo = self.connection.client

return RedisClusterConnection.get_connection(conninfo.hostname, conninfo.port)

def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
Expand Down Expand Up @@ -256,13 +273,5 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cycle = ClusterPoller()

params = {'skip_full_coverage_check': True, 'host': self.client.hostname, 'port': self.client.port}
self.cluster_connection = redis.RedisCluster(**params)

def close_connection(self, connection):
super().close_connection(connection)

self.cluster_connection.close()

def driver_version(self):
return redis.__version__

0 comments on commit 1631bf9

Please sign in to comment.