Bases: DataObjectSupportingAggregation
Represents a Time Series Aggregation in HydroShare
Source code in hsclient\hydroshare.py
class TimeseriesAggregation(DataObjectSupportingAggregation):
    """Represents a Time Series Aggregation in HydroShare"""

    @classmethod
    def create(cls, base_aggr):
        return super().create(aggr_cls=cls, base_aggr=base_aggr)

    def as_data_object(self, agg_path: str, series_id: str = "") -> 'pandas.DataFrame':
        """
        Loads the Time Series aggregation to a pandas DataFrame object

        :param agg_path: the path to the Time Series aggregation
        :param series_id: the series id of the time series to retrieve
        :return: the Time Series aggregation as a pandas DataFrame object
        """
        if pandas is None:
            raise Exception("pandas package not found")

        def to_series(timeseries_file: str):
            # close the sqlite connection when done, and bind series_id as a
            # query parameter instead of interpolating it into the SQL
            with closing(sqlite3.connect(timeseries_file)) as con:
                return pandas.read_sql(
                    'SELECT * FROM TimeSeriesResultValues WHERE ResultID IN '
                    '(SELECT ResultID FROM Results WHERE ResultUUID = ?);',
                    con,
                    params=(series_id,),
                ).squeeze()

        return self._get_data_object(agg_path=agg_path, func=to_series)

    def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
                         destination_path: str = "") -> 'Aggregation':
        """
        Saves the pandas DataFrame object to the Time Series aggregation

        :param resource: the resource containing the aggregation
        :param agg_path: the path to the Time Series aggregation
        :param as_new_aggr: Defaults to False; set to True to create a new Time Series aggregation
        :param destination_path: the destination path in HydroShare to save the new aggregation
        :return: the updated or new Time Series aggregation
        """
        self._validate_aggregation_for_update(resource, AggregationType.TimeSeriesAggregation)
        file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
        with closing(sqlite3.connect(file_path)) as conn:
            # write the dataframe to a temp table
            self._data_object.to_sql('temp', conn, if_exists='replace', index=False)
            # delete the matching records from the TimeSeriesResultValues table
            conn.execute("DELETE FROM TimeSeriesResultValues WHERE ResultID IN (SELECT ResultID FROM temp)")
            conn.execute("INSERT INTO TimeSeriesResultValues SELECT * FROM temp")
            # drop the temp table
            conn.execute("DROP TABLE temp")
            conn.commit()

        aggr_main_file_path = self.main_file_path
        data_object = self._data_object
        if not as_new_aggr:
            # cache some metadata fields of the original aggregation so they can be
            # restored on the updated aggregation
            keywords = self.metadata.subjects
            additional_meta = self.metadata.additional_metadata
            title = self.metadata.title
            abstract = self.metadata.abstract

            # upload the updated aggregation file to create the updated aggregation
            self._update_aggregation(resource, file_path)
            # retrieve the updated aggregation
            aggr = resource.aggregation(file__path=aggr_main_file_path)

            # restore metadata on the updated aggregation
            for kw in keywords:
                if kw not in aggr.metadata.subjects:
                    aggr.metadata.subjects.append(kw)
            aggr.metadata.additional_metadata = additional_meta
            aggr.metadata.title = title
            aggr.metadata.abstract = abstract
            aggr.save()
        else:
            # create a new aggregation by uploading the updated data file
            resource.file_upload(file_path, destination_path=destination_path)
            # retrieve the new aggregation
            agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
            aggr = resource.aggregation(file__path=agg_path)
            data_object = None

        aggr._data_object = data_object
        return aggr
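For orientation, a minimal sketch of how such an aggregation is typically retrieved before using the methods below. The credentials, resource id, and file path are hypothetical placeholders:

```python
from hsclient import HydroShare

hs = HydroShare(username="my_user", password="my_pass")  # hypothetical credentials
res = hs.resource("84805fd615a04d63b4eada65644a1e20")    # hypothetical resource id

# a Time Series aggregation is identified by the path of its main sqlite file
ts_aggr = res.aggregation(file__path="ODM2.sqlite")      # hypothetical file path
```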
as_data_object(agg_path, series_id='')
Loads the Time Series aggregation to a pandas DataFrame object

:param agg_path: the path to the Time Series aggregation
:param series_id: the series id of the time series to retrieve
:return: the Time Series aggregation as a pandas DataFrame object
Source code in hsclient\hydroshare.py
def as_data_object(self, agg_path: str, series_id: str = "") -> 'pandas.DataFrame':
    """
    Loads the Time Series aggregation to a pandas DataFrame object

    :param agg_path: the path to the Time Series aggregation
    :param series_id: the series id of the time series to retrieve
    :return: the Time Series aggregation as a pandas DataFrame object
    """
    if pandas is None:
        raise Exception("pandas package not found")

    def to_series(timeseries_file: str):
        # close the sqlite connection when done, and bind series_id as a
        # query parameter instead of interpolating it into the SQL
        with closing(sqlite3.connect(timeseries_file)) as con:
            return pandas.read_sql(
                'SELECT * FROM TimeSeriesResultValues WHERE ResultID IN '
                '(SELECT ResultID FROM Results WHERE ResultUUID = ?);',
                con,
                params=(series_id,),
            ).squeeze()

    return self._get_data_object(agg_path=agg_path, func=to_series)
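A hedged usage sketch, continuing the example above. It assumes the resource's `aggregation_download` method for fetching the aggregation files locally, and that a series id can be read from the aggregation's time series results metadata; the working folder is hypothetical:

```python
import os

# download and unzip the aggregation so the sqlite file is available locally
download_dir = "temp_download"  # hypothetical working folder
local_path = res.aggregation_download(aggregation=ts_aggr, save_path=download_dir,
                                      unzip_to=os.path.join(download_dir, "unzipped"))

# pick a series id from the aggregation metadata, then load that series
series_id = ts_aggr.metadata.time_series_results[0].series_id
df = ts_aggr.as_data_object(agg_path=local_path, series_id=series_id)
print(df.head())
```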
save_data_object(resource, agg_path, as_new_aggr=False, destination_path='')
Saves the pandas DataFrame object to the Time Series aggregation

:param resource: the resource containing the aggregation
:param agg_path: the path to the Time Series aggregation
:param as_new_aggr: Defaults to False; set to True to create a new Time Series aggregation
:param destination_path: the destination path in HydroShare to save the new aggregation
:return: the updated or new Time Series aggregation
Source code in hsclient\hydroshare.py
def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
                     destination_path: str = "") -> 'Aggregation':
    """
    Saves the pandas DataFrame object to the Time Series aggregation

    :param resource: the resource containing the aggregation
    :param agg_path: the path to the Time Series aggregation
    :param as_new_aggr: Defaults to False; set to True to create a new Time Series aggregation
    :param destination_path: the destination path in HydroShare to save the new aggregation
    :return: the updated or new Time Series aggregation
    """
    self._validate_aggregation_for_update(resource, AggregationType.TimeSeriesAggregation)
    file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
    with closing(sqlite3.connect(file_path)) as conn:
        # write the dataframe to a temp table
        self._data_object.to_sql('temp', conn, if_exists='replace', index=False)
        # delete the matching records from the TimeSeriesResultValues table
        conn.execute("DELETE FROM TimeSeriesResultValues WHERE ResultID IN (SELECT ResultID FROM temp)")
        conn.execute("INSERT INTO TimeSeriesResultValues SELECT * FROM temp")
        # drop the temp table
        conn.execute("DROP TABLE temp")
        conn.commit()

    aggr_main_file_path = self.main_file_path
    data_object = self._data_object
    if not as_new_aggr:
        # cache some metadata fields of the original aggregation so they can be
        # restored on the updated aggregation
        keywords = self.metadata.subjects
        additional_meta = self.metadata.additional_metadata
        title = self.metadata.title
        abstract = self.metadata.abstract

        # upload the updated aggregation file to create the updated aggregation
        self._update_aggregation(resource, file_path)
        # retrieve the updated aggregation
        aggr = resource.aggregation(file__path=aggr_main_file_path)

        # restore metadata on the updated aggregation
        for kw in keywords:
            if kw not in aggr.metadata.subjects:
                aggr.metadata.subjects.append(kw)
        aggr.metadata.additional_metadata = additional_meta
        aggr.metadata.title = title
        aggr.metadata.abstract = abstract
        aggr.save()
    else:
        # create a new aggregation by uploading the updated data file
        resource.file_upload(file_path, destination_path=destination_path)
        # retrieve the new aggregation
        agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
        aggr = resource.aggregation(file__path=agg_path)
        data_object = None

    aggr._data_object = data_object
    return aggr
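And a sketch of writing edited values back, continuing from the loading example. The column name and cleanup step are hypothetical; `DataValue` is the value column of the ODM2 `TimeSeriesResultValues` table that `as_data_object` reads from:

```python
# tweak the loaded values in the DataFrame returned by as_data_object
df.loc[df["DataValue"] < 0, "DataValue"] = 0.0  # hypothetical cleanup step

# update the existing aggregation in place
ts_aggr = ts_aggr.save_data_object(resource=res, agg_path=local_path)

# ...or, alternatively, write the edited data out as a brand new aggregation:
# ts_aggr = ts_aggr.save_data_object(resource=res, agg_path=local_path,
#                                    as_new_aggr=True, destination_path="new_folder")
```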