Represents an Aggregation in HydroShare
Source code in hsclient\hydroshare.py
class Aggregation:
    """Represents an Aggregation in HydroShare

    The resource map, the metadata document, the checksum manifest, the file list and the nested
    aggregations are retrieved/parsed on first access and cached on the instance until ``refresh``
    resets them.
    """

    def __init__(self, map_path, hs_session, checksums=None):
        """
        :param map_path: path of the resource map file that describes this aggregation
        :param hs_session: session object used for every request sent to HydroShare
        :param checksums: optional, already parsed checksum mapping; retrieved on demand when None
        """
        self._map_path = map_path
        self._hs_session = hs_session
        self._retrieved_map = None
        self._retrieved_metadata = None
        self._parsed_files = None
        self._parsed_aggregations = None
        self._parsed_checksums = checksums
        self._main_file_path = None

    def __str__(self):
        return self._map_path

    # NOTE: every lazy cache below is tested with ``is None`` rather than truthiness so that a valid
    # but empty result (no files, no aggregations, empty manifest) is cached instead of being
    # re-parsed or re-downloaded on each access.

    @property
    def _map(self):
        """The parsed resource map, retrieved on first access"""
        if self._retrieved_map is None:
            self._retrieved_map = self._retrieve_and_parse(self._map_path)
        return self._retrieved_map

    @property
    def _metadata(self):
        """The parsed metadata document, retrieved on first access"""
        if self._retrieved_metadata is None:
            self._retrieved_metadata = self._retrieve_and_parse(self.metadata_path)
        return self._retrieved_metadata

    @property
    def _checksums(self):
        """Checksums keyed by quoted file path, retrieved on first access unless given to __init__"""
        if self._parsed_checksums is None:
            self._parsed_checksums = self._retrieve_checksums(self._checksums_path)
        return self._parsed_checksums

    @property
    def _files(self):
        """The File objects listed in the resource map, excluding aggregations, the metadata file and folders"""
        if self._parsed_files is None:
            metadata_path = self.metadata_path
            resource_path = self._resource_path
            # build into a local list so that a failure part-way through never leaves a partial cache
            parsed_files = []
            for file in self._map.describes.files:
                if is_aggregation(str(file)):
                    continue
                if file.path == metadata_path:
                    continue
                if str(file.path).endswith('/'):  # checking for folders, shouldn't have to do this
                    continue
                file_checksum_path = file.path.split(resource_path, 1)[1].strip("/")
                file_path = unquote(file_checksum_path.split("data/contents/")[1])
                parsed_files.append(File(file_path, unquote(file.path), self._checksums[file_checksum_path]))
            self._parsed_files = parsed_files
        return self._parsed_files

    @property
    def _aggregations(self):
        """The aggregations listed in the resource map, converted to their typed classes where one exists"""

        def populate_metadata(_aggr):
            _aggr._metadata

        if self._parsed_aggregations is None:
            generic_aggregations = [
                Aggregation(unquote(file.path), self._hs_session, self._checksums)
                for file in self._map.describes.files
                if is_aggregation(str(file))
            ]

            # load metadata for all aggregations (metadata is needed to create any typed aggregation)
            with ThreadPoolExecutor() as executor:
                executor.map(populate_metadata, generic_aggregations)

            # convert aggregations to aggregation type supporting data object
            typed_aggregation_classes = {
                AggregationType.MultidimensionalAggregation: NetCDFAggregation,
                AggregationType.TimeSeriesAggregation: TimeseriesAggregation,
                AggregationType.GeographicRasterAggregation: GeoRasterAggregation,
                AggregationType.GeographicFeatureAggregation: GeoFeatureAggregation,
                AggregationType.CSVFileAggregation: CSVAggregation,
            }
            untyped, typed = [], []
            for aggr in generic_aggregations:
                typed_aggr_cls = typed_aggregation_classes.get(aggr.metadata.type, None)
                if typed_aggr_cls:
                    typed.append(typed_aggr_cls.create(base_aggr=aggr))
                else:
                    untyped.append(aggr)
            # generic aggregations first, typed ones after them: the same ordering the previous
            # remove()/append() swapping produced, without the repeated list scans.
            # Assigned only once everything succeeded, so a failure never caches a partial list.
            self._parsed_aggregations = untyped + typed
        return self._parsed_aggregations

    @property
    def _checksums_path(self):
        """Path of manifest-md5.txt, joined onto the part of the metadata path before "/data/" """
        path = self.metadata_path.split("/data/", 1)[0]
        path = urljoin(path, "manifest-md5.txt")
        return path

    @property
    def _hsapi_path(self):
        """The resource path prefixed with "hsapi" """
        resource_path = self._resource_path
        hsapi_path = urljoin("hsapi", resource_path)
        return hsapi_path

    @property
    def _resource_path(self):
        """The leading "resource/<id>" part of the metadata path, without surrounding slashes"""
        # the prefix length is taken from a sample path, i.e. this relies on ids being as long as
        # the 32 character id used in the sample
        resource_path = self.metadata_path[: len("/resource/b4ce17c17c654a5c8004af73f2df87ab/")].strip("/")
        return resource_path

    def _retrieve_and_parse(self, path):
        """Retrieves the file at path as a string and parses it with load_rdf"""
        file_str = self._hs_session.retrieve_string(path)
        instance = load_rdf(file_str)
        return instance

    def _retrieve_checksums(self, path):
        """
        Retrieves the checksum manifest at path and parses it
        :param path: path of the manifest file, one "<checksum><delimiter><file path>" entry per line
        :return: a dict mapping the quoted file path to its checksum
        """
        file_str = self._hs_session.retrieve_string(path)
        delimiter = " "
        checksums = {}
        for line in file_str.split("\n"):
            if not line:
                continue
            # split on the first delimiter only, so a file path that itself contains the delimiter
            # is kept whole; a line without any delimiter still raises ValueError
            checksum, file_path = line.split(delimiter, 1)
            checksums[quote(file_path.lstrip(delimiter))] = checksum
        return checksums

    def _download(self, save_path: str = "", unzip_to: str = None) -> str:
        """
        Downloads this aggregation as a zip file
        :param save_path: passed on to the session's retrieve_zip
        :param unzip_to: optional directory; when given the zip is extracted there and then deleted
        :return: unzip_to when it was given, otherwise the value returned by retrieve_zip
        """
        main_file_path = self.main_file_path
        path = urljoin(self._resource_path, "data", "contents", main_file_path)
        params = {"zipped": "true", "aggregation": "true"}
        path = path.replace('resource', 'django_irods/rest_download', 1)
        downloaded_zip = self._hs_session.retrieve_zip(path, save_path=save_path, params=params)

        if unzip_to:
            import zipfile

            with zipfile.ZipFile(downloaded_zip, 'r') as zip_ref:
                zip_ref.extractall(unzip_to)
            os.remove(downloaded_zip)
            return unzip_to
        return downloaded_zip

    @property
    def metadata_file(self):
        """The path to the metadata file, relative to data/contents/"""
        return self.metadata_path.split("/data/contents/", 1)[1]

    @property
    def metadata(self) -> BaseMetadata:
        """A metadata object for reading and updating metadata values"""
        return self._metadata

    @property
    def metadata_path(self) -> str:
        """The URL path to the metadata file, as referenced by the resource map"""
        return urlparse(str(self._map.describes.is_documented_by)).path

    @property
    def main_file_path(self) -> str:
        """The path to the main file in the aggregation"""
        if self._main_file_path is not None:
            return self._main_file_path
        mft = main_file_type(self.metadata.type)
        if mft:
            for file in self.files():
                if str(file).endswith(mft):
                    self._main_file_path = file.path
                    return self._main_file_path
        if self.metadata.type == AggregationType.FileSetAggregation:
            # a file set is identified by the folder of its first file
            self._main_file_path = self.files()[0].folder
            return self._main_file_path
        self._main_file_path = self.files()[0].path
        return self._main_file_path

    @refresh
    def save(self) -> None:
        """
        Saves the metadata back to HydroShare
        :return: None
        """
        metadata_file = self.metadata_file
        # serializes the metadata object cached on this instance
        metadata_string = rdf_string(self._retrieved_metadata, rdf_format="xml")
        url = urljoin(self._hsapi_path, "ingest_metadata")
        self._hs_session.upload_file(url, files={'file': (metadata_file, metadata_string)})

    def files(self, search_aggregations: bool = False, **kwargs) -> List[File]:
        """
        List files and filter by properties on the file object using kwargs (i.e. extension='.txt')
        :param search_aggregations: Defaults False, set to true to search aggregations
        :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
        :return: a List of File objects matching the filter parameters
        """
        files = self._files
        for key, value in kwargs.items():
            files = [file for file in files if attribute_filter(file, key, value)]
        if search_aggregations:
            for aggregation in self.aggregations():
                files = files + list(aggregation.files(search_aggregations=search_aggregations, **kwargs))
        return files

    def file(self, search_aggregations=False, **kwargs) -> File:
        """
        Returns a single file in the resource that matches the filtering parameters
        :param search_aggregations: Defaults False, set to true to search aggregations
        :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
        :return: A File object matching the filter parameters or None if no matching File was found
        """
        files = self.files(search_aggregations=search_aggregations, **kwargs)
        if files:
            return files[0]
        return None

    def aggregations(self, **kwargs) -> List[BaseMetadata]:
        """
        List the aggregations in the resource.  Filter by properties on the metadata object using kwargs.  If you need
        to filter on nested properties, use __ (double underscore) to separate the properties.  For example, to filter
        by the BandInformation name, call this method like aggregations(band_information__name="the name to search").
        Keys starting with file__ or files__ filter on the files of each aggregation instead of on its metadata.
        :params **kwargs: Search by properties on the metadata object
        :return: a List of Aggregation objects matching the filter parameters
        """
        aggregations = self._aggregations
        for key, value in kwargs.items():
            if key.startswith('file__'):
                file_args = {key[len('file__'):]: value}
                aggregations = [agg for agg in aggregations if agg.files(**file_args)]
            elif key.startswith('files__'):
                file_args = {key[len('files__'):]: value}
                aggregations = [agg for agg in aggregations if agg.files(**file_args)]
            else:
                # evaluated eagerly on purpose: a lazy filter() with a lambda would only read key and
                # value once the loop has finished, applying the last pair to every filter
                aggregations = [agg for agg in aggregations if attribute_filter(agg.metadata, key, value)]
        return list(aggregations)

    def aggregation(self, **kwargs) -> BaseMetadata:
        """
        Returns a single Aggregation in the resource that matches the filtering parameters.  Uses the same filtering
        rules described in the aggregations method.
        :params **kwargs: Search by properties on the metadata object
        :return: An Aggregation object matching the filter parameters or None if no matching Aggregation was found.
        """
        aggregations = self.aggregations(**kwargs)
        if aggregations:
            return aggregations[0]
        return None

    def refresh(self) -> None:
        """
        Forces the retrieval of the resource map and metadata files.  Currently this is implemented to be lazy and will
        only retrieve those files again after another call to access them is made.  This will be later updated to be
        eager and retrieve the files asynchronously.
        """
        # TODO, refresh should destroy the aggregation objects and async fetch everything.
        self._retrieved_map = None
        self._retrieved_metadata = None
        self._parsed_files = None
        self._parsed_aggregations = None
        self._parsed_checksums = None
        self._main_file_path = None

    def delete(self) -> None:
        """Deletes this aggregation from HydroShare"""
        path = urljoin(
            self._hsapi_path,
            "functions",
            "delete-file-type",
            self.metadata.type.value + "LogicalFile",
            self.main_file_path,
        )
        self._hs_session.delete(path, status_code=200)
        self.refresh()
