in metrics/heron/tmaster/client.py [0:0]
def instance_timelines_to_dataframe(
instance_timelines: dict, stream: Optional[str], measurement_name: str,
conversion_func: Callable[[str], Union[str, int, float]] = None,
source_component: str = None) -> pd.DataFrame:
""" Converts the timeline dictionaries of a *single metric* into a single
combined DataFrame for all instances. All timestamps are converted to UTC
Python datetime objects and the returned DataFrame (for each instance) is
sorted by ascending date.
Arguments:
instance_timelines (dict): A dictionary of instance metric timelines,
where each key is an instance name linking
to a dictionary of <timestamp> :
<measurement> pairs.
stream (str): The stream name that these metrics are related to.
measurement_name (str): The name of the measurements being processed.
This will be used as the measurement column
heading.
conversion_func (function): An optional function for converting the
measurement in the timeline. If not
supplied the measurement will be left as a
string.
Returns:
pandas.DataFrame: A DataFrame containing the timelines of all instances
in the supplied dictionary.
"""
output: List[ROW_DICT] = []
instance_name: str
timeline: Dict[str, str]
for instance_name, timeline in instance_timelines.items():
details = tracker.parse_instance_name(instance_name)
instance_list: List[ROW_DICT] = []
timestamp_str: str
measurement_str: str
for timestamp_str, measurement_str in timeline.items():
timestamp: dt.datetime = \
dt.datetime.utcfromtimestamp(int(timestamp_str))
if "nan" in measurement_str:
measurement: Union[str, int, float, None] = None
else:
if conversion_func:
measurement = conversion_func(measurement_str)
else:
measurement = measurement_str
row: ROW_DICT = {
"timestamp": timestamp,
"container": details["container"],
"task": details["task_id"],
"component": details["component"],
measurement_name: measurement}
if stream:
row["stream"] = stream
if source_component:
row["source_component"] = source_component
instance_list.append(row)
# Because the original dict returned by the tracker is
# unsorted we need to sort the rows by ascending time
instance_list.sort(
key=lambda instance: instance["timestamp"])
output.extend(instance_list)
return pd.DataFrame(output)