mirror of https://github.com/btdig/dhtcrawler2.git (synced 2025-01-31 10:31:37 +00:00)

fix name_seger tool

parent d64630950e
commit 356a1c0a92
@@ -193,11 +193,10 @@ create_torrent_desc(_Conn, Hash, Name, Length, Announce, Files) ->
      announce, Announce,
      files, encode_file_list(Files)}.

seg_text(Name, Files) ->
    FullName = lists:foldl(fun({S, _}, Acc) ->
        Acc ++ " " ++ S
    end, Name, Files),
    tor_name_seg:seg_text(FullName).
seg_text(Name, _Files) ->
    %FullName = lists:foldl(fun({S, _}, Acc) -> Acc ++ " " ++ S end, Name, Files),
    %tor_name_seg:seg_text(FullName).
    tor_name_seg:seg_text(Name).
-endif.

% {file1, {name, xx, length, xx}, file2, {name, xx, length, xx}}
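In effect, this first hunk stops feeding the file list into the name segmenter: the old seg_text/2 appended every file name to the torrent name before calling tor_name_seg:seg_text/1, while the new clause segments the torrent name alone. A minimal illustration of the difference, assuming the project's tor_name_seg module is compiled and on the code path (the torrent name and file list below are made up):

%% Illustration only, not part of the commit.
-module(seg_text_demo).
-export([run/0]).

run() ->
    Name  = "Ubuntu 13.04 desktop amd64",
    Files = [{"ubuntu-13.04-desktop-amd64.iso", 823132160}, {"README.txt", 120}],
    %% old behaviour: torrent name plus every file name went through the segmenter
    OldInput = lists:foldl(fun({S, _}, Acc) -> Acc ++ " " ++ S end, Name, Files),
    Old = tor_name_seg:seg_text(OldInput),
    %% new behaviour: only the torrent name is segmented
    New = tor_name_seg:seg_text(Name),
    {Old, New}.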
@@ -4,11 +4,19 @@
%% segment name by rmmseg
%%
-module(name_seger).
-export([start/0]).
-behaviour(gen_server).
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).
-export([start/0, start/1, start/3, worker_run/0, loader_run/1]).
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
-define(POOLNAME, db_pool).
-define(BATCHSIZE, 1000).
-define(BATCHSIZE, 300).
-record(state, {sum = 0, workers = [], loader = undefined, torcache = []}).

start_dep_apps() ->
    code:add_path("deps/bson/ebin"),
@@ -17,33 +25,162 @@ start_dep_apps() ->
    [application:start(App) || App <- Apps].

start() ->
    start(50).

start(Count) ->
    start_dep_apps(),
    start_link(localhost, 27017, Count).

start(IP, Port, Count) ->
    start_dep_apps(),
    start_link(IP, Port, Count).

start_link(IP, Port, Count) ->
    gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count], []).

srv_name() ->
    ?MODULE.
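For reference, a hypothetical way to launch the reworked tool from an Erlang shell, based on the exported start/0, start/1 and start/3 above; the worker count and database address are example values, and whether the host is passed as an atom or a string depends on what mongo_sup:start_pool/3 accepts in this project:

%% Hypothetical usage sketch, not part of the commit.
1> name_seger:start().                       % localhost:27017, 50 workers
2> name_seger:start(20).                     % localhost:27017, 20 workers
3> name_seger:start(localhost, 27017, 20).   % explicit address and worker count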
%
init([IP, Port, Count]) ->
    rmmseg:init(),
    rmmseg:load_dicts(),
    IP = localhost,
    Port = 27017,
    mongo_sup:start_pool(?POOLNAME, 2, {IP, Port}),
    process().
    {ok, {IP, Port, Count}, 0}.

process() ->
handle_cast(stop, State) ->
    save_result(State#state.sum),
    {stop, normal, State}.
handle_call({append, Tors}, From, State) ->
    #state{torcache = MyTors} = State,
    io:format("loader append ~p ~n", [length(Tors)]),
    NewTors = MyTors ++ Tors,
    case length(NewTors) > ?BATCHSIZE * 2 of
        true -> % too fast
            {noreply, State#state{loader = From, torcache = NewTors}};
        false ->
            {reply, ok, State#state{torcache = NewTors}}
    end;
handle_call({worker_done, Worker}, _From, State) ->
    #state{sum = Sum, workers = Workers} = State,
    NewWorkers = lists:delete(Worker, Workers),
    case length(NewWorkers) of
        0 ->
            io:format("process done total ~p~n", [Sum]),
            save_result(Sum),
            {stop, normal, State#state{workers = []}};
        _ ->
            {reply, ok, State#state{workers = NewWorkers}}
    end;
handle_call(loader_done, _From, State) ->
    % set a flag and wait workers done
    {reply, ok, State#state{loader = loader_done}};
handle_call(get_one, _From, #state{torcache = Tors} = State)
        when length(Tors) == 0 ->
    #state{loader = Loader} = State,
    Ret = if Loader == loader_done -> exit; true -> wait end,
    {reply, Ret, State};

handle_call(get_one, _From, State) ->
    #state{sum = Sum, torcache = Tors, loader = Loader} = State,
    print_stats(Sum),
    [This|Rest] = Tors,
    RestSize = length(Rest),
    NewLoader = notify_loader(RestSize, Loader),
    {reply, This, State#state{torcache = Rest, sum = 1 + Sum, loader = NewLoader}}.
notify_loader(_, loader_done) ->
    loader_done;
notify_loader(Size, Loader) when Loader /= undefined, Size < ?BATCHSIZE ->
    gen_server:reply(Loader, continue),
    undefined;
notify_loader(_, Loader) ->
    Loader.
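Taken together, the {append, Tors}, get_one and notify_loader/2 clauses above implement flow control between the loader and the workers: when the cache grows past ?BATCHSIZE * 2 the server keeps the loader's From and answers {noreply, ...}, so the loader's gen_server:call/3 stays blocked; once the workers drain the cache below ?BATCHSIZE, notify_loader/2 wakes it with gen_server:reply/2. A stripped-down sketch of that deferred-reply pattern, using a hypothetical module name and thresholds unrelated to the project:

%% Minimal single-producer flow-control sketch; illustration only, not from the commit.
-module(throttle_demo).
-behaviour(gen_server).
-export([start_link/0, push/1, pop/0]).
-export([init/1, handle_call/3, handle_cast/2]).

-define(HIGH, 10).  % the producer blocks above this many queued items
-define(LOW, 5).    % a blocked producer is released below this

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
push(Items)  -> gen_server:call(?MODULE, {push, Items}, infinity).
pop()        -> gen_server:call(?MODULE, pop, infinity).

init([]) -> {ok, {[], undefined}}.

%% Producer side: withhold the reply (and remember From) while the queue is long.
handle_call({push, Items}, From, {Queue, _Blocked}) ->
    NewQueue = Queue ++ Items,
    case length(NewQueue) > ?HIGH of
        true  -> {noreply, {NewQueue, From}};
        false -> {reply, ok, {NewQueue, undefined}}
    end;
%% Consumer side: popping may release the remembered producer via gen_server:reply/2.
handle_call(pop, _From, {[Item | Rest], Blocked}) ->
    StillBlocked = case Blocked /= undefined andalso length(Rest) < ?LOW of
        true  -> gen_server:reply(Blocked, ok), undefined;
        false -> Blocked
    end,
    {reply, {ok, Item}, {Rest, StillBlocked}};
handle_call(pop, _From, {[], Blocked}) ->
    {reply, empty, {[], Blocked}}.

handle_cast(_Msg, State) -> {noreply, State}.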
handle_info(timeout, {IP, Port, Count}) ->
    Sum = load_result(),
    mongo_sup:start_pool(?POOLNAME, 5, {IP, Port}),
    Workers = [spawn_link(?MODULE, worker_run, []) || _ <- lists:seq(1, Count)],
    spawn_link(?MODULE, loader_run, [Sum]),
    {noreply, #state{sum = Sum, workers = Workers}};

handle_info(_, State) ->
    {noreply, State}.

terminate(_, State) ->
    {ok, State}.

code_change(_, _, State) ->
    {ok, State}.
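Note the zero timeout returned from init/1 ({ok, {IP, Port, Count}, 0}): the new process receives a timeout message right away, so the heavy startup work (starting the pool, spawning the loader and the workers) runs in handle_info(timeout, ...) rather than blocking the caller of start_link/3. A self-contained sketch of that pattern, with a hypothetical module name:

%% Deferred-startup sketch; illustration only, not from the commit.
-module(deferred_init_demo).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() -> gen_server:start_link(?MODULE, [], []).

%% Returning a 0 timeout keeps init/1 (and thus start_link/0) fast;
%% the slow work is done when the immediate timeout message arrives.
init([]) -> {ok, not_ready, 0}.

handle_info(timeout, not_ready) ->
    %% slow startup work would go here (open pools, spawn helpers, ...)
    {noreply, ready};
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Req, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.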
loader_run(Skip) ->
    Tors = load_torrents_with_conn(Skip),
    case length(Tors) of
        0 ->
            io:format("loader exit~n", []),
            gen_server:call(srv_name(), loader_done),
            ok;
        Size ->
            gen_server:call(srv_name(), {append, Tors}, infinity),
            loader_run(Skip + Size)
    end.
%%
load_result() ->
    case file:consult("nameseg.txt") of
        {error, _Reason} ->
            io:format("start a new processing~n", []),
            0;
        {ok, [Ret]} ->
            Sum = proplists:get_value(processed, Ret),
            io:format("continue to process skip ~p~n", [Sum]),
            Sum
    end.
save_result(Sum) ->
    Ret = [{processed, Sum}],
    io:format("save result ~p~n", [Sum]),
    file:write_file("nameseg.txt", io_lib:fwrite("~p.\n",[Ret])).
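save_result/1 writes the progress counter as a single Erlang term terminated by a period, which is exactly the form file:consult/1 reads back in load_result/0. With a hypothetical count of 1500 processed documents, nameseg.txt would contain:

%% Hypothetical contents of nameseg.txt (illustration only):
[{processed,1500}].

file:consult("nameseg.txt") then returns {ok, [[{processed,1500}]]}, so load_result/0 binds Ret to [{processed,1500}] and proplists:get_value(processed, Ret) yields 1500, letting the next run skip that many documents.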
load_torrents_with_conn(Skip) ->
    Conn = mongo_pool:get(?POOLNAME),
    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
        Cursor = mongo:find(?COLLNAME, {}),
        process(Cursor, ok, 0)
        Cursor = mongo:find(?COLLNAME, {}, {}, Skip),
        load_torrents(Cursor)
    end).

process(Cursor, ok, Sum) ->
    print_stats(Sum),
    Ret = process_one(mongo_cursor:next(Cursor)),
    process(Cursor, Ret, Sum + 1);
load_torrents(Cursor) ->
    mongo_cursor:take(Cursor, ?BATCHSIZE).

process(_Cursor, stop, Sum) ->
    io:format("process done, total ~p~n", [Sum]),
    stop.
worker_run() ->
    Doc = gen_server:call(srv_name(), get_one, infinity),
    Ret = do_process(Doc),
    case Ret of
        exit -> ok;
        _ ->
            worker_run()
    end.
do_process(exit) ->
    gen_server:call(srv_name(), {worker_done, self()}),
    exit;
do_process(wait) ->
    timer:sleep(1000);
do_process(Doc) ->
    Torrent = db_store_mongo:decode_torrent_item(Doc),
    {Hash, NameArray} = seg_torrent(Torrent),
    Conn = mongo_pool:get(?POOLNAME),
    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
        commit(list_to_binary(Hash), NameArray)
    end).
%%
print_stats(Sum) ->
    case Sum rem 500 == 0 of
    case (Sum rem 500 == 0) and (Sum > 0) of
        true ->
            save_result(Sum),
            io:format(" -> ~p~n", [Sum]);
        false ->
            ok
@@ -55,20 +192,10 @@ commit(Hash, NameArray) when is_binary(Hash), is_binary(NameArray) ->
            new, false},
    mongo:command(Cmd).

process_one({}) ->
    stop;
process_one({Doc}) ->
    Torrent = db_store_mongo:decode_torrent_item(Doc),
    {Hash, NameArray} = seg_torrent(Torrent),
    commit(list_to_binary(Hash), NameArray),
    ok.

seg_torrent({single, Hash, {Name, _}, _, _}) ->
    {Hash, rmmseg:seg_space(list_to_binary(Name))};

seg_torrent({multi, Hash, {Name, Files}, _, _}) ->
    FullName = lists:foldl(fun({S, _}, Acc) ->
        Acc ++ " " ++ S
    end, Name, Files),
    {Hash, rmmseg:seg_space(list_to_binary(FullName))}.
seg_torrent({multi, Hash, {Name, _Files}, _, _}) ->
    %FullName = lists:foldl(fun({S, _}, Acc) -> Acc ++ " " ++ S end, Name, Files),
    {Hash, rmmseg:seg_space(list_to_binary(Name))}.