# Kalman Filtering in Python for Reading Sensor Input

# True Story Follows

**EDIT**

This post sucks. Go read the post that doesn’t suck to actually learn about Kalman filters.

A few months back I had stopped work on a semi-autonomous blimp and had presented my work at PyCon 2K14. One of the unfinished problems I had yet to deal with was how to handle fluctuating sensor readings for altitude. Regardless of whether I read input from a temperature, pressure, or humidity sensor or from a GPS, the data fluctuated, and the blimp would respond by pitching up or down seemingly at random. If this happened while being subjected to wind forces, the blimp would get pushed back in an undesired direction as a result of the increased surface area exposure.

I documented the problem in an initial blog post about the blimp and again in a post after PyCon, but I had no viable, scientific solution to try yet. But then out of nowhere, a reader named Mark commented on my post with some valuable input. We can only assume that said Mark probably wore a cape and had a mask, and quite possibly spoke in a deep, scratchy voice with a lisp as well. He suggested that I try Kalman filtering since the problem that I described was already a solved problem.

# Kalman Filtering

Upon Mark’s initial input, I did a Google search for “Kalman Filtering” but was immediately discouraged by results that mainly consisted of scholarly articles and scientific websites whose CSS gave them the look and feel of a 1995-era website. I recently took up this search again to actually read through the code samples (which in my opinion were not very good) and develop a solution to my own problem.

Here’s the sample of code from SciPy.org that actually got me started. For whatever reason, code samples I see from the scientific community always involve single-letter variable names that map to mathematical conventions rather than to anything descriptive. Here’s the first code snippet:

```python
# Kalman filter example demo in Python

# A Python implementation of the example given in pages 11-15 of "An
# Introduction to the Kalman Filter" by Greg Welch and Gary Bishop,
# University of North Carolina at Chapel Hill, Department of Computer
# Science, TR 95-041,
# http://www.cs.unc.edu/~welch/kalman/kalmanIntro.html

# by Andrew D. Straw

import numpy
import pylab

# initial parameters
n_iter = 50
sz = (n_iter,)  # size of array
x = -0.37727    # truth value (typo in example at top of p. 13 calls this z)
z = numpy.random.normal(x, 0.1, size=sz)  # observations (normal about x, sigma=0.1)

Q = 1e-5  # process variance

# allocate space for arrays
xhat = numpy.zeros(sz)       # a posteriori estimate of x
P = numpy.zeros(sz)          # a posteriori error estimate
xhatminus = numpy.zeros(sz)  # a priori estimate of x
Pminus = numpy.zeros(sz)     # a priori error estimate
K = numpy.zeros(sz)          # gain or blending factor

R = 0.1 ** 2  # estimate of measurement variance, change to see effect

# initial guesses
xhat[0] = 0.0
P[0] = 1.0

for k in range(1, n_iter):
    # time update
    xhatminus[k] = xhat[k - 1]
    Pminus[k] = P[k - 1] + Q

    # measurement update
    K[k] = Pminus[k] / (Pminus[k] + R)
    xhat[k] = xhatminus[k] + K[k] * (z[k] - xhatminus[k])
    P[k] = (1 - K[k]) * Pminus[k]

pylab.figure()
pylab.plot(z, 'k+', label='noisy measurements')
pylab.plot(xhat, 'b-', label='a posteriori estimate')
pylab.axhline(x, color='g', label='truth value')
pylab.legend()
pylab.xlabel('Iteration')
pylab.ylabel('Voltage')

pylab.figure()
valid_iter = range(1, n_iter)  # Pminus not valid at step 0
pylab.plot(valid_iter, Pminus[valid_iter], label='a priori error estimate')
pylab.xlabel('Iteration')
pylab.ylabel('$(Voltage)^2$')
pylab.setp(pylab.gca(), 'ylim', [0, .01])
pylab.show()
```

# Step 1: Rename the variables

Although I’m extremely grateful for the code sample that I found, and grateful to Andrew Straw for taking the time to publish a valuable resource, I didn’t understand what was going on, and I didn’t want to go digging through college essays to understand something that could and should be clearly understood with code. So I took the variable names and simply renamed them based on their associated comments. Now we have something that’s a little more readable:

```python
import numpy
import pylab

# initial parameters
iteration_count = 50
empty_iteration_tuple = (iteration_count,)  # size of array
actual_value = -0.37727  # truth value
noisy_measurement = numpy.random.normal(actual_value, 0.1, size=empty_iteration_tuple)  # observations (normal about x, sigma=0.1)
process_variance = 1e-5

# allocate space for arrays
posteri_estimate = numpy.zeros(empty_iteration_tuple)
posteri_error_estimate = numpy.zeros(empty_iteration_tuple)
priori_estimate = numpy.zeros(empty_iteration_tuple)
priori_error_estimate = numpy.zeros(empty_iteration_tuple)
blending_factor = numpy.zeros(empty_iteration_tuple)

estimated_measurement_variance = 0.1 ** 2  # estimate of measurement variance, change to see effect

# initial guesses
posteri_estimate[0] = 0.0
posteri_error_estimate[0] = 1.0

for iteration in range(1, iteration_count):
    # time update
    priori_estimate[iteration] = posteri_estimate[iteration - 1]
    priori_error_estimate[iteration] = posteri_error_estimate[iteration - 1] + process_variance

    # measurement update
    blending_factor[iteration] = priori_error_estimate[iteration] / (priori_error_estimate[iteration] + estimated_measurement_variance)
    # noisy measurement is the only thing where we need the entire list
    posteri_estimate[iteration] = priori_estimate[iteration] + blending_factor[iteration] * (noisy_measurement[iteration] - priori_estimate[iteration])
    posteri_error_estimate[iteration] = (1 - blending_factor[iteration]) * priori_error_estimate[iteration]

pylab.figure()
pylab.plot(noisy_measurement, 'k+', label='noisy measurements')
pylab.plot(posteri_estimate, 'b-', label='a posteriori estimate')
pylab.axhline(actual_value, color='g', label='truth value')
pylab.legend()
pylab.xlabel('Iteration')
pylab.ylabel('Voltage')
pylab.show()
```

# Step 2: Get rid of the arrays and the NumPy dependencies

Another problem I had while searching through Kalman filtering examples is that they all seemed to rely on NumPy and SciPy. I wanted to eliminate dependencies, and since the algorithm was just some pure math I didn’t see why these modules were necessarily needed. It turns out they weren’t needed at all. The examples plotted some points, so pylab was a necessity for that, but actual Kalman filtering did not require any special modules. The Kalman filter is just a feedback loop, so we do not need to keep track of every value calculated. So here I got rid of all the NumPy arrays and just kept track of the previous value:

```python
import random

# initial parameters
iteration_count = 500
actual_values = [-0.37727 + j * j * 0.00001 for j in range(iteration_count)]
noisy_measurement = [random.random() * 0.6 - 0.3 + actual_val for actual_val in actual_values]
process_variance = 1e-5
estimated_measurement_variance = 0.1 ** 2  # estimate of measurement variance, change to see effect

posteri_estimate_for_graphing = []

# initial guesses
posteri_estimate = 0.0
posteri_error_estimate = 1.0

for iteration in range(1, iteration_count):
    # time update
    priori_estimate = posteri_estimate
    priori_error_estimate = posteri_error_estimate + process_variance

    # measurement update
    blending_factor = priori_error_estimate / (priori_error_estimate + estimated_measurement_variance)
    posteri_estimate = priori_estimate + blending_factor * (noisy_measurement[iteration] - priori_estimate)
    posteri_error_estimate = (1 - blending_factor) * priori_error_estimate
    posteri_estimate_for_graphing.append(posteri_estimate)

import pylab
pylab.figure()
pylab.plot(noisy_measurement, color='r', label='noisy measurements')
pylab.plot(posteri_estimate_for_graphing, 'b-', label='a posteriori estimate')
pylab.plot(actual_values, color='g', label='truth value')
pylab.legend()
pylab.xlabel('Iteration')
pylab.ylabel('Voltage')
pylab.show()
```

# Step 3: Convert the wacky long script into a single, reusable class

I needed this code to be usable from other modules, so I needed to isolate the logic for getting clean, non-noisy sensor input. And this is all it boiled down to:

```python
class KalmanFilter(object):

    def __init__(self, process_variance, estimated_measurement_variance):
        self.process_variance = process_variance
        self.estimated_measurement_variance = estimated_measurement_variance
        self.posteri_estimate = 0.0
        self.posteri_error_estimate = 1.0

    def input_latest_noisy_measurement(self, measurement):
        priori_estimate = self.posteri_estimate
        priori_error_estimate = self.posteri_error_estimate + self.process_variance

        blending_factor = priori_error_estimate / (priori_error_estimate + self.estimated_measurement_variance)
        self.posteri_estimate = priori_estimate + blending_factor * (measurement - priori_estimate)
        self.posteri_error_estimate = (1 - blending_factor) * priori_error_estimate

    def get_latest_estimated_measurement(self):
        return self.posteri_estimate
```

It was a little frustrating to discover how dead simple the code actually is (keep in mind the math has already been established), but existing code samples on the internets are obfuscated by surrounding logic and terrible variable names.
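To drive home just how little is going on, here's a minimal sketch that pushes a handful of noisy readings through the class (the class is repeated so the snippet stands alone, and the readings are made-up numbers hovering around 10.0, not real sensor data):

```python
class KalmanFilter(object):
    """Same single-variable feedback-loop filter as above."""

    def __init__(self, process_variance, estimated_measurement_variance):
        self.process_variance = process_variance
        self.estimated_measurement_variance = estimated_measurement_variance
        self.posteri_estimate = 0.0
        self.posteri_error_estimate = 1.0

    def input_latest_noisy_measurement(self, measurement):
        priori_estimate = self.posteri_estimate
        priori_error_estimate = self.posteri_error_estimate + self.process_variance
        blending_factor = priori_error_estimate / (priori_error_estimate + self.estimated_measurement_variance)
        self.posteri_estimate = priori_estimate + blending_factor * (measurement - priori_estimate)
        self.posteri_error_estimate = (1 - blending_factor) * priori_error_estimate

    def get_latest_estimated_measurement(self):
        return self.posteri_estimate


# Made-up noisy readings that hover around an underlying value of 10.0
kalman_filter = KalmanFilter(process_variance=1e-5,
                             estimated_measurement_variance=0.1 ** 2)
for reading in [10.2, 9.8, 10.1, 9.9, 10.0, 10.3, 9.7]:
    kalman_filter.input_latest_noisy_measurement(reading)

# The estimate settles near the underlying value instead of bouncing around
print(kalman_filter.get_latest_estimated_measurement())
```

The only state the filter carries between readings is the latest estimate and its error estimate, which is exactly why the arrays in the earlier versions were unnecessary.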

# Step 4: Simulate Usage

I haven’t actually used the code in production with the blimp yet, but I can first simulate what I expect to happen. So here’s the associated logic that uses the above class to see how well this performs:

```python
if __name__ == "__main__":
    import random

    iteration_count = 500
    actual_values = [-0.37727 + j * j * 0.00001 for j in range(iteration_count)]
    noisy_measurement = [random.random() * 2.0 - 1.0 + actual_val for actual_val in actual_values]

    # in practice we would take our sensor, log some readings and get the
    # standard deviation
    import numpy
    measurement_standard_deviation = numpy.std([random.random() * 2.0 - 1.0 for j in range(iteration_count)])

    # The smaller this number, the fewer fluctuations, but can also venture off
    # course...
    process_variance = 1e-3
    estimated_measurement_variance = measurement_standard_deviation ** 2  # 0.05 ** 2

    kalman_filter = KalmanFilter(process_variance, estimated_measurement_variance)
    posteri_estimate_graph = []
    for iteration in range(1, iteration_count):
        kalman_filter.input_latest_noisy_measurement(noisy_measurement[iteration])
        posteri_estimate_graph.append(kalman_filter.get_latest_estimated_measurement())

    import pylab
    pylab.figure()
    pylab.plot(noisy_measurement, color='r', label='noisy measurements')
    pylab.plot(posteri_estimate_graph, 'b-', label='a posteriori estimate')
    pylab.plot(actual_values, color='g', label='truth value')
    pylab.legend()
    pylab.xlabel('Iteration')
    pylab.ylabel('Voltage')
    pylab.show()
```

The above code is sloppy, but it’s just a straightforward script to play around with the class. The output is as follows:

In order to use this with the blimp, I simply need to plug in the actual standard deviation associated with my GPS sensor. That can be obtained by taking 100 or so samples and calculating the standard deviation. Big thanks to Dr. Jay “Miles Dyson” Buckingham for explaining this part to me.
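For what it's worth, that "log some samples, compute the standard deviation" step doesn't even need NumPy; the stdlib `statistics` module handles it. The readings below are made-up stand-ins for real logged GPS samples:

```python
import statistics

# Hypothetical readings logged while holding the sensor at a fixed altitude
logged_readings = [10.0, 10.2, 9.8, 10.1, 9.9]

# Sample standard deviation of the logged readings, squared to get the
# variance that the KalmanFilter constructor expects
measurement_standard_deviation = statistics.stdev(logged_readings)
estimated_measurement_variance = measurement_standard_deviation ** 2

print(estimated_measurement_variance)
```

With real data you'd just swap in the hundred-or-so samples from the actual sensor and feed the resulting variance straight into `KalmanFilter`.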

In the graph above, the axes are labeled with something arbitrary (voltage), but in my case this will be used to calculate altitude. You can imagine that the red line is what the actual sensor values are. Clearly we do NOT want to adjust the pitch of the aircraft based on those readings. The blue line represents the adjusted sensor readings that we want to match up to the green line as closely as possible. This is still not perfect, but it gives us something we can actually work with for flight.

Big thanks to Mark! I now expect every reader of this blog to comment with something equally valuable.