see master branch for logs

This commit is contained in:
Kevin Lynx 2013-07-06 00:18:56 +08:00
parent 5b344c4fe2
commit 2944019ca1
9 changed files with 217 additions and 22 deletions

75
src/common/config.erl Normal file
View File

@ -0,0 +1,75 @@
%%
%% config.erl
%% Kevin Lynx
%% 07.04.2013
%%
-module(config).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1,
start_link/2,
stop/0,
get/1,
get/2]).
start_link(Name) ->
start_link(Name, nil).
start_link(Name, Fun) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [Name, Fun], []).
stop() ->
gen_server:cast(srv_name(), stop).
get(Key) ->
get(Key, nil).
get(Key, Def) ->
gen_server:call(srv_name(), {get, Key, Def}).
%%
srv_name() ->
?MODULE.
init([File, Fun]) ->
FullPath = in_priv_path(File),
State = case file:consult(FullPath) of
{error, Reason} ->
Config = if Fun == nil -> []; true -> Fun() end,
file:write_file(FullPath, io_lib:fwrite("~p.\n",[Config])),
Config;
{ok, [Config]} ->
io:format("load file ~p success~n", [FullPath]),
Config
end,
{ok, State}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call({get, Key, Def}, _From, Config) ->
Ret = case proplists:is_defined(Key, Config) of
false -> Def;
true -> proplists:get_value(Key, Config)
end,
{reply, Ret, Config};
handle_call(_, _From, State) ->
{noreply, State}.
handle_info(_, State) ->
{noreply, State}.
in_priv_path(Name) ->
filename:join([filename:dirname(code:which(?MODULE)), "..", "priv", Name]).

View File

