From 486c354ba0dc930ef564198b61a5c60d471bc97a Mon Sep 17 00:00:00 2001 From: Kevin Lynx Date: Fri, 2 Aug 2013 22:19:31 +0800 Subject: [PATCH] change sphinx torrent loading using an existing cursor --- src/sphinx_builder/sphinx_builder_sup.erl | 2 +- src/sphinx_builder/sphinx_torrent.erl | 47 +++++++++++++++++------ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/sphinx_builder/sphinx_builder_sup.erl b/src/sphinx_builder/sphinx_builder_sup.erl index 8e3bb0b..969c7d8 100644 --- a/src/sphinx_builder/sphinx_builder_sup.erl +++ b/src/sphinx_builder/sphinx_builder_sup.erl @@ -46,7 +46,7 @@ init([IP, Port, Count]) -> config:start_link("sphinx_builder.config", fun() -> config_default() end), Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]}, Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]}, - Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]}, + Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]}, Children = [Logger, Builder, Indexer], {ok, {Spec, Children}}. diff --git a/src/sphinx_builder/sphinx_torrent.erl b/src/sphinx_builder/sphinx_torrent.erl index 4c8f368..4e4d098 100644 --- a/src/sphinx_builder/sphinx_torrent.erl +++ b/src/sphinx_builder/sphinx_torrent.erl @@ -13,11 +13,12 @@ terminate/2, code_change/3]). -export([start_link/3, get/0, try_times/0]). +-export([do_load_torrents/2]). % disable warning only -define(DBNAME, torrents). -define(COLLNAME, hashes). -define(POOLNAME, db_pool). -define(WAIT_TIME, 30*1000). --record(state, {offset = 0, max, try_times = 0, tors = []}). +-record(state, {offset = 0, max, try_times = 0, tors = [], cursor}). start_link(IP, Port, Offset) -> gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []). @@ -38,12 +39,15 @@ init([IP, Port, Offset]) -> {ok, #state{offset = Offset, max = Max}, 0}. handle_cast(load, State) -> - #state{offset = Skip, max = Max, tors = Tors} = State, + #state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State, Request = Max * 2 div 3, ?T(?FMT("request next ~p torrents", [Request])), - LoadTors = do_load_torrents(Skip, Request), + LoadTors = load_next_batch(Cursor, Request), case length(LoadTors) of - 0 -> timer:send_after(?WAIT_TIME, try_load); + 0 -> + ?T(?FMT("no torrents in cursor ~p", [Cursor])), + mongo_cursor:close(Cursor), + timer:send_after(?WAIT_TIME, try_load); _ -> ok end, ?T(?FMT("load ~p torrents", [length(LoadTors)])), @@ -56,14 +60,13 @@ handle_cast(stop, State) -> handle_info(try_load, State) -> #state{offset = Skip, max = Max, try_times = Try} = State, ?T(?FMT("try load ~p torrents from ~p", [Max, Skip])), - LoadTors = do_load_torrents(Skip, Max), - LoadCnt = length(LoadTors), - NewTry = case LoadCnt > 0 of - true -> 0; - false -> timer:send_after(?WAIT_TIME, try_load), Try + 1 - end, - ?T(?FMT("load ~p torrents, try times ~p", [LoadCnt, NewTry])), - {noreply, State#state{offset = Skip + LoadCnt, tors = LoadTors, try_times = NewTry}}; + case load_cursor_batch(Skip, Max) of + {} -> + timer:send_after(?WAIT_TIME, try_load), + {noreply, State#state{try_times = Try + 1}}; + {Cursor, R} -> + {noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}} + end; handle_info(timeout, State) -> self() ! try_load, @@ -97,6 +100,26 @@ try_load_next(Tors, Max) when length(Tors) == Max div 3 -> try_load_next(_, _) -> ok. +load_cursor(Skip, Size) -> + Conn = mongo_pool:get(?POOLNAME), + mongo:do(safe, master, Conn, ?DBNAME, fun() -> + mongo:find(?COLLNAME, {}, {}, Skip, Size) + end). + +load_cursor_batch(Skip, Size) -> + Cursor = load_cursor(Skip, Size), + case load_next_batch(Cursor, Size) of + [] -> + mongo_cursor:close(Cursor), {}; + R -> + {Cursor, R} + end. + +% will cause `get_more' +load_next_batch(Cursor, Size) -> + mongo_cursor:take(Cursor, Size). + +% will cause lots of queries do_load_torrents(Skip, Size) -> Conn = mongo_pool:get(?POOLNAME), mongo:do(safe, master, Conn, ?DBNAME, fun() ->