diff --git a/Emakefile b/Emakefile index 9d64f5f..965adfa 100644 --- a/Emakefile +++ b/Emakefile @@ -26,3 +26,7 @@ [debug_info, {i, "include"}, {outdir,"ebin"}]}. +{'src/local_torrent/*', + [debug_info, + {i, "include"}, + {outdir,"ebin"}]}. diff --git a/src/hash_reader/db_hash_reader.erl b/src/hash_reader/db_hash_reader.erl index ee6b0c4..dd6b141 100644 --- a/src/hash_reader/db_hash_reader.erl +++ b/src/hash_reader/db_hash_reader.erl @@ -67,7 +67,6 @@ handle_info(_, State) -> {noreply, State}. handle_cast({process_hash, Doc, DownloadDoc}, State) -> - #state{downloader = DownPid} = State, Conn = db_conn(State), {Hash} = bson:lookup(hash, Doc), ListHash = binary_to_list(Hash), @@ -82,7 +81,7 @@ handle_cast({process_hash, Doc, DownloadDoc}, State) -> State; false -> ?T(?FMT("start to download the torrent ~s", [ListHash])), - try_download(State, DownPid, ListHash, Doc) + try_download(State, ListHash, Doc) end, {noreply, NewState}; @@ -92,7 +91,7 @@ handle_cast(stop, State) -> handle_call(_, _From, State) -> {noreply, State}. -try_download(State, Pid, Hash, Doc) -> +try_download(State, Hash, Doc) -> #state{downloading = D} = State, Conn = db_conn(State), NewDownloading = case D >= ?MAX_DOWNLOAD of @@ -100,11 +99,22 @@ try_download(State, Pid, Hash, Doc) -> insert_to_download_wait(Conn, Doc), D; false -> % download it now - tor_download:download(Pid, Hash), + do_download(State, Hash), D + 1 end, State#state{downloading = NewDownloading}. +% and now we can retrieve the torrent from local cache +do_download(State, Hash) -> + #state{downloader = Pid} = State, + Conn = db_conn(State), + case db_loc_torrent:load(Conn, Hash) of + not_found -> % not in the local cache, download it now + tor_download:download(Pid, Hash); + Content -> % process later + self() ! {got_torrent, ok, Hash, Content} + end. 
+ try_save(State, Hash, Name, Length, Files) -> Conn = db_conn(State), case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of diff --git a/src/http_front/db_frontend.erl b/src/http_front/db_frontend.erl index 95f764f..609a21c 100644 --- a/src/http_front/db_frontend.erl +++ b/src/http_front/db_frontend.erl @@ -51,7 +51,8 @@ stats() -> TorSum = db_store_mongo:count(Conn), D1 = db_system:stats_day_at_slave(Conn, DaySecs), D2 = db_system:stats_day_at_slave(Conn, DaySecs - ?ONEDAY_SECS), - {TorSum, [decode_stats(D1), decode_stats(D2)]}. + D3 = db_system:stats_day_at_slave(Conn, DaySecs - 2 * ?ONEDAY_SECS), + {TorSum, [decode_stats(D1), decode_stats(D2), decode_stats(D3)]}. decode_stats(Stats) -> {DaySec} = bson:lookup('_id', Stats), diff --git a/src/http_front/http_cache.erl b/src/http_front/http_cache.erl index 6e284ba..7091316 100644 --- a/src/http_front/http_cache.erl +++ b/src/http_front/http_cache.erl @@ -14,10 +14,11 @@ -export([start_link/0, stop/0, search/1, + stats/0, today_top/0]). -export([async_update/2]). -record(state, {cache}). --define(OUT_OF_DATE, 5*60*1000). +-define(OUT_OF_DATE, 10*60*1000). -define(CACHE_SIZE, 1000). start_link() -> @@ -32,6 +33,9 @@ search(Key) -> today_top() -> gen_server:call(srv_name(), {query, top}). +stats() -> + gen_server:call(srv_name(), {query, stats}). + init([]) -> {ok, #state{cache = gb_trees:empty()}, 0}. @@ -48,6 +52,7 @@ handle_cast(decrease_cache, State) -> #state{cache = Cache} = State, NewCache = remove_oldest(Cache), spawn_update(top), % make sure `top' exists + spawn_update(stats), {noreply, State#state{cache = NewCache}}; handle_cast({update, Type}, State) -> @@ -66,6 +71,7 @@ handle_call(_, _From, State) -> handle_info(timeout, State) -> spawn_update(top), + spawn_update(stats), {noreply, State}; handle_info({enter_cache, Type, Time, Ret}, State) -> @@ -103,17 +109,22 @@ do_update({search, Key}) -> db_frontend:search(Key); do_update(top) -> - db_frontend:today_top(). 
+ db_frontend:today_top(); + +do_update(stats) -> + db_frontend:stats(). do_query(Type, #state{cache = Cache} = State) -> {Start, Ret} = gb_trees:get(Type, Cache), - case is_outofdate(Start) of + NewCache = case is_outofdate(Start) of true -> - spawn_update(Type); + spawn_update(Type), + % update the time so that it will not be scheduled more time + gb_trees:enter(Type, {now(), Ret}, Cache); false -> - ok + Cache end, - {State, Ret}. + {State#state{cache = NewCache}, Ret}. is_outofdate(Time) -> (timer:now_diff(now(), Time) div 1000) > ?OUT_OF_DATE. diff --git a/src/http_front/http_handler.erl b/src/http_front/http_handler.erl index 9f24b88..9ac3362 100644 --- a/src/http_front/http_handler.erl +++ b/src/http_front/http_handler.erl @@ -8,6 +8,7 @@ test_search/1, index/3, stats/3, + real_stats/3, recent/3, today_top/3, top/3]). @@ -47,13 +48,20 @@ recent(SessionID, _Env, _Input) -> mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). stats(SessionID, _Env, _Input) -> - {TorSum, StatsList} = db_frontend:stats(), + Response = format_stats_list(http_cache:stats()), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +real_stats(SessionID, _Env, _Input) -> + Response = format_stats_list(db_frontend:stats()), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +format_stats_list(Stats) -> + {TorSum, StatsList} = Stats, Body = ?TEXT("

total ~p torrents

", [TorSum]) ++ "", - Response = simple_html("", Body), - mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + simple_html("", Body). index(SessionID, _Env, Input) -> Body = case get_index_hash(Input) of @@ -89,6 +97,9 @@ test_search(Keyword) -> Body = do_search(Keyword), file:write_file(Filename, simple_html(Keyword, Body)). +do_search(Keyword) when length(Keyword) =< 1 -> + too_short_tip(); + do_search(Keyword) -> {Rets, Stats} = http_cache:search(Keyword), {_Found, Cost, Scanned} = Stats, @@ -164,3 +175,6 @@ format_date_string(Secs) -> {{Y, M, D}, _} = time_util:seconds_to_local_time(Secs), ?TEXT("~b-~2..0b-~2..0b", [Y, M, D]). +too_short_tip() -> + "too short keyword, you're going to kill me, enjoy this " ++ + "girl". diff --git a/src/local_torrent/README.md b/src/local_torrent/README.md new file mode 100644 index 0000000..b72d47d --- /dev/null +++ b/src/local_torrent/README.md @@ -0,0 +1,3 @@ +## local torrent + +Use torrent downloader to download torrent files and store them in local database. diff --git a/src/local_torrent/db_loc_torrent.erl b/src/local_torrent/db_loc_torrent.erl new file mode 100644 index 0000000..272e094 --- /dev/null +++ b/src/local_torrent/db_loc_torrent.erl @@ -0,0 +1,66 @@ +%% +%% db_loc_torrent.erl +%% Kevin Lynx +%% 07.03.2013 +%% +-module(db_loc_torrent). +-include("db_common.hrl"). +-define(DBNAME, torfiles). +-define(COLLNAME, torrents). +-export([load_hash/1, + save/3, + load/2]). +-export([test_load/1]). + +% return MagHash as a list +load_hash(Conn) -> + % db.runCommand({findAndModify:"wait_download",query:{cached:{$ne:1}},update:{$inc:{cached:1}}}) + Cmd = {findAndModify, ?HASH_DOWNLOAD_COLL, query, {cached, {'$ne', 1}}, + update, {'$set', {cached, 1}}, fields, {hash, 1}}, + Ret = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() -> + mongo:command(Cmd) + end), + case bson:lookup(value, Ret) of + {undefined} -> + []; + {} -> + []; + {Obj} -> + {BinHash} = bson:lookup(hash, Obj), + binary_to_list(BinHash) + end. 
+
+save(Conn, MagHash, Content) ->
+    Doc = torrent_doc(MagHash, Content),
+    mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
+        Sel = {'_id', list_to_binary(MagHash)},
+        mongo:update(?COLLNAME, Sel, Doc, true)
+    end).
+
+% load a torrent file: returns its binary content, or not_found when no document matches
+load(Conn, MagHash) when is_list(MagHash) ->
+    Ret = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        Sel = {'_id', list_to_binary(MagHash)},
+        mongo:find_one(?COLLNAME, Sel)
+    end),
+    case Ret of
+        {} ->
+            not_found;
+        {Doc} ->
+            {{bin, bin, Content}} = bson:lookup(content, Doc),
+            Content
+    end.
+
+torrent_doc(MagHash, Content) when is_binary(Content), is_list(MagHash) ->
+    {'_id', list_to_binary(MagHash),
+     content, {bin, bin, Content}}.
+
+%% manual helper: write the stored torrent for MagHash to "<MagHash>.torrent" and return that name
+test_load(MagHash) ->
+    {ok, Conn} = mongo_connection:start_link({localhost, 27017}),
+    Content = load(Conn, MagHash),
+    Name = MagHash ++ ".torrent",
+    ok = file:write_file(Name, Content), % crash instead of returning a name when nothing was written (e.g. not_found)
+    Name.
+
+
diff --git a/src/local_torrent/loc_torrent.erl b/src/local_torrent/loc_torrent.erl
new file mode 100644
index 0000000..0ede5ed
--- /dev/null
+++ b/src/local_torrent/loc_torrent.erl
@@ -0,0 +1,90 @@
+%%
+%% loc_torrent.erl
+%% Kevin Lynx
+%% 07.03.2013
+%%
+-module(loc_torrent).
+-behaviour(gen_server).
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+-export([start_link/1,
+         stop/1]).
+-record(state, {downcount = 0, dbpool, downloader}).
+% delay (ms) before polling again when wait_download has nothing to fetch
+-define(WAIT_TIME, 2*60*1000).
+-define(INTERVAL, 100).
+-define(MAX_DOWNLOAD, 50).
+
+start_link(DBPool) ->
+    gen_server:start_link(?MODULE, [DBPool], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+%%
+init([DBPool]) ->
+    {ok, DownPid} = tor_download:start_link(),
+    {ok, #state{dbpool = DBPool, downloader = DownPid}, 0}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info({got_torrent, failed, _Hash}, State) ->
+    #state{downcount = C} = State,
+    {_, NewState} = download_next(State#state{downcount = C - 1}), % release the finished slot before refilling it
+    {noreply, NewState}; % keep the count download_next returned; overwriting it lost the new download
+
+handle_info({got_torrent, ok, Hash, Content}, State) ->
+    #state{downcount = C} = State,
+    Conn = db_conn(State),
+    db_loc_torrent:save(Conn, Hash, Content),
+    {_, NewState} = download_next(State#state{downcount = C - 1}), % release the finished slot before refilling it
+    {noreply, NewState};
+
+handle_info(timeout, State) ->
+    {Status, NewState} = download_next(State),
+    if Status == ok -> schedule_best(); true -> ok end,
+    {noreply, NewState};
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+% start one more download every ?INTERVAL ms instead of all at once, to avoid a cpu/network spike at startup
+schedule_best() ->
+    timer:send_after(?INTERVAL, timeout).
+
+download_next(#state{downcount = Count} = State) when Count < ?MAX_DOWNLOAD ->
+    #state{downloader = DownPid} = State,
+    Conn = db_conn(State),
+    {Status, NewCount} = case db_loc_torrent:load_hash(Conn) of
+        [] ->
+            wait_download(),
+            {wait, Count};
+        Hash ->
+            tor_download:download(DownPid, Hash),
+            {ok, Count + 1}
+    end,
+    {Status, State#state{downcount = NewCount}};
+
+download_next(State) ->
+    {wait, State}.
+
+wait_download() ->
+    timer:send_after(?WAIT_TIME, timeout).
+
+db_conn(State) ->
+    #state{dbpool = DBPool} = State,
+    mongo_pool:get(DBPool).
diff --git a/src/local_torrent/loc_torrent_sup.erl b/src/local_torrent/loc_torrent_sup.erl
new file mode 100644
index 0000000..a0051b5
--- /dev/null
+++ b/src/local_torrent/loc_torrent_sup.erl
@@ -0,0 +1,59 @@
+%%
+%% loc_torrent_sup.erl
+%% Kevin Lynx
+%% 07.03.2013
+%%
+-module(loc_torrent_sup).
+-behaviour(supervisor).
+-export([init/1]).
+-export([start_link/3,
+         start_dep_apps/0,
+         start_standalone/3,
+         start_standalone/1]).
+
+start_dep_apps() ->
+    code:add_path("deps/bson/ebin"),
+    code:add_path("deps/mongodb/ebin"),
+    code:add_path("deps/kdht/ebin"),
+    code:add_path("deps/ibrowse/ebin"),
+    Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
+    [application:start(App) || App <- Apps].
+
+start_standalone([IP, Port, Size]) -> % Port and Size arrive as strings and are converted below
+    IPort = list_to_integer(Port),
+    ISize = list_to_integer(Size),
+    start_standalone(IP, IPort, ISize),
+    receive % block the calling process; nothing in this module sends the message
+        fuck_erl_s_option -> ok
+    end.
+
+start_standalone(IP, Port, Size) ->
+    io:format("loc torrent (torrent downloader) startup~n", []),
+    io:format("db: ~p:~p downloader count ~p~n", [IP, Port, Size]),
+    start_dep_apps(),
+    tor_download:start_global(),
+    start_link(IP, Port, Size, []).
+
+start_link(IP, Port, Size) ->
+    start_link(IP, Port, Size, []).
+
+start_link(IP, Port, Size, OtherProcess) -> % OtherProcess: extra child specs started before the workers
+    PoolName = loc_torrent_db_pool,
+    mongo_sup:start_pool(PoolName, 5, {IP, Port}),
+    supervisor:start_link({local, srv_name()}, ?MODULE, [PoolName, Size, OtherProcess]).
+
+srv_name() ->
+    ?MODULE.
+
+init([PoolName, Size, OtherProcess]) ->
+    Spec = {one_for_one, 1, 600},
+    Children = OtherProcess ++ [create_child(PoolName, Index) || Index <- lists:seq(1, Size)],
+    {ok, {Spec, Children}}.
+
+create_child(PoolName, Index) -> % child spec for one loc_torrent worker
+    {child_id(Index), {loc_torrent, start_link, [PoolName]},
+     permanent, 1000, worker, [loc_torrent]}. % gen_server child: list its callback module, `dynamic' is for gen_event
+
+child_id(Index) -> % loc_torrent_1, loc_torrent_2, ... (Index is an integer from lists:seq/2)
+    list_to_atom("loc_torrent_" ++ integer_to_list(Index)).
+