func()

in slo-monitor/src/monitors/pod_monitor.go [128:227]


// Run starts two watchers in background goroutines, one for pods in all
// namespaces and one for kubelet-sourced events about pods, and then calls
// pm.purgeOutdated every 15 seconds via wait.Until, driven by stopCh.
// The same stopCh is handed to both watchers. Run always returns nil.
func (pm *PodStartupLatencyDataMonitor) Run(stopCh chan struct{}) error {
	// The pods returned by the list are not used by slo-monitor (it uses the
	// NoStoreQueue implementation of Store, which discards them); only the
	// resourceVersion is needed to start the watch from that point. Asking for
	// a single item and dropping it keeps memory usage low on startup and on
	// every relist.
	podListWatch := &cache.ListWatch{
		ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
			pods, err := pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{Limit: 1})
			if err != nil {
				return nil, err
			}
			// Strip the payload and the continuation token so the caller
			// sees a complete, empty list.
			pods.Items = nil
			pods.Continue = ""
			return pods, nil
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).Watch(opts)
		},
	}

	// onPodUpdate forwards an observed pod to handlePodUpdate; objects of any
	// other type are logged and dropped.
	onPodUpdate := func(obj interface{}) error {
		if pod, isPod := obj.(*v1.Pod); isPod {
			pm.handlePodUpdate(pod)
			return nil
		}
		glog.Errorf("Failed to cast observed object to *v1.Pod: %v", obj)
		return nil
	}

	// onPodDelete forwards a deleted pod to handlePodDelete. The pod may
	// arrive directly or wrapped in a cache.DeletedFinalStateUnknown tombstone.
	onPodDelete := func(obj interface{}) error {
		var pod *v1.Pod
		switch typed := obj.(type) {
		case *v1.Pod:
			pod = typed
		case cache.DeletedFinalStateUnknown:
			inner, isPod := typed.Obj.(*v1.Pod)
			if !isPod {
				glog.Errorf("Failed to cast embedded object from tombstone to *v1.Pod: %v", typed.Obj)
				return nil
			}
			pod = inner
		default:
			glog.Errorf("Failed to cast observed object to *v1.Pod: %v", obj)
			return nil
		}
		pm.handlePodDelete(pod)
		return nil
	}

	podController := NewWatcherWithHandler(podListWatch, &v1.Pod{}, onPodUpdate, onPodDelete)
	go podController.Run(stopCh)

	selectorFields := fields.Set{
		"involvedObject.kind": "Pod",
		// this is actually source.Component
		"source": "kubelet",
	}
	eventSelector := selectorFields.AsSelector().String()

	// Same trick as for pods: the events returned by the list are discarded
	// and only the resourceVersion is used to start the watch.
	eventListWatch := &cache.ListWatch{
		ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
			events, err := pm.kubeClient.CoreV1().Events(v1.NamespaceAll).List(metav1.ListOptions{Limit: 1})
			if err != nil {
				return nil, err
			}
			events.Items = nil
			events.Continue = ""
			return events, nil
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			// Restrict the watch to the events selected above.
			opts.FieldSelector = eventSelector
			return pm.kubeClient.CoreV1().Events(v1.NamespaceAll).Watch(opts)
		},
	}

	// onEventUpdate forwards an observed event to handleEventUpdate; objects
	// of any other type are logged and dropped.
	onEventUpdate := func(obj interface{}) error {
		if event, isEvent := obj.(*v1.Event); isEvent {
			pm.handleEventUpdate(event)
			return nil
		}
		glog.Errorf("Failed to cast observed object to *v1.Event: %v", obj)
		return nil
	}

	// Event deletions are ignored.
	onEventDelete := func(interface{}) error { return nil }

	eventController := NewWatcherWithHandler(eventListWatch, &v1.Event{}, onEventUpdate, onEventDelete)
	go eventController.Run(stopCh)

	const purgeInterval = 15 * time.Second
	wait.Until(pm.purgeOutdated, purgeInterval, stopCh)
	return nil
}