Source code for kombu.transport.django
"""Kombu transport using the Django database as a message store."""
from __future__ import absolute_import
from anyjson import loads, dumps
from django.conf import settings
from django.core import exceptions as errors
from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property, symbol_by_name
from kombu.utils.encoding import bytes_to_str
try:
    from django.apps import AppConfig
except ImportError:  # pragma: no cover
    # ``django.apps`` could not be imported: skip the app-config
    # registration below and leave the module without one.
    pass
else:
    class KombuAppConfig(AppConfig):
        """Django application configuration for this transport package."""

        name = 'kombu.transport.django'
        # Same dotted path with the dots swapped for underscores.
        label = '_'.join(name.split('.'))
        verbose_name = 'Message queue'

    default_app_config = 'kombu.transport.django.KombuAppConfig'
VERSION = (1, 0, 0)
__version__ = '.'.join(str(part) for part in VERSION)

# Polling interval used by the transport: the ``KOMBU_POLLING_INTERVAL``
# setting wins, then ``DJKOMBU_POLLING_INTERVAL``, and finally 5.0.
POLLING_INTERVAL = getattr(
    settings,
    'KOMBU_POLLING_INTERVAL',
    getattr(settings, 'DJKOMBU_POLLING_INTERVAL', 5.0)
)
class Channel(virtual.Channel):
    """Virtual channel that stores queues and messages through a Django model.

    Every storage operation is delegated to the ``objects`` manager of the
    model named by :attr:`queue_model`.
    """

    #: Path of the queue model, resolved with ``symbol_by_name`` by
    #: the :attr:`Queue` property.
    queue_model = 'kombu.transport.django.models:Queue'

    def _new_queue(self, queue, **kwargs):
        """Make sure a queue record named ``queue`` exists."""
        self.Queue.objects.get_or_create(name=queue)

    def _put(self, queue, message, **kwargs):
        """Serialize ``message`` with ``dumps`` and publish it to ``queue``."""
        self.Queue.objects.publish(queue, dumps(message))

    def basic_consume(self, queue, *args, **kwargs):
        """Start consuming from ``queue`` unless its exchange is a fanout.

        The exchange is taken from the first element of the queue's
        binding state; when its type is ``'fanout'`` the request is
        ignored and nothing is registered.
        """
        qinfo = self.state.bindings[queue]
        exchange = qinfo[0]
        if self.typeof(exchange).type == 'fanout':
            return
        super(Channel, self).basic_consume(queue, *args, **kwargs)

    def _get(self, queue):
        """Fetch and deserialize the next message from ``queue``.

        :raises Empty: when the manager's ``fetch`` returns a falsy value.
        """
        m = self.Queue.objects.fetch(queue)
        if m:
            return loads(bytes_to_str(m))
        raise Empty()

    def _size(self, queue):
        """Return whatever the manager reports as the size of ``queue``."""
        return self.Queue.objects.size(queue)

    def _purge(self, queue):
        """Purge ``queue`` through the manager and return its result."""
        return self.Queue.objects.purge(queue)

    def refresh_connection(self):
        """Close the Django database connections.

        ``django.db.close_connection`` was deprecated in Django 1.6 and
        removed in Django 1.8.  It is still used when present so behaviour
        on older Django versions is unchanged; when it is missing, every
        connection known to ``django.db.connections`` is closed explicitly
        instead of failing with ``AttributeError``.
        """
        from django import db
        close_connection = getattr(db, 'close_connection', None)
        if close_connection is not None:
            close_connection()
            return
        for conn in db.connections.all():
            conn.close()

    @cached_property
    def Queue(self):
        """Queue model class resolved from :attr:`queue_model`."""
        return symbol_by_name(self.queue_model)
class Transport(virtual.Transport):
    """Virtual transport that uses the Django-backed :class:`Channel`."""

    Channel = Channel

    default_port = 0
    polling_interval = POLLING_INTERVAL
    driver_type = 'sql'
    driver_name = 'django'

    # Extend the base transport's channel errors with Django's
    # object-lookup exceptions.
    channel_errors = virtual.Transport.channel_errors + (
        errors.ObjectDoesNotExist,
        errors.MultipleObjectsReturned,
    )

    def driver_version(self):
        """Return the components of ``django.VERSION`` joined with dots."""
        from django import VERSION as django_version
        return '.'.join(str(part) for part in django_version)