in slo-monitor/src/monitors/pod_monitor.go [128:227]
// Run starts a pod watcher and a pod-event watcher in background goroutines
// and then blocks in wait.Until, invoking pm.purgeOutdated with a 15 second
// period until stopCh is closed. It always returns nil.
func (pm *PodStartupLatencyDataMonitor) Run(stopCh chan struct{}) error {
	// Both list functions below share the same trick: slo-monitor does not use
	// the objects returned by a list (it uses the NoStoreQueue implementation
	// of Store, which discards them); only the resourceVersion is needed to
	// start the watch from that point. Asking for a single item and dropping
	// it reduces memory usage on startup and on every relist.
	listPods := func(_ metav1.ListOptions) (runtime.Object, error) {
		podList, err := pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{Limit: 1})
		if err != nil {
			return nil, err
		}
		// Strip the payload and the continuation token so the caller sees a
		// complete, empty list carrying just the resourceVersion.
		podList.Continue = ""
		podList.Items = nil
		return podList, nil
	}
	watchPods := func(opts metav1.ListOptions) (watch.Interface, error) {
		return pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).Watch(opts)
	}

	// onPodUpdate forwards pod updates; objects of any other type are logged
	// and ignored.
	onPodUpdate := func(obj interface{}) error {
		pod, isPod := obj.(*v1.Pod)
		if !isPod {
			glog.Errorf("Failed to cast observed object to *v1.Pod: %v", obj)
			return nil
		}
		pm.handlePodUpdate(pod)
		return nil
	}

	// onPodDelete forwards pod deletions. The deleted object may arrive either
	// directly or wrapped in a DeletedFinalStateUnknown tombstone.
	onPodDelete := func(obj interface{}) error {
		var pod *v1.Pod
		switch typed := obj.(type) {
		case *v1.Pod:
			pod = typed
		case cache.DeletedFinalStateUnknown:
			embedded, isPod := typed.Obj.(*v1.Pod)
			if !isPod {
				glog.Errorf("Failed to cast embedded object from tombstone to *v1.Pod: %v", typed.Obj)
				return nil
			}
			pod = embedded
		default:
			glog.Errorf("Failed to cast observed object to *v1.Pod: %v", obj)
			return nil
		}
		pm.handlePodDelete(pod)
		return nil
	}

	podController := NewWatcherWithHandler(
		&cache.ListWatch{ListFunc: listPods, WatchFunc: watchPods},
		&v1.Pod{},
		onPodUpdate,
		onPodDelete,
	)
	go podController.Run(stopCh)

	// Only events about pods are watched. Note that the "source" field
	// selector key actually matches source.Component.
	podEventSelector := fields.Set{
		"involvedObject.kind": "Pod",
		"source":              "kubelet",
	}.AsSelector().String()

	listEvents := func(_ metav1.ListOptions) (runtime.Object, error) {
		eventList, err := pm.kubeClient.CoreV1().Events(v1.NamespaceAll).List(metav1.ListOptions{Limit: 1})
		if err != nil {
			return nil, err
		}
		// Same reasoning as for pods: keep the resourceVersion, drop the rest.
		eventList.Continue = ""
		eventList.Items = nil
		return eventList, nil
	}
	watchEvents := func(opts metav1.ListOptions) (watch.Interface, error) {
		opts.FieldSelector = podEventSelector
		return pm.kubeClient.CoreV1().Events(v1.NamespaceAll).Watch(opts)
	}

	// onEventUpdate forwards events; objects of any other type are logged and
	// ignored.
	onEventUpdate := func(obj interface{}) error {
		event, isEvent := obj.(*v1.Event)
		if !isEvent {
			glog.Errorf("Failed to cast observed object to *v1.Event: %v", obj)
			return nil
		}
		pm.handleEventUpdate(event)
		return nil
	}

	// Event deletions are deliberately ignored.
	onEventDelete := func(interface{}) error {
		return nil
	}

	eventController := NewWatcherWithHandler(
		&cache.ListWatch{ListFunc: listEvents, WatchFunc: watchEvents},
		&v1.Event{},
		onEventUpdate,
		onEventDelete,
	)
	go eventController.Run(stopCh)

	// Blocks here until stopCh is closed.
	wait.Until(pm.purgeOutdated, 15*time.Second, stopCh)
	return nil
}