import collections
from abc import ABC, abstractmethod
from collections.abc import Iterator
from datetime import date, datetime, time
from functools import total_ordering
from typing import Union
import itertools
from django.core import exceptions, validators
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from cdh.rest.client.collections.collections import ResourceCollection, \
_TypeCollection
from cdh.rest.client.resources import Resource
from cdh.rest.client.registry import registry
from cdh.rest.client.logging import field_logger as logger
@total_ordering
class _BaseField(ABC):
    """Abstract class containing the bulk of the field code.

    A field describes one piece of data on a REST resource. It knows how to
    convert an incoming value to its python form (``to_python``), how to
    validate it (``validate`` / ``run_validators``) and, in concrete
    subclasses, how to convert it back for the API (``to_api``).

    Fields are ordered, compared and hashed by ``creation_counter``, i.e. by
    the order in which they were instantiated.
    """

    # Class-wide counter; bumped on every instantiation and copied onto the
    # instance so fields can be sorted in definition order.
    creation_counter = 0
    default_error_messages = {
        'invalid_choice': _('Value %(value)r is not a valid choice.'),
        'null': _('This field cannot be null.'),
        'blank': _('This field cannot be blank.'),
        'unique': _('%(model_name)s with this %(field_label)s '
                    'already exists.'),
        # Translators: The 'lookup_type' is one of 'date', 'year' or 'month'.
        # Eg: "Title must be unique for pub_date year"
        'unique_for_date': _("%(field_label)s must be unique for "
                             "%(date_field_label)s %(lookup_type)s."),
    }
    # Values treated as "empty" by validate() and run_validators().
    empty_values = list(validators.EMPTY_VALUES)
    # Validators every instance of the class gets; only ever read here.
    default_validators = []

    def __init__(self, verbose_name: str = None, default: object = None,
                 choices: object = None, name: str = None,
                 null: bool = False, blank: bool = False,
                 error_messages: dict = None, validators: tuple = ()):
        """
        A field contains data within a REST resource.

        :param verbose_name: A human friendly name for this field, defaults
            to the field name with underscores replaced by spaces
        :param default: A default value to be used when no value is given
        :param choices: Optional, used to restrict the values in this field
        :param name: A code friendly name for this field, defaults to the
            variable name
        :param null: If this field can be null
        :param blank: If this field can be left blank
        :param error_messages: Any custom error messages
        :param validators: Any custom validators
        """
        self.name = name
        self.verbose_name = verbose_name
        # Filled in by contribute_to_class().
        self.resource = None
        self.default = default
        # A one-shot iterator would be exhausted after the first validation,
        # so materialise it.
        if isinstance(choices, Iterator):
            choices = list(choices)
        self.choices = choices or []
        self.null = null
        self.blank = blank
        self._validators = list(validators)

        # Merge the messages of every class in the MRO, base classes first so
        # subclasses win, then apply the per-instance overrides on top.
        messages = {}
        for c in reversed(self.__class__.__mro__):
            messages.update(getattr(c, 'default_error_messages', {}))
        messages.update(error_messages or {})
        self._error_messages = error_messages  # Store for deconstruction later
        self.error_messages = messages

        _BaseField.creation_counter += 1
        self.creation_counter = _BaseField.creation_counter

    def set_attributes_from_name(self, name: str) -> None:
        """Fill in ``name`` and ``verbose_name`` when they were not given.

        :param name: The attribute name the field was declared under
        """
        if not self.name:
            self.name = name
        if self.verbose_name is None and self.name:
            self.verbose_name = self.name.replace('_', ' ')

    def contribute_to_class(self, cls: Resource, name: str) -> None:
        """
        Register the field with the resource class it belongs to.

        Derives the field's name attributes from ``name``, remembers ``cls``
        as ``self.resource`` and adds the field to ``cls._meta``.

        :param cls: The owner the field is declared on
        :param name: The attribute name the field was declared under
        """
        self.set_attributes_from_name(name)
        self.resource = cls
        cls._meta.add_field(self)

    @cached_property
    def validators(self) -> list:
        """
        Some validators can't be created at field initialization time.
        This method provides a way to delay their creation until required.

        :return: The class-level default validators followed by the
            validators passed to ``__init__``
        """
        return list(itertools.chain(self.default_validators, self._validators))

    def run_validators(self, value: object) -> None:
        """Run every validator against ``value`` and collect the failures.

        Empty values are not validated. An error whose ``code`` has an entry
        in ``self.error_messages`` gets its message replaced by that entry.

        :param value: The (python) value to check
        :raises ValidationError: With all collected errors, if any validator
            failed
        """
        if value in self.empty_values:
            return

        errors = []
        for v in self.validators:
            try:
                v(value)
            except exceptions.ValidationError as e:
                if hasattr(e, 'code') and e.code in self.error_messages:
                    e.message = self.error_messages[e.code]
                errors.extend(e.error_list)

        if errors:
            raise exceptions.ValidationError(errors)

    def validate(self, value: object) -> None:
        """
        Validate value and raise ValidationError if necessary. Subclasses
        should override this to provide validation logic.

        Checks, in order: membership of ``choices`` (for non-empty values),
        ``null`` and ``blank``.

        :param value: The (python) value to check
        :raises ValidationError: With code ``invalid_choice``, ``null`` or
            ``blank``
        """
        if self.choices and value not in self.empty_values:
            for option_key, option_value in self.choices:
                if isinstance(option_value, (list, tuple)):
                    # This is an optgroup, so look inside the group for
                    # options.
                    for optgroup_key, optgroup_value in option_value:
                        if value == optgroup_key:
                            return
                elif value == option_key:
                    return
            raise exceptions.ValidationError(
                self.error_messages['invalid_choice'],
                code='invalid_choice',
                params={
                    'value': value
                },
            )

        if value is None and not self.null:
            raise exceptions.ValidationError(self.error_messages['null'],
                                             code='null')

        if not self.blank and value in self.empty_values:
            raise exceptions.ValidationError(self.error_messages['blank'],
                                             code='blank')

    def clean(self, value: object) -> object:
        """
        Convert the value's type and run validation. Validation errors
        from to_python() and validate() are propagated. Return the correct
        value if no error is raised.
        """
        logger.debug(f"{repr(self)}: Cleaning {repr(value)}")
        value = self.to_python(value)
        logger.debug(f"{repr(self)}: Resolved to {repr(value)}")
        self.validate(value)
        self.run_validators(value)
        logger.debug(f"{repr(self)}: Validated value")
        return value

    def __eq__(self, other: object) -> bool:
        # Needed for @total_ordering
        if isinstance(other, _BaseField):
            return self.creation_counter == other.creation_counter
        return NotImplemented

    def __lt__(self, other: object) -> bool:
        # This is needed because bisect does not take a comparison function.
        if isinstance(other, _BaseField):
            return self.creation_counter < other.creation_counter
        return NotImplemented

    def __hash__(self):
        # Kept in step with __eq__, which compares creation counters.
        return hash(self.creation_counter)

    def __str__(self):
        """Return ``<owner name>.<field name>``."""
        resource = self.resource
        # contribute_to_class() stores the owner it is handed as-is. When
        # that owner is a class, its own name is wanted, not the name of its
        # metaclass; anything else (including None) falls back to the name
        # of its type.
        if isinstance(resource, type):
            owner = resource.__name__
        else:
            owner = resource.__class__.__name__
        return '{}.{}'.format(owner, self.name)

    def __repr__(self):
        """Return ``<module.QualName: name>``, or ``<module.QualName>`` when
        the field has no name yet."""
        path = '{}.{}'.format(self.__class__.__module__,
                              self.__class__.__qualname__)
        name = getattr(self, 'name', None)
        if name is not None:
            return '<{}: {}>'.format(path, name)
        return '<{}>'.format(path)

    def to_python(self, value: object) -> object:
        """Returns the python version of the supplied value. By default, it's
        the same as the value supplied, but this can be overridden by fields
        to provide a conversion to a different datatype
        """
        logger.debug(f"{repr(self)}: serializing to python: {repr(value)}")
        return value

    @abstractmethod
    def to_api(self, value: object) -> object:
        """This method should return a python datatype that can be
        serialized properly, primarily by the json module.

        The base implementation only logs the call and returns None.
        """
        logger.debug(f"{repr(self)}: deserializing to api: {repr(value)}")
