Source code for cdh.files.forms.fields

from django.core.exceptions import ImproperlyConfigured, ObjectDoesNotExist, \
    ValidationError
from django.forms import Field, ModelChoiceField
from django.forms.widgets import FILE_INPUT_CONTRADICTION, MultipleHiddenInput
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _, ngettext_lazy

from .widgets import SimpleFileInput, TrackedFileInput
from ..utils import get_storage
from ..db.wrappers import FileWrapper


class FileField(Field):
    """Form field for the cdh.files.db.FileField.

    The submitted value handled by this field is a three-item tuple
    ``(file, uuid, changed)`` (presumably produced by the widget):

    * ``file`` is the uploaded file object, if any;
    * ``uuid`` identifies an already stored file, if any;
    * ``changed`` tells whether the user modified the value.
    """
    widget = SimpleFileInput
    default_error_messages = {
        'invalid': _("No file was submitted. Check the encoding type on the form."),
        'missing': _("No file was submitted."),
        'empty': _("The submitted file is empty."),
        # This message has a singular and a plural form selected by the
        # 'max' parameter, so it needs ngettext_lazy; gettext_lazy only
        # accepts a single message and fails once the text is rendered.
        'max_length': ngettext_lazy(
            'Ensure this filename has at most %(max)d character (it has %(length)d).',
            'Ensure this filename has at most %(max)d characters (it has %(length)d).',
            'max'),
        'contradiction': _('Please either submit a file or check the clear checkbox, not both.')
    }

    def __init__(self, queryset, *, max_length=None, allow_empty_file=False,
                 **kwargs):
        """Set up the field.

        :param queryset: queryset used by ``prepare_value`` to look up the
                         stored file model; may be None
        :param max_length: maximum allowed length of the uploaded file's
                           name, or None for no limit
        :param allow_empty_file: whether files with a falsy size are accepted
        :param kwargs: passed on to ``Field.__init__``
        """
        self.queryset = queryset
        self.max_length = max_length
        self.allow_empty_file = allow_empty_file
        # Field.__init__ does not accept this keyword, so discard it
        kwargs.pop('limit_choices_to', None)
        super().__init__(**kwargs)

    @cached_property
    def storage(self):
        """The storage returned by ``get_storage()``, cached per instance."""
        return get_storage()

    def to_python(self, data):
        """Validate the submitted ``(file, uuid, changed)`` tuple.

        :return: ``data`` itself when it is valid, unchanged, or empty on a
                 non-required field; None when the tuple contains neither a
                 file nor a UUID
        :raises ValidationError: with code ``missing`` (no data on a required
                 field), ``invalid`` (file lacks a name or size, or the name
                 is empty), ``max_length`` (file name too long) or ``empty``
                 (empty file while ``allow_empty_file`` is False)
        """
        # Handle no incoming data
        if not data:
            if self.required:
                raise ValidationError(self.error_messages['missing'],
                                      code='missing')
            return data

        file, uuid, changed = data

        # If we don't have a file or an UUID, we say we didn't get any data
        if uuid in self.empty_values and file in self.empty_values:
            return None

        # If nothing changed, stop processing
        if not changed:
            return data

        # UploadedFile objects should have name and size attributes.
        try:
            file_name = file.name
            file_size = file.size
        except AttributeError:
            raise ValidationError(self.error_messages['invalid'],
                                  code='invalid')

        if self.max_length is not None and len(file_name) > self.max_length:
            params = {'max': self.max_length, 'length': len(file_name)}
            raise ValidationError(self.error_messages['max_length'],
                                  code='max_length', params=params)
        if not file_name:
            raise ValidationError(self.error_messages['invalid'],
                                  code='invalid')
        if not self.allow_empty_file and not file_size:
            raise ValidationError(self.error_messages['empty'], code='empty')

        return data

    def clean(self, data, initial=None):
        """Clean the submitted data, falling back to ``initial`` if needed.

        :param data: the ``(file, uuid, changed)`` tuple, or an empty value
        :param initial: the current value of the field, if any
        :return: None when a non-required field was cleared, ``initial`` when
                 no file was submitted and an initial value exists, otherwise
                 the result of the regular ``Field.clean``
        :raises ValidationError: if validation of the data fails
        """
        # Empty data cannot be unpacked; treat it as "no file, no change" so
        # it reaches the regular validation instead of raising a TypeError.
        file, _uuid, changed = data if data else (None, None, False)

        # Changed = True and file is None means the field value should be
        # cleared; further validation is not needed.
        if file is None and changed:
            if not self.required:
                return None
            # A required field cannot be cleared. Validate the submission
            # as if nothing was sent, so that it is rejected as missing
            # (unless an initial value is available to fall back on).
            data = None

        if not file and initial:
            return initial

        return super().clean(data)

    def prepare_value(self, value):
        """Convert ``value`` into a file wrapper for rendering.

        :param value: either a ``(file, uuid, changed)`` data tuple or a
                      primary key of a model in the queryset
        :return: the result of the model's ``get_file_wrapper()``, or None if
                 there is no queryset, the tuple has no UUID, or no matching
                 model exists
        :raises ImproperlyConfigured: if the lookup raises an AttributeError,
                 e.g. when the model has no ``get_file_wrapper`` method
        """
        if self.queryset is None:
            return None

        if isinstance(value, tuple):
            # A data tuple; its second item is the UUID of our model
            if not value[1]:
                return None
            lookup = {'uuid': value[1]}
        else:
            # Otherwise, we assume we got a PK from the form
            lookup = {'pk': value}

        try:
            model = self.queryset.get(**lookup)
            return model.get_file_wrapper()
        except ObjectDoesNotExist:
            return None
        except AttributeError:
            raise ImproperlyConfigured("This field somehow didn't get a "
                                       "File-derived model?")

    def bound_data(self, data, initial):
        """Return ``initial`` if ``data`` is None or signals a contradiction;
        otherwise return ``data``.
        """
        if data in (None, FILE_INPUT_CONTRADICTION):
            return initial
        return data

    def has_changed(self, initial, data):
        """Report a change whenever the field is enabled and data was sent."""
        return not self.disabled and data is not None
