diff --git a/src/crawler/crawler_app.erl b/src/crawler/crawler_app.erl
index f6f5d1c..3f87780 100644
--- a/src/crawler/crawler_app.erl
+++ b/src/crawler/crawler_app.erl
@@ -6,47 +6,28 @@
 -module(crawler_app).
 -behaviour(application).
 -export([start/2, stop/1]).
--export([start/0, startboth/0, stop/0]).
+-export([start/0, stop/0]).
+% application behaviour callback
 start(_Type, _StartArgs) ->
-    File = config_file_name(),
-    io:format("load config file ~s ", [File]),
-    case file:consult(File) of
-        {error, _Reason} ->
-            do_default_start(File);
-        {ok, [Cfg]} ->
-            do_start(Cfg)
-    end.
+    config:start_link("dhtcrawler.config", fun() -> config_default() end),
+    do_start().
 
 stop(_State) ->
+    config:stop(),
     crawler_sup:stop().
 
-config_file_name() ->
-    filename:join([filename:dirname(code:which(?MODULE)),
-        "..", "priv", "dhtcrawler.config"]).
-
-do_default_start(File) ->
-    List = [{start_port, 6776},
-        {node_count, 50},
-        {hash_max_cache, 10},
-        {loglevel, 3},
-        {dbconn, 15},
-        {dbhost, "localhost"},
-        {dbport, 27017}],
-    filelib:ensure_dir("priv/"),
-    file:write_file(File, io_lib:fwrite("~p.\n",[List])),
-    do_start(List).
-
-do_start(List) ->
-    StartPort = proplists:get_value(start_port, List),
-    Count = proplists:get_value(node_count, List),
-    LogLevel = proplists:get_value(loglevel, List),
-    DBConn = proplists:get_value(dbconn, List),
-    DBHost = proplists:get_value(dbhost, List),
-    DBPort = proplists:get_value(dbport, List),
-    HashMaxCache = proplists:get_value(hash_max_cache, List),
+do_start() ->
+    StartPort = config:get(start_port),
+    Count = config:get(node_count),
+    LogLevel = config:get(loglevel),
+    DBConn = config:get(dbconn),
+    DBHost = config:get(dbhost),
+    DBPort = config:get(dbport),
+    CacheMaxCount = config:get(hash_max_cache),
+    CacheMaxTime = config:get(cache_max_time),
     io:format("dhtcrawler startup ~p, ~p, ~p:~p~n", [StartPort, Count, DBHost, DBPort]),
-    crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn, HashMaxCache}).
+    crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn, CacheMaxTime, CacheMaxCount}).
 
 start() ->
     error_logger:logfile({open, "crash.log"}),
@@ -60,7 +41,13 @@ start() ->
 stop() ->
     application:stop(dhtcrawler).
 
-startboth() ->
-    start(),
-    crawler_http:start().
+config_default() ->
+    [{start_port, 6776},
+     {node_count, 50},
+     {hash_max_cache, 300},
+     {cache_max_time, 2*60}, % seconds
+     {loglevel, 3},
+     {dbconn, 5},
+     {dbhost, "localhost"},
+     {dbport, 27017}].
diff --git a/src/crawler/crawler_sup.erl b/src/crawler/crawler_sup.erl
index 2495285..a4477dd 100644
--- a/src/crawler/crawler_sup.erl
+++ b/src/crawler/crawler_sup.erl
@@ -21,20 +21,15 @@ stop() ->
 srv_name() ->
     dht_crawler_sup.
 
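+% note: the db_hash worker is replaced by hash_cache_writer, which takes the extra
+% CacheTime argument passed down from crawler_app.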
-init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn, HashCacheMax}]) ->
+init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn, CacheTime, HashCacheMax}]) ->
     Spec = {one_for_one, 1, 600},
     Instances = create_dht_instance(StartPort, Count),
     Logger = [{dht_logger, {vlog, start_link, ["dht_crawler.txt", LogLevel]},
         permanent, 2000, worker, dynamic}],
-    %Downloader = [{torrent_downloader, {torrent_download, start_link, []},
-    %    permanent, 2000, worker, dynamic}],
-    %DBStorer = [{torrent_index, {torrent_index, start_link, [DBHost, DBPort, crawler_stats, DBConn]},
-    %    permanent, 2000, worker, dynamic}],
-    HashInserter = [{db_hash, {db_hash, start_link, [DBHost, DBPort, DBConn, HashCacheMax]},
+    HashInserter = [{hash_cache_writer, {hash_cache_writer, start_link, [DBHost, DBPort, DBConn, CacheTime, HashCacheMax]},
         permanent, 2000, worker, dynamic}],
     Stats = [{crawler_stats, {crawler_stats, start_link, []},
         permanent, 2000, worker, dynamic}],
-    %Children = Logger ++ Downloader ++ DBStorer ++ Instances,
     Children = Logger ++ HashInserter ++ Stats ++ Instances,
     {ok, {Spec, Children}}.
diff --git a/src/crawler/dht_monitor.erl b/src/crawler/dht_monitor.erl
index 6c0f7b7..3572ae1 100644
--- a/src/crawler/dht_monitor.erl
+++ b/src/crawler/dht_monitor.erl
@@ -23,7 +23,8 @@ handle_event(get_peers, {InfoHash, _IP, _Port}) ->
     crawler_stats:get_peers(),
     %spawn(?MODULE, process_infohash_event, [InfoHash]);
     MagHash = dht_id:tohex(InfoHash),
-    db_hash:insert(MagHash);
+    %db_hash:insert(MagHash);
+    hash_cache_writer:insert(MagHash);
 
 handle_event(startup, {MyID}) ->
     spawn(?MODULE, tell_more_nodes, [MyID]).
diff --git a/src/crawler/hash_cache_writer.erl b/src/crawler/hash_cache_writer.erl
new file mode 100644
index 0000000..1999256
--- /dev/null
+++ b/src/crawler/hash_cache_writer.erl
@@ -0,0 +1,122 @@
+%%
+%% hash_cache_writer.erl
+%% Kevin Lynx
+%% 07.17.2013
+%% cache received hashes and pre-process these hashes before they are inserted into the database
+%%
+-module(hash_cache_writer).
+-include("db_common.hrl").
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/5, stop/0, insert/1]).
+-record(state, {cache_time, cache_max}).
+-define(TBLNAME, hash_table).
+-define(DBPOOL, hash_write_db).
+
+start_link(IP, Port, DBConn, MaxTime, MaxCnt) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, DBConn, MaxTime, MaxCnt], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+insert(Hash) when is_list(Hash) ->
+    gen_server:cast(srv_name(), {insert, Hash}).
+
+srv_name() ->
+    ?MODULE.
+
+init([IP, Port, DBConn, MaxTime, Max]) ->
+    mongo_sup:start_pool(?DBPOOL, DBConn, {IP, Port}),
+    ets:new(?TBLNAME, [set, named_table]),
+    {ok, #state{cache_time = 1000 * MaxTime, cache_max = Max}, 0}.
+
+terminate(_, State) ->
+    mongo_sup:stop_pool(?DBPOOL),
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({insert, Hash}, State) ->
+    do_insert(Hash),
+    try_save(State),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info(do_save_cache, #state{cache_time = Time} = State) ->
+    ?T("timeout to save cache hashes"),
+    do_save(table_size(?TBLNAME)),
+    schedule_save(Time),
+    {noreply, State};
+
+handle_info(timeout, #state{cache_time = Time} = State) ->
+    schedule_save(Time),
+    {noreply, State}.
+
+schedule_save(Time) ->
+    timer:send_after(Time, do_save_cache).
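+
+% The ETS table maps Hash -> request count seen since the last flush. The cache is
+% flushed to mongodb either when it grows to cache_max entries (see try_save below)
+% or when the periodic do_save_cache timer fires.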
+
+%%
+do_insert(Hash) when is_list(Hash) ->
+    NewVal = 1 + get_req_cnt(Hash),
+    ets:insert(?TBLNAME, {Hash, NewVal}).
+
+table_size(Tbl) ->
+    Infos = ets:info(Tbl),
+    proplists:get_value(size, Infos).
+
+try_save(#state{cache_max = Max}) ->
+    TSize = table_size(?TBLNAME),
+    try_save(TSize, Max).
+
+try_save(Size, Max) when Size >= Max ->
+    ?T(?FMT("try save all cache hashes ~p", [Size])),
+    do_save(Size);
+try_save(_, _) ->
+    ok.
+
+do_save(0) ->
+    ok;
+do_save(_) ->
+    Conn = mongo_pool:get(?DBPOOL),
+    First = ets:first(?TBLNAME),
+    ReqAt = time_util:now_seconds(),
+    {ReqSum, Docs} = to_docs(First, ReqAt),
+    ets:delete_all_objects(?TBLNAME),
+    do_save(Conn, Docs, ReqSum).
+
+to_docs('$end_of_table', _) ->
+    {0, []};
+to_docs(Key, ReqAt) ->
+    ReqCnt = get_req_cnt(Key),
+    Doc = {hash, list_to_binary(Key), req_at, ReqAt, req_cnt, ReqCnt},
+    Next = ets:next(?TBLNAME, Key),
+    {ReqSum, Docs} = to_docs(Next, ReqAt),
+    {ReqSum + ReqCnt, [Doc|Docs]}.
+
+do_save(Conn, Docs, ReqSum) ->
+    ?T(?FMT("send ~p hashes req-count ~p to db", [length(Docs), ReqSum])),
+    % `safe' may make this process's message queue grow, but `unsafe' may crash
+    % the database.
+    mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        mongo:insert(?HASH_COLLNAME, Docs)
+    end),
+    db_system:stats_cache_query_inserted(Conn, length(Docs)),
+    db_system:stats_query_inserted(Conn, ReqSum).
+
+get_req_cnt(Hash) ->
+    case ets:lookup(?TBLNAME, Hash) of
+        [{Hash, ReqCnt}] -> ReqCnt;
+        [] -> 0
+    end.
diff --git a/src/db_daterange.erl b/src/db_daterange.erl
index 897ddac..828d952 100644
--- a/src/db_daterange.erl
+++ b/src/db_daterange.erl
@@ -5,7 +5,7 @@
 %% To track the most recently hashes
 %%
 -module(db_daterange).
--export([insert/2,
+-export([insert/3,
     lookup/3]).
 -export([start_link/1,
     stop/0]).
@@ -31,11 +31,11 @@ ensure_date_index(Conn) ->
     end).
 
 % '_id': Hash, date: DaySecs, reqs: RequestCount
-insert(Conn, Hash) when is_list(Hash) ->
+insert(Conn, Hash, ReqCnt) when is_list(Hash) ->
     DaySecs = time_util:now_day_seconds(),
     BHash = list_to_binary(Hash),
     Cmd = {findAndModify, ?COLLNAME, query, {'_id', BHash}, upsert, true,
-        update, {'$inc', {reqs, 1}, '$set', {?DATE_COL, DaySecs}}, fields, {'_id', 1}},
+        update, {'$inc', {reqs, ReqCnt}, '$set', {?DATE_COL, DaySecs}}, fields, {'_id', 1}},
     IRet = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
         mongo:command(Cmd)
     end),
diff --git a/src/db_store_mongo.erl b/src/db_store_mongo.erl
index fb0c611..48f7e54 100644
--- a/src/db_store_mongo.erl
+++ b/src/db_store_mongo.erl
@@ -11,6 +11,7 @@
     unsafe_insert/2,
     count/1,
     inc_announce/2,
+    inc_announce/3,
     exist/2,
     index/2,
     search_newest_top_by_date/3,
@@ -114,7 +115,8 @@ index(Conn, Hash) when is_list(Hash) ->
 insert(Conn, Hash, Name, Length, Files) when is_list(Hash) ->
     NewDoc = create_torrent_desc(Conn, Hash, Name, Length, 1, Files),
-    db_daterange:insert(Conn, Hash),
+    % TODO: because of hash_cache_writer, the newly inserted torrent loses its req_cnt value
+    db_daterange:insert(Conn, Hash, 1),
     mongo_do(Conn, fun() ->
         % the doc may already exist because the other process has inserted before
         Sel = {'_id', list_to_binary(Hash)},
@@ -129,13 +131,14 @@ unsafe_insert(Conn, Tors) when is_list(Tors) ->
     end).
 
 inc_announce(Conn, Hash) when is_list(Hash) ->
-    inc_announce(Conn, list_to_binary(Hash));
-
-inc_announce(Conn, Hash) when is_binary(Hash) ->
+    inc_announce(Conn, Hash, 1).
+
+inc_announce(Conn, Hash, Inc) when is_list(Hash) ->
     % damn, mongodb-erlang doesnot support update a field for an object,
     % `findAndModify` works but it will change `announce' datatype to double
-    Cmd = {findAndModify, ?COLLNAME, query, {'_id', Hash},
-        update, {'$inc', {announce, 1}}, fields, {'_id', 1}, % not specifed or {} will return whole object
+    BHash = list_to_binary(Hash),
+    Cmd = {findAndModify, ?COLLNAME, query, {'_id', BHash},
+        update, {'$inc', {announce, Inc}}, fields, {'_id', 1}, % not specified or {} will return the whole object
         new, false},
     Ret = mongo_do(Conn, fun() ->
         mongo:command(Cmd)
@@ -143,7 +146,7 @@ inc_announce(Conn, Hash) when is_binary(Hash) ->
     case Ret of
         {value, undefined, ok, 1.0} -> false;
         {value, _Obj, lastErrorObject, {updatedExisting, true, n, 1}, ok, 1.0} ->
-            db_daterange:insert(Conn, binary_to_list(Hash)),
+            db_daterange:insert(Conn, Hash, Inc),
             true;
         _ -> false
     end.
diff --git a/src/db_system.erl b/src/db_system.erl
index 1f73937..9b430a4 100644
--- a/src/db_system.erl
+++ b/src/db_system.erl
@@ -89,7 +89,10 @@ stats_get_peers(Conn) ->
 % all queries, not processed
 stats_query_inserted(Conn, Count) ->
     stats_inc_field(Conn, get_peers_query, Count).
-
+
+stats_cache_query_inserted(Conn, Count) ->
+    stats_inc_field(Conn, inserted_query, Count).
+
 stats_inc_field(Conn, Filed) ->
     stats_inc_field(Conn, Filed, 1).
 
@@ -116,6 +119,7 @@ stats_ensure_today(TodaySecs) ->
     case mongo:find_one(?STATS_COLLNAME, {'_id', TodaySecs}) of
         {} ->
             NewDoc = {'_id', TodaySecs, get_peers, 0, get_peers_query, 0,
+                inserted_query, 0, % because hash_cache_writer will merge some queries
                 updated, 0, new_saved, 0},
             mongo:insert(?STATS_COLLNAME, NewDoc),
             NewDoc;
@@ -128,5 +132,3 @@ test_torrent_id() ->
     {ok, Conn} = mongo_connection:start_link({localhost, 27017}),
     ID = get_torrent_id(Conn),
     ID.
-
-
diff --git a/src/hash_reader/hash_reader.erl b/src/hash_reader/hash_reader.erl
index 86504d3..fc68f4f 100644
--- a/src/hash_reader/hash_reader.erl
+++ b/src/hash_reader/hash_reader.erl
@@ -107,13 +107,14 @@ handle_info(M, State) ->
 handle_cast({process_hash, Doc, DownloadDoc}, State) ->
     Conn = db_conn(State),
     {Hash} = bson:lookup(hash, Doc),
+    ReqCnt = get_req_cnt(Doc),
     ListHash = binary_to_list(Hash),
     ?T(?FMT("process a hash ~s download-doc ~p", [ListHash, DownloadDoc])),
     % to avoid register many timers when the hash is empty but download hash is not
     % it also can avoid increase the message queue size, everytime this function get called,
     % it remove this message and append only another 1.
     if DownloadDoc -> do_nothing; true -> try_next(Conn) end,
-    NewState = case db_store_mongo:inc_announce(Conn, ListHash) of
+    NewState = case db_store_mongo:inc_announce(Conn, ListHash, ReqCnt) of
         true ->
             ?T(?FMT("inc_announce success ~s", [ListHash])),
             on_updated(Conn),
@@ -131,6 +132,12 @@ handle_cast(stop, State) ->
 handle_call(_, _From, State) ->
     {noreply, State}.
 
+get_req_cnt(Doc) ->
+    case bson:lookup(req_cnt, Doc) of
+        {} -> 0;
+        {R} -> R
+    end.
+
 try_download(State, Hash, Doc) ->
     #state{downloading = D} = State,
     NewDownloading = case D >= ?MAX_DOWNLOAD of
diff --git a/src/transfer.erl b/src/transfer.erl
deleted file mode 100644
index 8c95731..0000000
--- a/src/transfer.erl
+++ /dev/null
@@ -1,43 +0,0 @@
--module(transfer).
--compile(export_all).
--export([start/0]).
--define(DBNAME, torrents).
--define(COLLNAME, hash).
--define(ONE_DAY, 24*60*60).
--define(READ_POOL, read_pool).
--define(WRITE_POOL, write_pool).
--export([start/2]).
-
-start() ->
-    mongo_sup:start_pool(?READ_POOL, 5, {localhost, 10000}),
-    mongo_sup:start_pool(?WRITE_POOL, 5, {localhost, 27010}),
-    [spawn(?MODULE, start, [DayStart, DayEnd]) ||
-        {DayStart, DayEnd} <- gen_day_ranges()],
-    ok.
-
-gen_day_ranges() ->
-    Today = time_util:now_day_seconds(),
-    [day_secs_at(Today, Before) || Before <- lists:seq(0, 15)].
-
-day_secs_at(Today, Before) ->
-    {Today - Before * ?ONE_DAY, Today - Before * ?ONE_DAY + ?ONE_DAY}.
-
-start(DaySecs, DaySecsMax) ->
-    RConn = mongo_pool:get(?READ_POOL),
-    WConn = mongo_pool:get(?WRITE_POOL),
-    Docs = mongo:do(safe, master, RConn, ?DBNAME, fun() ->
-        Cursor = mongo:find(?COLLNAME, {created_at, {'$gt', DaySecs, '$lt', DaySecsMax}}),
-        mongo_cursor:rest(Cursor)
-    end),
-    case Docs of
-        [] ->
-            ok;
-        _ ->
-            mongo:do(safe, master, WConn, ?DBNAME, fun() ->
-                mongo:insert(?COLLNAME, Docs)
-            end)
-    end,
-    io:format("done at ~p size ~p~n", [DaySecs, length(Docs)]).
-
-
-