class BasicTypeField(_BaseField):
    """Abstract class used by simple types like bool, str and int.

    Subclasses set ``basic_type`` to the callable used to cast a cleaned
    value to its API representation.
    """

    # to_api() reports a failed cast with the 'invalid' message, so the key
    # has to exist; it can still be overridden through ``error_messages``.
    default_error_messages = {
        'invalid': _('Value %(value)r is not valid.'),
    }
    # Callable that casts a cleaned value; set by concrete subclasses.
    basic_type = None

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as the base field."""
        super().__init__(*args, **kwargs)

    def to_api(self, value: object) -> object:
        """Cleans and validates values before casting them to the right
        API type

        :param value: The value to convert; ``None`` is passed through
            untouched and is not validated
        :return: ``self.basic_type(value)`` for the cleaned value, or None
        :raises ValidationError: When cleaning fails, or with code
            ``invalid`` when cleaning or the cast raises TypeError/ValueError
        """
        logger.debug(f"{repr(self)}: deserializing to api: {repr(value)}")
        if value is None:
            return value

        try:
            value = self.clean(value)
            return self.basic_type(value)
        except (TypeError, ValueError) as error:
            raise exceptions.ValidationError(
                self.error_messages['invalid'],
                code='invalid',
                params={
                    'value': value
                },
            ) from error
class IntegerField(BasicTypeField):
    """Basic field whose API value is produced by calling ``int``."""

    basic_type = int
class FloatField(BasicTypeField):
    """Basic field whose API value is produced by calling ``float``."""

    basic_type = float
class TextField(BasicTypeField):
    """Basic field whose API value is produced by calling ``str``."""

    basic_type = str
class BoolField(BasicTypeField):
    """Basic field whose API value is produced by calling ``bool``."""

    basic_type = bool
class DateTimeField(_BaseField):
    """Field that parses an ISO-formatted datetime-string.

    Subclasses swap ``type`` for another class that offers
    ``fromisoformat`` and whose instances offer ``isoformat``.
    """

    # Message used when a value cannot be parsed; can be overridden through
    # the ``error_messages`` argument.
    default_error_messages = {
        'invalid': _('Value %(value)r is not a valid ISO 8601 value.'),
    }
    # The python class values are parsed into (name kept for subclasses).
    type = datetime

    def to_python(self, value: str) -> Union[datetime, date, time, None]:
        """Parse an ISO-formatted string into an instance of ``self.type``.

        :param value: An ISO string, an instance of ``self.type`` or None;
            the latter two are returned unchanged
        :return: The parsed value
        :raises ValidationError: With code ``invalid`` when the value cannot
            be parsed
        """
        logger.debug(f"{repr(self)}: serializing to python: {repr(value)}")
        if value is None:
            return value

        if isinstance(value, self.type):
            return value

        candidate = value
        # Fix the fact that datetime.fromisoformat doesn't adhere to iso
        # 8601 properly when 'Z(ulu)' is used instead of '+00:00'.
        # endswith() (rather than indexing) keeps the empty string on the
        # regular "invalid value" path below.
        if isinstance(candidate, str) and candidate.endswith('Z'):
            candidate = "{}+00:00".format(candidate[:-1])

        try:
            return self.type.fromisoformat(candidate)
        except (TypeError, ValueError) as error:
            raise exceptions.ValidationError(
                self.error_messages['invalid'],
                code='invalid',
                params={
                    'value': value
                },
            ) from error

    def to_api(self, value: Union[datetime, date, time, str, None]) -> str:
        """Clean the value and return it as an ISO-formatted string.

        :param value: Anything ``to_python`` accepts
        :return: ``value.isoformat()`` of the cleaned value, or None when the
            cleaned value is None
        :raises ValidationError: When cleaning the value fails
        """
        logger.debug(f"{repr(self)}: deserializing to api: {repr(value)}")
        value = self.clean(value)

        if value is None:
            return None

        return value.isoformat()
class DateField(DateTimeField):
    """Variant of the datetime field that works with ``date`` objects and
    ISO-formatted date-strings."""

    type = date
class TimeField(DateTimeField):
    """Variant of the datetime field that works with ``time`` objects and
    ISO-formatted time-strings."""

    type = time
class CollectionField(_BaseField):
    """Field whose value is a collection."""

    def __init__(self, collection, **kwargs):
        """
        :param collection: The collection class to wrap values in, or a
            string (``"Name"`` or ``"app_label.Name"``) that is resolved
            through the registry when a value is converted
        :param kwargs: Remaining options, passed on to the base field
        """
        super().__init__(**kwargs)
        self.collection = collection

    def to_api(self, value: Union[ResourceCollection, _TypeCollection]):
        """Return the API form of the collection.

        Empty values (including None) become an empty list; anything else
        is delegated to the value's own ``to_api()``.
        """
        logger.debug(f"{repr(self)}: deserializing to api: {repr(value)}")
        is_empty = value is None or value in self.empty_values
        return [] if is_empty else value.to_api()

    def to_python(self, value: list):
        """Wrap the supplied list in this field's collection class."""
        target = self.collection
        logger.debug(f"{repr(self)}: serializing to python: {repr(value)}")

        if not isinstance(target, str):
            return target(value)

        # A string reference: default to the app label of the owning
        # resource unless the reference carries its own "app_label." prefix.
        app_label = self.resource._meta.app_label
        parts = target.split('.')
        if len(parts) == 2:
            app_label, target = parts

        collection_cls = registry.get_collection(app_label, target)
        return collection_cls(value)
