def get_spout_complete_latencies()

in metrics/heron/tmaster/client.py [0:0]


    def get_spout_complete_latencies(self, topology_id: str, cluster: str,
                                     environ: str, component_name: str,
                                     start: int, end: int,
                                     logical_plan: Dict[str, Any] = None
                                     ) -> pd.DataFrame:
        """ Gets the complete latency, as a timeseries, for every instance of
        the specified component of the specified topology. The start and end
        times define the window over which to gather the metrics. The window
        duration should be less than 3 hours as this is the limit of what the
        Topology master stores.

        Arguments:
            topology_id (str):    The topology identification string.
            cluster (str):  The cluster the topology is running in.
            environ (str):  The environment the topology is running in (eg.
                            prod, devel, test, etc).
            component_name (str):   The name of the spout component whose
                                    metrics are required.
            start (int):    Start time for the time period the query is run
                            against. This should be a UTC POSIX time integer
                            (seconds since epoch).
            end (int):  End time for the time period the query is run against.
                        This should be a UTC POSIX time integer (seconds since
                        epoch).
            logical_plan (dict):    Optional dictionary logical plan returned
                                    by the Heron Tracker API. If not supplied
                                    this method will call the API to get the
                                    logical plan.

        Returns:
            pandas.DataFrame:   A DataFrame containing the complete latency
            measurements as a timeseries, or None if the Tracker API returned
            no timelines for the requested window. Each row represents a
            measurement (averaged over one minute) with the following columns:

            * timestamp:  The UTC timestamp for the metric,
            * component: The component this metric comes from,
            * task: The instance ID number for the instance that the metric
              comes from,
            * container:  The ID for the container this metric comes from,
            * stream: The name of the outgoing stream from which the tuples
              that lead to this metric came from,
            * latency_ms: The average complete latency measurement in
              milliseconds for that metric time period.
        """

        LOG.info("Getting complete latency metrics for component %s of "
                 "topology %s", component_name, topology_id)

        if not logical_plan:
            LOG.debug("Logical plan not supplied, fetching from Heron Tracker")
            logical_plan = tracker.get_logical_plan(self.tracker_url, cluster,
                                                    environ, topology_id)

        outgoing_streams: List[str] = \
            tracker.get_outgoing_streams(logical_plan, component_name)

        # Complete latency is reported per outgoing stream of the spout.
        metrics: List[str] = ["__complete-latency/" + stream
                              for stream in outgoing_streams]

        results: Dict[str, Any] = tracker.get_metrics_timeline(
            self.tracker_url, cluster, environ, topology_id, component_name,
            start, end, metrics)

        # Build one DataFrame per stream and concatenate once at the end.
        # DataFrame.append was deprecated in pandas 1.4 and removed in 2.0,
        # and appending inside the loop was quadratic in the number of rows.
        frames: List[pd.DataFrame] = []

        for stream_metric, instance_timelines in results["timeline"].items():
            # Metric keys are of the form "__complete-latency/<stream name>".
            outgoing_stream: str = stream_metric.split("/")[1]

            frames.append(instance_timelines_to_dataframe(
                instance_timelines, outgoing_stream, "latency_ms",
                str_nano_to_float_milli))

        if not frames:
            # Preserve the original behaviour of returning None when the
            # Tracker response contained no timelines.
            return None

        return pd.concat(frames, ignore_index=True)