diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java index 11c08267..703a70af 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java @@ -16,6 +16,7 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.stream.Stream; public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { private static final Logger logger = LoggerFactory.getLogger(ParquetSerializableCrawlDataStream.class); @@ -26,9 +27,12 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial private boolean wroteDomainRecord = false; private final Path path; + // Reference to the underlying stream that needs to be closed when this object is closed + private final Stream streamForClosing; + public ParquetSerializableCrawlDataStream(Path file) throws IOException { path = file; - backingIterator = CrawledDocumentParquetRecordFileReader.stream(file).iterator(); + backingIterator = (streamForClosing = CrawledDocumentParquetRecordFileReader.stream(file)).iterator(); } @Override @@ -153,7 +157,9 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial etag)); } - public void close() throws IOException {} + public void close() throws IOException { + streamForClosing.close(); + } @Override public SerializableCrawlData next() throws IOException {