class ResourceField(_BaseField):
    """Field whose value is another resource."""

    def __init__(self, resource, **kwargs):
        """
        :param resource: The resource class to build values with, or a
            string (``"Name"`` or ``"app_label.Name"``) that is resolved
            through the registry when a value is converted
        :param kwargs: Remaining options, passed on to the base field
        """
        super().__init__(**kwargs)
        self.resource_class = resource

    def to_api(self, value: Resource):
        """Return the API form of the resource.

        Integers are returned as-is; other non-empty values are delegated to
        their own ``to_api()``. Empty values yield None, which is only
        allowed when the field is both ``null`` and ``blank``.

        :raises ValueError: For an empty value on a field that is not both
            ``null`` and ``blank``
        """
        logger.debug(f"{repr(self)}: deserializing to api: {repr(value)}")

        if value is None or value in self.empty_values:
            if self.null and self.blank:
                return None
            raise ValueError("Cannot serialize null, as it's not allowed!")

        if isinstance(value, int):
            return value

        return value.to_api()

    def to_python(self, value: dict):
        """Build a resource object from the supplied dict.

        None is passed through when the field is both ``null`` and
        ``blank``; integers are returned as-is.

        :raises ValueError: For None on a field that is not both ``null``
            and ``blank``
        """
        target = self.resource_class
        logger.debug(f"{repr(self)}: serializing to python: {repr(value)}")

        if value is None:
            if self.null and self.blank:
                return value
            raise ValueError("Tried to serialize null, but it's not allowed")

        if isinstance(target, str):
            # A string reference: default to the app label of the owning
            # resource unless the reference carries its own "app_label."
            # prefix.
            app_label = self.resource._meta.app_label
            parts = target.split('.')
            if len(parts) == 2:
                app_label, target = parts
            target = registry.get_resource(app_label, target)

        # The reference is resolved before this check, as in the original
        # statement order.
        if isinstance(value, int):
            return value

        return target(**value)