change sphinx_builder: querying mongodb by `skip' really doesn't work well, so build a date index and query by date range instead

Kevin Lynx 2013-08-04 21:37:50 +08:00
parent 6f6aac3b35
commit db83eecfd5
6 changed files with 230 additions and 74 deletions
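Why the change: a skip-based page makes mongod walk and discard `Skip' documents before returning anything, so each successive batch costs more than the last, and over a large collection the loader effectively re-scans it on every request. A minimal illustration, not part of the commit, using only driver calls that already appear in this diff (torrents/hashes stand in for ?DBNAME/?COLLNAME):

% Illustration only: the old skip-based batch load. The server must
% still visit and throw away the first `Skip' documents, so the cost
% of every batch grows linearly with the offset.
load_by_skip(Conn, Skip, Size) ->
    mongo:do(safe, master, Conn, torrents, fun() ->
        Cursor = mongo:find(hashes, {}, {}, Skip),
        Ret = mongo_cursor:take(Cursor, Size),
        mongo_cursor:close(Cursor),
        Ret
    end).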

View File: time_util.erl

@@ -10,6 +10,7 @@
local_time_to_universal_time/1,
now_utc_time/0,
now_day_seconds/0,
date_time_string/1,
date_time_string/0,
date_time_stamp/0]).
-compile(export_all).
@@ -42,10 +43,15 @@ now_utc_time() ->
local_time_to_universal_time(calendar:local_time()).
date_time_string() ->
{{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
date_time_string(calendar:local_time()).
date_time_string({{Y, M, D}, {H, Min, Sec}}) ->
L = io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b",
[Y, M, D, H, Min, Sec]),
lists:flatten(L).
lists:flatten(L);
date_time_string(DaySecs) ->
DateTime = seconds_to_local_time(DaySecs),
date_time_string(DateTime).
date_time_stamp() ->
{{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),

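date_time_string/1 now accepts either a calendar datetime tuple or a seconds value (routed through seconds_to_local_time/1), so callers such as sphinx_builder can print the saved offset_date directly. Illustrative shell output for the tuple form, following the format string above:

1> time_util:date_time_string({{2013,8,4},{21,37,50}}).
"2013-08-04 21:37:50"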
View File: db_store_mongo.erl

@@ -19,7 +19,7 @@
search_recently/2,
search_newest_top/3,
search/2]).
-export([decode_torrent_item/1]).
-export([decode_torrent_item/1, ensure_date_index/1]).
-compile(export_all).
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
@@ -162,6 +162,12 @@ ensure_search_index(Conn) ->
mongo:ensure_index(?COLLNAME, Spec)
end).
ensure_date_index(Conn) ->
Spec = {key, {created_at, 1}},
mongo_do(Conn, fun() ->
mongo:ensure_index(?COLLNAME, Spec)
end).
% does not work
enable_text_search(Conn) ->
Cmd = {setParameter, 1, textSearchEnabled, true},

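The new {created_at, 1} index is what makes the loader's range walk cheap: the ascending key serves '$gt'/'$lt' selectors on created_at as an index scan rather than a collection scan. A sketch of the pairing, using the same query shape as do_load_torrents/2 later in this diff (Date and Range in seconds):

% Sketch only: the query shape the date index exists for.
load_window(Conn, Date, Range, Size) ->
    Q = {created_at, {'$gt', Date, '$lt', Date + Range}},
    mongo:do(safe, master, Conn, torrents, fun() ->
        Cursor = mongo:find(hashes, Q, {}),
        Ret = mongo_cursor:take(Cursor, Size),
        mongo_cursor:close(Cursor),
        Ret
    end).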
View File: sphinx_builder.erl

@@ -27,8 +27,8 @@ srv_name() ->
init([IP, Port, WorkerCnt]) ->
?I(?FMT("spawn ~p workers", [WorkerCnt])),
[spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, WorkerCnt)],
Offset = load_result(),
sphinx_torrent:start_link(IP, Port, Offset),
{Offset, Date} = load_result(),
sphinx_torrent:start_link(IP, Port, Date),
{ok, #state{processed = Offset, worker_cnt = WorkerCnt}}.
handle_call({get, Pid}, _From, State) ->
@@ -103,16 +103,18 @@ load_result() ->
case file:consult(?STATE_FILE) of
{error, _Reason} ->
io:format("start a new processing~n", []),
0;
{0, 0};
{ok, [Ret]} ->
Sum = proplists:get_value(processed, Ret),
io:format("continue to process from ~p~n", [Sum]),
Sum
DateSecs = proplists:get_value(offset_date, Ret),
io:format("continue to process from ~s~n", [time_util:date_time_string(DateSecs)]),
{Sum, DateSecs}
end.
save_result(Sum) ->
Ret = [{processed, Sum}],
io:format("save result ~p~n", [Sum]),
DateSecs = sphinx_torrent:offset_date(),
Ret = [{processed, Sum}, {offset_date, DateSecs}],
io:format("save result ~s~n", [time_util:date_time_string(DateSecs)]),
file:write_file(?STATE_FILE, io_lib:fwrite("~p.\n",[Ret])).
check_progress(Sum) ->

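load_result/0 and save_result/1 round-trip progress through ?STATE_FILE as a single consultable Erlang term; after this change the term carries the date offset alongside the processed count. Illustrative file contents (values invented), which file:consult/1 reads back as {ok, [Ret]}:

%% Example ?STATE_FILE contents as written by save_result/1; numbers are
%% made up. offset_date is the seconds value that time_util:date_time_string/1
%% formats in the progress messages.
[{processed,120000},{offset_date,63544060800}].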
View File

@@ -46,7 +46,7 @@ init([IP, Port, Count]) ->
config:start_link("sphinx_builder.config", fun() -> config_default() end),
Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
Children = [Logger, Builder, Indexer],
{ok, {Spec, Children}}.

View File: sphinx_torrent.erl

@@ -12,60 +12,66 @@
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/3, get/0, try_times/0]).
-export([do_load_torrents/2]). % disable warning only
-export([start_link/3, get/0, offset_date/0, try_times/0]).
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
-define(POOLNAME, db_pool).
-define(WAIT_TIME, 30*1000).
-record(state, {offset = 0, max, try_times = 0, tors = [], cursor}).
-define(DATE_RANGE, 12*60*60).
-record(state, {max, try_times = 0, tors = [], date}).
start_link(IP, Port, Offset) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
start_link(IP, Port, Date) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Date], []).
get() ->
gen_server:call(srv_name(), get, infinity).
try_times() ->
gen_server:call(srv_name(), try_times).
gen_server:call(srv_name(), try_times, infinity).
offset_date() ->
gen_server:call(srv_name(), date, infinity).
%%
srv_name() ->
sphinx_torrent_loader.
init([IP, Port, Offset]) ->
init([IP, Port, Date]) ->
Max = config:get(torrent_batch_count, 100),
mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
{ok, #state{offset = Offset, max = Max}, 0}.
handle_cast(load, State) ->
#state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State,
Request = Max * 2 div 3,
?T(?FMT("request next ~p torrents", [Request])),
LoadTors = load_next_batch(Cursor, Request),
case length(LoadTors) of
0 ->
?T(?FMT("no torrents in cursor ~p", [Cursor])),
mongo_cursor:close(Cursor),
timer:send_after(?WAIT_TIME, try_load);
_ -> ok
end,
?T(?FMT("load ~p torrents", [length(LoadTors)])),
NewOffset = Skip + length(LoadTors),
{noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}};
Conn = mongo_pool:get(?POOLNAME),
db_store_mongo:ensure_date_index(Conn),
CheckDate = find_first_date(Date),
io:format("load torrent from ~s~n", [time_util:date_time_string(CheckDate)]),
{ok, #state{date = CheckDate, max = Max}, 0}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info(try_load, State) ->
#state{offset = Skip, max = Max, try_times = Try} = State,
?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
case load_cursor_batch(Skip, Max) of
{} ->
timer:send_after(?WAIT_TIME, try_load),
{noreply, State#state{try_times = Try + 1}};
{Cursor, R} ->
{noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}}
#state{date = Date, tors = Tors, max = Max, try_times = Try} = State,
case length(Tors) < Max of
true ->
case do_load_torrents(Date, Max) of
[] ->
NewDate = forward_date(Date),
NewTry = case NewDate == Date of
true ->
timer:send_after(?WAIT_TIME, try_load),
Try + 1;
false ->
timer:send_after(100, try_load),
Try
end,
{noreply, State#state{date = NewDate, try_times = NewTry}};
Ret ->
timer:send_after(100, try_load),
NewDate = query_created_at(lists:last(Ret)),
{noreply, State#state{date = NewDate, tors = Tors ++ Ret, try_times = 0}}
end;
false ->
timer:send_after(100, try_load),
{noreply, State}
end;
handle_info(timeout, State) ->
@@ -75,13 +81,15 @@ handle_info(timeout, State) ->
handle_call(try_times, _From, #state{try_times = Try} = State) ->
{reply, Try, State};
handle_call(date, _From, #state{date = Date} = State) ->
{reply, Date, State};
handle_call(get, _From, State) ->
#state{tors = Tors, max = Max} = State,
#state{tors = Tors} = State,
{Tor, NewTors} = case length(Tors) of
0 -> {{}, []};
_ -> [H|Rest] = Tors, {H, Rest}
end,
try_load_next(NewTors, Max),
{reply, Tor, State#state{tors = NewTors}}.
terminate(_, State) ->
@@ -91,36 +99,14 @@ terminate(_, State) ->
code_change(_, _, State) ->
{ok, State}.
try_load_next([], _) ->
?T("no torrents"),
skip;
try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])),
gen_server:cast(self(), load);
try_load_next(_, _) ->
ok.
load_cursor(Skip, Size) ->
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:find(?COLLNAME, {}, {}, Skip, Size)
end).
load_cursor_batch(Skip, Size) ->
Cursor = load_cursor(Skip, Size),
case load_next_batch(Cursor, Size) of
[] ->
mongo_cursor:close(Cursor), {};
R ->
{Cursor, R}
end.
% will cause `get_more'
load_next_batch(Cursor, Size) ->
mongo_cursor:take(Cursor, Size).
forward_date(Date) ->
NowSecs = time_util:now_seconds(),
EndDate = Date + ?DATE_RANGE,
if EndDate < NowSecs -> EndDate; true -> Date end.
% will cause lots of queries
do_load_torrents(Skip, Size) ->
do_load_torrents(Date, Size) ->
Q = {created_at, {'$gt', Date, '$lt', Date + ?DATE_RANGE}},
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
% 1: costs lots of memory even if we close the cursor
@@ -130,9 +116,27 @@ do_load_torrents(Skip, Size) ->
%Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
%mongo_cursor:take(Cursor, Size),
% 3:
Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
Cursor = mongo:find(?COLLNAME, Q, {}),
Ret = mongo_cursor:take(Cursor, Size),
mongo_cursor:close(Cursor),
Ret
end).
find_first_date(0) ->
Conn = mongo_pool:get(?POOLNAME),
Ret = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:find_one(?COLLNAME, {})
end),
case Ret of
{} ->
time_util:now_seconds();
{Doc} ->
query_created_at(Doc) - 1
end;
find_first_date(Date) ->
Date.
query_created_at(Tor) ->
{CreatedT} = bson:lookup(created_at, Tor),
CreatedT.

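Taken together, the try_load handler, forward_date/1, and query_created_at/1 implement a window walk along created_at: an empty window whose end is still in the past is skipped immediately, an empty window that has caught up with "now" backs off for ?WAIT_TIME, and a non-empty window resumes just after the last document fetched. The decision, compressed into one hypothetical helper with the timer and buffer-full plumbing stripped away:

% Sketch of the per-tick decision (names hypothetical, semantics as above).
next_window(Date, Batch) ->
    case Batch of
        [] ->
            case forward_date(Date) of
                Date    -> {backoff, Date};    % window end reached `now': wait
                NewDate -> {retry, NewDate}    % empty window: jump ahead at once
            end;
        Tors ->
            {ok, Tors, query_created_at(lists:last(Tors))}  % resume after last doc
    end.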
View File: sphinx_torrent2.erl (new file)

@@ -0,0 +1,138 @@
%%
%% sphinx_torrent.erl
%% Kevin Lynx
%% 07.29.2013
%%
-module(sphinx_torrent2).
-include("vlog.hrl").
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/3, get/0, try_times/0]).
-export([do_load_torrents/2]). % disable warning only
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
-define(POOLNAME, db_pool).
-define(WAIT_TIME, 30*1000).
-record(state, {offset = 0, max, try_times = 0, tors = [], cursor}).
start_link(IP, Port, Offset) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
get() ->
gen_server:call(srv_name(), get, infinity).
try_times() ->
gen_server:call(srv_name(), try_times).
%%
srv_name() ->
sphinx_torrent_loader.
init([IP, Port, Offset]) ->
Max = config:get(torrent_batch_count, 100),
mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
{ok, #state{offset = Offset, max = Max}, 0}.
handle_cast(load, State) ->
#state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State,
Request = Max * 2 div 3,
?T(?FMT("request next ~p torrents", [Request])),
LoadTors = load_next_batch(Cursor, Request),
case length(LoadTors) of
0 ->
?T(?FMT("no torrents in cursor ~p", [Cursor])),
mongo_cursor:close(Cursor),
timer:send_after(?WAIT_TIME, try_load);
_ -> ok
end,
?T(?FMT("load ~p torrents", [length(LoadTors)])),
NewOffset = Skip + length(LoadTors),
{noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info(try_load, State) ->
#state{offset = Skip, max = Max, try_times = Try} = State,
?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
case load_cursor_batch(Skip, Max) of
{} ->
timer:send_after(?WAIT_TIME, try_load),
{noreply, State#state{try_times = Try + 1}};
{Cursor, R} ->
{noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}}
end;
handle_info(timeout, State) ->
self() ! try_load,
{noreply, State}.
handle_call(try_times, _From, #state{try_times = Try} = State) ->
{reply, Try, State};
handle_call(get, _From, State) ->
#state{tors = Tors, max = Max} = State,
{Tor, NewTors} = case length(Tors) of
0 -> {{}, []};
_ -> [H|Rest] = Tors, {H, Rest}
end,
try_load_next(NewTors, Max),
{reply, Tor, State#state{tors = NewTors}}.
terminate(_, State) ->
mongo_sup:stop_pool(?POOLNAME),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
try_load_next([], _) ->
?T("no torrents"),
skip;
try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])),
gen_server:cast(self(), load);
try_load_next(_, _) ->
ok.
load_cursor(Skip, Size) ->
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:find(?COLLNAME, {}, {}, Skip, Size)
end).
load_cursor_batch(Skip, Size) ->
Cursor = load_cursor(Skip, Size),
case load_next_batch(Cursor, Size) of
[] ->
mongo_cursor:close(Cursor), {};
R ->
{Cursor, R}
end.
% will cause `get_more'
load_next_batch(Cursor, Size) ->
mongo_cursor:take(Cursor, Size).
% will cause lots of queries
do_load_torrents(Skip, Size) ->
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
% 1: costs lots of memory even if we close the cursor
%Cursor = mongo:find(?COLLNAME, {}, {}, Skip, Size),
%Ret = mongo_cursor:rest(Cursor), but closing the cursor will reduce the memory
% 2: costs even more memory than solution 1
%Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
%mongo_cursor:take(Cursor, Size),
% 3:
Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
Ret = mongo_cursor:take(Cursor, Size),
mongo_cursor:close(Cursor),
Ret
end).