diff --git a/Emakefile b/Emakefile
index e195d9b..2b049cd 100644
--- a/Emakefile
+++ b/Emakefile
@@ -38,3 +38,11 @@
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
+{'src/utils/*',
+ [debug_info,
+ {i, "include"},
+ {outdir,"ebin"}]}.
+{'src/sphinx_builder/*',
+ [debug_info,
+ {i, "include"},
+ {outdir,"ebin"}]}.
diff --git a/src/common/hex.erl b/src/common/hex.erl
new file mode 100644
index 0000000..b28af29
--- /dev/null
+++ b/src/common/hex.erl
@@ -0,0 +1,34 @@
+-module(hex).
+-export([bin_to_hexstr/1,hexstr_to_bin/1]).
+
+hex(N) when N < 10 ->
+ $0+N;
+hex(N) when N >= 10, N < 16 ->
+ $a+(N-10).
+
+int(C) when $0 =< C, C =< $9 ->
+ C - $0;
+int(C) when $A =< C, C =< $F ->
+ C - $A + 10;
+int(C) when $a =< C, C =< $f ->
+ C - $a + 10.
+
+to_hex(N) when N < 256 ->
+ [hex(N div 16), hex(N rem 16)].
+
+list_to_hexstr([]) ->
+ [];
+list_to_hexstr([H|T]) ->
+ to_hex(H) ++ list_to_hexstr(T).
+
+bin_to_hexstr(Bin) ->
+ list_to_hexstr(binary_to_list(Bin)).
+
+hexstr_to_bin(S) ->
+ list_to_binary(hexstr_to_list(S)).
+
+hexstr_to_list([X,Y|T]) ->
+ [int(X)*16 + int(Y) | hexstr_to_list(T)];
+hexstr_to_list([]) ->
+ [].
+
\ No newline at end of file
diff --git a/src/common/string_util.erl b/src/common/string_util.erl
new file mode 100644
index 0000000..7a599dd
--- /dev/null
+++ b/src/common/string_util.erl
@@ -0,0 +1,10 @@
+%%
+%% string_util.erl
+%% Kevin Lynx
+%%
+-module(string_util).
+-export([format/2]).
+
+format(Fmt, Arg) when is_list(Fmt), is_list(Arg) ->
+ lists:flatten(io_lib:format(Fmt, Arg)).
+
diff --git a/src/common/time_util.erl b/src/common/time_util.erl
index a2a1f4d..a85fb88 100644
--- a/src/common/time_util.erl
+++ b/src/common/time_util.erl
@@ -9,7 +9,9 @@
seconds_to_local_time/1,
local_time_to_universal_time/1,
now_utc_time/0,
- now_day_seconds/0]).
+ now_day_seconds/0,
+ date_time_string/0,
+ date_time_stamp/0]).
-compile(export_all).
diff_milsecs(T1, T2) ->
@@ -38,3 +40,15 @@ local_time_to_universal_time(Datetime) ->
now_utc_time() ->
local_time_to_universal_time(calendar:local_time()).
+
+date_time_string() ->
+ {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
+ L = io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b",
+ [Y, M, D, H, Min, Sec]),
+ lists:flatten(L).
+
+date_time_stamp() ->
+ {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
+ L = io_lib:format("~b~2..0b~2..0b~2..0b~2..0b~2..0b",
+ [Y, M, D, H, Min, Sec]),
+ lists:flatten(L).
diff --git a/src/common/xmerl_xml_cdata.erl b/src/common/xmerl_xml_cdata.erl
new file mode 100644
index 0000000..9bab876
--- /dev/null
+++ b/src/common/xmerl_xml_cdata.erl
@@ -0,0 +1,69 @@
+%%
+%% xmerl_xml_cdata.erl
+%% Kevin Lynx
+%% Workaround for erlang xmerl_xml not supporting `cdata', damn it!
+%%
+-module(xmerl_xml_cdata).
+-export(['#xml-inheritance#'/0]).
+-export(['#root#'/4,
+ '#element#'/5,
+ '#text#'/1]).
+-import(xmerl_lib, [markup/3, empty_tag/2]).
+-include_lib("xmerl/include/xmerl.hrl").
+
+
+'#xml-inheritance#'() -> [].
+
+
+%% The '#text#' function is called for every text segment.
+
+'#text#'(Text) ->
+%io:format("Text=~p~n",[Text]),
+ export_text(Text).
+
+
+%% The '#root#' tag is called when the entire structure has been
+%% exported. It does not appear in the structure itself.
+
+'#root#'(Data, [#xmlAttribute{name=prolog,value=V}], [], _E) ->
+ [V,Data];
+'#root#'(Data, _Attrs, [], _E) ->
+	["<?xml version=\"1.0\"?>", Data].
+
+
+%% The '#element#' function is the default handler for XML elements.
+
+'#element#'(Tag, [], Attrs, _Parents, _E) ->
+%io:format("Empty Tag=~p~n",[Tag]),
+ empty_tag(Tag, Attrs);
+'#element#'(Tag, Data, Attrs, _Parents, _E) ->
+%io:format("Tag=~p~n",[Tag]),
+ markup(Tag, Attrs, Data).
+
+export_text(T) ->
+ export_text(T, []).
+
+export_text([$' | T], Cont) ->
+	"&apos;" ++ export_text(T, Cont);
+export_text([$" | T], Cont) ->
+	"&quot;" ++ export_text(T, Cont);
+export_text([$< | T] = Text, Cont) ->
+	case (lists:prefix("<![CDATA[", Text)) of
+	true -> Text;
+	false ->
+	"&lt;" ++ export_text(T, Cont)
+	end;
+export_text([$> | T], Cont) ->
+	"&gt;" ++ export_text(T, Cont);
+export_text([$& | T], Cont) ->
+	"&amp;" ++ export_text(T, Cont);
+export_text([C | T], Cont) when is_integer(C) ->
+ [C | export_text(T, Cont)];
+export_text([T | T1], Cont) ->
+ export_text(T, [T1 | Cont]);
+export_text([], [T | Cont]) ->
+ export_text(T, Cont);
+export_text([], []) ->
+ [];
+export_text(Bin, Cont) ->
+ export_text(binary_to_list(Bin), Cont).
diff --git a/src/http_front/db_frontend.erl b/src/http_front/db_frontend.erl
index 8d047f5..bf16720 100644
--- a/src/http_front/db_frontend.erl
+++ b/src/http_front/db_frontend.erl
@@ -6,6 +6,7 @@
-module(db_frontend).
-export([start/3,
search/1,
+ search_by_sphinx/1,
today_top/0,
search_one/1,
stats/0,
@@ -27,6 +28,10 @@ search(Keyword) ->
Conn = mongo_pool:get(?DB_POOLNAME),
db_store_mongo:search(Conn, Keyword).
+search_by_sphinx(Keyword) ->
+ Conn = mongo_pool:get(?DB_POOLNAME),
+ sphinx_search:search(Conn, Keyword).
+
today_top() ->
Conn = mongo_pool:get(?DB_POOLNAME),
DaySecs = time_util:now_day_seconds(),
diff --git a/src/http_front/http_handler.erl b/src/http_front/http_handler.erl
index 2535395..46ebbb2 100644
--- a/src/http_front/http_handler.erl
+++ b/src/http_front/http_handler.erl
@@ -5,6 +5,7 @@
%%
-module(http_handler).
-export([search/3,
+ sphinx_search/3,
test_search/1,
index/3,
stats/3,
@@ -29,6 +30,18 @@ search(SessionID, Env, Input) ->
Response = simple_html(K, Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
+sphinx_search(SessionID, Env, Input) ->
+ {K, Body} = case http_common:get_search_keyword(Input) of
+ [] ->
+ {"", "invalid input"};
+ Key ->
+ US = http_common:list_to_utf_binary(Key),
+ ?LOG_STR(?INFO, ?FMT("remote ~p search /~s/", [http_common:remote_addr(Env), US])),
+ {Key, do_search_sphinx(Key)}
+ end,
+ Response = simple_html(K, Body),
+ mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
+
top(SessionID, _Env, _Input) ->
Rets = db_frontend:all_top(),
BodyList = format_search_result(Rets),
@@ -99,6 +112,12 @@ do_search(Keyword) ->
 	Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
Tip ++ Body.
+do_search_sphinx(Keyword) ->
+ Rets = db_frontend:search_by_sphinx(Keyword),
+ BodyList = format_search_result(Rets),
+	Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
+ Body.
+
format_search_result(RetList) ->
[format_one_result(Result, false) || Result <- RetList].
diff --git a/src/http_front/sphinx_search.erl b/src/http_front/sphinx_search.erl
new file mode 100644
index 0000000..bd19967
--- /dev/null
+++ b/src/http_front/sphinx_search.erl
@@ -0,0 +1,38 @@
+%%
+%% sphinx_search.erl
+%% Kevin Lynx
+%% 07.28.2013
+%%
+-module(sphinx_search).
+-export([init/0, search/2]).
+-define(PORT, 9312).
+-define(INDEX, "xml").
+-define(PAGECNT, 10).
+
+init() ->
+ code:add_path("deps/giza/ebin").
+ %application:start(giza).
+
+search(Conn, Key) ->
+ Q1 = giza_query:new(?INDEX, Key),
+ Q2 = giza_query:port(Q1, ?PORT),
+ {ok, Ret} = giza_request:send(Q2),
+ decode_search_ret(Conn, Ret).
+
+decode_search_ret(Conn, Ret) ->
+ Hashes = [translate_hash(Item) || Item <- Ret],
+ [db_store_mongo:index(Conn, Hash) || Hash <- Hashes].
+
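+%% Each sphinx match carries the info hash split across five int32 attributes
+%% (hash1..hash5); reassemble it into the 40-char hex hash and look the torrent
+%% up in mongodb.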
+translate_hash({_DocID, Item}) ->
+ Attrs = proplists:get_value(attrs, Item),
+ H1 = proplists:get_value(<<"hash1">>, Attrs),
+ H2 = proplists:get_value(<<"hash2">>, Attrs),
+ H3 = proplists:get_value(<<"hash3">>, Attrs),
+ H4 = proplists:get_value(<<"hash4">>, Attrs),
+ H5 = proplists:get_value(<<"hash5">>, Attrs),
+ Hash = sphinx_id:tohash({H1, H2, H3, H4, H5}),
+ 40 = length(Hash),
+ Hash.
+
+
+
diff --git a/src/sphinx_builder/sphinx_builder.erl b/src/sphinx_builder/sphinx_builder.erl
new file mode 100644
index 0000000..4a2ac3f
--- /dev/null
+++ b/src/sphinx_builder/sphinx_builder.erl
@@ -0,0 +1,122 @@
+%%
+%% sphinx_builder.erl
+%% Kevin Lynx
+%% 07.29.2013
+%%
+-module(sphinx_builder).
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+-export([start_link/3]).
+-export([worker_run/0]).
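+%% processed: torrents handed out so far (also the resume offset kept in ?STATE_FILE);
+%% worker_cnt: number of spawned workers; wait_workers: workers currently idle
+%% because the loader had nothing to give them.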
+-record(state, {processed = 0, worker_cnt, wait_workers = []}).
+-define(WORKER_WAIT, 30*1000).
+-define(STATE_FILE, "priv/sphinx_builder.sta").
+
+start_link(IP, Port, Count) ->
+ gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count], []).
+
+srv_name() ->
+ ?MODULE.
+
+init([IP, Port, WorkerCnt]) ->
+ ?I(?FMT("spawn ~p workers", [WorkerCnt])),
+ [spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, WorkerCnt)],
+ Offset = load_result(),
+ sphinx_torrent:start_link(IP, Port, Offset),
+ {ok, #state{processed = Offset, worker_cnt = WorkerCnt}}.
+
+handle_call({get, Pid}, _From, State) ->
+ #state{processed = Processed, worker_cnt = WaitCnt, wait_workers = WaitWorkers} = State,
+ {NewProcessed, Ret} = case sphinx_torrent:get() of
+ {} ->
+ {Processed, wait};
+ Tor ->
+ check_progress(Processed + 1),
+ {Processed + 1, Tor}
+ end,
+ NewWaits = update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers),
+ check_all_done(NewWaits, WaitCnt, NewProcessed, length(NewWaits) > length(WaitWorkers)),
+ {reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits}}.
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+handle_info(_, State) ->
+ {noreply, State}.
+
+terminate(_, State) ->
+ {ok, State}.
+
+code_change(_, _, State) ->
+ {ok, State}.
+
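+%% A worker joins wait_workers when it polls and no torrent is available, and
+%% leaves it again once it gets one.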
+update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers) ->
+ case lists:member(Pid, WaitWorkers) of
+ true when NewProcessed > Processed ->
+ lists:delete(Pid, WaitWorkers);
+ false when NewProcessed == Processed ->
+ [Pid|WaitWorkers];
+ _ ->
+ WaitWorkers
+ end.
+
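+%% When every worker is starved and the torrent loader has retried more than 5
+%% times, flush the progress counter and force the pending xml documents to disk.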
+check_all_done(WaitWorkers, WaitCnt, Processed, true)
+when length(WaitWorkers) == WaitCnt ->
+ Try = sphinx_torrent:try_times(),
+ case Try > 5 of
+ true ->
+ io:format("haven't got any torrents for a while, force save~n", []),
+ save_result(Processed),
+ sphinx_xml:force_save();
+ false ->
+ ok
+ end;
+check_all_done(_WaitWorkers, _WaitCnt, _Processed, _) ->
+ ok.
+
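+%% Worker loop: ask the builder for the next torrent, sleep for ?WORKER_WAIT when
+%% none is available, otherwise decode it and queue it for the xml writer.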
+worker_run() ->
+ Ret = gen_server:call(srv_name(), {get, self()}),
+ do_process(Ret),
+ worker_run().
+
+do_process({_, wait}) ->
+ ?T(?FMT("worker ~p sleep ~p ms", [self(), ?WORKER_WAIT])),
+ timer:sleep(?WORKER_WAIT);
+do_process({ID, Doc}) ->
+ case db_store_mongo:decode_torrent_item(Doc) of
+ {single, Hash, {Name, _}, Query, CreatedAt} ->
+ sphinx_xml:insert({Hash, Name, [], ID, Query, CreatedAt});
+ {multi, Hash, {Name, Files}, Query, CreatedAt} ->
+ sphinx_xml:insert({Hash, Name, Files, ID, Query, CreatedAt})
+ end.
+
+load_result() ->
+ case file:consult(?STATE_FILE) of
+ {error, _Reason} ->
+ io:format("start a new processing~n", []),
+ 0;
+ {ok, [Ret]} ->
+ Sum = proplists:get_value(processed, Ret),
+ io:format("continue to process from ~p~n", [Sum]),
+ Sum
+ end.
+
+save_result(Sum) ->
+ Ret = [{processed, Sum}],
+ io:format("save result ~p~n", [Sum]),
+ file:write_file(?STATE_FILE, io_lib:fwrite("~p.\n",[Ret])).
+
+check_progress(Sum) ->
+ case (Sum rem 500 == 0) and (Sum > 0) of
+ true ->
+ save_result(Sum),
+ io:format(" -> ~p~n", [Sum]);
+ false ->
+ ok
+ end.
diff --git a/src/sphinx_builder/sphinx_builder_sup.erl b/src/sphinx_builder/sphinx_builder_sup.erl
new file mode 100644
index 0000000..194499d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_builder_sup.erl
@@ -0,0 +1,52 @@
+%%
+%% sphinx_builder_sup.erl
+%% Kevin Lynx
+%% 07.28.2013
+%%
+-module(sphinx_builder_sup).
+-behaviour(supervisor).
+-export([start_standalone/1, start_standalone/3, start_link/3]).
+-export([init/1]).
+
+start_dep_apps() ->
+ code:add_path("deps/bson/ebin"),
+ code:add_path("deps/mongodb/ebin"),
+ Apps = [asn1, crypto, public_key, ssl, inets, xmerl, bson, mongodb],
+ [application:start(App) || App <- Apps].
+
+start_standalone([IP, Port, Count]) ->
+ IPort = list_to_integer(Port),
+ ISize = list_to_integer(Count),
+ start_standalone(IP, IPort, ISize),
+ receive
+ fuck_erl_s_option -> ok
+ end.
+
+start_standalone(IP, Port, Size) ->
+ start_dep_apps(),
+ start_link(IP, Port, Size).
+
+start_link(IP, Port, Count) ->
+ supervisor:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count]).
+
+%%
+srv_name() ->
+ ?MODULE.
+
+init([IP, Port, Count]) ->
+ Spec = {one_for_one, 1, 600},
+ config:start_link("sphinx_builder.config", fun() -> config_default() end),
+ Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
+ Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
+ Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
+ Children = [Logger, Builder, Indexer],
+ {ok, {Spec, Children}}.
+
+config_default() ->
+	[{load_torrent_interval, 500}, % milliseconds
+ {max_doc_per_file, 1000},
+ {torrent_batch_count, 100},
+ {delta_source_file, "var/source/delta.xml"},
+ {sphinx_config_file, "var/etc/csft.conf"},
+ {delta_index_name, "delta"},
+ {main_index_name, "main"}].
diff --git a/src/sphinx_builder/sphinx_cmd.erl b/src/sphinx_builder/sphinx_cmd.erl
new file mode 100644
index 0000000..cb5a13d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_cmd.erl
@@ -0,0 +1,29 @@
+%%
+%% sphinx_cmd.erl
+%% Kevin Lynx
+%% 07.28.2013
+%%
+-module(sphinx_cmd).
+-export([build_delta_index/5, merge_index/3]).
+-compile(export_all).
+-include("vlog.hrl").
+
+% Index file, Delta index name
+build_delta_index(IndexFile, Delta, CfgFile, MinID, MaxID) ->
+ Cmd = "indexer -c " ++ CfgFile ++ " --rotate " ++ Delta,
+ Res = os:cmd(Cmd),
+ Dest = backup_delta_file(Delta, MinID, MaxID, IndexFile),
+	?I(?FMT("command `~s' result on ~s~n~s", [Cmd, Dest, Res])).
+
+merge_index(Main, Delta, CfgFile) ->
+ Cmd = string_util:format("indexer -c " ++ CfgFile ++ " --merge ~s ~s --rotate",
+ [Main, Delta]),
+ Res = os:cmd(Cmd),
+	?I(?FMT("command `~s' result~n~s", [Cmd, Res])).
+
+backup_delta_file(Delta, MinID, MaxID, IndexFile) ->
+ Path = filename:dirname(IndexFile),
+ Dest = string_util:format(Path ++ "/" ++ Delta ++ "[~b-~b]" ++ ".xml",
+ [MinID, MaxID]),
+ file:copy(IndexFile, Dest),
+ Dest.
diff --git a/src/sphinx_builder/sphinx_doc.erl b/src/sphinx_builder/sphinx_doc.erl
new file mode 100644
index 0000000..66ce6fe
--- /dev/null
+++ b/src/sphinx_builder/sphinx_doc.erl
@@ -0,0 +1,69 @@
+%%
+%% sphinx_doc.erl
+%% 07.27.2013
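+%% Builds sphinx xmlpipe2 documents (sphinx:docset/sphinx:document) from torrents.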
+%%
+-module(sphinx_doc).
+-include_lib("xmerl/include/xmerl.hrl").
+-export([write_xml/2, element/6]).
+-compile(export_all).
+-define(PROLOG, "<?xml version=\"1.0\" encoding=\"utf-8\"?>").
+-define(CR, #xmlText{value="\n"}).
+
+write_xml(File, Elems) ->
+ Doc = {'sphinx:docset', [], [schema(), ?CR] ++ Elems},
+ Content = xmerl:export_simple([Doc], xmerl_xml_cdata, [{prolog, ?PROLOG}]),
+ file:write_file(File, Content).
+
+element(Hash, Name, Files, ID, Query, CreatedAt) when is_list(Hash), length(Hash) == 40 ->
+ IDSet = idset(Hash),
+ OtherAttrs = [{query, [integer_to_list(Query)]}, {created_at, integer_to_list(CreatedAt)}],
+ EleSubject = {subject, [], [with_cdata(Name)]},
+ EleFiles = {files, [], [with_cdata(files_name(Files))]},
+ {'sphinx:document', [{id, integer_to_list(ID)}], IDSet ++ OtherAttrs ++ [EleSubject, EleFiles, ?CR]}.
+
+schema() ->
+ HashIds = [attr_int32("hash1"), attr_int32("hash2"), attr_int32("hash3"),
+ attr_int32("hash4"), attr_int32("hash5")],
+ Other = [attr_int32("query"), attr("created_at", "timestamp")],
+ {'sphinx:schema', [field("subject"), field("files")] ++ HashIds ++ Other}.
+
+field(Name) ->
+ {'sphinx:field', [{name, [Name]}], []}.
+
+attr(Name, Type) ->
+ {'sphinx:attr', [{name, [Name]}, {type, [Type]}], []}.
+
+attr_int32(Name) ->
+ attr(Name, "int", "32").
+
+attr(Name, Type, Bits) ->
+ {'sphinx:attr', [{name, [Name]}, {type, [Type]}, {bits, [Bits]}], []}.
+
+idset(Hash) ->
+ {H1, H2, H3, H4, H5} = sphinx_id:fromhash(Hash),
+ [{hash1, [], [integer_to_list(H1)]},
+ {hash2, [], [integer_to_list(H2)]},
+ {hash3, [], [integer_to_list(H3)]},
+ {hash4, [], [integer_to_list(H4)]},
+ {hash5, [], [integer_to_list(H5)]}].
+
+files_name(Files) ->
+ lists:foldl(fun({Name, _}, Acc) ->
+ FName = filter_name(Name),
+ case length(Acc) > 0 of
+ true when length(FName) > 0 -> Acc ++ " " ++ FName;
+ true -> Acc;
+ false -> FName
+ end
+ end, [], Files).
+
+filter_name(Name) ->
+ case lists:prefix("_____padding_file", Name) of
+ true -> [];
+ false -> Name
+ end.
+
+with_cdata(Text) ->
+	"<![CDATA[" ++ Text ++ "]]>".
+
diff --git a/src/sphinx_builder/sphinx_id.erl b/src/sphinx_builder/sphinx_id.erl
new file mode 100644
index 0000000..e36d6df
--- /dev/null
+++ b/src/sphinx_builder/sphinx_id.erl
@@ -0,0 +1,18 @@
+%%
+%% sphinx_id.erl
+%% Kevin Lynx
+%%
+-module(sphinx_id).
+-export([fromhash/1, tohash/1]).
+-compile(export_all).
+
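+%% A 40-char hex info hash is 160 bits, split here into five 32-bit integers so
+%% it fits the five int32 sphinx attributes (hash1..hash5).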
+fromhash(Hash) when is_list(Hash), length(Hash) == 40 ->
+ B = hex:hexstr_to_bin(Hash),
+	<<H1:32, H2:32, H3:32, H4:32, H5:32>> = B,
+ {H1, H2, H3, H4, H5}.
+
+
+tohash({H1, H2, H3, H4, H5}) ->
+	B = <<H1:32, H2:32, H3:32, H4:32, H5:32>>,
+ string:to_upper(hex:bin_to_hexstr(B)).
+
diff --git a/src/sphinx_builder/sphinx_torrent.erl b/src/sphinx_builder/sphinx_torrent.erl
new file mode 100644
index 0000000..ad1fab5
--- /dev/null
+++ b/src/sphinx_builder/sphinx_torrent.erl
@@ -0,0 +1,106 @@
+%%
+%% sphinx_torrent.erl
+%% Kevin Lynx
+%% 07.29.2013
+%%
+-module(sphinx_torrent).
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+-export([start_link/3, get/0, try_times/0]).
+-define(DBNAME, torrents).
+-define(COLLNAME, hashes).
+-define(POOLNAME, db_pool).
+-define(WAIT_TIME, 30*1000).
+-record(state, {offset = 0, max, try_times = 0, tors = []}).
+
+start_link(IP, Port, Offset) ->
+ gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
+
+get() ->
+ gen_server:call(srv_name(), get, infinity).
+
+try_times() ->
+ gen_server:call(srv_name(), try_times).
+
+%%
+srv_name() ->
+ sphinx_torrent_loader.
+
+init([IP, Port, Offset]) ->
+ Max = config:get(torrent_batch_count, 100),
+ mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
+ {ok, #state{offset = Offset, max = Max}, 0}.
+
+handle_cast(load, State) ->
+ #state{offset = Skip, max = Max, tors = Tors} = State,
+ Request = Max * 2 div 3,
+ ?T(?FMT("request next ~p torrents", [Request])),
+ LoadTors = do_load_torrents(Skip, Request),
+ case length(LoadTors) of
+ 0 -> timer:send_after(?WAIT_TIME, try_load);
+ _ -> ok
+ end,
+ ?T(?FMT("load ~p torrents", [length(LoadTors)])),
+ NewOffset = Skip + length(LoadTors),
+ {noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}};
+
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+handle_info(try_load, State) ->
+ #state{offset = Skip, max = Max, try_times = Try} = State,
+ ?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
+ LoadTors = do_load_torrents(Skip, Max),
+ LoadCnt = length(LoadTors),
+ NewTry = case LoadCnt > 0 of
+ true -> 0;
+ false -> timer:send_after(?WAIT_TIME, try_load), Try + 1
+ end,
+ ?T(?FMT("load ~p torrents, try times ~p", [LoadCnt, NewTry])),
+ {noreply, State#state{offset = Skip + LoadCnt, tors = LoadTors, try_times = NewTry}};
+
+handle_info(timeout, State) ->
+ self() ! try_load,
+ {noreply, State}.
+
+handle_call(try_times, _From, #state{try_times = Try} = State) ->
+ {reply, Try, State};
+
+handle_call(get, _From, State) ->
+ #state{tors = Tors, max = Max} = State,
+ {Tor, NewTors} = case length(Tors) of
+ 0 -> {{}, []};
+ _ -> [H|Rest] = Tors, {H, Rest}
+ end,
+ try_load_next(NewTors, Max),
+ {reply, Tor, State#state{tors = NewTors}}.
+
+terminate(_, State) ->
+ mongo_sup:stop_pool(?POOLNAME),
+ {ok, State}.
+
+code_change(_, _, State) ->
+ {ok, State}.
+
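+%% Prefetch: when the in-memory queue drains to a third of the batch size,
+%% asynchronously load the next two thirds from mongodb.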
+try_load_next([], _) ->
+ ?T("no torrents"),
+ skip;
+try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
+ ?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])),
+ gen_server:cast(self(), load);
+try_load_next(_, _) ->
+ ok.
+
+do_load_torrents(Skip, Size) ->
+ Conn = mongo_pool:get(?POOLNAME),
+ mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+ Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
+ mongo_cursor:take(Cursor, Size)
+ end).
+
diff --git a/src/sphinx_builder/sphinx_xml.erl b/src/sphinx_builder/sphinx_xml.erl
new file mode 100644
index 0000000..b01338d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_xml.erl
@@ -0,0 +1,79 @@
+%%
+%% sphinx_xml.erl
+%% Kevin Lynx
+%% Write sphinx xml file
+%%
+-module(sphinx_xml).
+-behaviour(gen_server).
+-include("vlog.hrl").
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+-export([start_link/0, insert/1, force_save/0]).
+-record(state, {docs = [], max, startid = -1}).
+
+start_link() ->
+ gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
+
+insert(Doc) ->
+ gen_server:cast(srv_name(), {insert, Doc}).
+
+force_save() ->
+ gen_server:cast(srv_name(), save).
+
+srv_name() ->
+ ?MODULE.
+
+%%
+init([]) ->
+ Max = config:get(max_doc_per_file, 1000),
+ {ok, #state{max = Max}}.
+
+terminate(_, State) ->
+ {ok, State}.
+
+code_change(_, _, State) ->
+ {ok, State}.
+
+handle_cast(save, #state{docs = Docs} = State) when length(Docs) > 0 ->
+ #state{startid = StartID} = State,
+ EndID = length(Docs) + StartID - 1,
+ try_save(Docs, 0, StartID, EndID),
+ {noreply, State#state{docs = []}};
+
+handle_cast({insert, {Hash, Name, Files, ID, Query, CreatedAt}}, State) ->
+ #state{docs = Docs, max = Max, startid = StartID} = State,
+ NewStartID = if length(Docs) == 0 -> ID; true -> StartID end,
+ Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
+ NewDocs = try_save([Doc|Docs], Max, NewStartID, ID),
+ {noreply, State#state{docs = NewDocs, startid = NewStartID}};
+
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+ {noreply, State}.
+
+handle_info(_, State) ->
+ {noreply, State}.
+
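+%% Flush the buffered documents: write the delta xml source file, rebuild the
+%% delta index with indexer, then merge it into the main index.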
+try_save(Docs, Max, StartID, NowID) when length(Docs) >= Max, length(Docs) > 0 ->
+ File = config:get(delta_source_file),
+ Conf = config:get(sphinx_config_file),
+ Delta = config:get(delta_index_name),
+ Main = config:get(main_index_name),
+ io:format("sync sphinx index ~p documents...", [length(Docs)]),
+ ?I(?FMT("save sphinx xml file ~s", [File])),
+ sphinx_doc:write_xml(File, Docs),
+ ?I(?FMT("build delta index : ~s", [Delta])),
+ sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, NowID),
+ ?I(?FMT("merge delta to main index ~s -> ~s", [Delta, Main])),
+ sphinx_cmd:merge_index(Main, Delta, Conf),
+	?I("index update done"),
+ io:format("done~n", []),
+ [];
+try_save(Docs, _, _, _) ->
+ Docs.