integrate torrent downloader monitor
commit 9d8c60f0da
parent 24b8061577
@@ -42,18 +42,19 @@ insert(Conn, Hash) when is_list(Hash) ->
     gen_server:cast(srv_name(), insert),
     IRet.
 
-% [ListHash, ListHash]
+% [{ListHash, Req}, {ListHash, Req}]
 lookup(Conn, DaySecs, Count) ->
     Sel = {'$query', {date, DaySecs}, '$orderby', {reqs, -1}},
     List = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
-        Cursor = mongo:find(?COLLNAME, Sel, {'_id', 1}, 0, Count),
+        Cursor = mongo:find(?COLLNAME, Sel, {'_id', 1, reqs, 1}, 0, Count),
         mongo_cursor:rest(Cursor)
     end),
     [decode_hash(Doc) || Doc <- List].
 
 decode_hash(Doc) ->
     {ID} = bson:lookup('_id', Doc),
-    binary_to_list(ID).
+    {Req} = bson:lookup(reqs, Doc),
+    {binary_to_list(ID), Req}.
 
 % delete all oldest hashes
 try_delete_oldest(Conn) ->
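With this hunk, db_daterange:lookup/3 projects reqs alongside '_id' and returns {Hash, RequestCount} pairs, already ordered by request count through the '$orderby' selector. A minimal sketch of a caller consuming the new shape (print_top/3 is hypothetical, not part of the commit):

    % hypothetical caller: iterate the {Hash, Req} pairs that
    % db_daterange:lookup/3 now returns
    print_top(Conn, DaySecs, Count) ->
        Pairs = db_daterange:lookup(Conn, DaySecs, Count),
        [io:format("~s requested ~p times~n", [Hash, Req])
            || {Hash, Req} <- Pairs],
        ok.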
@@ -91,9 +91,16 @@ search_newest_top(Conn, Count, DaySecs) ->
 % use hash_date to search, based on date index, so the search speed is very fast
 search_newest_top_by_date(Conn, Count, DaySecs) ->
     Hashes = db_daterange:lookup(Conn, DaySecs, Count),
-    Infos = [index(Conn, Hash) || Hash <- Hashes],
+    Infos = [day_info(Conn, Hash, Req) || {Hash, Req} <- Hashes],
     lists:filter(fun(E) -> E /= {} end, Infos).
 
+day_info(Conn, Hash, Req) ->
+    case index(Conn, Hash) of
+        {} -> {};
+        Info ->
+            setelement(4, Info, Req)
+    end.
+
 index(Conn, Hash) when is_list(Hash) ->
     Ret = mongo_do_slave(Conn, fun() ->
         mongo:find_one(?COLLNAME, {'_id', list_to_binary(Hash)})
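day_info/3 patches the per-day request count into the tuple that index/2 returns; per the result-shape comments added later in this diff, element 4 of that tuple is RequestCount. A minimal illustration (the concrete values are made up):

    % illustrative values only: setelement/3 replaces element 4, the
    % stored total RequestCount, with the per-day count Req
    Info = {single, "abc123", {"ubuntu.iso", 123456}, 42, 1385000000},
    setelement(4, Info, 7).
    % -> {single, "abc123", {"ubuntu.iso", 123456}, 7, 1385000000}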
@@ -214,6 +221,8 @@ decode_ret_item(Item) ->
     {Torrent} = bson:lookup(obj, Item),
     decode_torrent_item(Torrent).
 
+% {single, Hash, {Name, Length}, RequestCount, CreatedAt}
+% {multi, Hash, {Name, Files}, RequestCount, CreatedAt}
 decode_torrent_item(Torrent) ->
     {BinHash} = bson:lookup('_id', Torrent),
     Hash = binary_to_list(BinHash),
@@ -23,15 +23,15 @@
 -define(REQ_TIMEOUT, 60*1000).
 % when ibrowse crashed, it will not notify these requests timeout, that will
 % make these requests stay in the state forever
--define(REQ_ERROR_TIMEOUT, 2*?REQ_TIMEOUT).
+-define(REQ_ERROR_TIMEOUT, 5*60*1000).
 -define(IS_ERROR_TIMEOUT(T), (timer:now_diff(now(), T) div 1000 > ?REQ_ERROR_TIMEOUT)).
 -record(state, {start, hashSum = 0, reqSum = 0, totalTime = 0, reqs}).
 
 start_global() ->
     ibrowse:start(),
-    Options = [{max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}],
+    %Options = [{max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}],
     % not work here ?
-    [ibrowse:set_dest(Host, 80, Options) || Host <- get_req_hosts()],
+    %[ibrowse:set_dest(Host, 80, Options) || Host <- get_req_hosts()],
     ok.
 
 start_link() ->
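A unit check on the new timeout: 5*60*1000 = 300000 ms, i.e. five minutes, replacing the previous 2*?REQ_TIMEOUT = 120000 ms. timer:now_diff/2 returns microseconds, so the div 1000 in ?IS_ERROR_TIMEOUT converts the elapsed time to milliseconds before comparing. The macro's logic, unrolled as a plain function (a sketch, not part of the module):

    % sketch of what ?IS_ERROR_TIMEOUT(T) expands to; now/0 is the
    % same pre-OTP-18 clock the module already uses
    is_error_timeout(Start) ->
        ElapsedMs = timer:now_diff(now(), Start) div 1000,  % us -> ms
        ElapsedMs > 5*60*1000.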
@@ -48,14 +48,14 @@ stats(Pid) ->
     gen_server:call(Pid, get_stats, infinity).
 
 init([]) ->
-    {ok, #state{start = now(), reqs = gb_trees:empty()}}.
+    {ok, #state{start = now(), reqs = gb_trees:empty()}, 0}.
 
 handle_cast({download, MagHash, From}, State) ->
     #state{reqs = Reqs, hashSum = H, reqSum = R} = State,
     % remove these invalid requests
     UpdateReqs = Reqs, %check_error_timeout_reqs(Reqs),
     NewReqs = create_download(UpdateReqs, MagHash, From),
-    NewSum = R + 1 - gb_trees:size(Reqs) - gb_trees:size(UpdateReqs),
+    NewSum = R + 1 - (gb_trees:size(Reqs) - gb_trees:size(UpdateReqs)),
     {noreply, State#state{reqs = NewReqs, hashSum = H + 1, reqSum = NewSum}};
 
 handle_cast(stop, State) ->
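Two fixes here. Returning {ok, State, 0} from init/1 arms a zero gen_server timeout, delivering a single timeout message right after startup (handled by the new handle_info(timeout, ...) clause in the next hunk). And the NewSum expression gains the parentheses it was missing: with, say, 10 requests before cleanup and 8 after, the counter should drop by the 2 stale requests, R + 1 - (10 - 8) = R - 1, whereas the old form computed R + 1 - 10 - 8 = R - 17.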
@@ -93,6 +93,10 @@ handle_info({ibrowse_async_response, ReqID, Body}, State) ->
     end,
     {noreply, NewState};
 
+handle_info(timeout, State) ->
+    timer:send_interval(?REQ_ERROR_TIMEOUT div 2, check_error_timeout),
+    {noreply, State};
+
 handle_info(check_error_timeout, State) ->
     #state{reqs = Reqs} = State,
     NewReqs = check_error_timeout_reqs(Reqs),
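This is the standard idiom for kicking off periodic work without blocking init/1: the zero timeout from init/1 yields exactly one timeout message, and that clause starts a repeating tick. With ?REQ_ERROR_TIMEOUT at 5 minutes, check_error_timeout fires every 2.5 minutes. A self-contained sketch of the pattern (everything except timer:send_interval/2 is illustrative):

    % minimal gen_server skeleton showing the init-timeout tick pattern
    init([]) ->
        {ok, #{}, 0}.

    handle_info(timeout, State) ->
        {ok, _TRef} = timer:send_interval(2500, tick),
        {noreply, State};
    handle_info(tick, State) ->
        % periodic housekeeping goes here
        {noreply, State}.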
@@ -164,19 +168,20 @@ unzip_content(_B) ->
     error.
 
 %% http stuff
-get_req_hosts() ->
-    ["http://bt.box.n0808.com",
-     "http://torcache.net",
-     "http:/torrange.com",
-     "http://zoink.it"].
+%get_req_hosts() ->
+%    ["http://bt.box.n0808.com",
+%     "http://torcache.net",
+%     "http:/torrange.com",
+%     "http://zoink.it"].
 
 create_req_urls(MagHash) when is_list(MagHash), length(MagHash) == 40 ->
     U1 = "http://torcache.net/torrent/" ++ MagHash ++ ".torrent",
     U2 = format_btbox_url(MagHash),
     U3 = "http://torrage.com/torrent/" ++ MagHash ++ ".torrent",
+    % zoink cause ibrowse crash because error response
     % zoink.it support https, but the ssl library seems memory leak
-    U4 = "http://zoink.it/torrent/" ++ MagHash ++ ".torrent",
-    [U1, U2, U3, U4].
+    %U4 = "http://zoink.it/torrent/" ++ MagHash ++ ".torrent",
+    [U1, U2, U3].
 
 is_ssl_url(URL) when is_list(URL), length(URL) > 4 ->
     string:substr(URL, 1, 5) == "https".
@@ -187,16 +192,26 @@ format_btbox_url(MagHash) ->
     "http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".
 
 check_error_timeout_reqs(Reqs) ->
+    Size = gb_trees:size(Reqs),
     ReqList = gb_trees:to_list(Reqs),
-    lists:foldl(fun(E, NewReqs) ->
+    NewReqs = lists:foldl(fun(E, NewReqs) ->
         check_error_timeout(NewReqs, E)
-    end, gb_trees:empty(), ReqList).
+    end, gb_trees:empty(), ReqList),
+    NewSize = gb_trees:size(NewReqs),
+    case NewSize < Size of
+        true ->
+            ?E(?FMT("remove ~p timeout download reqs, new size ~p",
+                [Size - NewSize, NewSize]));
+        false ->
+            ok
+    end,
+    NewReqs.
 
 check_error_timeout(Acc, {ReqID, {MagHash, _, From, _, Start} = Req}) ->
     case ?IS_ERROR_TIMEOUT(Start) of
         true ->
             From ! {got_torrent, failed, MagHash},
-            ?E(?FMT("download req error timeout ~s", [MagHash])),
+            ?E(?FMT("download req timeout ~p ~s", [ReqID, MagHash])),
             Acc;
         false ->
             gb_trees:insert(ReqID, Req, Acc)
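check_error_timeout_reqs/1 now rebuilds the request tree and logs how many entries were dropped. Note the accumulator variable: the fun parameter NewReqs shadows the outer NewReqs bound to the fold's result; the compiler warns about the shadowing, but the behavior is correct since the inner name only ever refers to the accumulator. The rebuild-and-count shape, extracted as a generic sketch (prune/2 and Keep are hypothetical names):

    % generic sketch of the pattern: fold the old gb_tree into a fresh
    % one, keep entries passing Keep/1, and report how many were dropped
    prune(Tree, Keep) ->
        Kept = lists:foldl(fun({K, V}, Acc) ->
            case Keep(V) of
                true -> gb_trees:insert(K, V, Acc);
                false -> Acc
            end
        end, gb_trees:empty(), gb_trees:to_list(Tree)),
        {gb_trees:size(Tree) - gb_trees:size(Kept), Kept}.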