(search) IP and IP geolocation in site info view

This commit also fixes a bug in the loader where the IP field wouldn't always populate as intended, and refactors the DomainInformationService to use significantly fewer SQL queries.
This commit is contained in:
Viktor Lofgren 2023-12-09 20:04:27 +01:00
parent 37af60254f
commit 91dd45cf64
10 changed files with 201 additions and 248 deletions

View File

@ -3,8 +3,6 @@ package nu.marginalia.assistant.client.model;
import lombok.*;
import nu.marginalia.model.EdgeDomain;
import java.util.List;
@Getter @AllArgsConstructor @NoArgsConstructor @Builder
@ToString
public class DomainInformation {
@ -23,5 +21,22 @@ public class DomainInformation {
boolean inCrawlQueue;
boolean unknownDomain;
String ip;
String ipCountry;
String state;
public String getIpFlag() {
if (ipCountry == null || ipCountry.isBlank()) {
return "";
}
String country = ipCountry;
if ("UK".equals(country)) {
return "GB";
}
int offset = 0x1F1E6;
int asciiOffset = 0x41;
int firstChar = Character.codePointAt(country, 0) - asciiOffset + offset;
int secondChar = Character.codePointAt(country, 1) - asciiOffset + offset;
return new String(Character.toChars(firstChar)) + new String(Character.toChars(secondChar));
}
}

View File

@ -18,6 +18,7 @@ import java.util.TreeMap;
@Singleton
public class GeoIpBlocklist {
private final TreeMap<Long, GeoIpBlocklist.IpRange> ranges = new TreeMap<>();
private final Set<String> blacklist = Set.of("CN", "HK");
private final Set<String> graylist = Set.of("RU", "TW", "IN", "ZA", "SG", "UA");

View File

@ -3,6 +3,7 @@ package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.model.processed.DomainRecord;
import nu.marginalia.model.processed.DomainWithIp;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
@ -19,10 +20,10 @@ public class DomainRecordParquetFileReader {
}
@NotNull
public static List<String> getDomainNames(Path path) throws IOException {
public static List<DomainWithIp> getBasicDomainInformation(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DomainRecord.newDomainNameHydrator()),
List.of("domain"))
List.of("domain", "ip"))
.toList();
}

View File

