diff --git a/src/common/time_util.erl b/src/common/time_util.erl index a85fb88..9bb33d9 100644 --- a/src/common/time_util.erl +++ b/src/common/time_util.erl @@ -10,6 +10,7 @@ local_time_to_universal_time/1, now_utc_time/0, now_day_seconds/0, + date_time_string/1, date_time_string/0, date_time_stamp/0]). -compile(export_all). @@ -42,10 +43,15 @@ now_utc_time() -> local_time_to_universal_time(calendar:local_time()). date_time_string() -> - {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(), + date_time_string(calendar:local_time()). + +date_time_string({{Y, M, D}, {H, Min, Sec}}) -> L = io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Min, Sec]), - lists:flatten(L). + lists:flatten(L); +date_time_string(DaySecs) -> + DateTime = seconds_to_local_time(DaySecs), + date_time_string(DateTime). date_time_stamp() -> {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(), diff --git a/src/db_store_mongo.erl b/src/db_store_mongo.erl index 0645d5c..1eb2730 100644 --- a/src/db_store_mongo.erl +++ b/src/db_store_mongo.erl @@ -19,7 +19,7 @@ search_recently/2, search_newest_top/3, search/2]). --export([decode_torrent_item/1]). +-export([decode_torrent_item/1, ensure_date_index/1]). -compile(export_all). -define(DBNAME, torrents). -define(COLLNAME, hashes). @@ -162,6 +162,12 @@ ensure_search_index(Conn) -> mongo:ensure_index(?COLLNAME, Spec) end). +ensure_date_index(Conn) -> + Spec = {key, {created_at, 1}}, + mongo_do(Conn, fun() -> + mongo:ensure_index(?COLLNAME, Spec) + end). + % not work enable_text_search(Conn) -> Cmd = {setParameter, 1, textSearchEnabled, true}, diff --git a/src/sphinx_builder/sphinx_builder.erl b/src/sphinx_builder/sphinx_builder.erl index ce7d3f5..1cad167 100644 --- a/src/sphinx_builder/sphinx_builder.erl +++ b/src/sphinx_builder/sphinx_builder.erl @@ -27,8 +27,8 @@ srv_name() -> init([IP, Port, WorkerCnt]) -> ?I(?FMT("spawn ~p workers", [WorkerCnt])), [spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, WorkerCnt)], - Offset = load_result(), - sphinx_torrent:start_link(IP, Port, Offset), + {Offset, Date} = load_result(), + sphinx_torrent:start_link(IP, Port, Date), {ok, #state{processed = Offset, worker_cnt = WorkerCnt}}. handle_call({get, Pid}, _From, State) -> @@ -103,16 +103,18 @@ load_result() -> case file:consult(?STATE_FILE) of {error, _Reason} -> io:format("start a new processing~n", []), - 0; + {0, 0}; {ok, [Ret]} -> Sum = proplists:get_value(processed, Ret), - io:format("continue to process from ~p~n", [Sum]), - Sum + DateSecs = proplists:get_value(offset_date, Ret), + io:format("continue to process from ~s~n", [time_util:date_time_string(DateSecs)]), + {Sum, DateSecs} end. save_result(Sum) -> - Ret = [{processed, Sum}], - io:format("save result ~p~n", [Sum]), + DateSecs = sphinx_torrent:offset_date(), + Ret = [{processed, Sum}, {offset_date, DateSecs}], + io:format("save result ~s~n", [time_util:date_time_string(DateSecs)]), file:write_file(?STATE_FILE, io_lib:fwrite("~p.\n",[Ret])). check_progress(Sum) -> diff --git a/src/sphinx_builder/sphinx_builder_sup.erl b/src/sphinx_builder/sphinx_builder_sup.erl index 8e3bb0b..969c7d8 100644 --- a/src/sphinx_builder/sphinx_builder_sup.erl +++ b/src/sphinx_builder/sphinx_builder_sup.erl @@ -46,7 +46,7 @@ init([IP, Port, Count]) -> config:start_link("sphinx_builder.config", fun() -> config_default() end), Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]}, Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]}, - Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]}, + Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]}, Children = [Logger, Builder, Indexer], {ok, {Spec, Children}}. diff --git a/src/sphinx_builder/sphinx_torrent.erl b/src/sphinx_builder/sphinx_torrent.erl index 4e4d098..b5cf421 100644 --- a/src/sphinx_builder/sphinx_torrent.erl +++ b/src/sphinx_builder/sphinx_torrent.erl @@ -12,60 +12,66 @@ handle_info/2, terminate/2, code_change/3]). --export([start_link/3, get/0, try_times/0]). --export([do_load_torrents/2]). % disable warning only +-export([start_link/3, get/0, offset_date/0, try_times/0]). -define(DBNAME, torrents). -define(COLLNAME, hashes). -define(POOLNAME, db_pool). -define(WAIT_TIME, 30*1000). --record(state, {offset = 0, max, try_times = 0, tors = [], cursor}). +-define(DATE_RANGE, 12*60*60). +-record(state, {max, try_times = 0, tors = [], date}). -start_link(IP, Port, Offset) -> - gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []). +start_link(IP, Port, Date) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Date], []). get() -> gen_server:call(srv_name(), get, infinity). try_times() -> - gen_server:call(srv_name(), try_times). + gen_server:call(srv_name(), try_times, infinity). + +offset_date() -> + gen_server:call(srv_name(), date, infinity). %% srv_name() -> sphinx_torrent_loader. -init([IP, Port, Offset]) -> +init([IP, Port, Date]) -> Max = config:get(torrent_batch_count, 100), mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}), - {ok, #state{offset = Offset, max = Max}, 0}. - -handle_cast(load, State) -> - #state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State, - Request = Max * 2 div 3, - ?T(?FMT("request next ~p torrents", [Request])), - LoadTors = load_next_batch(Cursor, Request), - case length(LoadTors) of - 0 -> - ?T(?FMT("no torrents in cursor ~p", [Cursor])), - mongo_cursor:close(Cursor), - timer:send_after(?WAIT_TIME, try_load); - _ -> ok - end, - ?T(?FMT("load ~p torrents", [length(LoadTors)])), - NewOffset = Skip + length(LoadTors), - {noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}}; + Conn = mongo_pool:get(?POOLNAME), + db_store_mongo:ensure_date_index(Conn), + CheckDate = find_first_date(Date), + io:format("load torrent from ~s~n", [time_util:date_time_string(CheckDate)]), + {ok, #state{date = CheckDate, max = Max}, 0}. handle_cast(stop, State) -> {stop, normal, State}. handle_info(try_load, State) -> - #state{offset = Skip, max = Max, try_times = Try} = State, - ?T(?FMT("try load ~p torrents from ~p", [Max, Skip])), - case load_cursor_batch(Skip, Max) of - {} -> - timer:send_after(?WAIT_TIME, try_load), - {noreply, State#state{try_times = Try + 1}}; - {Cursor, R} -> - {noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}} + #state{date = Date, tors = Tors, max = Max, try_times = Try} = State, + case length(Tors) < Max of + true -> + case do_load_torrents(Date, Max) of + [] -> + NewDate = forward_date(Date), + NewTry = case NewDate == Date of + true -> + timer:send_after(?WAIT_TIME, try_load), + Try + 1; + false -> + timer:send_after(100, try_load), + Try + end, + {noreply, State#state{date = NewDate, try_times = NewTry}}; + Ret -> + timer:send_after(100, try_load), + NewDate = query_created_at(lists:last(Ret)), + {noreply, State#state{date = NewDate, tors = Tors ++ Ret, try_times = 0}} + end; + false -> + timer:send_after(100, try_load), + {noreply, State} end; handle_info(timeout, State) -> @@ -75,13 +81,15 @@ handle_info(timeout, State) -> handle_call(try_times, _From, #state{try_times = Try} = State) -> {reply, Try, State}; +handle_call(date, _From, #state{date = Date} = State) -> + {reply, Date, State}; + handle_call(get, _From, State) -> - #state{tors = Tors, max = Max} = State, + #state{tors = Tors} = State, {Tor, NewTors} = case length(Tors) of 0 -> {{}, []}; _ -> [H|Rest] = Tors, {H, Rest} end, - try_load_next(NewTors, Max), {reply, Tor, State#state{tors = NewTors}}. terminate(_, State) -> @@ -91,36 +99,14 @@ terminate(_, State) -> code_change(_, _, State) -> {ok, State}. -try_load_next([], _) -> - ?T("no torrents"), - skip; -try_load_next(Tors, Max) when length(Tors) == Max div 3 -> - ?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])), - gen_server:cast(self(), load); -try_load_next(_, _) -> - ok. - -load_cursor(Skip, Size) -> - Conn = mongo_pool:get(?POOLNAME), - mongo:do(safe, master, Conn, ?DBNAME, fun() -> - mongo:find(?COLLNAME, {}, {}, Skip, Size) - end). - -load_cursor_batch(Skip, Size) -> - Cursor = load_cursor(Skip, Size), - case load_next_batch(Cursor, Size) of - [] -> - mongo_cursor:close(Cursor), {}; - R -> - {Cursor, R} - end. - -% will cause `get_more' -load_next_batch(Cursor, Size) -> - mongo_cursor:take(Cursor, Size). +forward_date(Date) -> + NowSecs = time_util:now_seconds(), + EndDate = Date + ?DATE_RANGE, + if EndDate < NowSecs -> EndDate; true -> Date end. % will cause lots of queries -do_load_torrents(Skip, Size) -> +do_load_torrents(Date, Size) -> + Q = {created_at, {'$gt', Date, '$lt', Date + ?DATE_RANGE}}, Conn = mongo_pool:get(?POOLNAME), mongo:do(safe, master, Conn, ?DBNAME, fun() -> % 1: cost lots of memory even close the cursor @@ -130,9 +116,27 @@ do_load_torrents(Skip, Size) -> %Cursor = mongo:find(?COLLNAME, {}, {}, Skip), %mongo_cursor:take(Cursor, Size), % 3: - Cursor = mongo:find(?COLLNAME, {}, {}, Skip), + Cursor = mongo:find(?COLLNAME, Q, {}), Ret = mongo_cursor:take(Cursor, Size), mongo_cursor:close(Cursor), Ret end). +find_first_date(0) -> + Conn = mongo_pool:get(?POOLNAME), + Ret = mongo:do(safe, master, Conn, ?DBNAME, fun() -> + mongo:find_one(?COLLNAME, {}) + end), + case Ret of + {} -> + time_util:now_seconds(); + {Doc} -> + query_created_at(Doc) - 1 + end; +find_first_date(Date) -> + Date. + +query_created_at(Tor) -> + {CreatedT} = bson:lookup(created_at, Tor), + CreatedT. + diff --git a/src/sphinx_builder/sphinx_torrent2.erl b/src/sphinx_builder/sphinx_torrent2.erl new file mode 100644 index 0000000..91f1f95 --- /dev/null +++ b/src/sphinx_builder/sphinx_torrent2.erl @@ -0,0 +1,138 @@ +%% +%% sphinx_torrent.erl +%% Kevin Lynx +%% 07.29.2013 +%% +-module(sphinx_torrent2). +-include("vlog.hrl"). +-behaviour(gen_server). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([start_link/3, get/0, try_times/0]). +-export([do_load_torrents/2]). % disable warning only +-define(DBNAME, torrents). +-define(COLLNAME, hashes). +-define(POOLNAME, db_pool). +-define(WAIT_TIME, 30*1000). +-record(state, {offset = 0, max, try_times = 0, tors = [], cursor}). + +start_link(IP, Port, Offset) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []). + +get() -> + gen_server:call(srv_name(), get, infinity). + +try_times() -> + gen_server:call(srv_name(), try_times). + +%% +srv_name() -> + sphinx_torrent_loader. + +init([IP, Port, Offset]) -> + Max = config:get(torrent_batch_count, 100), + mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}), + {ok, #state{offset = Offset, max = Max}, 0}. + +handle_cast(load, State) -> + #state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State, + Request = Max * 2 div 3, + ?T(?FMT("request next ~p torrents", [Request])), + LoadTors = load_next_batch(Cursor, Request), + case length(LoadTors) of + 0 -> + ?T(?FMT("no torrents in cursor ~p", [Cursor])), + mongo_cursor:close(Cursor), + timer:send_after(?WAIT_TIME, try_load); + _ -> ok + end, + ?T(?FMT("load ~p torrents", [length(LoadTors)])), + NewOffset = Skip + length(LoadTors), + {noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}}; + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_info(try_load, State) -> + #state{offset = Skip, max = Max, try_times = Try} = State, + ?T(?FMT("try load ~p torrents from ~p", [Max, Skip])), + case load_cursor_batch(Skip, Max) of + {} -> + timer:send_after(?WAIT_TIME, try_load), + {noreply, State#state{try_times = Try + 1}}; + {Cursor, R} -> + {noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}} + end; + +handle_info(timeout, State) -> + self() ! try_load, + {noreply, State}. + +handle_call(try_times, _From, #state{try_times = Try} = State) -> + {reply, Try, State}; + +handle_call(get, _From, State) -> + #state{tors = Tors, max = Max} = State, + {Tor, NewTors} = case length(Tors) of + 0 -> {{}, []}; + _ -> [H|Rest] = Tors, {H, Rest} + end, + try_load_next(NewTors, Max), + {reply, Tor, State#state{tors = NewTors}}. + +terminate(_, State) -> + mongo_sup:stop_pool(?POOLNAME), + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +try_load_next([], _) -> + ?T("no torrents"), + skip; +try_load_next(Tors, Max) when length(Tors) == Max div 3 -> + ?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])), + gen_server:cast(self(), load); +try_load_next(_, _) -> + ok. + +load_cursor(Skip, Size) -> + Conn = mongo_pool:get(?POOLNAME), + mongo:do(safe, master, Conn, ?DBNAME, fun() -> + mongo:find(?COLLNAME, {}, {}, Skip, Size) + end). + +load_cursor_batch(Skip, Size) -> + Cursor = load_cursor(Skip, Size), + case load_next_batch(Cursor, Size) of + [] -> + mongo_cursor:close(Cursor), {}; + R -> + {Cursor, R} + end. + +% will cause `get_more' +load_next_batch(Cursor, Size) -> + mongo_cursor:take(Cursor, Size). + +% will cause lots of queries +do_load_torrents(Skip, Size) -> + Conn = mongo_pool:get(?POOLNAME), + mongo:do(safe, master, Conn, ?DBNAME, fun() -> + % 1: cost lots of memory even close the cursor + %Cursor = mongo:find(?COLLNAME, {}, {}, Skip, Size), + %Ret = mongo_cursor:rest(Cursor), but close the cursor will reduce the memory + % 2: cost lots of memory more than solution 1 + %Cursor = mongo:find(?COLLNAME, {}, {}, Skip), + %mongo_cursor:take(Cursor, Size), + % 3: + Cursor = mongo:find(?COLLNAME, {}, {}, Skip), + Ret = mongo_cursor:take(Cursor, Size), + mongo_cursor:close(Cursor), + Ret + end). +