From 01476577b8c62e069e486fbcd23f7d2755870f8e Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Fri, 28 Jul 2023 22:00:07 +0200
Subject: [PATCH] (loader) Speed up loading back to original speeds with a
 cascading DELETE FROM EC_URL rather than EC_PAGE_DATA.

* Also clean up code and have proper rollbacks for transactions.
---
 .../nu/marginalia/loading/LoaderMain.java     |   3 +
 .../loader/SqlLoadProcessedDocument.java      |  93 ++++++++--------
 .../loader/SqlLoadProcessedDomain.java        |  36 +++---
 .../loading/loader/SqlLoadUrls.java           | 103 ++++++++++--------
 4 files changed, 129 insertions(+), 106 deletions(-)

diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
index c8441330..68bcf8c4 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
@@ -57,6 +57,7 @@ public class LoaderMain {
         var instance = injector.getInstance(LoaderMain.class);
         try {
             var instructions = instance.fetchInstructions();
+            logger.info("Instructions received");
             instance.run(instructions);
         }
         catch (Exception ex) {
@@ -103,6 +104,7 @@ public class LoaderMain {
         LoaderMain.loadTotal = loadTotal;
+        logger.info("Loading {} files", loadTotal);
         for (var entry : WorkLog.iterable(logFile)) {
             heartbeat.setProgress(loaded++ / (double) loadTotal);
@@ -130,6 +132,7 @@ public class LoaderMain {
             logger.info("Loading finished");
         }
         catch (Exception ex) {
+            ex.printStackTrace();
             logger.error("Failed to load", ex);
             instructions.err();
             throw ex;
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java
index 02c4202c..909ec986 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java
@@ -64,61 +64,66 @@ public class SqlLoadProcessedDocument {
     public void load(LoaderData data, List documents) {
-        try (var conn = dataSource.getConnection();
-             var stmt = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
-            conn.setAutoCommit(false);
+        try (var conn = dataSource.getConnection()) {
+            try (var insertCall = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+            ) {
+                conn.setAutoCommit(false);
-            int cnt = 0; int batchOffset = 0;
-            for (var doc : documents) {
-                int urlId = data.getUrlId(doc.url());
-                if (urlId <= 0) {
-                    logger.warn("Failed to resolve ID for URL {}", doc.url());
-                    continue;
-                }
+                int cnt = 0;
+                int batchOffset = 0;
+                for (var doc : documents) {
+                    int urlId = data.getUrlId(doc.url());
+                    if (urlId <= 0) {
+                        logger.warn("Failed to resolve ID for URL {}", doc.url());
+                        continue;
+                    }
-                stmt.setInt(1, urlId);
-                stmt.setString(2, doc.state().name());
-                stmt.setString(3, doc.title());
-                stmt.setString(4, StringUtils.truncate(doc.description(), 255));
-                stmt.setInt(5, doc.length());
-                stmt.setInt(6, doc.htmlFeatures());
-                stmt.setString(7, doc.standard());
-                stmt.setDouble(8, doc.quality());
-                stmt.setLong(9, doc.hash());
-                if (doc.pubYear() != null) {
-                    stmt.setShort(10, (short) doc.pubYear().intValue());
-                }
-                else {
-                    stmt.setInt(10, Types.SMALLINT);
-                }
-                stmt.addBatch();
+                    insertCall.setInt(1, urlId);
+                    insertCall.setString(2, doc.state().name());
+                    insertCall.setString(3, doc.title());
+                    insertCall.setString(4, StringUtils.truncate(doc.description(), 255));
+                    insertCall.setInt(5, doc.length());
+                    insertCall.setInt(6, doc.htmlFeatures());
+                    insertCall.setString(7, doc.standard());
+                    insertCall.setDouble(8, doc.quality());
+                    insertCall.setLong(9, doc.hash());
+                    if (doc.pubYear() != null) {
+                        insertCall.setShort(10, (short) doc.pubYear().intValue());
+                    } else {
+                        insertCall.setInt(10, Types.SMALLINT);
+                    }
+                    insertCall.addBatch();
-                if (++cnt == 100) {
-                    var ret = stmt.executeBatch();
+                    if (++cnt == 100) {
+                        var ret = insertCall.executeBatch();
+                        conn.commit();
+
+                        for (int rv = 0; rv < cnt; rv++) {
+                            if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
+                                logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
+                            }
+                        }
+
+                        cnt = 0;
+                        batchOffset += 100;
+                    }
+                }
+                if (cnt > 0) {
+                    var ret = insertCall.executeBatch();
                     conn.commit();
-
                     for (int rv = 0; rv < cnt; rv++) {
                         if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
                             logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
                         }
                     }
-
-                    cnt = 0;
-                    batchOffset += 100;
-                }
-            }
-            if (cnt > 0) {
-                var ret = stmt.executeBatch();
-                conn.commit();
-                for (int rv = 0; rv < cnt; rv++) {
-                    if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
-                        logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
-                    }
-                }
-            }
-
-            conn.setAutoCommit(true);
+                conn.setAutoCommit(true);
+            }
+            catch (SQLException ex) {
+                conn.rollback();
+                throw ex;
+            }
         }
         catch (SQLException ex) {
             logger.warn("SQL error inserting document", ex);
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java
index df598b14..9ac576af 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java
@@ -27,6 +27,10 @@ public class SqlLoadProcessedDomain {
         try (var conn = dataSource.getConnection()) {
             try (var stmt = conn.createStatement()) {
                 stmt.execute("DROP PROCEDURE IF EXISTS INITIALIZE_DOMAIN");
+
+                // Note that there should be no need to delete from EC_PAGE_DATA here as it's done via their
+                // CASCADE DELETE constraint on EC_URL.
+
                 stmt.execute("""
                         CREATE PROCEDURE INITIALIZE_DOMAIN (
                             IN ST ENUM('ACTIVE', 'EXHAUSTED', 'SPECIAL', 'SOCIAL_MEDIA', 'BLOCKED', 'REDIR', 'ERROR', 'UNKNOWN'),
@@ -36,7 +40,7 @@ public class SqlLoadProcessedDomain {
                             BEGIN
                                 DELETE FROM DOMAIN_METADATA WHERE ID=DID;
                                 DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
-                                DELETE FROM EC_PAGE_DATA WHERE ID IN (SELECT ID FROM EC_URL WHERE DOMAIN_ID = DID);
+                                DELETE FROM EC_URL WHERE DOMAIN_ID=DID;
                                 UPDATE EC_DOMAIN SET INDEX_DATE=NOW(), STATE=ST, DOMAIN_ALIAS=NULL, INDEXED=GREATEST(INDEXED,IDX), IP=IP WHERE ID=DID;
                                 DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
                             END
@@ -54,20 +58,24 @@ public class SqlLoadProcessedDomain {
         loadDomains.load(data, domain);
-        try (var conn = dataSource.getConnection();
-             var initCall = conn.prepareCall("CALL INITIALIZE_DOMAIN(?,?,?,?)"))
-        {
-            initCall.setString(1, state.name());
-            initCall.setInt(2, 1 + data.sizeHint / 100);
-            initCall.setInt(3, data.getDomainId(domain));
-            initCall.setString(4, StringUtils.truncate(ip, 48));
-            int rc = initCall.executeUpdate();
-            conn.commit();
-            if (rc < 1) {
-                logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
-            }
+        try (var conn = dataSource.getConnection()) {
+            try (var initCall = conn.prepareCall("CALL INITIALIZE_DOMAIN(?,?,?,?)")) {
+                initCall.setString(1, state.name());
+                initCall.setInt(2, 1 + data.sizeHint / 100);
+                initCall.setInt(3, data.getDomainId(domain));
+                initCall.setString(4, StringUtils.truncate(ip, 48));
+                int rc = initCall.executeUpdate();
+                conn.commit();
+                if (rc < 1) {
+                    logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
+                }
-            loadUrls.loadUrlsForDomain(data, domain, 0);
+                loadUrls.loadUrlsForDomain(data, domain, 0);
+            }
+            catch (SQLException ex) {
+                conn.rollback();
+                throw ex;
+            }
         }
         catch (SQLException ex) {
             logger.warn("SQL error initializing domain", ex);
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
index a0b0f8cb..922baf91 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
@@ -34,69 +34,76 @@ public class SqlLoadUrls {
             return;
         int maxOldId = 0;
-        try (var conn = dataSource.getConnection();
-             var insertCall = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
-             var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL"))
-        {
-            conn.setAutoCommit(false);
-            var rs = queryMaxId.executeQuery();
-            if (rs.next()) {
-                maxOldId = rs.getInt(1);
-            }
+        try (var conn = dataSource.getConnection()) {
-            int cnt = 0; int batchOffset = 0;
+            try (var insertStmt = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
+                 var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL")) {
-            for (var url : urls) {
-                if (data.getUrlId(url) != 0)
-                    continue;
-                if (url.path.length() >= 255) {
-                    logger.info("Skipping bad URL {}", url);
-                    continue;
+                conn.setAutoCommit(false);
+
+                var rs = queryMaxId.executeQuery();
+                if (rs.next()) {
+                    maxOldId = rs.getInt(1);
                 }
-                var domainId = data.getDomainId(url.domain);
-                affectedDomains.add(url.domain);
+                int cnt = 0;
+                int batchOffset = 0;
-                insertCall.setString(1, url.proto);
-                insertCall.setInt(2, domainId);
-                if (url.port != null) {
-                    insertCall.setInt(3, url.port);
+                for (var url : urls) {
+                    if (data.getUrlId(url) != 0)
+                        continue;
+                    if (url.path.length() >= 255) {
+                        logger.info("Skipping bad URL {}", url);
+                        continue;
+                    }
+                    var domainId = data.getDomainId(url.domain);
+
+                    affectedDomains.add(url.domain);
+
+                    insertStmt.setString(1, url.proto);
+                    insertStmt.setInt(2, domainId);
+                    if (url.port != null) {
+                        insertStmt.setInt(3, url.port);
+                    } else {
+                        insertStmt.setNull(3, Types.INTEGER);
+                    }
+                    insertStmt.setString(4, url.path);
+                    insertStmt.setString(5, url.param);
+                    insertStmt.setLong(6, hashPath(url.path, url.param));
+                    insertStmt.addBatch();
+
+                    if (++cnt == 1000) {
+                        var ret = insertStmt.executeBatch();
+                        for (int rv = 0; rv < cnt; rv++) {
+                            if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
+                                logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
+                            }
+                        }
+
+                        batchOffset += cnt;
+                        cnt = 0;
+                    }
                 }
-                else {
-                    insertCall.setNull(3, Types.INTEGER);
-                }
-                insertCall.setString(4, url.path);
-                insertCall.setString(5, url.param);
-                insertCall.setLong(6, hashPath(url.path, url.param));
-                insertCall.addBatch();
+                if (cnt > 0) {
+                    var ret = insertStmt.executeBatch();
-                if (++cnt == 1000) {
-                    var ret = insertCall.executeBatch();
                     for (int rv = 0; rv < cnt; rv++) {
                         if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
                             logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
                         }
                     }
+                }
-                    batchOffset += cnt;
-                    cnt = 0;
+                conn.commit();
+                conn.setAutoCommit(true);
+
+                for (var domain : affectedDomains) {
+                    loadUrlsForDomain(data, domain, maxOldId);
                 }
             }
-
-            if (cnt > 0) {
-                var ret = insertCall.executeBatch();
-                for (int rv = 0; rv < cnt; rv++) {
-                    if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
-                        logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
-                    }
-                }
-            }
-
-            conn.commit();
-            conn.setAutoCommit(true);
-
-            for (var domain : affectedDomains) {
-                loadUrlsForDomain(data, domain, maxOldId);
+            catch (SQLException ex) {
+                conn.rollback();
+                throw ex;
            }
        }
        catch (SQLException ex) {