Dataset¶
The dataset, downloaded from the University of Wyoming Atmospheric Science Radiosonde Archive, consists of 6,062 telemetry readings captured during a high-altitude balloon flight from PORT HARDY UA, BC, CANADA on April 12, 2026. It includes high-resolution time-series data for spatial coordinates (latitude, longitude, and geopotential height) alongside atmospheric sensors measuring temperature, pressure, and wind dynamics. The flight reached a peak altitude of 33,669 meters before hitting its ceiling, providing a comprehensive profile of the troposphere and stratosphere.
Technical plan¶
The primary objective is to implement a predictive telemetry pipeline that utilizes Kalman Filters to reduce the amount of data required for transmission to a ground station. The plan involves:
Kalman Prediction for Temperature: Implementing both a 1D and a 2D Kalman filter for temperature and investigating the difference between the two cases.
Predictive Modeling: Developing a 3D Kinematic Kalman Filter that uses a Discrete White Noise Acceleration (DWNA) model to "guess" the balloon's trajectory. Converting raw GPS coordinates into Cartesian $x,y,z$ displacements from the launch site for easier kinematic modeling.
Intelligent Transmission: Running a symmetric Kalman filter, with the balloon implementing both its own filter and a shadow of the ground station's filter, pinging the ground station whenever the two drift apart by more than a set threshold. This guarantees that the ground station's estimate never deviates from the balloon's by more than the threshold.
Environmental Integration: Enhancing the filter by incorporating wind speed and direction as control inputs ($u$) to improve prediction accuracy and further maximize bandwidth savings.
Everything here builds up to the kinematic Kalman filter at the end, so don't miss that!
By the end of the pipeline, the goal is to achieve a massive (>90%) reduction in bandwidth while maintaining a high-fidelity reconstruction of the flight path. Throughout, the focus is on percentage savings over the naive baseline. Assuming satellite uplink costs scale linearly with bytes transmitted, a 95-97% bandwidth reduction translates directly into roughly proportional communications cost savings. Quantization and similar techniques would shrink both the naive and optimized streams by the same factor, so they are ignored in this analysis. The proposed method is akin to the temporal compression used in video codecs, which store only the changes between frames.
We also do not consider robustness issues such as dropped packets, latency, drifting clocks, bit errors, burst resets, and quantization effects. A more sophisticated analysis of the reconstruction error at the ground station that accounts for these nonidealities, for example by probabilistically dropping frames, would be an interesting future direction for this work. With an eye toward implementation on edge devices, we keep the computations simple throughout. In particular, we do not use the more mathematically rigorous Mahalanobis distance in our multidimensional Kalman filters, preferring simple 1D thresholds.
1. Data Ingestion & Pre-processing¶
This initial stage focuses on preparing the raw telemetric stream for kinematic analysis. The primary goal is to transform geographic coordinates into a localized Cartesian frame and clean the signal to ensure the Kalman Filter operates on a stable dataset.
The key operations are:
Dataset Ingestion: The raw telemetry from the April 12, 2026, flight is loaded into a Pandas DataFrame.
Altitude Filtering: A flight ceiling is enforced at 40,000 m (threshold_m=40000) to isolate the relevant balloon flight data and remove post-burst or anomalous readings.
Coordinate Transformation (Lat/Lon→X/Y): Since spherical coordinates are non-linear, we project the latitude and longitude onto a Local Tangent Plane. We define the launch site as the origin $(0,0)$ and apply a cosine correction factor (lon_multiplier) to account for the convergence of meridians at the launch latitude.
Temporal Gradient Calculation: Converts the raw time strings into Python datetime objects to compute the precise number of seconds elapsed between consecutive sensor pings (dt_series), providing the dynamic time-step necessary for accurate velocity and state-transition modeling.
Kinematic Priming: We compute the initial 3D velocity vectors $(v_x, v_y, v_z)$ using discrete differences. These "deltas" serve as the initial state for the Kalman Filter, allowing the model to start with a realistic motion estimate rather than from rest.
import pandas as pd
df = pd.read_csv("2026041200-71109.csv")
threshold_m = 40000 # 40 km ceiling for balloon flights
df_low_alt = df[df['geopotential height_m'] <= threshold_m].copy()
print(f"Max altitude in filtered data: {df_low_alt['geopotential height_m'].max()} meters")
print(f"Removed {len(df) - len(df_low_alt)} rows above the ceiling.")
# 3. Clean it up for Kalman Filter
clean_df = df_low_alt[['time', 'longitude', 'latitude', 'geopotential height_m', 'temperature_C', 'wind direction_degree', 'wind speed_m/s']].dropna()
print(f"Final dataset has {len(clean_df)} rows after cleaning.")
import numpy as np
# 1. Define the Origin (Launch Site)
lat0 = clean_df['latitude'].iloc[0]
lon0 = clean_df['longitude'].iloc[0]
# 2. Conversion Constants
METERS_PER_DEGREE = 111111.0
# Longitude correction factor based on launch latitude
lon_multiplier = np.cos(np.radians(lat0))
# 3. Add Cartesian Columns to the DataFrame
clean_df['x_dist'] = (clean_df['longitude'] - lon0) * (METERS_PER_DEGREE * lon_multiplier)
clean_df['y_dist'] = (clean_df['latitude'] - lat0) * METERS_PER_DEGREE
clean_df['z_dist'] = clean_df['geopotential height_m']
# Ensure time is datetime first so we can compute dt
clean_df['cleaned_time'] = pd.to_datetime(clean_df['time'])
# 1. Calculate the time difference (dt) in seconds between each row
# .diff() leaves NaN in the first row; we fill it with 1.0 below
clean_df['dt'] = clean_df['cleaned_time'].diff().dt.total_seconds()
clean_df['dt'] = clean_df['dt'].fillna(1.0)
# 2. Calculate Velocities: (Change in Position) / (Change in Time)
# The first row's diff is NaN; we fill the resulting velocity with 0
clean_df['vx'] = (clean_df['x_dist'].diff() / clean_df['dt']).fillna(0)
clean_df['vy'] = (clean_df['y_dist'].diff() / clean_df['dt']).fillna(0)
clean_df['vz'] = (clean_df['z_dist'].diff() / clean_df['dt']).fillna(0)
# Note: If two rows have the same timestamp, dt will be 0, resulting in 'inf' velocity.
# We replace infinity with 0 to keep the data clean.
clean_df.replace([np.inf, -np.inf], 0, inplace=True)
# Export to CSV
clean_df.to_csv("balloon_flight_data.csv", index=False)
print(clean_df[['cleaned_time', 'dt', 'vx', 'vy', 'vz']].head())
Max altitude in filtered data: 33669 meters
Removed 0 rows above the ceiling.
Final dataset has 6062 rows after cleaning.
cleaned_time dt vx vy vz
0 2026-04-11 23:16:49 1.0 0.000000 0.0000 0.0
1 2026-04-11 23:16:49 0.0 0.000000 0.0000 0.0
2 2026-04-11 23:16:50 1.0 28.159236 -22.2222 81.0
3 2026-04-11 23:16:51 1.0 21.119427 -11.1111 5.0
4 2026-04-11 23:16:52 1.0 14.079618 0.0000 6.0
We set up matplotlib with some nice formatting and plot the path of the balloon.
import matplotlib.pyplot as plt
plt.rcParams.update({
# Use Lato for all text
"font.family": "sans-serif",
"font.sans-serif": ["Lato"],
# Use Latin Modern for math/numbers (very clean)
"mathtext.fontset": "stix",
# Modern Styling
"axes.facecolor": "#f8f9fa", # Very light grey background
"axes.edgecolor": "#ced4da", # Subtle border
"grid.color": "#e9ecef", # Faint grid lines
"figure.facecolor": "white",
# Font Sizes
"axes.titlesize": 18,
"axes.titleweight": "bold",
"axes.labelsize": 13,
"legend.frameon": True,
"legend.fancybox": True
})
lat_start = clean_df['latitude'].iloc[0]
lon_start = clean_df['longitude'].iloc[0]
# 3. Create the 3D Plot
fig = plt.figure(figsize=(12, 9))
ax = fig.add_subplot(111, projection='3d')
# Plot the 3D path
ax.plot(clean_df['x_dist'], clean_df['y_dist'], clean_df['geopotential height_m'],
label='Balloon Flight Path', color='red', linewidth=2, zorder=5)
# Optional: Plot the "Ground Track" (a shadow on the floor)
ax.plot(clean_df['x_dist'], clean_df['y_dist'], zs=clean_df['geopotential height_m'].min(),
label='Ground Track', color='gray', linestyle='--', alpha=0.5)
# 4. Final Touches
ax.set_title('3D Balloon Telemetry Reconstruction', fontsize=15)
ax.set_xlabel('East-West Displacement (meters)')
ax.set_ylabel('North-South Displacement (meters)')
ax.set_zlabel('Altitude (meters)')
# Mark Launch and Burst points
ax.scatter(0, 0, clean_df['geopotential height_m'].iloc[0], color='green', s=50, label='Launch Site')
ax.scatter(clean_df['x_dist'].iloc[-1], clean_df['y_dist'].iloc[-1], clean_df['geopotential height_m'].iloc[-1],
color='black', marker='x', s=100, label='Ceiling/Burst')
ax.legend()
plt.tight_layout()
plt.savefig('balloon_3d_path.png')
plt.show()
We calculate how much data the balloon would need to transmit if it naively sent every packet. Each float the sensor sends costs 4 bytes (float32), and each packet carries 16 bytes of overhead.
# 1. Define the transmitted sensors
raw_sensors = [
'longitude',
'latitude',
'geopotential height_m',
'temperature_C',
'wind direction_degree',
'wind speed_m/s'
]
# 2. Configuration
PACKET_OVERHEAD = 16 # bytes
PAYLOAD_PER_SENSOR = 4 # bytes (float32)
num_sensors = len(raw_sensors)
total_readings = len(clean_df)
# 3. Naive Calculation (Sending every row)
naive_bandwidth_kb = (total_readings * num_sensors * PAYLOAD_PER_SENSOR + total_readings * PACKET_OVERHEAD) / 1024
print(f"Total Flight readings: {total_readings}")
print(f"Raw Sensors Tracked: {num_sensors}")
print(f"Naive Bandwidth: {naive_bandwidth_kb:.2f} KB")
Total Flight readings: 6062
Raw Sensors Tracked: 6
Naive Bandwidth: 236.80 KB
2. 1D Kalman filter for temperature prediction¶
We next introduce the Kalman filter, an optimal estimation algorithm used to predict the state of a system (like temperature or position) by fusing together two sources of information: a mathematical model (what we think should happen) and noisy sensor measurements (what we see happening). When sensor data is noisy, under some mathematical assumptions (Gaussian noise, linear dynamics), the Kalman filter produces the best possible estimate of the true state in the minimum mean-square-error sense. Even when the sensor noise is not Gaussian, the Kalman filter is the best possible linear filter. And we like linear filters because they are easy to implement!
We assume the system state $x$ at time $k$ evolves linearly from the previous state at time $k-1$, and that our measurements $z$ are noisy observations of that state. This is shown in the following two equations: $a$ is the state transition factor, $u_k$ is a known control command, $b$ is the control input factor, $c$ is the measurement scaling factor, and $w_k, v_k$ are the process and measurement noise respectively, assumed Gaussian with variances $Q$ and $R$.
State Transition: $x_k = a x_{k-1} + b u_k + w_k$  Measurement: $z_k = c x_k + v_k$
The filter maintains the current state estimate, denoted $\hat{x}_{k-1 \,|\, k-1}$, and its uncertainty, denoted $p_{k-1 \,|\, k-1}$ (the subscript reads "the estimate at time $k-1$ given measurements up to time $k-1$").
The filter operation is in two steps. In the prediction step, the filter projects the current state and uncertainty forward in time:
$$\hat{x}_{k \,|\, k-1} = a\hat{x}_{k-1 \,|\, k-1} + bu_k \qquad p_{k \,|\, k-1} = a^2 p_{k-1 \,|\, k-1} + Q.$$
Notice that the uncertainty grows since the filter is trying to predict the future without seeing new data.
The filter then receives the new measurement $z_k$ and corrects the prediction. To do this, it first computes the Kalman gain, a measure of how much it trusts the internal prediction versus the new sensor data:
$$g_k = \frac{cp_{k \,|\, k-1}}{c^2 p_{k \,|\, k-1} + R}.$$
Notice that if the sensor noise ($R$) is high, $g_k$ is low.
It uses $g_k$ to update its estimate of the current state:
$$\hat{x}_{k \,|\, k} = \hat{x}_{k \,|\, k-1} + g_k(z_k - c\hat{x}_{k \,|\, k-1}).$$
Look at the $(z_k - c\hat{x}_{k \,|\, k-1})$ term: it's the difference between the sensor data and the prediction, and is called the innovation. It is being multiplied by the Kalman gain: if the sensor noise is high, $g_k$ will be low, and the state estimate will be closer to the internal prediction; and vice versa. The error uncertainty shrinks because there is new information:
$$p_{k \,|\, k} = (1-g_k c)p_{k \,|\, k-1}.$$
This can be implemented very easily on low-power devices! The entire history need not be stored, only the current estimates.
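Before moving to code, it helps to trace a single predict/update cycle by hand. The numbers below are purely illustrative (a 20°C prior estimate and a 20.5°C reading), not taken from the flight data:

```python
# One predict/update cycle of the 1D filter with a = c = 1, b = 0.
# All numbers here are illustrative, not from the flight data.
Q, R = 1e-4, 0.1            # process and measurement variances
x_post, p_post = 20.0, 1.0  # previous state estimate and uncertainty
z = 20.5                    # new sensor measurement

# Predict: the state carries over, the uncertainty grows by Q
x_prior = x_post
p_prior = p_post + Q                    # 1.0001

# Update: compute the gain, correct the state, shrink the uncertainty
g = p_prior / (p_prior + R)             # ~0.909: p >> R, so trust the sensor
x_post = x_prior + g * (z - x_prior)    # ~20.455: pulled strongly toward z
p_post = (1 - g) * p_prior              # ~0.091: much more certain now

print(round(g, 3), round(x_post, 3), round(p_post, 3))
```

Because the initial uncertainty (1.0) dwarfs the measurement variance (0.1), the gain is close to 1 and the estimate jumps most of the way to the measurement; after a few cycles $p$ settles and the gain drops.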
The following code implements the Kalman filter. We use $a=1$, $b=0$ (since there's no control input), $c=1$, $Q=10^{-4}$ and $R=0.1$. The $Q$ and the $R$ values need tuning based on real experimental data.
def run_kalman_filter(observations, process_variance=1e-4, measurement_variance=0.1):
# Initial guesses
posteri_estimate = observations[0]
posteri_error_est = 1.0
predictions = []
for measurement in observations:
# 1. PREDICT (Time Update)
priori_estimate = posteri_estimate
priori_error_est = posteri_error_est + process_variance
# 2. UPDATE (Measurement Update)
kalman_gain = priori_error_est / (priori_error_est + measurement_variance)
# Refine estimate with actual measurement
posteri_estimate = priori_estimate + kalman_gain * (measurement - priori_estimate)
posteri_error_est = (1 - kalman_gain) * priori_error_est
predictions.append(priori_estimate)
return np.array(predictions)
We run the Kalman filter on our temperature data. We take $Q = 10^{-4}$ because that is roughly the variance of the temperature drift we expect over the one-second sampling interval, and $R = 0.25$ based on the ±0.5°C accuracy of a cheap temperature sensor like the DS18B20 ($0.5^2 = 0.25$).
# 1. Run the filter on your temperature data
temp_pred_1d = run_kalman_filter(clean_df['temperature_C'].values, process_variance=1e-4, measurement_variance=0.25)
actual_temps = clean_df['temperature_C'].values
# 2. Pick a 1000-observation segment from the middle of the flight
mid = len(clean_df) // 2
start, end = mid - 500, mid + 500
seg_actual = actual_temps[start:end]
seg_pred = temp_pred_1d[start:end]
indices = np.arange(len(seg_actual))
# 3. Plotting the "Divergence"
fig, ax = plt.subplots(figsize=(12, 6))
# Plot the Raw Sensor
ax.plot(indices, seg_actual, '-', color='#adb5bd', alpha=0.6, markersize=4, label='Raw Sensor')
# Plot the Kalman Prediction (The Ground's "Best Guess" before the update)
ax.plot(indices, seg_pred, '-', color='#007bff', markersize=4, label='Kalman Prediction')
ax.set_title('Detail: 1000-Point Segment (Innovation vs. Reality)', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Time Step (Seconds)', fontsize=12)
ax.set_ylabel('Temperature (°C)', fontsize=12)
ax.legend()
ax.grid(True, linestyle='--', alpha=0.4)
plt.tight_layout()
plt.show()
The estimate of the actual temperature is a smoothed version of the raw sensor readings. Using a smaller value of measurement_variance (low sensor noise which means we trust the sensor more) would lead to the estimate tracking the sensor data more aggressively. The value needs to be tuned to the sensor being used in practice.
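To make this trade-off concrete, here is a small synthetic experiment (not the flight data; the helper below re-implements the same recursion, and the variances are chosen purely for illustration). A low measurement variance yields an estimate that hugs the raw readings, while a high one yields heavy smoothing:

```python
import numpy as np

def kf_1d(z, q, r):
    """Same 1D recursion as run_kalman_filter, returning posterior estimates."""
    x, p, out = z[0], 1.0, []
    for m in z:
        p += q                      # predict: uncertainty grows
        g = p / (p + r)             # Kalman gain
        x = x + g * (m - x)         # update toward the measurement
        p = (1 - g) * p
        out.append(x)
    return np.array(out)

rng = np.random.default_rng(0)
noisy = 20.0 + rng.normal(0.0, 0.5, 500)   # constant 20 °C plus sensor noise

smooth = kf_1d(noisy, q=1e-2, r=1.0)    # high R: trust the model, smooth hard
jumpy  = kf_1d(noisy, q=1e-2, r=0.01)   # low R: trust the sensor, track hard

# The low-R estimate stays much closer to the raw readings
print(np.std(smooth - noisy) > np.std(jumpy - noisy))  # True
```

The same pattern holds for the flight data: shrinking measurement_variance moves the blue curve toward the grey one.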
3. Using the Symmetric Dual-Observer 1D Kalman filter to reduce transmissions¶
Consider what happens when both the ground station and the balloon run the same state-transition equation, with the following change:
- the balloon runs both filters and only transmits data when its temperature estimate is different from the ground station estimate
- if the ground station does not get any data from the balloon, it runs its state-transition without any input data
Intuitively, with this scheme the ground station's estimate never drifts from the balloon's estimate by more than the threshold. We set the threshold to $0.5$°C and see how much we save. Notice that we cannot simply transmit the innovation sequence from the balloon, because the ground station does not have access to the untransmitted data.
def run_symmetric_kalman_1d(observations, threshold=1, q=1e-4, r=1e-1):
# --- 1. INITIALIZATION ---
# The 'True' Filter at the balloon, has access to every sensor hit
x_true = observations[0]
p_true = 1.0
x_true_history = [x_true]
# The 'Shadow' Filter that only updates when a ping occurs
# The ground center only has this, while the balloon has both
x_shadow = observations[0]
p_shadow = 1.0
tx_indices = [0]
ground_view = [x_shadow]
shadow_uncertainty = [p_shadow]
for i in range(1, len(observations)):
# --- 2. THE INTERNAL 'TRUE' UPDATE (Always runs) ---
# Predict
x_true_priori = x_true
p_true_priori = p_true + q
# Update with current sensor
k_true = p_true_priori / (p_true_priori + r) # Kalman Gain
x_true = x_true_priori + k_true * (observations[i] - x_true_priori)
p_true = (1 - k_true) * p_true_priori
x_true_history.append(x_true)
# --- 3. ground center update ---
# The shadow filter ONLY performs the Prediction step
x_shadow_priori = x_shadow
p_shadow_priori = p_shadow + q
# --- Balloon compares its estimate to the Ground's Estimate ---#
divergence = abs(x_true - x_shadow_priori)
if divergence > threshold:
# PING! Send the 'x_true' to the ground
tx_indices.append(i)
# The Shadow Filter performs a Measurement Update using the ping
x_shadow = x_true
p_shadow = p_shadow_priori
else:
# SILENCE: Shadow filter stays in prediction mode
x_shadow = x_shadow_priori
p_shadow = p_shadow_priori
ground_view.append(x_shadow)
shadow_uncertainty.append(p_shadow)
return tx_indices, np.array(ground_view), np.array(shadow_uncertainty), x_true_history
# Execute
threshold = 0.5 # 0.5°C threshold for demonstration (can be tuned)
tx_idx, ground_path, p_history, x_true_history = run_symmetric_kalman_1d(clean_df['temperature_C'].values, threshold=threshold)
num_transmissions = len(tx_idx)
optimized_bandwidth_kb = (num_transmissions * num_sensors * PAYLOAD_PER_SENSOR + num_transmissions * PACKET_OVERHEAD) / 1024
# 2. Comparison Metrics
savings_kb = naive_bandwidth_kb - optimized_bandwidth_kb
reduction_percent = (1 - (optimized_bandwidth_kb / naive_bandwidth_kb)) * 100
compression_ratio = total_readings / num_transmissions
print(f"--- SYMMETRIC TELEMETRY PERFORMANCE ---")
print(f"Naive Data Usage: {naive_bandwidth_kb:.2f} KB")
print(f"Optimized Data Usage: {optimized_bandwidth_kb:.2f} KB")
print(f"Total Data Saved: {savings_kb:.2f} KB")
print(f"Bandwidth Reduction: {reduction_percent:.2f}%")
print(f"Compression Ratio: {compression_ratio:.2f}x")
print(f"---------------------------------------")
# 1. Calculate the errors (Actual - Reconstructed)
errors_from_balloon_estimate = x_true_history - ground_path
max_drift = np.max(np.abs(errors_from_balloon_estimate))
print(f"Max Reconstruction Error between Balloon and Ground Station: {max_drift:.4f}°C")
rmse = np.sqrt(np.mean(np.square(errors_from_balloon_estimate)))
print(f"Root Mean Squared Error between Balloon and Ground Station: {rmse:.4f}°C")
fig, ax = plt.subplots(figsize=(10, 6))
# Histogram with white edges for clarity
ax.hist(errors_from_balloon_estimate, bins=50, color='#6f42c1', edgecolor='white', alpha=0.8)
# 3. Add the Threshold Lines
# These are the "Guardrails" of your telemetry system
ax.axvline(threshold, color='#dc3545', linestyle='--', linewidth=2, label='Threshold (+{:.1f}°C)'.format(threshold))
ax.axvline(-threshold, color='#dc3545', linestyle='--', linewidth=2, label='Threshold (-{:.1f}°C)'.format(threshold))
# 4. Styling
ax.set_title('Distribution of Reconstruction Errors (from the balloon observations)', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Error (Balloon estimate - Ground View) [°C]', fontsize=12)
ax.set_ylabel('Frequency', fontsize=12)
ax.legend()
ax.grid(axis='y', linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()
--- SYMMETRIC TELEMETRY PERFORMANCE ---
Naive Data Usage: 236.80 KB
Optimized Data Usage: 7.93 KB
Total Data Saved: 228.87 KB
Bandwidth Reduction: 96.65%
Compression Ratio: 29.86x
---------------------------------------
Max Reconstruction Error between Balloon and Ground Station: 0.4998°C
Root Mean Squared Error between Balloon and Ground Station: 0.2591°C
We plot the estimates over a 2000-observation segment in the middle of the flight path.
# 2. Select the 2000-observation segment in the middle
mid = len(x_true_history) // 2
start, end = mid - 1000, mid + 1000
slc = slice(start, end)
# Convert x_true_history to a numpy array if it isn't already for easy slicing
x_true_arr = np.array(x_true_history)
# Map tx_idx to the local coordinates of our plotting window
pings_in_window = [i - start for i in tx_idx if start <= i < end]
# 3. Plotting
fig, ax = plt.subplots(figsize=(12, 6))
indices = np.arange(end - start)
raw_sensor_seg = clean_df['temperature_C'].values[slc]
ax.plot(indices, raw_sensor_seg, '-', color='#adb5bd', alpha=0.6, markersize=4, label='Raw Sensor')
# The Balloon's Internal Truth (Smooth, filtered version of sensor)
ax.plot(indices, x_true_arr[slc], color='#007bff', linewidth=2, label='Balloon Estimate', zorder=2)
# The Ground Station View (Shadow)
ax.plot(indices, ground_path[slc], color='#dc3545', linestyle='--', linewidth=2, label='Ground Station Estimate', zorder=3)
# Highlight the Transmission Events (Stars)
if pings_in_window:
ax.scatter(pings_in_window, x_true_arr[slc][pings_in_window],
color='gold', edgecolor='black', s=120, marker='*',
label='Ping', zorder=5)
# 4. Styling & Labels
ax.set_title('Symmetric Dual-Observer: Middle 2000-Point Segment', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Time Step (Relative to Segment Start)', fontsize=12)
ax.set_ylabel('Temperature (°C)', fontsize=12)
ax.legend(frameon=True, loc='best')
ax.grid(True, linestyle=':', alpha=0.6)
plt.tight_layout()
plt.show()
We also plot the behavior during steps 700-1700, where the balloon rises steadily through the troposphere and the temperature falls off almost linearly.
# 2. Select the 1000-observation segment covering steps 700-1700
mid = 1200
start, end = mid - 500, mid + 500
slc = slice(start, end)
# Convert x_true_history to a numpy array if it isn't already for easy slicing
x_true_arr = np.array(x_true_history)
# Map tx_idx to the local coordinates of our plotting window
pings_in_window = [i - start for i in tx_idx if start <= i < end]
# 3. Plotting
fig, ax = plt.subplots(figsize=(12, 6))
indices = np.arange(end - start)
raw_sensor_seg = clean_df['temperature_C'].values[slc]
ax.plot(indices, raw_sensor_seg, '-', color='#adb5bd', alpha=0.6, markersize=4, label='Raw Sensor')
# The Balloon's Internal Truth (Smooth, filtered version of sensor)
ax.plot(indices, x_true_arr[slc], color='#007bff', linewidth=2, label='Balloon Estimate', zorder=2)
# The Ground Station View (Shadow)
ax.plot(indices, ground_path[slc], color='#dc3545', linestyle='--', linewidth=2, label='Ground Station Estimate', zorder=3)
# Highlight the Transmission Events (Stars)
if pings_in_window:
ax.scatter(pings_in_window, x_true_arr[slc][pings_in_window],
color='gold', edgecolor='black', s=120, marker='*',
label='Ping', zorder=5)
# 4. Styling & Labels
ax.set_title('Symmetric Dual-Observer: Ascent Segment (Steps 700-1700)', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Time Step (Relative to Segment Start)', fontsize=12)
ax.set_ylabel('Temperature (°C)', fontsize=12)
ax.legend(frameon=True, loc='best')
ax.grid(True, linestyle=':', alpha=0.6)
plt.tight_layout()
plt.show()
Note that the estimated temperature at the balloon and the ground station are both a bit higher than the raw sensor reading. This is because the 1D Kalman filter predicts the same temperature at the next time instant, so during a steady drop in temperature its estimate always lags the sensor.
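The lag can be quantified with a small synthetic check (illustrative numbers, not the flight data): feed a noiseless linear ramp, like the steady tropospheric cooling, through the same 1D recursion and measure the steady-state offset:

```python
import numpy as np

# A noiseless cooling ramp: -0.05 °C per one-second step, as during a steady
# ascent. Illustrative values; q and r match the flight configuration above.
q, r = 1e-4, 0.25
ramp = 20.0 - 0.05 * np.arange(2000)

x, p = ramp[0], 1.0
for z in ramp:
    p += q                      # predict
    g = p / (p + r)             # gain
    x = x + g * (z - x)         # update
    p = (1 - g) * p

lag = x - ramp[-1]              # positive: estimate sits above the falling truth
print(round(lag, 2))            # roughly 2.5 °C of steady-state lag
```

This constant offset is exactly what motivates adding a rate state: a filter that models $T_{\text{new}} = T_{\text{old}} + \text{Rate} \times dt$ has zero steady-state error on a ramp.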
We also plot the full flight.
# 2. Select the entire flight
start, end = 0, len(x_true_history)
slc = slice(start, end)
# Convert x_true_history to a numpy array if it isn't already for easy slicing
x_true_arr = np.array(x_true_history)
# Map tx_idx to the local coordinates of our plotting window
pings_in_window = [i - start for i in tx_idx if start <= i < end]
# 3. Plotting
fig, ax = plt.subplots(figsize=(12, 6))
indices = np.arange(end - start)
raw_sensor_seg = clean_df['temperature_C'].values[slc]
ax.plot(indices, raw_sensor_seg, '-', color='#adb5bd', alpha=0.6, markersize=4, label='Raw Sensor')
# The Balloon's Internal Truth (Smooth, filtered version of sensor)
ax.plot(indices, x_true_arr[slc], color='#007bff', linewidth=2, label='Balloon Estimate', zorder=2)
# The Ground Station View (Shadow)
ax.plot(indices, ground_path[slc], color='#dc3545', linestyle='-', linewidth=2, label='Ground Station Estimate', zorder=3)
# Highlight the Transmission Events (Stars)
if pings_in_window:
ax.scatter(pings_in_window, x_true_arr[slc][pings_in_window],
color='gold', edgecolor='black', s=10, marker='o',
label='Ping', zorder=5)
# 4. Styling & Labels
ax.set_title('Symmetric Dual-Observer: entire flight', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Time Step', fontsize=12)
ax.set_ylabel('Temperature (°C)', fontsize=12)
ax.legend(frameon=True, loc='best')
ax.grid(True, linestyle=':', alpha=0.6)
plt.tight_layout()
plt.show()
ground_station_error = clean_df['temperature_C'].values[slc] - ground_path[slc]
max_error = np.max(np.abs(ground_station_error))
rmse_error = np.sqrt(np.mean(np.square(ground_station_error)))
print(f"Max Error between Ground Station Estimate and Actual: {max_error:.4f}°C")
print(f"RMSE between Ground Station Estimate and Actual: {rmse_error:.4f}°C")
Max Error between Ground Station Estimate and Actual: 2.4727°C
RMSE between Ground Station Estimate and Actual: 0.9594°C
Clearly, the balloon only transmits when the temperature is changing, and stops transmitting when the temperature profile becomes flat. Intuitively, this makes sense, since the update $x_k = x_{k-1} + w_k$ can track an unchanging temperature really well! But what happens when the temperature keeps changing, as in the long segment between about steps 500 and 1800?
We can handle that using a two-dimensional Kalman filter!
4. Improving temperature reconstruction using a 2D Kalman filter¶
We now extend this idea to a two-dimensional (2D) system. Instead of tracking a single scalar state, we track a vector state, in this case both the temperature and rate of change of temperature. Concretely, let the state be
$$ \mathbf{x}_k = \begin{bmatrix} x_k^{(1)} \\ x_k^{(2)} \end{bmatrix}, $$
where each component represents part of the system we care about.
As before, we assume the system evolves linearly, and that measurements are noisy observations of the state. Written component-wise, the state transition is $$ \begin{aligned} x_k^{(1)} &= a_{11} x_{k-1}^{(1)} + a_{12} x_{k-1}^{(2)} + b_1 u_k + w_k^{(1)} \\ x_k^{(2)} &= a_{21} x_{k-1}^{(1)} + a_{22} x_{k-1}^{(2)} + b_2 u_k + w_k^{(2)} \end{aligned} $$
and the measurement is:
$$ z_k = c_1 x_k^{(1)} + c_2 x_k^{(2)} + v_k. $$
At this point, writing everything out component-by-component quickly becomes cumbersome. Instead, we switch to matrix notation, which scales naturally to higher dimensions.
On defining the state vector $\mathbf{x}_k \in \mathbb{R}^2$, state transition matrix $\mathbf{A} \in \mathbb{R}^{2 \times 2}$, control matrix $\mathbf{B} \in \mathbb{R}^{2 \times 1}$ and the measurement matrix: $\mathbf{C} \in \mathbb{R}^{1 \times 2}$, the system can be written as $$ \begin{aligned} \mathbf{x}_k &= \mathbf{A}\mathbf{x}_{k-1} + \mathbf{B}u_k + \mathbf{w}_k\\ z_k &= \mathbf{C}\mathbf{x}_k + v_k \end{aligned} $$
Here, $\mathbf{w}_k \sim \mathcal{N}(\mathbf{0}, \mathbf{Q})$ is now a vector-valued Gaussian process noise with covariance matrix $\mathbf{Q}$, and $v_k \sim \mathcal{N}(0, R)$ remains scalar measurement noise. The estimate $\hat{\mathbf{x}}$ is now a vector, and the uncertainty becomes a covariance matrix that tracks both the uncertainty in each state component and how those are related.
$$ \mathbf{P}_{k \,|\, k} = \begin{bmatrix} \mathrm{var}(x^{(1)}) & \mathrm{cov}(x^{(1)}, x^{(2)}) \\ \mathrm{cov}(x^{(2)}, x^{(1)}) & \mathrm{var}(x^{(2)}) \end{bmatrix}. $$
The Kalman filter proceeds in the same two steps as before.
Prediction: $$ \hat{\mathbf{x}}_{k \,|\, k-1} = \mathbf{A}\hat{\mathbf{x}}_{k-1 \,|\, k-1} + \mathbf{B}u_k $$ $$ \mathbf{P}_{k \,|\, k-1} = \mathbf{A}\mathbf{P}_{k-1 \,|\, k-1}\mathbf{A}^\top + \mathbf{Q}. $$
As in the 1D case, the uncertainty grows during prediction.
Update:
First, the Kalman gain $$ \mathbf{K}_k = \mathbf{P}_{k \,|\, k-1}\mathbf{C}^\top \left(\mathbf{C}\mathbf{P}_{k \,|\, k-1}\mathbf{C}^\top + R \right)^{-1}. $$
Then the state estimate update $$ \hat{\mathbf{x}}_{k \,|\, k} = \hat{\mathbf{x}}_{k \,|\, k-1} + \mathbf{K}_k \left(z_k - \mathbf{C}\hat{\mathbf{x}}_{k \,|\, k-1}\right). $$
The term $(z_k - \mathbf{C}\hat{\mathbf{x}}_{k \,|\, k-1})$ is again the innovation.
Finally, update the covariance: $$ \mathbf{P}_{k \,|\, k} = \left(\mathbf{I} - \mathbf{K}_k \mathbf{C}\right)\mathbf{P}_{k \,|\, k-1}. $$
Aside from replacing scalars with vectors and matrices, the structure is identical to the 1D case. This formulation scales cleanly to higher-dimensional systems, which is why we adopt matrix notation moving forward.
We next implement this 2D Kalman filter for our temperature prediction. Earlier we assumed for simplicity that the observations were equally spaced; this time we incorporate the actual time between observations. Our state tracks both the temperature and its rate of change per second. We initialize $\mathbf{P} = \mathbf{I}$, i.e., we assume the errors in the two components are initially uncorrelated. Our $\mathbf{C}$ matrix is just $[1, 0]$, since we observe only the temperature, not the rate.
The state transition matrix captures the relationship $T_{\text{new}} = T_{\text{old}} + \text{Rate} \times dt$. The process noise covariance $\mathbf{Q}$ is more interesting. Following the Discrete White Noise Acceleration (DWNA) model, we assume the system is perturbed by a random "acceleration" $w_k$ with variance $\sigma_q^2$ that is held constant over each sampling interval. This represents unmodeled atmospheric turbulence or sudden lapse-rate changes.
A constant acceleration $w$ acting for a duration $dt$ changes the temperature by $\frac{dt^2}{2}w$ and the rate by $dt\,w$, so the noise enters the state through the gain vector $\boldsymbol{\Gamma} = \begin{bmatrix} \frac{dt^2}{2} \\ dt \end{bmatrix}$. The discrete process noise matrix is then $$\mathbf{Q} = \boldsymbol{\Gamma}\,\sigma_q^2\,\boldsymbol{\Gamma}^\top = \sigma_q^2 \begin{bmatrix} \frac{dt^4}{4} & \frac{dt^3}{2} \\ \frac{dt^3}{2} & dt^2 \end{bmatrix}.$$ Notice two things about this matrix: the temperature uncertainty (the $(0,0)$ entry) grows very quickly with $dt$ through the accumulated effect of the random acceleration, and the temperature and rate uncertainties are correlated (the off-diagonal terms).
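As a quick numerical sanity check (with illustrative values for $dt$ and $\sigma_q^2$), the DWNA matrix can be obtained as the outer product of the noise gain vector $\boldsymbol{\Gamma} = [\tfrac{dt^2}{2},\ dt]^\top$, the displacement and rate change produced by a unit acceleration held for $dt$:

```python
import numpy as np

sigma_q2, dt = 1e-4, 2.0                  # illustrative values only
Gamma = np.array([[dt**2 / 2], [dt]])     # effect of a unit acceleration over dt

Q_outer = sigma_q2 * (Gamma @ Gamma.T)    # DWNA: Q = Γ σ² Γᵀ

Q_closed = sigma_q2 * np.array([[dt**4 / 4, dt**3 / 2],
                                [dt**3 / 2, dt**2]])

print(np.allclose(Q_outer, Q_closed))  # True: the two forms agree
```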
def run_2d_kalman_filter(obs_temp, time_dts, q_var=1e-4, r_var=1e-1):
# Initial State: [Temperature, Rate of Change (deg/sec)]
x = np.array([[obs_temp[0]], [0.0]])
P = np.eye(2)
# Observation Matrix (We only measure temperature, not rate)
C = np.array([[1.0, 0.0]])
R = np.array([[r_var]])
predictions = []
# Add the first estimate to keep lengths consistent
predictions.append(x[0, 0])
for i in range(1, len(obs_temp)):
dt = time_dts[i]
# 1. State Transition Matrix (Kinematic)
# Temp_new = Temp_old + (Rate * dt)
F = np.array([[1.0, dt],
[0.0, 1.0]])
# 2. Process Noise Matrix (Q)
# Higher dt means more uncertainty in the prediction
Q = q_var * np.array([
[(dt**4)/4, (dt**3)/2],
[(dt**3)/2, dt**2]
])
# --- PREDICT ---
x = F @ x
P = F @ P @ F.T + Q
# --- UPDATE ---
z = obs_temp[i]
y = z - (C @ x) # Innovation
S = C @ P @ C.T + R # System uncertainty
K = P @ C.T / S[0,0] # Kalman Gain
x = x + K * y
P = (np.eye(2) - K @ C) @ P
predictions.append(x[0, 0])
return np.array(predictions)
We now run the 2D Kalman filter on our data. We pick the long segment between about 500 to 1800 to see how well the reconstruction is doing. We run both the 1D and 2D Kalman filter predictions.
temp_pred_2d = run_2d_kalman_filter(clean_df['temperature_C'].values, clean_df['dt'].values, q_var=1e-4, r_var=0.25)
actual_temps = clean_df['temperature_C'].values
# Plot 1: Constant Slope Region (Manual Midpoint)
mid1 = 1200
start1, end1 = mid1 - 500, mid1 + 500
seg_actual1 = actual_temps[start1:end1]
seg_pred1d_1 = temp_pred_1d[start1:end1]
seg_pred2d_1 = temp_pred_2d[start1:end1]
# Plot 2: Middle of Flight (Calculated Midpoint)
mid2 = len(clean_df) // 2
start2, end2 = mid2 - 500, mid2 + 500
seg_actual2 = actual_temps[start2:end2]
seg_pred1d_2 = temp_pred_1d[start2:end2]
seg_pred2d_2 = temp_pred_2d[start2:end2]
indices = np.arange(1000)
# --- 2. PLOTTING ---
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
# TOP SUBPLOT: Constant Slope Region
ax1.plot(indices, seg_actual1, '-', color='#adb5bd', alpha=0.5, linewidth=5, label='Raw Sensor')
ax1.plot(indices, seg_pred1d_1, '-', color='#007bff', linewidth=1.5, label='1D Kalman Prediction')
ax1.plot(indices, seg_pred2d_1, '-', color='#28a745', linewidth=1.5, label='2D Kalman Prediction')
ax1.set_title('Region 1: Constant Slope (Ascent)', fontsize=14, fontweight='bold', loc='left')
ax1.set_ylabel('Temperature (°C)')
ax1.legend(loc='upper right', fontsize=9)
ax1.grid(True, linestyle='--', alpha=0.3)
# BOTTOM SUBPLOT: Middle of Flight (Innovation vs Reality)
ax2.plot(indices, seg_actual2, '-', color='#adb5bd', alpha=0.5, linewidth=5, label='Raw Sensor')
ax2.plot(indices, seg_pred1d_2, '-', color='#007bff', linewidth=1.5, label='1D Kalman Prediction')
ax2.plot(indices, seg_pred2d_2, '-', color='#28a745', linewidth=1.5, label='2D Kalman Prediction')
ax2.set_title('Region 2: Flight Midpoint (Dynamic Changes)', fontsize=14, fontweight='bold', loc='left')
ax2.set_xlabel('Time Step (Seconds)')
ax2.set_ylabel('Temperature (°C)')
ax2.legend(loc='upper right', fontsize=9)
ax2.grid(True, linestyle='--', alpha=0.3)
plt.tight_layout()
plt.show()
The estimate now tracks the data very closely, especially where the temperature changes almost linearly with time! There is still some smoothing in the second region, where the signal is more irregular, but the errors are far smaller than before. This suggests that the constant-rate state-space model is a good fit for the data!
The error histograms of the two filters are next.
# 1. Calculate the Errors (Residuals)
# Error = Truth - Prediction
error_1d = actual_temps - temp_pred_1d
error_2d = actual_temps - temp_pred_2d
# 2. Setup the Subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6), sharey=True)
xlimit = max(np.max(np.abs(error_1d)), np.max(np.abs(error_2d))) * 1.1 # Set x-axis limits based on max error
# 3. Plot 1D Errors
ax1.hist(error_1d, bins=100, color='#007bff', range = (-xlimit, xlimit), edgecolor='white', alpha=0.7)
ax1.set_title('1D Kalman Filter Error Distribution'+" "+f"(Variance: {np.var(error_1d):.6f})", fontsize=14, fontweight='bold')
ax1.set_xlabel('Error (°C)', fontsize=12)
ax1.set_ylabel('Frequency', fontsize=12)
ax1.grid(axis='y', linestyle='--', alpha=0.4)
# 4. Plot 2D Errors
ax2.hist(error_2d, bins=100, range = (-xlimit, xlimit),color='#28a745', edgecolor='white', alpha=0.7)
ax2.set_title('2D Kalman Filter Error Distribution'+" "+f"(Variance: {np.var(error_2d):.6f})", fontsize=14, fontweight='bold')
ax2.set_xlabel('Error (°C)', fontsize=12)
ax2.grid(axis='y', linestyle='--', alpha=0.4)
# 5. Add a vertical line at 0 for reference
for ax in [ax1, ax2]:
    ax.axvline(0, color='black', linestyle='-', linewidth=1, alpha=0.5)
plt.tight_layout()
plt.show()
5. Using 2D Kalman filter for better temperature prediction¶
We now extend the symmetric dual-observer idea to the 2D Kalman filter, where the state includes both temperature and its rate of change (drift). This allows the model to better capture gradual trends instead of assuming the temperature is static between updates.
As before, the balloon runs both a true filter (updated with every observation) and a shadow filter (which mirrors the ground station). Both filters now evolve using a time-dependent state transition model, incorporating variable time steps.
At each step:
- The true filter performs a full predict-update cycle using the latest measurement.
- The shadow filter performs only the prediction step, simulating what the ground station would compute without new data.
- The balloon compares the temperature estimate of the true filter against the predicted shadow estimate.
- If the error is greater than the threshold, the balloon pings the ground station, letting the ground station synchronize with the balloon. Otherwise, the ground station evolves its estimates by prediction.
def run_symmetric_kalman_2d(obs_temp, time_dts, threshold=1.0, q_var=1e-4, r_var=1e-1):
    """
    2D Kalman filter with event-triggered transmission (symmetric architecture).
    The balloon runs a 'true' filter (updated every step) and a 'shadow' filter
    (mirroring what the ground center has). A ping is sent whenever the
    error between the true estimate and the shadow prior exceeds
    the threshold.

    Parameters
    ----------
    obs_temp : array-like, temperature observations
    time_dts : array-like, time deltas (seconds), same length as obs_temp
    threshold : float, threshold for triggering a ping
    q_var : float, process noise variance
    r_var : float, measurement noise variance

    Returns
    -------
    tx_indices : list of int, timestep indices at which a ping was sent
    ground_view : (N,) ndarray, ground center's temperature estimate
    shadow_uncertainty : (N,) ndarray, trace of shadow covariance (total uncertainty)
    true_history : (N,) ndarray, balloon's internal temperature estimate
    """
    C = np.array([[1.0, 0.0]])
    R = np.array([[r_var]])
    # --- 1. INITIALIZATION ---
    x_true = np.array([[obs_temp[0]], [0.0]])
    P_true = np.eye(2)
    x_shadow = np.array([[obs_temp[0]], [0.0]])
    P_shadow = np.eye(2)
    tx_indices = [0]
    ground_view = [x_shadow[0, 0]]
    shadow_uncertainty = [np.trace(P_shadow)]
    true_history = [x_true[0, 0]]
    for i in range(1, len(obs_temp)):
        dt = time_dts[i]
        F = np.array([[1.0, dt],
                      [0.0, 1.0]])
        Q = q_var * np.array([
            [(dt**4)/4, (dt**3)/2],
            [(dt**3)/2, dt**2]
        ])
        # --- 2. TRUE FILTER UPDATE (always runs on the balloon) ---
        x_true_prior = F @ x_true
        P_true_prior = F @ P_true @ F.T + Q
        y_true = obs_temp[i] - (C @ x_true_prior)[0, 0]
        S_true = (C @ P_true_prior @ C.T + R)[0, 0]
        K_true = (P_true_prior @ C.T) / S_true
        x_true = x_true_prior + K_true * y_true
        P_true = (np.eye(2) - K_true @ C) @ P_true_prior
        true_history.append(x_true[0, 0])
        # --- 3. SHADOW FILTER PREDICTION (mirrors ground center) ---
        x_shadow_prior = F @ x_shadow
        P_shadow_prior = F @ P_shadow @ F.T + Q
        temp_err = (x_true[0, 0] - x_shadow_prior[0, 0])**2
        if temp_err > threshold**2:
            # PING: transmit balloon's full state estimate to the ground
            tx_indices.append(i)
            x_shadow = x_true.copy()
            P_shadow = P_true.copy()  # also transmit covariance
        else:
            # SILENCE: ground coasts on its prediction
            x_shadow = x_shadow_prior
            P_shadow = P_shadow_prior
        ground_view.append(x_shadow[0, 0])
        shadow_uncertainty.append(np.trace(P_shadow))
    return tx_indices, np.array(ground_view), np.array(shadow_uncertainty), np.array(true_history)
threshold = 0.5 # 0.5°C threshold for demonstration (can be tuned)
tx_idx, ground_path, p_history, x_true_history = run_symmetric_kalman_2d(
clean_df['temperature_C'].values,
clean_df['dt'].values,
threshold=threshold,
q_var=1e-4,
r_var=0.25
)
num_transmissions = len(tx_idx)
optimized_bandwidth_kb = (num_transmissions * num_sensors * PAYLOAD_PER_SENSOR + num_transmissions * PACKET_OVERHEAD) / 1024
# 2. Comparison Metrics
savings_kb = naive_bandwidth_kb - optimized_bandwidth_kb
reduction_percent = (1 - (optimized_bandwidth_kb / naive_bandwidth_kb)) * 100
compression_ratio = total_readings / num_transmissions
print(f"--- SYMMETRIC TELEMETRY PERFORMANCE ---")
print(f"Naive Data Usage: {naive_bandwidth_kb:.2f} KB")
print(f"Optimized Data Usage: {optimized_bandwidth_kb:.2f} KB")
print(f"Total Data Saved: {savings_kb:.2f} KB")
print(f"Bandwidth Reduction: {reduction_percent:.2f}%")
print(f"Compression Ratio: {compression_ratio:.2f}x")
print(f"---------------------------------------")
# 1. Calculate the errors (Actual - Reconstructed)
errors_from_balloon_estimate = x_true_history - ground_path
max_drift = np.max(np.abs(errors_from_balloon_estimate))
print(f"Max Reconstruction Error: {max_drift:.4f}°C")
fig, ax = plt.subplots(figsize=(10, 6))
# Histogram with white edges for clarity
ax.hist(errors_from_balloon_estimate, bins=50, color='#6f42c1', edgecolor='white', alpha=0.8)
# 3. Add the Threshold Lines
# These are the "Guardrails" of your telemetry system
ax.axvline(threshold, color='#dc3545', linestyle='--', linewidth=2, label='Threshold (+{:.1f}°C)'.format(threshold))
ax.axvline(-threshold, color='#dc3545', linestyle='--', linewidth=2, label='Threshold (-{:.1f}°C)'.format(threshold))
# 4. Styling
ax.set_title('Distribution of Reconstruction Errors (from the balloon observations)', fontsize=16, fontweight='bold', loc='left')
ax.set_xlabel('Error (Balloon estimate - Ground View) [°C]', fontsize=12)
ax.set_ylabel('Frequency', fontsize=12)
ax.legend()
ax.grid(axis='y', linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()
--- SYMMETRIC TELEMETRY PERFORMANCE ---
Naive Data Usage: 236.80 KB
Optimized Data Usage: 5.94 KB
Total Data Saved: 230.86 KB
Bandwidth Reduction: 97.49%
Compression Ratio: 39.88x
---------------------------------------
Max Reconstruction Error: 0.4992°C
We then plot the ground truth, the balloon estimate, the ground station estimate and the ping locations, as we did before for the 1D case.
fig, axes = plt.subplots(2, 1, figsize=(14, 11))
x_true_arr = np.array(x_true_history)
segments = [
    (len(x_true_history) // 2, 'Middle Segment'),
    (1200, 'Segment at Index 1200'),
]
for ax, (mid, title) in zip(axes, segments):
    start, end = mid - 500, mid + 500
    slc = slice(start, end)
    indices = np.arange(end - start)
    pings_in_window = [i - start for i in tx_idx if start <= i < end]
    raw_sensor_seg = clean_df['temperature_C'].values[slc]
    ax.plot(indices, raw_sensor_seg, '-', color='#adb5bd', alpha=0.6, markersize=4, label='Raw Sensor')
    ax.plot(indices, x_true_arr[slc], color='#007bff', linewidth=2, label='Balloon Truth (x_true_history)', zorder=2)
    ax.plot(indices, ground_path[slc], color='#dc3545', linestyle='--', linewidth=2, label='Ground View (ground_path)', zorder=3)
    if pings_in_window:
        ax.scatter(pings_in_window, x_true_arr[slc][pings_in_window],
                   color='gold', edgecolor='black', s=120, marker='*',
                   label='Ping (tx_idx)', zorder=5)
    ax.set_title(f'Symmetric Dual-Observer: {title}', fontsize=14, fontweight='bold', loc='left')
    ax.set_xlabel('Time Step (Relative to Segment Start)', fontsize=11)
    ax.set_ylabel('Temperature (°C)', fontsize=11)
    ax.legend(frameon=True, loc='best')
    ax.grid(True, linestyle=':', alpha=0.6)
plt.tight_layout()
plt.show()
# Note: slc still refers to the last segment plotted above, so these
# metrics cover that 1000-step window rather than the full flight
ground_station_error = clean_df['temperature_C'].values[slc] - ground_path[slc]
max_error = np.max(np.abs(ground_station_error))
rmse_error = np.sqrt(np.mean(np.square(ground_station_error)))
print(f"Max Error between Ground Station Estimate and Actual: {max_error:.4f}°C")
print(f"RMSE between Ground Station Estimate and Actual: {rmse_error:.4f}°C")
Max Error between Ground Station Estimate and Actual: 0.6646°C
RMSE between Ground Station Estimate and Actual: 0.2135°C
Compared to the 1D Kalman filter, the balloon pings far less often, especially where the temperature change is nearly linear, and the reconstruction at the ground station is much closer to the true values!
6. Dénouement: Kalman filter for position and velocity with wind as control¶
Finally, we move away from temperature estimation to a 6D symmetric Kalman filter that tracks the balloon’s full kinematic state (position and velocity) in 3D while incorporating wind as a control input.
The structure remains familiar: the balloon runs both a true filter (with full access to measurements and wind) and a shadow filter (representing the ground station, which only updates on transmissions). However, the dynamics are now significantly richer:
- The state evolves under a constant-velocity model, with wind contributing a horizontal displacement at each step.
- Wind affects position directly, but does not overwrite the velocity state, capturing the idea that wind biases motion without redefining the system’s internal dynamics.
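In matrix form, the dynamics described in these two points can be written as (a sketch consistent with the implementation below; $I_3$ is the $3\times 3$ identity, $\mathbf{p}$ and $\mathbf{v}$ are the position and velocity 3-vectors, and $\mathbf{u}_k$ is the wind vector with its vertical component zeroed):

$$
\begin{bmatrix} \mathbf{p}_{k+1} \\ \mathbf{v}_{k+1} \end{bmatrix}
=
\underbrace{\begin{bmatrix} I_3 & \Delta t\, I_3 \\ \mathbf{0} & I_3 \end{bmatrix}}_{F}
\begin{bmatrix} \mathbf{p}_k \\ \mathbf{v}_k \end{bmatrix}
+
\underbrace{\begin{bmatrix} \Delta t\, I_3 \\ \mathbf{0} \end{bmatrix}}_{B}
\mathbf{u}_k,
\qquad
\mathbf{u}_k = \begin{bmatrix} w_x & w_y & 0 \end{bmatrix}^\top
$$

The zero block in $B$ is exactly the statement that wind biases displacement without ever writing into the velocity state.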
At each timestep:
- The true filter performs prediction using the current wind vector, followed by a full measurement update using both position and velocity observations.
- The shadow filter predicts forward using the last transmitted wind, reflecting the ground station’s limited knowledge.
Transmission is triggered by a combined position–velocity error metric:
- Position error and velocity error are merged into a single quantity using a scaling factor ($\lambda$), which converts velocity discrepancies into equivalent position error over a characteristic timescale.
- This ensures that both fast divergence (velocity mismatch) and accumulated drift (position mismatch) are properly penalized.
When a ping occurs, the balloon transmits its full state and wind information, and the ground center is brought into exact synchronization. Otherwise, the ground continues propagating forward using stale wind information, inevitably drifting until the next correction.
This final formulation captures the full interplay between dynamics, environmental forcing, and communication constraints, yielding a system that transmits only when the combined physical state meaningfully diverges.
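Before the full implementation, the trigger logic alone can be sketched in isolation (a standalone illustration; the helper `should_ping` is hypothetical, with thresholds matching the values used in the run further below):

```python
import numpy as np

def should_ping(dp, dv, pos_threshold=10.0, vel_threshold=5.0, vel_weight=25.0):
    """Combined position/velocity trigger: vel_weight (= lambda, in seconds)
    converts a velocity discrepancy (m/s) into an equivalent position
    error (m) before comparing against a single squared threshold."""
    err_sq = np.dot(dp, dp) + vel_weight**2 * np.dot(dv, dv)
    threshold_sq = pos_threshold**2 + (vel_weight * vel_threshold)**2
    return bool(err_sq > threshold_sq)

# 8 m of drift with matched velocities: stay silent
print(should_ping(np.array([8.0, 0.0, 0.0]), np.zeros(3)))                # False
# Same drift but a 6 m/s velocity mismatch: within ~25 s this becomes
# ~150 m of extra drift, so ping now
print(should_ping(np.array([8.0, 0.0, 0.0]), np.array([6.0, 0.0, 0.0])))  # True
```

Because velocity errors are scaled by $\lambda$ before entering the metric, a small velocity mismatch can trip the trigger long before the position estimates have visibly diverged.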
def run_symmetric_kalman_pos_vel_wind(
    obs_pos,             # (N, 3) array: [x_dist, y_dist, z_dist] in metres
    obs_vel,             # (N, 3) array: [vx, vy, vz] in m/s
    obs_wind_speed,      # (N,) array: wind speed in m/s
    obs_wind_dir,        # (N,) array: wind direction in degrees (met convention: FROM)
    time_dts,            # (N,) array: time deltas in seconds
    pos_threshold=50.0,  # metres
    vel_threshold=5.0,   # m/s
    vel_weight=5.0,      # lambda: effectively converts m/s -> metres for threshold
    q_pos=1e-2,          # process noise, position block
    q_vel=1e-3,          # process noise, velocity block
    r_pos=10.0,          # measurement noise std, position (metres)
    r_vel=0.5,           # measurement noise std, velocity (m/s)
):
    """
    6D Kalman filter (position + velocity) with wind as control input and
    event-triggered transmission (symmetric architecture).

    State vector: x = [px, py, pz, vx, vy, vz]^T (6,)

    Dynamics (constant-velocity + wind control):
        p_{k+1} = p_k + v_k * dt + B_wind * u_k * dt
        v_{k+1} = v_k
    where u_k = [wind_x, wind_y, 0] is the horizontal wind vector and
    B_wind couples wind into the position residual only (i.e. wind biases
    the expected displacement, not the velocity state directly).

    Transmission trigger:
        err = ||dp||^2 + lambda^2 * ||dv||^2 > (pos_threshold^2 + lambda^2 * vel_threshold^2)
    where dp = pos_true - pos_shadow_prior, dv = vel_true - vel_shadow_prior,
    and lambda = vel_weight (units: seconds). With lambda = 25 s (the value
    used below), a 1 m/s velocity discrepancy "costs" as much as 25 m of
    position error, which matches the typical time-to-divergence for
    balloon-scale velocities.

    Parameters
    ----------
    obs_pos        : (N, 3) measured positions [x, y, z] in metres
    obs_vel        : (N, 3) measured velocities [vx, vy, vz] in m/s
    obs_wind_speed : (N,) wind speed in m/s
    obs_wind_dir   : (N,) meteorological wind direction in degrees
                     (direction FROM which wind blows;
                      0/360 = from North, 90 = from East)
    time_dts       : (N,) time step durations in seconds
    pos_threshold  : float, position component of trigger (metres)
    vel_threshold  : float, velocity component of trigger (m/s)
    vel_weight     : float, lambda (seconds); scales velocity error to metres
    q_pos          : float, process noise variance for position states
    q_vel          : float, process noise variance for velocity states
    r_pos          : float, measurement noise std for position (metres)
    r_vel          : float, measurement noise std for velocity (m/s)

    Returns
    -------
    tx_indices         : list[int], timestep indices at which a ping was sent
    ground_pos         : (N, 3) ground center's position estimate
    ground_vel         : (N, 3) ground center's velocity estimate
    shadow_uncertainty : (N,) trace of shadow covariance
    true_pos_history   : (N, 3) balloon's internal position estimate
    true_vel_history   : (N, 3) balloon's internal velocity estimate
    wind_vec_history   : (N, 3) wind vector used at each step [wx, wy, 0]
    """
    obs_pos = np.asarray(obs_pos, dtype=float)  # (N, 3)
    obs_vel = np.asarray(obs_vel, dtype=float)  # (N, 3)
    obs_wind_speed = np.asarray(obs_wind_speed, dtype=float)
    obs_wind_dir = np.asarray(obs_wind_dir, dtype=float)
    time_dts = np.asarray(time_dts, dtype=float)
    N = len(obs_pos)

    # --- Measurement matrix: observe full state [px, py, pz, vx, vy, vz] ---
    C = np.eye(6)
    R = np.diag([r_pos**2, r_pos**2, r_pos**2,
                 r_vel**2, r_vel**2, r_vel**2])

    # --- Combined threshold in squared mixed units ---
    trigger_threshold_sq = pos_threshold**2 + (vel_weight * vel_threshold)**2

    def wind_to_vector(speed, direction_deg):
        """Met convention: direction is FROM. North=0, clockwise.
        Returns [wx, wy, 0] where x=East, y=North."""
        rad = np.radians(direction_deg)
        wx = -speed * np.sin(rad)  # eastward component
        wy = -speed * np.cos(rad)  # northward component
        return np.array([wx, wy, 0.0])

    # --- Initialisation ---
    x0 = np.concatenate([obs_pos[0], obs_vel[0]])  # (6,)
    x_true = x0.copy()
    P_true = np.eye(6)
    x_shadow = x0.copy()
    P_shadow = np.eye(6)
    tx_indices = [0]
    ground_pos = [x_shadow[:3].copy()]
    ground_vel = [x_shadow[3:].copy()]
    shadow_uncertainty = [np.trace(P_shadow)]
    true_pos_history = [x_true[:3].copy()]
    true_vel_history = [x_true[3:].copy()]
    wind0 = wind_to_vector(obs_wind_speed[0], obs_wind_dir[0])
    wind_vec_history = [wind0.copy()]
    wind_shadow = wind0.copy()  # shadow's last known wind

    for i in range(1, N):
        dt = time_dts[i]
        # --- State transition matrix F (6x6) ---
        F = np.eye(6)
        F[0, 3] = dt  # px += vx * dt
        F[1, 4] = dt  # py += vy * dt
        F[2, 5] = dt  # pz += vz * dt
        # --- Process noise Q ---
        Q = np.diag([q_pos * dt**2, q_pos * dt**2, q_pos * dt**2,
                     q_vel * dt**2, q_vel * dt**2, q_vel * dt**2])
        # --- Wind control vector (horizontal only) ---
        wind_true = wind_to_vector(obs_wind_speed[i], obs_wind_dir[i])
        # Control effect on position: displacement = wind * dt (additive bias)
        # B maps wind_vec (3,) -> state (6,); only position rows are affected
        wind_displacement = np.array([
            wind_true[0] * dt,
            wind_true[1] * dt,
            0.0,           # wind does not drive vertical position
            0.0, 0.0, 0.0  # wind does not directly update velocity state
        ])
        # --- TRUE FILTER: predict ---
        x_true_prior = F @ x_true + wind_displacement
        P_true_prior = F @ P_true @ F.T + Q
        # --- TRUE FILTER: update with measurement ---
        y = np.concatenate([obs_pos[i], obs_vel[i]]) - C @ x_true_prior
        S = C @ P_true_prior @ C.T + R  # = P_prior + R (since C=I)
        K = P_true_prior @ C.T @ np.linalg.inv(S)
        x_true = x_true_prior + K @ y
        P_true = (np.eye(6) - K @ C) @ P_true_prior
        true_pos_history.append(x_true[:3].copy())
        true_vel_history.append(x_true[3:].copy())
        wind_vec_history.append(wind_true.copy())
        # --- SHADOW FILTER: predict using last known wind ---
        wind_shadow_displacement = np.array([
            wind_shadow[0] * dt,
            wind_shadow[1] * dt,
            0.0,
            0.0, 0.0, 0.0
        ])
        x_shadow_prior = F @ x_shadow + wind_shadow_displacement
        P_shadow_prior = F @ P_shadow @ F.T + Q
        # --- TRIGGER CHECK ---
        dp = x_true[:3] - x_shadow_prior[:3]
        dv = x_true[3:] - x_shadow_prior[3:]
        err_sq = np.dot(dp, dp) + (vel_weight**2) * np.dot(dv, dv)
        if err_sq > trigger_threshold_sq:
            tx_indices.append(i)
            x_shadow = x_true.copy()
            P_shadow = P_true.copy()
            wind_shadow = wind_true.copy()
        else:
            x_shadow = x_shadow_prior
            P_shadow = P_shadow_prior
            # wind_shadow unchanged: ground doesn't know current wind
        ground_pos.append(x_shadow[:3].copy())
        ground_vel.append(x_shadow[3:].copy())
        shadow_uncertainty.append(np.trace(P_shadow))

    return (
        tx_indices,
        np.array(ground_pos),
        np.array(ground_vel),
        np.array(shadow_uncertainty),
        np.array(true_pos_history),
        np.array(true_vel_history),
        np.array(wind_vec_history),
    )
(tx_indices, ground_pos, ground_vel, shadow_uncertainty,
 true_pos_history, true_vel_history, wind_vec_history) = run_symmetric_kalman_pos_vel_wind(
    obs_pos=clean_df[['x_dist', 'y_dist', 'z_dist']].values,
    obs_vel=clean_df[['vx', 'vy', 'vz']].values,
    obs_wind_speed=clean_df['wind speed_m/s'].values,
    obs_wind_dir=clean_df['wind direction_degree'].values,
    time_dts=clean_df['dt'].values,
    pos_threshold=10.0,
    vel_threshold=5.0,
    vel_weight=25.0,
)
To compute the bandwidth savings, we have to be a bit careful. In the naive implementation every packet carries 9 floats (3D position, velocity, and wind), while in the Kalman implementation each ping additionally carries the diagonal of the covariance matrix $\mathbf{P}$ (6 extra floats, 15 in total). Still, the massive reduction in the number of transmissions saves a great deal of bandwidth.
N = len(true_pos_history)
# --- Naive: transmit everything every step ---
# Each step: pos(3) + vel(3) + wind(3) = 9 floats
FLOATS_PER_STEP_NAIVE = 9
naive_bandwidth_kb = (N * FLOATS_PER_STEP_NAIVE * PAYLOAD_PER_SENSOR + N * PACKET_OVERHEAD) / 1024
# --- Optimised: only transmit on ping ---
# Each ping: pos(3) + vel(3) + wind(3) + P diagonal(6) = 15 floats
# (sending full 6x6 P would be 36 floats; diagonal is sufficient to reset)
FLOATS_PER_PING = 15
num_transmissions = len(tx_indices)
steps_saved = N - num_transmissions
optimized_bandwidth_kb = (num_transmissions * FLOATS_PER_PING * PAYLOAD_PER_SENSOR + num_transmissions * PACKET_OVERHEAD) / 1024
savings_kb = naive_bandwidth_kb - optimized_bandwidth_kb
reduction_percent = (1 - optimized_bandwidth_kb / naive_bandwidth_kb) * 100
compression_ratio = N / num_transmissions
print(f"--- 3D SYMMETRIC TELEMETRY PERFORMANCE ---")
print(f"Total timesteps: {N}")
print(f"Total transmissions: {num_transmissions}")
print(f"Steps with no ping: {steps_saved} ({100*steps_saved/N:.1f}% silent)")
print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Naive bandwidth: {naive_bandwidth_kb:.2f} KB")
print(f"Optimised bandwidth: {optimized_bandwidth_kb:.2f} KB")
print(f"Total saved: {savings_kb:.2f} KB ({reduction_percent:.1f}% reduction)")
print(f"------------------------------------------")
--- 3D SYMMETRIC TELEMETRY PERFORMANCE ---
Total timesteps: 6062
Total transmissions: 203
Steps with no ping: 5859 (96.7% silent)
Compression ratio: 29.86x
Naive bandwidth: 307.84 KB
Optimised bandwidth: 15.07 KB
Total saved: 292.77 KB (95.1% reduction)
------------------------------------------
print(np.shape(ground_pos))
print(np.shape(clean_df[['x_dist', 'y_dist', 'z_dist']].values))
pos_error = np.linalg.norm(ground_pos - clean_df[['x_dist', 'y_dist', 'z_dist']].values, axis=1)
vel_error = np.linalg.norm(ground_vel - clean_df[['vx', 'vy', 'vz']].values, axis=1)
rmse_pos = np.sqrt(np.mean(pos_error**2))
rmse_vel = np.sqrt(np.mean(vel_error**2))
print(f"RMSE Position Error: {rmse_pos:.2f} m")
print(f"RMSE Velocity Error: {rmse_vel:.2f} m/s")
(6062, 3)
(6062, 3)
RMSE Position Error: 159.81 m
RMSE Velocity Error: 6.58 m/s
# For better plotting, we can smooth the ground path by linearly interpolating between the ping points
def smooth_ground_path(ground_pos, tx_indices, N):
    smoothed = ground_pos.copy()
    tx = list(tx_indices)
    if tx[-1] != N - 1:
        tx.append(N - 1)
    for k in range(len(tx) - 1):
        i0, i1 = tx[k], tx[k + 1]
        for dim in range(3):
            smoothed[i0:i1+1, dim] = np.linspace(
                ground_pos[i0, dim], ground_pos[i1, dim], i1 - i0 + 1
            )
    return smoothed
ground_pos_smoothed = smooth_ground_path(ground_pos, tx_indices, len(true_pos_history))
import matplotlib.gridspec as gridspec
tp = true_pos_history # (N, 3)
gp = ground_pos_smoothed # (N, 3)
wv = wind_vec_history # (N, 3)
tx = np.array(tx_indices)
N = len(tp)
idx = np.arange(N)
ping_pos = tp[tx] # positions at ping moments
# --- FIGURE SETUP ---
# Maximize the figsize for a standalone hero plot
fig = plt.figure(figsize=(16, 12))
ax = fig.add_subplot(111, projection='3d')
# --- PLOTTING ---
# Balloon Truth
ax.plot(tp[:, 0], tp[:, 1], tp[:, 2],
color='#007bff', lw=3, label='Balloon Truth', zorder=2)
# Ground View (Shadow)
ax.plot(gp[:, 0], gp[:, 1], gp[:, 2],
color='#dc3545', lw=2, ls='--', alpha=0.8, label='Ground View', zorder=2)
# Pings
ax.scatter(ping_pos[:, 0], ping_pos[:, 1], ping_pos[:, 2],
color='gold', edgecolor='black', s=100, marker='.',
label='Telemetry Ping', zorder=1)
# --- AXIS STYLING ---
ax.set_xlabel('X (m, East)', labelpad=15, fontsize=14)
ax.set_ylabel('Y (m, North)', labelpad=15, fontsize=14)
ax.set_zlabel('Z (m, Altitude)', labelpad=15, fontsize=14)
# Force the title higher up
ax.set_title('3D Symmetric Kalman Filter Reconstruction',
fontsize=20, fontweight='bold', pad=40)
# --- VIEW OPTIMIZATION ---
# Adjust these to find the best angle for your specific flight path
ax.view_init(elev=20, azim=-45)
# This reduces the "margin" around the 3D box to make the plot look bigger
fig.subplots_adjust(left=0, right=1, bottom=0, top=0.9)
ax.legend(loc='upper left', fontsize=12)
ax.grid(True, alpha=0.3)
plt.show()
# --- FILTERING LOGIC ---
# Define the range around 4000m
z_min, z_max = 3000, 5000
mask = (tp[:, 2] >= z_min) & (tp[:, 2] <= z_max)
# Apply mask to all main arrays
tp_seg = tp[mask]
gp_seg = gp[mask]
# Filter pings: only keep indices that are within the mask range
tx_mask = (tp[tx, 2] >= z_min) & (tp[tx, 2] <= z_max)
ping_pos_seg = tp[tx][tx_mask]
# --- FIGURE SETUP ---
fig = plt.figure(figsize=(16, 12))
ax = fig.add_subplot(111, projection='3d')
# --- PLOTTING (Using Segmented Data) ---
ax.plot(tp_seg[:, 0], tp_seg[:, 1], tp_seg[:, 2],
color='#007bff', lw=3, label='Balloon Truth', zorder=2)
ax.plot(gp_seg[:, 0], gp_seg[:, 1], gp_seg[:, 2],
color='#dc3545', lw=2, ls='--', alpha=0.8, label='Ground View', zorder=2)
ax.scatter(ping_pos_seg[:, 0], ping_pos_seg[:, 1], ping_pos_seg[:, 2],
color='gold', edgecolor='black', s=100, marker='.',
label='Telemetry Ping', zorder=1)
# --- AXIS STYLING ---
ax.set_xlabel('X (m, East)', labelpad=15, fontsize=14)
ax.set_ylabel('Y (m, North)', labelpad=15, fontsize=14)
ax.set_zlabel('Z (m, Altitude)', labelpad=15, fontsize=14)
ax.set_title('3D Reconstruction Detail (3000m - 5000m)',
             fontsize=20, fontweight='bold', pad=40)
ax.view_init(elev=20, azim=-45)
fig.subplots_adjust(left=0, right=1, bottom=0, top=0.9)
ax.legend(loc='upper left', fontsize=12)
ax.grid(True, alpha=0.3)
plt.show()
# --- 1. SETUP THE SUBPLOTS ---
fig, (ax_xy, ax_yz, ax_xz) = plt.subplots(1, 3, figsize=(20, 6))
fig.suptitle('2D Projections: 3000m - 5000m Altitude Segment', fontsize=18, fontweight='bold', y=1.05)
# --- 2. XY PLANE (PLAN VIEW: EAST VS NORTH) ---
ax_xy.plot(tp_seg[:, 0], tp_seg[:, 1], color='#007bff', lw=2, label='Truth')
ax_xy.plot(gp_seg[:, 0], gp_seg[:, 1], color='#dc3545', ls='--', lw=1.5, alpha=0.8, label='Ground')
ax_xy.scatter(ping_pos_seg[:, 0], ping_pos_seg[:, 1], color='gold', edgecolor='black', s=60, marker='.', zorder=5)
ax_xy.set_xlabel('X (m, East)')
ax_xy.set_ylabel('Y (m, North)')
ax_xy.set_title('XY - Plan View')
# --- 3. YZ PLANE (SIDE VIEW: NORTH VS ALTITUDE) ---
ax_yz.plot(tp_seg[:, 1], tp_seg[:, 2], color='#007bff', lw=2)
ax_yz.plot(gp_seg[:, 1], gp_seg[:, 2], color='#dc3545', ls='--', lw=1.5, alpha=0.8)
ax_yz.scatter(ping_pos_seg[:, 1], ping_pos_seg[:, 2], color='gold', edgecolor='black', s=60, marker='.', zorder=5)
ax_yz.set_xlabel('Y (m, North)')
ax_yz.set_ylabel('Z (m, Altitude)')
ax_yz.set_title('YZ - North vs Alt')
# --- 4. XZ PLANE (SIDE VIEW: EAST VS ALTITUDE) ---
ax_xz.plot(tp_seg[:, 0], tp_seg[:, 2], color='#007bff', lw=2)
ax_xz.plot(gp_seg[:, 0], gp_seg[:, 2], color='#dc3545', ls='--', lw=1.5, alpha=0.8)
ax_xz.scatter(ping_pos_seg[:, 0], ping_pos_seg[:, 2], color='gold', edgecolor='black', s=60, marker='.', zorder=5)
ax_xz.set_xlabel('X (m, East)')
ax_xz.set_ylabel('Z (m, Altitude)')
ax_xz.set_title('XZ - East vs Alt')
# --- 5. CLEANUP ---
for ax in [ax_xy, ax_yz, ax_xz]:
    ax.grid(True, ls=':', alpha=0.6)
ax_xy.legend(loc='best', fontsize=10)
plt.tight_layout()
plt.show()