@ -21,6 +21,7 @@
-define(WAIT_TIME, 1*60*1000).
% the max concurrent download tasks
-define(MAX_DOWNLOAD, 50).
-define(DOWNLOAD_INTERVAL, 100).
start_link(DBPool) ->
gen_server:start_link(?MODULE, [DBPool], []).
@ -47,16 +48,17 @@ handle_info({got_torrent, failed, _Hash}, State) ->
{noreply, State#state{downloading = D - 1}};
handle_info({got_torrent, ok, Hash, Content}, State) ->
#state{downloading = D} = State,
Conn = db_conn(State),
try_next_download(Conn),
case catch(torrent_file:parse(Content)) of
{'EXIT', _} ->
State;
{Type, Info} ->
got_torrent(State, Hash, Type, Info)
end,
{noreply, State#state{downloading = D - 1}};
% save the torrent file
SaveTor = config:get(save_torrent, true),
if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end,
NewState = got_torrent_content(Conn, State, Hash, Content),
{noreply, NewState};
handle_info({got_torrent_from_cache, Hash, Content}, State) ->
Conn = db_conn(State),
NewState = got_torrent_content(Conn, State, Hash, Content),
{noreply, NewState};
handle_info(timeout, State) ->
Conn = db_conn(State),
@ -66,6 +68,24 @@ handle_info(timeout, State) ->
handle_info(_, State) ->
{noreply, State}.
% when there's no hash to process
handle_cast(process_download_hash, State) ->
#state{downloading = D} = State,
NewD = case D >= ?MAX_DOWNLOAD of
true ->
% the only thing we can do is just wait
timer:send_after(?WAIT_TIME, timeout),
D;
false ->
% launch downloader
Conn = db_conn(State),
try_next_download(Conn),
% until the max downloader count reaches
timer:send_after(?DOWNLOAD_INTERVAL, process_download_hash),
D + 1
end,
{noreply, State#state{downloading = NewD}};
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
Conn = db_conn(State),
{Hash} = bson:lookup(hash, Doc),
@ -93,9 +113,9 @@ handle_call(_, _From, State) ->
try_download(State, Hash, Doc) ->
#state{downloading = D} = State,
Conn = db_conn(State),
NewDownloading = case D >= ?MAX_DOWNLOAD of
true -> % put it into the download queue
Conn = db_conn(State),
insert_to_download_wait(Conn, Doc),
D;
false -> % download it now
@ -108,11 +128,12 @@ try_download(State, Hash, Doc) ->
do_download(State, Hash) ->
#state{downloader = Pid} = State,
Conn = db_conn(State),
case db_loc_torrent:load(Conn, Hash) of
case loc_torrent_cache:load(Conn, Hash) of
not_found -> % not in the local cache, download it now
tor_download:download(Pid, Hash);
Content -> % process later
self() ! {got_torrent, ok, Hash, Content}
on_used_cache(),
self() ! {got_torrent_from_cache, Hash, Content}
end.
try_save(State, Hash, Name, Length, Files) ->
@ -124,6 +145,9 @@ try_save(State, Hash, Name, Length, Files) ->
on_saved(Conn)
end.
on_used_cache() ->
hash_reader_stats:handle_used_cache().
on_saved(Conn) ->
% `get_peers' here means we have processed a request
db_system:stats_get_peers(Conn),
@ -138,6 +162,17 @@ on_updated(Conn) ->
db_system:stats_updated(Conn),
hash_reader_stats:handle_update().
got_torrent_content(Conn, State, MagHash, Content) ->
#state{downloading = D} = State,
try_next_download(Conn),
case catch(torrent_file:parse(Content)) of
{'EXIT', _} ->
skip;
{Type, Info} ->
got_torrent(State, MagHash, Type, Info)
end,
State#state{downloading = D - 1}.
got_torrent(State, Hash, single, {Name, Length}) ->
try_save(State, Hash, Name, Length, []);
@ -162,6 +197,7 @@ try_next_download(Conn) ->
end),
schedule_next(Doc, true).
% if there's no hash, try `wait_download'
try_next(Conn) ->
Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
D = mongo:find_one(?HASH_COLLNAME, {}),
@ -181,7 +217,7 @@ schedule_next({}, true) ->
ok;
schedule_next({}, false) ->
timer:send_after(?WAIT_TIME, timeout);
gen_server:cast(self(), process_download_hash);
schedule_next({Doc}, DownloadDoc) ->
gen_server:cast(self(), {process_hash, Doc, DownloadDoc}).

View File

@ -32,6 +32,7 @@ start_standalone(IP, Port, Size) ->
filelib:ensure_dir("log/"),
start_dep_apps(),
tor_download:start_global(),
config:start_link("hash_reader.config", fun() -> config_default() end),
% NOTE:
Stats = {db_hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
@ -61,3 +62,8 @@ create_child(PoolName, Index) ->
child_id(Index) ->
list_to_atom(lists:flatten(io_lib:format("db_hash_reader_~p", [Index]))).
config_default() ->
[{save_torrent, true},
{save_to_db, false},
{save_to_file, true},
{torrent_path, "torrents/"}].

View File

@ -16,8 +16,9 @@
stop/0,
handle_update/0,
handle_insert/0,
handle_used_cache/0,
dump/0]).
-record(state, {tref, count, start, name, updated = 0, inserted = 0}).
-record(state, {tref, count, start, name, cache_used = 0, updated = 0, inserted = 0}).
-define(STATS_INTERVAL, 10*60*1000).
-define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))).
@ -39,6 +40,9 @@ handle_update() ->
handle_insert() ->
gen_server:cast(srv_name(), inc_inserted).
handle_used_cache() ->
gen_server:cast(srv_name(), inc_cache_used).
srv_name() ->
?MODULE.
@ -73,6 +77,10 @@ handle_cast(inc_updated, State) ->
handle_cast(inc_inserted, State) ->
#state{inserted = I} = State,
{noreply, State#state{inserted = I + 1}};
handle_cast(inc_cache_used, State) ->
#state{cache_used = C} = State,
{noreply, State#state{cache_used = C + 1}};
handle_cast(stop, State) ->
{stop, normal, State}.
@ -104,18 +112,22 @@ date_string() ->
[Year, Month, Day, Hour, Min, Sec])).
format_stats(State) ->
#state{count = C, start = Start, updated = U, inserted = I} = State,
#state{count = C, start = Start, cache_used = Cache, updated = U, inserted = I} = State,
{Day, {H, M, S}} = stats_time(Start),
Mins = Day * 24 * 60 + H * 60 + M,
TotalMins = if Mins > 0 -> Mins; true -> 1 end,
Speed = (U + I) div TotalMins,
InsertPercent = I * 100 / (if U > 0 -> U; true -> 1 end),
CachePercent = Cache * 100 / (if I > 0 -> I; true -> 1 end),
?TEXT(" stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n",
[Day, H, M, S]) ++
?TEXT(" Reader count ~p~n", [C]) ++
?TEXT(" Process speed ~p req/min~n", [Speed]) ++
?TEXT(" Download torrents speed ~p tor/min~n", [I div TotalMins]) ++
?TEXT(" Updated ~p~n", [U]) ++
?TEXT(" Inserted ~p~n", [I]).
?TEXT(" Inserted ~p~n", [I]) ++
?TEXT(" Inserted percentage ~.2f%~n", [InsertPercent]) ++
?TEXT(" Used cache torrents ~p (~.2f%)~n", [Cache, CachePercent]).
format_download_stats() ->
Start = now(),

