Represents an Aggregation in HydroShare
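
Aggregations are not usually constructed directly; they are reached through a resource. A minimal sketch of obtaining one (the credentials and resource id below are placeholders, and the HydroShare client and resource calls are assumed from the wider hsclient API):

from hsclient import HydroShare

hs = HydroShare(username="my_user", password="my_password")  # placeholder credentials
res = hs.resource("<resource_id>")                           # placeholder resource id
agg = res.aggregations()[0]                                  # a resource lists its aggregations
print(agg.metadata.type, agg.main_file_path)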

Source code in hsclient\hydroshare.py
class Aggregation:
    """Represents an Aggregation in HydroShare"""

    def __init__(self, map_path, hs_session, checksums=None):
        self._map_path = map_path
        self._hs_session = hs_session
        self._retrieved_map = None
        self._retrieved_metadata = None
        self._parsed_files = None
        self._parsed_aggregations = None
        self._parsed_checksums = checksums
        self._main_file_path = None

    def __str__(self):
        return self._map_path

    @property
    def _map(self):
        if not self._retrieved_map:
            self._retrieved_map = self._retrieve_and_parse(self._map_path)
        return self._retrieved_map

    @property
    def _metadata(self):
        if not self._retrieved_metadata:
            self._retrieved_metadata = self._retrieve_and_parse(self.metadata_path)
        return self._retrieved_metadata

    @property
    def _checksums(self):
        if not self._parsed_checksums:
            self._parsed_checksums = self._retrieve_checksums(self._checksums_path)
        return self._parsed_checksums

    @property
    def _files(self):
        if not self._parsed_files:
            self._parsed_files = []
            for file in self._map.describes.files:
                if not is_aggregation(str(file)):
                    if not file.path == self.metadata_path:
                        if not str(file.path).endswith('/'):  # checking for folders, shouldn't have to do this
                            file_checksum_path = file.path.split(self._resource_path, 1)[1].strip("/")
                            file_path = unquote(
                                file_checksum_path.split(
                                    "data/contents/",
                                )[1]
                            )
                            f = File(file_path, unquote(file.path), self._checksums[file_checksum_path])
                            self._parsed_files.append(f)
        return self._parsed_files

    @property
    def _aggregations(self):

        def populate_metadata(_aggr):
            _aggr._metadata

        if not self._parsed_aggregations:
            self._parsed_aggregations = []
            for file in self._map.describes.files:
                if is_aggregation(str(file)):
                    self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, self._checksums))

            # load metadata for all aggregations (metadata is needed to create any typed aggregation)
            with ThreadPoolExecutor() as executor:
                executor.map(populate_metadata, self._parsed_aggregations)

            # convert aggregations to aggregation type supporting data object
            aggregations_copy = self._parsed_aggregations[:]
            typed_aggregation_classes = {AggregationType.MultidimensionalAggregation: NetCDFAggregation,
                                         AggregationType.TimeSeriesAggregation: TimeseriesAggregation,
                                         AggregationType.GeographicRasterAggregation: GeoRasterAggregation,
                                         AggregationType.GeographicFeatureAggregation: GeoFeatureAggregation,
                                         AggregationType.CSVFileAggregation: CSVAggregation
                                         }
            for aggr in aggregations_copy:
                typed_aggr_cls = typed_aggregation_classes.get(aggr.metadata.type, None)
                if typed_aggr_cls:
                    typed_aggr = typed_aggr_cls.create(base_aggr=aggr)
                    # swapping the generic aggregation with the typed aggregation in the aggregation list
                    self._parsed_aggregations.remove(aggr)
                    self._parsed_aggregations.append(typed_aggr)

        return self._parsed_aggregations

    @property
    def _checksums_path(self):
        path = self.metadata_path.split("/data/", 1)[0]
        path = urljoin(path, "manifest-md5.txt")
        return path

    @property
    def _hsapi_path(self):
        resource_path = self._resource_path
        hsapi_path = urljoin("hsapi", resource_path)
        return hsapi_path

    @property
    def _resource_path(self):
        resource_path = self.metadata_path[: len("/resource/b4ce17c17c654a5c8004af73f2df87ab/")].strip("/")
        return resource_path

    def _retrieve_and_parse(self, path):
        file_str = self._hs_session.retrieve_string(path)
        instance = load_rdf(file_str)
        return instance

    def _retrieve_checksums(self, path):
        file_str = self._hs_session.retrieve_string(path)
        # split string by lines, then split line by delimiter into a dict
        delimiter = "    "
        data = {
            quote(path): checksum for checksum, path in [line.split(delimiter) for line in file_str.split("\n") if line]
        }
        return data

    def _download(self, save_path: str = "", unzip_to: str = None) -> str:
        main_file_path = self.main_file_path

        path = urljoin(self._resource_path, "data", "contents", main_file_path)
        params = {"zipped": "true", "aggregation": "true"}
        path = path.replace('resource', 'django_irods/rest_download', 1)
        downloaded_zip = self._hs_session.retrieve_zip(path, save_path=save_path, params=params)

        if unzip_to:
            import zipfile

            with zipfile.ZipFile(downloaded_zip, 'r') as zip_ref:
                zip_ref.extractall(unzip_to)
            os.remove(downloaded_zip)
            return unzip_to
        return downloaded_zip

    @property
    def metadata_file(self):
        """The path to the metadata file"""
        return self.metadata_path.split("/data/contents/", 1)[1]

    @property
    def metadata(self) -> BaseMetadata:
        """A metadata object for reading and updating metadata values"""
        return self._metadata

    @property
    def metadata_path(self) -> str:
        """The path to the metadata file"""
        return urlparse(str(self._map.describes.is_documented_by)).path

    @property
    def main_file_path(self) -> str:
        """The path to the main file in the aggregation"""
        if self._main_file_path is not None:
            return self._main_file_path
        mft = main_file_type(self.metadata.type)
        if mft:
            for file in self.files():
                if str(file).endswith(mft):
                    self._main_file_path = file.path
                    return self._main_file_path
        if self.metadata.type == AggregationType.FileSetAggregation:
            self._main_file_path = self.files()[0].folder
            return self._main_file_path
        self._main_file_path = self.files()[0].path
        return self._main_file_path

    @refresh
    def save(self) -> None:
        """
        Saves the metadata back to HydroShare
        :return: None
        """
        metadata_file = self.metadata_file
        metadata_string = rdf_string(self._retrieved_metadata, rdf_format="xml")
        url = urljoin(self._hsapi_path, "ingest_metadata")
        self._hs_session.upload_file(url, files={'file': (metadata_file, metadata_string)})

    def files(self, search_aggregations: bool = False, **kwargs) -> List[File]:
        """
        List files and filter by properties on the file object using kwargs (i.e. extension='.txt')
        :param search_aggregations: Defaults False, set to true to search aggregations
        :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
        :return: a List of File objects matching the filter parameters
        """
        files = self._files
        for key, value in kwargs.items():
            files = list(filter(lambda file: attribute_filter(file, key, value), files))
        if search_aggregations:
            for aggregation in self.aggregations():
                files = files + list(aggregation.files(search_aggregations=search_aggregations, **kwargs))
        return files

    def file(self, search_aggregations=False, **kwargs) -> File:
        """
        Returns a single file in the resource that matches the filtering parameters
        :param search_aggregations: Defaults False, set to true to search aggregations
        :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
        :return: A File object matching the filter parameters or None if no matching File was found
        """
        files = self.files(search_aggregations=search_aggregations, **kwargs)
        if files:
            return files[0]
        return None

    def aggregations(self, **kwargs) -> List[BaseMetadata]:
        """
        List the aggregations in the resource.  Filter by properties on the metadata object using kwargs.  If you need
        to filter on nested properties, use __ (double underscore) to separate the properties.  For example, to filter
        by the BandInformation name, call this method like aggregations(band_information__name="the name to search").
        :params **kwargs: Search by properties on the metadata object
        :return: a List of Aggregation objects matching the filter parameters
        """
        aggregations = self._aggregations

        for key, value in kwargs.items():
            if key.startswith('file__'):
                file_args = {key[len('file__'):]: value}
                aggregations = [agg for agg in aggregations if agg.files(**file_args)]
            elif key.startswith('files__'):
                file_args = {key[len('files__'):]: value}
                aggregations = [agg for agg in aggregations if agg.files(**file_args)]
            else:
                aggregations = filter(lambda agg: attribute_filter(agg.metadata, key, value), aggregations)
        return list(aggregations)

    def aggregation(self, **kwargs) -> BaseMetadata:
        """
        Returns a single Aggregation in the resource that matches the filtering parameters.  Uses the same filtering
        rules described in the aggregations method.
        :params **kwargs: Search by properties on the metadata object
        :return: An Aggregation object matching the filter parameters or None if no matching Aggregation was found.
        """
        aggregations = self.aggregations(**kwargs)
        if aggregations:
            return aggregations[0]
        return None

    def refresh(self) -> None:
        """
        Forces the retrieval of the resource map and metadata files.  Currently this is implemented to be lazy and will
        only retrieve those files again after another call to access them is made.  This will be later updated to be
        eager and retrieve the files asynchronously.
        """
        # TODO, refresh should destroy the aggregation objects and async fetch everything.
        self._retrieved_map = None
        self._retrieved_metadata = None
        self._parsed_files = None
        self._parsed_aggregations = None
        self._parsed_checksums = None
        self._main_file_path = None

    def delete(self) -> None:
        """Deletes this aggregation from HydroShare"""
        path = urljoin(
            self._hsapi_path,
            "functions",
            "delete-file-type",
            self.metadata.type.value + "LogicalFile",
            self.main_file_path,
        )
        self._hs_session.delete(path, status_code=200)
        self.refresh()

main_file_path: str property

The path to the main file in the aggregation

metadata: BaseMetadata property

A metadata object for reading and updating metadata values

metadata_file property

The metadata file's path relative to the resource's data/contents folder

metadata_path: str property

The full URL path to the metadata file
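
A short read-only sketch of these properties, continuing with the agg from the setup near the top of this page (the exact metadata fields depend on the aggregation type):

print(agg.main_file_path)   # path of the main file within the aggregation
print(agg.metadata_path)    # full URL path to the aggregation's metadata file
print(agg.metadata_file)    # the same file, relative to the resource's data/contents folder
print(agg.metadata.type)    # an AggregationType value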

aggregation(**kwargs)

Returns a single Aggregation in the resource that matches the filtering parameters, using the same filtering rules described in the aggregations method.

:param **kwargs: Search by properties on the metadata object
:return: An Aggregation object matching the filter parameters, or None if no matching Aggregation was found

Source code in hsclient\hydroshare.py
def aggregation(self, **kwargs) -> BaseMetadata:
    """
    Returns a single Aggregation in the resource that matches the filtering parameters.  Uses the same filtering
    rules described in the aggregations method.
    :params **kwargs: Search by properties on the metadata object
    :return: An Aggregation object matching the filter parameters or None if no matching Aggregation was found.
    """
    aggregations = self.aggregations(**kwargs)
    if aggregations:
        return aggregations[0]
    return None
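
A usage sketch, assuming res from the setup near the top of this page; filtering by type works because the metadata object exposes a type field (the source above reads aggr.metadata.type), and AggregationType comes from hsmodels, the metadata models hsclient builds on:

from hsmodels.schemas.enums import AggregationType

ts_agg = res.aggregation(type=AggregationType.TimeSeriesAggregation)
if ts_agg is None:
    print("no time series aggregation in this resource")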

aggregations(**kwargs)

List the aggregations in the resource and filter by properties on the metadata object using kwargs. To filter on nested properties, use __ (double underscore) to separate the properties; for example, to filter by the BandInformation name, call aggregations(band_information__name="the name to search").

:param **kwargs: Search by properties on the metadata object
:return: A List of Aggregation objects matching the filter parameters

Source code in hsclient\hydroshare.py
def aggregations(self, **kwargs) -> List[BaseMetadata]:
    """
    List the aggregations in the resource.  Filter by properties on the metadata object using kwargs.  If you need
    to filter on nested properties, use __ (double underscore) to separate the properties.  For example, to filter
    by the BandInformation name, call this method like aggregations(band_information__name="the name to search").
    :params **kwargs: Search by properties on the metadata object
    :return: a List of Aggregation objects matching the filter parameters
    """
    aggregations = self._aggregations

    for key, value in kwargs.items():
        if key.startswith('file__'):
            file_args = {key[len('file__'):]: value}
            aggregations = [agg for agg in aggregations if agg.files(**file_args)]
        elif key.startswith('files__'):
            file_args = {key[len('files__'):]: value}
            aggregations = [agg for agg in aggregations if agg.files(**file_args)]
        else:
            aggregations = filter(lambda agg: attribute_filter(agg.metadata, key, value), aggregations)
    return list(aggregations)
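
A filtering sketch, assuming res from the setup near the top of this page; band_information is only present on geographic raster metadata, and the band name is a placeholder:

# nested metadata property via the double-underscore syntax
rasters = res.aggregations(band_information__name="Band_1")

# filter by a property of the files inside each aggregation
tif_aggs = res.aggregations(files__extension=".tif")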

delete()

Deletes this aggregation from HydroShare

Source code in hsclient\hydroshare.py
def delete(self) -> None:
    """Deletes this aggregation from HydroShare"""
    path = urljoin(
        self._hsapi_path,
        "functions",
        "delete-file-type",
        self.metadata.type.value + "LogicalFile",
        self.main_file_path,
    )
    self._hs_session.delete(path, status_code=200)
    self.refresh()
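
A sketch of removing an aggregation, assuming res and AggregationType from the sketches above:

csv_agg = res.aggregation(type=AggregationType.CSVFileAggregation)
if csv_agg is not None:
    csv_agg.delete()  # removes this aggregation from the resource on HydroShare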

file(search_aggregations=False, **kwargs)

Returns a single file in the resource that matches the filtering parameters.

:param search_aggregations: Defaults to False; set to True to also search files within aggregations
:param **kwargs: Search by properties on the File object (path, name, extension, folder, checksum, url)
:return: A File object matching the filter parameters, or None if no matching File was found

Source code in hsclient\hydroshare.py
def file(self, search_aggregations=False, **kwargs) -> File:
    """
    Returns a single file in the resource that matches the filtering parameters
    :param search_aggregations: Defaults False, set to true to search aggregations
    :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
    :return: A File object matching the filter parameters or None if no matching File was found
    """
    files = self.files(search_aggregations=search_aggregations, **kwargs)
    if files:
        return files[0]
    return None
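
A lookup sketch, assuming agg from the setup near the top of this page; the file name is a placeholder:

f = agg.file(name="observations.csv")
if f is not None:
    print(f.path, f.checksum)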

files(search_aggregations=False, **kwargs)

List files and filter by properties on the File object using kwargs (e.g., extension='.txt').

:param search_aggregations: Defaults to False; set to True to also search files within aggregations
:param **kwargs: Search by properties on the File object (path, name, extension, folder, checksum, url)
:return: A List of File objects matching the filter parameters

Source code in hsclient\hydroshare.py
def files(self, search_aggregations: bool = False, **kwargs) -> List[File]:
    """
    List files and filter by properties on the file object using kwargs (i.e. extension='.txt')
    :param search_aggregations: Defaults False, set to true to search aggregations
    :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
    :return: a List of File objects matching the filter parameters
    """
    files = self._files
    for key, value in kwargs.items():
        files = list(filter(lambda file: attribute_filter(file, key, value), files))
    if search_aggregations:
        for aggregation in self.aggregations():
            files = files + list(aggregation.files(search_aggregations=search_aggregations, **kwargs))
    return files
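
A filtering sketch, assuming agg from the setup near the top of this page:

txt_files = agg.files(extension=".txt")                           # files directly in this aggregation
all_txt = agg.files(search_aggregations=True, extension=".txt")   # also descend into nested aggregations
for f in txt_files:
    print(f.path)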

refresh()

Forces re-retrieval of the resource map and metadata files. This is currently lazy: the files are retrieved again only when they are next accessed. It will later be updated to be eager and retrieve the files asynchronously.

Source code in hsclient\hydroshare.py
def refresh(self) -> None:
    """
    Forces the retrieval of the resource map and metadata files.  Currently this is implemented to be lazy and will
    only retrieve those files again after another call to access them is made.  This will be later updated to be
    eager and retrieve the files asynchronously.
    """
    # TODO, refresh should destroy the aggregation objects and async fetch everything.
    self._retrieved_map = None
    self._retrieved_metadata = None
    self._parsed_files = None
    self._parsed_aggregations = None
    self._parsed_checksums = None
    self._main_file_path = None
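
A sketch of forcing a reload after the resource changes on the server, assuming agg from the setup near the top of this page:

agg.refresh()        # clears the cached map, metadata, files, and checksums
files = agg.files()  # the next access retrieves fresh copies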

save()

Saves the metadata back to HydroShare.

:return: None

Source code in hsclient\hydroshare.py
@refresh
def save(self) -> None:
    """
    Saves the metadata back to HydroShare
    :return: None
    """
    metadata_file = self.metadata_file
    metadata_string = rdf_string(self._retrieved_metadata, rdf_format="xml")
    url = urljoin(self._hsapi_path, "ingest_metadata")
    self._hs_session.upload_file(url, files={'file': (metadata_file, metadata_string)})
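
A round-trip sketch, assuming agg from the setup near the top of this page; title and subjects are common metadata fields, but the writable fields depend on the aggregation type:

agg.metadata.title = "Updated aggregation title"
agg.metadata.subjects.append("hydrology")
agg.save()  # writes the updated metadata back to HydroShare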