Bases: DataObjectSupportingAggregation
Represents a CSV Aggregation in HydroShare
Source code in hsclient\hydroshare.py
class CSVAggregation(DataObjectSupportingAggregation):
"""Represents a CSV Aggregation in HydroShare"""
@classmethod
def create(cls, base_aggr):
return super().create(aggr_cls=cls, base_aggr=base_aggr)
def as_data_object(self, agg_path: str) -> 'pandas.DataFrame':
"""
Loads the CSV aggregation to a pandas DataFrame object
:param agg_path: the path to the Time Series aggregation
:return: the CSV aggregation as a pandas DataFrame object
"""
if pandas is None:
raise Exception("pandas package not found")
return self._get_data_object(agg_path=agg_path, func=pandas.read_csv, comment="#", dtype="string",
engine="python")
def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
destination_path: str = "") -> 'Aggregation':
"""
Saves the pandas DataFrame object to the CSV aggregation
:param resource: the resource containing the aggregation
:param agg_path: the path to the CSV aggregation
:param as_new_aggr: Defaults False, set to True to create a new CSV aggregation
:param destination_path: the destination path in Hydroshare to save the new aggregation
:return: the updated or new CSV aggregation
"""
self._validate_aggregation_for_update(resource, AggregationType.CSVFileAggregation)
file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
self._data_object.to_csv(file_path, index=False)
aggr_main_file_path = self.main_file_path
data_object = self._data_object
if not as_new_aggr:
# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata
title = self.metadata.title
# upload the updated aggregation files to the temp folder - to create the updated aggregation
self._update_aggregation(resource, file_path)
# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)
# update metadata
for kw in keywords:
if kw not in aggr.metadata.subjects:
aggr.metadata.subjects.append(kw)
aggr.metadata.additional_metadata = additional_meta
aggr.metadata.title = title
aggr.save()
else:
# creating a new aggregation by uploading the updated data files
resource.file_upload(file_path, destination_path=destination_path)
# retrieve the new aggregation
agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
aggr = resource.aggregation(file__path=agg_path)
data_object = None
aggr._data_object = data_object
return aggr
as_data_object(agg_path)
Loads the CSV aggregation to a pandas DataFrame object :param agg_path: the path to the Time Series aggregation :return: the CSV aggregation as a pandas DataFrame object
Source code in hsclient\hydroshare.py
def as_data_object(self, agg_path: str) -> 'pandas.DataFrame':
"""
Loads the CSV aggregation to a pandas DataFrame object
:param agg_path: the path to the Time Series aggregation
:return: the CSV aggregation as a pandas DataFrame object
"""
if pandas is None:
raise Exception("pandas package not found")
return self._get_data_object(agg_path=agg_path, func=pandas.read_csv, comment="#", dtype="string",
engine="python")
save_data_object(resource, agg_path, as_new_aggr=False, destination_path='')
Saves the pandas DataFrame object to the CSV aggregation :param resource: the resource containing the aggregation :param agg_path: the path to the CSV aggregation :param as_new_aggr: Defaults False, set to True to create a new CSV aggregation :param destination_path: the destination path in Hydroshare to save the new aggregation :return: the updated or new CSV aggregation
Source code in hsclient\hydroshare.py
def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
destination_path: str = "") -> 'Aggregation':
"""
Saves the pandas DataFrame object to the CSV aggregation
:param resource: the resource containing the aggregation
:param agg_path: the path to the CSV aggregation
:param as_new_aggr: Defaults False, set to True to create a new CSV aggregation
:param destination_path: the destination path in Hydroshare to save the new aggregation
:return: the updated or new CSV aggregation
"""
self._validate_aggregation_for_update(resource, AggregationType.CSVFileAggregation)
file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
self._data_object.to_csv(file_path, index=False)
aggr_main_file_path = self.main_file_path
data_object = self._data_object
if not as_new_aggr:
# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata
title = self.metadata.title
# upload the updated aggregation files to the temp folder - to create the updated aggregation
self._update_aggregation(resource, file_path)
# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)
# update metadata
for kw in keywords:
if kw not in aggr.metadata.subjects:
aggr.metadata.subjects.append(kw)
aggr.metadata.additional_metadata = additional_meta
aggr.metadata.title = title
aggr.save()
else:
# creating a new aggregation by uploading the updated data files
resource.file_upload(file_path, destination_path=destination_path)
# retrieve the new aggregation
agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
aggr = resource.aggregation(file__path=agg_path)
data_object = None
aggr._data_object = data_object
return aggr