From dcf018183965f5630c2ec90b89122b9d62539fea Mon Sep 17 00:00:00 2001
From: Kevin Lynx
Date: Sun, 21 Jul 2013 21:18:40 +0800
Subject: [PATCH] NOTE: rewrite hash_reader; config changed; the dht_hash
 database schema changed, so the existing dht_hash database must be removed

---
 src/crawler/hash_cache_writer.erl       |  31 ++++-
 src/hash_reader/hash_download.erl       | 159 ++++++++++++++++++++++++
 src/hash_reader/hash_download_cache.erl | 101 +++++++++++++++
 src/hash_reader/hash_reader.erl         |   6 +-
 src/hash_reader/hash_reader2.erl        |  71 +++++++++++
 src/hash_reader/hash_reader_common.erl  |  34 +++++
 src/hash_reader/hash_reader_sup.erl     |   6 +-
 tools/HISTORY.md                        |  13 ++
 8 files changed, 414 insertions(+), 7 deletions(-)
 create mode 100644 src/hash_reader/hash_download.erl
 create mode 100644 src/hash_reader/hash_download_cache.erl
 create mode 100644 src/hash_reader/hash_reader2.erl
 create mode 100644 src/hash_reader/hash_reader_common.erl

diff --git a/src/crawler/hash_cache_writer.erl b/src/crawler/hash_cache_writer.erl
index 351ed11..54e6879 100644
--- a/src/crawler/hash_cache_writer.erl
+++ b/src/crawler/hash_cache_writer.erl
@@ -15,6 +15,7 @@
     terminate/2,
     code_change/3]).
 -export([start_link/5, stop/0, insert/1]).
+-export([do_save/1]). % exported to avoid an unused-function warning
 -record(state, {cache_time, cache_max}).
 -define(TBLNAME, hash_table).
 -define(DBPOOL, hash_write_db).
@@ -57,7 +58,7 @@ handle_call(_, _From, State) ->
 
 handle_info(do_save_cache, #state{cache_time = Time} = State) ->
     ?T("timeout to save cache hashes"),
-    do_save(table_size(?TBLNAME)),
+    do_save_merge(table_size(?TBLNAME)),
     schedule_save(Time),
     {noreply, State};
 
@@ -83,10 +84,36 @@ try_save(#state{cache_max = Max}) ->
 
 try_save(Size, Max) when Size >= Max ->
     ?T(?FMT("try save all cache hashes ~p", [Size])),
-    do_save(Size);
+    do_save_merge(Size);
 try_save(_, _) ->
     ok.
 
+%% new method: merge hashes into the database so that hash_reader has
+%% fewer hashes to process
+do_save_merge(0) ->
+    ok;
+do_save_merge(_) ->
+    First = ets:first(?TBLNAME),
+    ReqAt = time_util:now_seconds(),
+    do_save(First, ReqAt),
+    ets:delete_all_objects(?TBLNAME).
+
+do_save('$end_of_table', _) ->
+    0;
+do_save(Key, ReqAt) ->
+    Conn = mongo_pool:get(?DBPOOL),
+    ReqCnt = get_req_cnt(Key),
+    BHash = list_to_binary(Key),
+    % upsert: create the hash document on first sight, otherwise
+    % accumulate `req_cnt' and refresh `req_at'
+    Cmd = {findAndModify, ?HASH_COLLNAME, query, {'_id', BHash},
+        update, {'$inc', {req_cnt, ReqCnt}, '$set', {req_at, ReqAt}},
+        fields, {'_id', 1}, upsert, true, new, false},
+    mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        mongo:command(Cmd)
+    end),
+    Next = ets:next(?TBLNAME, Key),
+    ReqCnt + do_save(Next, ReqAt).
+
+%% old method
 do_save(0) ->
     ok;
 do_save(_) ->
diff --git a/src/hash_reader/hash_download.erl b/src/hash_reader/hash_download.erl
new file mode 100644
index 0000000..cac0044
--- /dev/null
+++ b/src/hash_reader/hash_download.erl
@@ -0,0 +1,159 @@
+%%
+%% hash_download.erl
+%% Kevin Lynx
+%% 07.21.2013
+%%
+-module(hash_download).
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/1]).
+-record(state, {dbpool, downloader, downloading = 0, max}).
+-define(WAIT_TIME, 1*60*1000).
+
+start_link(DBPool) ->
+    gen_server:start_link(?MODULE, [DBPool], []).
+
+init([DBPool]) ->
+    {ok, DownPid} = tor_download:start_link(),
+    tor_download_stats:register(DownPid),
+    Max = config:get(max_download_per_reader, 50),
+    {ok, #state{dbpool = DBPool, downloader = DownPid, max = Max}, 0}.
+
+terminate(_, State) ->
+    {ok, State}.
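+
+%% Start-up note: init/1 returns {ok, State, 0}; the zero timeout makes the
+%% gen_server deliver a `timeout' message right away, so handle_info(timeout,
+%% ...) below calls schedule_next() and pulls the first hash from
+%% hash_download_cache.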
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({process_hash, Doc}, State) ->
+    #state{downloading = DownCnt, downloader = DownPid, max = Max} = State,
+    {BHash} = bson:lookup('_id', Doc),
+    Hash = binary_to_list(BHash),
+    ReqCnt = hash_reader_common:get_req_cnt(Doc),
+    Conn = db_conn(State),
+    AddDown = case db_store_mongo:inc_announce(Conn, Hash, ReqCnt) of
+        true ->
+            ?T(?FMT("hash ~s already exists in db", [Hash])),
+            hash_reader_common:on_updated(Conn),
+            0;
+        false ->
+            schedule_download(Conn, DownPid, Hash)
+    end,
+    case AddDown + DownCnt < Max of
+        true ->
+            schedule_next();
+        false ->
+            ?T(?FMT("reached the max download ~p, wait", [Max])),
+            % no next read scheduled; resume when the downloader reports back
+            wait_downloader_notify
+    end,
+    {noreply, State#state{downloading = DownCnt + AddDown}};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info({got_torrent, failed, _Hash}, State) ->
+    #state{downloading = D} = State,
+    schedule_next(),
+    hash_reader_stats:handle_download_failed(),
+    {noreply, State#state{downloading = D - 1}};
+
+handle_info({got_torrent, ok, Hash, Content}, State) ->
+    schedule_next(),
+    Conn = db_conn(State),
+    true = is_binary(Content),
+    SaveTor = config:get(save_torrent, true),
+    if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end,
+    NewState = got_torrent_content(State, Hash, Content),
+    hash_reader_stats:handle_download_ok(),
+    {noreply, NewState};
+
+handle_info({got_torrent_from_cache, Hash, Content}, State) ->
+    on_used_cache(),
+    schedule_next(),
+    NewState = got_torrent_content(State, Hash, Content),
+    {noreply, NewState};
+
+handle_info(timeout, State) ->
+    schedule_next(),
+    {noreply, State}.
+
+schedule_next() ->
+    case hash_download_cache:get_one() of
+        {} ->
+            % nothing to download yet; retry after ?WAIT_TIME
+            timer:send_after(?WAIT_TIME, timeout);
+        Doc ->
+            gen_server:cast(self(), {process_hash, Doc})
+    end.
+
+schedule_download(Conn, Pid, Hash) ->
+    TryFilter = config:get(check_cache, false),
+    Down = case TryFilter of
+        true ->
+            db_hash_index:exist(Conn, Hash);
+        false ->
+            true
+    end,
+    try_download(Down, Conn, Pid, Hash).
+
+try_download(false, _, _, Hash) ->
+    ?T(?FMT("hash does not exist in index_cache, filter it ~s", [Hash])),
+    0;
+try_download(true, Conn, Pid, Hash) ->
+    case loc_torrent_cache:load(Conn, Hash) of
+        not_found ->
+            tor_download:download(Pid, Hash);
+        Content ->
+            ?T(?FMT("found torrent in local cache ~s", [Hash])),
+            self() ! {got_torrent_from_cache, Hash, Content}
+    end,
+    1.
+
+db_conn(State) ->
+    #state{dbpool = DBPool} = State,
+    mongo_pool:get(DBPool).
+
+got_torrent_content(State, MagHash, Content) ->
+    #state{downloading = D} = State,
+    case catch(torrent_file:parse(Content)) of
+        {'EXIT', _} ->
+            ?W(?FMT("parse a torrent failed ~s", [MagHash])),
+            skip;
+        {Type, Info} ->
+            got_torrent(State, MagHash, Type, Info)
+    end,
+    State#state{downloading = D - 1}.
+
+got_torrent(State, Hash, single, {Name, Length}) ->
+    try_save(State, Hash, Name, Length, []);
+
+got_torrent(State, Hash, multi, {Root, Files}) ->
+    try_save(State, Hash, Root, 0, Files).
+
+try_save(State, Hash, Name, Length, Files) ->
+    Conn = db_conn(State),
+    case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of
+        {'EXIT', Reason} ->
+            ?E(?FMT("save torrent failed ~p", [Reason]));
+        _ ->
+            on_saved(Conn)
+    end.
+
+on_used_cache() ->
+    hash_reader_stats:handle_used_cache().
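+
+%% Bookkeeping note: try_download/4 returns 1 for every hash it actually
+%% schedules (a network download or a local-cache hit) and 0 for filtered
+%% hashes; every torrent result then decrements `downloading' (either in
+%% got_torrent_content/3 or in the `failed' clause above), so schedule_next/0
+%% only pulls a new hash while downloading < max.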
+
+on_saved(Conn) ->
+    % `get_peers' here means we have processed a request
+    db_system:stats_get_peers(Conn),
+    % increase the `new' counter
+    db_system:stats_new_saved(Conn),
+    hash_reader_stats:handle_insert().
+
diff --git a/src/hash_reader/hash_download_cache.erl b/src/hash_reader/hash_download_cache.erl
new file mode 100644
index 0000000..84f6216
--- /dev/null
+++ b/src/hash_reader/hash_download_cache.erl
@@ -0,0 +1,101 @@
+%%
+%% hash_download_cache.erl
+%% Kevin Lynx
+%% caches wait_download hashes; the downloader reads hashes from here to
+%% avoid database operations. If the cache grows too big, part of it is
+%% flushed to the database.
+%% 07.21.2013
+%%
+-module(hash_download_cache).
+-include("vlog.hrl").
+-include("db_common.hrl").
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/1,
+    stop/0,
+    insert/1,
+    get_one/0]).
+-record(state, {cache = [], max, dbpool}).
+-define(SAVE_BATCH, 100).
+
+start_link(DBPool) ->
+    Max = config:get(max_download_cache, 100),
+    gen_server:start_link({local, srv_name()}, ?MODULE, [DBPool, Max], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+insert(Doc) ->
+    gen_server:cast(srv_name(), {insert, Doc}).
+
+get_one() ->
+    gen_server:call(srv_name(), get_one, infinity).
+
+srv_name() ->
+    ?MODULE.
+
+init([DBPool, Max]) ->
+    {ok, #state{max = Max, dbpool = DBPool}}.
+
+terminate(_, State) ->
+    #state{dbpool = DBPool, cache = Cache} = State,
+    % Max = 0 forces the whole remaining cache to be saved
+    check_save(DBPool, Cache, 0),
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({insert, Doc}, State) ->
+    #state{dbpool = DBPool, cache = Cache, max = Max} = State,
+    NewCache = check_save(DBPool, [Doc|Cache], Max),
+    {noreply, State#state{cache = NewCache}};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(get_one, _From, State) ->
+    #state{dbpool = DBPool, cache = Cache} = State,
+    {Doc, NewCache} = try_load(DBPool, Cache),
+    {reply, Doc, State#state{cache = NewCache}};
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+check_save(DBPool, Cache, Max) when length(Cache) >= Max ->
+    % keep the newest 2/3, flush the oldest 1/3 to the database
+    SplitCnt = 2 * Max div 3,
+    {Remain, ToSave} = lists:split(SplitCnt, Cache),
+    ?T(?FMT("download_cache reached the max, save 1/3 ~p", [length(ToSave)])),
+    do_save(DBPool, ToSave),
+    Remain;
+check_save(_, Cache, _) ->
+    Cache.
+
+do_save(_DBPool, []) ->
+    ok;
+do_save(DBPool, Docs) ->
+    Insert = fun(Doc) ->
+        Conn = mongo_pool:get(DBPool),
+        {ID} = bson:lookup('_id', Doc),
+        Sel = {'_id', ID},
+        mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+            mongo:update(?HASH_DOWNLOAD_COLL, Sel, Doc, true)
+        end)
+    end,
+    [Insert(Doc) || Doc <- Docs].
+
+try_load(DBPool, []) ->
+    ?T("download_cache empty, load hash from db"),
+    Conn = mongo_pool:get(DBPool),
+    % the collection may be empty too, so get_one/0 can return {}
+    case hash_reader_common:load_delete_doc(Conn, ?HASH_DOWNLOAD_COLL) of
+        {} -> {{}, []};
+        {Doc} -> {Doc, []}
+    end;
+try_load(_, [First|Rest]) ->
+    {First, Rest}.
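+
+%% Usage sketch: hash_reader2 pushes wait_download docs in with insert/1 and
+%% hash_download pops them with get_one/0; when the in-memory list is empty,
+%% get_one/0 falls back to the ?HASH_DOWNLOAD_COLL collection and returns {}
+%% once that is empty as well.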
+
diff --git a/src/hash_reader/hash_reader.erl b/src/hash_reader/hash_reader.erl
index 15a8434..22c36d8 100644
--- a/src/hash_reader/hash_reader.erl
+++ b/src/hash_reader/hash_reader.erl
@@ -106,7 +106,7 @@ handle_info(M, State) ->
 
 handle_cast({process_hash, Doc, DownloadDoc}, State) ->
     Conn = db_conn(State),
-    {Hash} = bson:lookup(hash, Doc),
+    {Hash} = bson:lookup('_id', Doc), % was: bson:lookup(hash, Doc); the hash now lives in '_id'
     ReqCnt = get_req_cnt(Doc),
     ListHash = binary_to_list(Hash),
     ?T(?FMT("process a hash ~s download-doc ~p", [ListHash, DownloadDoc])),
@@ -214,7 +214,7 @@ got_torrent(State, Hash, multi, {Root, Files}) ->
 % insert the doc to the `wait-download' collection, and when the
 % downloader is free, it will download this doc.
 insert_to_download_wait(Conn, Doc) ->
-    {ID} = bson:lookup('_id', Doc),
+    {ID} = bson:lookup('_id', Doc), % '_id' holds the hash
     Sel = {'_id', ID},
     mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
         % may exist already
@@ -229,7 +229,7 @@ check_in_index_cache(_, {}) ->
     timer:send_after(?WAIT_TIME, timeout),
     empty;
 check_in_index_cache(Conn, {Doc}) ->
-    {Hash} = bson:lookup(hash, Doc),
+    {Hash} = bson:lookup('_id', Doc), % was: bson:lookup(hash, Doc); the hash now lives in '_id'
     ListHash = binary_to_list(Hash),
     Try = should_try_download(config:get(check_cache, false), Conn, ListHash),
     case Try of
diff --git a/src/hash_reader/hash_reader2.erl b/src/hash_reader/hash_reader2.erl
new file mode 100644
index 0000000..51949ca
--- /dev/null
+++ b/src/hash_reader/hash_reader2.erl
@@ -0,0 +1,71 @@
+%%
+%% hash_reader2.erl
+%% Kevin Lynx
+%% 07.21.2013
+%%
+-module(hash_reader2).
+-include("vlog.hrl").
+-include("db_common.hrl").
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/1]).
+-define(WAIT_TIME, 1*60*1000).
+-record(state, {dbpool}).
+
+start_link(DBPool) ->
+    gen_server:start_link(?MODULE, [DBPool], []).
+
+init([DBPool]) ->
+    {ok, _} = hash_download:start_link(DBPool),
+    {ok, #state{dbpool = DBPool}, 0}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({process_hash, Doc}, State) ->
+    {BHash} = bson:lookup('_id', Doc),
+    Hash = binary_to_list(BHash),
+    ReqCnt = hash_reader_common:get_req_cnt(Doc),
+    Conn = db_conn(State),
+    case db_store_mongo:inc_announce(Conn, Hash, ReqCnt) of
+        true ->
+            hash_reader_common:on_updated(Conn);
+        false ->
+            ?T(?FMT("insert doc ~s to download_cache", [Hash])),
+            hash_download_cache:insert(Doc)
+    end,
+    schedule_next(Conn),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info(timeout, State) ->
+    Conn = db_conn(State),
+    schedule_next(Conn),
+    {noreply, State}.
+
+schedule_next(Conn) ->
+    case hash_reader_common:load_delete_doc(Conn, ?HASH_COLLNAME) of
+        {} ->
+            ?T("start to wait for new hash"),
+            timer:send_after(?WAIT_TIME, timeout);
+        {Doc} ->
+            gen_server:cast(self(), {process_hash, Doc})
+    end.
+
+db_conn(State) ->
+    #state{dbpool = DBPool} = State,
+    mongo_pool:get(DBPool).
+
diff --git a/src/hash_reader/hash_reader_common.erl b/src/hash_reader/hash_reader_common.erl
new file mode 100644
index 0000000..20ca4a3
--- /dev/null
+++ b/src/hash_reader/hash_reader_common.erl
@@ -0,0 +1,34 @@
+%%
+%% hash_reader_common.erl
+%% Kevin Lynx
+%% 07.21.2013
+%%
+-module(hash_reader_common).
+-include("db_common.hrl").
+-export([get_req_cnt/1,
+    on_updated/1,
+    load_delete_doc/2]).
+
+get_req_cnt(Doc) ->
+    case bson:lookup(req_cnt, Doc) of
+        {} -> 0;
+        {R} -> R
+    end.
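+
+%% e.g. get_req_cnt({'_id', <<"...">>, req_cnt, 3}) -> 3, while
+%% get_req_cnt({'_id', <<"...">>}) -> 0, since bson:lookup/2 returns {}
+%% for a missing field.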
+
+on_updated(Conn) ->
+    % `get_peers' here means we have processed a request
+    db_system:stats_get_peers(Conn),
+    % also increase the updated counter
+    db_system:stats_updated(Conn),
+    hash_reader_stats:handle_update().
+
+load_delete_doc(Conn, Col) ->
+    % findAndModify with remove=true pops one document atomically
+    Cmd = {findAndModify, Col, fields, {}, remove, true},
+    Ret = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        mongo:command(Cmd)
+    end),
+    case Ret of
+        {value, undefined, ok, 1.0} -> {};
+        {value, Obj, lastErrorObject, _, ok, 1.0} -> {Obj}
+    end.
+
diff --git a/src/hash_reader/hash_reader_sup.erl b/src/hash_reader/hash_reader_sup.erl
index 463ef8f..6c95fe8 100644
--- a/src/hash_reader/hash_reader_sup.erl
+++ b/src/hash_reader/hash_reader_sup.erl
@@ -36,11 +36,12 @@ start_standalone(IP, Port, Size) ->
     config:start_link("hash_reader.config", fun() -> config_default() end),
     tor_name_seg:init(),
     % NOTE:
+    DownloadCache = {hash_download_cache, {hash_download_cache, start_link, [?DBPOOLNAME]}, permanent, 2000, worker, [hash_download_cache]},
     Stats = {hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
     DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
     Log = {vlog, {vlog, start_link, ["log/hash_reader.log", 3]}, permanent, 2000, worker, [vlog]},
     DBDateRange = {db_daterange, {db_daterange, start_link, [?DBPOOLNAME]}, permanent, 1000, worker, [db_daterange]},
-    start_link(IP, Port, Size, [Log, DBDateRange, DownloadStats, Stats]).
+    start_link(IP, Port, Size, [Log, DownloadCache, DBDateRange, DownloadStats, Stats]).
 
 start_link(IP, Port, Size) ->
     start_link(IP, Port, Size, []).
@@ -62,7 +63,7 @@ init([PoolName, Size, OtherProcess]) ->
     {ok, {Spec, Children}}.
 
 create_child(PoolName, Index) ->
-    {child_id(Index), {hash_reader, start_link, [PoolName]},
+    {child_id(Index), {hash_reader2, start_link, [PoolName]},
         permanent, 1000, worker, dynamic}.
 
 child_id(Index) ->
@@ -75,5 +76,6 @@ config_default() ->
     {load_from_db, false},
     {text_seg, simple},
     {check_cache, false},
+    {max_download_cache, 100},
     {max_download_per_reader, 100},
     {torrent_path, "torrents/"}].
diff --git a/tools/HISTORY.md b/tools/HISTORY.md
index 0f1ed15..cf841f2 100644
--- a/tools/HISTORY.md
+++ b/tools/HISTORY.md
@@ -1,3 +1,16 @@
+## 07.21.2013
+
+* rewrite hash_reader; it now keeps a wait_download cache
+* change hash_cache_writer (crawler) to insert unique hashes
+
+## 07.19.2013
+
+* add a simple JSON search API to http
+
+## 07.15.2013
+
+* the crawler now keeps a hash cache and merges duplicate hashes there, so hash_reader processes fewer hashes
+
 ## 07.08.2013
 
 * add torrent importer which can import local torrents into torrents database
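
The read path after this patch: hash_cache_writer upserts request counts into
the dht_hash collection ('_id' = info-hash, plus req_cnt and req_at), each
hash_reader2 worker pops documents from that collection, updates announce
counts for torrents already in the store, and hands unknown hashes to
hash_download_cache; hash_download drains the cache and fetches the torrents.
A minimal wiring sketch, assuming the mongo pool is already registered
(start_readers/1 is a hypothetical helper for illustration; in the patch this
wiring is done by hash_reader_sup):

    %% start_readers/1 is a hypothetical helper, shown for illustration only;
    %% hash_reader_sup does the real wiring, with supervision and N readers.
    start_readers(DBPool) ->
        %% shared wait_download cache, registered locally as hash_download_cache
        {ok, _} = hash_download_cache:start_link(DBPool),
        %% the reader pops hashes from dht_hash and feeds the cache;
        %% hash_reader2:init/1 also starts its own hash_download worker
        {ok, _} = hash_reader2:start_link(DBPool),
        ok.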