Source code for tldap.manager

# Copyright 2012-2014 Brian May
#
# This file is part of python-tldap.
#
# python-tldap is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# python-tldap is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with python-tldap  If not, see <http://www.gnu.org/licenses/>.

""" The default manager's and linkdescriptors used for tldap objects.

Terminology:

primary key
    is the key that is not changed.
f / foriegn
    is the object containing the primary key.
f_key
    is this key, i.e. the primary key in the foriegn object.
f_key
    is always a single value.

foriegn key
    is the referenced key.
p / primary
    is the object containing the foriegn key.
p_value
    is this key, i.e. the foriegn key in the primary object.

this
    is the object being operated on.
linked
    is the object being referenced for this operation.

if p_value_is_list is true then p_value must be a list.
if p_value_is_list is false then p_value must be a single value.
"""
from __future__ import absolute_import

import tldap
import tldap.query
import importlib
import copy


[docs]class Manager(object): """ The base manager class. """ def __init__(self): self._cls = None self._alias = tldap.DEFAULT_LDAP_ALIAS self._settings = None self._base_dn = None # base_dn = None means lookup automatically at query time from class def contribute_to_class(self, cls, name): self._cls = cls setattr(cls, name, ManagerDescriptor(self)) def db_manager(self, using=None, settings=None, base_dn=None): obj = copy.copy(self) if using is not None: obj._alias = using if settings is not None: obj._settings = settings if base_dn is not None: obj._base_dn = base_dn return obj ####################### # PROXIES TO QUERYSET # ####################### def get_empty_query_set(self): return tldap.query.EmptyQuerySet( self._cls, self._alias, self._settings, self._base_dn)
[docs] def get_query_set(self): """Returns a new QuerySet object. Subclasses can override this method to easily customize the behavior of the Manager. """ return tldap.query.QuerySet( self._cls, self._alias, self._settings, self._base_dn)
def none(self): return self.get_empty_query_set() def all(self): return self.get_query_set() def iterator(self): return self.get_query_set().iterator() def get(self, *args, **kwargs): return self.get_query_set().get(*args, **kwargs) def get_or_create(self, **kwargs): return self.get_query_set().get_or_create(**kwargs) def create(self, **kwargs): return self.get_query_set().create(**kwargs) def filter(self, *args, **kwargs): return self.get_query_set().filter(*args, **kwargs) def using(self, *args, **kwargs): return self.get_query_set().using(*args, **kwargs) def base_dn(self, *args, **kwargs): return self.get_query_set().base_dn(*args, **kwargs) def convert(self, *args, **kwargs): return self.get_query_set().convert(*args, **kwargs)
class ManagerDescriptor(object): # This class ensures managers aren't accessible via model instances. For # example, Poll.objects works, but poll_obj.objects raises AttributeError. def __init__(self, manager): self._manager = manager def __get__(self, instance, cls=None): if instance is not None: raise AttributeError( "Manager isn't accessible via %s instances" % cls.__name__) return self._manager def _lookup(cls): if isinstance(cls, str): module_name, _, name = cls.rpartition(".") module = importlib.import_module(module_name) try: cls = getattr(module, name) except AttributeError: raise AttributeError("%s reference cannot be found" % cls) return(cls) def _create_link_manager(superclass, linked_is_p, p_value_is_list): class LinkManager(superclass): def __init__(self, this_instance, this_key, linked_cls, linked_key): super(LinkManager, self).__init__() self._this_instance = this_instance self._this_key = this_key self._linked_cls = linked_cls self._linked_key = linked_key # we want queries to use the same LDAP connection for this_instance self._alias = this_instance._alias self._settings = this_instance._settings # we want queries to be based on the linked_cls type self._cls = linked_cls self._base_dn = None def f_to_p(self, value): return value def p_to_f(self, value): return value def is_f_eq_p(self, p_value, f_value): assert not isinstance(p_value, list) return f_value == p_value def _get_query_set(self, this_value, linked_key): if this_value is None: this_value = [] if not isinstance(this_value, list): this_value = [this_value] query = self.get_empty_query_set() for v in this_value: kwargs = {linked_key: v} query = query | ( super(LinkManager, self).get_query_set().filter(**kwargs)) return query.using( self._this_instance._alias, self._this_instance._settings) def _add(self, p_instance, p_key, f_instance, f_key): p_value = getattr(p_instance, p_key) f_value = getattr(f_instance, f_key) assert not isinstance(f_value, list) assert f_value is not None f_value = self.f_to_p(f_value) if p_value_is_list: found = False for x in p_value: if self.is_f_eq_p(f_value, x): found = True if not found: p_value.append(f_value) else: assert not isinstance(p_value, list) assert p_value is None or self.is_f_eq_p(f_value, p_value) p_value = f_value setattr(p_instance, p_key, p_value) def _remove(self, p_instance, p_key, f_instance, f_key): p_value = getattr(p_instance, p_key) f_value = getattr(f_instance, f_key) assert not isinstance(f_value, list) assert f_value is not None f_value = self.f_to_p(f_value) if p_value_is_list: new_list = [] for x in p_value: if not self.is_f_eq_p(f_value, x): new_list.append(x) p_value = new_list else: assert not isinstance(p_value, list) assert p_value is None or self.is_f_eq_p(f_value, p_value) p_value = None setattr(p_instance, p_key, p_value) def get_query_set(self): this_instance = self._this_instance this_key = self._this_key this_value = getattr(this_instance, this_key) linked_key = self._linked_key if linked_is_p: this_value = self.f_to_p(this_value) else: this_value = self.p_to_f(this_value) return self._get_query_set(this_value, linked_key) def clear(self, commit=True): for obj in list(self.get_query_set()): self.remove(obj, commit) if linked_is_p: def get_or_create(self, commit=True, **kwargs): f_instance = self._this_instance f_key = self._this_key f_value = getattr(f_instance, f_key) assert not isinstance(f_value, list) assert f_value is not None f_value = self.f_to_p(f_value) p_key = self._linked_key if p_value_is_list: kwargs[p_key] = [f_value] if p_key in kwargs['defaults']: assert isinstance(kwargs['defaults'][p_key], list) kwargs['defaults'][p_key].append(f_value) else: kwargs[p_key] = f_value if p_key in kwargs['defaults']: assert not isinstance(kwargs['defaults'][p_key], list) assert kwargs['defaults'][p_key] == f_value return super(LinkManager, self).get_or_create(**kwargs) def create(self, commit=True, **kwargs): f_instance = self._this_instance f_key = self._this_key f_value = getattr(f_instance, f_key) assert not isinstance(f_value, list) assert f_value is not None f_value = self.f_to_p(f_value) p_key = self._linked_key if p_value_is_list: kwargs[p_key] = [f_value] else: kwargs[p_key] = f_value return super(LinkManager, self).create(**kwargs) def add(self, obj, commit=True): p_instance = obj p_key = self._linked_key f_instance = self._this_instance f_key = self._this_key assert isinstance(p_instance, self._linked_cls) self._add(p_instance, p_key, f_instance, f_key) obj.save() def remove(self, obj, commit=True): p_instance = obj p_key = self._linked_key f_instance = self._this_instance f_key = self._this_key assert isinstance(p_instance, self._linked_cls) self._remove(p_instance, p_key, f_instance, f_key) obj.save() else: def get_or_create(self, commit=True, **kwargs): p_instance = self._this_instance p_key = self._this_key p_value = getattr(p_instance, p_key) f_key = self._linked_key f_value = self.f_to_p(kwargs[f_key]) assert not isinstance(f_value, list) assert f_value is not None r = super(LinkManager, self).get_or_create(**kwargs) if p_value_is_list: assert isinstance(p_value, list) if f_value not in p_value: p_value.append(f_value) else: assert not isinstance(p_value, list) assert p_value is None or p_value == r[0] p_value = f_value setattr(p_instance, p_key, p_value) if commit: p_instance.save() return r def create(self, commit=True, **kwargs): p_instance = self._this_instance p_key = self._this_key p_value = getattr(p_instance, p_key) f_key = self._linked_key f_value = self.f_to_p(kwargs[f_key]) assert not isinstance(f_value, list) assert f_value is not None r = super(LinkManager, self).create(**kwargs) if p_value_is_list: assert isinstance(p_value, list) if f_value not in p_value: p_value.append(f_value) else: assert not isinstance(p_value, list) assert p_value is None or p_value == f_value p_value = None setattr(p_instance, p_key, p_value) if commit: p_instance.save() return r def add(self, obj, commit=True): p_instance = self._this_instance p_key = self._this_key f_instance = obj f_key = self._linked_key assert isinstance(f_instance, self._linked_cls) self._add(p_instance, p_key, f_instance, f_key) if commit: p_instance.save() def remove(self, obj, commit=True): p_instance = self._this_instance p_key = self._this_key f_instance = obj f_key = self._linked_key assert isinstance(f_instance, self._linked_cls) self._remove(p_instance, p_key, f_instance, f_key) if commit: p_instance.save() if not p_value_is_list: def is_set(self): """ Does this manager point to a value, or is it None? """ this_instance = self._this_instance this_key = self._this_key this_value = getattr(this_instance, this_key) return this_value is not None def get_obj(self): """ Retrieve this value. Unlike get returns None if there is no value, instead of raisng an exception. """ this_instance = self._this_instance this_key = self._this_key this_value = getattr(this_instance, this_key) if this_value is None: return None return self.get() return LinkManager
[docs]class LinkDescriptor(object): """ Base class for any field that links to another object. """ def __init__(self, this_key, linked_cls, linked_key, linked_is_p, p_value_is_list, related_name=None): self._this_key = this_key self._linked_cls = linked_cls self._linked_key = linked_key self._linked_is_p = linked_is_p self._p_value_is_list = p_value_is_list self._related_name = related_name def contribute_to_class(self, cls, name): setattr(cls, name, self) if self._related_name is not None: reverse = self.get_reverse(cls) if self._related_name in self._linked_cls.__dict__: raise AttributeError( "%s class member %s produces reverse member" "%s in class %s that conflicts" % (cls.__name__, name, self._related_name, self._linked_cls.__name__)) setattr(self._linked_cls, self._related_name, reverse) def get_manager(self, instance): linked_cls = _lookup(self._linked_cls) superclass = linked_cls._default_manager.__class__ LinkManager = _create_link_manager( superclass, linked_is_p=self._linked_is_p, p_value_is_list=self._p_value_is_list) return LinkManager(instance, self._this_key, linked_cls, self._linked_key) def get_translated_linked_value(self, value): return value def get_q_for_linked_instance(self, obj, operation): if operation is not None: raise ValueError("Unknown search operation %s" % operation) this_key = self._this_key linked_cls = _lookup(self._linked_cls) linked_key = self._linked_key assert isinstance(obj, linked_cls) linked_value = self.get_translated_linked_value( getattr(obj, linked_key)) if not isinstance(linked_value, list): linked_value = [linked_value] if len(linked_value) == 0: return None v = linked_value[0] kwargs = {this_key: v} q = tldap.Q(**kwargs) for v in linked_value[1:]: kwargs = {this_key: v} q = q | tldap.Q(**kwargs) return q def __get__(self, instance, cls=None): if instance is None: raise AttributeError( "Manager isn't accessible via %s class" % cls.__name__) return self.get_manager(instance)
[docs]class ManyToManyDescriptor(LinkDescriptor): """ Field for this object that has an attribute containing a list of references to linked objects. """ def __init__(self, **kwargs): super(ManyToManyDescriptor, self).__init__( p_value_is_list=True, **kwargs) def get_reverse(self, cls): return ManyToManyDescriptor( this_key=self._linked_key, linked_cls=cls, linked_key=self._this_key, linked_is_p=not self._linked_is_p) def __set__(self, instance, value): assert isinstance(value, list) lm = self.get_manager(instance) if self._linked_key: lm.clear() for v in value: lm.add(v) else: lm.clear(commit=False) for v in value: lm.add(v, commit=False)
[docs]class ManyToOneDescriptor(LinkDescriptor): """ Linked object has an attribute that can contain only one member, that refers to this object. This field links to the linked object. """ def __init__(self, **kwargs): super(ManyToOneDescriptor, self).__init__( linked_is_p=False, p_value_is_list=False, **kwargs) def get_reverse(self, cls): return OneToManyDescriptor( this_key=self._linked_key, linked_cls=cls, linked_key=self._this_key) def __set__(self, instance, value): assert not isinstance(value, list) lm = self.get_manager(instance) lm.clear(commit=False) if value is not None: lm.add(value, commit=False)
[docs]class OneToManyDescriptor(LinkDescriptor): """ This object has an attribute, represented by this field, that can contain only one member, that refers to linked object. """ def __init__(self, **kwargs): super(OneToManyDescriptor, self).__init__( linked_is_p=True, p_value_is_list=False, **kwargs) def get_reverse(self, cls): return ManyToOneDescriptor( this_key=self._linked_key, linked_cls=cls, linked_key=self._this_key) def __set__(self, instance, value): assert isinstance(value, list) lm = self.get_manager(instance) lm.clear() for v in value: lm.add(value)
[docs]class AliasDescriptor(object): """ This field is an alias to another field. """ def __init__(self, linked_key): self._linked_key = linked_key def __get__(self, instance, cls=None): return getattr(instance, self._linked_key) def __set__(self, instance, value): setattr(instance, self._linked_key, value)
def _sid_to_rid(sid): if sid is None: return None _, _, rid = sid.rpartition("-") return int(rid) def _rid_to_sid(domain_sid, rid): if rid is None: return None assert isinstance(rid, int) return "S-1-5-%s-%s" % (domain_sid, rid) def _create_ad_group_link_manager(superclass, linked_is_p, p_value_is_list): assert p_value_is_list superclass = _create_link_manager(superclass, linked_is_p, p_value_is_list) class AdLinkManager(superclass): def is_f_eq_p(self, f_value, p_value): return f_value.lower() == p_value.lower() if linked_is_p: def add(self, obj, commit=True): this_instance = self._this_instance this_key = "primaryGroupID" this_value = getattr(this_instance, this_key) assert this_value is None or isinstance(this_value, int) linked_cls = self._linked_cls linked_key = "objectSid" assert isinstance(obj, linked_cls) linked_value = getattr(obj, linked_key) if this_value != _sid_to_rid(linked_value): super(AdLinkManager, self).add(obj, commit) else: def add(self, obj, commit=True): this_instance = self._this_instance this_key = "objectSid" this_value = getattr(this_instance, this_key) linked_cls = self._linked_cls linked_key = "primaryGroupID" assert isinstance(obj, linked_cls) linked_value = getattr(obj, linked_key) assert linked_value is None or isinstance(linked_value, int) if _sid_to_rid(this_value) != linked_value: super(AdLinkManager, self).add(obj, commit) def get_query_set(self): this_instance = self._this_instance this_key = "dn" this_value = getattr(this_instance, this_key) linked_key = "memberOf" return self._get_query_set(this_value, linked_key) return AdLinkManager
[docs]class AdGroupLinkDescriptor(ManyToManyDescriptor): """ This field represents a link from an AD account to an AD group. """ def __init__(self, **kwargs): super(AdGroupLinkDescriptor, self).__init__( this_key="dn", linked_key="member", linked_is_p=True, **kwargs) def get_reverse(self, cls): return AdAccountLinkDescriptor(linked_cls=cls) def get_q_for_linked_instance(self, obj, operation): # We have to do the search using this_key of memberOf, not dn, # as this makes it more efficient. Also dn searches are restricted. if operation is not None: raise ValueError( "Unknown search operation %s" % operation) this_key = "memberOf" linked_cls = _lookup(self._linked_cls) linked_key = "dn" assert isinstance(obj, linked_cls) linked_value = getattr(obj, linked_key) if not isinstance(linked_value, list): linked_value = [linked_value] if len(linked_value) == 0: return None v = linked_value.pop() kwargs = {this_key: v} q = tldap.Q(**kwargs) for v in linked_value: kwargs = {this_key: v} q = q | tldap.Q(**kwargs) return q def get_manager(self, instance): linked_cls = _lookup(self._linked_cls) superclass = linked_cls._default_manager.__class__ LinkManager = _create_ad_group_link_manager( superclass, linked_is_p=self._linked_is_p, p_value_is_list=self._p_value_is_list) return LinkManager( instance, self._this_key, linked_cls, self._linked_key)
[docs]class AdAccountLinkDescriptor(ManyToManyDescriptor): """ This field represents a link from an AD group to an AD account. """ def __init__(self, **kwargs): super(AdAccountLinkDescriptor, self).__init__( this_key="member", linked_key="dn", linked_is_p=False, **kwargs) def get_reverse(self, cls): return AdGroupLinkDescriptor(linked_cls=cls) def get_manager(self, instance): linked_cls = _lookup(self._linked_cls) superclass = linked_cls._default_manager.__class__ LinkManager = _create_ad_group_link_manager( superclass, linked_is_p=self._linked_is_p, p_value_is_list=self._p_value_is_list) return LinkManager( instance, self._this_key, linked_cls, self._linked_key)
def _create_ad_primary_group_link_manager(superclass, linked_is_p, p_value_is_list): superclass = _create_link_manager(superclass, linked_is_p, p_value_is_list) class AdLinkManager(superclass): def __init__(self, domain_sid, *args, **kwargs): self.domain_sid = domain_sid super(AdLinkManager, self).__init__(*args, **kwargs) def f_to_p(self, value): return _sid_to_rid(value) def p_to_f(self, value): return _rid_to_sid(self.domain_sid, value) return AdLinkManager
[docs]class AdPrimaryAccountLinkDescriptor(OneToManyDescriptor): """ This field represents a link from a user to a primary AD group. Update operations not guaranteed to work, due to AD rules.""" def __init__(self, domain_sid, **kwargs): self.domain_sid = domain_sid super(AdPrimaryAccountLinkDescriptor, self).__init__( this_key="objectSid", linked_key="primaryGroupID", **kwargs) def get_reverse(self, cls): return AdPrimaryGroupLinkDescriptor( linked_cls=cls, domain_sid=self.domain_sid) def get_manager(self, instance): linked_cls = _lookup(self._linked_cls) superclass = linked_cls._default_manager.__class__ LinkManager = _create_ad_primary_group_link_manager( superclass, linked_is_p=self._linked_is_p, p_value_is_list=self._p_value_is_list) return LinkManager( this_instance=instance, this_key=self._this_key, linked_cls=linked_cls, linked_key=self._linked_key, domain_sid=self.domain_sid) def get_translated_linked_value(self, value): this_value = super(AdPrimaryAccountLinkDescriptor, self) \ .get_translated_linked_value(value) return _rid_to_sid(self.domain_sid, this_value)
[docs]class AdPrimaryGroupLinkDescriptor(ManyToOneDescriptor): """ This field represents a link from a primary AD group to a user. Update operations operations not guaranteed to work, due to AD rules.""" def __init__(self, domain_sid, **kwargs): self.domain_sid = domain_sid super(AdPrimaryGroupLinkDescriptor, self).__init__( this_key="primaryGroupID", linked_key="objectSid", **kwargs) def get_reverse(self, cls): return AdPrimaryAccountLinkDescriptor( this_key=self._linked_key, linked_cls=cls, linked_key=self._this_key, domain_sid=self.domain_sid) def get_manager(self, instance): linked_cls = _lookup(self._linked_cls) superclass = linked_cls._default_manager.__class__ LinkManager = _create_ad_primary_group_link_manager( superclass, linked_is_p=self._linked_is_p, p_value_is_list=self._p_value_is_list) return LinkManager( this_instance=instance, this_key=self._this_key, linked_cls=linked_cls, linked_key=self._linked_key, domain_sid=self.domain_sid) def get_translated_linked_value(self, value): this_value = super(AdPrimaryGroupLinkDescriptor, self) \ .get_translated_linked_value(value) return _sid_to_rid(this_value)