mirror of
https://github.com/btdig/dhtcrawler2.git
synced 2025-01-31 02:21:37 +00:00
add sphinx support, in expirment status right now
This commit is contained in:
parent
b961dc9c46
commit
f242d4e44f
@ -38,3 +38,11 @@
|
||||
[debug_info,
|
||||
{i, "include"},
|
||||
{outdir,"ebin"}]}.
|
||||
{'src/utils/*',
|
||||
[debug_info,
|
||||
{i, "include"},
|
||||
{outdir,"ebin"}]}.
|
||||
{'src/sphinx_builder/*',
|
||||
[debug_info,
|
||||
{i, "include"},
|
||||
{outdir,"ebin"}]}.
|
||||
|
34
src/common/hex.erl
Normal file
34
src/common/hex.erl
Normal file
@ -0,0 +1,34 @@
|
||||
-module(hex).
|
||||
-export([bin_to_hexstr/1,hexstr_to_bin/1]).
|
||||
|
||||
hex(N) when N < 10 ->
|
||||
$0+N;
|
||||
hex(N) when N >= 10, N < 16 ->
|
||||
$a+(N-10).
|
||||
|
||||
int(C) when $0 =< C, C =< $9 ->
|
||||
C - $0;
|
||||
int(C) when $A =< C, C =< $F ->
|
||||
C - $A + 10;
|
||||
int(C) when $a =< C, C =< $f ->
|
||||
C - $a + 10.
|
||||
|
||||
to_hex(N) when N < 256 ->
|
||||
[hex(N div 16), hex(N rem 16)].
|
||||
|
||||
list_to_hexstr([]) ->
|
||||
[];
|
||||
list_to_hexstr([H|T]) ->
|
||||
to_hex(H) ++ list_to_hexstr(T).
|
||||
|
||||
bin_to_hexstr(Bin) ->
|
||||
list_to_hexstr(binary_to_list(Bin)).
|
||||
|
||||
hexstr_to_bin(S) ->
|
||||
list_to_binary(hexstr_to_list(S)).
|
||||
|
||||
hexstr_to_list([X,Y|T]) ->
|
||||
[int(X)*16 + int(Y) | hexstr_to_list(T)];
|
||||
hexstr_to_list([]) ->
|
||||
[].
|
||||
|
10
src/common/string_util.erl
Normal file
10
src/common/string_util.erl
Normal file
@ -0,0 +1,10 @@
|
||||
%%
|
||||
%% string_util.erl
|
||||
%% Kevin Lynx
|
||||
%%
|
||||
-module(string_util).
|
||||
-export([format/2]).
|
||||
|
||||
format(Fmt, Arg) when is_list(Fmt), is_list(Arg) ->
|
||||
lists:flatten(io_lib:format(Fmt, Arg)).
|
||||
|
@ -9,7 +9,9 @@
|
||||
seconds_to_local_time/1,
|
||||
local_time_to_universal_time/1,
|
||||
now_utc_time/0,
|
||||
now_day_seconds/0]).
|
||||
now_day_seconds/0,
|
||||
date_time_string/0,
|
||||
date_time_stamp/0]).
|
||||
-compile(export_all).
|
||||
|
||||
diff_milsecs(T1, T2) ->
|
||||
@ -38,3 +40,15 @@ local_time_to_universal_time(Datetime) ->
|
||||
|
||||
now_utc_time() ->
|
||||
local_time_to_universal_time(calendar:local_time()).
|
||||
|
||||
date_time_string() ->
|
||||
{{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
|
||||
L = io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b",
|
||||
[Y, M, D, H, Min, Sec]),
|
||||
lists:flatten(L).
|
||||
|
||||
date_time_stamp() ->
|
||||
{{Y, M, D}, {H, Min, Sec}} = calendar:local_time(),
|
||||
L = io_lib:format("~b~2..0b~2..0b~2..0b~2..0b~2..0b",
|
||||
[Y, M, D, H, Min, Sec]),
|
||||
lists:flatten(L).
|
||||
|
69
src/common/xmerl_xml_cdata.erl
Normal file
69
src/common/xmerl_xml_cdata.erl
Normal file
@ -0,0 +1,69 @@
|
||||
%%
|
||||
%% xmerl_xml_cdata.erl
|
||||
%% Kevin Lynx
|
||||
%% To fix erlang xmerl_xml not support `cdata' bug, damn it!
|
||||
%%
|
||||
-module(xmerl_xml_cdata).
|
||||
-export(['#xml-inheritance#'/0]).
|
||||
-export(['#root#'/4,
|
||||
'#element#'/5,
|
||||
'#text#'/1]).
|
||||
-import(xmerl_lib, [markup/3, empty_tag/2]).
|
||||
-include_lib("xmerl/include/xmerl.hrl").
|
||||
|
||||
|
||||
'#xml-inheritance#'() -> [].
|
||||
|
||||
|
||||
%% The '#text#' function is called for every text segment.
|
||||
|
||||
'#text#'(Text) ->
|
||||
%io:format("Text=~p~n",[Text]),
|
||||
export_text(Text).
|
||||
|
||||
|
||||
%% The '#root#' tag is called when the entire structure has been
|
||||
%% exported. It does not appear in the structure itself.
|
||||
|
||||
'#root#'(Data, [#xmlAttribute{name=prolog,value=V}], [], _E) ->
|
||||
[V,Data];
|
||||
'#root#'(Data, _Attrs, [], _E) ->
|
||||
["<?xml version=\"1.0\"?>", Data].
|
||||
|
||||
|
||||
%% The '#element#' function is the default handler for XML elements.
|
||||
|
||||
'#element#'(Tag, [], Attrs, _Parents, _E) ->
|
||||
%io:format("Empty Tag=~p~n",[Tag]),
|
||||
empty_tag(Tag, Attrs);
|
||||
'#element#'(Tag, Data, Attrs, _Parents, _E) ->
|
||||
%io:format("Tag=~p~n",[Tag]),
|
||||
markup(Tag, Attrs, Data).
|
||||
|
||||
export_text(T) ->
|
||||
export_text(T, []).
|
||||
|
||||
export_text([$' | T], Cont) ->
|
||||
"'" ++ export_text(T, Cont);
|
||||
export_text([$" | T], Cont) ->
|
||||
""" ++ export_text(T, Cont);
|
||||
export_text([$< | T] = Text, Cont) ->
|
||||
case (lists:prefix("<![CDATA[", Text)) and (lists:suffix("]]>", Text)) of
|
||||
true -> Text;
|
||||
false ->
|
||||
"<" ++ export_text(T, Cont)
|
||||
end;
|
||||
export_text([$> | T], Cont) ->
|
||||
">" ++ export_text(T, Cont);
|
||||
export_text([$& | T], Cont) ->
|
||||
"&" ++ export_text(T, Cont);
|
||||
export_text([C | T], Cont) when is_integer(C) ->
|
||||
[C | export_text(T, Cont)];
|
||||
export_text([T | T1], Cont) ->
|
||||
export_text(T, [T1 | Cont]);
|
||||
export_text([], [T | Cont]) ->
|
||||
export_text(T, Cont);
|
||||
export_text([], []) ->
|
||||
[];
|
||||
export_text(Bin, Cont) ->
|
||||
export_text(binary_to_list(Bin), Cont).
|
@ -6,6 +6,7 @@
|
||||
-module(db_frontend).
|
||||
-export([start/3,
|
||||
search/1,
|
||||
search_by_sphinx/1,
|
||||
today_top/0,
|
||||
search_one/1,
|
||||
stats/0,
|
||||
@ -27,6 +28,10 @@ search(Keyword) ->
|
||||
Conn = mongo_pool:get(?DB_POOLNAME),
|
||||
db_store_mongo:search(Conn, Keyword).
|
||||
|
||||
search_by_sphinx(Keyword) ->
|
||||
Conn = mongo_pool:get(?DB_POOLNAME),
|
||||
sphinx_search:search(Conn, Keyword).
|
||||
|
||||
today_top() ->
|
||||
Conn = mongo_pool:get(?DB_POOLNAME),
|
||||
DaySecs = time_util:now_day_seconds(),
|
||||
|
@ -5,6 +5,7 @@
|
||||
%%
|
||||
-module(http_handler).
|
||||
-export([search/3,
|
||||
sphinx_search/3,
|
||||
test_search/1,
|
||||
index/3,
|
||||
stats/3,
|
||||
@ -29,6 +30,18 @@ search(SessionID, Env, Input) ->
|
||||
Response = simple_html(K, Body),
|
||||
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
|
||||
|
||||
sphinx_search(SessionID, Env, Input) ->
|
||||
{K, Body} = case http_common:get_search_keyword(Input) of
|
||||
[] ->
|
||||
{"", "invalid input"};
|
||||
Key ->
|
||||
US = http_common:list_to_utf_binary(Key),
|
||||
?LOG_STR(?INFO, ?FMT("remote ~p search /~s/", [http_common:remote_addr(Env), US])),
|
||||
{Key, do_search_sphinx(Key)}
|
||||
end,
|
||||
Response = simple_html(K, Body),
|
||||
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
|
||||
|
||||
top(SessionID, _Env, _Input) ->
|
||||
Rets = db_frontend:all_top(),
|
||||
BodyList = format_search_result(Rets),
|
||||
@ -99,6 +112,12 @@ do_search(Keyword) ->
|
||||
Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
|
||||
Tip ++ Body.
|
||||
|
||||
do_search_sphinx(Keyword) ->
|
||||
Rets = db_frontend:search_by_sphinx(Keyword),
|
||||
BodyList = format_search_result(Rets),
|
||||
Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
|
||||
Body.
|
||||
|
||||
format_search_result(RetList) ->
|
||||
[format_one_result(Result, false) || Result <- RetList].
|
||||
|
||||
|
38
src/http_front/sphinx_search.erl
Normal file
38
src/http_front/sphinx_search.erl
Normal file
@ -0,0 +1,38 @@
|
||||
%%
|
||||
%% sphinx_search.erl
|
||||
%% Kevin Lynx
|
||||
%% 07.28.2013
|
||||
%%
|
||||
-module(sphinx_search).
|
||||
-export([init/0, search/2]).
|
||||
-define(PORT, 9312).
|
||||
-define(INDEX, "xml").
|
||||
-define(PAGECNT, 10).
|
||||
|
||||
init() ->
|
||||
code:add_path("deps/giza/ebin").
|
||||
%application:start(giza).
|
||||
|
||||
search(Conn, Key) ->
|
||||
Q1 = giza_query:new(?INDEX, Key),
|
||||
Q2 = giza_query:port(Q1, ?PORT),
|
||||
{ok, Ret} = giza_request:send(Q2),
|
||||
decode_search_ret(Conn, Ret).
|
||||
|
||||
decode_search_ret(Conn, Ret) ->
|
||||
Hashes = [translate_hash(Item) || Item <- Ret],
|
||||
[db_store_mongo:index(Conn, Hash) || Hash <- Hashes].
|
||||
|
||||
translate_hash({_DocID, Item}) ->
|
||||
Attrs = proplists:get_value(attrs, Item),
|
||||
H1 = proplists:get_value(<<"hash1">>, Attrs),
|
||||
H2 = proplists:get_value(<<"hash2">>, Attrs),
|
||||
H3 = proplists:get_value(<<"hash3">>, Attrs),
|
||||
H4 = proplists:get_value(<<"hash4">>, Attrs),
|
||||
H5 = proplists:get_value(<<"hash5">>, Attrs),
|
||||
Hash = sphinx_id:tohash({H1, H2, H3, H4, H5}),
|
||||
40 = length(Hash),
|
||||
Hash.
|
||||
|
||||
|
||||
|
122
src/sphinx_builder/sphinx_builder.erl
Normal file
122
src/sphinx_builder/sphinx_builder.erl
Normal file
@ -0,0 +1,122 @@
|
||||
%%
|
||||
%% sphinx_builder.erl
|
||||
%% Kevin Lynx
|
||||
%% 07.29.2013
|
||||
%%
|
||||
-module(sphinx_builder).
|
||||
-include("vlog.hrl").
|
||||
-behaviour(gen_server).
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
-export([start_link/3]).
|
||||
-export([worker_run/0]).
|
||||
-record(state, {processed = 0, worker_cnt, wait_workers = []}).
|
||||
-define(WORKER_WAIT, 30*1000).
|
||||
-define(STATE_FILE, "priv/sphinx_builder.sta").
|
||||
|
||||
start_link(IP, Port, Count) ->
|
||||
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count], []).
|
||||
|
||||
srv_name() ->
|
||||
?MODULE.
|
||||
|
||||
init([IP, Port, WorkerCnt]) ->
|
||||
?I(?FMT("spawn ~p workers", [WorkerCnt])),
|
||||
[spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, WorkerCnt)],
|
||||
Offset = load_result(),
|
||||
sphinx_torrent:start_link(IP, Port, Offset),
|
||||
{ok, #state{processed = Offset, worker_cnt = WorkerCnt}}.
|
||||
|
||||
handle_call({get, Pid}, _From, State) ->
|
||||
#state{processed = Processed, worker_cnt = WaitCnt, wait_workers = WaitWorkers} = State,
|
||||
{NewProcessed, Ret} = case sphinx_torrent:get() of
|
||||
{} ->
|
||||
{Processed, wait};
|
||||
Tor ->
|
||||
check_progress(Processed + 1),
|
||||
{Processed + 1, Tor}
|
||||
end,
|
||||
NewWaits = update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers),
|
||||
check_all_done(NewWaits, WaitCnt, NewProcessed, length(NewWaits) > length(WaitWorkers)),
|
||||
{reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits}}.
|
||||
|
||||
handle_cast(_, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_, State) ->
|
||||
{ok, State}.
|
||||
|
||||
code_change(_, _, State) ->
|
||||
{ok, State}.
|
||||
|
||||
update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers) ->
|
||||
case lists:member(Pid, WaitWorkers) of
|
||||
true when NewProcessed > Processed ->
|
||||
lists:delete(Pid, WaitWorkers);
|
||||
false when NewProcessed == Processed ->
|
||||
[Pid|WaitWorkers];
|
||||
_ ->
|
||||
WaitWorkers
|
||||
end.
|
||||
|
||||
check_all_done(WaitWorkers, WaitCnt, Processed, true)
|
||||
when length(WaitWorkers) == WaitCnt ->
|
||||
Try = sphinx_torrent:try_times(),
|
||||
case Try > 5 of
|
||||
true ->
|
||||
io:format("haven't got any torrents for a while, force save~n", []),
|
||||
save_result(Processed),
|
||||
sphinx_xml:force_save();
|
||||
false ->
|
||||
ok
|
||||
end;
|
||||
check_all_done(_WaitWorkers, _WaitCnt, _Processed, _) ->
|
||||
ok.
|
||||
|
||||
worker_run() ->
|
||||
Ret = gen_server:call(srv_name(), {get, self()}),
|
||||
do_process(Ret),
|
||||
worker_run().
|
||||
|
||||
do_process({_, wait}) ->
|
||||
?T(?FMT("worker ~p sleep ~p ms", [self(), ?WORKER_WAIT])),
|
||||
timer:sleep(?WORKER_WAIT);
|
||||
do_process({ID, Doc}) ->
|
||||
case db_store_mongo:decode_torrent_item(Doc) of
|
||||
{single, Hash, {Name, _}, Query, CreatedAt} ->
|
||||
sphinx_xml:insert({Hash, Name, [], ID, Query, CreatedAt});
|
||||
{multi, Hash, {Name, Files}, Query, CreatedAt} ->
|
||||
sphinx_xml:insert({Hash, Name, Files, ID, Query, CreatedAt})
|
||||
end.
|
||||
|
||||
load_result() ->
|
||||
case file:consult(?STATE_FILE) of
|
||||
{error, _Reason} ->
|
||||
io:format("start a new processing~n", []),
|
||||
0;
|
||||
{ok, [Ret]} ->
|
||||
Sum = proplists:get_value(processed, Ret),
|
||||
io:format("continue to process from ~p~n", [Sum]),
|
||||
Sum
|
||||
end.
|
||||
|
||||
save_result(Sum) ->
|
||||
Ret = [{processed, Sum}],
|
||||
io:format("save result ~p~n", [Sum]),
|
||||
file:write_file(?STATE_FILE, io_lib:fwrite("~p.\n",[Ret])).
|
||||
|
||||
check_progress(Sum) ->
|
||||
case (Sum rem 500 == 0) and (Sum > 0) of
|
||||
true ->
|
||||
save_result(Sum),
|
||||
io:format(" -> ~p~n", [Sum]);
|
||||
false ->
|
||||
ok
|
||||
end.
|
52
src/sphinx_builder/sphinx_builder_sup.erl
Normal file
52
src/sphinx_builder/sphinx_builder_sup.erl
Normal file
@ -0,0 +1,52 @@
|
||||
%%
|
||||
%% sphinx_builder_sup.erl
|
||||
%% Kevin Lynx
|
||||
%% 07.28.2013
|
||||
%%
|
||||
-module(sphinx_builder_sup).
|
||||
-behaviour(supervisor).
|
||||
-export([start_standalone/1, start_standalone/3, start_link/3]).
|
||||
-export([init/1]).
|
||||
|
||||
start_dep_apps() ->
|
||||
code:add_path("deps/bson/ebin"),
|
||||
code:add_path("deps/mongodb/ebin"),
|
||||
Apps = [asn1, crypto, public_key, ssl, inets, xmerl, bson, mongodb],
|
||||
[application:start(App) || App <- Apps].
|
||||
|
||||
start_standalone([IP, Port, Count]) ->
|
||||
IPort = list_to_integer(Port),
|
||||
ISize = list_to_integer(Count),
|
||||
start_standalone(IP, IPort, ISize),
|
||||
receive
|
||||
fuck_erl_s_option -> ok
|
||||
end.
|
||||
|
||||
start_standalone(IP, Port, Size) ->
|
||||
start_dep_apps(),
|
||||
start_link(IP, Port, Size).
|
||||
|
||||
start_link(IP, Port, Count) ->
|
||||
supervisor:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count]).
|
||||
|
||||
%%
|
||||
srv_name() ->
|
||||
?MODULE.
|
||||
|
||||
init([IP, Port, Count]) ->
|
||||
Spec = {one_for_one, 1, 600},
|
||||
config:start_link("sphinx_builder.config", fun() -> config_default() end),
|
||||
Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
|
||||
Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
|
||||
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
|
||||
Children = [Logger, Builder, Indexer],
|
||||
{ok, {Spec, Children}}.
|
||||
|
||||
config_default() ->
|
||||
[{load_torrent_interval, 500}, % millseconds
|
||||
{max_doc_per_file, 1000},
|
||||
{torrent_batch_count, 100},
|
||||
{delta_source_file, "var/source/delta.xml"},
|
||||
{sphinx_config_file, "var/etc/csft.conf"},
|
||||
{delta_index_name, "delta"},
|
||||
{main_index_name, "main"}].
|
29
src/sphinx_builder/sphinx_cmd.erl
Normal file
29
src/sphinx_builder/sphinx_cmd.erl
Normal file
@ -0,0 +1,29 @@
|
||||
%%
|
||||
%% sphinx_cmd.erl
|
||||
%% Kevin Lynx
|
||||
%% 07.28.2013
|
||||
%%
|
||||
-module(sphinx_cmd).
|
||||
-export([build_delta_index/5, merge_index/3]).
|
||||
-compile(export_all).
|
||||
-include("vlog.hrl").
|
||||
|
||||
% Index file, Delta index name
|
||||
build_delta_index(IndexFile, Delta, CfgFile, MinID, MaxID) ->
|
||||
Cmd = "indexer -c " ++ CfgFile ++ " --rotate " ++ Delta,
|
||||
Res = os:cmd(Cmd),
|
||||
Dest = backup_delta_file(Delta, MinID, MaxID, IndexFile),
|
||||
?I(?FMT("command `~s' result on ~s~n" ++ Res, [Cmd, Dest])).
|
||||
|
||||
merge_index(Main, Delta, CfgFile) ->
|
||||
Cmd = string_util:format("indexer -c " ++ CfgFile ++ " --merge ~s ~s --rotate",
|
||||
[Main, Delta]),
|
||||
Res = os:cmd(Cmd),
|
||||
?I(?FMT("command `~s' result~n" ++ Res, [Cmd])).
|
||||
|
||||
backup_delta_file(Delta, MinID, MaxID, IndexFile) ->
|
||||
Path = filename:dirname(IndexFile),
|
||||
Dest = string_util:format(Path ++ "/" ++ Delta ++ "[~b-~b]" ++ ".xml",
|
||||
[MinID, MaxID]),
|
||||
file:copy(IndexFile, Dest),
|
||||
Dest.
|
69
src/sphinx_builder/sphinx_doc.erl
Normal file
69
src/sphinx_builder/sphinx_doc.erl
Normal file
@ -0,0 +1,69 @@
|
||||
%%
|
||||
%% sphinx_doc.erl
|
||||
%% 07.27.2013
|
||||
%%
|
||||
-module(sphinx_doc).
|
||||
-include_lib("xmerl/include/xmerl.hrl").
|
||||
-export([write_xml/2, element/6]).
|
||||
-compile(export_all).
|
||||
-define(PROLOG, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>").
|
||||
-define(CR, #xmlText{value="\
|
||||
"}).
|
||||
|
||||
write_xml(File, Elems) ->
|
||||
Doc = {'sphinx:docset', [], [schema(), ?CR] ++ Elems},
|
||||
Content = xmerl:export_simple([Doc], xmerl_xml_cdata, [{prolog, ?PROLOG}]),
|
||||
file:write_file(File, Content).
|
||||
|
||||
element(Hash, Name, Files, ID, Query, CreatedAt) when is_list(Hash), length(Hash) == 40 ->
|
||||
IDSet = idset(Hash),
|
||||
OtherAttrs = [{query, [integer_to_list(Query)]}, {created_at, integer_to_list(CreatedAt)}],
|
||||
EleSubject = {subject, [], [with_cdata(Name)]},
|
||||
EleFiles = {files, [], [with_cdata(files_name(Files))]},
|
||||
{'sphinx:document', [{id, integer_to_list(ID)}], IDSet ++ OtherAttrs ++ [EleSubject, EleFiles, ?CR]}.
|
||||
|
||||
schema() ->
|
||||
HashIds = [attr_int32("hash1"), attr_int32("hash2"), attr_int32("hash3"),
|
||||
attr_int32("hash4"), attr_int32("hash5")],
|
||||
Other = [attr_int32("query"), attr("created_at", "timestamp")],
|
||||
{'sphinx:schema', [field("subject"), field("files")] ++ HashIds ++ Other}.
|
||||
|
||||
field(Name) ->
|
||||
{'sphinx:field', [{name, [Name]}], []}.
|
||||
|
||||
attr(Name, Type) ->
|
||||
{'sphinx:attr', [{name, [Name]}, {type, [Type]}], []}.
|
||||
|
||||
attr_int32(Name) ->
|
||||
attr(Name, "int", "32").
|
||||
|
||||
attr(Name, Type, Bits) ->
|
||||
{'sphinx:attr', [{name, [Name]}, {type, [Type]}, {bits, [Bits]}], []}.
|
||||
|
||||
idset(Hash) ->
|
||||
{H1, H2, H3, H4, H5} = sphinx_id:fromhash(Hash),
|
||||
[{hash1, [], [integer_to_list(H1)]},
|
||||
{hash2, [], [integer_to_list(H2)]},
|
||||
{hash3, [], [integer_to_list(H3)]},
|
||||
{hash4, [], [integer_to_list(H4)]},
|
||||
{hash5, [], [integer_to_list(H5)]}].
|
||||
|
||||
files_name(Files) ->
|
||||
lists:foldl(fun({Name, _}, Acc) ->
|
||||
FName = filter_name(Name),
|
||||
case length(Acc) > 0 of
|
||||
true when length(FName) > 0 -> Acc ++ " " ++ FName;
|
||||
true -> Acc;
|
||||
false -> FName
|
||||
end
|
||||
end, [], Files).
|
||||
|
||||
filter_name(Name) ->
|
||||
case lists:prefix("_____padding_file", Name) of
|
||||
true -> [];
|
||||
false -> Name
|
||||
end.
|
||||
|
||||
with_cdata(Text) ->
|
||||
"<![CDATA[" ++ Text ++ "]]>".
|
||||
|
18
src/sphinx_builder/sphinx_id.erl
Normal file
18
src/sphinx_builder/sphinx_id.erl
Normal file
@ -0,0 +1,18 @@
|
||||
%%
|
||||
%% sphinx_id.erl
|
||||
%% Kevin Lynx
|
||||
%%
|
||||
-module(sphinx_id).
|
||||
-export([fromhash/1, tohash/1]).
|
||||
-compile(export_all).
|
||||
|
||||
fromhash(Hash) when is_list(Hash), length(Hash) == 40 ->
|
||||
B = hex:hexstr_to_bin(Hash),
|
||||
<<H1:32/integer, H2:32/integer, H3:32/integer, H4:32/integer, H5:32/integer>> = B,
|
||||
{H1, H2, H3, H4, H5}.
|
||||
|
||||
|
||||
tohash({H1, H2, H3, H4, H5}) ->
|
||||
B = <<H1:32/integer, H2:32/integer, H3:32/integer, H4:32/integer, H5:32/integer>>,
|
||||
string:to_upper(hex:bin_to_hexstr(B)).
|
||||
|
106
src/sphinx_builder/sphinx_torrent.erl
Normal file
106
src/sphinx_builder/sphinx_torrent.erl
Normal file
@ -0,0 +1,106 @@
|
||||
%%
|
||||
%% sphinx_torrent.erl
|
||||
%% Kevin Lynx
|
||||
%% 07.29.2013
|
||||
%%
|
||||
-module(sphinx_torrent).
|
||||
-include("vlog.hrl").
|
||||
-behaviour(gen_server).
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
-export([start_link/3, get/0, try_times/0]).
|
||||
-define(DBNAME, torrents).
|
||||
-define(COLLNAME, hashes).
|
||||
-define(POOLNAME, db_pool).
|
||||
-define(WAIT_TIME, 30*1000).
|
||||
-record(state, {offset = 0, max, try_times = 0, tors = []}).
|
||||
|
||||
start_link(IP, Port, Offset) ->
|
||||
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
|
||||
|
||||
get() ->
|
||||
gen_server:call(srv_name(), get, infinity).
|
||||
|
||||
try_times() ->
|
||||
gen_server:call(srv_name(), try_times).
|
||||
|
||||
%%
|
||||
srv_name() ->
|
||||
sphinx_torrent_loader.
|
||||
|
||||
init([IP, Port, Offset]) ->
|
||||
Max = config:get(torrent_batch_count, 100),
|
||||
mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
|
||||
{ok, #state{offset = Offset, max = Max}, 0}.
|
||||
|
||||
handle_cast(load, State) ->
|
||||
#state{offset = Skip, max = Max, tors = Tors} = State,
|
||||
Request = Max * 2 div 3,
|
||||
?T(?FMT("request next ~p torrents", [Request])),
|
||||
LoadTors = do_load_torrents(Skip, Request),
|
||||
case length(LoadTors) of
|
||||
0 -> timer:send_after(?WAIT_TIME, try_load);
|
||||
_ -> ok
|
||||
end,
|
||||
?T(?FMT("load ~p torrents", [length(LoadTors)])),
|
||||
NewOffset = Skip + length(LoadTors),
|
||||
{noreply, State#state{offset = NewOffset, tors = Tors ++ LoadTors}};
|
||||
|
||||
handle_cast(stop, State) ->
|
||||
{stop, normal, State}.
|
||||
|
||||
handle_info(try_load, State) ->
|
||||
#state{offset = Skip, max = Max, try_times = Try} = State,
|
||||
?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
|
||||
LoadTors = do_load_torrents(Skip, Max),
|
||||
LoadCnt = length(LoadTors),
|
||||
NewTry = case LoadCnt > 0 of
|
||||
true -> 0;
|
||||
false -> timer:send_after(?WAIT_TIME, try_load), Try + 1
|
||||
end,
|
||||
?T(?FMT("load ~p torrents, try times ~p", [LoadCnt, NewTry])),
|
||||
{noreply, State#state{offset = Skip + LoadCnt, tors = LoadTors, try_times = NewTry}};
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
self() ! try_load,
|
||||
{noreply, State}.
|
||||
|
||||
handle_call(try_times, _From, #state{try_times = Try} = State) ->
|
||||
{reply, Try, State};
|
||||
|
||||
handle_call(get, _From, State) ->
|
||||
#state{tors = Tors, max = Max} = State,
|
||||
{Tor, NewTors} = case length(Tors) of
|
||||
0 -> {{}, []};
|
||||
_ -> [H|Rest] = Tors, {H, Rest}
|
||||
end,
|
||||
try_load_next(NewTors, Max),
|
||||
{reply, Tor, State#state{tors = NewTors}}.
|
||||
|
||||
terminate(_, State) ->
|
||||
mongo_sup:stop_pool(?POOLNAME),
|
||||
{ok, State}.
|
||||
|
||||
code_change(_, _, State) ->
|
||||
{ok, State}.
|
||||
|
||||
try_load_next([], _) ->
|
||||
?T("no torrents"),
|
||||
skip;
|
||||
try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
|
||||
?T(?FMT("attempt to load next ~p torrents", [Max * 2 div 3])),
|
||||
gen_server:cast(self(), load);
|
||||
try_load_next(_, _) ->
|
||||
ok.
|
||||
|
||||
do_load_torrents(Skip, Size) ->
|
||||
Conn = mongo_pool:get(?POOLNAME),
|
||||
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
|
||||
Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
|
||||
mongo_cursor:take(Cursor, Size)
|
||||
end).
|
||||
|
79
src/sphinx_builder/sphinx_xml.erl
Normal file
79
src/sphinx_builder/sphinx_xml.erl
Normal file
@ -0,0 +1,79 @@
|
||||
%%
|
||||
%% sphinx_xml.erl
|
||||
%% Kevin Lynx
|
||||
%% Write sphinx xml file
|
||||
%%
|
||||
-module(sphinx_xml).
|
||||
-behaviour(gen_server).
|
||||
-include("vlog.hrl").
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
-export([start_link/0, insert/1, force_save/0]).
|
||||
-record(state, {docs = [], max, startid = -1}).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
|
||||
|
||||
insert(Doc) ->
|
||||
gen_server:cast(srv_name(), {insert, Doc}).
|
||||
|
||||
force_save() ->
|
||||
gen_server:cast(srv_name(), save).
|
||||
|
||||
srv_name() ->
|
||||
?MODULE.
|
||||
|
||||
%%
|
||||
init([]) ->
|
||||
Max = config:get(max_doc_per_file, 1000),
|
||||
{ok, #state{max = Max}}.
|
||||
|
||||
terminate(_, State) ->
|
||||
{ok, State}.
|
||||
|
||||
code_change(_, _, State) ->
|
||||
{ok, State}.
|
||||
|
||||
handle_cast(save, #state{docs = Docs} = State) when length(Docs) > 0 ->
|
||||
#state{startid = StartID} = State,
|
||||
EndID = length(Docs) + StartID - 1,
|
||||
try_save(Docs, 0, StartID, EndID),
|
||||
{noreply, State#state{docs = []}};
|
||||
|
||||
handle_cast({insert, {Hash, Name, Files, ID, Query, CreatedAt}}, State) ->
|
||||
#state{docs = Docs, max = Max, startid = StartID} = State,
|
||||
NewStartID = if length(Docs) == 0 -> ID; true -> StartID end,
|
||||
Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
|
||||
NewDocs = try_save([Doc|Docs], Max, NewStartID, ID),
|
||||
{noreply, State#state{docs = NewDocs, startid = NewStartID}};
|
||||
|
||||
handle_cast(stop, State) ->
|
||||
{stop, normal, State}.
|
||||
|
||||
handle_call(_, _From, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
try_save(Docs, Max, StartID, NowID) when length(Docs) >= Max, length(Docs) > 0 ->
|
||||
File = config:get(delta_source_file),
|
||||
Conf = config:get(sphinx_config_file),
|
||||
Delta = config:get(delta_index_name),
|
||||
Main = config:get(main_index_name),
|
||||
io:format("sync sphinx index ~p documents...", [length(Docs)]),
|
||||
?I(?FMT("save sphinx xml file ~s", [File])),
|
||||
sphinx_doc:write_xml(File, Docs),
|
||||
?I(?FMT("build delta index : ~s", [Delta])),
|
||||
sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, NowID),
|
||||
?I(?FMT("merge delta to main index ~s -> ~s", [Delta, Main])),
|
||||
sphinx_cmd:merge_index(Main, Delta, Conf),
|
||||
?I("index updated done"),
|
||||
io:format("done~n", []),
|
||||
[];
|
||||
try_save(Docs, _, _, _) ->
|
||||
Docs.
|
Loading…
Reference in New Issue
Block a user