add cache_indexer, not integrated now, see src/cache_indexer/readme.md

This commit is contained in:
Kevin Lynx 2013-07-14 22:57:57 +08:00
parent 539dd9103b
commit c78e5d2f9c
7 changed files with 304 additions and 0 deletions

View File

@ -34,3 +34,7 @@
[debug_info, [debug_info,
{i, "include"}, {i, "include"},
{outdir,"ebin"}]}. {outdir,"ebin"}]}.
{'src/cache_indexer/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.

View File

@ -0,0 +1,2 @@
cache_indexer downloads torrent index files from torrage.com and builds a torrent index in the database, so that dhtcrawler2 can quickly check whether a torrent can be retrieved from the HTTP cache.

View File

@ -0,0 +1,29 @@
%%
%% db_hash_index.erl
%% Kevin Lynx
%% 07.14.2013
%%
-module(db_hash_index).
-export([insert/2, exist/2]).
%% Database name passed to mongo:do/5 by the functions in this module.
-define(DBNAME, hash_cache).
%% Collection holding one document per hash; the hash is stored as '_id'.
-define(COLLNAME, hashes).
%% Insert Hash (a string) into the hash index using connection Conn.
%%
%% Returns `ok' when the insert completed and `failed' when the database
%% operation raised for any reason.
%%
%% try/catch replaces the old-style `catch' expression: with `catch', a
%% value thrown by the driver looked like a normal return value and was
%% reported as `ok'. Every exception class now maps to `failed'.
insert(Conn, Hash) when is_list(Hash) ->
    try do_insert(Conn, Hash) of
        _ -> ok
    catch
        _:_ -> failed
    end.
%% Store Hash as the '_id' of a new document in the hash collection.
%% The insert runs in safe mode against the master; whatever mongo:do/5
%% returns (or raises) is passed straight back to the caller.
do_insert(Conn, Hash) ->
    Key = list_to_binary(Hash),
    Insert = fun() -> mongo:insert(?COLLNAME, {'_id', Key}) end,
    mongo:do(safe, master, Conn, ?DBNAME, Insert).
%% Tell whether Hash (a string) is already present in the hash index.
%%
%% Returns `true' when a document with that '_id' exists, `false' otherwise.
%%
%% NOTE(review): assumes mongo:find_one/2 yields `{}' when nothing matches
%% and `{Doc}' when a document is found, and that mongo:do/5 returns the
%% fun's result unchanged - confirm against the bundled driver version.
%% The previous code matched the result against `{Doc}' (a badmatch on a
%% miss) and then compared Doc with `{}', so it could never return true.
exist(Conn, Hash) when is_list(Hash) ->
    Sel = {'_id', list_to_binary(Hash)},
    Found = mongo:do(safe, master, Conn, ?DBNAME, fun() ->
        mongo:find_one(?COLLNAME, Sel)
    end),
    case Found of
        {} -> false;
        {_Doc} -> true
    end.

View File

@ -0,0 +1,160 @@
%%
%% index_builder.erl
%% Kevin Lynx
%% 07.14.2013
%%
-module(index_builder).
-include("vlog.hrl").
-compile(export_all).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/2,
start_standalone/1,
start_standalone/2,
stop/0]).
%% Server state: work_on lists index files still to be processed, done lists
%% files whose worker has finished, workers holds the pids of started workers.
-record(state, {work_on = [], done = [], workers = []}).
%% Directory scanned for *.txt index files; also holds the index.sta file.
-define(WORKDIR, "sync/").
%% Name of the mongo connection pool started in init/1.
-define(DBPOOL, index_builder_pool).
%% Delay in milliseconds before the next sync_today message (5 minutes).
-define(SYNC_TODAY_INTERVAL, 5*60*1000).
%% Put the bundled dependencies on the code path, start every application
%% this server relies on, and finally start ibrowse. The individual results
%% of code:add_path/1 and application:start/1 are discarded; the value of
%% ibrowse:start/0 is returned.
start_dep_apps() ->
    lists:foreach(fun(Path) -> code:add_path(Path) end,
                  ["deps/bson/ebin", "deps/mongodb/ebin", "deps/ibrowse/ebin"]),
    lists:foreach(fun(App) -> application:start(App) end,
                  [asn1, crypto, public_key, ssl, inets, bson, mongodb]),
    ibrowse:start().