class TrackedFileField(ModelChoiceField):
    """Model-backed field accepting a collection of values.

    Built on ModelChoiceField, but every submitted value is checked against
    the queryset and a successful clean yields a QuerySet of the matching
    objects.
    """
    widget = TrackedFileInput
    hidden_widget = MultipleHiddenInput
    default_error_messages = {
        'invalid_list': _('Enter a list of values.'),
        'invalid_choice': _(
            'Select a valid choice. %(value)s is not one of the available '
            'choices.'
        ),
        'invalid_pk_value': _('“%(pk)s” is not a valid value.')
    }

    def __init__(self, queryset, **kwargs):
        """Initialise the parent field without an empty choice label."""
        super().__init__(queryset, empty_label=None, **kwargs)

    def to_python(self, value):
        """Return the objects matching ``value`` as a list.

        A falsy ``value`` results in an empty list.
        """
        return list(self._check_values(value)) if value else []

    def clean(self, value):
        """Validate ``value`` and return the matching objects.

        :return: a QuerySet of the selected objects; an empty QuerySet if
                 nothing was selected and the field is not required
        :raises ValidationError: with code ``required`` if nothing was
                 selected on a required field, ``invalid_list`` if the value
                 is not a list or tuple, or any error from ``_check_values``
        """
        prepared = self.prepare_value(value)

        if not prepared:
            if self.required:
                raise ValidationError(self.error_messages['required'],
                                      code='required')
            return self.queryset.none()

        if not isinstance(prepared, (list, tuple)):
            raise ValidationError(
                self.error_messages['invalid_list'],
                code='invalid_list',
            )

        matches = self._check_values(prepared)
        # ModelChoiceField.clean is bypassed by this override, so the
        # custom validators have to be run by hand
        self.run_validators(prepared)
        return matches

    def _check_values(self, value):
        """Return a QuerySet of the objects identified by ``value``.

        ``value`` is an iterable of possible keys (primary keys, unless
        ``to_field_name`` is set). A ValidationError is raised if the values
        are not hashable, if a value cannot be used in a lookup, or if a
        value does not correspond to an object in the queryset.
        """
        lookup_field = self.to_field_name or 'pk'

        # Collapse duplicates up front, so the database does not have to
        # deal with them and fewer querysets are created.
        try:
            unique_values = frozenset(value)
        except TypeError:
            # Unhashable items, e.g. a list of lists
            raise ValidationError(
                self.error_messages['invalid_list'],
                code='invalid_list',
            )

        # Building a filter per value reveals values of an invalid type
        for candidate in unique_values:
            try:
                self.queryset.filter(**{lookup_field: candidate})
            except (ValueError, TypeError):
                raise ValidationError(
                    self.error_messages['invalid_pk_value'],
                    code='invalid_pk_value',
                    params={'pk': candidate},
                )

        found = self.queryset.filter(
            **{lookup_field + '__in': unique_values}
        )
        found_keys = {str(getattr(obj, lookup_field)) for obj in found}

        # Every submitted value has to be present in the queryset
        for candidate in unique_values:
            if str(candidate) not in found_keys:
                raise ValidationError(
                    self.error_messages['invalid_choice'],
                    code='invalid_choice',
                    params={'value': candidate},
                )

        return found

    def prepare_value(self, value):
        """Prepare a single value, or each item of a collection of values.

        Strings and objects with a ``_meta`` attribute are handled as single
        values, even when they are iterable.
        """
        if (not hasattr(value, '__iter__')
                or isinstance(value, str)
                or hasattr(value, '_meta')):
            return self._prepare_value(value)
        return [self._prepare_value(item) for item in value]

    def _prepare_value(self, value):
        """Prepare one value.

        FileWrapper instances are returned untouched. Objects with a
        ``_meta`` attribute are reduced to their ``to_field_name`` value, or
        to their primary key if no ``to_field_name`` is set. Anything else
        is left to the parent class.
        """
        if isinstance(value, FileWrapper):
            return value

        if not hasattr(value, '_meta'):
            return super().prepare_value(value)

        if self.to_field_name:
            return value.serializable_value(self.to_field_name)
        return value.pk

    def has_changed(self, initial, data):
        """Tell whether ``data`` differs from ``initial``.

        Both are compared as sets of strings, so ordering and duplicates of
        equal-length collections are ignored. Disabled fields never change.
        """
        if self.disabled:
            return False

        initial = [] if initial is None else initial
        data = [] if data is None else data

        if len(initial) != len(data):
            return True

        before = {str(item) for item in self.prepare_value(initial)}
        after = {str(item) for item in data}
        return after != before