add torrent local cache; not tested yet

This commit is contained in:
Kevin Lynx 2013-07-03 22:56:06 +08:00
parent 0dcf0854a8
commit 5b344c4fe2
9 changed files with 272 additions and 14 deletions

View File

@ -26,3 +26,7 @@
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/local_torrent/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.

View File

@ -67,7 +67,6 @@ handle_info(_, State) ->
{noreply, State}.
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
#state{downloader = DownPid} = State,
Conn = db_conn(State),
{Hash} = bson:lookup(hash, Doc),
ListHash = binary_to_list(Hash),
@ -82,7 +81,7 @@ handle_cast({process_hash, Doc, DownloadDoc}, State) ->
State;
false ->
?T(?FMT("start to download the torrent ~s", [ListHash])),
try_download(State, DownPid, ListHash, Doc)
try_download(State, ListHash, Doc)
end,
{noreply, NewState};
@ -92,7 +91,7 @@ handle_cast(stop, State) ->
%% No synchronous API is exposed. Reply immediately to unknown calls:
%% returning {noreply, State} would leave the caller blocked until its
%% gen_server:call timeout expires.
handle_call(_, _From, State) ->
    {reply, not_implemented, State}.
try_download(State, Pid, Hash, Doc) ->
try_download(State, Hash, Doc) ->
#state{downloading = D} = State,
Conn = db_conn(State),
NewDownloading = case D >= ?MAX_DOWNLOAD of
@ -100,11 +99,22 @@ try_download(State, Pid, Hash, Doc) ->
insert_to_download_wait(Conn, Doc),
D;
false -> % download it now
tor_download:download(Pid, Hash),
do_download(State, Hash),
D + 1
end,
State#state{downloading = NewDownloading}.
% and now we can retrieve the torrent from local cache
% Serve `Hash' from the local torrent cache when possible; on a cache
% miss, hand it to the torrent downloader process. A cache hit is
% re-injected to self() as a {got_torrent, ok, ...} message so both the
% cached and the downloaded path go through the same handler.
do_download(State, Hash) ->
    #state{downloader = Pid} = State,
    Conn = db_conn(State),
    case db_loc_torrent:load(Conn, Hash) of
    not_found -> % not in the local cache, download it now
        tor_download:download(Pid, Hash);
    Content -> % process later
        self() ! {got_torrent, ok, Hash, Content}
    end.
try_save(State, Hash, Name, Length, Files) ->
Conn = db_conn(State),
case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of

View File

@ -51,7 +51,8 @@ stats() ->
TorSum = db_store_mongo:count(Conn),
D1 = db_system:stats_day_at_slave(Conn, DaySecs),
D2 = db_system:stats_day_at_slave(Conn, DaySecs - ?ONEDAY_SECS),
{TorSum, [decode_stats(D1), decode_stats(D2)]}.
D3 = db_system:stats_day_at_slave(Conn, DaySecs - 2 * ?ONEDAY_SECS),
{TorSum, [decode_stats(D1), decode_stats(D2), decode_stats(D3)]}.
decode_stats(Stats) ->
{DaySec} = bson:lookup('_id', Stats),

View File

@ -14,10 +14,11 @@
-export([start_link/0,
stop/0,
search/1,
stats/0,
today_top/0]).
-export([async_update/2]).
-record(state, {cache}).
-define(OUT_OF_DATE, 5*60*1000).
-define(OUT_OF_DATE, 10*60*1000).
-define(CACHE_SIZE, 1000).
start_link() ->
@ -32,6 +33,9 @@ search(Key) ->
today_top() ->
gen_server:call(srv_name(), {query, top}).
stats() ->
gen_server:call(srv_name(), {query, stats}).
init([]) ->
{ok, #state{cache = gb_trees:empty()}, 0}.
@ -48,6 +52,7 @@ handle_cast(decrease_cache, State) ->
#state{cache = Cache} = State,
NewCache = remove_oldest(Cache),
spawn_update(top), % make sure `top' exists
spawn_update(stats),
{noreply, State#state{cache = NewCache}};
handle_cast({update, Type}, State) ->
@ -66,6 +71,7 @@ handle_call(_, _From, State) ->
handle_info(timeout, State) ->
spawn_update(top),
spawn_update(stats),
{noreply, State};
handle_info({enter_cache, Type, Time, Ret}, State) ->
@ -103,17 +109,22 @@ do_update({search, Key}) ->
db_frontend:search(Key);
do_update(top) ->
db_frontend:today_top().
db_frontend:today_top();
do_update(stats) ->
db_frontend:stats().
do_query(Type, #state{cache = Cache} = State) ->
{Start, Ret} = gb_trees:get(Type, Cache),
case is_outofdate(Start) of
NewCache = case is_outofdate(Start) of
true ->
spawn_update(Type);
spawn_update(Type),
% update the time so that it will not be scheduled more time
gb_trees:enter(Type, {now(), Ret}, Cache);
false ->
ok
Cache
end,
{State, Ret}.
{State#state{cache = NewCache}, Ret}.
is_outofdate(Time) ->
(timer:now_diff(now(), Time) div 1000) > ?OUT_OF_DATE.

View File

@ -8,6 +8,7 @@
test_search/1,
index/3,
stats/3,
real_stats/3,
recent/3,
today_top/3,
top/3]).
@ -47,13 +48,20 @@ recent(SessionID, _Env, _Input) ->
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
stats(SessionID, _Env, _Input) ->
{TorSum, StatsList} = db_frontend:stats(),
Response = format_stats_list(http_cache:stats()),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
real_stats(SessionID, _Env, _Input) ->
Response = format_stats_list(db_frontend:stats()),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
format_stats_list(Stats) ->
{TorSum, StatsList} = Stats,
Body = ?TEXT("<h3>total ~p torrents</h3>", [TorSum]) ++
"<ul>" ++
format_stats(StatsList) ++
"</ul>",
Response = simple_html("", Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
simple_html("", Body).
index(SessionID, _Env, Input) ->
Body = case get_index_hash(Input) of
@ -89,6 +97,9 @@ test_search(Keyword) ->
Body = do_search(Keyword),
file:write_file(Filename, simple_html(Keyword, Body)).
do_search(Keyword) when length(Keyword) =< 1 ->
too_short_tip();
do_search(Keyword) ->
{Rets, Stats} = http_cache:search(Keyword),
{_Found, Cost, Scanned} = Stats,
@ -164,3 +175,6 @@ format_date_string(Secs) ->
{{Y, M, D}, _} = time_util:seconds_to_local_time(Secs),
?TEXT("~b-~2..0b-~2..0b", [Y, M, D]).
%% HTML snippet shown when the search keyword is too short (1 char or less).
too_short_tip() ->
    lists:append("too short keyword, you're going to kill me, enjoy this ",
                 "<a href='/e/http_handler:search?q=girl'>girl</a>").

View File

@ -0,0 +1,3 @@
## local torrent
Use the torrent downloader to download torrent files and store them in a local database.

View File

@ -0,0 +1,66 @@
%%
%% db_loc_torrent.erl
%% Kevin Lynx
%% 07.03.2013
%%
-module(db_loc_torrent).
-include("db_common.hrl").
-define(DBNAME, torfiles).
-define(COLLNAME, torrents).
-export([load_hash/1,
save/3,
load/2]).
-export([test_load/1]).
% Atomically claim one not-yet-cached hash from the wait_download
% collection and return it as a list (string); returns [] when no
% unclaimed hash is available.
% Mongo shell equivalent (the original comment said $inc, but the code
% actually uses $set):
% db.runCommand({findAndModify:"wait_download",query:{cached:{$ne:1}},update:{$set:{cached:1}}})
load_hash(Conn) ->
    % marking cached=1 inside findAndModify makes the claim atomic, so
    % concurrent downloaders never pick the same hash; only the hash
    % field is projected back
    Cmd = {findAndModify, ?HASH_DOWNLOAD_COLL, query, {cached, {'$ne', 1}},
        update, {'$set', {cached, 1}}, fields, {hash, 1}},
    Ret = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
        mongo:command(Cmd)
    end),
    % NOTE(review): both {undefined} and {} are treated as "no match" --
    % presumably the driver yields one or the other depending on whether
    % `value' is null or absent; verify against the mongodb driver in use
    case bson:lookup(value, Ret) of
        {undefined} ->
            [];
        {} ->
            [];
        {Obj} ->
            {BinHash} = bson:lookup(hash, Obj),
            binary_to_list(BinHash)
    end.
%% Upsert the raw torrent bytes `Content' keyed by the magnet hash string.
%% Fire-and-forget: uses an unsafe (unacknowledged) write.
save(Conn, MagHash, Content) ->
    Selector = {'_id', list_to_binary(MagHash)},
    NewDoc = torrent_doc(MagHash, Content),
    mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
        % final `true' -> upsert: insert when the hash is not stored yet
        mongo:update(?COLLNAME, Selector, NewDoc, true)
    end).
% Load one torrent file by its magnet hash string; returns the content
% binary, or `not_found' when the hash is not cached.
load(Conn, MagHash) when is_list(MagHash) ->
    Selector = {'_id', list_to_binary(MagHash)},
    Found = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
        mongo:find_one(?COLLNAME, Selector)
    end),
    case Found of
        {} ->
            not_found;
        {Doc} ->
            % stored as a bson binary; unwrap to the raw bytes
            {{bin, bin, Content}} = bson:lookup(content, Doc),
            Content
    end.
