Merge branch 'src' of github.com:kevinlynx/dhtcrawler2 into src

Kevin Lynx 2013-07-18 13:09:12 +08:00
commit cd9ae2ec53
9 changed files with 176 additions and 102 deletions

View File: crawler_app.erl

@@ -6,47 +6,28 @@
 -module(crawler_app).
 -behaviour(application).
 -export([start/2, stop/1]).
--export([start/0, startboth/0, stop/0]).
+-export([start/0, stop/0]).
 
+% application behaviour callback
 start(_Type, _StartArgs) ->
-    File = config_file_name(),
-    io:format("load config file ~s ", [File]),
-    case file:consult(File) of
-        {error, _Reason} ->
-            do_default_start(File);
-        {ok, [Cfg]} ->
-            do_start(Cfg)
-    end.
+    config:start_link("dhtcrawler.config", fun() -> config_default() end),
+    do_start().
 
 stop(_State) ->
+    config:stop(),
     crawler_sup:stop().
 
-config_file_name() ->
-    filename:join([filename:dirname(code:which(?MODULE)),
-        "..", "priv", "dhtcrawler.config"]).
-
-do_default_start(File) ->
-    List = [{start_port, 6776},
-            {node_count, 50},
-            {hash_max_cache, 10},
-            {loglevel, 3},
-            {dbconn, 15},
-            {dbhost, "localhost"},
-            {dbport, 27017}],
-    filelib:ensure_dir("priv/"),
-    file:write_file(File, io_lib:fwrite("~p.\n", [List])),
-    do_start(List).
-
-do_start(List) ->
-    StartPort = proplists:get_value(start_port, List),
-    Count = proplists:get_value(node_count, List),
-    LogLevel = proplists:get_value(loglevel, List),
-    DBConn = proplists:get_value(dbconn, List),
-    DBHost = proplists:get_value(dbhost, List),
-    DBPort = proplists:get_value(dbport, List),
-    HashMaxCache = proplists:get_value(hash_max_cache, List),
+do_start() ->
+    StartPort = config:get(start_port),
+    Count = config:get(node_count),
+    LogLevel = config:get(loglevel),
+    DBConn = config:get(dbconn),
+    DBHost = config:get(dbhost),
+    DBPort = config:get(dbport),
+    CacheMaxCount = config:get(hash_max_cache),
+    CacheMaxTime = config:get(cache_max_time),
     io:format("dhtcrawler startup ~p, ~p, ~p:~p~n", [StartPort, Count, DBHost, DBPort]),
-    crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn, HashMaxCache}).
+    crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn, CacheMaxTime, CacheMaxCount}).
 
 start() ->
     error_logger:logfile({open, "crash.log"}),
@@ -60,7 +41,13 @@ start() ->
 
 stop() ->
     application:stop(dhtcrawler).
 
-startboth() ->
-    start(),
-    crawler_http:start().
+config_default() ->
+    [{start_port, 6776},
+     {node_count, 50},
+     {hash_max_cache, 300},
+     {cache_max_time, 2*60}, % seconds
+     {loglevel, 3},
+     {dbconn, 5},
+     {dbhost, "localhost"},
+     {dbport, 27017}].
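
For reference, the new config process is presumably fed a consult-style file: a single Erlang proplist term terminated by a period, the same shape the removed do_default_start/1 used to write with io_lib:fwrite("~p.\n", [List]). A minimal dhtcrawler.config sketch using the defaults from config_default/0 above; the exact file format and lookup path of the config module are assumptions, since that module is not part of this diff:

    %% dhtcrawler.config -- assumed consult-style format, one term ending with a period
    [{start_port, 6776},        % base UDP port for the DHT node instances
     {node_count, 50},
     {hash_max_cache, 300},     % flush the hash cache once it holds this many hashes ...
     {cache_max_time, 120},     % ... or after this many seconds (2*60 in config_default/0)
     {loglevel, 3},
     {dbconn, 5},               % mongodb connection pool size
     {dbhost, "localhost"},
     {dbport, 27017}].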

View File: crawler_sup.erl

@@ -21,20 +21,15 @@ stop() ->
 srv_name() ->
     dht_crawler_sup.
 
-init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn, HashCacheMax}]) ->
+init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn, CacheTime, HashCacheMax}]) ->
     Spec = {one_for_one, 1, 600},
     Instances = create_dht_instance(StartPort, Count),
     Logger = [{dht_logger, {vlog, start_link, ["dht_crawler.txt", LogLevel]},
         permanent, 2000, worker, dynamic}],
-    %Downloader = [{torrent_downloader, {torrent_download, start_link, []},
-    %    permanent, 2000, worker, dynamic}],
-    %DBStorer = [{torrent_index, {torrent_index, start_link, [DBHost, DBPort, crawler_stats, DBConn]},
-    %    permanent, 2000, worker, dynamic}],
-    HashInserter = [{db_hash, {db_hash, start_link, [DBHost, DBPort, DBConn, HashCacheMax]},
+    HashInserter = [{hash_cache_writer, {hash_cache_writer, start_link, [DBHost, DBPort, DBConn, CacheTime, HashCacheMax]},
         permanent, 2000, worker, dynamic}],
     Stats = [{crawler_stats, {crawler_stats, start_link, []},
         permanent, 2000, worker, dynamic}],
-    %Children = Logger ++ Downloader ++ DBStorer ++ Instances,
     Children = Logger ++ HashInserter ++ Stats ++ Instances,
     {ok, {Spec, Children}}.
 

View File

@@ -23,7 +23,8 @@ handle_event(get_peers, {InfoHash, _IP, _Port}) ->
     crawler_stats:get_peers(),
     %spawn(?MODULE, process_infohash_event, [InfoHash]);
     MagHash = dht_id:tohex(InfoHash),
-    db_hash:insert(MagHash);
+    %db_hash:insert(MagHash);
+    hash_cache_writer:insert(MagHash);
 
 handle_event(startup, {MyID}) ->
     spawn(?MODULE, tell_more_nodes, [MyID]).

View File: hash_cache_writer.erl (new file)

@@ -0,0 +1,122 @@
+%%
+%% hash_cache_writer.erl
+%% Kevin Lynx
+%% 07.17.2013
+%% cache received hashes and pre-process these hashes before they are inserted into the database
+%%
+-module(hash_cache_writer).
+-include("db_common.hrl").
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+-export([start_link/5, stop/0, insert/1]).
+-record(state, {cache_time, cache_max}).
+-define(TBLNAME, hash_table).
+-define(DBPOOL, hash_write_db).
+
+start_link(IP, Port, DBConn, MaxTime, MaxCnt) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, DBConn, MaxTime, MaxCnt], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+insert(Hash) when is_list(Hash) ->
+    gen_server:cast(srv_name(), {insert, Hash}).
+
+srv_name() ->
+    ?MODULE.
+
+init([IP, Port, DBConn, MaxTime, Max]) ->
+    mongo_sup:start_pool(?DBPOOL, DBConn, {IP, Port}),
+    ets:new(?TBLNAME, [set, named_table]),
+    {ok, #state{cache_time = 1000 * MaxTime, cache_max = Max}, 0}.
+
+terminate(_, State) ->
+    mongo_sup:stop_pool(?DBPOOL),
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({insert, Hash}, State) ->
+    do_insert(Hash),
+    try_save(State),
+    {noreply, State};
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info(do_save_cache, #state{cache_time = Time} = State) ->
+    ?T("timeout to save cache hashes"),
+    do_save(table_size(?TBLNAME)),
+    schedule_save(Time),
+    {noreply, State};
+handle_info(timeout, #state{cache_time = Time} = State) ->
+    schedule_save(Time),
+    {noreply, State}.
+
+schedule_save(Time) ->
+    timer:send_after(Time, do_save_cache).
+
+%%
+do_insert(Hash) when is_list(Hash) ->
+    NewVal = 1 + get_req_cnt(Hash),
+    ets:insert(?TBLNAME, {Hash, NewVal}).
+
+table_size(Tbl) ->
+    Infos = ets:info(Tbl),
+    proplists:get_value(size, Infos).
+
+try_save(#state{cache_max = Max}) ->
+    TSize = table_size(?TBLNAME),
+    try_save(TSize, Max).
+
+try_save(Size, Max) when Size >= Max ->
+    ?T(?FMT("try save all cache hashes ~p", [Size])),
+    do_save(Size);
+try_save(_, _) ->
+    ok.
+
+do_save(0) ->
+    ok;
+do_save(_) ->
+    Conn = mongo_pool:get(?DBPOOL),
+    First = ets:first(?TBLNAME),
+    ReqAt = time_util:now_seconds(),
+    {ReqSum, Docs} = to_docs(First, ReqAt),
+    ets:delete_all_objects(?TBLNAME),
+    do_save(Conn, Docs, ReqSum).
+
+to_docs('$end_of_table', _) ->
+    {0, []};
+to_docs(Key, ReqAt) ->
+    ReqCnt = get_req_cnt(Key),
+    Doc = {hash, list_to_binary(Key), req_at, ReqAt, req_cnt, ReqCnt},
+    Next = ets:next(?TBLNAME, Key),
+    {ReqSum, Docs} = to_docs(Next, ReqAt),
+    {ReqSum + ReqCnt, [Doc|Docs]}.
+
+do_save(Conn, Docs, ReqSum) ->
+    ?T(?FMT("send ~p hashes req-count ~p to db", [length(Docs), ReqSum])),
+    % `safe' may cause this process's message queue to grow, but `unsafe' may cause the
+    % database to crash.
+    mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        mongo:insert(?HASH_COLLNAME, Docs)
+    end),
+    db_system:stats_cache_query_inserted(Conn, length(Docs)),
+    db_system:stats_query_inserted(Conn, ReqSum).
+
+get_req_cnt(Hash) ->
+    case ets:lookup(?TBLNAME, Hash) of
+        [{Hash, ReqCnt}] -> ReqCnt;
+        [] -> 0
+    end.
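
A minimal usage sketch of the new writer, assuming a MongoDB instance reachable on localhost and the same argument order as the crawler_sup child spec above; the pool size, timing values, and the info-hash string are illustrative only:

    %% normally started by crawler_sup as a child; shown standalone here
    {ok, _Pid} = hash_cache_writer:start_link("localhost", 27017, 5, 2*60, 300),
    %% the DHT event handler feeds hex info-hashes in; a repeated hash only bumps
    %% its counter in the ETS table instead of producing another database write
    ok = hash_cache_writer:insert("0123456789ABCDEF0123456789ABCDEF01234567"),
    ok = hash_cache_writer:insert("0123456789ABCDEF0123456789ABCDEF01234567"),
    %% the table is flushed in one batch insert either when it reaches 300 entries
    %% (hash_max_cache) or when the 2*60-second timer fires, writing one document
    %% per hash with req_at and the merged req_cnt
    hash_cache_writer:stop().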

View File: db_daterange.erl

@@ -5,7 +5,7 @@
 %% To track the most recently seen hashes
 %%
 -module(db_daterange).
--export([insert/2,
+-export([insert/3,
     lookup/3]).
 -export([start_link/1,
     stop/0]).
@@ -31,11 +31,11 @@ ensure_date_index(Conn) ->
     end).
 
 % '_id': Hash, date: DaySecs, reqs: RequestCount
-insert(Conn, Hash) when is_list(Hash) ->
+insert(Conn, Hash, ReqCnt) when is_list(Hash) ->
     DaySecs = time_util:now_day_seconds(),
     BHash = list_to_binary(Hash),
     Cmd = {findAndModify, ?COLLNAME, query, {'_id', BHash}, upsert, true,
-        update, {'$inc', {reqs, 1}, '$set', {?DATE_COL, DaySecs}}, fields, {'_id', 1}},
+        update, {'$inc', {reqs, ReqCnt}, '$set', {?DATE_COL, DaySecs}}, fields, {'_id', 1}},
     IRet = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
         mongo:command(Cmd)
     end),

View File: db_store_mongo.erl

@@ -11,6 +11,7 @@
     unsafe_insert/2,
     count/1,
     inc_announce/2,
+    inc_announce/3,
     exist/2,
     index/2,
     search_newest_top_by_date/3,
@@ -114,7 +115,8 @@ index(Conn, Hash) when is_list(Hash) ->
 
 insert(Conn, Hash, Name, Length, Files) when is_list(Hash) ->
     NewDoc = create_torrent_desc(Conn, Hash, Name, Length, 1, Files),
-    db_daterange:insert(Conn, Hash),
+    % TODO: because of the hash_cache_writer, the newly inserted torrent loses its req_cnt value
+    db_daterange:insert(Conn, Hash, 1),
     mongo_do(Conn, fun() ->
         % the doc may already exist because the other process has inserted before
         Sel = {'_id', list_to_binary(Hash)},
@@ -129,13 +131,14 @@ unsafe_insert(Conn, Tors) when is_list(Tors) ->
     end).
 
 inc_announce(Conn, Hash) when is_list(Hash) ->
-    inc_announce(Conn, list_to_binary(Hash));
-inc_announce(Conn, Hash) when is_binary(Hash) ->
+    inc_announce(Conn, Hash, 1).
+inc_announce(Conn, Hash, Inc) when is_list(Hash) ->
     % damn, mongodb-erlang does not support updating a field for an object,
     % `findAndModify' works but it will change the `announce' datatype to double
-    Cmd = {findAndModify, ?COLLNAME, query, {'_id', Hash},
-        update, {'$inc', {announce, 1}}, fields, {'_id', 1}, % not specified or {} will return the whole object
+    BHash = list_to_binary(Hash),
+    Cmd = {findAndModify, ?COLLNAME, query, {'_id', BHash},
+        update, {'$inc', {announce, Inc}}, fields, {'_id', 1}, % not specified or {} will return the whole object
         new, false},
     Ret = mongo_do(Conn, fun() ->
         mongo:command(Cmd)
     end),
@@ -143,7 +146,7 @@ inc_announce(Conn, Hash) when is_binary(Hash) ->
     case Ret of
         {value, undefined, ok, 1.0} -> false;
         {value, _Obj, lastErrorObject, {updatedExisting, true, n, 1}, ok, 1.0} ->
-            db_daterange:insert(Conn, binary_to_list(Hash)),
+            db_daterange:insert(Conn, Hash, Inc),
             true;
         _ -> false
     end.
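
With inc_announce/3 the reader side can apply the merged request count from one cached hash document in a single update, and db_daterange receives the same increment. A rough sketch of the call, assuming Conn comes from an existing mongo pool (the pool name, hash, and count below are illustrative):

    Conn = mongo_pool:get(hash_read_db),   % pool name is an assumption
    %% 7 get_peers requests for one hash were merged by hash_cache_writer into a
    %% single document with req_cnt = 7
    Exists = db_store_mongo:inc_announce(Conn, "0123456789ABCDEF0123456789ABCDEF01234567", 7).
    %% Exists == true means the torrent was already indexed: announce was incremented
    %% by 7 and db_daterange:insert/3 recorded reqs +7 for today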

View File: db_system.erl

@@ -89,7 +89,10 @@ stats_get_peers(Conn) ->
 % all queries, not processed
 stats_query_inserted(Conn, Count) ->
     stats_inc_field(Conn, get_peers_query, Count).
 
+stats_cache_query_inserted(Conn, Count) ->
+    stats_inc_field(Conn, inserted_query, Count).
+
 stats_inc_field(Conn, Filed) ->
     stats_inc_field(Conn, Filed, 1).
 
@@ -116,6 +119,7 @@ stats_ensure_today(TodaySecs) ->
     case mongo:find_one(?STATS_COLLNAME, {'_id', TodaySecs}) of
         {} ->
             NewDoc = {'_id', TodaySecs, get_peers, 0, get_peers_query, 0,
+                inserted_query, 0, % because hash_cache_writer will merge some queries
                 updated, 0, new_saved, 0},
             mongo:insert(?STATS_COLLNAME, NewDoc),
             NewDoc;
@@ -128,5 +132,3 @@ test_torrent_id() ->
     {ok, Conn} = mongo_connection:start_link({localhost, 27017}),
     ID = get_torrent_id(Conn),
     ID.
-
-

View File

@@ -107,13 +107,14 @@ handle_info(M, State) ->
 handle_cast({process_hash, Doc, DownloadDoc}, State) ->
     Conn = db_conn(State),
     {Hash} = bson:lookup(hash, Doc),
+    ReqCnt = get_req_cnt(Doc),
     ListHash = binary_to_list(Hash),
     ?T(?FMT("process a hash ~s download-doc ~p", [ListHash, DownloadDoc])),
     % to avoid registering many timers when the hash is empty but the download hash is not;
     % it also avoids increasing the message queue size: every time this function gets called,
     % it removes this message and appends only one more.
     if DownloadDoc -> do_nothing; true -> try_next(Conn) end,
-    NewState = case db_store_mongo:inc_announce(Conn, ListHash) of
+    NewState = case db_store_mongo:inc_announce(Conn, ListHash, ReqCnt) of
         true ->
             ?T(?FMT("inc_announce success ~s", [ListHash])),
             on_updated(Conn),
@@ -131,6 +132,12 @@ handle_cast(stop, State) ->
 handle_call(_, _From, State) ->
     {noreply, State}.
 
+get_req_cnt(Doc) ->
+    case bson:lookup(req_cnt, Doc) of
+        {} -> 0;
+        {R} -> R
+    end.
+
 try_download(State, Hash, Doc) ->
     #state{downloading = D} = State,
     NewDownloading = case D >= ?MAX_DOWNLOAD of

View File: transfer.erl (deleted)

@@ -1,43 +0,0 @@
--module(transfer).
--compile(export_all).
--export([start/0]).
--define(DBNAME, torrents).
--define(COLLNAME, hash).
--define(ONE_DAY, 24*60*60).
--define(READ_POOL, read_pool).
--define(WRITE_POOL, write_pool).
--export([start/2]).
-
-start() ->
-    mongo_sup:start_pool(?READ_POOL, 5, {localhost, 10000}),
-    mongo_sup:start_pool(?WRITE_POOL, 5, {localhost, 27010}),
-    [spawn(?MODULE, start, [DayStart, DayEnd]) ||
-        {DayStart, DayEnd} <- gen_day_ranges()],
-    ok.
-
-gen_day_ranges() ->
-    Today = time_util:now_day_seconds(),
-    [day_secs_at(Today, Before) || Before <- lists:seq(0, 15)].
-
-day_secs_at(Today, Before) ->
-    {Today - Before * ?ONE_DAY, Today - Before * ?ONE_DAY + ?ONE_DAY}.
-
-start(DaySecs, DaySecsMax) ->
-    RConn = mongo_pool:get(?READ_POOL),
-    WConn = mongo_pool:get(?WRITE_POOL),
-    Docs = mongo:do(safe, master, RConn, ?DBNAME, fun() ->
-        Cursor = mongo:find(?COLLNAME, {created_at, {'$gt', DaySecs, '$lt', DaySecsMax}}),
-        mongo_cursor:rest(Cursor)
-    end),
-    case Docs of
-        [] ->
-            ok;
-        _ ->
-            mongo:do(safe, master, WConn, ?DBNAME, fun() ->
-                mongo:insert(?COLLNAME, Docs)
-            end)
-    end,
-    io:format("done at ~p size ~p~n", [DaySecs, length(Docs)]).