diff --git a/src/common/config.erl b/src/common/config.erl new file mode 100644 index 0000000..9d0b2cf --- /dev/null +++ b/src/common/config.erl @@ -0,0 +1,75 @@ +%% +%% config.erl +%% Kevin Lynx +%% 07.04.2013 +%% +-module(config). +-behaviour(gen_server). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([start_link/1, + start_link/2, + stop/0, + get/1, + get/2]). + +start_link(Name) -> + start_link(Name, nil). + +start_link(Name, Fun) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [Name, Fun], []). + +stop() -> + gen_server:cast(srv_name(), stop). + +get(Key) -> + get(Key, nil). + +get(Key, Def) -> + gen_server:call(srv_name(), {get, Key, Def}). + +%% +srv_name() -> + ?MODULE. + +init([File, Fun]) -> + FullPath = in_priv_path(File), + State = case file:consult(FullPath) of + {error, Reason} -> + Config = if Fun == nil -> []; true -> Fun() end, + file:write_file(FullPath, io_lib:fwrite("~p.\n",[Config])), + Config; + {ok, [Config]} -> + io:format("load file ~p success~n", [FullPath]), + Config + end, + {ok, State}. + +terminate(_, State) -> + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_call({get, Key, Def}, _From, Config) -> + Ret = case proplists:is_defined(Key, Config) of + false -> Def; + true -> proplists:get_value(Key, Config) + end, + {reply, Ret, Config}; + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_info(_, State) -> + {noreply, State}. + +in_priv_path(Name) -> + filename:join([filename:dirname(code:which(?MODULE)), "..", "priv", Name]). diff --git a/src/hash_reader/db_hash_reader.erl b/src/hash_reader/db_hash_reader.erl index dd6b141..c575173 100644 --- a/src/hash_reader/db_hash_reader.erl +++ b/src/hash_reader/db_hash_reader.erl @@ -21,6 +21,7 @@ -define(WAIT_TIME, 1*60*1000). % the max concurrent download tasks -define(MAX_DOWNLOAD, 50). +-define(DOWNLOAD_INTERVAL, 100). 
start_link(DBPool) -> gen_server:start_link(?MODULE, [DBPool], []). @@ -47,16 +48,17 @@ handle_info({got_torrent, failed, _Hash}, State) -> {noreply, State#state{downloading = D - 1}}; handle_info({got_torrent, ok, Hash, Content}, State) -> - #state{downloading = D} = State, Conn = db_conn(State), - try_next_download(Conn), - case catch(torrent_file:parse(Content)) of - {'EXIT', _} -> - State; - {Type, Info} -> - got_torrent(State, Hash, Type, Info) - end, - {noreply, State#state{downloading = D - 1}}; + % save the torrent file + SaveTor = config:get(save_torrent, true), + if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end, + NewState = got_torrent_content(Conn, State, Hash, Content), + {noreply, NewState}; + +handle_info({got_torrent_from_cache, Hash, Content}, State) -> + Conn = db_conn(State), + NewState = got_torrent_content(Conn, State, Hash, Content), + {noreply, NewState}; handle_info(timeout, State) -> Conn = db_conn(State), @@ -66,6 +68,24 @@ handle_info(timeout, State) -> handle_info(_, State) -> {noreply, State}. 
+% when there's no hash to process +handle_cast(process_download_hash, State) -> + #state{downloading = D} = State, + NewD = case D >= ?MAX_DOWNLOAD of + true -> + % the only thing we can do is just wait + timer:send_after(?WAIT_TIME, timeout), + D; + false -> + % launch downloader + Conn = db_conn(State), + try_next_download(Conn), + % until the max downloader count reaches + timer:send_after(?DOWNLOAD_INTERVAL, process_download_hash), + D + 1 + end, + {noreply, State#state{downloading = NewD}}; + handle_cast({process_hash, Doc, DownloadDoc}, State) -> Conn = db_conn(State), {Hash} = bson:lookup(hash, Doc), @@ -93,9 +113,9 @@ handle_call(_, _From, State) -> try_download(State, Hash, Doc) -> #state{downloading = D} = State, - Conn = db_conn(State), NewDownloading = case D >= ?MAX_DOWNLOAD of true -> % put it into the download queue + Conn = db_conn(State), insert_to_download_wait(Conn, Doc), D; false -> % download it now @@ -108,11 +128,12 @@ try_download(State, Hash, Doc) -> do_download(State, Hash) -> #state{downloader = Pid} = State, Conn = db_conn(State), - case db_loc_torrent:load(Conn, Hash) of + case loc_torrent_cache:load(Conn, Hash) of not_found -> % not in the local cache, download it now tor_download:download(Pid, Hash); Content -> % process later - self() ! {got_torrent, ok, Hash, Content} + on_used_cache(), + self() ! {got_torrent_from_cache, Hash, Content} end. try_save(State, Hash, Name, Length, Files) -> @@ -124,6 +145,9 @@ try_save(State, Hash, Name, Length, Files) -> on_saved(Conn) end. +on_used_cache() -> + hash_reader_stats:handle_used_cache(). + on_saved(Conn) -> % `get_peers' here means we have processed a request db_system:stats_get_peers(Conn), @@ -138,6 +162,17 @@ on_updated(Conn) -> db_system:stats_updated(Conn), hash_reader_stats:handle_update(). 
+got_torrent_content(Conn, State, MagHash, Content) -> + #state{downloading = D} = State, + try_next_download(Conn), + case catch(torrent_file:parse(Content)) of + {'EXIT', _} -> + skip; + {Type, Info} -> + got_torrent(State, MagHash, Type, Info) + end, + State#state{downloading = D - 1}. + got_torrent(State, Hash, single, {Name, Length}) -> try_save(State, Hash, Name, Length, []); @@ -162,6 +197,7 @@ try_next_download(Conn) -> end), schedule_next(Doc, true). +% if there's no hash, try `wait_download' try_next(Conn) -> Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() -> D = mongo:find_one(?HASH_COLLNAME, {}), @@ -181,7 +217,7 @@ schedule_next({}, true) -> ok; schedule_next({}, false) -> - timer:send_after(?WAIT_TIME, timeout); + gen_server:cast(self(), process_download_hash); schedule_next({Doc}, DownloadDoc) -> gen_server:cast(self(), {process_hash, Doc, DownloadDoc}). diff --git a/src/hash_reader/db_hash_reader_sup.erl b/src/hash_reader/db_hash_reader_sup.erl index 204bd8f..0f59aa8 100644 --- a/src/hash_reader/db_hash_reader_sup.erl +++ b/src/hash_reader/db_hash_reader_sup.erl @@ -32,6 +32,7 @@ start_standalone(IP, Port, Size) -> filelib:ensure_dir("log/"), start_dep_apps(), tor_download:start_global(), + config:start_link("hash_reader.config", fun() -> config_default() end), % NOTE: Stats = {db_hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]}, DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]}, @@ -61,3 +62,8 @@ create_child(PoolName, Index) -> child_id(Index) -> list_to_atom(lists:flatten(io_lib:format("db_hash_reader_~p", [Index]))). +config_default() -> + [{save_torrent, true}, + {save_to_db, false}, + {save_to_file, true}, + {torrent_path, "torrents/"}]. 
diff --git a/src/hash_reader/hash_reader_stats.erl b/src/hash_reader/hash_reader_stats.erl index 5974786..38bfae8 100644 --- a/src/hash_reader/hash_reader_stats.erl +++ b/src/hash_reader/hash_reader_stats.erl @@ -16,8 +16,9 @@ stop/0, handle_update/0, handle_insert/0, + handle_used_cache/0, dump/0]). --record(state, {tref, count, start, name, updated = 0, inserted = 0}). +-record(state, {tref, count, start, name, cache_used = 0, updated = 0, inserted = 0}). -define(STATS_INTERVAL, 10*60*1000). -define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))). @@ -39,6 +40,9 @@ handle_update() -> handle_insert() -> gen_server:cast(srv_name(), inc_inserted). +handle_used_cache() -> + gen_server:cast(srv_name(), inc_cache_used). + srv_name() -> ?MODULE. @@ -73,6 +77,10 @@ handle_cast(inc_updated, State) -> handle_cast(inc_inserted, State) -> #state{inserted = I} = State, {noreply, State#state{inserted = I + 1}}; + +handle_cast(inc_cache_used, State) -> + #state{cache_used = C} = State, + {noreply, State#state{cache_used = C + 1}}; handle_cast(stop, State) -> {stop, normal, State}. @@ -104,18 +112,22 @@ date_string() -> [Year, Month, Day, Hour, Min, Sec])). format_stats(State) -> - #state{count = C, start = Start, updated = U, inserted = I} = State, + #state{count = C, start = Start, cache_used = Cache, updated = U, inserted = I} = State, {Day, {H, M, S}} = stats_time(Start), Mins = Day * 24 * 60 + H * 60 + M, TotalMins = if Mins > 0 -> Mins; true -> 1 end, Speed = (U + I) div TotalMins, + InsertPercent = I * 100 / (if U > 0 -> U; true -> 1 end), + CachePercent = Cache * 100 / (if I > 0 -> I; true -> 1 end), ?TEXT(" stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n", [Day, H, M, S]) ++ ?TEXT(" Reader count ~p~n", [C]) ++ ?TEXT(" Process speed ~p req/min~n", [Speed]) ++ ?TEXT(" Download torrents speed ~p tor/min~n", [I div TotalMins]) ++ ?TEXT(" Updated ~p~n", [U]) ++ - ?TEXT(" Inserted ~p~n", [I]). 
+ ?TEXT(" Inserted ~p~n", [I]) ++ + ?TEXT(" Inserted percentage ~.2f%~n", [InsertPercent]) ++ + ?TEXT(" Used cache torrents ~p (~.2f%)~n", [Cache, CachePercent]). format_download_stats() -> Start = now(), diff --git a/src/local_torrent/loc_torrent.erl b/src/local_torrent/loc_torrent.erl index 0ede5ed..37e8c80 100644 --- a/src/local_torrent/loc_torrent.erl +++ b/src/local_torrent/loc_torrent.erl @@ -43,16 +43,16 @@ handle_call(_, _From, State) -> {noreply, State}. handle_info({got_torrent, failed, _Hash}, State) -> - #state{downcount = C} = State, {_, NewState} = download_next(State), - {noreply, NewState#state{downcount = C - 1}}; + #state{downcount = NewC} = NewState, + {noreply, NewState#state{downcount = NewC - 1}}; handle_info({got_torrent, ok, Hash, Content}, State) -> - #state{downcount = C} = State, Conn = db_conn(State), - db_loc_torrent:save(Conn, Hash, Content), + loc_torrent_cache:save(Conn, Hash, Content), {_, NewState} = download_next(State), - {noreply, NewState#state{downcount = C - 1}}; + #state{downcount = NewC} = NewState, + {noreply, NewState#state{downcount = NewC - 1}}; handle_info(timeout, State) -> {Status, NewState} = download_next(State), @@ -80,6 +80,7 @@ download_next(#state{downcount = Count} = State) when Count < ?MAX_DOWNLOAD -> {Status, State#state{downcount = NewCount}}; download_next(State) -> + wait_download(), {wait, State}. wait_download() -> diff --git a/src/local_torrent/loc_torrent_cache.erl b/src/local_torrent/loc_torrent_cache.erl new file mode 100644 index 0000000..b66bbf3 --- /dev/null +++ b/src/local_torrent/loc_torrent_cache.erl @@ -0,0 +1,51 @@ +%% +%% loc_torrent_cache.erl +%% Kevin Lynx +%% 07.05.2013 +%% +-module(loc_torrent_cache). +-include("vlog.hrl"). +-export([save/3, load/2]). 
+ +save(Conn, MagHash, Content) when is_list(MagHash), length(MagHash) == 40 -> + SaveDB = config:get(save_to_db, false), + if SaveDB -> db_loc_torrent:save(Conn, MagHash, Content); + true -> ok end, + SaveFile = config:get(save_to_file, true), + if SaveFile -> save_to_file(MagHash, Content); + true -> ok end, + ok. + +load(Conn, MagHash) when is_list(MagHash), length(MagHash) == 40 -> + case load_from_file(MagHash) of + not_found -> + db_loc_torrent:load(Conn, MagHash); + Content -> + Content + end. + +%% TODO: put these file-related codes to another module +save_to_file(MagHash, Content) -> + FileName = torrent_file_name(MagHash), + case file:write_file(FileName, Content) of + ok -> ok; + {error, Reason} -> + ?E(?FMT("save torrent ~s on disk failed ~p", [FileName, Reason])) + end. + +load_from_file(MagHash) -> + FileName = torrent_file_name(MagHash), + case file:read_file(FileName) of + {ok, Content} -> + Content; + {error, _} -> + not_found + end. + +% path/AA/BB/AABBxxxxx.torrent +torrent_file_name(MagHash) -> + Path = config:get(torrent_path, "torrents/"), + FullPath = Path ++ lists:sublist(MagHash, 1, 2) ++ "/" ++ + lists:sublist(MagHash, 3, 2) ++ "/", + filelib:ensure_dir(FullPath), + FullPath ++ MagHash ++ ".torrent". diff --git a/src/local_torrent/loc_torrent_sup.erl b/src/local_torrent/loc_torrent_sup.erl index a0051b5..69f588f 100644 --- a/src/local_torrent/loc_torrent_sup.erl +++ b/src/local_torrent/loc_torrent_sup.erl @@ -32,10 +32,12 @@ start_standalone(IP, Port, Size) -> io:format("db: ~p:~p downloader count ~p~n", [IP, Port, Size]), start_dep_apps(), tor_download:start_global(), + config:start_link("torcache.config", fun() -> config_default() end), start_link(IP, Port, Size, []). start_link(IP, Port, Size) -> - start_link(IP, Port, Size, []). + OtherProcess = [], + start_link(IP, Port, Size, OtherProcess).
start_link(IP, Port, Size, OtherProcess) -> PoolName = loc_torrent_db_pool, @@ -57,3 +59,9 @@ create_child(PoolName, Index) -> child_id(Index) -> list_to_atom(lists:flatten(io_lib:format("loc_torrent_~p", [Index]))). +config_default() -> + [{save_to_db, false}, + {save_to_file, true}, + {torrent_path, "torrents/"}]. + + diff --git a/tools/HISTORY.md b/tools/HISTORY.md new file mode 100644 index 0000000..dd3eb80 --- /dev/null +++ b/tools/HISTORY.md @@ -0,0 +1,5 @@ +## 07.05.2013 + +* add torrent downloader which will download torrents and store them in the database or local file system +* hash_reader now uses local torrents first; if not found it will download, and depending on the config it may save the file too + diff --git a/tools/win_start_torcache.bat b/tools/win_start_torcache.bat new file mode 100644 index 0000000..6426486 --- /dev/null +++ b/tools/win_start_torcache.bat @@ -0,0 +1 @@ +erl -pa ebin -noshell -run loc_torrent_sup start_standalone localhost 27017 20