%% Build the bson document stored for one torrent: `_id' is the binary
%% magnet hash, `content' wraps the raw torrent file bytes.
torrent_doc(MagHash, Content) when is_list(MagHash), is_binary(Content) ->
    BinHash = list_to_binary(MagHash),
    {'_id', BinHash, content, {bin, bin, Content}}.
%%
%% Manual smoke test: fetch `MagHash' from a local mongod (localhost:27017)
%% and write the content to "<hash>.torrent" in the cwd; returns the file
%% name. Crashes in file:write_file/2 if the hash is not cached --
%% acceptable for a dev-only helper.
test_load(MagHash) ->
    {ok, Conn} = mongo_connection:start_link({localhost, 27017}),
    Content = load(Conn, MagHash),
    Name = MagHash ++ ".torrent",
    file:write_file(Name, Content),
    Name.

View File

@ -0,0 +1,90 @@
%%
%% loc_torrent.erl
%% Kevin Lynx
%% 07.03.2013
%%
-module(loc_torrent).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1,
stop/1]).
-record(state, {downcount = 0, dbpool, downloader}).
% how long to wait before retrying when the wait_download queue is empty
-define(WAIT_TIME, 2*60*1000).
-define(INTERVAL, 100).
-define(MAX_DOWNLOAD, 50).
%% Start one unregistered downloader worker; `DBPool' is the mongo pool
%% name the worker draws connections from.
start_link(DBPool) ->
    gen_server:start_link(?MODULE, [DBPool], []).

