Source code for cdh.files.db.wrappers

import uuid
from typing import Optional, Union, TYPE_CHECKING

from django.core.files import File
import magic
from django.db.models import Manager
from django.urls import reverse_lazy

from .. import settings
from ..mime_names import get_name_from_mime
from ..utils import get_storage


if TYPE_CHECKING:
    from . import TrackedFileField
    from .models import BaseFile


class FileWrapper(File):
    """File-like wrapper tying file content in storage to a metadata object.

    The content lives in the storage returned by ``get_storage()`` under the
    UUID of ``file_instance`` (see :attr:`name_on_disk`). The name shown to
    the outside world is either ``original_filename`` or whatever the field's
    ``filename_generator`` produces (see :attr:`name`).

    Note that ``File.__init__`` is intentionally not called; all state is set
    up by this class itself.
    """
    DEFAULT_CHUNK_SIZE = 64 * 2 ** 10

    def __init__(self, file_instance, field, original_filename):
        """
        :param file_instance: The metadata object describing this file
        :param field: The model field this wrapper is attached to; may be
                      falsy, in which case :attr:`field` falls back to the
                      first child field of ``file_instance``
        :param original_filename: The filename this file was supplied with
        """
        self.original_filename = original_filename
        self.file_instance = file_instance
        self._field = field
        self.storage = get_storage()
        self._committed = True
        self._removed = False

    def __getattr__(self, item):
        """Resolve the field's target attname to the metadata object's id.

        Sometimes the ORM expects a different field than id/pk; if the
        requested attribute is said field, the id is returned.

        :raises AttributeError: For every other missing attribute
        """
        # Read '_field' straight from __dict__: going through normal
        # attribute access would re-enter __getattr__ (and recurse forever)
        # whenever '_field' has not been set yet, e.g. while unpickling.
        field = self.__dict__.get('_field')
        if field is not None and item == field.target_field.attname:
            return self.file_instance.id

        raise AttributeError(f"No such attribute '{item}'")

    @property
    def name(self):
        """The filename to present; generated by the field if it has a
        ``filename_generator``, the original filename otherwise."""
        if self._field and self._field.filename_generator:
            return self._field.filename_generator(self)

        return self.original_filename

    @property
    def name_on_disk(self):
        """The name used in storage, which is the metadata object's UUID.

        A new UUID is assigned to the metadata object if it has none yet.

        :raises RuntimeError: If there is no metadata object
        """
        if not self.file_instance:
            raise RuntimeError("No file instance. Can not retrieve filename "
                               "without it! (Please assign this object to a "
                               "FileField field of a model before saving)")

        if not self.file_instance.uuid:
            self.file_instance.uuid = uuid.uuid4()

        return str(self.file_instance.uuid)

    @property
    def field(self):
        """The field given at construction, else the first child field of
        the metadata object, else ``None``."""
        if self._field:
            return self._field

        if self.file_instance and self.file_instance._child_fields:
            return self.file_instance._child_fields[0]

        return None

    def __hash__(self):
        return hash(self.name_on_disk)

    def __bool__(self):
        # Truthy only if an underlying file could be obtained
        try:
            return bool(self.file)
        except AttributeError:
            return False

    # Alias these to the file_instance, as sometimes the ORM expects these
    # values
    id = property(
        lambda self: self.file_instance.id,
        lambda self, value: setattr(self.file_instance, 'id', value),
        lambda self: delattr(self.file_instance, 'id')
    )
    pk = property(
        lambda self: self.file_instance.pk,
        lambda self, value: setattr(self.file_instance, 'pk', value),
        lambda self: delattr(self.file_instance, 'pk')
    )

    # For easier access
    uuid = property(lambda self: self.file_instance.uuid)
    content_type = property(lambda self: self.file_instance.content_type)
    created_by = property(lambda self: self.file_instance.created_by)
    created_on = property(lambda self: self.file_instance.created_on)
    modified_on = property(lambda self: self.file_instance.modified_on)

    def get_content_type_display(self):
        """Return a display name for the content type, 'Unknown file type'
        being passed to the lookup as the fallback."""
        return get_name_from_mime(self.content_type, 'Unknown file type')

    # The standard File contains most of the necessary properties, but
    # FieldFiles can be instantiated without a name, so that needs to
    # be checked for here.

    def _require_file(self):
        """Raise a ValueError if this wrapper has no file behind it."""
        if not self:
            raise ValueError(
                "The '%s' attribute has no file associated with it."
                % self.field.name)

    def _get_file(self):
        """Return the underlying file, opening it from storage (in 'rb'
        mode) on first access. Returns ``None`` if storage can't find it."""
        try:
            if getattr(self, '_file', None) is None:
                self._file = self.storage.open(self.name_on_disk, 'rb')
        except FileNotFoundError:
            self._file = None
        return self._file

    def _set_file(self, file):
        self._file = file
        # You might think, why? Well, due to a very complicated descriptor
        # chain this might actually fail to set, in which case we really
        # should stop NOW #speakingFromExperience
        # It will only fail after a code change, so assert should be fine
        assert self._file == file

    def _del_file(self):
        del self._file

    file = property(_get_file, _set_file, _del_file)

    @property
    def path(self):
        """The storage path of this file.

        :raises ValueError: If there is no file
        """
        self._require_file()
        return self.storage.path(self.name_on_disk)

    @property
    def url(self):
        """The reversed URL for this file, or ``None`` if the field has no
        ``url_pattern``."""
        if self.field.url_pattern:
            return reverse_lazy(self.field.url_pattern, args=[self.uuid])

        return None

    @property
    def size(self):
        """The size of the file; taken from the file object itself if it is
        not committed, from storage otherwise.

        :raises ValueError: If there is no file
        """
        self._require_file()
        if not self._committed:
            return self.file.size
        return self.storage.size(self.name_on_disk)

    def open(self, mode='rb'):
        """Open (or re-open) the underlying file and return this wrapper.

        :param mode: The mode to open the file in, defaults to 'rb'
        :raises ValueError: If there is no file
        """
        self._require_file()
        if getattr(self, '_file', None) is None:
            self.file = self.storage.open(self.name_on_disk, mode)
        else:
            self.file.open(mode)
        return self

    # open() doesn't alter the file's contents, but it does reset the pointer
    open.alters_data = True

    # In addition to the standard File API, FieldFiles have extra methods
    # to further manipulate the underlying file, as well as update the
    # associated model instance.

    def save(self, content=None, original_filename=None):
        """Write content to storage and update + save the metadata object.

        :param content: The content to store, defaults to the current file
        :param original_filename: The original filename to record; defaults
                                  to ``content.name`` if content has a name
        """
        if content is None:
            content = self.file

        if original_filename is None and hasattr(content, 'name'):
            original_filename = content.name

        # If we overwrite the file this instance represents, we need to first
        # delete the old one, as otherwise we would lose the new file
        if self.storage.exists(self.name_on_disk):
            self.storage.delete(self.name_on_disk)

        self.storage.save(
            self.name_on_disk,
            content,
            max_length=self.field.max_length
        )

        self._committed = True

        # Use magic to determine the mime type. It's pretty obvious, I know
        # I just liked saying 'use MAGIC'
        with self.open() as file:
            mime = magic.from_buffer(file.read(2048), mime=True)
            self.file_instance.content_type = mime

        if original_filename:
            self.file_instance.original_filename = original_filename

        # TRACK_CREATED_BY should only be enabled if the right middleware is
        # loaded, so we don't check it. (A system check enforces it,
        # so we can actually be sure in this case)
        if settings.TRACK_CREATED_BY:
            # NoQA; We are allowed to access this
            from cdh.core.middleware import get_current_user
            current_user = get_current_user()
            # Make sure we don't try to save using AnonymousUser
            if current_user and not current_user.is_anonymous:
                # Re-use the user we just checked, instead of asking the
                # middleware a second time
                self.file_instance.created_by = current_user

        self.file_instance.save()

    save.alters_data = True

    def delete(self, save=True, force=False):
        """Deletes the file on disk. If save = True, the metadata object will
        also be deleted.

        Note: only delete the metadata object if no other DB object is
        referencing it, otherwise you'll get nasty Integrity errors!

        :param save: Whether to also delete the metadata in the DB,
                     defaults to True
        :param force: Whether to force a deletion if multiple DB objects
                      still refer to it, defaults to False
        """
        if not self.storage.exists(self.name_on_disk):
            return

        # By default, only delete if there are no references in the DB anymore
        deletion_threshold = 0

        # If we are instructed to also destroy our file_instance and we still
        # have a reference, we allow deletion with 1 more reference
        if save and self.file_instance:
            model = self.file_instance.__class__
            if model.objects.filter(pk=self.file_instance.pk).exists():
                deletion_threshold += 1

        # Check if we only have the allowed number of references or fewer
        # If we have more, and we're not forcing a deletion, stop right here!
        if self.file_instance and \
                self.file_instance._num_child_instances > deletion_threshold \
                and not force:
            return

        # First, delete our metadata model. The check above _should_ make sure
        # we don't get integrity errors, but it's better to have this fail
        # because of those errors before we have actually deleted the file
        if save:
            self.file_instance.delete()

        # Only close the file if it's already open, which we know by the
        # presence of self._file
        if hasattr(self, '_file'):
            self.close()
            del self.file

        self.storage.delete(self.name_on_disk)

        self.original_filename = None
        self._committed = False

        self.file_instance.clear_file_wrappers()

    delete.alters_data = True

    @property
    def closed(self):
        """Whether there is no open underlying file."""
        file = getattr(self, '_file', None)
        return file is None or file.closed

    def close(self):
        """Close the underlying file, if there is one."""
        file = getattr(self, '_file', None)
        if file is not None:
            file.close()

    def __getstate__(self):
        # FieldFile needs access to its associated model field, an instance
        # and the file's name. Everything else will be restored later.
        return {
            'original_filename': self.original_filename,
            # 'closed' is shadowed by the read-only property of the same
            # name; it is only kept to leave the state layout untouched
            'closed': False,
            '_committed': True,
            '_removed': self._removed,
            '_file': None,
            'file_instance': self.file_instance,
            # Must be stored under '_field': 'field' is a read-only property,
            # so a 'field' entry in __dict__ would never be seen and
            # '_field' would be missing after restoring
            '_field': self._field,
        }

    def __setstate__(self, state):
        state = dict(state)
        # States created before the key was corrected stored the field under
        # 'field'; map it to '_field' so those can still be restored
        legacy_field = state.pop('field', None)
        state.setdefault('_field', legacy_field)

        self.__dict__.update(state)
        self.storage = get_storage()
