@@ -73,7 +73,9 @@ public void Start()
73
73
return ;
74
74
}
75
75
76
- _repository . ComponentStatusPullInterval = _settings . ComponentStatusPullInterval ;
76
+ // Give the pulling code some time to complete before declaring an entity
77
+ // as outdated. 1.25 might be enough additional time.
78
+ _repository . ComponentStatusOutdatedTimeout = _settings . ComponentStatusPullInterval * 1.25 ;
77
79
78
80
AttachToMessageBus ( ) ;
79
81
@@ -90,51 +92,53 @@ public void Start()
90
92
91
93
public HistoryExtract BuildHistoryExtract ( string componentUid , string statusUid , DateTime rangeStart , DateTime rangeEnd , TimeSpan interval , HistoryExtractDataType dataType )
92
94
{
93
- return new HistoryExtractBuilder ( _repository ) . Build ( componentUid , statusUid , rangeStart , rangeEnd , interval , dataType ) ;
94
- }
95
+ var historyExtract = new HistoryExtractBuilder ( _repository ) . Build ( componentUid , statusUid , rangeStart , rangeEnd , interval , dataType ) ;
96
+ for ( var i = historyExtract . DataPoints . Count - 1 ; i > 0 ; i -- )
97
+ {
95
98
96
- private void AttachToMessageBus ( )
97
- {
98
- var filter = new WirehomeDictionary ( )
99
- . WithValue ( "type" , "component_registry.event.status_changed" ) ;
99
+ }
100
100
101
- _messageBusService . Subscribe ( "history_receiver" , filter , OnComponentStatusChanged ) ;
101
+ return historyExtract ;
102
102
}
103
103
104
- private void OnComponentStatusChanged ( WirehomeDictionary message )
104
+ private bool TryInitializeRepository ( )
105
105
{
106
106
try
107
107
{
108
- var componentStatusValue = new ComponentStatusValue
109
- {
110
- ComponentUid = Convert . ToString ( message [ "component_uid" ] , CultureInfo . InvariantCulture ) ,
111
- StatusUid = Convert . ToString ( message [ "status_uid" ] , CultureInfo . InvariantCulture ) ,
112
- Value = Convert . ToString ( message . GetValueOrDefault ( "new_value" , null ) , CultureInfo . InvariantCulture ) ,
113
- Timestamp = DateTime . UtcNow
114
- } ;
108
+ var repository = new HistoryRepository ( ) ;
109
+ repository . Initialize ( ) ;
110
+ _repository = repository ;
115
111
116
- _pendingComponentStatusValues . Add ( componentStatusValue , _systemService . CancellationToken ) ;
112
+ return true ;
117
113
}
118
114
catch ( Exception exception )
119
115
{
120
- _logger . LogError ( exception , "Error while processing changed component status." ) ;
116
+ _logger . Log ( LogLevel . Warning , exception , "Error while initializing history repository." ) ;
117
+ return false ;
121
118
}
122
119
}
123
120
124
- private bool TryInitializeRepository ( )
121
+ private void AttachToMessageBus ( )
122
+ {
123
+ var filter = new WirehomeDictionary ( )
124
+ . WithValue ( "type" , "component_registry.event.status_changed" ) ;
125
+
126
+ _messageBusService . Subscribe ( "history_receiver" , filter , OnComponentStatusChanged ) ;
127
+ }
128
+
129
+ private void OnComponentStatusChanged ( WirehomeDictionary message )
125
130
{
126
131
try
127
132
{
128
- var repository = new HistoryRepository ( ) ;
129
- repository . Initialize ( ) ;
130
- _repository = repository ;
131
-
132
- return true ;
133
+ TryEnqueueComponentStatusValue (
134
+ Convert . ToString ( message [ "component_uid" ] , CultureInfo . InvariantCulture ) ,
135
+ Convert . ToString ( message [ "status_uid" ] , CultureInfo . InvariantCulture ) ,
136
+ message . GetValueOrDefault ( "new_value" , null ) ,
137
+ DateTime . UtcNow ) ;
133
138
}
134
139
catch ( Exception exception )
135
140
{
136
- _logger . Log ( LogLevel . Warning , exception , "Error while initializing history repository." ) ;
137
- return false ;
141
+ _logger . LogError ( exception , "Error while processing changed component status." ) ;
138
142
}
139
143
}
140
144
@@ -184,21 +188,17 @@ private async Task TryUpdateComponentStatusValuesAsync(CancellationToken cancell
184
188
{
185
189
try
186
190
{
187
- await Task . Delay ( _settings . ComponentStatusPullInterval , cancellationToken ) ;
191
+ await Task . Delay ( _settings . ComponentStatusPullInterval , cancellationToken ) . ConfigureAwait ( false ) ;
188
192
189
193
foreach ( var component in _componentRegistryService . GetComponents ( ) )
190
194
{
191
195
foreach ( var status in component . Status )
192
196
{
193
- var componentStatusValue = new ComponentStatusValue
194
- {
195
- ComponentUid = component . Uid ,
196
- StatusUid = status . Key ,
197
- Value = Convert . ToString ( status . Value , CultureInfo . InvariantCulture ) ,
198
- Timestamp = DateTime . UtcNow
199
- } ;
200
-
201
- _pendingComponentStatusValues . Add ( componentStatusValue , cancellationToken ) ;
197
+ TryEnqueueComponentStatusValue (
198
+ component . Uid ,
199
+ status . Key ,
200
+ status . Value ,
201
+ DateTime . UtcNow ) ;
202
202
}
203
203
}
204
204
}
@@ -211,5 +211,42 @@ private async Task TryUpdateComponentStatusValuesAsync(CancellationToken cancell
211
211
}
212
212
}
213
213
}
214
+
215
+ private void TryEnqueueComponentStatusValue (
216
+ string componentUid ,
217
+ string statusUid ,
218
+ object value ,
219
+ DateTime timestamp )
220
+ {
221
+ try
222
+ {
223
+ var stringValue = Convert . ToString ( value , CultureInfo . InvariantCulture ) ;
224
+
225
+ var roundSetting = _componentRegistryService . GetComponentSetting ( componentUid , "history.round_digits" ) ;
226
+ if ( roundSetting != null )
227
+ {
228
+ var roundDigitsCount = Convert . ToInt32 ( roundSetting ) ;
229
+ if ( decimal . TryParse ( stringValue , out var @decimal ) )
230
+ {
231
+ @decimal = Math . Round ( @decimal , roundDigitsCount ) ;
232
+ stringValue = Convert . ToString ( @decimal , CultureInfo . InvariantCulture ) ;
233
+ }
234
+ }
235
+
236
+ var componentStatusValue = new ComponentStatusValue
237
+ {
238
+ ComponentUid = componentUid ,
239
+ StatusUid = statusUid ,
240
+ Value = stringValue ,
241
+ Timestamp = timestamp
242
+ } ;
243
+
244
+ _pendingComponentStatusValues . Add ( componentStatusValue ) ;
245
+ }
246
+ catch ( Exception exception )
247
+ {
248
+ _logger . LogError ( exception , $ "Error while enque component status value '{ componentUid } .{ statusUid } '.") ;
249
+ }
250
+ }
214
251
}
215
252
}
0 commit comments