@ -48,8 +48,8 @@ public class DomainRecord {
return DomainRecord::dehydrate;
}
public static Hydrator<String, String> newDomainNameHydrator() {
return new DomainNameHydrator();
public static Hydrator<DomainWithIp, DomainWithIp> newDomainNameHydrator() {
return new DomainWithIpHydrator();
}
@ -124,23 +124,26 @@ class DomainHydrator implements Hydrator<DomainRecord, DomainRecord> {
}
}
class DomainNameHydrator implements Hydrator<String, String> {
class DomainWithIpHydrator implements Hydrator<DomainWithIp, DomainWithIp> {
@Override
public String start() {
return "";
public DomainWithIp start() {
return new DomainWithIp();
}
@Override
public String add(String target, String heading, Object value) {
public DomainWithIp add(DomainWithIp target, String heading, Object value) {
if ("domain".equals(heading)) {
return (String) value;
target.domain = (String) value;
}
else if ("ip".equals(heading)) {
target.ip = (String) value;
}
return target;
}
@Override
public String finish(String target) {
public DomainWithIp finish(DomainWithIp target) {
return target;
}
}

View File

@ -0,0 +1,6 @@
package nu.marginalia.model.processed;
public class DomainWithIp {
public String domain;
public String ip;
}

View File

@ -9,6 +9,7 @@ import nu.marginalia.io.processed.DomainRecordParquetFileReader;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.processed.DomainRecord;
import nu.marginalia.model.processed.DomainWithIp;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,9 +52,9 @@ public class DomainLoaderService {
) {
try (var inserter = new DomainInserter(conn, nodeId)) {
for (var domain : readSetDomainNames(inputData)) {
inserter.accept(new EdgeDomain(domain));
domainNamesAll.add(domain);
for (var domainWithIp : readBasicDomainInformation(inputData)) {
inserter.accept(new EdgeDomain(domainWithIp.domain));
domainNamesAll.add(domainWithIp.domain);
}
}
try (var inserter = new DomainInserter(conn, -1)) {
@ -63,9 +64,9 @@ public class DomainLoaderService {
}
}
try (var updater = new DomainAffinityUpdater(conn, nodeId)) {
for (var domain : readSetDomainNames(inputData)) {
updater.accept(new EdgeDomain(domain));
try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId)) {
for (var domainWithIp : readBasicDomainInformation(inputData)) {
updater.accept(new EdgeDomain(domainWithIp.domain), domainWithIp.ip);
}
}
@ -84,15 +85,15 @@ public class DomainLoaderService {
return ret;
}
Collection<String> readSetDomainNames(LoaderInputData inputData) throws IOException {
final Set<String> domainNamesAll = new HashSet<>(100_000);
Collection<DomainWithIp> readBasicDomainInformation(LoaderInputData inputData) throws IOException {
final Set<DomainWithIp> domainsAll = new HashSet<>(100_000);
var domainFiles = inputData.listDomainFiles();
for (var file : domainFiles) {
domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file));
domainsAll.addAll(DomainRecordParquetFileReader.getBasicDomainInformation(file));
}
return domainNamesAll;
return domainsAll;
}
Collection<String> readReferencedDomainNames(LoaderInputData inputData) throws IOException {
@ -164,20 +165,25 @@ public class DomainLoaderService {
statement.close();
}
}
private static class DomainAffinityUpdater implements AutoCloseable {
private static class DomainAffinityAndIpUpdater implements AutoCloseable {
private final PreparedStatement statement;
private final int nodeAffinity;
private int count = 0;
public DomainAffinityUpdater(Connection connection, int affinity) throws SQLException {
public DomainAffinityAndIpUpdater(Connection connection, int affinity) throws SQLException {
this.nodeAffinity = affinity;
statement = connection.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY = ? WHERE DOMAIN_NAME=?");
statement = connection.prepareStatement("""
UPDATE EC_DOMAIN
SET NODE_AFFINITY = ?, IP = ?
WHERE DOMAIN_NAME=?
""");
}
public void accept(EdgeDomain domain) throws SQLException {
public void accept(EdgeDomain domain, String ip) throws SQLException {
statement.setInt(1, nodeAffinity);
statement.setString(2, domain.toString());
statement.setString(2, ip);
statement.setString(3, domain.toString());
statement.addBatch();
if (++count > 1000) {

View File

@ -6,7 +6,6 @@ import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainRecordParquetFileWriter;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loader.DbTestUtil;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
@ -21,10 +20,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
@ -99,7 +95,7 @@ class DomainLoaderServiceTest {
// Verify
Set<String> expectedDomains1 = Sets.union(new HashSet<>(domains1), new HashSet<>(domains2));
assertEquals(expectedDomains1, domainService.readSetDomainNames(new LoaderInputData(workDir, 2)));
assertEquals(expectedDomains1, domainService.readBasicDomainInformation(new LoaderInputData(workDir, 2)));
Set<String> expectedDomains2 = new HashSet<>(linkDomains);
assertEquals(expectedDomains2, domainService.readReferencedDomainNames(new LoaderInputData(workDir, 2)));

View File

@ -6,5 +6,6 @@
Pages Known: {{pagesKnown}} <br/>
Pages Crawled: {{pagesFetched}} <br/>
Pages Indexed: {{pagesIndexed}} <br/>
IP: {{ip}} {{#if ipCountry}}<span title="{{ipCountry}}">{{getIpFlag}}</span>{{/if}}<br/>
</fieldset>
<br/>

View File

@ -11,17 +11,14 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
/*
TODO: This class needs to be refactored, a lot of
these SQL queries are redundant and can be
collapsed into one single query that fetches
all the information
*/
@Singleton
public class DomainInformationService {
private final GeoIpDictionary geoIpDictionary;
private DbDomainQueries dbDomainQueries;
private HikariDataSource dataSource;
@ -30,8 +27,10 @@ public class DomainInformationService {
@Inject
public DomainInformationService(
DbDomainQueries dbDomainQueries,
GeoIpDictionary geoIpDictionary,
HikariDataSource dataSource) {
this.dbDomainQueries = dbDomainQueries;
this.geoIpDictionary = geoIpDictionary;
this.dataSource = dataSource;
}
@ -43,226 +42,75 @@ public class DomainInformationService {
return Optional.empty();
}
boolean blacklisted = isBlacklisted(domain.get());
int pagesKnown = getPagesKnown(domainId);
int pagesVisited = getPagesVisited(domainId);
int pagesIndexed = getPagesIndexed(domainId);
int incomingLinks = getIncomingLinks(domainId);
int outboundLinks = getOutboundLinks(domainId);
int nodeAffinity = getNodeAffinity(domainId);
boolean inCrawlQueue = inCrawlQueue(domainId);
double rank = Math.round(10000.0*(1.0-getRank(domainId)))/100;
var builder = DomainInformation.builder();
try (var connection = dataSource.getConnection();
var stmt = connection.createStatement();
) {
boolean inCrawlQueue;
int outboundLinks = 0;
int pagesVisited = 0;
DomainIndexingState state = getDomainState(domainId);
ResultSet rs;
var di = DomainInformation.builder()
.domain(domain.get())
.blacklisted(blacklisted)
.pagesKnown(pagesKnown)
.pagesFetched(pagesVisited)
.pagesIndexed(pagesIndexed)
.incomingLinks(incomingLinks)
.outboundLinks(outboundLinks)
.ranking(rank)
.state(state.desc)
.inCrawlQueue(inCrawlQueue)
.nodeAffinity(nodeAffinity)
.suggestForCrawling((pagesVisited == 0 && outboundLinks == 0 && !inCrawlQueue))
.build();
rs = stmt.executeQuery(STR."""
SELECT IP, NODE_AFFINITY, DOMAIN_NAME, STATE, IFNULL(RANK, 1) AS RANK
FROM EC_DOMAIN WHERE ID=\{domainId}
""");
if (rs.next()) {
String ip = rs.getString("IP");
return Optional.of(di);
}
builder.ip(ip);
builder.ipCountry(geoIpDictionary.getCountry(ip));
private int getNodeAffinity(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("""
SELECT NODE_AFFINITY FROM EC_DOMAIN WHERE ID=?
""")) {
stmt.setInt(1, domainId);
var rs = stmt.executeQuery();
if (rs.next())
return rs.getInt(1);
builder.nodeAffinity(rs.getInt("NODE_AFFINITY"));
builder.domain(new EdgeDomain(rs.getString("DOMAIN_NAME")));
builder.state(rs.getString("STATE"));
builder.ranking(Math.round(100.0*(1.0-rs.getDouble("RANK"))));
}
rs = stmt.executeQuery(STR."""
SELECT 1 FROM CRAWL_QUEUE
INNER JOIN EC_DOMAIN ON CRAWL_QUEUE.DOMAIN_NAME = EC_DOMAIN.DOMAIN_NAME
WHERE EC_DOMAIN.ID=\{domainId}
""");
inCrawlQueue = rs.next();
builder.inCrawlQueue(inCrawlQueue);
rs = stmt.executeQuery(STR."""
SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=\{domainId}
""");
if (rs.next()) {
builder.incomingLinks(rs.getInt(1));
}
rs = stmt.executeQuery(STR."""
SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=\{domainId}
""");
if (rs.next()) {
builder.outboundLinks(rs.getInt(1));
outboundLinks = rs.getInt(1);
}
rs = stmt.executeQuery(STR."""
SELECT KNOWN_URLS, GOOD_URLS, VISITED_URLS FROM DOMAIN_METADATA WHERE ID=\{domainId}
""");
if (rs.next()) {
pagesVisited = rs.getInt("VISITED_URLS");
builder.pagesKnown(rs.getInt("KNOWN_URLS"));
builder.pagesIndexed(rs.getInt("GOOD_URLS"));
builder.pagesFetched(rs.getInt("VISITED_URLS"));
}
builder.suggestForCrawling((pagesVisited == 0 && outboundLinks == 0 && !inCrawlQueue));
return Optional.of(builder.build());
}
catch (SQLException ex) {
logger.error("SQL error", ex);
}
return -1;
}
@SneakyThrows
private boolean inCrawlQueue(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
SELECT 1 FROM CRAWL_QUEUE
INNER JOIN EC_DOMAIN ON CRAWL_QUEUE.DOMAIN_NAME = EC_DOMAIN.DOMAIN_NAME
WHERE EC_DOMAIN.ID=?
"""))
{
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
return rsp.next();
}
return Optional.empty();
}
}
private OptionalInt getDomainFromPartial(String site) {
return dbDomainQueries.tryGetDomainId(new EdgeDomain(site));
}
@SneakyThrows
public boolean isBlacklisted(EdgeDomain domain) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT ID FROM EC_DOMAIN_BLACKLIST WHERE URL_DOMAIN IN (?,?)")) {
stmt.setString(1, domain.domain);
stmt.setString(2, domain.toString());
var rsp = stmt.executeQuery();
return rsp.next();
}
}
}
@SneakyThrows
public int getPagesKnown(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT KNOWN_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getInt(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
return 0;
}
}
@SneakyThrows
public int getPagesVisited(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT VISITED_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getInt(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
return 0;
}
}
@SneakyThrows
public int getPagesIndexed(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT GOOD_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getInt(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
return 0;
}
}
@SneakyThrows
public int getIncomingLinks(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getInt(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
return 0;
}
}
@SneakyThrows
public int getOutboundLinks(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getInt(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
return 0;
}
}
public DomainIndexingState getDomainState(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT STATE FROM EC_DOMAIN WHERE ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return DomainIndexingState.valueOf(rsp.getString(1));
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return DomainIndexingState.ERROR;
}
public List<EdgeDomain> getLinkingDomains(int domainId) {
try (var connection = dataSource.getConnection()) {
List<EdgeDomain> results = new ArrayList<>(25);
try (var stmt = connection.prepareStatement("SELECT SOURCE_DOMAIN FROM EC_RELATED_LINKS_VIEW WHERE DEST_DOMAIN_ID=? ORDER BY SOURCE_DOMAIN_ID LIMIT 25")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
while (rsp.next()) {
results.add(new EdgeDomain(rsp.getString(1)));
}
return results;
} catch (Exception ex) {
logger.error("DB error", ex);
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return Collections.emptyList();
}
public double getRank(int domainId) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT IFNULL(RANK, 1) FROM EC_DOMAIN WHERE ID=?")) {
stmt.setInt(1, domainId);
var rsp = stmt.executeQuery();
if (rsp.next()) {
return rsp.getDouble(1);
}
} catch (Exception ex) {
logger.error("DB error", ex);
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return 1;
}
}

View File

@ -0,0 +1,76 @@
package nu.marginalia.assistant.domains;
import com.opencsv.CSVReader;
import lombok.AllArgsConstructor;
import nu.marginalia.WmsaHome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileReader;
import java.net.InetAddress;
import java.util.TreeMap;
public class GeoIpDictionary {
private volatile TreeMap<Long, IpRange> ranges = null;
private static final Logger logger = LoggerFactory.getLogger(GeoIpDictionary.class);
@AllArgsConstructor
static class IpRange {
public final long from;
public final long to;
public final String country;
}
public GeoIpDictionary() {
Thread.ofPlatform().start(() -> {
try (var reader = new CSVReader(new FileReader(WmsaHome.getIPLocationDatabse().toFile()))) {
var dict = new TreeMap<Long, IpRange>();
for (;;) {
String[] vals = reader.readNext();
if (vals == null) {
break;
}
var range = new IpRange(Long.parseLong(vals[0]),
Long.parseLong(vals[1]),
vals[2]);
dict.put(range.from, range);
}
ranges = dict;
logger.info("Loaded {} IP ranges", ranges.size());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
public String getCountry(String ip) {
try {
return getCountry(InetAddress.getByName(ip));
} catch (Exception e) {
return "";
}
}
public String getCountry(InetAddress address) {
if (null == ranges) { // not loaded yet or failed to load
return "";
}
byte[] bytes = address.getAddress();
long ival = ((long)bytes[0]&0xFF) << 24 | ((long)bytes[1]&0xFF) << 16 | ((long)bytes[2]&0xFF)<< 8 | ((long)bytes[3]&0xFF);
Long key = ranges.floorKey(ival);
if (null == key) {
return "";
}
var range = ranges.get(key);
if (ival >= key && ival < range.to) {
return range.country;
}
return "";
}
}