diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java index 93fce02..c12306a 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java @@ -938,7 +938,7 @@ public void listURLs( continue; } - Iterator urliter = urlIterator(e); + CloseableIterator urliter = urlIterator(e); while (urliter.hasNext()) { totalCount++; @@ -951,17 +951,24 @@ public void listURLs( break; } } + + try { + urliter.close(); + } catch (IOException e1) { + LOG.warn("Error closing URLIterator", e1); + } } } responseObserver.onCompleted(); } - protected Iterator urlIterator(Entry qentry) { + protected CloseableIterator urlIterator( + Entry qentry) { return urlIterator(qentry, 0L, Long.MAX_VALUE); } - protected abstract Iterator urlIterator( + protected abstract CloseableIterator urlIterator( Entry qentry, long start, long max); /** diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/CloseableIterator.java b/service/src/main/java/crawlercommons/urlfrontier/service/CloseableIterator.java new file mode 100644 index 0000000..7f37e2e --- /dev/null +++ b/service/src/main/java/crawlercommons/urlfrontier/service/CloseableIterator.java @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2020 Crawler-commons +// SPDX-License-Identifier: Apache-2.0 + +package crawlercommons.urlfrontier.service; + +import java.io.Closeable; +import java.util.Iterator; + +/** + * Adds close to the Iterator Needed when we need to close resources used by the Iterator (e.g. The + * RocksDBIterator in case of RocksDb implementation). + * + * @param + */ +public interface CloseableIterator extends Closeable, Iterator {} diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java b/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java index 351437f..3bf21e6 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java @@ -11,6 +11,7 @@ import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; import crawlercommons.urlfrontier.service.AbstractFrontierService; +import crawlercommons.urlfrontier.service.CloseableIterator; import crawlercommons.urlfrontier.service.QueueInterface; import crawlercommons.urlfrontier.service.QueueWithinCrawl; import crawlercommons.urlfrontier.service.SynchronizedStreamObserver; @@ -224,12 +225,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo } } - public Iterator urlIterator( + public CloseableIterator urlIterator( Entry qentry, long start, long maxURLs) { return new MemoryURLItemIterator(qentry, start, maxURLs); } - class MemoryURLItemIterator implements Iterator { + class MemoryURLItemIterator implements CloseableIterator { private final org.slf4j.Logger LOG = LoggerFactory.getLogger(MemoryURLItemIterator.class); @@ -298,5 +299,10 @@ public URLItem next() { } return null; // shouldn't happen } + + @Override + public void close() { + // No need to close anything here + } } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java index 569fa9a..aac08bc 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java @@ -12,6 +12,7 @@ import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; import crawlercommons.urlfrontier.service.AbstractFrontierService; +import crawlercommons.urlfrontier.service.CloseableIterator; import crawlercommons.urlfrontier.service.QueueInterface; import crawlercommons.urlfrontier.service.QueueWithinCrawl; import crawlercommons.urlfrontier.service.SynchronizedStreamObserver; @@ -26,7 +27,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -300,7 +300,7 @@ protected int sendURLsForQueue( } // too early for it? - long scheduled = Long.parseLong(currentKey.substring(pos2 + 1, pos3)); + long scheduled = Long.parseLong(currentKey, pos2 + 1, pos3, 10); if (scheduled > now) { // they are sorted by date no need to go further return alreadySent; @@ -823,7 +823,7 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo final int pos2 = currentKey.indexOf('_', pos + 1); final int pos3 = currentKey.indexOf('_', pos2 + 1); - fromEpoch = Long.parseLong(currentKey.substring(pos2 + 1, pos3)); + fromEpoch = Long.parseLong(currentKey, pos2 + 1, pos3, 10); try { info = @@ -856,12 +856,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver respo } } - public Iterator urlIterator( + public CloseableIterator urlIterator( Entry qentry, long start, long maxURLs) { return new RocksDBURLItemIterator(qentry, start, maxURLs); } - class RocksDBURLItemIterator implements Iterator { + class RocksDBURLItemIterator implements CloseableIterator { private final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBURLItemIterator.class); @@ -960,7 +960,7 @@ public URLItem next() { final int pos2 = schedulingKey.indexOf('_', pos1 + 1); final int pos3 = schedulingKey.indexOf('_', pos2 + 1); - fromEpoch = Long.parseLong(schedulingKey.substring(pos2 + 1, pos3)); + fromEpoch = Long.parseLong(schedulingKey, pos2 + 1, pos3, 10); try { info = URLInfo.parseFrom(scheduled); @@ -998,5 +998,10 @@ public URLItem next() { return null; // Shouldn't happen } + + @Override + public void close() { + this.rocksIterator.close(); + } } } diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java index b2b1c30..d88cc5b 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/ShardedRocksDBService.java @@ -8,6 +8,7 @@ import crawlercommons.urlfrontier.Urlfrontier.URLInfo; import crawlercommons.urlfrontier.Urlfrontier.URLItem; import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest; +import crawlercommons.urlfrontier.service.CloseableIterator; import crawlercommons.urlfrontier.service.QueueInterface; import crawlercommons.urlfrontier.service.QueueWithinCrawl; import crawlercommons.urlfrontier.service.SynchronizedStreamObserver; @@ -15,7 +16,6 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -101,7 +101,7 @@ public void listURLs(ListUrlParams request, StreamObserver responseObse @Override // TODO Implementation of urlIterator for ShardedRocksDB - protected Iterator urlIterator( + protected CloseableIterator urlIterator( Entry qentry, long start, long max) { throw new UnsupportedOperationException( "Feature not implemented for ShardedRocksDB backend");