main_file_path: str
property
The path to the main file in the aggregation
metadata: BaseMetadata
property
A metadata object for reading and updating metadata values
metadata_file
property
The path to the metadata file, relative to the resource's data/contents/ directory
metadata_path: str
property
The URL path to the metadata file, as referenced by the resource map
aggregation(**kwargs)
Returns a single Aggregation in the resource that matches the filtering parameters. Uses the same filtering rules described in the aggregations method. Parameters: **kwargs – search by properties on the metadata object. Returns: an Aggregation object matching the filter parameters, or None if no matching Aggregation was found.
Source code in hsclient\hydroshare.py
def aggregation(self, **kwargs) -> BaseMetadata:
    """
    Looks up the first Aggregation in the resource that satisfies the given filters.  The filters follow the
    rules of the aggregations method, which performs the actual search.
    :params **kwargs: Search by properties on the metadata object
    :return: The first matching Aggregation object, or None when nothing matched.
    """
    matches = self.aggregations(**kwargs)
    return matches[0] if matches else None
aggregations(**kwargs)
List the aggregations in the resource. Filter by properties on the metadata object using kwargs. If you need to filter on nested properties, use __ (double underscore) to separate the properties. For example, to filter by the BandInformation name, call this method like aggregations(band_information__name="the name to search"). Parameters: **kwargs – search by properties on the metadata object. Returns: a list of Aggregation objects matching the filter parameters.
Source code in hsclient\hydroshare.py
def aggregations(self, **kwargs) -> List[BaseMetadata]:
    """
    List the aggregations in the resource.  Filter by properties on the metadata object using kwargs.  If you need
    to filter on nested properties, use __ (double underscore) to separate the properties.  For example, to filter
    by the BandInformation name, call this method like aggregations(band_information__name="the name to search").
    Keys starting with file__ or files__ filter on the files of each aggregation instead of on its metadata.
    :params **kwargs: Search by properties on the metadata object
    :return: a List of Aggregation objects matching the filter parameters
    """
    aggregations = self._aggregations
    for key, value in kwargs.items():
        if key.startswith('file__'):
            file_args = {key[len('file__'):]: value}
            aggregations = [agg for agg in aggregations if agg.files(**file_args)]
        elif key.startswith('files__'):
            file_args = {key[len('files__'):]: value}
            aggregations = [agg for agg in aggregations if agg.files(**file_args)]
        else:
            # evaluated eagerly on purpose: a lazy filter() with a lambda would only read key and
            # value once the loop has finished, applying the last pair to every filter
            aggregations = [agg for agg in aggregations if attribute_filter(agg.metadata, key, value)]
    return list(aggregations)
