5
5
import matplotlib .pyplot as plt
6
6
from math import pi , sqrt , ceil , atan2
7
7
8
+
9
+ '''
10
+ To create a sparse matrix, you have to create three "parallel" arrays,
11
+ one with row indices, one with column indices, and one with the actual
12
+ values to put in the matrix. This is a helper function to make this process easier.
13
+ '''
8
14
def dense_2_sp_lists (M : np .array , tl_row : int , tl_col : int , row_vec = True ):
9
15
'''
10
16
This function takes in a dense matrix and turns it into a flat array.
@@ -44,15 +50,27 @@ def dense_2_sp_lists(M: np.array, tl_row : int, tl_col: int, row_vec=True):
44
50
'''
45
51
L's columns will be 3*N big, where N is the number of timesteps
46
52
Rows will be organized dynamics first, then measurements:
47
-
48
53
'''
49
54
class satelliteModelBatch :
55
+ '''
56
+ A class to perform the factor graph optimization. It stores
57
+ all the measurements in the class, has several "helper" functions, and the
58
+ "opt" function that does the actual optimization using the helper functions
59
+ '''
60
+
50
61
def __init__ (self , meas : np .array , R , Q ,
51
62
dt : float = 5 ,
52
63
x0 :np .array = np .array ([0 , 2E7 , 4500 , 0 ])):
64
+ '''
65
+ Get the measurements, dt (for running the dynamics),
66
+ the R & Q matrices, and the initial location and initialize
67
+ the hidden variables
68
+ '''
53
69
self .N = len (meas )
54
70
# self.N = 10
55
71
self .dt = dt
72
+ # To do more accurate dynamics, if dt is large, use multiple, smaller
73
+ # timesteps instead
56
74
if self .dt > 1 :
57
75
self .prop_dt = self .dt / ceil (self .dt )
58
76
self .n_timesteps = int (ceil (self .dt ))
@@ -77,6 +95,10 @@ def create_init_state(self):
77
95
self .states [i ] = self .prop_one_timestep (self .states [i - 1 ])
78
96
79
97
def prop_one_timestep (self , state ):
98
+ '''
99
+ This runs the actual dynamics, i.e. given a value for x (really x_k) find
100
+ what x should be (by dynamics) at the next timestep (self.dt forward in time) -- x_{k+1}
101
+ '''
80
102
going_out = state .copy ()
81
103
for _ in range (self .n_timesteps ):
82
104
dist = la .norm (going_out [:2 ])
@@ -89,7 +111,9 @@ def prop_one_timestep(self, state):
89
111
return going_out
90
112
91
113
92
-
114
+ '''
115
+ Helper functions to tell what rows you are indexing into in the 'L' matrix
116
+ '''
93
117
def state_idx (self , i : int ) -> int :
94
118
return 4 * i
95
119
@@ -123,9 +147,9 @@ def F_mat(self,state):
123
147
Takes in a current state and finds the derivative of the
124
148
propagate forward one step function (prop_one_timestep)
125
149
126
- Inputs: state -- a 3 -vector (np.array)
150
+ Inputs: state -- a 4 -vector (np.array)
127
151
128
- Returns a 3x3 np.array
152
+ Returns a 4x4 np.array
129
153
'''
130
154
F = np .eye (4 )
131
155
curr_state = state .copy ()
@@ -151,6 +175,12 @@ def F_mat(self,state):
151
175
return F
152
176
153
177
def create_L (self ):
178
+ '''
179
+ This creates the big matrix (L) given the current state of the whole system
180
+ '''
181
+ # First, determine how many non zero entries (nnz_entries) will be in the
182
+ # sparse matrix. Then create the 3 parallel arrays that will be used to
183
+ # form this matrix
154
184
H_size = 4
155
185
F_size = 16 # Should be state size**2
156
186
nnz_entries = 2 * F_size * (self .N - 1 ) + H_size * self .N
@@ -180,6 +210,11 @@ def create_L(self):
180
210
return sp .csr_matrix ((data_l ,(row_l ,col_l )))
181
211
182
212
def create_y (self , state_vec = None ):
213
+ '''
214
+ Compute the residual vector. Can compute it for the
215
+ current state (pass in "None") or for a new state that you
216
+ want to test without setting the internal state.
217
+ '''
183
218
if state_vec is not None :
184
219
state_data = self .vec_to_data (state_vec )
185
220
else :
@@ -211,7 +246,7 @@ def add_delta(self,delta_x: np.array = None) -> np.array:
211
246
This takes the current state and adds on delta_x.
212
247
It DOES NOT modify any internal state
213
248
214
- Inputs: delta_x, a self.N X 3 vector (np.array)
249
+ Inputs: delta_x, a self.N X 4 vector (np.array)
215
250
Returns: a full state vector of the same size
216
251
'''
217
252
going_out = np .zeros (self .N * 4 )
@@ -238,7 +273,7 @@ def opt(self):
238
273
Create the Jacobian matrix (L) and the residual vector (y) for
239
274
the current state. Find the best linear approximation to minimize y
240
275
and move in that direction. Repeat until convergence.
241
- (This is a Gauss-Newton optimization procedure)
276
+ (This is a damped Gauss-Newton optimization procedure)
242
277
'''
243
278
finished = False
244
279
num_iters = 0
@@ -257,6 +292,7 @@ def opt(self):
257
292
Lty = L .T .dot (y )
258
293
delta_x = spla .spsolve (M ,Lty )
259
294
scale = 1
295
+ # Damp the Gauss-Newton step if it doesn't do what the linearization predicts
260
296
scale_good = False
261
297
while not scale_good :
262
298
next_y = self .create_y (self .add_delta (delta_x * scale ))
@@ -274,8 +310,13 @@ def opt(self):
274
310
self .update_state (delta_x * scale )
275
311
print ('iteration' ,num_iters ,'delta_x length was' ,la .norm (delta_x * scale ), 'scale was' ,scale )
276
312
finished = la .norm (delta_x )< 12 or num_iters > 100
277
-
313
+
314
+
278
315
def test_Jacobian (batch_uni , col , dx = .001 ):
316
+ '''
317
+ This function is useful for debugging. If things don't work, try to
318
+ figure out where the derivative matrix is wrong.
319
+ '''
279
320
curr_y = batch_uni .create_y ()
280
321
orig_state = batch_uni .add_delta ()
281
322
delta_x = np .zeros (orig_state .shape )
0 commit comments