44using System . Collections . Concurrent ;
55using System . Collections . Generic ;
66using System . Data . Common ;
7+ using System . Diagnostics ;
78using System . Linq ;
89using System . Reflection ;
910using System . Reflection . Metadata ;
@@ -108,7 +109,6 @@ private set
108109
109110 private DateTime lastSendQueuedMessage ;
110111
111- private SemaphoreSlim presentSemaphoreSlim = new SemaphoreSlim ( 1 ) ;
112112 private bool present ;
113113 public bool Present
114114 {
@@ -170,6 +170,7 @@ private void Instance_PresentUpdateTimerElapsed(object sender, EventArgs e)
170170 private async void DeviceModel_Initialized ( object sender , EventArgs e )
171171 {
172172 deviceModel . Initialized -= DeviceModel_Initialized ;
173+ deviceModel . ParameterValueAdded -= DeviceModel_ParameterValueAdded ;
173174 await collectParameters ( ) ;
174175 }
175176 private async Task collectParameters ( )
@@ -288,41 +289,88 @@ private async Task requestParameter(ERDM_Parameter parameter, object payload = n
288289 }
289290 }
290291
292+ private SemaphoreSlim updateSenaphoreSlim = new SemaphoreSlim ( 1 ) ;
291293 private async Task updateParameters ( )
292294 {
293295 if ( QueuedSupported && deviceModel . KnownNotSupportedParameters . Contains ( ERDM_Parameter . QUEUED_MESSAGE ) )
294296 QueuedSupported = false ;
295297
296- if ( QueuedSupported && ! deviceModel . KnownNotSupportedParameters . Contains ( ERDM_Parameter . QUEUED_MESSAGE ) )
298+ if ( updateSenaphoreSlim . CurrentCount == 0 )
299+ return ;
300+
301+ await updateSenaphoreSlim . WaitAsync ( ) ;
302+ try
297303 {
298- if ( DateTime . UtcNow - lastSendQueuedMessage < TimeSpan . FromMilliseconds ( GlobalTimers . Instance . QueuedUpdateTime ) )
299- return ;
300- ParameterBag parameterBag = new ParameterBag ( ERDM_Parameter . QUEUED_MESSAGE , this . DeviceModel . ManufacturerID , DeviceInfo . DeviceModelId , DeviceInfo . SoftwareVersionId ) ;
301- var define = MetadataFactory . GetDefine ( parameterBag ) ;
302- if ( define . GetRequest . HasValue )
304+ if ( QueuedSupported && ! deviceModel . KnownNotSupportedParameters . Contains ( ERDM_Parameter . QUEUED_MESSAGE ) )
303305 {
304- byte mc = 0 ;
305- do
306+ if ( DateTime . UtcNow - lastSendQueuedMessage < TimeSpan . FromMilliseconds ( GlobalTimers . Instance . QueuedUpdateTime ) )
307+ return ;
308+ ParameterBag parameterBag = new ParameterBag ( ERDM_Parameter . QUEUED_MESSAGE , this . DeviceModel . ManufacturerID , DeviceInfo . DeviceModelId , DeviceInfo . SoftwareVersionId ) ;
309+ var define = MetadataFactory . GetDefine ( parameterBag ) ;
310+ if ( define . GetRequest . HasValue )
306311 {
307- lastSendQueuedMessage = DateTime . UtcNow ;
308- mc = await requestGetParameterWithPayload ( parameterBag , define , UID , Subdevice , ERDM_Status . ADVISORY ) ;
309- await Task . Delay ( GlobalTimers . Instance . UpdateDelayBetweenQueuedUpdateRequests ) ;
310-
312+ byte mc = 0 ;
313+ do
314+ {
315+ var cts = new CancellationTokenSource ( ) ;
316+ cts . CancelAfter ( TimeSpan . FromMilliseconds ( GlobalTimers . Instance . ParameterUpdateTimerInterval ) ) ;
317+ var task = Task . Run ( async ( ) =>
318+ {
319+ lastSendQueuedMessage = DateTime . UtcNow ;
320+
321+ Stopwatch sw = new Stopwatch ( ) ;
322+ sw ? . Restart ( ) ;
323+ mc = await requestGetParameterWithPayload ( parameterBag , define , UID , Subdevice , ERDM_Status . ADVISORY ) ;
324+ sw ? . Stop ( ) ;
325+ Logger ? . LogTrace ( $ "Queued Parameter update took { sw . ElapsedMilliseconds } ms for { mc } messages.") ;
326+ } , cts . Token ) ;
327+ await task ;
328+
329+ if ( task . IsCompletedSuccessfully )
330+ await Task . Delay ( GlobalTimers . Instance . UpdateDelayBetweenQueuedUpdateRequests ) ;
331+ else
332+ {
333+ Logger ? . LogTrace ( task . Exception , $ "Queue Parameter update failed: { task . Exception ? . Message } ") ;
334+ return ;
335+ }
336+
337+ }
338+ while ( mc != 0 ) ;
339+ return ;
311340 }
312- while ( mc != 0 ) ;
313- return ;
341+ }
342+ while ( ParameterUpdatedBag . TryPeek ( out ParameterUpdatedBag bag ) )
343+ {
344+ if ( DateTime . UtcNow - bag . Timestamp < TimeSpan . FromMilliseconds ( GlobalTimers . Instance . NonQueuedUpdateTime ) )
345+ return ;
346+
347+ var cts = new CancellationTokenSource ( ) ;
348+ cts . CancelAfter ( TimeSpan . FromMilliseconds ( GlobalTimers . Instance . ParameterUpdateTimerInterval ) ) ;
349+ var task = Task . Run ( async ( ) =>
350+ {
351+ Stopwatch sw = new Stopwatch ( ) ;
352+ sw ? . Restart ( ) ;
353+ await requestParameter ( bag . Parameter , bag . Index ) ;
354+ sw ? . Stop ( ) ;
355+ Logger ? . LogTrace ( $ "Parameter update for { bag . Parameter } with index { bag . Index } took { sw . ElapsedMilliseconds } ms") ;
356+
357+ UpdateParameterUpdatedBag ( bag . Parameter , bag . Index ) ;
358+ } , cts . Token ) ;
359+ await task ;
360+
361+ if ( task . IsCompletedSuccessfully )
362+ await Task . Delay ( GlobalTimers . Instance . UpdateDelayBetweenNonQueuedUpdateRequests ) ;
363+ else
364+ Logger ? . LogTrace ( task . Exception , $ "Parameter update for { bag . Parameter } with index { bag . Index } failed: { task . Exception ? . Message } ") ;
314365 }
315366 }
316- while ( ParameterUpdatedBag . TryPeek ( out ParameterUpdatedBag bag ) )
367+ catch ( Exception ex )
317368 {
318- if ( DateTime . UtcNow - bag . Timestamp < TimeSpan . FromMilliseconds ( GlobalTimers . Instance . NonQueuedUpdateTime ) )
319- return ;
320-
321- await requestParameter ( bag . Parameter , bag . Index ) ;
322-
323- UpdateParameterUpdatedBag ( bag . Parameter , bag . Index ) ;
324-
325- await Task . Delay ( GlobalTimers . Instance . UpdateDelayBetweenNonQueuedUpdateRequests ) ;
369+ Logger ? . LogError ( ex ) ;
370+ }
371+ finally
372+ {
373+ updateSenaphoreSlim . Release ( ) ;
326374 }
327375 }
328376
@@ -335,11 +383,29 @@ private async Task getDeviceModelAndCollectAllParameters()
335383 if ( ! deviceModel . IsInitialized )
336384 {
337385 deviceModel . Initialized += DeviceModel_Initialized ;
338- await deviceModel . Initialize ( ) ;
386+ deviceModel . ParameterValueAdded += DeviceModel_ParameterValueAdded ;
387+ if ( ! deviceModel . IsInitializing )
388+ await deviceModel . Initialize ( ) ;
389+ else
390+ InvkoeDeviceModelParameterValueAdded ( ) ;
339391 }
340392 else
393+ {
394+ InvkoeDeviceModelParameterValueAdded ( ) ;
341395 await collectParameters ( ) ;
396+ }
397+ void InvkoeDeviceModelParameterValueAdded ( )
398+ {
399+ foreach ( var item in this . deviceModel . ParameterValues )
400+ base . InvokeParameterValueAdded ( new ParameterValueAddedEventArgs ( item . Key , item . Value ) ) ;
401+ }
402+ }
403+
404+ private void DeviceModel_ParameterValueAdded ( object sender , ParameterValueAddedEventArgs e )
405+ {
406+ base . InvokeParameterValueAdded ( e ) ;
342407 }
408+
343409 private async Task collectAllParametersOnRoot ( )
344410 {
345411 await requestParameters ( ) ;
0 commit comments