diff --git a/src/db_daterange.erl b/src/db_daterange.erl
new file mode 100644
index 0000000..4774487
--- /dev/null
+++ b/src/db_daterange.erl
@@ -0,0 +1,113 @@
+%%
+%% db_daterange.erl
+%% Kevin Lynx
+%% 07.10.2013
+%% Track the most recently requested hashes
+%%
+-module(db_daterange).
+-export([insert/2,
+         lookup/3]).
+-export([start_link/1,
+         stop/0]).
+-define(DBNAME, hash_date).
+-define(COLLNAME, hashes).
+-define(DATE_COL, date).
+-behaviour(gen_server).
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+-record(state, {startday, dbpool, counter}).
+-define(CHECK_BOUNDARY, 1000).
+-define(DEL_DAY_BEFORE_SECS, 5*24*60*60).
+
+% queries are by date (in seconds), so build an index on the date column
+ensure_date_index(Conn) ->
+    Spec = {key, {?DATE_COL, 1}},
+    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        mongo:ensure_index(?COLLNAME, Spec)
+    end).
+
+% document layout -- '_id': Hash, date: DaySecs, reqs: RequestCount
+insert(Conn, Hash) when is_list(Hash) ->
+    DaySecs = time_util:now_day_seconds(),
+    BHash = list_to_binary(Hash),
+    Cmd = {findAndModify, ?COLLNAME, query, {'_id', BHash}, upsert, true,
+        update, {'$inc', {reqs, 1}, '$set', {?DATE_COL, DaySecs}}, fields, {'_id', 1}},
+    IRet = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        mongo:command(Cmd)
+    end),
+    gen_server:cast(srv_name(), insert),
+    IRet.
+
+% returns a list of hash strings for the given day, most-requested first
+lookup(Conn, DaySecs, Count) ->
+    Sel = {'$query', {?DATE_COL, DaySecs}, '$orderby', {reqs, -1}},
+    List = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        Cursor = mongo:find(?COLLNAME, Sel, {'_id', 1}, 0, Count),
+        mongo_cursor:rest(Cursor)
+    end),
+    [decode_hash(Doc) || Doc <- List].
+
+decode_hash(Doc) ->
+    {ID} = bson:lookup('_id', Doc),
+    binary_to_list(ID).
+
+% delete every hash older than ?DEL_DAY_BOFORE_SECS relative to today
+try_delete_oldest(Conn) ->
+    TodaySecs = time_util:now_day_seconds(),
+    DelDay = TodaySecs - ?DEL_DAY_BEFORE_SECS,
+    Sel = {?DATE_COL, {'$lte', DelDay}},
+    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        mongo:delete(?COLLNAME, Sel)
+    end).
+
+%%%%
+start_link(DBPool) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [DBPool], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+srv_name() ->
+    ?MODULE.
+
+init([Pool]) ->
+    Conn = mongo_pool:get(Pool),
+    ensure_date_index(Conn),
+    Today = time_util:now_day_seconds(),
+    {ok, #state{startday = Today, dbpool = Pool, counter = 0}}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+% duplicate inserts do not matter; purge old hashes at most once per day, checked every ?CHECK_BOUNDARY inserts
+handle_cast(insert, State) ->
+    #state{startday = StartDay, dbpool = Pool, counter = Count} = State,
+    NowDay = time_util:now_day_seconds(),
+    {NewCount, NewStart} = case Count > ?CHECK_BOUNDARY of
+        true when NowDay > StartDay ->
+            Conn = mongo_pool:get(Pool),
+            try_delete_oldest(Conn),
+            {0, NowDay};
+        true ->
+            {0, StartDay};
+        false ->
+            {Count + 1, StartDay}
+    end,
+    {noreply, State#state{startday = NewStart, counter = NewCount}};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {reply, ok, State}. % {noreply, State} here would leave callers blocked until timeout
+
+handle_info(_, State) ->
+    {noreply, State}.
+
diff --git a/src/db_store_mongo.erl b/src/db_store_mongo.erl
index a3fc7b3..7b5c979 100644
--- a/src/db_store_mongo.erl
+++ b/src/db_store_mongo.erl
@@ -13,6 +13,7 @@
 	inc_announce/2,
 	exist/2,
 	index/2,
+	search_newest_top_by_date/3,
 	search_announce_top/2,
 	search_recently/2,
 	search_newest_top/3,
@@ -57,6 +58,7 @@ search(Conn, Key) when is_list(Key) ->
 	end),
 	{decode_search(Ret), decode_search_stats(Ret)}.
 
+% deprecated
 search_announce_top(Conn, Count) ->
 	Sel = {'$query', {}, '$orderby', {announce, -1}},
 	List = mongo_do_slave(Conn, fun() ->
@@ -67,6 +69,7 @@ search_announce_top(Conn, Count) ->
 	end),
 	[decode_torrent_item(Item) || Item <- List].
 
+% deprecated
 % db.hashes.find({$query:{},$orderby:{created_at: 1}}).limit(10);
 search_recently(Conn, Count) ->
 	Sel = {'$query', {}, '$orderby', {created_at, -1}},
@@ -76,6 +79,7 @@ search_recently(Conn, Count) ->
 	end),
 	[decode_torrent_item(Item) || Item <- List].
 
+% deprecated
 search_newest_top(Conn, Count, DaySecs) ->
 	Sel = {'$query', {created_at, {'$gt', DaySecs}}, '$orderby', {announce, -1}},
 	List = mongo_do_slave(Conn, fun() ->
@@ -84,6 +88,12 @@ search_newest_top(Conn, Count, DaySecs) ->
 	end),
 	[decode_torrent_item(Item) || Item <- List].
 
