mirror of
https://github.com/btdig/dhtcrawler2.git
synced 2025-01-19 12:41:36 +00:00
fix hash_reader stop working bug
This commit is contained in:
parent
1a9f38df49
commit
8ee2e54e3d
@ -45,11 +45,14 @@ handle_info({got_torrent, failed, _Hash}, State) ->
|
|||||||
#state{downloading = D} = State,
|
#state{downloading = D} = State,
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
try_next_download(Conn),
|
try_next_download(Conn),
|
||||||
|
?T(?FMT("got torrent failed ~s", [_Hash])),
|
||||||
{noreply, State#state{downloading = D - 1}};
|
{noreply, State#state{downloading = D - 1}};
|
||||||
|
|
||||||
handle_info({got_torrent, ok, Hash, Content}, State) ->
|
handle_info({got_torrent, ok, Hash, Content}, State) ->
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
|
true = is_binary(Content),
|
||||||
% save the torrent file
|
% save the torrent file
|
||||||
|
?T(?FMT("got torrent ok ~s size ~p", [Hash, byte_size(Content)])),
|
||||||
SaveTor = config:get(save_torrent, true),
|
SaveTor = config:get(save_torrent, true),
|
||||||
if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end,
|
if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end,
|
||||||
NewState = got_torrent_content(Conn, State, Hash, Content),
|
NewState = got_torrent_content(Conn, State, Hash, Content),
|
||||||
@ -62,21 +65,21 @@ handle_info({got_torrent_from_cache, Hash, Content}, State) ->
|
|||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
|
?T("handle timeout, try next"),
|
||||||
try_next(Conn),
|
try_next(Conn),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(_, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
% when there's no hash to process
|
% when there's no hash to process
|
||||||
handle_cast(process_download_hash, State) ->
|
handle_info(process_download_hash, State) ->
|
||||||
#state{downloading = D} = State,
|
#state{downloading = D} = State,
|
||||||
NewD = case D >= ?MAX_DOWNLOAD of
|
NewD = case D >= ?MAX_DOWNLOAD of
|
||||||
true ->
|
true ->
|
||||||
% the only thing we can do is just wait
|
% the only thing we can do is just wait
|
||||||
timer:send_after(?WAIT_TIME, timeout),
|
timer:send_after(?WAIT_TIME, timeout),
|
||||||
|
?T(?FMT("reach the max download ~p", [D])),
|
||||||
D;
|
D;
|
||||||
false ->
|
false ->
|
||||||
|
?T(?FMT("start a new download ~p", [D])),
|
||||||
% launch downloader
|
% launch downloader
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
try_next_download(Conn),
|
try_next_download(Conn),
|
||||||
@ -86,10 +89,15 @@ handle_cast(process_download_hash, State) ->
|
|||||||
end,
|
end,
|
||||||
{noreply, State#state{downloading = NewD}};
|
{noreply, State#state{downloading = NewD}};
|
||||||
|
|
||||||
|
handle_info(M, State) ->
|
||||||
|
?W(?FMT("unhandled message ~p", [M])),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
|
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
{Hash} = bson:lookup(hash, Doc),
|
{Hash} = bson:lookup(hash, Doc),
|
||||||
ListHash = binary_to_list(Hash),
|
ListHash = binary_to_list(Hash),
|
||||||
|
?T(?FMT("process a hash ~s download-doc ~p", [ListHash, DownloadDoc])),
|
||||||
% to avoid register many timers when the hash is empty but download hash is not
|
% to avoid register many timers when the hash is empty but download hash is not
|
||||||
% it also can avoid increase the message queue size, everytime this function get called,
|
% it also can avoid increase the message queue size, everytime this function get called,
|
||||||
% it remove this message and append only another 1.
|
% it remove this message and append only another 1.
|
||||||
@ -115,6 +123,7 @@ try_download(State, Hash, Doc) ->
|
|||||||
#state{downloading = D} = State,
|
#state{downloading = D} = State,
|
||||||
NewDownloading = case D >= ?MAX_DOWNLOAD of
|
NewDownloading = case D >= ?MAX_DOWNLOAD of
|
||||||
true -> % put it into the download queue
|
true -> % put it into the download queue
|
||||||
|
?T(?FMT("reach the max download, insert it to wait queue ~s", [Hash])),
|
||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
insert_to_download_wait(Conn, Doc),
|
insert_to_download_wait(Conn, Doc),
|
||||||
D;
|
D;
|
||||||
@ -130,8 +139,10 @@ do_download(State, Hash) ->
|
|||||||
Conn = db_conn(State),
|
Conn = db_conn(State),
|
||||||
case loc_torrent_cache:load(Conn, Hash) of
|
case loc_torrent_cache:load(Conn, Hash) of
|
||||||
not_found -> % not in the local cache, download it now
|
not_found -> % not in the local cache, download it now
|
||||||
|
?T(?FMT("push a request to tor_download ~s", [Hash])),
|
||||||
tor_download:download(Pid, Hash);
|
tor_download:download(Pid, Hash);
|
||||||
Content -> % process later
|
Content -> % process later
|
||||||
|
?T(?FMT("load a torrent from cache ~s", [Hash])),
|
||||||
on_used_cache(),
|
on_used_cache(),
|
||||||
self() ! {got_torrent_from_cache, Hash, Content}
|
self() ! {got_torrent_from_cache, Hash, Content}
|
||||||
end.
|
end.
|
||||||
@ -167,6 +178,7 @@ got_torrent_content(Conn, State, MagHash, Content) ->
|
|||||||
try_next_download(Conn),
|
try_next_download(Conn),
|
||||||
case catch(torrent_file:parse(Content)) of
|
case catch(torrent_file:parse(Content)) of
|
||||||
{'EXIT', _} ->
|
{'EXIT', _} ->
|
||||||
|
?W(?FMT("parse a torrent failed ~s", [MagHash])),
|
||||||
skip;
|
skip;
|
||||||
{Type, Info} ->
|
{Type, Info} ->
|
||||||
got_torrent(State, MagHash, Type, Info)
|
got_torrent(State, MagHash, Type, Info)
|
||||||
@ -217,7 +229,8 @@ schedule_next({}, true) ->
|
|||||||
ok;
|
ok;
|
||||||
|
|
||||||
schedule_next({}, false) ->
|
schedule_next({}, false) ->
|
||||||
gen_server:cast(self(), process_download_hash);
|
?T("hash is empty, try to startup downloader"),
|
||||||
|
self() ! process_download_hash;
|
||||||
|
|
||||||
schedule_next({Doc}, DownloadDoc) ->
|
schedule_next({Doc}, DownloadDoc) ->
|
||||||
gen_server:cast(self(), {process_hash, Doc, DownloadDoc}).
|
gen_server:cast(self(), {process_hash, Doc, DownloadDoc}).
|
||||||
|
Loading…
Reference in New Issue
Block a user