class PrivateCacheMixin:
    """Mixin providing a small per-object key/value cache.

    A stored value of ``None`` counts as 'not cached'.
    """

    def __init__(self, *args, **kwargs):
        """Start with an empty cache; any arguments are ignored."""
        self._cache = {}

    def is_cached(self, key):
        """Return whether a non-``None`` value is stored for ``key``."""
        return self._cache.get(key) is not None

    def get_cached_value(self, key):
        """Return the value stored for ``key``, or ``None`` if there is
        none."""
        return self._cache.get(key)

    def cache_value(self, key, value):
        """Store ``value`` under ``key``."""
        self._cache[key] = value

    def invalidate_cache_value(self, key):
        """Mark ``key`` as not cached by storing ``None`` for it."""
        self._cache[key] = None

    def invalidate_caches(self):
        """Throw away everything by starting over with a new, empty cache."""
        self._cache = {}
class TrackedFileWrapper(PrivateCacheMixin):
    """Manages the files tracked by a ``TrackedFileField`` of one object.

    Files are linked to the owning object through the field's through model;
    at most one of those links is flagged ``current``. The current file is
    cached under the 'current' key after it has been looked up or set.
    """

    def __init__(self, manager: Manager, instance, field: 'TrackedFileField'):
        """
        :param manager: The manager used to look up and list the files
        :param instance: The object owning the tracked files
        :param field: The field this wrapper belongs to
        """
        super().__init__()
        self._manager = manager
        self._instance = instance
        self._field = field
        self._through_model = self._field.remote_field.through
        # This should retrieve the FileField actually holding the File; it's
        # a bit tricky to get it, as its name can differ if using a custom
        # File model.
        self._file_field = self._through_model._meta.get_field(
            self._field.m2m_reverse_field_name()
        )

    def __repr__(self):
        return f"<{self.__class__.__module__}.{self.__class__.__name__} " \
               f"for {self._instance.__class__.__module__}." \
               f"{self._instance.__class__.__name__}." \
               f"{self._field.attname}>"

    def _instance_links(self):
        """Return a queryset of the linking objects of our own instance."""
        # An instance without a pk cannot have any links yet
        if getattr(self._instance, 'pk', None) is None:
            return self._through_model.objects.none()

        # The same foreign key add() fills when it creates a link
        return self._through_model.objects.filter(
            **{self._field.m2m_field_name(): self._instance}
        )

    def _get_linking_instance(
            self,
            obj: Union[FileWrapper, File]
    ) -> Optional[object]:
        """Given a FileWrapper or File, this method will try to find the
        object linking it to an object. Returns None if there is none."""
        if not self._has_linking_instance(obj):
            return None

        if isinstance(obj, FileWrapper):
            obj = obj.file_instance

        kwargs = {
            self._file_field.attname: obj
        }

        return self._through_model.objects.get(**kwargs)

    def _has_linking_instance(self, obj: Union[FileWrapper, File]) -> bool:
        """Given a FileWrapper or File, this method checks whether an object
        linking it to an object exists."""
        if isinstance(obj, FileWrapper):
            obj = obj.file_instance

        kwargs = {
            self._file_field.attname: obj
        }

        return self._through_model.objects.filter(**kwargs).exists()

    def _resolve_to_file_wrapper(self, obj: Union[
        uuid.UUID, FileWrapper, 'BaseFile', int, str
    ]) -> Optional[FileWrapper]:
        """Tries to map the input to a FileWrapper in this M2M; accepts a
        FileWrapper itself, a File instance and an int/uuid for a File
        instance. Returns None if the input could not be resolved."""
        if isinstance(obj, FileWrapper):
            return obj

        if isinstance(obj, self._field.related_model):
            return obj.get_file_wrapper(self._file_field, False)

        if isinstance(obj, int):
            try:
                return self._resolve_to_file_wrapper(self._manager.get(pk=obj))
            except self._field.related_model.DoesNotExist:
                pass

        if isinstance(obj, uuid.UUID):
            try:
                return self._resolve_to_file_wrapper(self._manager.get(
                    uuid=obj))
            except self._field.related_model.DoesNotExist:
                pass

        if isinstance(obj, str):
            # Strings are only accepted if they can be parsed as a UUID
            try:
                obj = uuid.UUID(obj)
                return self._resolve_to_file_wrapper(obj)
            except ValueError:
                pass

        return None

    def _get_current_file(self) -> Optional[FileWrapper]:
        """Tries to retrieve the file representing the 'current' value of
        this field. Returns None if our instance has no current file."""
        if self.is_cached('current'):
            return self.get_cached_value('current')

        try:
            # Only look at the links of our own instance; the through model
            # also holds the links of every other instance
            linking_instance = self._instance_links().get(current=True)
        except self._through_model.DoesNotExist:
            return None

        value = getattr(
            linking_instance,
            self._field.m2m_reverse_field_name()
        )
        self.cache_value('current', value)
        return value

    def _set_current_file(self, value: Union[
        uuid.UUID, FileWrapper, 'BaseFile', int, str
    ]) -> None:
        """Tries to set a new file representing the 'current' value of this
        field.

        It can be a file that's already tracked, or a new one. Setting None
        deletes the current file.
        """
        if value is None:
            self._del_current_file()
            return

        resolved_value = self._resolve_to_file_wrapper(value)

        # If we didn't retrieve a value, just use .add;
        # Even if we get a value, the fact that we got a FileWrapper doesn't
        # mean it's _our_ FileWrapper. We could of course check FW._field or
        # FW.file_instance._child_fields; however, it's way easier to just do
        # an .exists check on the linking table. (As that's cheaper in terms
        # of computation and DB access).
        if not resolved_value or \
                not self._has_linking_instance(resolved_value):
            return self.add(value)

        resolved_value.save()
        self._set_as_current(resolved_value)

    def _del_current_file(self) -> None:
        """Deletes the file currently representing the 'current' value of
        this field, if there is one."""
        self.invalidate_caches()
        current_file = self._get_current_file()
        if current_file:
            self.delete(current_file)

    current_file = property(
        _get_current_file,
        _set_current_file,
        _del_current_file
    )
    current_file.fset.alters_data = True
    current_file.fdel.alters_data = True

    def _set_as_current(self, file_wrapper: FileWrapper) -> None:
        """Flag the link of the given file as the current one.

        :raises ValueError: If no link exists for the given file
        """
        # Look the link up first, so we bail out before any flag is touched
        link = self._get_linking_instance(file_wrapper)
        if link is None:
            raise ValueError("Cannot set a file that is not tracked as "
                             "current!")

        # Only reset the flags of our own instance; the current files of
        # other instances must be left alone
        self._instance_links().update(current=False)

        link.current = True
        link.save()

        self.cache_value('current', file_wrapper)

    @property
    def all(self):
        """An iterator of FileWrappers for all files of the manager, ordered
        by ``created_on``, descending."""
        return map(
            self._resolve_to_file_wrapper,
            self._manager.all().order_by('-created_on')
        )

    def add(self, file: Union[
        uuid.UUID, FileWrapper, 'BaseFile', int, str
    ]) -> None:
        """Link the given file to our instance, save it and make it the
        current file."""
        through_obj = self._through_model()
        setattr(
            through_obj,
            self._field.m2m_field_name(),
            self._instance
        )
        setattr(
            through_obj,
            self._field.m2m_reverse_field_name(),
            file
        )
        # Read the value back from the linking object, instead of using
        # 'file' directly, before saving it
        file_wrapper = getattr(through_obj,
                               self._field.m2m_reverse_field_name())
        file_wrapper.save()

        through_obj.save()
        self._set_as_current(file_wrapper)

    add.alters_data = True

    def set_as_current(self, file: Union[
        uuid.UUID, FileWrapper, 'BaseFile', int, str
    ]) -> None:
        """Make an already tracked file the current file.

        :raises ValueError: If file is None, cannot be resolved to a file or
                            is not tracked
        """
        if file is None:
            raise ValueError("Cannot set None as current! (Did you mean to "
                             "delete stuff?)")

        file_wrapper = self._resolve_to_file_wrapper(file)
        if file_wrapper is None:
            raise ValueError("Cannot set an unknown file as current!")

        self._set_as_current(file_wrapper)

    set_as_current.alters_data = True

    def delete(self, file: Union[
        uuid.UUID, FileWrapper, 'BaseFile', int, str
    ]) -> None:
        """Delete the link to the given file, followed by the file itself.

        :raises ValueError: If file is None, cannot be resolved to a file or
                            is not tracked
        """
        if file is None:
            raise ValueError("Cannot delete None!")

        file_wrapper = self._resolve_to_file_wrapper(file)
        if file_wrapper is None:
            raise ValueError("Cannot delete an unknown file!")

        link = self._get_linking_instance(file_wrapper)
        if link is None:
            raise ValueError("Cannot delete a file that is not tracked!")

        was_current = link.current

        link.delete()
        file_wrapper.delete()

        # The cached current file is stale if we just deleted it
        if was_current:
            self.invalidate_caches()

    delete.alters_data = True

    def delete_all(self) -> None:
        """Delete all files currently tracked"""
        self._manager.all().delete()
        self.invalidate_caches()

    delete_all.alters_data = True