delete()
Deletes this aggregation from HydroShare
Source code in hsclient\hydroshare.py
def delete(self) -> None:
    """Removes this aggregation from HydroShare and resets the locally cached state"""
    api_root = self._hsapi_path
    # the endpoint is addressed by the aggregation type name followed by "LogicalFile"
    logical_file_type = self.metadata.type.value + "LogicalFile"
    target_file = self.main_file_path
    endpoint = urljoin(api_root, "functions", "delete-file-type", logical_file_type, target_file)
    self._hs_session.delete(endpoint, status_code=200)
    self.refresh()
file(search_aggregations=False, **kwargs)
Returns a single file in the resource that matches the filtering parameters. Parameters: search_aggregations – defaults to False, set to True to search aggregations; **kwargs – search by properties on the File object (path, name, extension, folder, checksum url). Returns: a File object matching the filter parameters, or None if no matching File was found.
Source code in hsclient\hydroshare.py
def file(self, search_aggregations=False, **kwargs) -> File:
    """
    Looks up the first file in the resource that satisfies the given filters
    :param search_aggregations: Defaults False, set to true to search aggregations
    :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
    :return: The first matching File object, or None when nothing matched
    """
    matches = self.files(search_aggregations=search_aggregations, **kwargs)
    return matches[0] if matches else None