+% search through the hash_date db's date index, so it is much faster than scanning by created_at
+search_newest_top_by_date(Conn, Count, DaySecs) ->
+	Hashes = db_daterange:lookup(Conn, DaySecs, Count),
+	Infos = [index(Conn, Hash) || Hash <- Hashes],
+	lists:filter(fun(E) -> E =/= {} end, Infos).
+
 index(Conn, Hash) when is_list(Hash) ->
 	Ret = mongo_do_slave(Conn, fun() ->
 		mongo:find_one(?COLLNAME, {'_id', list_to_binary(Hash)})
@@ -95,6 +105,7 @@ index(Conn, Hash) when is_list(Hash) ->
 
 insert(Conn, Hash, Name, Length, Files) when is_list(Hash) ->
 	NewDoc = create_torrent_desc(Conn, Hash, Name, Length, 1, Files),
+	db_daterange:insert(Conn, Hash),
 	mongo_do(Conn, fun() ->
 		% the doc may already exist because the other process has inserted before
 		Sel = {'_id', list_to_binary(Hash)},
@@ -115,14 +126,16 @@ inc_announce(Conn, Hash) when is_binary(Hash) ->
 	% damn, mongodb-erlang doesnot support update a field for an object,
 	% `findAndModify` works but it will change `announce' datatype to double
 	Cmd = {findAndModify, ?COLLNAME, query, {'_id', Hash},
-		update, {'$inc', {announce, 1}}, fields, {},
+		update, {'$inc', {announce, 1}}, fields, {'_id', 1}, % not specified (or {}) would return the whole object
 		new, false},
 	Ret = mongo_do(Conn, fun() ->
 		mongo:command(Cmd)
 	end),
 	case Ret of
 		{value, undefined, ok, 1.0} -> false;
-		{value, _Obj, lastErrorObject, {updatedExisting, true, n, 1}, ok, 1.0} -> true;
+		{value, _Obj, lastErrorObject, {updatedExisting, true,
n, 1}, ok, 1.0} ->
+			db_daterange:insert(Conn, binary_to_list(Hash)),
+			true;
 		_ -> false
 	end.
 
@@ -287,4 +300,8 @@ test_index(Hash) ->
 		index(Conn, Hash)
 	end).
 
+test_insertdate(Hash) ->
+	test_content(fun(Conn) ->
+		db_daterange:insert(Conn, Hash)
+	end).
diff --git a/src/hash_reader/hash_reader_sup.erl b/src/hash_reader/hash_reader_sup.erl
index e95d32f..9d84976 100644
--- a/src/hash_reader/hash_reader_sup.erl
+++ b/src/hash_reader/hash_reader_sup.erl
@@ -10,6 +10,7 @@
 	start_dep_apps/0,
 	start_standalone/3,
 	start_standalone/1]).
+-define(DBPOOLNAME, mongodb_conn_pool_name).
 
 start_dep_apps() ->
 	code:add_path("deps/bson/ebin"),
@@ -37,13 +38,14 @@ start_standalone(IP, Port, Size) ->
 	Stats = {hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
 	DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
 	Log = {vlog, {vlog, start_link, ["log/hash_reader.log", 3]}, permanent, 2000, worker, [vlog]},
-	start_link(IP, Port, Size, [DownloadStats, Stats, Log]).
+	DBDateRange = {db_daterange, {db_daterange, start_link, [?DBPOOLNAME]}, permanent, 2000, worker, [db_daterange]},
+	start_link(IP, Port, Size, [Log, DBDateRange, DownloadStats, Stats]).
 
 start_link(IP, Port, Size) ->
 	start_link(IP, Port, Size, []).
 
 start_link(IP, Port, Size, OtherProcess) ->
-	PoolName = mongodb_conn_pool_name,
+	PoolName = ?DBPOOLNAME,
 	mongo_sup:start_pool(PoolName, 5, {IP, Port}),
 	% ensure index
 	Conn = mongo_pool:get(PoolName),
diff --git a/src/http_front/db_frontend.erl b/src/http_front/db_frontend.erl
index 6f1ce9f..48db28f 100644
--- a/src/http_front/db_frontend.erl
+++ b/src/http_front/db_frontend.erl
@@ -39,7 +39,7 @@ search_one(MagHash) ->
 find_day_top(_Conn, _DaySecs, 0) ->
 	[];
 find_day_top(Conn, DaySecs, Try) ->
-	case db_store_mongo:search_newest_top(Conn, 50, DaySecs) of
+	case db_store_mongo:search_newest_top_by_date(Conn, 50, DaySecs) of
 		[] ->
 			find_day_top(Conn, DaySecs - ?ONEDAY_SECS, Try - 1);
 		List ->
diff --git a/src/http_front/http_cache.erl b/src/http_front/http_cache.erl
index d603818..502866f 100644
--- a/src/http_front/http_cache.erl
+++ b/src/http_front/http_cache.erl
@@ -83,6 +83,14 @@ handle_info({enter_cache, Type, Time, Ret}, State) ->
 handle_info(_, State) ->
 	{noreply, State}.
 
+% debug only: bypasses the cache and recomputes on every 'top' query
+query(top, State) ->
+	{State, do_update(top)};
+
+% debug only: bypasses the cache and recomputes on every 'stats' query
+query(stats, State) ->
+	{State, do_update(stats)};
+
 query(Type, State) ->
 	#state{cache = Cache} = State,
 	case gb_trees:is_defined(Type, Cache) of