(loader) Restore loading to its original speed by relying on a cascading DELETE FROM EC_URL instead of deleting from EC_PAGE_DATA directly.

* Also clean up the code and add proper rollbacks for transactions.
This commit is contained in:
Viktor Lofgren 2023-07-28 22:00:07 +02:00
parent e237df4a10
commit 01476577b8
4 changed files with 129 additions and 106 deletions

View File

@ -57,6 +57,7 @@ public class LoaderMain {
var instance = injector.getInstance(LoaderMain.class);
try {
var instructions = instance.fetchInstructions();
logger.info("Instructions received");
instance.run(instructions);
}
catch (Exception ex) {
@ -103,6 +104,7 @@ public class LoaderMain {
LoaderMain.loadTotal = loadTotal;
logger.info("Loading {} files", loadTotal);
for (var entry : WorkLog.iterable(logFile)) {
heartbeat.setProgress(loaded++ / (double) loadTotal);
@ -130,6 +132,7 @@ public class LoaderMain {
logger.info("Loading finished");
}
catch (Exception ex) {
ex.printStackTrace();
logger.error("Failed to load", ex);
instructions.err();
throw ex;

View File

@ -64,61 +64,66 @@ public class SqlLoadProcessedDocument {
public void load(LoaderData data, List<LoadProcessedDocument> documents) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
conn.setAutoCommit(false);
try (var conn = dataSource.getConnection()) {
try (var insertCall = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
) {
conn.setAutoCommit(false);
int cnt = 0; int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId <= 0) {
logger.warn("Failed to resolve ID for URL {}", doc.url());
continue;
}
int cnt = 0;
int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId <= 0) {
logger.warn("Failed to resolve ID for URL {}", doc.url());
continue;
}
stmt.setInt(1, urlId);
stmt.setString(2, doc.state().name());
stmt.setString(3, doc.title());
stmt.setString(4, StringUtils.truncate(doc.description(), 255));
stmt.setInt(5, doc.length());
stmt.setInt(6, doc.htmlFeatures());
stmt.setString(7, doc.standard());
stmt.setDouble(8, doc.quality());
stmt.setLong(9, doc.hash());
if (doc.pubYear() != null) {
stmt.setShort(10, (short) doc.pubYear().intValue());
}
else {
stmt.setInt(10, Types.SMALLINT);
}
stmt.addBatch();
insertCall.setInt(1, urlId);
insertCall.setString(2, doc.state().name());
insertCall.setString(3, doc.title());
insertCall.setString(4, StringUtils.truncate(doc.description(), 255));
insertCall.setInt(5, doc.length());
insertCall.setInt(6, doc.htmlFeatures());
insertCall.setString(7, doc.standard());
insertCall.setDouble(8, doc.quality());
insertCall.setLong(9, doc.hash());
if (doc.pubYear() != null) {
insertCall.setShort(10, (short) doc.pubYear().intValue());
} else {
insertCall.setInt(10, Types.SMALLINT);
}
insertCall.addBatch();
if (++cnt == 100) {
var ret = stmt.executeBatch();
if (++cnt == 100) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
}
conn.setAutoCommit(true);
conn.setAutoCommit(true);
}
catch (SQLException ex) {
conn.rollback();
throw ex;
}
} catch (SQLException ex) {
logger.warn("SQL error inserting document", ex);

View File

@ -27,6 +27,10 @@ public class SqlLoadProcessedDomain {
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.createStatement()) {
stmt.execute("DROP PROCEDURE IF EXISTS INITIALIZE_DOMAIN");
// Note that there should be no need to delete from EC_PAGE_DATA here, as that is
// handled by its ON DELETE CASCADE foreign-key constraint referencing EC_URL.
stmt.execute("""
CREATE PROCEDURE INITIALIZE_DOMAIN (
IN ST ENUM('ACTIVE', 'EXHAUSTED', 'SPECIAL', 'SOCIAL_MEDIA', 'BLOCKED', 'REDIR', 'ERROR', 'UNKNOWN'),
@ -36,7 +40,7 @@ public class SqlLoadProcessedDomain {
BEGIN
DELETE FROM DOMAIN_METADATA WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
DELETE FROM EC_PAGE_DATA WHERE ID IN (SELECT ID FROM EC_URL WHERE DOMAIN_ID = DID);
DELETE FROM EC_URL WHERE DOMAIN_ID=DID;
UPDATE EC_DOMAIN SET INDEX_DATE=NOW(), STATE=ST, DOMAIN_ALIAS=NULL, INDEXED=GREATEST(INDEXED,IDX), IP=IP WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
END
@ -54,20 +58,24 @@ public class SqlLoadProcessedDomain {
loadDomains.load(data, domain);
try (var conn = dataSource.getConnection();
var initCall = conn.prepareCall("CALL INITIALIZE_DOMAIN(?,?,?,?)"))
{
initCall.setString(1, state.name());
initCall.setInt(2, 1 + data.sizeHint / 100);
initCall.setInt(3, data.getDomainId(domain));
initCall.setString(4, StringUtils.truncate(ip, 48));
int rc = initCall.executeUpdate();
conn.commit();
if (rc < 1) {
logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
}
try (var conn = dataSource.getConnection()) {
try (var initCall = conn.prepareCall("CALL INITIALIZE_DOMAIN(?,?,?,?)")) {
initCall.setString(1, state.name());
initCall.setInt(2, 1 + data.sizeHint / 100);
initCall.setInt(3, data.getDomainId(domain));
initCall.setString(4, StringUtils.truncate(ip, 48));
int rc = initCall.executeUpdate();
conn.commit();
if (rc < 1) {
logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
}
loadUrls.loadUrlsForDomain(data, domain, 0);
loadUrls.loadUrlsForDomain(data, domain, 0);
}
catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
catch (SQLException ex) {
logger.warn("SQL error initializing domain", ex);

View File

@ -34,69 +34,76 @@ public class SqlLoadUrls {
return;
int maxOldId = 0;
try (var conn = dataSource.getConnection();
var insertCall = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL"))
{
conn.setAutoCommit(false);
var rs = queryMaxId.executeQuery();
if (rs.next()) {
maxOldId = rs.getInt(1);
}
try (var conn = dataSource.getConnection()) {
int cnt = 0; int batchOffset = 0;
try (var insertStmt = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL")) {
for (var url : urls) {
if (data.getUrlId(url) != 0)
continue;
if (url.path.length() >= 255) {
logger.info("Skipping bad URL {}", url);
continue;
conn.setAutoCommit(false);
var rs = queryMaxId.executeQuery();
if (rs.next()) {
maxOldId = rs.getInt(1);
}
var domainId = data.getDomainId(url.domain);
affectedDomains.add(url.domain);
int cnt = 0;
int batchOffset = 0;
insertCall.setString(1, url.proto);
insertCall.setInt(2, domainId);
if (url.port != null) {
insertCall.setInt(3, url.port);
for (var url : urls) {
if (data.getUrlId(url) != 0)
continue;
if (url.path.length() >= 255) {
logger.info("Skipping bad URL {}", url);
continue;
}
var domainId = data.getDomainId(url.domain);
affectedDomains.add(url.domain);
insertStmt.setString(1, url.proto);
insertStmt.setInt(2, domainId);
if (url.port != null) {
insertStmt.setInt(3, url.port);
} else {
insertStmt.setNull(3, Types.INTEGER);
}
insertStmt.setString(4, url.path);
insertStmt.setString(5, url.param);
insertStmt.setLong(6, hashPath(url.path, url.param));
insertStmt.addBatch();
if (++cnt == 1000) {
var ret = insertStmt.executeBatch();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
batchOffset += cnt;
cnt = 0;
}
}
else {
insertCall.setNull(3, Types.INTEGER);
}
insertCall.setString(4, url.path);
insertCall.setString(5, url.param);
insertCall.setLong(6, hashPath(url.path, url.param));
insertCall.addBatch();
if (++cnt == 1000) {
var ret = insertCall.executeBatch();
if (cnt > 0) {
var ret = insertStmt.executeBatch();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
}
batchOffset += cnt;
cnt = 0;
conn.commit();
conn.setAutoCommit(true);
for (var domain : affectedDomains) {
loadUrlsForDomain(data, domain, maxOldId);
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
}
conn.commit();
conn.setAutoCommit(true);
for (var domain : affectedDomains) {
loadUrlsForDomain(data, domain, maxOldId);
catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
catch (SQLException ex) {