def instance_timelines_to_dataframe()

in metrics/heron/tmaster/client.py [0:0]


def instance_timelines_to_dataframe(
        instance_timelines: dict, stream: Optional[str], measurement_name: str,
        conversion_func: Optional[
            Callable[[str], Union[str, int, float]]] = None,
        source_component: Optional[str] = None) -> pd.DataFrame:
    """ Converts the timeline dictionaries of a *single metric* into a single
    combined DataFrame for all instances. All timestamps are converted to
    naive Python datetime objects expressed in UTC and the rows of each
    instance are sorted by ascending date. Instances appear in the iteration
    order of the supplied dictionary.

    Each row holds the columns "timestamp", "container", "task", "component"
    and a column named after *measurement_name*. The "stream" and
    "source_component" columns are only added when the corresponding argument
    is truthy.

    Arguments:
        instance_timelines (dict):  A dictionary of instance metric timelines,
                                    where each key is an instance name linking
                                    to a dictionary of <timestamp> :
                                    <measurement> pairs. Timestamps must be
                                    parsable by int() as seconds since the
                                    epoch.
        stream (str):   The stream name that these metrics are related to. If
                        None or empty no "stream" column is added.
        measurement_name (str): The name of the measurements being processed.
                                This will be used as the measurement column
                                heading.
        conversion_func (function): An optional function for converting the
                                    measurement in the timeline. If not
                                    supplied the measurement will be left as a
                                    string. It is never called for
                                    measurements containing "nan"; those are
                                    stored as None.
        source_component (str): An optional component name. If supplied (and
                                not empty) it is stored on every row in a
                                "source_component" column.

    Returns:
        pandas.DataFrame: A DataFrame containing the timelines of all instances
        in the supplied dictionary. If no rows are produced the DataFrame is
        empty.
    """

    # The optional columns carry the same value on every row, so work out
    # once which of them are present instead of re-testing for each row.
    optional_columns: Dict[str, str] = {}
    if stream:
        optional_columns["stream"] = stream
    if source_component:
        optional_columns["source_component"] = source_component

    output: List[ROW_DICT] = []

    instance_name: str
    timeline: Dict[str, str]
    for instance_name, timeline in instance_timelines.items():

        # The parsed details are indexed below by "container", "task_id"
        # and "component".
        details = tracker.parse_instance_name(instance_name)
        instance_list: List[ROW_DICT] = []

        timestamp_str: str
        measurement_str: str
        for timestamp_str, measurement_str in timeline.items():

            # datetime.utcfromtimestamp is deprecated (Python 3.12+). Build
            # an aware UTC datetime and strip the tzinfo so the result is
            # the same naive UTC value callers have always received.
            timestamp: dt.datetime = dt.datetime.fromtimestamp(
                int(timestamp_str), tz=dt.timezone.utc).replace(tzinfo=None)

            measurement: Union[str, int, float, None]
            if "nan" in measurement_str:
                # Missing readings are recorded as None rather than being
                # handed to the conversion function.
                measurement = None
            elif conversion_func is not None:
                measurement = conversion_func(measurement_str)
            else:
                measurement = measurement_str

            row: ROW_DICT = {
                "timestamp": timestamp,
                "container": details["container"],
                "task": details["task_id"],
                "component": details["component"],
                measurement_name: measurement}

            # Added after the fixed columns so the column order of the
            # resulting DataFrame is unchanged.
            row.update(optional_columns)

            instance_list.append(row)

        # Because the original dict returned by the tracker is
        # unsorted we need to sort the rows by ascending time
        instance_list.sort(key=lambda entry: entry["timestamp"])

        output.extend(instance_list)

    return pd.DataFrame(output)