From 5a1be24b89360ce18038e9549aaba862e272ecbc Mon Sep 17 00:00:00 2001 From: Kevin Lynx Date: Mon, 8 Jul 2013 22:18:16 +0800 Subject: [PATCH] update torrent importer --- src/tor_builder/tor_builder.erl | 159 +++++++++++++++++------- src/tor_builder/tor_location_reader.erl | 115 +++++++++++++++++ src/tor_builder/tor_location_writer.erl | 141 +++++++++++++++++++++ tools/HISTORY.md | 4 + tools/win_start_import_tors.bat | 2 +- 5 files changed, 372 insertions(+), 49 deletions(-) create mode 100644 src/tor_builder/tor_location_reader.erl create mode 100644 src/tor_builder/tor_location_writer.erl diff --git a/src/tor_builder/tor_builder.erl b/src/tor_builder/tor_builder.erl index 959a6bc..69a2bb1 100644 --- a/src/tor_builder/tor_builder.erl +++ b/src/tor_builder/tor_builder.erl @@ -1,14 +1,26 @@ %% %% tor_builder.erl %% Kevin Lynx -%% 07.07.2013 +%% 07.08.2013 %% -module(tor_builder). +-behaviour(gen_server). -include("vlog.hrl"). --export([start_standalone/1, start_standalone/4]). --define(POOLNAME, write_tor_dbpool). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([worker_run/1]). +-export([start_link/4, + start_link/1, + start_standalone/1, + start_standalone/4, + stop/0]). +-record(state, {dir, workers = [], workercnt}). +-define(DBPOOL, tor_db_pool). -define(BATCHSIZE, 10). --compile(export_all). start_dep_apps() -> code:add_path("deps/bson/ebin"), @@ -17,64 +29,115 @@ start_dep_apps() -> Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb], [application:start(App) || App <- Apps]. -start_standalone([IP, Port, Size, RootDir]) -> +% called from shell +start_standalone([IP, Port, WorkerCount, RootDir]) -> IPort = list_to_integer(Port), - ISize = list_to_integer(Size), - start_standalone(IP, IPort, ISize, RootDir), + IWorkerCount = list_to_integer(WorkerCount), + start_standalone(IP, IPort, IWorkerCount, RootDir), receive fuck_erl_s_option -> ok end. -start_standalone(IP, Port, PoolSize, RootDir) -> +% `RootDir' must follow a slash e.g: torrents/ +start_standalone(IP, Port, WorkerCount, RootDir) -> start_dep_apps(), - mongo_sup:start_pool(?POOLNAME, PoolSize, {IP, Port}), - vlog:start_link("tor_builder.log", 1), - Dirs = get_dir_list(RootDir), - ?I(?FMT("found ~p subdirectories in ~s", [length(Dirs), RootDir])), - [start(?POOLNAME, RootDir ++ Dir) || Dir <- Dirs], - ok. + start_link(IP, Port, RootDir, WorkerCount). -get_dir_list(RootDir) -> - {ok, Dirs} = file:list_dir(RootDir), - lists:dropwhile(fun (S) -> filter_dir(S) end, Dirs). - -filter_dir([$\.|_]) -> - true; -filter_dir(_) -> - false. %% -start(PoolName, Dir) -> - spawn(?MODULE, load_and_save, [PoolName, Dir]). +start_link(DBIP, DBPort, RootDir, WorkerCount) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [DBIP, DBPort, RootDir, WorkerCount], []). -load_and_save(PoolName, Dir) -> - Now = now(), - ?T(?FMT("~p startup", [self()])), - Names = load_torrent_names(Dir), - ?I(?FMT("~p found ~p torrents in dir ~s, used ~p ms", [self(), length(Names), Dir, - timer:now_diff(now(), Now) div 1000])), - parse_and_save(PoolName, [], Names), - io:format("~p parsed and saved ~p torrents~n", [self(), length(Names)]), - ?I(?FMT("save torrents done ~p", [self()])). +start_link(RootDir) -> + start_link(localhost, 27017, RootDir, 100). -load_torrent_names(Dir) -> - Files = filelib:wildcard(Dir ++ "/**/*.torrent"), - Files. +stop() -> + gen_server:cast(srv_name, stop). -parse_and_save(PoolName, Docs, [Name|Files]) when 1 + length(Docs) == ?BATCHSIZE -> +srv_name() -> + ?MODULE. + +init([DBIP, DBPort, RootDir, WorkerCount]) -> + mongo_sup:start_pool(?DBPOOL, 5, {DBIP, DBPort}), + vlog:start_link("tor_builder.log", 1), + {ok, #state{dir = RootDir, workercnt = WorkerCount}, 1}. + +code_change(_, _, State) -> + {ok, State}. + +terminate(_, State) -> + tor_location_reader:stop(), + mongo_sup:stop_pool(?DBPOOL), + vlog:stop(), + {ok, State}. + +handle_info(build_index_done, State) -> + #state{workercnt = WorkerCount, dir = RootDir} = State, + Workers = spawn_workers(RootDir, WorkerCount), + {noreply, State#state{workers = Workers}}; + +handle_info({worker_done, Pid}, State) -> + #state{workers = Workers} = State, + NewWorks = lists:delete(Pid, Workers), + case length(NewWorks) of + 0 -> + io:format("import all torrents done~n"), + {stop, normal, State#state{workers = NewWorks}}; + _ -> + {noreply, State#state{workers = NewWorks}} + end; + +handle_info(timeout, State) -> + #state{dir = RootDir, workercnt = WorkerCount} = State, + Workers = case tor_location_reader:ensure_status_files(RootDir) of + true -> + io:format("continue to process...~n"), + spawn_workers(RootDir, WorkerCount); + false -> + tor_location_writer:start_link(RootDir, self()), + [] + end, + {noreply, State#state{workers = Workers}}; + +handle_info(M, State) -> + ?W(?FMT("unhandled message ~p", [M])), + {noreply, State}. + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_call(_, _From, State) -> + {noreply, State}. + +spawn_workers(RootDir, Count) -> + tor_location_reader:start_link(RootDir), + io:format("spawn ~p worker processes~n", [Count]), + [spawn_link(?MODULE, worker_run, [self()]) || _ <- lists:seq(1, Count)]. + +worker_run(Parent) -> + Names = tor_location_reader:get(?BATCHSIZE), + ?T(?FMT("read ~p torrent file names", [length(Names)])), + case length(Names) == 0 of + true -> + process_done(Parent); + false -> + parse_and_save([], Names), + worker_run(Parent) + end. + +process_done(Parent) -> + Parent ! {worker_done, self()}. + +parse_and_save(Docs, [Name|Files]) -> SaveDocs = load_and_parse(Name, Docs), - try_save(PoolName, SaveDocs), - parse_and_save(PoolName, [], Files); + parse_and_save(SaveDocs, Files); -parse_and_save(PoolName, Docs, [Name|Files]) -> - SaveDocs = load_and_parse(Name, Docs), - parse_and_save(PoolName, SaveDocs, Files); - -parse_and_save(PoolName, Docs, []) -> - try_save(PoolName, Docs). +parse_and_save(Docs, []) -> + try_save(Docs). load_and_parse(Name, AccIn) -> {ok, Content} = file:read_file(Name), MagHash = parse_hash(Name), + 40 = length(MagHash), case catch(torrent_file:parse(Content)) of {'EXIT', _} -> ?W(?FMT("parse a torrent failed ~s", [MagHash])), @@ -93,9 +156,9 @@ on_load_torrent(Hash, single, {Name, Length}) -> on_load_torrent(Hash, multi, {Root, Files}) -> {Hash, Root, 0, Files}. -try_save(_, []) -> +try_save([]) -> ok; -try_save(PoolName, Tors) -> - Conn = mongo_pool:get(PoolName), +try_save(Tors) -> + Conn = mongo_pool:get(?DBPOOL), db_store_mongo:unsafe_insert(Conn, Tors). diff --git a/src/tor_builder/tor_location_reader.erl b/src/tor_builder/tor_location_reader.erl new file mode 100644 index 0000000..c62809e --- /dev/null +++ b/src/tor_builder/tor_location_reader.erl @@ -0,0 +1,115 @@ +%% +%% tor_location_reader.erl +%% Kevin Lynx +%% 07.08.2013 +%% +-module(tor_location_reader). +-behaviour(gen_server). +-include("vlog.hrl"). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([start_link/1, + get/1, + ensure_status_files/1, + stop/0]). +-record(state, {dir, fp, sum, position, maxpos, lastPrintPos = 0}). + +start_link(RootDir) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [RootDir], []). + +stop() -> + gen_server:cast(srv_name(), stop). + +get(Count) -> + gen_server:call(srv_name(), {read, Count}). + +ensure_status_files(RootDir) -> + Index = RootDir ++ "index.txt", + Status = RootDir ++ "index_status.txt", + (filelib:is_regular(Index)) and (filelib:is_regular(Status)). + +srv_name() -> + ?MODULE. + +init([RootDir]) -> + {ok, FP} = file:open(RootDir ++ "index.txt", [read]), + {ok, [Status]} = file:consult(RootDir ++ "index_status.txt"), + Pos = proplists:get_value(position, Status), + Sum = proplists:get_value(sum, Status), + {ok, MaxPos} = file:position(FP, eof), + file:position(FP, Pos), + ?I(?FMT("read status file ok ~p, ~p", [Pos, MaxPos])), + {ok, #state{fp = FP, position = Pos, maxpos = MaxPos, sum = Sum, dir = RootDir}, 0}. + +terminate(_, #state{fp = FP} = State) -> + file:close(FP), + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_info(timeout, State) -> + {noreply, State}; + +handle_info(M, State) -> + ?W(?FMT("unhandled message ~p", [M])), + {noreply, State}. + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_call({read, Count}, _From, State) -> + #state{dir = RootDir, sum = Sum, fp = FP, + lastPrintPos = OldPrPos, maxpos = MaxPos, position = OldPos} = State, + List = do_read(FP, Count), + {NewPos, Names, PrintPos} = case length(List) == 0 of + true when OldPos == MaxPos -> + PrPos = display_progress(MaxPos, MaxPos, OldPrPos), + update_status(RootDir, FP, Sum, OldPos), + % to avoid multiple display 100% + {MaxPos + 1, [], PrPos}; + true -> + {MaxPos + 1, [], OldPrPos}; + false -> + PrPos = display_progress(MaxPos, OldPos, OldPrPos), + Pos = update_status(RootDir, FP, Sum, OldPos), + {Pos, List, PrPos} + end, + {reply, Names, State#state{position = NewPos, lastPrintPos = PrintPos}}; + +handle_call(_, _From, State) -> + {noreply, State}. + +display_progress(MaxPos, CurPos, OldPrPos) +when 100 * (CurPos - OldPrPos) / MaxPos >= 10; MaxPos == CurPos -> + io:format("--> ~.2f%~n", [CurPos * 100 / MaxPos]), + CurPos; + +display_progress(_MaxPos, _CurPos, OldPrPos) -> + OldPrPos. + +do_read(_, 0) -> + []; +do_read(FP, Count) -> + case io:get_line(FP, "") of + eof -> []; + Line -> [strip_lf(Line) | do_read(FP, Count - 1)] + end. + +strip_lf(S) -> + lists:sublist(S, length(S) - 1). + +% when some error occurs, the processor can continue to work from old pos +update_status(RootDir, FP, Sum, OldPos) -> + {ok, Pos} = file:position(FP, cur), + Status = [{sum, Sum}, {position, OldPos}], + ?I(?FMT("update status ~p", [Status])), + file:write_file(RootDir ++ "index_status.txt", io_lib:fwrite("~p.\n",[Status])), + Pos. + + + diff --git a/src/tor_builder/tor_location_writer.erl b/src/tor_builder/tor_location_writer.erl new file mode 100644 index 0000000..d93a5fe --- /dev/null +++ b/src/tor_builder/tor_location_writer.erl @@ -0,0 +1,141 @@ +%% +%% tor_location_writer.erl +%% Kevin Lynx +%% 07.08.2013 +%% +-module(tor_location_writer). +-behaviour(gen_server). +-include("vlog.hrl"). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([start_link/2, stop/0]). +-export([worker_run/2]). +-record(state, {parent, dir, fp, sum = 0, workers = []}). +-compile(export_all). + +start_link(RootDir, Parent) -> + gen_server:start_link({local, srv_name()}, ?MODULE, [RootDir, Parent], []). + +stop() -> + gen_server:cast(srv_name(), stop). + +srv_name() -> + ?MODULE. + +init([RootDir, Parent]) -> + {ok, FP} = file:open(RootDir ++ "index.txt", [write]), + {ok, #state{parent = Parent, dir = RootDir, fp = FP}, 0}. + +terminate(_, #state{fp = FP} = State) -> + file:close(FP), + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_info(timeout, State) -> + #state{dir = RootDir} = State, + Dirs = list_directory(RootDir), + ?I(?FMT("~s has ~p subdirectories", [RootDir, length(Dirs)])), + Workers = [spawn_link(?MODULE, worker_run, [RootDir, Dir]) || Dir <- Dirs], + io:format("start to build the torrent index.....~n", []), + {noreply, State#state{workers = Workers}}; + +handle_info(M, State) -> + ?W(?FMT("unhandled message ~p", [M])), + {noreply, State}. + +handle_cast({worker_done, Pid}, State) -> + #state{parent = Parent, workers = Workers, sum = Sum, dir = RootDir} = State, + NewWorks = lists:delete(Pid, Workers), + case length(NewWorks) of + 0 -> + on_build_done(RootDir, Sum, Parent), + {stop, normal, State#state{workers = NewWorks}}; + _ -> + {noreply, State#state{workers = NewWorks}} + end; + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_call({add_torrents, Names}, _From, State) -> + #state{fp = FP, sum = Sum} = State, + AddCnt = append_torrent_names(FP, Names), + ?I(?FMT("append ~p torrents to index file", [AddCnt])), + {reply, ok, State#state{sum = Sum + AddCnt}}; + +handle_call(_, _From, State) -> + {noreply, State}. + +on_build_done(RootDir, Sum, Parent) -> + ?I(?FMT("build index done, total ~p torrents", [Sum])), + io:format("build index done, total ~p torrents~n", [Sum]), + Ret = [{sum, Sum}, {position, 0}], + file:write_file(RootDir ++ "index_status.txt", io_lib:fwrite("~p.\n",[Ret])), + Parent ! build_index_done. + +list_directory(RootDir) -> + {ok, Files} = file:list_dir(RootDir), + % dropwhile not work?? + lists:filter(fun (S) -> not filter_dir(RootDir ++ S) end, Files). + +filter_dir([$\.|_]) -> + true; +filter_dir(Dir) -> + (not filelib:is_file(Dir)) or (filelib:is_regular(Dir)). + +parse_hash(FileName) -> + string:to_upper(filename:basename(FileName, ".torrent")). + +append_torrent_names(FP, [Name|Rest]) -> + Hash = parse_hash(Name), + case length(Hash) == 40 of + true -> + io:format(FP, "~s~n", [Name]) , + 1 + append_torrent_names(FP, Rest); + false -> + ?W(?FMT("invalid torrent file name ~s", [Name])) + end; + +append_torrent_names(_, []) -> + 0. + +%% +notify_worker_done() -> + gen_server:cast(srv_name(), {worker_done, self()}). + +notify_add_torrents([]) -> + skip; + +notify_add_torrents(Names) -> + gen_server:call(srv_name(), {add_torrents, Names}, infinity). + +load_torrent_names(Dir) -> + Files = filelib:wildcard(Dir ++ "/*.torrent"), + Files. + +worker_run(RootDir, Dir) -> + FullDir = RootDir ++ Dir ++ "/", + SubDirs = list_directory(FullDir), + ?I(?FMT("worker ~p has ~p subdirectories", [self(), length(SubDirs)])), + process_torrents(FullDir, SubDirs), + notify_worker_done(). + +process_torrents(ParentDir, [Dir|Rest]) -> + FullDir = ParentDir ++ "/" ++ Dir, + Names = load_torrent_names(FullDir), + ?I(?FMT("worker ~p subdirectory ~s has ~p torrents", [self(), FullDir, length(Names)])), + notify_add_torrents(Names), + process_torrents(ParentDir, Rest); + +process_torrents(_, []) -> + ok. + + + + diff --git a/tools/HISTORY.md b/tools/HISTORY.md index dd3eb80..0f1ed15 100644 --- a/tools/HISTORY.md +++ b/tools/HISTORY.md @@ -1,3 +1,7 @@ +## 07.08.2013 + +* add torrent importer which can import local torrents into torrents database + ## 07.05.2013 * add torrent downloader which will download torrents and store them in database or local file system diff --git a/tools/win_start_import_tors.bat b/tools/win_start_import_tors.bat index 1a1a358..fc721bd 100644 --- a/tools/win_start_import_tors.bat +++ b/tools/win_start_import_tors.bat @@ -1 +1 @@ -erl -pa ebin -noshell -run tor_builder start_standalone localhost 27017 10 torrents/ +erl -pa ebin -noshell -run tor_builder start_standalone localhost 27017 50 torrents/