View File

@ -43,16 +43,16 @@ handle_call(_, _From, State) ->
{noreply, State}.
handle_info({got_torrent, failed, _Hash}, State) ->
#state{downcount = C} = State,
{_, NewState} = download_next(State),
{noreply, NewState#state{downcount = C - 1}};
#state{downcount = NewC} = NewState,
{noreply, NewState#state{downcount = NewC - 1}};
handle_info({got_torrent, ok, Hash, Content}, State) ->
#state{downcount = C} = State,
Conn = db_conn(State),
db_loc_torrent:save(Conn, Hash, Content),
loc_torrent_cache:save(Conn, Hash, Content),
{_, NewState} = download_next(State),
{noreply, NewState#state{downcount = C - 1}};
#state{downcount = NewC} = NewState,
{noreply, NewState#state{downcount = NewC - 1}};
handle_info(timeout, State) ->
{Status, NewState} = download_next(State),
@ -80,6 +80,7 @@ download_next(#state{downcount = Count} = State) when Count < ?MAX_DOWNLOAD ->
{Status, State#state{downcount = NewCount}};
download_next(State) ->
wait_download(),
{wait, State}.
wait_download() ->

View File

@ -0,0 +1,51 @@
%%
%% loc_torrent_cache.erl
%% Kevin Lynx
%% 07.05.2013
%%
-module(loc_torrent_cache).
-include("vlog.hrl").
-export([save/3, load/2]).
save(Conn, MagHash, Content) when is_list(MagHash), length(MagHash) == 40 ->
SaveDB = config:get(save_to_db, false),
if SaveDB -> db_loc_torrent:save(Conn, MagHash, Content);
true -> ok end,
SaveFile = config:get(save_to_file, true),
if SaveFile -> save_to_file(MagHash, Content);
true -> ok end,
ok.
load(Conn, MagHash) when is_list(MagHash), length(MagHash) == 40 ->
case load_from_file(MagHash) of
not_found ->
db_loc_torrent:load(Conn, MagHash);
Content ->
Content
end.
%% TODO: put these file-realted codes to another module
save_to_file(MagHash, Content) ->
FileName = torrent_file_name(MagHash),
case file:write_file(FileName, Content) of
ok -> ok;
{error, Reason} ->
?E(?FMT("save torrent ~s on disk failed ~p", [FileName, Reason]))
end.
load_from_file(MagHash) ->
FileName = torrent_file_name(MagHash),
case file:read_file(FileName) of
{ok, Content} ->
Content;
{error, _} ->
not_found
end.
% path/AA/BB/AABBxxxxx.torrent
torrent_file_name(MagHash) ->
Path = config:get(torrent_path, "torrents/"),
FullPath = Path ++ lists:sublist(MagHash, 1, 2) ++ "/" ++
lists:sublist(MagHash, 3, 2) ++ "/",
filelib:ensure_dir(FullPath),
FullPath ++ MagHash ++ ".torrent".

View File

@ -32,10 +32,12 @@ start_standalone(IP, Port, Size) ->
io:format("db: ~p:~p downloader count ~p~n", [IP, Port, Size]),
start_dep_apps(),
tor_download:start_global(),
config:start_link("torcache.config", fun() -> config_default() end),
start_link(IP, Port, Size, []).
start_link(IP, Port, Size) ->
start_link(IP, Port, Size, []).
OtherProcess = [],
start_link(IP, Port, Size, OtherProcess).
start_link(IP, Port, Size, OtherProcess) ->
PoolName = loc_torrent_db_pool,
@ -57,3 +59,9 @@ create_child(PoolName, Index) ->
child_id(Index) ->
list_to_atom(lists:flatten(io_lib:format("loc_torrent_~p", [Index]))).
config_default() ->
[{save_to_db, false},
{save_to_file, true},
{torrent_path, "torrents/"}].

5
tools/HISTORY.md Normal file
View File

@ -0,0 +1,5 @@
## 07.05.2013
* add torrent downloader which will download torrents and store them in database or local file system
* hash_reader now use local torrents first, if not it will download, and depends on the config it may save the file too

View File

@ -0,0 +1 @@
erl -pa ebin -noshell -run loc_torrent_sup start_standalone localhost 27017 20