%% Asynchronously stop a worker started by start_link/1.
stop(Pid) ->
    gen_server:cast(Pid, stop).
%%
%% gen_server callbacks.
%% Links a tor_download helper process to this worker; the trailing 0
%% timeout makes handle_info(timeout, ...) fire immediately so the first
%% download starts right after init.
init([DBPool]) ->
    {ok, DownPid} = tor_download:start_link(),
    {ok, #state{dbpool = DBPool, downloader = DownPid}, 0}.
% terminate/2's return value is ignored by gen_server
terminate(_, State) ->
    {ok, State}.

code_change(_, _, State) ->
    {ok, State}.
% requested via stop/1; shuts the worker down with reason `normal'
handle_cast(stop, State) ->
    {stop, normal, State}.
%% No synchronous API is exposed. Reply immediately to unknown calls:
%% returning {noreply, State} would leave the caller blocked until its
%% gen_server:call timeout expires.
handle_call(_, _From, State) ->
    {reply, not_implemented, State}.
%% A download finished (failed): free its slot *before* asking for the
%% next hash, so download_next/1 sees the true in-flight count.
%% BUG FIX: the original called download_next/1 first and then set
%% downcount to C - 1, clobbering the increment download_next made for
%% the newly started download; the counter drifted below the real number
%% of active downloads and ?MAX_DOWNLOAD could be exceeded.
handle_info({got_torrent, failed, _Hash}, State) ->
    #state{downcount = C} = State,
    {_, NewState} = download_next(State#state{downcount = C - 1}),
    {noreply, NewState};
%% A download finished successfully: persist the torrent to the local
%% cache, free the slot, then try to start the next download.
handle_info({got_torrent, ok, Hash, Content}, State) ->
    #state{downcount = C} = State,
    Conn = db_conn(State),
    db_loc_torrent:save(Conn, Hash, Content),
    {_, NewState} = download_next(State#state{downcount = C - 1}),
    {noreply, NewState};
%% Startup kick (init's 0 timeout) or a retry/pacing timer fired: start
%% another download and, while hashes are available, keep pacing with
%% ?INTERVAL to avoid a burst.
handle_info(timeout, State) ->
    {Status, NewState} = download_next(State),
    if Status == ok -> schedule_best(); true -> ok end,
    {noreply, NewState};
%% Drain anything unexpected so the mailbox never accumulates.
handle_info(_, State) ->
    {noreply, State}.
% pace startup to avoid a cpu/network burst: re-arm the download loop
% after ?INTERVAL ms. Uses the erlang:send_after/3 BIF rather than
% timer:send_after/2 so the message does not route through the single
% timer server process; the delivered `timeout' message is identical
% and every caller ignores the return value.
schedule_best() ->
    erlang:send_after(?INTERVAL, self(), timeout).
%% Try to start one more torrent download.
%% Returns {ok, NewState} when a download was started (count bumped),
%% {wait, State} when all slots are busy or no hash is pending (in the
%% latter case a retry timer is armed via wait_download/0).
download_next(#state{downcount = Count, downloader = DownPid} = State)
  when Count < ?MAX_DOWNLOAD ->
    Conn = db_conn(State),
    case db_loc_torrent:load_hash(Conn) of
        [] ->
            % nothing queued right now; poll again after ?WAIT_TIME
            wait_download(),
            {wait, State};
        Hash ->
            tor_download:download(DownPid, Hash),
            {ok, State#state{downcount = Count + 1}}
    end;
download_next(State) ->
    % all ?MAX_DOWNLOAD slots are in use
    {wait, State}.
% no hash is pending: retry the download loop after ?WAIT_TIME ms.
% Uses the erlang:send_after/3 BIF rather than timer:send_after/2 to
% avoid the single timer server process; the delivered `timeout'
% message is identical and the return value is ignored by the caller.
wait_download() ->
    erlang:send_after(?WAIT_TIME, self(), timeout).
%% Fetch a mongo connection from this worker's pool.
db_conn(#state{dbpool = DBPool}) ->
    mongo_pool:get(DBPool).

View File

@ -0,0 +1,59 @@
%%
%% loc_torrent_sup.erl
%% Kevin Lynx
%% 07.03.2013
%%
-module(loc_torrent_sup).
-behaviour(supervisor).
-export([init/1]).
-export([start_link/3,
start_dep_apps/0,
start_standalone/3,
start_standalone/1]).
%% Put the bundled dependency ebin dirs on the code path and start the
%% applications the downloader relies on. Returns the per-application
%% start results, in order.
start_dep_apps() ->
    Paths = ["deps/bson/ebin", "deps/mongodb/ebin",
             "deps/kdht/ebin", "deps/ibrowse/ebin"],
    lists:foreach(fun code:add_path/1, Paths),
    Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
    lists:map(fun application:start/1, Apps).
%% Entry point for `erl -s': arguments arrive as one list of strings.
%% Blocks forever afterwards, otherwise the -s launch would fall through
%% and the node could shut down.
start_standalone([IP, Port, Size]) ->
    start_standalone(IP, list_to_integer(Port), list_to_integer(Size)),
    receive
        fuck_erl_s_option -> ok
    end.
%% Boot everything for standalone use: dependency apps, the shared
%% tor_download manager, then this supervisor tree.
start_standalone(IP, Port, Size) ->
    io:format("loc torrent (torrent downloader) startup~n", []),
    io:format("db: ~p:~p downloader count ~p~n", [IP, Port, Size]),
    start_dep_apps(),
    tor_download:start_global(),
    start_link(IP, Port, Size, []).
%% Convenience wrapper: no extra child specs beyond the downloaders.
start_link(IP, Port, Size) ->
    start_link(IP, Port, Size, []).
%% Start the mongo pool and the supervisor. `Size' downloader workers are
%% spawned; `OtherProcess' child specs are supervised in front of them.
start_link(IP, Port, Size, OtherProcess) ->
    PoolName = loc_torrent_db_pool,
    % 5 db connections are shared by all `Size' workers
    mongo_sup:start_pool(PoolName, 5, {IP, Port}),
    supervisor:start_link({local, srv_name()}, ?MODULE, [PoolName, Size, OtherProcess]).
% registered name of the supervisor (same as the module)
srv_name() ->
    ?MODULE.
%% supervisor callback: one_for_one, tolerating at most 1 restart per
%% 600 seconds before the supervisor itself gives up.
init([PoolName, Size, OtherProcess]) ->
    Spec = {one_for_one, 1, 600},
    Children = OtherProcess ++ [create_child(PoolName, Index) || Index <- lists:seq(1, Size)],
    {ok, {Spec, Children}}.
%% Child spec for downloader worker number `Index': permanent worker,
%% 1000 ms shutdown grace.
create_child(PoolName, Index) ->
    {child_id(Index), {loc_torrent, start_link, [PoolName]},
        permanent, 1000, worker, dynamic}.

%% Unique child id, e.g. loc_torrent_3. list_to_atom is acceptable here:
%% Index is a small, bounded integer from the startup config, never
%% untrusted input.
child_id(Index) ->
    list_to_atom("loc_torrent_" ++ integer_to_list(Index)).