11package eu .clarin .cmdi .curation .api .subprocessor .collection ;
22
3+ import eu .clarin .cmdi .curation .api .CurationModule ;
34import eu .clarin .cmdi .curation .api .conf .ApiConfig ;
45import eu .clarin .cmdi .curation .api .entity .CMDCollection ;
56import eu .clarin .cmdi .curation .api .entity .CMDInstance ;
1213import eu .clarin .cmdi .curation .api .report .collection .sec .ResProxyReport .InvalidReference ;
1314import eu .clarin .cmdi .curation .api .report .instance .CMDInstanceReport ;
1415import eu .clarin .cmdi .curation .api .exception .MalFunctioningProcessorException ;
16+ import eu .clarin .cmdi .curation .api .report .profile .CMDProfileReport ;
17+ import eu .clarin .cmdi .curation .cr .CRService ;
1518import eu .clarin .linkchecker .persistence .model .AggregatedStatus ;
1619import eu .clarin .linkchecker .persistence .repository .AggregatedStatusRepository ;
1720import eu .clarin .linkchecker .persistence .repository .UrlRepository ;
2831import java .nio .file .Files ;
2932import java .nio .file .Path ;
3033import java .nio .file .attribute .BasicFileAttributes ;
31- import java .util .ArrayList ;
32- import java .util .Collection ;
33- import java .util .HashMap ;
34- import java .util .Map ;
34+ import java .util .*;
3535import java .util .concurrent .*;
36+ import java .util .concurrent .atomic .AtomicInteger ;
3637import java .util .concurrent .locks .Lock ;
3738import java .util .concurrent .locks .ReentrantLock ;
3839import java .util .stream .Stream ;
@@ -52,6 +53,8 @@ public class CollectionAggregator {
5253
5354 private final Map <String , Collection <String >> mdSelfLinks = new HashMap <>();
5455
56+ private final Map <String , AtomicInteger > profileUsage = new HashMap <>();
57+
5558 private final Lock lock = new ReentrantLock ();
5659
5760 public CollectionAggregator (ApiConfig conf , ApplicationContext ctx , AggregatedStatusRepository aRep , UrlRepository uRep ) {
@@ -77,7 +80,7 @@ public void process(CMDCollection collection, CollectionReport collectionReport)
7780
7881 final ExecutorService executor = Executors .newVirtualThreadPerTaskExecutor ();
7982 //
80- final Semaphore maxTreads = new Semaphore (conf .getMaxThreads ());
83+ final Semaphore maxInQueue = new Semaphore (conf .getMaxInQueue ());
8184
8285 try {
8386 Files .walkFileTree (collection .getPath (), new FileVisitor <>() {
@@ -91,6 +94,13 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
9194 @ Override
9295 public FileVisitResult visitFile (Path filePath , BasicFileAttributes attrs ) {
9396
97+ try {
98+ maxInQueue .acquire ();
99+ }
100+ catch (InterruptedException e ) {
101+ throw new RuntimeException (e );
102+ }
103+
94104 collectionReport .fileReport .numOfFiles ++;
95105
96106 if (attrs .size () > collectionReport .fileReport .maxFileSize ) {
@@ -111,16 +121,16 @@ public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) {
111121
112122 CMDInstanceReport instanceReport ;
113123 try {
114- maxTreads .acquire ();
115124 instanceReport = instance .generateReport ();
116125 }
117- catch (MalFunctioningProcessorException | InterruptedException e ) {
126+ catch (MalFunctioningProcessorException e ) {
118127 executor .shutdownNow ();
119128 throw new RuntimeException (e );
120- } finally {
121- maxTreads .release ();
122129 }
123-
130+ finally {
131+ maxInQueue .release ();
132+ }
133+ CollectionAggregator .this .lock .lock ();
124134 addReport (collectionReport , instanceReport );
125135
126136 }); // end executor.execute
@@ -168,8 +178,6 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
168178 */
169179 public void addReport (CollectionReport collectionReport , CMDInstanceReport instanceReport ) {
170180
171- this .lock .lock ();
172-
173181 try {
174182 if (!instanceReport .details .isEmpty ()) {//only add a record if there are details to report
175183
@@ -254,7 +262,10 @@ public void addReport(CollectionReport collectionReport, CMDInstanceReport insta
254262 .count ++
255263 );
256264 collectionReport .facetReport .aggregatedScore += instanceReport .facetReport .score ;
257- } else {
265+
266+ this .profileUsage .computeIfAbsent (instanceReport .profileHeaderReport .getSchemaLocation (), k -> new AtomicInteger ()).incrementAndGet ();
267+ }
268+ else {
258269 collectionReport .fileReport .numOfFilesNonProcessable ++;
259270 }
260271 }
@@ -448,6 +459,11 @@ private void calculateAverages(CollectionReport collectionReport) {
448459 collectionReport .avgScore = collectionReport .aggregatedScore
449460 / (double ) collectionReport .fileReport .numOfFilesProcessable ;
450461
462+ // add profileUsage
463+ this .profileUsage .forEach ((k ,v ) -> this .ctx .getBean (CurationModule .class )
464+ .processCMDProfile (k )
465+ .collectionUsage
466+ .add (new CMDProfileReport .CollectionUsage (collectionReport .fileReport .provider , v .get ())));
451467 }
452468 }
453469}
0 commit comments