%% Start the server from a two-element argument list where Port is a string
%% (presumably the form used when launched from the `erl' command line -
%% confirm against the start script). After starting, the calling process
%% blocks in a receive waiting for a message that nothing in this module
%% sends, which keeps the caller alive.
start_standalone([IP, Port]) ->
    start_standalone(IP, list_to_integer(Port)),
    receive
        fuck_erl_s_option -> ok
    end.
%% Start the dependency applications and then the index builder server,
%% connecting to the database at DBIP:DBPort. Returns the result of
%% start_link/2.
start_standalone(DBIP, DBPort) ->
start_dep_apps(),
start_link(DBIP, DBPort).
%% Start the gen_server, registered locally under srv_name(), and hand the
%% database address to init/1 as [DBIP, DBPort].
start_link(DBIP, DBPort) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [DBIP, DBPort], []).
%% Ask the registered server to stop. This is an asynchronous cast, so the
%% call returns without waiting for the server to terminate.
stop() ->
gen_server:cast(srv_name(), stop).
%% Registered name of the server process: the module name.
srv_name() ->
?MODULE.
%% gen_server init callback.
%%
%% Starts the mongo connection pool and the log file, loads the persisted
%% status from the work directory, adds any index file found on disk that is
%% neither done nor already pending, and persists the merged status.
%%
%% The status file now records the full pending list (previously pending
%% files plus newly discovered ones). Before, only the newly discovered
%% files were written, so index.sta disagreed with the in-memory state.
%%
%% Returns with a timeout of 0 so that handle_info(timeout, ...) runs
%% immediately and starts the workers outside of init.
init([DBIP, DBPort]) ->
    mongo_sup:start_pool(?DBPOOL, 5, {DBIP, DBPort}),
    filelib:ensure_dir("log/"),
    vlog:start_link("log/hash_cache.txt", 0),
    {Done, WorkOn} = load_status(?WORKDIR),
    ?I(?FMT("done ~p, workon ~p", [Done, WorkOn])),
    NewWorkOn = intersect_new_files(?WORKDIR, Done ++ WorkOn),
    ?I(?FMT("new workon ~p", [NewWorkOn])),
    AllWorkOn = WorkOn ++ NewWorkOn,
    save_status(?WORKDIR, Done, AllWorkOn),
    {ok, #state{work_on = AllWorkOn, done = Done}, 0}.
%% gen_server terminate callback: stops the mongo pool and the logger.
%% gen_server ignores the value returned from terminate/2.
terminate(_, State) ->
mongo_sup:stop_pool(?DBPOOL),
vlog:stop(),
{ok, State}.
%% gen_server code_change callback: the state is kept unchanged.
code_change(_, _, State) ->
{ok, State}.
%% gen_server handle_info callback.
%%
%% sync_today
%%     Periodic tick. If today's index file is still being processed, wait
%%     for the next tick; otherwise start downloading it. The download
%%     reports back with one of the sync_torrent_index messages below.
%% {sync_torrent_index, ok, FileName}
%%     Download succeeded: schedule the next tick and start a worker.
%% {sync_torrent_index, failed, FileName}
%%     Download failed: log it and schedule the next tick.
%% {worker_done, Pid, FileName}
%%     A worker finished: move FileName from work_on to done and persist.
%% timeout
%%     Fired once right after init/1: start one worker per pending file.
%% anything else
%%     Logged and ignored so a stray message cannot crash the server.
handle_info(sync_today, State) ->
    #state{work_on = WorkOn} = State,
    FileName = index_download:today_file_name(),
    case lists:member(FileName, WorkOn) of
        true ->
            % the file is processing, we should wait
            ?I(?FMT("today index file ~s is processing, wait", [FileName])),
            schedule_update_today();
        false ->
            ?I(?FMT("start to download today index file ~s", [FileName])),
            index_download:download()
    end,
    {noreply, State};
handle_info({sync_torrent_index, ok, FileName}, State) ->
    #state{workers = Workers, work_on = WorkOn} = State,
    schedule_update_today(),
    ?I(?FMT("today index file ~s download success", [FileName])),
    Pid = start_worker(FileName),
    {noreply, State#state{work_on = [FileName|WorkOn], workers = [Pid|Workers]}};
handle_info({sync_torrent_index, failed, FileName}, State) ->
    ?W(?FMT("today index file ~s download failed", [FileName])),
    schedule_update_today(),
    {noreply, State};
handle_info({worker_done, Pid, FileName}, State) ->
    ?I(?FMT("worker ~s done", [FileName])),
    #state{workers = Workers, done = Done, work_on = WorkOn} = State,
    NewWorkers = lists:delete(Pid, Workers),
    case NewWorkers of
        [] ->
            ?I("all index files have been done"),
            io:format("all index files have been done~n", []);
        _ ->
            ok
    end,
    % Today's file is downloaded and processed again on every cycle, so drop
    % any earlier entry first; otherwise the done list (and index.sta) gains
    % one duplicate per cycle.
    NewDone = [FileName | [F || F <- Done, F =/= FileName]],
    NewWorkOn = lists:delete(FileName, WorkOn),
    save_status(?WORKDIR, NewDone, NewWorkOn),
    {noreply, State#state{workers = NewWorkers, done = NewDone, work_on = NewWorkOn}};
handle_info(timeout, State) ->
    #state{work_on = WorkOn} = State,
    Workers = [start_worker(FileName) || FileName <- WorkOn],
    schedule_update_today(),
    {noreply, State#state{workers = Workers}};
handle_info(Msg, State) ->
    ?W(?FMT("unexpected message ~p", [Msg])),
    {noreply, State}.
%% gen_server handle_cast callback: `stop' shuts the server down with reason
%% normal. No other cast is handled.
handle_cast(stop, State) ->
{stop, normal, State}.
%% gen_server handle_call callback: no synchronous requests are supported,
%% every call is answered with not_implemented.
handle_call(_, _From, State) ->
{reply, not_implemented, State}.
%% Arrange for the calling process to receive `sync_today' once the sync
%% interval has elapsed. Returns whatever timer:send_after/3 returns.
schedule_update_today() ->
    timer:send_after(?SYNC_TODAY_INTERVAL, self(), sync_today).
%%
%% Take a connection from the DB pool and start an index_file worker for
%% FileName. Returns the value of index_file:start/2.
start_worker(FileName) ->
Conn = mongo_pool:get(?DBPOOL),
index_file:start(Conn, FileName).
%% List the index files in Dir that do not appear in Processed.
%% Despite the name this is a set difference. The result is in the reverse
%% of the order returned by index_file_list/1.
intersect_new_files(Dir, Processed) ->
    Unseen = [File || File <- index_file_list(Dir),
                      not lists:member(File, Processed)],
    lists:reverse(Unseen).
%% Load the persisted builder status from Dir ++ "index.sta".
%%
%% Returns {Done, WorkOn}: the `processed' and `processing' lists stored in
%% the file. A missing key defaults to [] (it used to come back as
%% `undefined' and break the later `++'). A missing, unreadable, empty or
%% otherwise malformed status file yields {[], []} instead of crashing.
load_status(Dir) ->
    case file:consult(Dir ++ "index.sta") of
        {ok, [Status | _]} when is_list(Status) ->
            Done = proplists:get_value(processed, Status, []),
            WorkOn = proplists:get_value(processing, Status, []),
            {Done, WorkOn};
        {ok, _} ->
            {[], []};
        {error, _} ->
            {[], []}
    end.
%% Persist the builder status to Dir ++ "index.sta" as a single Erlang term
%% readable by file:consult/1. Returns the result of file:write_file/2.
save_status(Dir, Done, WorkOn) ->
    Path = Dir ++ "index.sta",
    Text = io_lib:format("~p.\n", [[{processed, Done}, {processing, WorkOn}]]),
    file:write_file(Path, Text).
%% Return every path matching Dir ++ "*.txt", as produced by
%% filelib:wildcard/1.
index_file_list(Dir) ->
    filelib:wildcard(Dir ++ "*.txt").

View File

@ -0,0 +1,52 @@
%%
%% index_download.erl
%% Kevin Lynx
%% 07.14.2013
%%
-module(index_download).
-export([download/0, download/2, today_file_name/0, do_download/2]).
%% Base URL of the site the index files are downloaded from.
-define(DOMAIN, "http://torrage.com").
%% Local directory the downloaded index files are written to.
-define(WORKDIR, "sync/").
%% Download the index file for today's local date and report the result to
%% the calling process. Returns the pid of the spawned downloader.
download() ->
{Date, _} = calendar:local_time(),
download(self(), Date).
%% Spawn a linked process that downloads the index file for Date and sends
%% the outcome to From. Because the process is linked, a crash in the
%% downloader propagates to the caller.
download(From, Date) ->
spawn_link(?MODULE, do_download, [From, Date]).
%% Download the index file for Date and send the outcome to From.
%%
%% From receives {sync_torrent_index, ok, FullFile} when the file was
%% fetched and saved, and {sync_torrent_index, failed, FullFile} otherwise.
%%
%% A transport-level {error, Reason} from ibrowse is now reported as
%% `failed'. It used to badmatch, and because this process is spawned with
%% spawn_link the crash also took down the process that asked for the
%% download. os:timestamp/0 replaces the deprecated now/0 for timing.
do_download(From, {_, _, _} = Date) ->
    File = format_file_name(Date),
    URL = format_file_url(?DOMAIN, File),
    Dir = ?WORKDIR,
    FullFile = Dir ++ File,
    io:format("download file ~s~n", [URL]),
    Start = os:timestamp(),
    Ret = case ibrowse:send_req(URL, [], get, [], [], infinity) of
        {ok, Code, _Headers, Body} ->
            filelib:ensure_dir(Dir),
            ElapsedMs = timer:now_diff(os:timestamp(), Start) div 1000,
            try_save(FullFile, Code, Body, ElapsedMs);
        {error, Reason} ->
            io:format("download index file ~s failed ~p~n", [FullFile, Reason]),
            {sync_torrent_index, failed, FullFile}
    end,
    From ! Ret.
%% Save a downloaded index file and build the message describing the result.
%%
%% FullFile - path the body is written to
%% Code     - HTTP status as a string; only "200" is treated as success
%% Body     - response body (a byte list or any other iodata)
%% Time     - download duration in milliseconds
%%
%% Returns {sync_torrent_index, ok, FullFile} when the status is "200" and
%% the file was written, {sync_torrent_index, failed, FullFile} otherwise.
%%
%% Fixes: a duration of 0 ms no longer causes a division by zero, and a
%% failed write is reported as `failed' instead of `ok'. iolist_size/1 is
%% used so the size is also correct when the body is a binary.
try_save(FullFile, "200", Body, Time) ->
    case file:write_file(FullFile, Body) of
        ok ->
            Size = iolist_size(Body),
            % Clamp to 1 ms so very fast downloads still yield a speed.
            Speed = Size * 1000 div max(Time, 1),
            io:format("download index file ~s success ~b bytes, ~b bytes/sec~n",
                      [FullFile, Size, Speed]),
            {sync_torrent_index, ok, FullFile};
        {error, Reason} ->
            io:format("download index file ~s failed ~p~n", [FullFile, Reason]),
            {sync_torrent_index, failed, FullFile}
    end;
try_save(FullFile, Code, _, _) ->
    io:format("download index file ~s failed ~p~n", [FullFile, Code]),
    {sync_torrent_index, failed, FullFile}.
%% Path, inside the work directory, of the index file for today's local date.
today_file_name() ->
{Date, _} = calendar:local_time(),
?WORKDIR ++ format_file_name(Date).
%% Build the index file name for a {Year, Month, Day} tuple.
%% A day of 0 selects the month-level name "YYYYMM.txt"; any other day gives
%% "YYYYMMDD.txt". Month and day are zero-padded to two digits.
format_file_name({Y, M, D}) ->
    {Format, Args} =
        case D of
            0 -> {"~b~2..0b.txt", [Y, M]};
            _ -> {"~b~2..0b~2..0b.txt", [Y, M, D]}
        end,
    lists:flatten(io_lib:format(Format, Args)).
%% Build the download URL for File: Domain followed by "/sync/" and File.
format_file_url(Domain, File) ->
    Path = "/sync/" ++ File,
    Domain ++ Path.

View File

@ -0,0 +1,56 @@
%%
%% index_file.erl
%% Kevin Lynx
%% 07.14.2013
%%
-module(index_file).
-export([start/2]).
-export([worker_run/3]).
%% Spawn a linked worker that processes FileName using connection Conn.
%% The caller is passed to the worker as the parent to notify when done.
%% Returns the worker pid.
start(Conn, FileName) ->
spawn_link(?MODULE, worker_run, [self(), Conn, FileName]).
%% Read the saved file offset for index file Name from Name ++ ".sta".
%%
%% Returns the stored position, or 0 when the status file is missing,
%% unreadable, empty, lacks a `position' entry, or holds something that is
%% not a non-negative integer. Previously a missing key produced
%% `undefined' and an empty file raised a case_clause error.
load_position(Name) ->
    case file:consult(Name ++ ".sta") of
        {ok, [Status | _]} when is_list(Status) ->
            case proplists:get_value(position, Status, 0) of
                Pos when is_integer(Pos), Pos >= 0 -> Pos;
                _ -> 0
            end;
        {ok, _} ->
            0;
        {error, _} ->
            0
    end.
%% Record Pos as the resume offset for index file Name by writing a
%% [{name, Name}, {position, Pos}] term to Name ++ ".sta".
%% Returns the result of file:write_file/2.
save_position(Name, Pos) ->
    Text = io_lib:format("~p.\n", [[{name, Name}, {position, Pos}]]),
    file:write_file(Name ++ ".sta", Text).
%% Worker entry point: process index file FileName from its saved offset.
%%
%% Opens the file, seeks to the saved position, feeds every remaining line
%% to process_hash/3, then sends {worker_done, self(), FileName} to Parent
%% before closing the file and printing the number of lines handled.
worker_run(Parent, Conn, FileName) ->
    Offset = load_position(FileName),
    io:format("start to process ~s from ~p~n", [FileName, Offset]),
    {ok, Fd} = file:open(FileName, [read]),
    file:position(Fd, Offset),
    Count = process_hash(Conn, FileName, Fd),
    Parent ! {worker_done, self(), FileName},
    file:close(Fd),
    io:format("Index file ~s done, ~p hashes~n", [FileName, Count]).
%% Read FP line by line until end of file, storing each line as a hash and
%% saving the file offset after every line so processing can resume later.
%%
%% Returns the number of lines read (lines rejected by save_hash/2 are
%% counted too).
%%
%% The loop is tail-recursive with an accumulator, so a large index file no
%% longer keeps one stack frame per line. A read error raises
%% {read_failed, FileName, Reason} instead of an obscure badarg.
process_hash(Conn, FileName, FP) ->
    process_hash(Conn, FileName, FP, 0).

%% Loop helper for process_hash/3; Count is the number of lines read so far.
process_hash(Conn, FileName, FP, Count) ->
    case io:get_line(FP, "") of
        eof ->
            Count;
        {error, Reason} ->
            erlang:error({read_failed, FileName, Reason});
        Line ->
            save_hash(Conn, strip_lf(Line)),
            {ok, Pos} = file:position(FP, cur),
            save_position(FileName, Pos),
            process_hash(Conn, FileName, FP, Count + 1)
    end.
%% Remove one trailing line terminator ("\n" or "\r\n") from string S.
%%
%% A string without a terminator is returned unchanged. The previous
%% version always dropped the last character, which cut a real character
%% off a final line with no newline and left the "\r" on CRLF lines; both
%% made a valid 40-character hash fail the length check. The empty string
%% is now handled as well.
strip_lf(S) ->
    lists:reverse(drop_eol(lists:reverse(S))).

%% Drop a line terminator from the front of a reversed string.
drop_eol([$\n, $\r | Rest]) -> Rest;
drop_eol([$\n | Rest]) -> Rest;
drop_eol(Rest) -> Rest.
%% Insert Hash into the hash index when it is exactly 40 characters long.
%% Returns the result of db_hash_index:insert/2 in that case and `invalid'
%% for anything else, including values that are not proper lists.
save_hash(Conn, Hash) ->
    if
        length(Hash) == 40 -> db_hash_index:insert(Conn, Hash);
        true -> invalid
    end.

View File

@ -0,0 +1 @@
tor_builder imports local torrents into the database.