diff --git a/rebar.config b/rebar.config
index 38953b0..d46409d 100644
--- a/rebar.config
+++ b/rebar.config
@@ -4,6 +4,7 @@
  {ibrowse, ".*", {git, "git@github.com:cmullaparthi/ibrowse.git", "HEAD"}},
  {bson, ".*", {git, "git@github.com:mongodb/bson-erlang.git", "HEAD"}},
  {mongodb, ".*", {git, "git@github.com:mongodb/mongodb-erlang.git", "HEAD"}},
- {kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}}
+ {kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}},
+ {giza, ".*", {git, "https://github.com/kevinlynx/giza.git", "HEAD"}}
 ]}.
diff --git a/src/http_front/crawler_http.erl b/src/http_front/crawler_http.erl
index 6caa740..f3714c1 100644
--- a/src/http_front/crawler_http.erl
+++ b/src/http_front/crawler_http.erl
@@ -32,6 +32,7 @@ start(DBHost, DBPort, Port, PoolSize) ->
     vlog:start_link("log/crawler_http.log", ?INFO),
     code:add_path("deps/bson/ebin"),
     code:add_path("deps/mongodb/ebin"),
+    code:add_path("deps/giza/ebin"),
     Apps = [crypto, public_key, ssl, inets, bson, mongodb],
     [application:start(App) || App <- Apps],
     gen_server:start({local, srv_name()}, ?MODULE, [DBHost, DBPort, Port, PoolSize], []).
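Note: giza is an Erlang client for Sphinx's searchd protocol; the http front adds it to the code path so searches can later be served from the Sphinx index instead of MongoDB scans. A minimal sketch of a searchd query through giza, assuming giza_query:new/2 and giza_request:send/1 as shown in giza's README; the helper name and the index name "main" are illustrative, not part of this commit:

    %% hypothetical helper, for illustration only
    search_sphinx(Word) ->
        Query = giza_query:new("main", Word), % index name is an assumption
        {ok, Results} = giza_request:send(Query),
        Results.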
diff --git a/src/sphinx_builder/sphinx_builder.erl b/src/sphinx_builder/sphinx_builder.erl
index cd12b93..510f870 100644
--- a/src/sphinx_builder/sphinx_builder.erl
+++ b/src/sphinx_builder/sphinx_builder.erl
@@ -14,7 +14,7 @@ code_change/3]).
 -export([start_link/3]).
 -export([worker_run/0]).
--record(state, {processed = 0, worker_cnt, wait_workers = []}).
+-record(state, {processed = 0, saved = false, worker_cnt, wait_workers = []}).
 -define(WORKER_WAIT, 30*1000).
 -define(STATE_FILE, "priv/sphinx_builder.sta").
 
@@ -32,7 +32,8 @@ init([IP, Port, WorkerCnt]) ->
     {ok, #state{processed = Offset, worker_cnt = WorkerCnt}}.
 
 handle_call({get, Pid}, _From, State) ->
-    #state{processed = Processed, worker_cnt = WorkerCnt, wait_workers = WaitWorkers} = State,
+    #state{processed = Processed, worker_cnt = WorkerCnt, wait_workers = WaitWorkers,
+        saved = Saved} = State,
     {NewProcessed, Ret} = case sphinx_torrent:get() of
         {} -> {Processed, wait};
@@ -41,8 +42,9 @@ handle_call({get, Pid}, _From, State) ->
             {Processed + 1, Tor}
     end,
     NewWaits = update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers),
-    check_all_done(NewWaits, WorkerCnt, NewProcessed, length(NewWaits) > length(WaitWorkers)),
-    {reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits}}.
+    NewSaved = (check_all_done(NewWaits, WorkerCnt, NewProcessed, Saved)) and (Ret == wait),
+    {reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits,
+        saved = NewSaved}}.
 
 handle_cast(_, State) ->
     {noreply, State}.
@@ -66,19 +68,20 @@ update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers) ->
             WaitWorkers
     end.
 
-check_all_done(WaitWorkers, WorkerCnt, Processed, true)
+check_all_done(WaitWorkers, WorkerCnt, Processed, false)
     when length(WaitWorkers) == WorkerCnt ->
     Try = sphinx_torrent:try_times(),
     case Try > 5 of
         true ->
             io:format("haven't got any torrents for a while, force save~n", []),
             save_result(Processed),
-            sphinx_xml:force_save();
+            sphinx_xml:force_save(),
+            true;
         false ->
-            ok
+            false
     end;
-check_all_done(_WaitWorkers, _WaitCnt, _Processed, _) ->
-    ok.
+check_all_done(_WaitWorkers, _WaitCnt, _Processed, Saved) ->
+    Saved.
 
 worker_run() ->
     Ret = gen_server:call(srv_name(), {get, self()}),
@@ -91,9 +94,9 @@ do_process({_, wait}) ->
 do_process({ID, Doc}) ->
     case db_store_mongo:decode_torrent_item(Doc) of
         {single, Hash, {Name, _}, Query, CreatedAt} ->
-            sphinx_xml:insert({Hash, Name, [], ID, Query, CreatedAt});
+            sphinx_xml:insert({ID, Hash, Name, [], Query, CreatedAt});
         {multi, Hash, {Name, Files}, Query, CreatedAt} ->
-            sphinx_xml:insert({Hash, Name, Files, ID, Query, CreatedAt})
+            sphinx_xml:insert({ID, Hash, Name, Files, Query, CreatedAt})
     end.
 
 load_result() ->
diff --git a/src/sphinx_builder/sphinx_builder_sup.erl b/src/sphinx_builder/sphinx_builder_sup.erl
index 194499d..8e3bb0b 100644
--- a/src/sphinx_builder/sphinx_builder_sup.erl
+++ b/src/sphinx_builder/sphinx_builder_sup.erl
@@ -6,6 +6,7 @@
 -module(sphinx_builder_sup).
 -behaviour(supervisor).
 -export([start_standalone/1, start_standalone/3, start_link/3]).
+-export([init_indexes/0]).
 -export([init/1]).
 
 start_dep_apps() ->
@@ -29,6 +30,13 @@ start_standalone(IP, Port, Size) ->
 
 start_link(IP, Port, Count) ->
     supervisor:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count]).
 
+init_indexes() ->
+    config:start_link("sphinx_builder.config", fun() -> config_default() end),
+    io:format("try init sphinx index files~n", []),
+    Conf = config:get(sphinx_config_file),
+    MainFile = config:get(main_source_file),
+    DeltaFile = config:get(delta_source_file),
+    sphinx_cmd:build_init_index(MainFile, DeltaFile, Conf).
 %%
 srv_name() ->
     ?MODULE.
@@ -38,14 +46,14 @@ init([IP, Port, Count]) ->
     config:start_link("sphinx_builder.config", fun() -> config_default() end),
     Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
     Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
-    Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
+    Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]},
     Children = [Logger, Builder, Indexer],
     {ok, {Spec, Children}}.
 
 config_default() ->
-    [{load_torrent_interval, 500}, % millseconds
-     {max_doc_per_file, 1000},
+    [{max_doc_per_file, 1000},
      {torrent_batch_count, 100},
+     {main_source_file, "var/source/main.xml"},
      {delta_source_file, "var/source/delta.xml"},
      {sphinx_config_file, "var/etc/csft.conf"},
      {delta_index_name, "delta"},
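Note on the saved flag threaded through sphinx_builder above: check_all_done/4 now returns the updated flag rather than ok, so the force save runs at most once per idle period, and the trailing (Ret == wait) term re-arms it as soon as any worker is handed a torrent again. A pure restatement of that gating for illustration; the module and function names here are made up:

    -module(saved_flag_sketch).
    -export([next_saved/3]).

    %% Saved: a force save already happened in this idle period.
    %% LongIdle: all workers are waiting and sphinx_torrent:try_times() > 5.
    %% GotTorrent: this call handed out a torrent, i.e. Ret /= wait.
    next_saved(_Saved, _LongIdle, true) -> false; % work resumed, re-arm
    next_saved(false, true, false) -> true;       % idle long enough, save once
    next_saved(Saved, _LongIdle, false) -> Saved. % otherwise keep the flag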
+
 % Index file, Delta index name
 build_delta_index(IndexFile, Delta, CfgFile, MinID, MaxID) ->
     Cmd = "indexer -c " ++ CfgFile ++ " --rotate " ++ Delta,
diff --git a/src/sphinx_builder/sphinx_doc.erl b/src/sphinx_builder/sphinx_doc.erl
index a7b22d2..313fe75 100644
--- a/src/sphinx_builder/sphinx_doc.erl
+++ b/src/sphinx_builder/sphinx_doc.erl
@@ -4,12 +4,16 @@
 %%
 -module(sphinx_doc).
 -include_lib("xmerl/include/xmerl.hrl").
--export([write_xml/2, element/6]).
+-export([write_test_xml/1, write_xml/2, element/6]).
 -compile(export_all).
 -define(PROLOG, "<?xml version=\"1.0\" encoding=\"utf-8\"?>").
 -define(CR, #xmlText{value="\n"}).
 
+write_test_xml(File) ->
+    Elem = element("33FB6D00DD5E363653235449527EC1DC9959FCAB", "test", [], 1, 1, 1374508800),
+    write_xml(File, [Elem]).
+
 write_xml(File, Elems) ->
     Doc = {'sphinx:docset', [], [schema(), ?CR] ++ Elems},
     Content = xmerl:export_simple([Doc], xmerl_xml_cdata, [{prolog, ?PROLOG}]),
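Note: the init step exists because indexer cannot build the initial main and delta indexes while their xmlpipe2 source files are still missing, so build_init_index/3 first seeds both sources with a single dummy document via write_test_xml/1 and then runs "indexer -c <cfg> --all". The same step can be run from an Erlang shell with the paths from config_default above; init_indexes/0 just reads them from sphinx_builder.config before delegating:

    1> sphinx_cmd:build_init_index("var/source/main.xml",
                                   "var/source/delta.xml",
                                   "var/etc/csft.conf").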
diff --git a/src/sphinx_builder/sphinx_xml.erl b/src/sphinx_builder/sphinx_xml.erl
index b01338d..a8f68ef 100644
--- a/src/sphinx_builder/sphinx_xml.erl
+++ b/src/sphinx_builder/sphinx_xml.erl
@@ -13,13 +13,13 @@
      terminate/2,
      code_change/3]).
 -export([start_link/0, insert/1, force_save/0]).
--record(state, {docs = [], max, startid = -1}).
+-record(state, {docs = [], ids = [], max}).
 
 start_link() ->
     gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
 
 insert(Doc) ->
-    gen_server:cast(srv_name(), {insert, Doc}).
+    gen_server:call(srv_name(), {insert, Doc}, infinity).
 
 force_save() ->
     gen_server:cast(srv_name(), save).
@@ -38,42 +38,48 @@ terminate(_, State) ->
 code_change(_, _, State) ->
     {ok, State}.
 
-handle_cast(save, #state{docs = Docs} = State) when length(Docs) > 0 ->
-    #state{startid = StartID} = State,
-    EndID = length(Docs) + StartID - 1,
-    try_save(Docs, 0, StartID, EndID),
-    {noreply, State#state{docs = []}};
+handle_cast(save, #state{docs = []} = State) ->
+    {noreply, State};
 
-handle_cast({insert, {Hash, Name, Files, ID, Query, CreatedAt}}, State) ->
-    #state{docs = Docs, max = Max, startid = StartID} = State,
-    NewStartID = if length(Docs) == 0 -> ID; true -> StartID end,
-    Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
-    NewDocs = try_save([Doc|Docs], Max, NewStartID, ID),
-    {noreply, State#state{docs = NewDocs, startid = NewStartID}};
+handle_cast(save, #state{docs = Docs, ids = IDs} = State) when length(Docs) > 0 ->
+    try_save(Docs, 0, IDs),
+    {noreply, State#state{docs = [], ids = []}};
 
 handle_cast(stop, State) ->
     {stop, normal, State}.
 
+handle_call({insert, {ID, Hash, Name, Files, Query, CreatedAt}}, _From, State) ->
+    #state{docs = Docs, ids = IDs, max = Max} = State,
+    Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
+    {NewDocs, NewIDs} = try_save([Doc|Docs], Max, [ID|IDs]),
+    {reply, ok, State#state{docs = NewDocs, ids = NewIDs}};
+
 handle_call(_, _From, State) ->
     {noreply, State}.
 
 handle_info(_, State) ->
     {noreply, State}.
 
-try_save(Docs, Max, StartID, NowID) when length(Docs) >= Max, length(Docs) > 0 ->
+try_save(Docs, Max, IDs) when length(Docs) >= Max, length(Docs) > 0 ->
     File = config:get(delta_source_file),
     Conf = config:get(sphinx_config_file),
     Delta = config:get(delta_index_name),
     Main = config:get(main_index_name),
+    {StartID, EndID} = get_id_range(IDs),
     io:format("sync sphinx index ~p documents...", [length(Docs)]),
     ?I(?FMT("save sphinx xml file ~s", [File])),
     sphinx_doc:write_xml(File, Docs),
     ?I(?FMT("build delta index : ~s", [Delta])),
-    sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, NowID),
+    sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, EndID),
     ?I(?FMT("merge delta to main index ~s -> ~s", [Delta, Main])),
     sphinx_cmd:merge_index(Main, Delta, Conf),
     ?I("index updated done"),
     io:format("done~n", []),
-    [];
-try_save(Docs, _, _, _) ->
-    Docs.
+    {[], []};
+try_save(Docs, _, IDs) ->
+    {Docs, IDs}.
+
+get_id_range([First|IDs]) ->
+    lists:foldl(fun(ID, {Min, Max}) ->
+        {min(ID, Min), max(ID, Max)}
+    end, {First, First}, IDs).
diff --git a/tools/HISTORY.md b/tools/HISTORY.md
index cf841f2..6ab7db8 100644
--- a/tools/HISTORY.md
+++ b/tools/HISTORY.md
@@ -1,3 +1,7 @@
+## 07.30.2013
+
+* add sphinx (coreseek, which is based on sphinx) to help searching, still in experimental stage
+
 ## 07.21.2013
 
 * rewrite hash_reader, now it will keep a wait_download cache
diff --git a/tools/create_bin.bat b/tools/create_bin.bat
index 2606922..35f4943 100644
--- a/tools/create_bin.bat
+++ b/tools/create_bin.bat
@@ -3,10 +3,12 @@
 mkdir bin\deps\bson\ebin
 mkdir bin\deps\mongodb\ebin
 mkdir bin\deps\kdht\ebin
 mkdir bin\deps\ibrowse\ebin
+mkdir bin\deps\giza\ebin
 copy deps\bson\ebin\*.* bin\deps\bson\ebin\
 copy deps\mongodb\ebin\*.* bin\deps\mongodb\ebin\
 copy deps\kdht\ebin\*.* bin\deps\kdht\ebin\
 copy deps\ibrowse\ebin\*.* bin\deps\ibrowse\ebin\
+copy deps\giza\ebin\*.* bin\deps\giza\ebin\
 mkdir bin\www
 copy www\*.* bin\www\
 copy tools\*.* bin\
diff --git a/tools/win_init_sphinx_index.bat b/tools/win_init_sphinx_index.bat
new file mode 100644
index 0000000..1001cc3
--- /dev/null
+++ b/tools/win_init_sphinx_index.bat
@@ -0,0 +1 @@
+erl -pa ebin -noshell -run sphinx_builder_sup init_indexes
diff --git a/tools/win_start_sphinx_builder.bat b/tools/win_start_sphinx_builder.bat
new file mode 100644
index 0000000..13a9097
--- /dev/null
+++ b/tools/win_start_sphinx_builder.bat
@@ -0,0 +1 @@
+erl -pa ebin -noshell -run sphinx_builder_sup start_standalone localhost 27017 5
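Two usage notes on the changes above. First, insert/1 in sphinx_xml is now a synchronous call with an infinity timeout, so a worker blocks while a batch is being written and merged instead of flooding the gen_server mailbox with casts. Because insert prepends each ID, the ids list is unordered, and get_id_range/1 folds for the minimum and maximum so build_delta_index gets correct bounds regardless of arrival order; for instance (assuming get_id_range were exported, which it is not in this commit):

    1> sphinx_xml:get_id_range([7, 3, 9, 5]).
    {3,9}

Second, the intended order on Windows is to run win_init_sphinx_index.bat once to create the empty main and delta indexes, then win_start_sphinx_builder.bat, where localhost and 27017 are the MongoDB host and port and 5 is the worker count.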