From f242d4e44f348500c7add4f93b4806c4719d7914 Mon Sep 17 00:00:00 2001
From: Kevin Lynx
Date: Mon, 29 Jul 2013 23:03:39 +0800
Subject: [PATCH] add sphinx support, in experimental status right now

---
 Emakefile                                 |   8 ++
 src/common/hex.erl                        |  34 ++++++
 src/common/string_util.erl                |  10 ++
 src/common/time_util.erl                  |  16 ++-
 src/common/xmerl_xml_cdata.erl            |  69 ++++++++++++
 src/http_front/db_frontend.erl            |   5 +
 src/http_front/http_handler.erl           |  19 ++++
 src/http_front/sphinx_search.erl          |  38 +++++++
 src/sphinx_builder/sphinx_builder.erl     | 122 ++++++++++++++++++++++
 src/sphinx_builder/sphinx_builder_sup.erl |  52 +++++++++
 src/sphinx_builder/sphinx_cmd.erl         |  29 +++++
 src/sphinx_builder/sphinx_doc.erl         |  69 ++++++++++++
 src/sphinx_builder/sphinx_id.erl          |  18 ++++
 src/sphinx_builder/sphinx_torrent.erl     | 106 +++++++++++++++++++
 src/sphinx_builder/sphinx_xml.erl         |  79 ++++++++++++++
 15 files changed, 673 insertions(+), 1 deletion(-)
 create mode 100644 src/common/hex.erl
 create mode 100644 src/common/string_util.erl
 create mode 100644 src/common/xmerl_xml_cdata.erl
 create mode 100644 src/http_front/sphinx_search.erl
 create mode 100644 src/sphinx_builder/sphinx_builder.erl
 create mode 100644 src/sphinx_builder/sphinx_builder_sup.erl
 create mode 100644 src/sphinx_builder/sphinx_cmd.erl
 create mode 100644 src/sphinx_builder/sphinx_doc.erl
 create mode 100644 src/sphinx_builder/sphinx_id.erl
 create mode 100644 src/sphinx_builder/sphinx_torrent.erl
 create mode 100644 src/sphinx_builder/sphinx_xml.erl

diff --git a/Emakefile b/Emakefile
index e195d9b..2b049cd 100644
--- a/Emakefile
+++ b/Emakefile
@@ -38,3 +38,11 @@
   [debug_info,
    {i, "include"},
    {outdir,"ebin"}]}.
+{'src/utils/*',
+  [debug_info,
+   {i, "include"},
+   {outdir,"ebin"}]}.
+{'src/sphinx_builder/*',
+  [debug_info,
+   {i, "include"},
+   {outdir,"ebin"}]}.
diff --git a/src/common/hex.erl b/src/common/hex.erl
new file mode 100644
index 0000000..b28af29
--- /dev/null
+++ b/src/common/hex.erl
@@ -0,0 +1,34 @@
+-module(hex).
+-export([bin_to_hexstr/1,hexstr_to_bin/1]).
+
+hex(N) when N < 10 ->
+    $0+N;
+hex(N) when N >= 10, N < 16 ->
+    $a+(N-10).
+
+int(C) when $0 =< C, C =< $9 ->
+    C - $0;
+int(C) when $A =< C, C =< $F ->
+    C - $A + 10;
+int(C) when $a =< C, C =< $f ->
+    C - $a + 10.
+
+to_hex(N) when N < 256 ->
+    [hex(N div 16), hex(N rem 16)].
+
+list_to_hexstr([]) ->
+    [];
+list_to_hexstr([H|T]) ->
+    to_hex(H) ++ list_to_hexstr(T).
+
+bin_to_hexstr(Bin) ->
+    list_to_hexstr(binary_to_list(Bin)).
+
+hexstr_to_bin(S) ->
+    list_to_binary(hexstr_to_list(S)).
+
+hexstr_to_list([X,Y|T]) ->
+    [int(X)*16 + int(Y) | hexstr_to_list(T)];
+hexstr_to_list([]) ->
+    [].
+    
\ No newline at end of file
diff --git a/src/common/string_util.erl b/src/common/string_util.erl
new file mode 100644
index 0000000..7a599dd
--- /dev/null
+++ b/src/common/string_util.erl
@@ -0,0 +1,10 @@
+%%
+%% string_util.erl
+%% Kevin Lynx
+%%
+-module(string_util).
+-export([format/2]).
+
+format(Fmt, Arg) when is_list(Fmt), is_list(Arg) ->
+    lists:flatten(io_lib:format(Fmt, Arg)).
+
diff --git a/src/common/time_util.erl b/src/common/time_util.erl
index a2a1f4d..a85fb88 100644
--- a/src/common/time_util.erl
+++ b/src/common/time_util.erl
@@ -9,7 +9,9 @@
     seconds_to_local_time/1,
     local_time_to_universal_time/1,
     now_utc_time/0,
-    now_day_seconds/0]).
+    now_day_seconds/0,
+    date_time_string/0,
+    date_time_stamp/0]).
 -compile(export_all).
 
 diff_milsecs(T1, T2) ->
@@ -38,3 +40,15 @@ local_time_to_universal_time(Datetime) ->
 
 now_utc_time() ->
     local_time_to_universal_time(calendar:local_time()).
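+%% Illustrative results for the two helpers added below (assuming the
+%% node's local time is 2013-07-29 23:03:39):
+%%   date_time_string() -> "2013-07-29 23:03:39"
+%%   date_time_stamp()  -> "20130729230339"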
+
+date_time_string() ->
+    {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
+    L = io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b",
+        [Y, M, D, H, Min, Sec]),
+    lists:flatten(L).
+
+date_time_stamp() ->
+    {{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
+    L = io_lib:format("~b~2..0b~2..0b~2..0b~2..0b~2..0b",
+        [Y, M, D, H, Min, Sec]),
+    lists:flatten(L).
diff --git a/src/common/xmerl_xml_cdata.erl b/src/common/xmerl_xml_cdata.erl
new file mode 100644
index 0000000..9bab876
--- /dev/null
+++ b/src/common/xmerl_xml_cdata.erl
@@ -0,0 +1,69 @@
+%%
+%% xmerl_xml_cdata.erl
+%% Kevin Lynx
+%% Works around the Erlang xmerl_xml exporter not supporting `cdata' sections.
+%%
+-module(xmerl_xml_cdata).
+-export(['#xml-inheritance#'/0]).
+-export(['#root#'/4,
+	 '#element#'/5,
+	 '#text#'/1]).
+-import(xmerl_lib, [markup/3, empty_tag/2]).
+-include_lib("xmerl/include/xmerl.hrl").
+
+
+'#xml-inheritance#'() -> [].
+
+
+%% The '#text#' function is called for every text segment.
+
+'#text#'(Text) ->
+%io:format("Text=~p~n",[Text]),
+    export_text(Text).
+
+
+%% The '#root#' tag is called when the entire structure has been
+%% exported. It does not appear in the structure itself.
+
+'#root#'(Data, [#xmlAttribute{name=prolog,value=V}], [], _E) ->
+    [V,Data];
+'#root#'(Data, _Attrs, [], _E) ->
+    ["<?xml version=\"1.0\"?>\n", Data].
+
+
+%% The '#element#' function is the default handler for XML elements.
+
+'#element#'(Tag, [], Attrs, _Parents, _E) ->
+%io:format("Empty Tag=~p~n",[Tag]),
+    empty_tag(Tag, Attrs);
+'#element#'(Tag, Data, Attrs, _Parents, _E) ->
+%io:format("Tag=~p~n",[Tag]),
+    markup(Tag, Attrs, Data).
+
+export_text(T) ->
+    export_text(T, []).
+
+export_text([$' | T], Cont) ->
+    "&apos;" ++ export_text(T, Cont);
+export_text([$" | T], Cont) ->
+    "&quot;" ++ export_text(T, Cont);
+export_text([$< | T] = Text, Cont) ->
+    case lists:prefix("<![CDATA[", Text) of
+	true -> Text;
+	false ->
+	    "&lt;" ++ export_text(T, Cont)
+    end;
+export_text([$> | T], Cont) ->
+    "&gt;" ++ export_text(T, Cont);
+export_text([$& | T], Cont) ->
+    "&amp;" ++ export_text(T, Cont);
+export_text([C | T], Cont) when is_integer(C) ->
+    [C | export_text(T, Cont)];
+export_text([T | T1], Cont) ->
+    export_text(T, [T1 | Cont]);
+export_text([], [T | Cont]) ->
+    export_text(T, Cont);
+export_text([], []) ->
+    [];
+export_text(Bin, Cont) ->
+    export_text(binary_to_list(Bin), Cont).
diff --git a/src/http_front/db_frontend.erl b/src/http_front/db_frontend.erl
index 8d047f5..bf16720 100644
--- a/src/http_front/db_frontend.erl
+++ b/src/http_front/db_frontend.erl
@@ -6,6 +6,7 @@
 -module(db_frontend).
 -export([start/3,
 	search/1,
+	search_by_sphinx/1,
 	today_top/0,
 	search_one/1,
 	stats/0,
@@ -27,6 +28,10 @@ search(Keyword) ->
 	Conn = mongo_pool:get(?DB_POOLNAME),
 	db_store_mongo:search(Conn, Keyword).
 
+search_by_sphinx(Keyword) ->
+	Conn = mongo_pool:get(?DB_POOLNAME),
+	sphinx_search:search(Conn, Keyword).
+
 today_top() ->
 	Conn = mongo_pool:get(?DB_POOLNAME),
 	DaySecs = time_util:now_day_seconds(),
diff --git a/src/http_front/http_handler.erl b/src/http_front/http_handler.erl
index 2535395..46ebbb2 100644
--- a/src/http_front/http_handler.erl
+++ b/src/http_front/http_handler.erl
@@ -5,6 +5,7 @@
 %%
 -module(http_handler).
 -export([search/3,
+	sphinx_search/3,
	test_search/1,
 	index/3,
 	stats/3,
@@ -29,6 +30,18 @@ search(SessionID, Env, Input) ->
 	Response = simple_html(K, Body),
 	mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
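+%% sphinx_search/3 mirrors search/3 above, but resolves results through the
+%% Sphinx daemon (db_frontend:search_by_sphinx/1 -> sphinx_search:search/2)
+%% instead of a raw MongoDB keyword query. Illustrative mod_esi URL; host,
+%% port, script alias and query parameter name are deployment-specific:
+%%   http://localhost:8000/http_handler/sphinx_search?q=keyword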
+sphinx_search(SessionID, Env, Input) -> + {K, Body} = case http_common:get_search_keyword(Input) of + [] -> + {"", "invalid input"}; + Key -> + US = http_common:list_to_utf_binary(Key), + ?LOG_STR(?INFO, ?FMT("remote ~p search /~s/", [http_common:remote_addr(Env), US])), + {Key, do_search_sphinx(Key)} + end, + Response = simple_html(K, Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + top(SessionID, _Env, _Input) -> Rets = db_frontend:all_top(), BodyList = format_search_result(Rets), @@ -99,6 +112,12 @@ do_search(Keyword) -> Body = ?TEXT("
    ~s
", [lists:flatten(BodyList)]), Tip ++ Body. +do_search_sphinx(Keyword) -> + Rets = db_frontend:search_by_sphinx(Keyword), + BodyList = format_search_result(Rets), + Body = ?TEXT("
    ~s
", [lists:flatten(BodyList)]), + Body. + format_search_result(RetList) -> [format_one_result(Result, false) || Result <- RetList]. diff --git a/src/http_front/sphinx_search.erl b/src/http_front/sphinx_search.erl new file mode 100644 index 0000000..bd19967 --- /dev/null +++ b/src/http_front/sphinx_search.erl @@ -0,0 +1,38 @@ +%% +%% sphinx_search.erl +%% Kevin Lynx +%% 07.28.2013 +%% +-module(sphinx_search). +-export([init/0, search/2]). +-define(PORT, 9312). +-define(INDEX, "xml"). +-define(PAGECNT, 10). + +init() -> + code:add_path("deps/giza/ebin"). + %application:start(giza). + +search(Conn, Key) -> + Q1 = giza_query:new(?INDEX, Key), + Q2 = giza_query:port(Q1, ?PORT), + {ok, Ret} = giza_request:send(Q2), + decode_search_ret(Conn, Ret). + +decode_search_ret(Conn, Ret) -> + Hashes = [translate_hash(Item) || Item <- Ret], + [db_store_mongo:index(Conn, Hash) || Hash <- Hashes]. + +translate_hash({_DocID, Item}) -> + Attrs = proplists:get_value(attrs, Item), + H1 = proplists:get_value(<<"hash1">>, Attrs), + H2 = proplists:get_value(<<"hash2">>, Attrs), + H3 = proplists:get_value(<<"hash3">>, Attrs), + H4 = proplists:get_value(<<"hash4">>, Attrs), + H5 = proplists:get_value(<<"hash5">>, Attrs), + Hash = sphinx_id:tohash({H1, H2, H3, H4, H5}), + 40 = length(Hash), + Hash. + + + diff --git a/src/sphinx_builder/sphinx_builder.erl b/src/sphinx_builder/sphinx_builder.erl new file mode 100644 index 0000000..4a2ac3f --- /dev/null +++ b/src/sphinx_builder/sphinx_builder.erl @@ -0,0 +1,122 @@ +%% +%% sphinx_builder.erl +%% Kevin Lynx +%% 07.29.2013 +%% +-module(sphinx_builder). +-include("vlog.hrl"). +-behaviour(gen_server). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([start_link/3]). +-export([worker_run/0]). +-record(state, {processed = 0, worker_cnt, wait_workers = []}). +-define(WORKER_WAIT, 30*1000). +-define(STATE_FILE, "priv/sphinx_builder.sta"). + +start_link(IP, Port, Count) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count], []). + +srv_name() -> + ?MODULE. + +init([IP, Port, WorkerCnt]) -> + ?I(?FMT("spawn ~p workers", [WorkerCnt])), + [spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, WorkerCnt)], + Offset = load_result(), + sphinx_torrent:start_link(IP, Port, Offset), + {ok, #state{processed = Offset, worker_cnt = WorkerCnt}}. + +handle_call({get, Pid}, _From, State) -> + #state{processed = Processed, worker_cnt = WaitCnt, wait_workers = WaitWorkers} = State, + {NewProcessed, Ret} = case sphinx_torrent:get() of + {} -> + {Processed, wait}; + Tor -> + check_progress(Processed + 1), + {Processed + 1, Tor} + end, + NewWaits = update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers), + check_all_done(NewWaits, WaitCnt, NewProcessed, length(NewWaits) > length(WaitWorkers)), + {reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits}}. + +handle_cast(_, State) -> + {noreply, State}. + +handle_info(_, State) -> + {noreply, State}. + +terminate(_, State) -> + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers) -> + case lists:member(Pid, WaitWorkers) of + true when NewProcessed > Processed -> + lists:delete(Pid, WaitWorkers); + false when NewProcessed == Processed -> + [Pid|WaitWorkers]; + _ -> + WaitWorkers + end. 
+
+check_all_done(WaitWorkers, WaitCnt, Processed, true)
+	when length(WaitWorkers) == WaitCnt ->
+	Try = sphinx_torrent:try_times(),
+	case Try > 5 of
+		true ->
+			io:format("haven't got any torrents for a while, force save~n", []),
+			save_result(Processed),
+			sphinx_xml:force_save();
+		false ->
+			ok
+	end;
+check_all_done(_WaitWorkers, _WaitCnt, _Processed, _) ->
+	ok.
+
+worker_run() ->
+	Ret = gen_server:call(srv_name(), {get, self()}),
+	do_process(Ret),
+	worker_run().
+
+do_process({_, wait}) ->
+	?T(?FMT("worker ~p sleep ~p ms", [self(), ?WORKER_WAIT])),
+	timer:sleep(?WORKER_WAIT);
+do_process({ID, Doc}) ->
+	case db_store_mongo:decode_torrent_item(Doc) of
+		{single, Hash, {Name, _}, Query, CreatedAt} ->
+			sphinx_xml:insert({Hash, Name, [], ID, Query, CreatedAt});
+		{multi, Hash, {Name, Files}, Query, CreatedAt} ->
+			sphinx_xml:insert({Hash, Name, Files, ID, Query, CreatedAt})
+	end.
+
+load_result() ->
+	case file:consult(?STATE_FILE) of
+		{error, _Reason} ->
+			io:format("start a new processing~n", []),
+			0;
+		{ok, [Ret]} ->
+			Sum = proplists:get_value(processed, Ret),
+			io:format("continue to process from ~p~n", [Sum]),
+			Sum
+	end.
+
+save_result(Sum) ->
+	Ret = [{processed, Sum}],
+	io:format("save result ~p~n", [Sum]),
+	file:write_file(?STATE_FILE, io_lib:fwrite("~p.\n", [Ret])).
+
+check_progress(Sum) ->
+	case (Sum rem 500 == 0) and (Sum > 0) of
+		true ->
+			save_result(Sum),
+			io:format(" -> ~p~n", [Sum]);
+		false ->
+			ok
+	end.
diff --git a/src/sphinx_builder/sphinx_builder_sup.erl b/src/sphinx_builder/sphinx_builder_sup.erl
new file mode 100644
index 0000000..194499d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_builder_sup.erl
@@ -0,0 +1,52 @@
+%%
+%% sphinx_builder_sup.erl
+%% Kevin Lynx
+%% 07.28.2013
+%%
+-module(sphinx_builder_sup).
+-behaviour(supervisor).
+-export([start_standalone/1, start_standalone/3, start_link/3]).
+-export([init/1]).
+
+start_dep_apps() ->
+	code:add_path("deps/bson/ebin"),
+	code:add_path("deps/mongodb/ebin"),
+	Apps = [asn1, crypto, public_key, ssl, inets, xmerl, bson, mongodb],
+	[application:start(App) || App <- Apps].
+
+start_standalone([IP, Port, Count]) ->
+	IPort = list_to_integer(Port),
+	ISize = list_to_integer(Count),
+	start_standalone(IP, IPort, ISize),
+	%% block forever so the node started from the command line stays alive
+	receive
+		fuck_erl_s_option -> ok
+	end.
+
+start_standalone(IP, Port, Size) ->
+	start_dep_apps(),
+	start_link(IP, Port, Size).
+
+start_link(IP, Port, Count) ->
+	supervisor:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count]).
+
+%%
+srv_name() ->
+	?MODULE.
+
+init([IP, Port, Count]) ->
+	Spec = {one_for_one, 1, 600},
+	config:start_link("sphinx_builder.config", fun() -> config_default() end),
+	Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
+	Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
+	Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
+	Children = [Logger, Builder, Indexer],
+	{ok, {Spec, Children}}.
+
+config_default() ->
+	[{load_torrent_interval, 500}, % milliseconds
+	 {max_doc_per_file, 1000},
+	 {torrent_batch_count, 100},
+	 {delta_source_file, "var/source/delta.xml"},
+	 {sphinx_config_file, "var/etc/csft.conf"},
+	 {delta_index_name, "delta"},
+	 {main_index_name, "main"}].
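+%% Illustrative standalone start (code path and flags assumed, not part of
+%% this patch; `-run' passes the arguments as one list of strings, which is
+%% what start_standalone/1 expects):
+%%   erl -pa ebin -noshell -run sphinx_builder_sup start_standalone \
+%%       127.0.0.1 27017 4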
diff --git a/src/sphinx_builder/sphinx_cmd.erl b/src/sphinx_builder/sphinx_cmd.erl
new file mode 100644
index 0000000..cb5a13d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_cmd.erl
@@ -0,0 +1,29 @@
+%%
+%% sphinx_cmd.erl
+%% Kevin Lynx
+%% 07.28.2013
+%%
+-module(sphinx_cmd).
+-export([build_delta_index/5, merge_index/3]).
+-compile(export_all).
+-include("vlog.hrl").
+
+% Index file, Delta index name
+build_delta_index(IndexFile, Delta, CfgFile, MinID, MaxID) ->
+	Cmd = "indexer -c " ++ CfgFile ++ " --rotate " ++ Delta,
+	Res = os:cmd(Cmd),
+	Dest = backup_delta_file(Delta, MinID, MaxID, IndexFile),
+	% pass the command output as an argument so `~' characters in it
+	% cannot corrupt the format string
+	?I(?FMT("command `~s' result on ~s~n~s", [Cmd, Dest, Res])).
+
+merge_index(Main, Delta, CfgFile) ->
+	Cmd = string_util:format("indexer -c " ++ CfgFile ++ " --merge ~s ~s --rotate",
+		[Main, Delta]),
+	Res = os:cmd(Cmd),
+	?I(?FMT("command `~s' result~n~s", [Cmd, Res])).
+
+backup_delta_file(Delta, MinID, MaxID, IndexFile) ->
+	Path = filename:dirname(IndexFile),
+	Dest = string_util:format(Path ++ "/" ++ Delta ++ "[~b-~b]" ++ ".xml",
+		[MinID, MaxID]),
+	file:copy(IndexFile, Dest),
+	Dest.
diff --git a/src/sphinx_builder/sphinx_doc.erl b/src/sphinx_builder/sphinx_doc.erl
new file mode 100644
index 0000000..66ce6fe
--- /dev/null
+++ b/src/sphinx_builder/sphinx_doc.erl
@@ -0,0 +1,69 @@
+%%
+%% sphinx_doc.erl
+%% 07.27.2013
+%%
+-module(sphinx_doc).
+-include_lib("xmerl/include/xmerl.hrl").
+-export([write_xml/2, element/6]).
+-compile(export_all).
+-define(PROLOG, "<?xml version=\"1.0\" encoding=\"utf-8\"?>").
+-define(CR, #xmlText{value="\
+"}).
+
+write_xml(File, Elems) ->
+	Doc = {'sphinx:docset', [], [schema(), ?CR] ++ Elems},
+	Content = xmerl:export_simple([Doc], xmerl_xml_cdata, [{prolog, ?PROLOG}]),
+	file:write_file(File, Content).
+
+element(Hash, Name, Files, ID, Query, CreatedAt) when is_list(Hash), length(Hash) == 40 ->
+	IDSet = idset(Hash),
+	OtherAttrs = [{query, [integer_to_list(Query)]}, {created_at, integer_to_list(CreatedAt)}],
+	EleSubject = {subject, [], [with_cdata(Name)]},
+	EleFiles = {files, [], [with_cdata(files_name(Files))]},
+	{'sphinx:document', [{id, integer_to_list(ID)}], IDSet ++ OtherAttrs ++ [EleSubject, EleFiles, ?CR]}.
+
+schema() ->
+	HashIds = [attr_int32("hash1"), attr_int32("hash2"), attr_int32("hash3"),
+		attr_int32("hash4"), attr_int32("hash5")],
+	Other = [attr_int32("query"), attr("created_at", "timestamp")],
+	{'sphinx:schema', [field("subject"), field("files")] ++ HashIds ++ Other}.
+
+field(Name) ->
+	{'sphinx:field', [{name, [Name]}], []}.
+
+attr(Name, Type) ->
+	{'sphinx:attr', [{name, [Name]}, {type, [Type]}], []}.
+
+attr_int32(Name) ->
+	attr(Name, "int", "32").
+
+attr(Name, Type, Bits) ->
+	{'sphinx:attr', [{name, [Name]}, {type, [Type]}, {bits, [Bits]}], []}.
+
+idset(Hash) ->
+	{H1, H2, H3, H4, H5} = sphinx_id:fromhash(Hash),
+	[{hash1, [], [integer_to_list(H1)]},
+	 {hash2, [], [integer_to_list(H2)]},
+	 {hash3, [], [integer_to_list(H3)]},
+	 {hash4, [], [integer_to_list(H4)]},
+	 {hash5, [], [integer_to_list(H5)]}].
+
+files_name(Files) ->
+	lists:foldl(fun({Name, _}, Acc) ->
+		FName = filter_name(Name),
+		case length(Acc) > 0 of
+			true when length(FName) > 0 -> Acc ++ " " ++ FName;
+			true -> Acc;
+			false -> FName
+		end
+	end, [], Files).
+
+filter_name(Name) ->
+	case lists:prefix("_____padding_file", Name) of
+		true -> [];
+		false -> Name
+	end.
+
+with_cdata(Text) ->
+	"<![CDATA[" ++ Text ++ "]]>".
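+%% Illustrative xmlpipe2 fragment produced by element/6 (all values assumed):
+%%   <sphinx:document id="1">
+%%     <hash1>305419896</hash1> ... <hash5>2427178479</hash5>
+%%     <query>0</query><created_at>1375111419</created_at>
+%%     <subject><![CDATA[some torrent name]]></subject>
+%%     <files><![CDATA[a.mkv b.srt]]></files>
+%%   </sphinx:document>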
+
diff --git a/src/sphinx_builder/sphinx_id.erl b/src/sphinx_builder/sphinx_id.erl
new file mode 100644
index 0000000..e36d6df
--- /dev/null
+++ b/src/sphinx_builder/sphinx_id.erl
@@ -0,0 +1,18 @@
+%%
+%% sphinx_id.erl
+%% Kevin Lynx
+%%
+-module(sphinx_id).
+-export([fromhash/1, tohash/1]).
+-compile(export_all).
+
+%% A 160-bit info-hash does not fit in a single Sphinx attribute, so it is
+%% split into five 32-bit integers (hash1..hash5) and recombined on search.
+fromhash(Hash) when is_list(Hash), length(Hash) == 40 ->
+	B = hex:hexstr_to_bin(Hash),
+	<<H1:32, H2:32, H3:32, H4:32, H5:32>> = B,
+	{H1, H2, H3, H4, H5}.
+
+tohash({H1, H2, H3, H4, H5}) ->
+	B = <<H1:32, H2:32, H3:32, H4:32, H5:32>>,
+	string:to_upper(hex:bin_to_hexstr(B)).
+
diff --git a/src/sphinx_builder/sphinx_torrent.erl b/src/sphinx_builder/sphinx_torrent.erl
new file mode 100644
index 0000000..ad1fab5
--- /dev/null
+++ b/src/sphinx_builder/sphinx_torrent.erl
@@ -0,0 +1,106 @@
+%%
+%% sphinx_torrent.erl
+%% Kevin Lynx
+%% 07.29.2013
+%%
+-module(sphinx_torrent).
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+	handle_call/3,
+	handle_cast/2,
+	handle_info/2,
+	terminate/2,
+	code_change/3]).
+-export([start_link/3, get/0, try_times/0]).
+-define(DBNAME, torrents).
+-define(COLLNAME, hashes).
+-define(POOLNAME, db_pool).
+-define(WAIT_TIME, 30*1000).
+-record(state, {offset = 0, max, try_times = 0, tors = []}).
+
+start_link(IP, Port, Offset) ->
+	gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
+
+get() ->
+	gen_server:call(srv_name(), get, infinity).
+
+try_times() ->
+	gen_server:call(srv_name(), try_times).
+
+%%
+srv_name() ->
+	sphinx_torrent_loader.
+
+init([IP, Port, Offset]) ->
+	Max = config:get(torrent_batch_count, 100),
+	mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
+	{ok, #state{offset = Offset, max = Max}, 0}.
+
+handle_cast(load, State) ->
+	#state{offset = Skip, max = Max, tors = Tors} = State,
+	Request = Max * 2 div 3,
+	?T(?FMT("request next ~p torrents", [Request])),
+	LoadTors = do_load_torrents(Skip, Request),
+	case length(LoadTors) of
+		0 -> timer:send_after(?WAIT_TIME, try_load);
+		_ -> ok
+	end,
+	?T(?FMT("load ~p torrents", [length(LoadTors)])),
+	NewOffset = Skip + length(LoadTors),
+	{noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}};
+
+handle_cast(stop, State) ->
+	{stop, normal, State}.
+
+handle_info(try_load, State) ->
+	#state{offset = Skip, max = Max, try_times = Try} = State,
+	?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
+	LoadTors = do_load_torrents(Skip, Max),
+	LoadCnt = length(LoadTors),
+	NewTry = case LoadCnt > 0 of
+		true -> 0;
+		false -> timer:send_after(?WAIT_TIME, try_load), Try + 1
+	end,
+	?T(?FMT("load ~p torrents, try times ~p", [LoadCnt, NewTry])),
+	{noreply, State#state{offset = Skip + LoadCnt, tors = LoadTors, try_times = NewTry}};
+
+handle_info(timeout, State) ->
+	self() ! try_load,
+	{noreply, State}.
+
+handle_call(try_times, _From, #state{try_times = Try} = State) ->
+	{reply, Try, State};
+
+handle_call(get, _From, State) ->
+	#state{tors = Tors, max = Max} = State,
+	{Tor, NewTors} = case length(Tors) of
+		0 -> {{}, []};
+		_ -> [H|Rest] = Tors, {H, Rest}
+	end,
+	try_load_next(NewTors, Max),
+	{reply, Tor, State#state{tors = NewTors}}.
+
+terminate(_, State) ->
+	mongo_sup:stop_pool(?POOLNAME),
+	{ok, State}.
+
+code_change(_, _, State) ->
+	{ok, State}.
+
+try_load_next([], _) ->
+	?T("no torrents"),
+	skip;
+try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
+	?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])),
+	gen_server:cast(self(), load);
+try_load_next(_, _) ->
+	ok.
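+%% Prefetch policy illustrated with the default torrent_batch_count of 100:
+%% a refill of 66 documents (Max * 2 div 3) is requested as soon as the
+%% in-memory queue drains to 33 (Max div 3), so builder workers rarely
+%% block inside get/0.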
+
+do_load_torrents(Skip, Size) ->
+	Conn = mongo_pool:get(?POOLNAME),
+	mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+		Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
+		mongo_cursor:take(Cursor, Size)
+	end).
+
diff --git a/src/sphinx_builder/sphinx_xml.erl b/src/sphinx_builder/sphinx_xml.erl
new file mode 100644
index 0000000..b01338d
--- /dev/null
+++ b/src/sphinx_builder/sphinx_xml.erl
@@ -0,0 +1,79 @@
+%%
+%% sphinx_xml.erl
+%% Kevin Lynx
+%% Write sphinx xml file
+%%
+-module(sphinx_xml).
+-behaviour(gen_server).
+-include("vlog.hrl").
+-export([init/1,
+	handle_call/3,
+	handle_cast/2,
+	handle_info/2,
+	terminate/2,
+	code_change/3]).
+-export([start_link/0, insert/1, force_save/0]).
+-record(state, {docs = [], max, startid = -1}).
+
+start_link() ->
+	gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
+
+insert(Doc) ->
+	gen_server:cast(srv_name(), {insert, Doc}).
+
+force_save() ->
+	gen_server:cast(srv_name(), save).
+
+srv_name() ->
+	?MODULE.
+
+%%
+init([]) ->
+	Max = config:get(max_doc_per_file, 1000),
+	{ok, #state{max = Max}}.
+
+terminate(_, State) ->
+	{ok, State}.
+
+code_change(_, _, State) ->
+	{ok, State}.
+
+handle_cast(save, #state{docs = Docs} = State) when length(Docs) > 0 ->
+	#state{startid = StartID} = State,
+	EndID = length(Docs) + StartID - 1,
+	try_save(Docs, 0, StartID, EndID),
+	{noreply, State#state{docs = []}};
+
+handle_cast({insert, {Hash, Name, Files, ID, Query, CreatedAt}}, State) ->
+	#state{docs = Docs, max = Max, startid = StartID} = State,
+	NewStartID = if length(Docs) == 0 -> ID; true -> StartID end,
+	Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
+	NewDocs = try_save([Doc|Docs], Max, NewStartID, ID),
+	{noreply, State#state{docs = NewDocs, startid = NewStartID}};
+
+handle_cast(stop, State) ->
+	{stop, normal, State}.
+
+handle_call(_, _From, State) ->
+	{noreply, State}.
+
+handle_info(_, State) ->
+	{noreply, State}.
+
+try_save(Docs, Max, StartID, NowID) when length(Docs) >= Max, length(Docs) > 0 ->
+	File = config:get(delta_source_file),
+	Conf = config:get(sphinx_config_file),
+	Delta = config:get(delta_index_name),
+	Main = config:get(main_index_name),
+	io:format("sync sphinx index ~p documents...", [length(Docs)]),
+	?I(?FMT("save sphinx xml file ~s", [File])),
+	sphinx_doc:write_xml(File, Docs),
+	?I(?FMT("build delta index : ~s", [Delta])),
+	sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, NowID),
+	?I(?FMT("merge delta to main index ~s -> ~s", [Delta, Main])),
+	sphinx_cmd:merge_index(Main, Delta, Conf),
+	?I("index update done"),
+	io:format("done~n", []),
+	[];
+try_save(Docs, _, _, _) ->
+	Docs.
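+%% End-to-end cycle per batch (file and index names taken from
+%% config_default/0 in sphinx_builder_sup.erl):
+%%   1. write_xml/2 dumps the batched documents to var/source/delta.xml
+%%   2. sphinx_cmd:build_delta_index/5 runs:
+%%        indexer -c var/etc/csft.conf --rotate delta
+%%   3. sphinx_cmd:merge_index/3 runs:
+%%        indexer -c var/etc/csft.conf --merge main delta --rotate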