files(search_aggregations=False, **kwargs)
List files and filter by properties on the file object using kwargs (e.g. extension='.txt'). Parameters: search_aggregations – defaults to False, set to True to search aggregations; **kwargs – search by properties on the File object (path, name, extension, folder, checksum url). Returns: a list of File objects matching the filter parameters.
Source code in hsclient\hydroshare.py
def files(self, search_aggregations: bool = False, **kwargs) -> List[File]:
    """
    Collects the files of this object, narrowed down by every keyword filter (i.e. extension='.txt')
    :param search_aggregations: Defaults False, set to true to also collect files of the aggregations
    :params **kwargs: Search by properties on the File object (path, name, extension, folder, checksum url)
    :return: a List of File objects matching the filter parameters
    """
    matched = self._files
    # each keyword narrows the previous result further
    for key, value in kwargs.items():
        matched = [candidate for candidate in matched if attribute_filter(candidate, key, value)]
    if not search_aggregations:
        return matched
    # the same filters are handed down to every aggregation
    for child in self.aggregations():
        matched = matched + list(child.files(search_aggregations=search_aggregations, **kwargs))
    return matched
refresh()
Forces the retrieval of the resource map and metadata files. Currently this is implemented to be lazy and will only retrieve those files again after another call to access them is made. This will be later updated to be eager and retrieve the files asynchronously.
Source code in hsclient\hydroshare.py
def refresh(self) -> None:
    """
    Drops everything cached on this object (resource map, metadata, files, aggregations, checksums and main
    file path).  Nothing is fetched here; the values are retrieved again the next time they are accessed.
    This will be later updated to be eager and retrieve the files asynchronously.
    """
    # TODO, refresh should destroy the aggregation objects and async fetch everything.
    cached_attributes = (
        "_retrieved_map",
        "_retrieved_metadata",
        "_parsed_files",
        "_parsed_aggregations",
        "_parsed_checksums",
        "_main_file_path",
    )
    for attribute in cached_attributes:
        setattr(self, attribute, None)
save()
Saves the metadata back to HydroShare. Returns: None.
Source code in hsclient\hydroshare.py
@refresh
def save(self) -> None:
    """
    Uploads the locally held metadata to HydroShare
    :return: None
    """
    # file name and serialized RDF/XML content, in the (name, content) form used for the upload
    upload_payload = (self.metadata_file, rdf_string(self._retrieved_metadata, rdf_format="xml"))
    ingest_url = urljoin(self._hsapi_path, "ingest_metadata")
    self._hs_session.upload_file(ingest_url, files={'file': upload_payload})