Skip to content

Commit

Permalink
Merge pull request #42 from cuda-networks/refetch_queue_to_cache
Browse files Browse the repository at this point in the history
re-fetch queue to cache in case of deletion
  • Loading branch information
alexeyts authored Jan 6, 2020
2 parents 6b595b6 + 9ac2d9a commit 8b080a4
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 21 deletions.
36 changes: 24 additions & 12 deletions eb_sqs/aws/sqs_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ def __init__(self):
)
self.queue_cache = {}

def _get_queue(self, queue_name):
# type: (unicode) -> Any
queue_name = '{}{}'.format(settings.QUEUE_PREFIX, queue_name)
def _get_queue(self, queue_name, use_cache=True):
# type: (unicode, bool) -> Any
full_queue_name = '{}{}'.format(settings.QUEUE_PREFIX, queue_name)

queue = self._get_sqs_queue(queue_name)
queue = self._get_sqs_queue(full_queue_name, use_cache)
if not queue:
queue = self._add_sqs_queue(queue_name)
queue = self._add_sqs_queue(full_queue_name)

return queue

def _get_sqs_queue(self, queue_name):
# type: (unicode) -> Any
if self.queue_cache.get(queue_name):
def _get_sqs_queue(self, queue_name, use_cache):
# type: (unicode, bool) -> Any
if use_cache and self.queue_cache.get(queue_name):
return self.queue_cache[queue_name]

try:
Expand Down Expand Up @@ -62,9 +62,21 @@ def add_message(self, queue_name, msg, delay):
# type: (unicode, unicode, int) -> None
try:
queue = self._get_queue(queue_name)
queue.send_message(
MessageBody=msg,
DelaySeconds=delay
)
try:
queue.send_message(
MessageBody=msg,
DelaySeconds=delay
)
except ClientError as ex:
if ex.response.get('Error', {}).get('Code', None) == 'AWS.SimpleQueueService.NonExistentQueue':
queue = self._get_queue(queue_name, use_cache=False)
queue.send_message(
MessageBody=msg,
DelaySeconds=delay
)
else:
raise ex
except QueueDoesNotExistException:
raise
except Exception as ex:
raise QueueClientException(ex)
39 changes: 36 additions & 3 deletions eb_sqs/tests/aws/tests_aws_queue_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import absolute_import, unicode_literals

import time
from unittest import TestCase, skip
from unittest import TestCase
from mock import patch

import boto3
from botocore.exceptions import ClientError
from moto import mock_sqs

from eb_sqs import settings
Expand Down Expand Up @@ -41,12 +43,43 @@ def test_add_message_delayed(self):
queue.reload()
self.assertEqual(queue.attributes["ApproximateNumberOfMessages"], '1')

@skip("Disabled because current mock_sqs doesn't support invalid queue call")
@mock_sqs()
def test_add_message_wrong_queue(self):
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName='default')
sqs.create_queue(QueueName='default')
queue_client = SqsQueueClient()

with self.assertRaises(QueueDoesNotExistException):
queue_client.add_message('invalid', 'msg', 0)

@mock_sqs()
def test_auto_add_queue(self):
settings.AUTO_ADD_QUEUE = True

queue_name = 'test-queue'

sqs = boto3.resource('sqs')

queue_client = SqsQueueClient()

queue_client.add_message(queue_name, 'msg', 0)

full_queue_name = settings.QUEUE_PREFIX + queue_name

queue = sqs.get_queue_by_name(QueueName=full_queue_name)

self.assertEqual(queue.attributes["ApproximateNumberOfMessages"], '1')

queue.delete()

# moto throws exception inconsistent with boto, thus the patching
with patch.object(queue_client.queue_cache[full_queue_name], 'send_message') as send_message_fn:
send_message_fn.side_effect = ClientError({'Error': {'Code': 'AWS.SimpleQueueService.NonExistentQueue'}}, None)

queue_client.add_message(queue_name, 'msg', 0)

queue = sqs.get_queue_by_name(QueueName=full_queue_name)

self.assertEqual(queue.attributes["ApproximateNumberOfMessages"], '1')

settings.AUTO_ADD_QUEUE = False
5 changes: 0 additions & 5 deletions eb_sqs/worker/queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ def __init__(self, queue_name):
class QueueClient(object):
__metaclass__ = ABCMeta

def __init__(self, group_id):
# type: (unicode) -> None
super(QueueClient, self).__init__()
self._group_id = group_id

@abstractmethod
def add_message(self, queue_name, msg, delay):
# type: (unicode, unicode, int) -> None
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.34',
version='1.35',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit 8b080a4

Please sign in to comment.