update torrent importer

Kevin Lynx 2013-07-08 22:18:16 +08:00
parent 4d31535727
commit 5a1be24b89
5 changed files with 372 additions and 49 deletions

View File

@ -1,14 +1,26 @@
%%
%% tor_builder.erl
%% Kevin Lynx
%% 07.08.2013
%%
-module(tor_builder).
-behaviour(gen_server).
-include("vlog.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([worker_run/1]).
-export([start_link/4,
start_link/1,
start_standalone/1,
start_standalone/4,
stop/0]).
-record(state, {dir, workers = [], workercnt}).
-define(DBPOOL, tor_db_pool).
-define(BATCHSIZE, 10).
-compile(export_all).
start_dep_apps() ->
code:add_path("deps/bson/ebin"),
@ -17,64 +29,115 @@ start_dep_apps() ->
Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
[application:start(App) || App <- Apps].
% called from shell
start_standalone([IP, Port, WorkerCount, RootDir]) ->
IPort = list_to_integer(Port),
IWorkerCount = list_to_integer(WorkerCount),
start_standalone(IP, IPort, IWorkerCount, RootDir),
receive
fuck_erl_s_option -> ok
end.
% `RootDir' must end with a slash, e.g.: torrents/
start_standalone(IP, Port, WorkerCount, RootDir) ->
start_dep_apps(),
start_link(IP, Port, RootDir, WorkerCount).
start_link(DBIP, DBPort, RootDir, WorkerCount) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [DBIP, DBPort, RootDir, WorkerCount], []).
start_link(RootDir) ->
start_link(localhost, 27017, RootDir, 100).
stop() ->
gen_server:cast(srv_name(), stop).
srv_name() ->
?MODULE.
init([DBIP, DBPort, RootDir, WorkerCount]) ->
mongo_sup:start_pool(?DBPOOL, 5, {DBIP, DBPort}),
vlog:start_link("tor_builder.log", 1),
{ok, #state{dir = RootDir, workercnt = WorkerCount}, 1}.
code_change(_, _, State) ->
{ok, State}.
terminate(_, State) ->
tor_location_reader:stop(),
mongo_sup:stop_pool(?DBPOOL),
vlog:stop(),
{ok, State}.
handle_info(build_index_done, State) ->
#state{workercnt = WorkerCount, dir = RootDir} = State,
Workers = spawn_workers(RootDir, WorkerCount),
{noreply, State#state{workers = Workers}};
handle_info({worker_done, Pid}, State) ->
#state{workers = Workers} = State,
NewWorks = lists:delete(Pid, Workers),
case length(NewWorks) of
0 ->
io:format("import all torrents done~n"),
{stop, normal, State#state{workers = NewWorks}};
_ ->
{noreply, State#state{workers = NewWorks}}
end;
handle_info(timeout, State) ->
#state{dir = RootDir, workercnt = WorkerCount} = State,
Workers = case tor_location_reader:ensure_status_files(RootDir) of
true ->
io:format("continue to process...~n"),
spawn_workers(RootDir, WorkerCount);
false ->
tor_location_writer:start_link(RootDir, self()),
[]
end,
{noreply, State#state{workers = Workers}};
handle_info(M, State) ->
?W(?FMT("unhandled message ~p", [M])),
{noreply, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
spawn_workers(RootDir, Count) ->
tor_location_reader:start_link(RootDir),
io:format("spawn ~p worker processes~n", [Count]),
[spawn_link(?MODULE, worker_run, [self()]) || _ <- lists:seq(1, Count)].
worker_run(Parent) ->
Names = tor_location_reader:get(?BATCHSIZE),
?T(?FMT("read ~p torrent file names", [length(Names)])),
case length(Names) == 0 of
true ->
process_done(Parent);
false ->
parse_and_save([], Names),
worker_run(Parent)
end.
process_done(Parent) ->
Parent ! {worker_done, self()}.
parse_and_save(Docs, [Name|Files]) ->
SaveDocs = load_and_parse(Name, Docs),
parse_and_save(SaveDocs, Files);
parse_and_save(Docs, []) ->
try_save(Docs).
load_and_parse(Name, AccIn) ->
{ok, Content} = file:read_file(Name),
MagHash = parse_hash(Name),
40 = length(MagHash),
case catch(torrent_file:parse(Content)) of
{'EXIT', _} ->
?W(?FMT("parse a torrent failed ~s", [MagHash])),
@ -93,9 +156,9 @@ on_load_torrent(Hash, single, {Name, Length}) ->
on_load_torrent(Hash, multi, {Root, Files}) ->
{Hash, Root, 0, Files}.
try_save([]) ->
ok;
try_save(Tors) ->
Conn = mongo_pool:get(?DBPOOL),
db_store_mongo:unsafe_insert(Conn, Tors).
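
For reference, the reworked importer can also be started straight from an Erlang shell. A minimal sketch, assuming the same local MongoDB and torrents/ root used by the start script in this commit:

% equivalent of the erl -run line in the start script; RootDir must end with a slash
tor_builder:start_standalone("localhost", 27017, 50, "torrents/").
% or rely on the defaults (localhost:27017, 100 workers)
tor_builder:start_link("torrents/").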

View File

@ -0,0 +1,115 @@
%%
%% tor_location_reader.erl
%% Kevin Lynx
%% 07.08.2013
%%
-module(tor_location_reader).
-behaviour(gen_server).
-include("vlog.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1,
get/1,
ensure_status_files/1,
stop/0]).
-record(state, {dir, fp, sum, position, maxpos, lastPrintPos = 0}).
start_link(RootDir) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [RootDir], []).
stop() ->
gen_server:cast(srv_name(), stop).
get(Count) ->
gen_server:call(srv_name(), {read, Count}).
ensure_status_files(RootDir) ->
Index = RootDir ++ "index.txt",
Status = RootDir ++ "index_status.txt",
(filelib:is_regular(Index)) and (filelib:is_regular(Status)).
srv_name() ->
?MODULE.
init([RootDir]) ->
{ok, FP} = file:open(RootDir ++ "index.txt", [read]),
{ok, [Status]} = file:consult(RootDir ++ "index_status.txt"),
Pos = proplists:get_value(position, Status),
Sum = proplists:get_value(sum, Status),
{ok, MaxPos} = file:position(FP, eof),
file:position(FP, Pos),
?I(?FMT("read status file ok ~p, ~p", [Pos, MaxPos])),
{ok, #state{fp = FP, position = Pos, maxpos = MaxPos, sum = Sum, dir = RootDir}, 0}.
terminate(_, #state{fp = FP} = State) ->
file:close(FP),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info(timeout, State) ->
{noreply, State};
handle_info(M, State) ->
?W(?FMT("unhandled message ~p", [M])),
{noreply, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call({read, Count}, _From, State) ->
#state{dir = RootDir, sum = Sum, fp = FP,
lastPrintPos = OldPrPos, maxpos = MaxPos, position = OldPos} = State,
List = do_read(FP, Count),
{NewPos, Names, PrintPos} = case length(List) == 0 of
true when OldPos == MaxPos ->
PrPos = display_progress(MaxPos, MaxPos, OldPrPos),
update_status(RootDir, FP, Sum, OldPos),
% set the position past maxpos so 100% is not printed repeatedly
{MaxPos + 1, [], PrPos};
true ->
{MaxPos + 1, [], OldPrPos};
false ->
PrPos = display_progress(MaxPos, OldPos, OldPrPos),
Pos = update_status(RootDir, FP, Sum, OldPos),
{Pos, List, PrPos}
end,
{reply, Names, State#state{position = NewPos, lastPrintPos = PrintPos}};
handle_call(_, _From, State) ->
{noreply, State}.
display_progress(MaxPos, CurPos, OldPrPos)
when 100 * (CurPos - OldPrPos) / MaxPos >= 10; MaxPos == CurPos ->
io:format("--> ~.2f%~n", [CurPos * 100 / MaxPos]),
CurPos;
display_progress(_MaxPos, _CurPos, OldPrPos) ->
OldPrPos.
do_read(_, 0) ->
[];
do_read(FP, Count) ->
case io:get_line(FP, "") of
eof -> [];
Line -> [strip_lf(Line) | do_read(FP, Count - 1)]
end.
strip_lf(S) ->
lists:sublist(S, length(S) - 1).
% if an error occurs, the importer can resume from the previously saved position
update_status(RootDir, FP, Sum, OldPos) ->
{ok, Pos} = file:position(FP, cur),
Status = [{sum, Sum}, {position, OldPos}],
?I(?FMT("update status ~p", [Status])),
file:write_file(RootDir ++ "index_status.txt", io_lib:fwrite("~p.\n",[Status])),
Pos.
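
For illustration, the status file the reader consults is a single Erlang term; assuming a hypothetical index of 123456 torrents that has not been read yet, index_status.txt would hold roughly:

% torrents/index_status.txt, read back with file:consult/1 in init/1
[{sum, 123456}, {position, 0}].
% a worker then pulls file names in batches, e.g. Names = tor_location_reader:get(10)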

View File

@ -0,0 +1,141 @@
%%
%% tor_location_writer.erl
%% Kevin Lynx
%% 07.08.2013
%%
-module(tor_location_writer).
-behaviour(gen_server).
-include("vlog.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/2, stop/0]).
-export([worker_run/2]).
-record(state, {parent, dir, fp, sum = 0, workers = []}).
-compile(export_all).
start_link(RootDir, Parent) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [RootDir, Parent], []).
stop() ->
gen_server:cast(srv_name(), stop).
srv_name() ->
?MODULE.
init([RootDir, Parent]) ->
{ok, FP} = file:open(RootDir ++ "index.txt", [write]),
{ok, #state{parent = Parent, dir = RootDir, fp = FP}, 0}.
terminate(_, #state{fp = FP} = State) ->
file:close(FP),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info(timeout, State) ->
#state{dir = RootDir} = State,
Dirs = list_directory(RootDir),
?I(?FMT("~s has ~p subdirectories", [RootDir, length(Dirs)])),
Workers = [spawn_link(?MODULE, worker_run, [RootDir, Dir]) || Dir <- Dirs],
io:format("start to build the torrent index.....~n", []),
{noreply, State#state{workers = Workers}};
handle_info(M, State) ->
?W(?FMT("unhandled message ~p", [M])),
{noreply, State}.
handle_cast({worker_done, Pid}, State) ->
#state{parent = Parent, workers = Workers, sum = Sum, dir = RootDir} = State,
NewWorks = lists:delete(Pid, Workers),
case length(NewWorks) of
0 ->
on_build_done(RootDir, Sum, Parent),
{stop, normal, State#state{workers = NewWorks}};
_ ->
{noreply, State#state{workers = NewWorks}}
end;
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call({add_torrents, Names}, _From, State) ->
#state{fp = FP, sum = Sum} = State,
AddCnt = append_torrent_names(FP, Names),
?I(?FMT("append ~p torrents to index file", [AddCnt])),
{reply, ok, State#state{sum = Sum + AddCnt}};
handle_call(_, _From, State) ->
{noreply, State}.
on_build_done(RootDir, Sum, Parent) ->
?I(?FMT("build index done, total ~p torrents", [Sum])),
io:format("build index done, total ~p torrents~n", [Sum]),
Ret = [{sum, Sum}, {position, 0}],
file:write_file(RootDir ++ "index_status.txt", io_lib:fwrite("~p.\n",[Ret])),
Parent ! build_index_done.
list_directory(RootDir) ->
{ok, Files} = file:list_dir(RootDir),
% lists:dropwhile/2 only skips a leading prefix, so use lists:filter/2 here
lists:filter(fun (S) -> not filter_dir(RootDir ++ S) end, Files).
filter_dir([$\.|_]) ->
true;
filter_dir(Dir) ->
(not filelib:is_file(Dir)) or (filelib:is_regular(Dir)).
parse_hash(FileName) ->
string:to_upper(filename:basename(FileName, ".torrent")).
append_torrent_names(FP, [Name|Rest]) ->
Hash = parse_hash(Name),
case length(Hash) == 40 of
true ->
io:format(FP, "~s~n", [Name]),
1 + append_torrent_names(FP, Rest);
false ->
?W(?FMT("invalid torrent file name ~s", [Name])),
append_torrent_names(FP, Rest)
end;
append_torrent_names(_, []) ->
0.
%%
notify_worker_done() ->
gen_server:cast(srv_name(), {worker_done, self()}).
notify_add_torrents([]) ->
skip;
notify_add_torrents(Names) ->
gen_server:call(srv_name(), {add_torrents, Names}, infinity).
load_torrent_names(Dir) ->
Files = filelib:wildcard(Dir ++ "/*.torrent"),
Files.
worker_run(RootDir, Dir) ->
FullDir = RootDir ++ Dir ++ "/",
SubDirs = list_directory(FullDir),
?I(?FMT("worker ~p has ~p subdirectories", [self(), length(SubDirs)])),
process_torrents(FullDir, SubDirs),
notify_worker_done().
process_torrents(ParentDir, [Dir|Rest]) ->
FullDir = ParentDir ++ "/" ++ Dir,
Names = load_torrent_names(FullDir),
?I(?FMT("worker ~p subdirectory ~s has ~p torrents", [self(), FullDir, length(Names)])),
notify_add_torrents(Names),
process_torrents(ParentDir, Rest);
process_torrents(_, []) ->
ok.
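
The writer's output is plain text: one .torrent path per line in index.txt, and only names whose base name is a 40-character info hash are counted. A hypothetical entry, assuming the RootDir/Dir/SubDir/*.torrent layout this module scans:

torrents/00/1A/0123456789ABCDEF0123456789ABCDEF01234567.torrent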

View File

@ -1,3 +1,7 @@
## 07.08.2013
* add torrent importer which can import local torrents into the torrent database
## 07.05.2013
* add torrent downloader which will download torrents and store them in the database or the local file system

View File

@ -1 +1 @@
erl -pa ebin -noshell -run tor_builder start_standalone localhost 27017 10 torrents/
erl -pa ebin -noshell -run tor_builder start_standalone localhost 27017 50 torrents/
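
The trailing arguments map onto tor_builder:start_standalone/1: MongoDB host and port, the number of worker processes, and the torrent root directory (with its trailing slash). A sketch of the call erl makes for the updated line, since -run passes the remaining words as one list of strings:

tor_builder:start_standalone(["localhost", "27017", "50", "torrents/"]).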