For my bachelor thesis, I am working with Kubernetes. My partner and I are working on a system to run workflows on Kubernetes. We really like the Kubernetes ideology of seeing the entire system as a control system. That is, the system constantly tries to move its current state to a desired state. The worker units that guarantee the desired state are called controllers. Since we want to implement our own workflow controller we decided to look at the JobController. That’s where we stumbled upon this piece of candy:

We weren’t sure exactly how it worked, but it seemed to return a store of Jobs given a list and watch function. In this case the list and watch functions on lines 5 and 9 are direct calls to the API server, using the job client. Awesome, right? You feed it a list and watch interface to the API server. The Informer automagically syncs the upstream data to a downstream store and even offers you some handy event hooks.

At some point we noticed some behavior we didn’t expect. For example: UpdateFunc was called every 30 seconds, while there actually were no updates on the upstream. This is when I decided to dig into the workings of Informers, Controllers, Reflectors, and Stores. I’ll start by explaining how Controllers work, then I’ll explain how Controllers use a Reflector and a DeltaFIFO store internally, and lastly how Informers are just a convenient wrapper to sync your upstream with a downstream store.

Please note that Kubernetes is under heavy development. The code base will probably have changed between the point of writing and the time you’re reading this article. I will refer to code at this blob.

## Controllers

When we want to know how informers work, we need to know how controllers work. An example of how controllers work is given in controller_test.go:

The example above will produce the following output:

Let’s see how this works! Line 3 declares a source. This source will usually be a client to the API server. For now it’s a fake source, so that we can control its behavior. Line 6 declares the downstream store, which we’ll use to have a local representation of the source. Line 12 declares a DeltaFIFO queue, which is used to keep track of the differences between source and downstream. To configure the controller we give it the FIFO queue, the source, and a process function.

The process loop contains the logic to bring the current state of the system to the desired state. The process function receives an obj, which is an array of Deltas from the FIFO queue. In our example we check if the delta is of any type other than Deleted. In that case the Object belonging to the delta is added to the downstream and the Object is removed from the source. If the delta is of type Deleted, the Object is removed from the downstream and a message is sent on the deletionCounter channel.
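To make the branching in the process function concrete, here is a minimal sketch of it. This is not the code from controller_test.go: DeltaType, Delta, Deltas and process are simplified stand-ins I made up for illustration, the object is reduced to a plain name, and the step that removes the object from the source is left out.

```go
package main

import "fmt"

// DeltaType, Delta and Deltas are simplified stand-ins for the types in the
// cache package; the object is reduced to a plain name for illustration.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type DeltaType
	Name string
}

type Deltas []Delta

// Newest returns the most recent delta, or nil when the slice is empty.
func (d Deltas) Newest() *Delta {
	if len(d) == 0 {
		return nil
	}
	return &d[len(d)-1]
}

// process follows the logic described above: any type other than Deleted
// stores the object downstream, Deleted removes it again.
func process(downstream map[string]bool, deltas Deltas) string {
	newest := deltas.Newest()
	if newest == nil {
		return "ignored"
	}
	if newest.Type != Deleted {
		downstream[newest.Name] = true
		return "stored"
	}
	delete(downstream, newest.Name)
	return "removed"
}

func main() {
	downstream := map[string]bool{}
	fmt.Println(process(downstream, Deltas{{Type: Sync, Name: "a-pod"}}))
	fmt.Println(process(downstream, Deltas{{Type: Deleted, Name: "a-pod"}}))
	fmt.Println("objects left downstream:", len(downstream))
}
```

Note that a Sync delta takes the same branch as Added and Updated, which is why checking for "not Deleted" is more robust than checking for Added.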

When I ran the example for the first time, I was a bit confused by if newest.Type != cache.Deleted on line 31. Why not simply check if type is Added? I decided to put the following print statement on line 30: fmt.Printf("[%v] %v\n", newest.Object.(*api.Pod).Name, newest.Type). This is the output I got:

So we are actually getting Sync events when something is added. Why? Let’s find out!

### Controller Run

On line 64 from the example the controller is created and run. Let’s have a look at the Run method:

First a new reflector is created. We give it both the ListerWatcher (source) and Queue (DeltaFIFO). Then, RunUntil is called on the reflector, which is a non-blocking call. Lastly the processLoop is called using wait.Until. The processLoop drains the FIFO queue and calls the Process function with the popped Deltas from the queue:
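The shape of Run can be sketched with nothing but a goroutine and a channel. This is a rough model under my own assumptions, not the Kubernetes implementation: the run function is hypothetical, a plain channel stands in for the DeltaFIFO, and the producer goroutine stands in for the reflector.

```go
package main

import "fmt"

// run starts a producer in the background (the reflector's role) and then
// drains the queue on the calling goroutine (the process loop's role).
func run(source []string, process func(string)) {
	queue := make(chan string)

	// Non-blocking start of the producer, comparable to starting the reflector.
	go func() {
		defer close(queue)
		for _, item := range source {
			queue <- item
		}
	}()

	// Blocking consumer: pop items and hand them to the process function.
	for item := range queue {
		process(item)
	}
}

func main() {
	run([]string{"a-pod", "b-pod", "c-pod"}, func(name string) {
		fmt.Println("processing", name)
	})
}
```

Unlike this sketch, the real process loop does not end when the source runs dry; it keeps waiting for new deltas until it is stopped.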

We now know how the Process function from the example is called, but we still don’t know how stuff gets in the FIFO Queue. To understand how this works, we’ll have to dig into the Reflector.

## Reflectors

According to the comment in reflector.go, a “Reflector watches a specified resource and causes all changes to be reflected in the given store”. In our example the resource is the source from line 3 and the store is the DeltaFIFO from line 12.

When the reflector was created, its RunUntil method was called. Let’s look at what it does:

So it keeps calling ListAndWatch, until the stopCh receives a message. Now let’s dive into ListAndWatch where all the real magic happens:

The comment above the method says: “ListAndWatch first lists all items and get[s] the resource version at the moment of call, and then use[s] the resource version to watch”. So basically this is what happens:

At time t=0 we list the ListerWatcher – which is the upstream source in our case. The List response carries a resourceVersion for the list as a whole, which marks the state of the collection at the moment of the call. At t>0 we keep watching the ListerWatcher for changes newer than that resourceVersion. If you don’t know what resource versions are, check the API conventions.
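The list-then-watch idea can be illustrated with a toy source that keeps a versioned history. Everything here (event, fakeSource, List, WatchFrom) is hypothetical and much simpler than the real ListerWatcher, which streams watch events instead of returning a slice.

```go
package main

import (
	"fmt"
	"sort"
)

// event is one change in the toy upstream, tagged with a version number.
type event struct {
	Version int
	Type    string // "Added" or "Deleted"
	Name    string
}

// fakeSource stores the full history of changes, ordered by version.
type fakeSource struct {
	history []event
}

// List returns the names that currently exist and the version of that snapshot.
func (s *fakeSource) List() ([]string, int) {
	present := map[string]bool{}
	version := 0
	for _, e := range s.history {
		if e.Type == "Deleted" {
			delete(present, e.Name)
		} else {
			present[e.Name] = true
		}
		version = e.Version
	}
	names := make([]string, 0, len(present))
	for n := range present {
		names = append(names, n)
	}
	sort.Strings(names)
	return names, version
}

// WatchFrom returns only the events that are newer than the given version.
func (s *fakeSource) WatchFrom(version int) []event {
	var out []event
	for _, e := range s.history {
		if e.Version > version {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	src := &fakeSource{history: []event{{1, "Added", "a-pod"}, {2, "Added", "b-pod"}}}

	// t=0: take a snapshot and remember its version.
	names, rv := src.List()
	fmt.Println("listed", names, "at version", rv)

	// t>0: something changes upstream, and we only ask for what is newer.
	src.history = append(src.history, event{3, "Deleted", "a-pod"})
	fmt.Println("watched", src.WatchFrom(rv))
}
```

Because the watch starts from the snapshot's version, no change is missed between the list and the watch, and nothing from before the snapshot is replayed.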

So how does that translate to the code of ListAndWatch? At line 14 (t=0) List gets called. The items from the list are passed to the syncWith method. SyncWith calls store.Replace with the items of the list:

I’ll explain how store.Replace works later.

After the initial list we end up in a never-ending for loop (t>0). In this loop we call the watchHandler method with a watch.Interface. The body of watchHandler is the following piece of blocking code:

The interesting code is on lines 14 - 23. Watch events from the upstream are caught and corresponding store methods are called. Another piece of interesting code is on lines 11 - 12. When a message is received on resyncCh, the watchHandler returns. So how do messages end up in resyncCh?

When we look back at the code of ListAndWatch, we find the answer on lines 6 - 7. Line 6 calls the resyncChan method, which returns a timer channel that receives a message once r.resyncPeriod has elapsed. This channel is passed to watchHandler, so that it returns after r.resyncPeriod. When watchHandler returns, the ListAndWatch method can finally reach the call to the canForceResyncNow method. CanForceResyncNow returns true if we’re close enough to a next planned periodic resync. In that case store.Resync is called. So in the end the situation looks like this:
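The interplay between the watch events and the resync timer boils down to a select over a few channels. The sketch below is my own simplified version, not the code from reflector.go: watch events are plain strings, and the function returns a string that says why it stopped.

```go
package main

import (
	"fmt"
	"time"
)

// watchHandler blocks until it is stopped, the resync timer fires, or the
// watch is closed. Every event received in the meantime goes to handle.
func watchHandler(events <-chan string, resyncCh <-chan time.Time, stopCh <-chan struct{}, handle func(string)) string {
	for {
		select {
		case <-stopCh:
			return "stopped"
		case <-resyncCh:
			// The timer fired: return so the caller can trigger a resync.
			return "resync"
		case ev, ok := <-events:
			if !ok {
				return "watch closed"
			}
			handle(ev)
		}
	}
}

func main() {
	events := make(chan string) // no sender: the upstream is quiet
	stopCh := make(chan struct{})
	resyncCh := time.After(20 * time.Millisecond) // stand-in for the resync period

	reason := watchHandler(events, resyncCh, stopCh, func(ev string) {
		fmt.Println("event:", ev)
	})
	fmt.Println("watchHandler returned:", reason)
}
```

Even with a completely quiet upstream, the handler returns once per resync period, which is what gives the caller the chance to resync.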

So now we know when the Reflector calls the DeltaFIFO store. Let’s figure out how the store works!

## DeltaFIFO Store

According to the comments in delta_fifo.go, a “DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method”. In our case the processLoop method of our controller is the consumer.

From its definition we get that the DeltaFIFO holds a queue, which is an array of strings, and an items map, whose keys correspond to the strings in the queue:

items maps a key to an array of Deltas. A Delta tells you what change happened (Added, Updated, Deleted, Sync) and the object’s state after that change.

Let’s have a look at the Add, Update, Delete, Replace, and Resync methods that were called from the Reflector.

The Add, Update and Delete methods all call the queueActionLocked method with the corresponding DeltaTypes. In queueActionLocked the given obj gets inserted in the queue.
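A stripped-down model of this data structure might look as follows. The miniFIFO type and its methods are hypothetical: there is no locking, objects are identified by their key only, and Pop returns false on an empty queue instead of blocking like the real one.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type DeltaType
	Key  string
}

// miniFIFO holds the order of keys in queue and the pending deltas per key
// in items, as described above.
type miniFIFO struct {
	items map[string][]Delta
	queue []string
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string][]Delta{}}
}

// queueAction appends a delta for key and enqueues the key if it is not
// already waiting in the queue.
func (f *miniFIFO) queueAction(t DeltaType, key string) {
	if _, pending := f.items[key]; !pending {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Key: key})
}

func (f *miniFIFO) Add(key string)    { f.queueAction(Added, key) }
func (f *miniFIFO) Update(key string) { f.queueAction(Updated, key) }
func (f *miniFIFO) Delete(key string) { f.queueAction(Deleted, key) }

// Pop removes the oldest key and returns all deltas collected for it.
func (f *miniFIFO) Pop() ([]Delta, bool) {
	if len(f.queue) == 0 {
		return nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return deltas, true
}

func main() {
	f := newMiniFIFO()
	f.Add("a-pod")
	f.Add("b-pod")
	f.Update("a-pod")
	for {
		deltas, ok := f.Pop()
		if !ok {
			break
		}
		fmt.Println(deltas)
	}
}
```

The first Pop returns both deltas for a-pod together, even though b-pod was added in between. That is why the process function receives an array of Deltas rather than a single one.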

### Replace

Replace gets a list of items. It enqueues each item with a Sync DeltaType. It then uses knownObjects – which is a reference to the downstream store in our case – to see if items were deleted. If so, Deleted events get enqueued. Note that this is the reason why we got Sync events in the controller example. The three pods were inserted before the reflector started at t=0 (maybe not always the case, because starting the reflector and inserting pods in the store are parallel tasks). So the three pods were found during the initial list, which called the store.Replace method. Since store.Replace only fires Sync events for new items, we didn’t find any Added events.

### Resync

Resync will send a Sync event for all items in knownObjects – which is our downstream store.
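The difference between Replace and Resync is easiest to see side by side. The two functions below are hypothetical simplifications: they return the deltas they would enqueue instead of touching a queue, and knownObjects is passed in as a plain slice of keys.

```go
package main

import "fmt"

type DeltaType string

const (
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type DeltaType
	Key  string
}

// replace produces a Sync delta for every listed key, plus a Deleted delta
// for every known key that no longer shows up in the list.
func replace(listed, known []string) []Delta {
	var out []Delta
	inList := map[string]bool{}
	for _, key := range listed {
		inList[key] = true
		out = append(out, Delta{Type: Sync, Key: key})
	}
	for _, key := range known {
		if !inList[key] {
			out = append(out, Delta{Type: Deleted, Key: key})
		}
	}
	return out
}

// resync produces a Sync delta for every key in the downstream store,
// without consulting the upstream at all.
func resync(known []string) []Delta {
	out := make([]Delta, 0, len(known))
	for _, key := range known {
		out = append(out, Delta{Type: Sync, Key: key})
	}
	return out
}

func main() {
	// Upstream lists a and b, downstream still knows b and c.
	fmt.Println(replace([]string{"a-pod", "b-pod"}, []string{"b-pod", "c-pod"}))
	fmt.Println(resync([]string{"b-pod", "c-pod"}))
}
```

Replace compares upstream with downstream, so it can detect deletions. Resync only replays what is already downstream, so unchanged objects produce Sync deltas as well.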

## Recap on Controller, Reflector and Store

Okay, I get that that was a lot of new information. Let’s try to clear up what we’ve just learned.

The controller:

• has a reference to the FIFO queue;
• has a reference to the ListerWatcher (the upstream source in our case);
• is responsible for consuming the FIFO queue;
• has a process loop, which is responsible for getting the system to a desired state;
• creates a Reflector.

The reflector:

• has a reference to the same FIFO queue (called store internally);
• has a reference to the same ListerWatcher;
• lists and watches the ListerWatcher;
• is responsible for producing the FIFO queue’s input;
• is responsible for calling the Resync method on the FIFO queue every resyncPeriod ns.

The FIFO queue:

• has a reference to the downstream store;
• has a queue of Deltas for objects that were listed and watched by the Reflector.

## Informer

We now know how the Controller, Reflector and FIFO Queue work together to stay in sync with the upstream source. So let’s have a look at how the Informer uses these concepts to sync the upstream source to the downstream store. According to the comments in controller.go, “NewInformer returns a cache.Store and a controller for populating the store while also providing event notifications”. This is how the NewInformer function looks:

So it’s basically a controller with some boilerplate code to sync events from the FIFO queue to the downstream store. It takes a ListerWatcher and a ResourceEventHandler, which looked like this in the JobController source code:

The Process function deals with all the Delta events. It calls the corresponding Add, Update, and Delete methods on the downstream store – which is called clientState in this code. Note that ResourceEventHandlerFuncs has no SyncFunc. Therefore AddFunc or UpdateFunc is called when a Sync event is received – even when an object isn’t updated but only resynced.
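This also explains the surprise from the introduction, where UpdateFunc fired every 30 seconds without any upstream change. Below is a minimal sketch of such a Process function, written under my own assumptions: handlerFuncs and processDeltas are stand-ins for ResourceEventHandlerFuncs and the informer's Process function, and objects are plain strings.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type  DeltaType
	Key   string
	Value string
}

// handlerFuncs is a stand-in for the event hooks; note there is no SyncFunc.
type handlerFuncs struct {
	AddFunc    func(obj string)
	UpdateFunc func(oldObj, newObj string)
	DeleteFunc func(obj string)
}

// processDeltas applies the deltas to the downstream store (clientState)
// from oldest to newest and fires the matching hook for each one.
func processDeltas(clientState map[string]string, h handlerFuncs, deltas []Delta) {
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			old, exists := clientState[d.Key]
			clientState[d.Key] = d.Value
			if exists {
				h.UpdateFunc(old, d.Value)
			} else {
				h.AddFunc(d.Value)
			}
		case Deleted:
			delete(clientState, d.Key)
			h.DeleteFunc(d.Value)
		}
	}
}

func main() {
	clientState := map[string]string{}
	h := handlerFuncs{
		AddFunc:    func(obj string) { fmt.Println("add", obj) },
		UpdateFunc: func(oldObj, newObj string) { fmt.Println("update", oldObj, "->", newObj) },
		DeleteFunc: func(obj string) { fmt.Println("delete", obj) },
	}
	// Initial list: the object is new downstream, so AddFunc fires.
	processDeltas(clientState, h, []Delta{{Type: Sync, Key: "a-pod", Value: "v1"}})
	// Periodic resync: nothing changed, yet UpdateFunc fires.
	processDeltas(clientState, h, []Delta{{Type: Sync, Key: "a-pod", Value: "v1"}})
}
```

If your UpdateFunc is expensive, it can be worth comparing the old and new object first and returning early when they are identical.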

I hope this article has given you a clear picture of the Controller, Informer, Reflector and Store concepts in Kubernetes that make your life so much nicer. Please feel free to comment or criticize :)

Also, many kudos to @nov1n who helped me with this research.
