commit d23b3293f1fe8457b5b452db7a6c59042ebf918d
Author: Kevin Lynx
Date:   Mon Jul 1 23:06:18 2013 +0800

    first commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..281319c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+deps
+*.swp
+bin
diff --git a/Emakefile b/Emakefile
new file mode 100644
index 0000000..9d64f5f
--- /dev/null
+++ b/Emakefile
@@ -0,0 +1,28 @@
+{'src/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/bt/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/common/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/http_front/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/hash_reader/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/crawler/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
+{'src/torrent/*',
+ [debug_info,
+  {i, "include"},
+  {outdir,"ebin"}]}.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..62638e1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,37 @@
+## dhtcrawler
+
+dhtcrawler is a DHT crawler written in Erlang. It joins the DHT network and crawls many P2P torrents. The program saves all torrent info into a database and provides an HTTP interface to search torrents by keyword.
+
+![screenshot](https://raw.github.com/kevinlynx/dhtcrawler/master/screenshot.png)
+
+## Usage
+
+* Download MongoDB and start it with text search enabled, e.g.:
+
+        mongod --dbpath db --setParameter textSearchEnabled=true
+
+* Download the dhtcrawler source code
+* Use `rebar` to download and install all dependent libraries
+
+        rebar get-deps
+
+* Compile
+
+        rebar compile
+
+* Start dhtcrawler
+
+        crawler_app:start().
+
+* Start the HTTP front-end
+
+        crawler_http:start().
+
+* Open a web browser and point it to `localhost:8000/index.html`
+
+## Config
+
+See `priv/dhtcrawler.config`.
+
+**NOTE**: when you change the `node_count` value in `dhtcrawler.config`, you should delete all files saved in the `dhtstate` directory, since the saved node state no longer matches the configured instance count.
+
diff --git a/include/db_common.hrl b/include/db_common.hrl
new file mode 100644
index 0000000..c133c83
--- /dev/null
+++ b/include/db_common.hrl
@@ -0,0 +1,4 @@
+-define(HASH_DBNAME, dht_hash).
+-define(HASH_COLLNAME, hash).
+-define(HASH_DOWNLOAD_COLL, wait_download).
+
diff --git a/include/vlog.hrl b/include/vlog.hrl
new file mode 100644
index 0000000..edf33d6
--- /dev/null
+++ b/include/vlog.hrl
@@ -0,0 +1,32 @@
+%%
+%% vlog.hrl
+%% Kevin Lynx
+%% 06.05.2013
+%%
+-ifndef(VLOGHRL).
+-define(VLOGHRL, true).
+
+-define(TRACE, 0).
+-define(INFO, 1).
+-define(WARN, 2).
+-define(ERROR, 3).
+-define(OFF, 4).
+
+-define(LVLS(L),
+    case L of
+        ?TRACE -> "trace";
+        ?INFO -> "info";
+        ?WARN -> "warn";
+        ?ERROR -> "error"
+    end).
+-define(LOG(X, Lvl),
+    vlog:format(Lvl, "~s [~s] {~p, ~p}: ~p~n",
+        [?LVLS(Lvl), vlog:time_string(), ?MODULE, ?LINE, X])).
+-define(T(X), ?LOG(X, ?TRACE)).
+-define(I(X), ?LOG(X, ?INFO)).
+-define(W(X), ?LOG(X, ?WARN)).
+-define(E(X), ?LOG(X, ?ERROR)).
+
+-define(FMT(S, A), lists:flatten(io_lib:format(S, A))).
+
+-endif.
diff --git a/priv/dhtcrawler.config b/priv/dhtcrawler.config
new file mode 100644
index 0000000..bce6f7e
--- /dev/null
+++ b/priv/dhtcrawler.config
@@ -0,0 +1,6 @@
+[{start_port,6776},
+ {node_count,50},
+ {loglevel,3},
+ {dbconn, 5},
+ {dbhost,"localhost"},
+ {dbport,27017}].
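For reference, the config file above is a plain Erlang term file: `crawler_app` (later in this commit) reads it with `file:consult/1` and picks values out of the proplist. A minimal sketch of that consumption (the `load_config/1` helper is illustrative, not part of the commit), assuming the same keys as `priv/dhtcrawler.config`:

    %% sketch: load dhtcrawler.config and extract the startup parameters
    load_config(File) ->
        {ok, [Cfg]} = file:consult(File),                  % Cfg is the proplist above
        StartPort = proplists:get_value(start_port, Cfg),  % first UDP port, one per DHT node
        Count = proplists:get_value(node_count, Cfg),      % how many DHT instances to run
        DBHost = proplists:get_value(dbhost, Cfg),
        DBPort = proplists:get_value(dbport, Cfg),
        {StartPort, Count, DBHost, DBPort}.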
diff --git a/rebar b/rebar
new file mode 100644
index 0000000..0da69d2
Binary files /dev/null and b/rebar differ
diff --git a/rebar.cmd b/rebar.cmd
new file mode 100644
index 0000000..6c7a1ca
--- /dev/null
+++ b/rebar.cmd
@@ -0,0 +1,4 @@
+@echo off
+setlocal
+set rebarscript=%~f0
+escript.exe "%rebarscript:.cmd=%" %*
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..38953b0
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,9 @@
+{erl_opts, [debug_info, fail_on_warning]}.
+
+{deps, [
+   {ibrowse, ".*", {git, "git@github.com:cmullaparthi/ibrowse.git", "HEAD"}},
+   {bson, ".*", {git, "git@github.com:mongodb/bson-erlang.git", "HEAD"}},
+   {mongodb, ".*", {git, "git@github.com:mongodb/mongodb-erlang.git", "HEAD"}},
+   {kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}}
+]}.
+
diff --git a/src/bt/bt_conn.erl b/src/bt/bt_conn.erl
new file mode 100644
index 0000000..38e675c
--- /dev/null
+++ b/src/bt/bt_conn.erl
@@ -0,0 +1,151 @@
+
+-module(bt_conn).
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start/4,
+    stop/1]).
+-export([test/0]).
+-record(state, {data, sock, hash, req = 0,
+    msgid = 0, metasize = 0, metainfo = <<>>,
+    support_utdata = false}).
+-define(UT_METADATA_MSGID, 1).
+
+start(IP, Port, Hash, Self) ->
+    gen_server:start_link(?MODULE, [IP, Port, Hash, Self], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+init([IP, Port, Hash, Self]) ->
+    % connect in handle_info(timeout, ...) so init/1 returns immediately
+    {ok, {IP, Port, Hash, Self}, 0}.
+
+% the #state{} clause must come first: a record is itself a tuple, so a
+% bare is_tuple/1 guard would match it too and the socket would never close
+terminate(_, #state{sock = Sock} = State) ->
+    gen_tcp:close(Sock),
+    {ok, State};
+
+terminate(_, State) when is_tuple(State) ->
+    % still the {IP, Port, Hash, Self} pre-connect tuple, no socket yet
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_info({tcp, Sock, Data}, State) ->
+    #state{data = Old, req = Req} = State,
+    FullData = <<Old/binary, Data/binary>>,
+    {NewReq, NewData, Msgs} = parse_message(Req, FullData, []),
+    inet:setopts(Sock, [{active, once}]),
+    NewState = process_message(State#state{data = NewData, req = NewReq}, Msgs),
+    {noreply, NewState};
+
+handle_info(timeout, {IP, Port, Hash, Self}) ->
+    % stop on connect failure instead of sending on an invalid socket
+    case gen_tcp:connect(IP, Port, [binary, {active, once}]) of
+        {ok, Sock} ->
+            State = #state{data = <<>>, sock = Sock, hash = Hash, req = 1},
+            Handshake = bt_message:encode_handshake(Hash, Self),
+            gen_tcp:send(Sock, Handshake),
+            ExtHandshake = bt_message:encode_ext_handshake(?UT_METADATA_MSGID),
+            gen_tcp:send(Sock, ExtHandshake),
+            {noreply, State};
+        _ ->
+            {stop, normal, {IP, Port, Hash, Self}}
+    end;
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+parse_message(1, Bin, Msgs) ->
+    % the first inbound data must begin with the peer's handshake
+    {_, _, _, Rest} = bt_message:decode_handshake(Bin),
+    parse_message(2, Rest, Msgs);
+
+parse_message(Req, <<>>, Msgs) ->
+    {Req, <<>>, lists:reverse(Msgs)};
+
+parse_message(Req, Bin, Msgs) ->
+    case bt_message:decode(Bin) of
+        {Rest, not_completed} ->
+            {Req, Rest, Msgs};
+        {Rest, Msg} ->
+            parse_message(Req + 1, Rest, [Msg|Msgs])
+    end.
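+
+%% A note on framing (added for clarity, not part of the original commit):
+%% parse_message/3 above does incremental parsing of the BitTorrent peer wire
+%% protocol. After the fixed-layout handshake, every message is length-prefixed,
+%% so leftover bytes stay buffered in #state.data until a full frame arrives.
+%% A minimal sketch of the framing this loop relies on (`frame/1' is an
+%% illustration only, not a function in this module):
+%%
+%%   frame(Payload) when is_binary(Payload) ->
+%%       %% 4-byte big-endian length, then the payload (type byte + body)
+%%       <<(byte_size(Payload)):32, Payload/binary>>.
+%%
+%% e.g. a ut_metadata request travels as frame(<<20, MsgID, BencodedDict/binary>>),
+%% which matches what bt_message:encode_ext_msg/2 produces.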
+
+process_message(State, []) ->
+    State;
+
+process_message(State, [Msg|Rest]) ->
+    NewState = process_message(State, Msg),
+    process_message(NewState, Rest);
+
+process_message(State, not_implemented) ->
+    State;
+
+process_message(State, {extend, handshake, Dict, _}) ->
+    {ok, {dict, M}} = dict:find(<<"m">>, Dict),
+    {ok, ID} = dict:find(<<"ut_metadata">>, M),
+    {ok, Size} = dict:find(<<"metadata_size">>, Dict),
+    % request the metainfo
+    start_req_metainfo(State, ID),
+    State#state{msgid = ID, metasize = Size};
+
+process_message(State, {extend, MsgID, Dict, Body}) ->
+    ?UT_METADATA_MSGID = MsgID,
+    % debug only: append the raw extend message body to disk
+    {ok, FP} = file:open("meta.torrent", [append]),
+    file:write(FP, Body),
+    file:close(FP),
+    MetaMsg = bt_message:decode_metadata_msg(Dict, Body),
+    process_meta_message(State, MetaMsg).
+
+% `request' message: reject it right now
+process_meta_message(State, {request, _}) ->
+    #state{sock = Sock, msgid = MsgID} = State,
+    Msg = bt_message:encode_metadata_reject(MsgID),
+    gen_tcp:send(Sock, Msg),
+    State;
+
+% `data' message
+process_meta_message(State, {data, Piece, Size, Data}) ->
+    #state{metainfo = OldInfo, metasize = MetaSize, sock = Sock, msgid = MsgID} = State,
+    Size = MetaSize, % assert the reported total size does not change
+    NewInfo = <<OldInfo/binary, Data/binary>>,
+    case byte_size(NewInfo) >= MetaSize of
+        true ->
+            % metainfo download done, TODO: notify
+            io:format("download metainfo done ~p ~n", [Size]),
+            file:write_file("meta.torrent", NewInfo),
+            ok;
+        false ->
+            % request the next piece
+            ReqMsg = bt_message:encode_metadata_req(MsgID, Piece + 1),
+            gen_tcp:send(Sock, ReqMsg),
+            ok
+    end,
+    State#state{metainfo = NewInfo};
+
+% `reject' message: ignore it right now
+process_meta_message(State, {reject, _}) ->
+    State.
+
+start_req_metainfo(State, ID) ->
+    #state{sock = Sock} = State,
+    ReqMsg = bt_message:encode_metadata_req(ID, 0),
+    gen_tcp:send(Sock, ReqMsg).
+
+%%%
+test() ->
+    I = bep9:random(),
+    Peer = <<I:160>>, % 20-byte peer id from a 160-bit random integer
+    %Hash = <<200,126,2,108,203,24,198,144,173,99,133,8,141,160,119,166,176,58,126,169>>,
+    Hash = <<77,16,191,99,137,133,31,179,255,232,239,14,116,98,74,114,233,232,39,248>>,
+    %Hash = <<252,152,147,123,241,68,124,54,123,130,135,101,215,57,9,59,102,111,53,209>>,
+    start({127, 0, 0, 1}, 6881, Hash, Peer).
+
diff --git a/src/bt/bt_message.erl b/src/bt/bt_message.erl
new file mode 100644
index 0000000..e80f97f
--- /dev/null
+++ b/src/bt/bt_message.erl
@@ -0,0 +1,122 @@
+%%
+%% bt_message.erl
+%% Kevin Lynx
+%% 06.24.2013
+%%
+-module(bt_message).
+-export([decode/1,
+    decode_handshake/1,
+    encode_handshake/2,
+    decode_metadata_msg/2,
+    encode_ext_handshake/1,
+    encode_metadata_req/2,
+    encode_metadata_reject/1,
+    decode_metadata_id/1]).
+-compile(export_all).
+-define(MSG_EXTEND, 20).
+-define(MSG_EXT_HANDSHAKE, 0).
+-define(PROTO_NAME, <<"BitTorrent protocol">>).
+
+encode_handshake(Hash, PeerID) when is_binary(Hash), is_binary(PeerID) ->
+    % to support extend messages, ext[5] |= 0x10
+    Ext = <<0, 0, 0, 0, 0, 16, 0, 0>>,
+    <<19:8, ?PROTO_NAME/binary, Ext/binary, Hash/binary, PeerID/binary>>.
+
+decode_handshake(Bin) when is_binary(Bin) ->
+    <<PLen:8, R/binary>> = Bin,
+    <<PName:PLen/binary, Ext:8/binary, Hash:20/binary, PeerID:20/binary, Rest/binary>> = R,
+    case PName == ?PROTO_NAME of
+        true ->
+            {Ext, Hash, PeerID, Rest};
+        false ->
+            {error, Bin}
+    end.
+
+% {R, not_completed}
+% {R, {extend, handshake, Dict, Rest}}
+% {R, {extend, ExtMsgID, Dict, Body}}
+% {R, not_implemented}
+decode(Bin) when is_binary(Bin), byte_size(Bin) < 4 ->
+    % not even a complete length prefix yet
+    {Bin, not_completed};
+
+decode(Bin) when is_binary(Bin) ->
+    <<Size:32, Rest/binary>> = Bin,
+    case byte_size(Rest) < Size of
+        true ->
+            {Bin, not_completed};
+        false ->
+            decode(Size, Rest)
+    end.
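+
+%% A quick shell round-trip through the encoder/decoder pair above (added
+%% illustration, not in the original file); it assumes only functions defined
+%% in this module and that the bundled bencode dep behaves as this module
+%% expects:
+%%
+%%   1> Msg = bt_message:encode_ext_handshake(1).
+%%   2> {<<>>, {extend, handshake, Dict, _}} = bt_message:decode(Msg).
+%%   3> bt_message:decode_metadata_id(Dict).
+%%   1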
+
+decode(0, <<>>) -> % keep-alive
+    {<<>>, not_implemented};
+
+decode(Size, <<Type:8, Rest/binary>>) ->
+    % (whole size) - (message type byte)
+    BodySize = Size - 1,
+    case Type of
+        ?MSG_EXTEND ->
+            {Code, Dict, BodyR, R} = decode_extend(BodySize, Rest),
+            {R, {extend, Code, Dict, BodyR}};
+        0 ->
+            {Rest, not_implemented};
+        _ ->
+            <<_:BodySize/binary, R/binary>> = Rest,
+            {R, not_implemented}
+    end.
+
+decode_extend(Size, Bin) ->
+    % exclude the extend message id byte
+    DictSize = Size - 1,
+    <<ID:8, Body:DictSize/binary, Rest/binary>> = Bin,
+    case ID of
+        ?MSG_EXT_HANDSHAKE ->
+            {{dict, D}, R} = bencode:dec(Body),
+            {handshake, D, R, Rest};
+        _ ->
+            {{dict, D}, R} = bencode:dec(Body),
+            % may have extra data, e.g. the `data' message in ut_metadata
+            {ID, D, R, Rest}
+    end.
+
+decode_metadata_id(Dict) ->
+    {ok, {dict, M}} = dict:find(<<"m">>, Dict),
+    case dict:find(<<"ut_metadata">>, M) of
+        {ok, ID} ->
+            ID;
+        _ ->
+            not_support
+    end.
+
+% the caller should check that the message id matches first
+decode_metadata_msg(Dict, Bin) ->
+    {ok, Type} = dict:find(<<"msg_type">>, Dict),
+    {ok, Piece} = dict:find(<<"piece">>, Dict),
+    case Type of
+        0 -> % request
+            Bin = <<>>,
+            {request, Piece};
+        1 -> % data
+            % `total_size' is the total size of the torrent metadata,
+            % not the size of this piece
+            {ok, Size} = dict:find(<<"total_size">>, Dict),
+            {data, Piece, Size, Bin};
+        2 -> % reject
+            Bin = <<>>,
+            {reject, Piece}
+    end.
+
+encode_ext_msg(MsgID, Dict) ->
+    Body = bencode:encode(Dict),
+    % + 2 covers the message type byte and the extend message id byte
+    Len = byte_size(Body) + 2,
+    <<Len:32, ?MSG_EXTEND:8, MsgID:8, Body/binary>>.
+
+encode_ext_handshake(MetaMsgID) ->
+    M = {dict, dict:from_list([{<<"ut_metadata">>, MetaMsgID}])},
+    Dict = {dict, dict:from_list([{<<"m">>, M}])},
+    encode_ext_msg(0, Dict).
+
+encode_metadata_req(MsgID, Piece) ->
+    Dict = {dict, dict:from_list([{<<"msg_type">>, 0}, {<<"piece">>, Piece}])},
+    encode_ext_msg(MsgID, Dict).
+
+encode_metadata_reject(MsgID) ->
+    Dict = {dict, dict:from_list([{<<"msg_type">>, 2}, {<<"piece">>, 0}])},
+    encode_ext_msg(MsgID, Dict).
+
diff --git a/src/common/string_split.erl b/src/common/string_split.erl
new file mode 100644
index 0000000..6f85f92
--- /dev/null
+++ b/src/common/string_split.erl
@@ -0,0 +1,40 @@
+%%
+%% string_split.erl
+%% Kevin Lynx
+%% 06.17.2013
+%% split a string into substrings
+%%
+-module(string_split).
+-export([split/1]).
+-compile(export_all).
+
+split(Str) when is_list(Str) ->
+    B = list_to_binary(Str),
+    case unicode:characters_to_list(B) of
+        {error, L, D} ->
+            {error, L, D};
+        {incomplete, L, D} ->
+            {incomplete, L, D};
+        UL ->
+            {ok, subsplit(UL)}
+    end.
+
+subsplit([]) ->
+    [];
+
+subsplit(L) ->
+    [_|R] = L,
+    % from every start position, emit each prefix of the token that begins
+    % there (up to the next splitter), then move one character forward
+    {PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
+    [unicode:characters_to_binary(lists:sublist(PreL, Len))
+        || Len <- lists:seq(1, length(PreL))] ++ subsplit(R).
+
+% TODO: more control characters
+is_spliter(Ch) ->
+    (Ch < $0) or
+    ((Ch >= $[) and (Ch =< $`)) or
+    ((Ch >= ${) and (Ch =< $~)) or
+    ((Ch >= $:) and (Ch =< $@)).
+
diff --git a/src/common/time_util.erl b/src/common/time_util.erl
new file mode 100644
index 0000000..20b3972
--- /dev/null
+++ b/src/common/time_util.erl
@@ -0,0 +1,36 @@
+%%
+%% time_util.erl
+%% Kevin Lynx
+%% 06.18.2013
+%%
+-module(time_util).
+-export([now_seconds/0,
+    diff_milsecs/2,
+    seconds_to_local_time/1,
+    now_day_seconds/0]).
+-compile(export_all).
+
+diff_milsecs(T1, T2) ->
+    timer:now_diff(T1, T2) div 1000.
+
+now_seconds() ->
+    % the sub-second part is simply dropped
+    {Megasecs, Secs, _Microsecs} = now(),
+    Megasecs * 1000000 + Secs.
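+
+% A note added for clarity (not in the original source): the two functions
+% below do not use the standard offset between year 0 and 1970 (62167219200
+% seconds); they shift the datetime's year field by 1970 instead. That shift
+% drifts around leap days, but seconds_to_local_time/1 and now_day_seconds/0
+% apply the same convention in opposite directions, so values written by one
+% read back consistently with the other.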
+
+seconds_to_local_time(Secs) ->
+    {{Y, M, D}, Time} = calendar:gregorian_seconds_to_datetime(Secs),
+    calendar:universal_time_to_local_time({{Y + 1970, M, D}, Time}).
+
+now_day_seconds() ->
+    {{Year, Month, Day}, _Time} = calendar:local_time(),
+    {{NY, NM, ND}, Time} = local_time_to_universal_time({{Year, Month, Day}, {0, 0, 0}}),
+    calendar:datetime_to_gregorian_seconds({{NY - 1970, NM, ND}, Time}).
+
+local_time_to_universal_time(Datetime) ->
+    case calendar:local_time_to_universal_time_dst(Datetime) of
+        [_, DateTimeUTC] ->
+            DateTimeUTC;
+        [DateTimeUTC] ->
+            DateTimeUTC
+    end.
+
diff --git a/src/common/vlog.erl b/src/common/vlog.erl
new file mode 100644
index 0000000..68dd0eb
--- /dev/null
+++ b/src/common/vlog.erl
@@ -0,0 +1,90 @@
+%%
+%% vlog.erl
+%% Kevin Lynx
+%% 06.05.2013
+%%
+-module(vlog).
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/2,
+    format/3,
+    set_level/1,
+    sync_format/3,
+    time_string/0,
+    stop/0]).
+-record(state, {name, level}).
+-include("vlog.hrl").
+
+start_link(Name, Lvl) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [Name, Lvl], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+set_level(Level) ->
+    gen_server:call(srv_name(), {set_level, Level}).
+
+format(Lvl, Fmt, Arg) ->
+    gen_server:cast(srv_name(), {write, Lvl, Fmt, Arg}).
+
+sync_format(Lvl, Fmt, Arg) ->
+    gen_server:call(srv_name(), {write, Lvl, Fmt, Arg}).
+
+time_string() ->
+    {{_Year, _Month, _Day}, {Hour, Min, Sec}} = erlang:localtime(),
+    lists:flatten(io_lib:format("~2.10.0b:~2.10.0b:~2.10.0b", [Hour, Min, Sec])).
+
+srv_name() ->
+    ?MODULE.
+
+init([Name, Lvl]) ->
+    % truncate any previous log file; do_log/4 appends afterwards
+    {ok, FP} = file:open(Name, [write]),
+    file:close(FP),
+    {ok, #state{name = Name, level = Lvl}}.
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+handle_cast({write, Level, Fmt, Arg}, State) ->
+    do_log(State, Level, Fmt, Arg),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, State};
+
+handle_cast(_, State) ->
+    {noreply, State}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_call({write, Level, Fmt, Arg}, _From, State) ->
+    do_log(State, Level, Fmt, Arg),
+    {reply, ok, State};
+
+handle_call({set_level, Level}, _From, State) ->
+    {reply, ok, State#state{level = Level}};
+
+handle_call(_, _From, State) ->
+    {reply, not_implemented, State}.
+
+do_log(State, Level, Fmt, Arg) ->
+    #state{name = Name, level = MaxLevel} = State,
+    % `level' is the minimum severity that gets written
+    case Level >= MaxLevel of
+        true -> append(Name, Fmt, Arg);
+        false -> false
+    end.
+
+append(Name, Fmt, Arg) ->
+    {ok, FP} = file:open(Name, [append]),
+    io:format(FP, Fmt, Arg),
+    file:close(FP).
+
diff --git a/src/crawler/crawler_app.erl b/src/crawler/crawler_app.erl
new file mode 100644
index 0000000..57b0285
--- /dev/null
+++ b/src/crawler/crawler_app.erl
@@ -0,0 +1,64 @@
+%%
+%% crawler_app.erl
+%% Kevin Lynx
+%% 06.19.2013
+%%
+-module(crawler_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+-export([start/0, startboth/0, stop/0]).
+
+start(_Type, _StartArgs) ->
+    File = config_file_name(),
+    io:format("load config file ~s~n", [File]),
+    case file:consult(File) of
+        {error, _Reason} ->
+            do_default_start(File);
+        {ok, [Cfg]} ->
+            do_start(Cfg)
+    end.
+
+stop(_State) ->
+    crawler_sup:stop().
+
+config_file_name() ->
+    filename:join([filename:dirname(code:which(?MODULE)),
+        "..", "priv", "dhtcrawler.config"]).
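+
+% For illustration (not in the original source): config_file_name/0 resolves
+% to priv/dhtcrawler.config relative to the compiled beam, and the file holds
+% a single proplist term, e.g.:
+%
+%   [{start_port,6776},
+%    {node_count,50},
+%    {loglevel,3},
+%    {dbconn, 5},
+%    {dbhost,"localhost"},
+%    {dbport,27017}].
+%
+% do_default_start/1 below writes an equivalent default when the file is missing.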
+
+do_default_start(File) ->
+    List = [{start_port, 6776},
+        {node_count, 10},
+        {loglevel, 3},
+        {dbconn, 15},
+        {dbhost, "localhost"},
+        {dbport, 27017}],
+    filelib:ensure_dir("priv/"),
+    file:write_file(File, io_lib:fwrite("~p.\n", [List])),
+    do_start(List).
+
+do_start(List) ->
+    StartPort = proplists:get_value(start_port, List),
+    Count = proplists:get_value(node_count, List),
+    LogLevel = proplists:get_value(loglevel, List),
+    DBConn = proplists:get_value(dbconn, List),
+    DBHost = proplists:get_value(dbhost, List),
+    DBPort = proplists:get_value(dbport, List),
+    io:format("dhtcrawler startup ~p, ~p, ~p:~p~n", [StartPort, Count, DBHost, DBPort]),
+    crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn}).
+
+start() ->
+    error_logger:logfile({open, "crash.log"}),
+    code:add_path("deps/bson/ebin"),
+    code:add_path("deps/kdht/ebin"),
+    code:add_path("deps/mongodb/ebin"),
+    Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
+    [application:start(App) || App <- Apps],
+    application:start(dhtcrawler).
+
+stop() ->
+    application:stop(dhtcrawler).
+
+startboth() ->
+    start(),
+    crawler_http:start().
+
diff --git a/src/crawler/crawler_stats.erl b/src/crawler/crawler_stats.erl
new file mode 100644
index 0000000..736960f
--- /dev/null
+++ b/src/crawler/crawler_stats.erl
@@ -0,0 +1,161 @@
+%%
+%% crawler_stats.erl
+%% Kevin Lynx
+%% 06.17.2013
+%%
+-module(crawler_stats).
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/0,
+    stop/0,
+    saved/1,
+    announce/0,
+    get_peers/0,
+    dump/0,
+    get_stats_desc/0,
+    get_stats/0]).
+-export([handle_new/0,
+    handle_update/0,
+    handle_init/1]).
+-include("crawler_stats.hrl").
+-define(DUMP_INTERVAL, 30*60*1000).
+
+start_link() ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+announce() ->
+    gen_server:cast(srv_name(), {announce}).
+
+get_peers() ->
+    gen_server:cast(srv_name(), {get_peers}).
+
+saved(New) ->
+    Type = case New of
+        true -> new_torrent;
+        false -> update_torrent
+    end,
+    gen_server:cast(srv_name(), {Type}).
+
+get_stats() ->
+    gen_server:call(srv_name(), {get_stats}).
+
+get_stats_desc() ->
+    gen_server:call(srv_name(), {get_stats_desc}).
+
+% TODO: re-arrange the process relationship
+handle_init(Sum) ->
+    start_link(),
+    gen_server:call(srv_name(), {set_sum, Sum}).
+
+handle_new() ->
+    saved(true).
+
+handle_update() ->
+    saved(false).
+
+dump() ->
+    gen_server:cast(srv_name(), {dump}).
+
+srv_name() ->
+    ?MODULE.
+
+init([]) ->
+    % TODO: cancel this timer in terminate/2
+    timer:send_interval(?DUMP_INTERVAL, {dump_stats}),
+    {ok, #crawler_stats{start_time = now(), torrent_sum = 0}}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({new_torrent}, State) ->
+    #crawler_stats{new_saved = Saved, torrent_sum = Sum, torrent_count = ThisCount} = State,
+    {noreply, State#crawler_stats{new_saved = Saved + 1,
+        torrent_count = ThisCount + 1,
+        torrent_sum = Sum + 1}};
+
+handle_cast({update_torrent}, State) ->
+    #crawler_stats{updated = U, torrent_count = ThisCount} = State,
+    {noreply, State#crawler_stats{updated = U + 1, torrent_count = ThisCount + 1}};
+
+handle_cast({announce}, State) ->
+    #crawler_stats{announce_count = A} = State,
+    {noreply, State#crawler_stats{announce_count = A + 1}};
+
+handle_cast({get_peers}, State) ->
+    #crawler_stats{get_peers_count = G} = State,
+    {noreply, State#crawler_stats{get_peers_count = G + 1}};
+
+handle_cast({dump}, State) ->
+    do_dump(State),
+    {noreply, State};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_info({dump_stats}, State) ->
+    do_dump(State),
+    {noreply, State};
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+handle_call({set_sum, Sum}, _From, State) ->
+    {reply, ok, State#crawler_stats{torrent_sum = Sum}};
+
+handle_call({get_stats_desc}, _From, State) ->
+    {reply, format_stats(State), State};
+
+handle_call({get_stats}, _From, State) ->
+    Elapsed = stats_time(State#crawler_stats.start_time),
+    Ret = {Elapsed, State},
+    {reply, Ret, State};
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+stats_time(Start) ->
+    DiffSecs = timer:now_diff(now(), Start) div 1000 div 1000,
+    % {day, {hour, min, sec}}
+    calendar:seconds_to_daystime(DiffSecs).
+
+date_string() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
+    lists:flatten(io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
+        [Year, Month, Day, Hour, Min, Sec])).
+
+-define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))).
+
+do_dump(State) ->
+    {ok, FP} = file:open("dhtcrawler-stats.txt", [append]),
+    io:format(FP, "~s~n", [date_string()]),
+    io:format(FP, "~s~n", [format_stats(State)]),
+    file:close(FP).
+
+format_stats(State) ->
+    #crawler_stats{
+        get_peers_count = G,
+        announce_count = A,
+        torrent_count = ThisCount,
+        new_saved = New,
+        updated = U,
+        torrent_sum = Sum
+    } = State,
+    {Day, {H, M, S}} = stats_time(State#crawler_stats.start_time),
+    ?TEXT("  stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n",
+        [Day, H, M, S]) ++
+    ?TEXT("  torrent sum ~b~n", [Sum]) ++
+    ?TEXT("  get_peers count ~b~n", [G]) ++
+    ?TEXT("  announce count ~b~n", [A]) ++
+    ?TEXT("  download torrent ~b~n", [ThisCount]) ++
+    ?TEXT("  new saved ~b~n", [New]) ++
+    ?TEXT("  updated ~b~n", [U]).
diff --git a/src/crawler/crawler_stats.hrl b/src/crawler/crawler_stats.hrl
new file mode 100644
index 0000000..9c6e6a8
--- /dev/null
+++ b/src/crawler/crawler_stats.hrl
@@ -0,0 +1,10 @@
+-record(crawler_stats, {
+    start_time = now(),
+    announce_count = 0,
+    get_peers_count = 0,
+    torrent_count = 0, % torrents downloaded successfully, including duplicates
+    new_saved = 0, % saved into the database as a new torrent
+    updated = 0, % already existed in the database and was updated
+    torrent_sum % the total torrent count
+    }).
+
diff --git a/src/crawler/crawler_sup.erl b/src/crawler/crawler_sup.erl
new file mode 100644
index 0000000..7db039c
--- /dev/null
+++ b/src/crawler/crawler_sup.erl
@@ -0,0 +1,75 @@
+%%
+%% crawler_sup.erl
+%% Kevin Lynx
+%% 06.14.2013
+%%
+-module(crawler_sup).
+-behaviour(supervisor).
+-include("vlog.hrl").
+-export([init/1]).
+-export([start_link/1,
+    stop/0,
+    save_all_state/0]).
+
+start_link(Args) ->
+    supervisor:start_link({local, srv_name()}, ?MODULE, [Args]).
+
+stop() ->
+    save_all_state(),
+    exit(whereis(srv_name()), normal).
+
+srv_name() ->
+    dht_crawler_sup.
+
+init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn}]) ->
+    Spec = {one_for_one, 1, 600},
+    Instances = create_dht_instance(StartPort, Count),
+    Logger = [{dht_logger, {vlog, start_link, ["dht_crawler.txt", LogLevel]},
+        permanent, 2000, worker, dynamic}],
+    %Downloader = [{torrent_downloader, {torrent_download, start_link, []},
+    %    permanent, 2000, worker, dynamic}],
+    %DBStorer = [{torrent_index, {torrent_index, start_link, [DBHost, DBPort, crawler_stats, DBConn]},
+    %    permanent, 2000, worker, dynamic}],
+    HashInserter = [{db_hash, {db_hash, start_link, [DBHost, DBPort, DBConn]},
+        permanent, 2000, worker, dynamic}],
+    Stats = [{crawler_stats, {crawler_stats, start_link, []},
+        permanent, 2000, worker, dynamic}],
+    %Children = Logger ++ Downloader ++ DBStorer ++ Instances,
+    Children = Logger ++ HashInserter ++ Stats ++ Instances,
+    {ok, {Spec, Children}}.
+
+create_dht_instance(StartPort, Count) ->
+    Dir = "dhtstate/",
+    filelib:ensure_dir(Dir),
+    IDs = create_discrete_ids(Count),
+    Generator = lists:zip(IDs, lists:seq(StartPort, StartPort + Count - 1)),
+    [{instance_name(Port),
+        {kdht_sup, start_link, [instance_state_file(Dir, Port), Port, dht_monitor, ID]},
+        permanent, 1000, supervisor, [kdht]}
+        || {ID, Port} <- Generator].
+
+instance_name(Port) ->
+    Name = lists:flatten(io_lib:format("dht_instance_~p", [Port])),
+    list_to_atom(Name).
+
+instance_state_file(Dir, Port) ->
+    lists:flatten(io_lib:format("~sstate~b", [Dir, Port])).
+
+create_discrete_ids(1) ->
+    [dht_id:random()];
+
+create_discrete_ids(Count) ->
+    % spread the node IDs evenly across the 160-bit ID space
+    Max = dht_id:max(),
+    Piece = Max div Count,
+    [random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].
+
+save_all_state() ->
+    [save_instance_state(Instance) || Instance <- supervisor:which_children(srv_name())].
+
+save_instance_state({_ID, Pid, supervisor, [kdht]}) ->
+    kdht_sup:save_state(Pid);
+
+save_instance_state({_ID, _Pid, worker, _}) ->
+    ok.
+
diff --git a/src/crawler/dht_monitor.erl b/src/crawler/dht_monitor.erl
new file mode 100644
index 0000000..6c0f7b7
--- /dev/null
+++ b/src/crawler/dht_monitor.erl
@@ -0,0 +1,117 @@
+%%
+%% dht_monitor.erl
+%% Kevin Lynx
+%% 06.14.2013
+%% TODO: better to use gen_server
+%%
+-module(dht_monitor).
+-include("vlog.hrl").
+-export([handle_event/2,
+    handle_torrent/3,
+    process_infohash_event/1,
+    save_to_db/3,
+    tell_more_nodes/1]).
+-export([debug_dump/4,
+    debug_dump_failed/1]).
+-define(QUERY_INTERVAL, 1*60*1000).
+
+% judging from test logs, `get_peers' events are far more frequent than `announce_peer'
+handle_event(announce_peer, {_InfoHash, _IP, _BTPort}) ->
+    crawler_stats:announce();
+
+handle_event(get_peers, {InfoHash, _IP, _Port}) ->
+    crawler_stats:get_peers(),
+    %spawn(?MODULE, process_infohash_event, [InfoHash]);
+    MagHash = dht_id:tohex(InfoHash),
+    db_hash:insert(MagHash);
+
+handle_event(startup, {MyID}) ->
+    spawn(?MODULE, tell_more_nodes, [MyID]).
+
+% some database operations may block for a long time, so spawn a new process.
+% NOTE: this may create many processes, depending on database operation speed.
+process_infohash_event(InfoHash) ->
+    MagHash = dht_id:tohex(InfoHash),
+    Wait = 60*1000,
+    try
+        case torrent_index:inc_announce(MagHash, Wait) of
+            true ->
+                crawler_stats:saved(false);
+            false ->
+                download(InfoHash)
+        end
+    catch
+        exit:{timeout, _} ->
+            ?E(?FMT("inc_announce timeout exception for ~s", [MagHash]))
+    end.
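+
+% How this module is wired in (a sketch added for clarity, not in the
+% original source): crawler_sup passes the module name `dht_monitor' to each
+% kdht instance, so kdht invokes handle_event/2 above on DHT traffic, roughly:
+%
+%   Monitor = dht_monitor,
+%   Monitor:handle_event(get_peers, {InfoHash, IP, Port}),
+%
+% which is why every crawled info-hash funnels into db_hash:insert/1.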
+
+tell_more_nodes(MyID) ->
+    search:get_peers(MyID, dht_id:random()),
+    timer:sleep(?QUERY_INTERVAL),
+    tell_more_nodes(MyID). % tail recursive, be careful
+
+download(InfoHash) ->
+    MagHash = dht_id:tohex(InfoHash),
+    torrent_download:download(MagHash, ?MODULE).
+
+handle_torrent(ok, MagHash, TContent) ->
+    case catch(torrent_file:parse(TContent)) of
+        {'EXIT', _} ->
+            ?E(?FMT("parse torrent file failed ~p", [TContent]));
+        {Type, Info} ->
+            save_to_db(MagHash, Type, Info)
+    end,
+    ok;
+
+handle_torrent(error, _MagHash, _TContent) ->
+    ok.
+
+save_to_db(MagHash, single, {Name, Length}) ->
+    torrent_index:insert(MagHash, Name, Length);
+
+save_to_db(MagHash, multi, {Root, Files}) ->
+    torrent_index:insert(MagHash, Root, 0, Files).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+debug_dump_failed(MagHash) ->
+    io:format("download ~s failed~n", [MagHash]),
+    {ok, FP} = file:open("magnets.txt", [append]),
+    io:format(FP, "~s~n", [format_magnet(MagHash)]),
+    io:format(FP, "  download torrent failed [~s]~n", [get_time_string()]),
+    file:close(FP).
+
+get_time_string() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
+    Str = io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
+        [Year, Month, Day, Hour, Min, Sec]),
+    lists:flatten(Str).
+
+format_magnet(MagHash) ->
+    "magnet:?xt=urn:btih:" ++ MagHash.
+
+output_torrent_info(single, {Name, Length}, FP) ->
+    io:format(FP, "  ~s ~s~n", [Name, torrent_file:size_string(Length)]);
+
+output_torrent_info(multi, {Root, Files}, FP) ->
+    io:format(FP, "  ~s~n", [Root]),
+    [io:format(FP, "    ~s ~s~n", [Path, torrent_file:size_string(Length)])
+        || {Path, Length} <- Files].
+
+debug_dump(MagHash, TContent, Type, Info) ->
+    Filename = save_torrent(MagHash, TContent),
+    io:format("download ~s success (~p bytes), save as ~s~n",
+        [MagHash, byte_size(TContent), Filename]),
+    TSize = byte_size(TContent),
+    {ok, FP} = file:open("magnets.txt", [append]),
+    io:format(FP, "~s~n", [format_magnet(MagHash)]),
+    io:format(FP, "  download torrent success [~s] ~s (~s)~n",
+        [get_time_string(), Filename, torrent_file:size_string(TSize)]),
+    output_torrent_info(Type, Info, FP),
+    file:close(FP).
+
+save_torrent(MagHash, TContent) ->
+    Dir = "download/",
+    filelib:ensure_dir(Dir),
+    Filename = Dir ++ MagHash ++ ".torrent",
+    file:write_file(Filename, TContent),
+    Filename.
diff --git a/src/db_hash.erl b/src/db_hash.erl
new file mode 100644
index 0000000..172b51a
--- /dev/null
+++ b/src/db_hash.erl
@@ -0,0 +1,84 @@
+%%
+%% db_hash.erl
+%% Kevin Lynx
+%% 06.28.2013
+%% save info_hash to database
+%%
+-module(db_hash).
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/2,
+    start_link/3,
+    insert/1,
+    stop/0]).
+-record(state, {hash_docs, index}).
+-define(MAX_HASH, 1000). % flush to the database in batches of this many docs
+-define(POOL_NAME, hash_db_pool).
+-include("db_common.hrl").
+
+start_link(IP, Port, Size) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Size], []).
+
+start_link(IP, Port) ->
+    start_link(IP, Port, 5).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+% magnet hash, a 40-char hex string; stored as a binary
+insert(Hash) when is_list(Hash) ->
+    gen_server:cast(srv_name(), {insert, list_to_binary(Hash)}).
+
+srv_name() ->
+    db_hash.
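+
+% A note on the write path (added for clarity, not in the original source):
+% insert/1 buffers docs in #state.hash_docs; once ?MAX_HASH are queued,
+% try_insert/3 below bulk-inserts them with an unsafe (unacknowledged) mongo
+% write and bumps the batch write index. Each buffered doc looks like:
+%
+%   {hash, <<"4D10BF6389851FB3FFE8EF0E74624A72E9E827F8">>,
+%    index, WriteIndex,
+%    req_at, time_util:now_seconds()}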
+
+init([IP, Port, Size]) ->
+    mongo_sup:start_pool(?POOL_NAME, Size, {IP, Port}),
+    process_flag(trap_exit, true),
+    % only the write index is needed here
+    {_, Index} = db_system:load_batch_index(mongo_pool:get(?POOL_NAME)),
+    {ok, #state{hash_docs = [], index = Index}}.
+
+terminate(_, State) ->
+    mongo_sup:stop_pool(?POOL_NAME),
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_cast({insert, Hash}, State) ->
+    #state{hash_docs = List, index = Index} = State,
+    {NewIndex, NewList} = try_insert(Index, Hash, List),
+    {noreply, State#state{hash_docs = NewList, index = NewIndex}};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+%% DB stuff
+try_insert(Index, Hash, List) when length(List) + 1 == ?MAX_HASH ->
+    Conn = mongo_pool:get(?POOL_NAME),
+    mongo:do(unsafe, master, Conn, ?HASH_DBNAME, fun() ->
+        mongo:insert(?HASH_COLLNAME, [create_hash_doc(Index, Hash)|List])
+    end),
+    db_system:inc_batch_windex(Conn),
+    db_system:stats_query_inserted(Conn, ?MAX_HASH),
+    {Index + 1, []};
+
+try_insert(Index, Hash, List) ->
+    Doc = create_hash_doc(Index, Hash),
+    {Index, [Doc|List]}.
+
+create_hash_doc(Index, Hash) when is_binary(Hash) ->
+    {hash, Hash, index, Index, req_at, time_util:now_seconds()}.
+
diff --git a/src/db_store_mongo.erl b/src/db_store_mongo.erl
new file mode 100644
index 0000000..f67037e
--- /dev/null
+++ b/src/db_store_mongo.erl
@@ -0,0 +1,271 @@
+%%
+%% db_store_mongo.erl
+%% Kevin Lynx
+%% 06.16.2013
+%%
+-module(db_store_mongo).
+-include("vlog.hrl").
+-export([init/2,
+    close/1,
+    insert/5,
+    unsafe_insert/2,
+    count/1,
+    inc_announce/2,
+    exist/2,
+    index/2,
+    search_announce_top/2,
+    search_recently/2,
+    search/2]).
+-compile(export_all).
+-define(DBNAME, torrents).
+-define(COLLNAME, hashes).
+-define(SEARCH_COL, name_array).
+
+init(Host, Port) ->
+    {ok, Conn} = mongo_connection:start_link({Host, Port}),
+    ?I(?FMT("connect mongodb ~p:~p success", [Host, Port])),
+    enable_text_search(Conn),
+    ensure_search_index(Conn),
+    Conn.
+
+close(Conn) ->
+    mongo_connection:stop(Conn).
+
+count(Conn) ->
+    mongo_do(Conn, fun() ->
+        mongo:count(?COLLNAME, {})
+    end).
+
+exist(Conn, Hash) when is_list(Hash) ->
+    case find_exist(Conn, Hash) of
+        {} -> false;
+        _ -> true
+    end.
+
+% {Rets, {Found, CostTime}}
+search(Conn, Key) when is_list(Key) ->
+    BinColl = list_to_binary(atom_to_list(?COLLNAME)),
+    BinKey = list_to_binary(Key),
+    Ret = mongo_do(Conn, fun() ->
+        mongo:command({text, BinColl, search, BinKey})
+    end),
+    {decode_search(Ret), decode_search_stats(Ret)}.
+
+search_announce_top(Conn, Count) ->
+    Sel = {'$query', {}, '$orderby', {announce, -1}},
+    List = mongo_do(Conn, fun() ->
+        % mongodb-erlang does not provide cursor.limit()/sort() functions;
+        % weird, but this works here
+        Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count),
+        mongo_cursor:rest(Cursor)
+    end),
+    [decode_torrent_item(Item) || Item <- List].
+
+% db.hashes.find({$query:{},$orderby:{created_at: 1}}).limit(10);
+search_recently(Conn, Count) ->
+    Sel = {'$query', {}, '$orderby', {created_at, -1}},
+    List = mongo_do(Conn, fun() ->
+        Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count),
+        mongo_cursor:rest(Cursor)
+    end),
+    [decode_torrent_item(Item) || Item <- List].
+
+index(Conn, Hash) when is_list(Hash) ->
+    Ret = mongo_do(Conn, fun() ->
+        mongo:find_one(?COLLNAME, {'_id', list_to_binary(Hash)})
+    end),
+    case Ret of
+        {} -> {};
+        {Torrent} -> decode_torrent_item(Torrent)
+    end.
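+
+% An illustrative shell session (added note, not part of the original file),
+% assuming a populated database and the text index created by
+% ensure_search_index/1:
+%
+%   1> Conn = db_store_mongo:init(localhost, 27017).
+%   2> {Results, {Found, TimeMicros}} = db_store_mongo:search(Conn, "movie").
+%   3> hd(Results).
+%   {single, "7C69...", {"movie 1", 128}, 1, 1372694778}
+%
+% Each result tuple is produced by decode_torrent_item/1 further below.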
+
+insert(Conn, Hash, Name, Length, Files) when is_list(Hash) ->
+    NewDoc = create_torrent_desc(Hash, Name, Length, 1, Files),
+    mongo_do(Conn, fun() ->
+        %mongo:insert(?COLLNAME, NewDoc)
+        % the doc may already exist (inc_announce can fail), so upsert it here
+        Sel = {'_id', list_to_binary(Hash)},
+        mongo:update(?COLLNAME, Sel, NewDoc, true)
+    end).
+
+unsafe_insert(Conn, Tors) when is_list(Tors) ->
+    Docs = [create_torrent_desc(Hash, Name, Length, 1, Files) ||
+        {Hash, Name, Length, Files} <- Tors],
+    mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
+        mongo:insert(?COLLNAME, Docs)
+    end).
+
+inc_announce(Conn, Hash) when is_list(Hash) ->
+    inc_announce(Conn, list_to_binary(Hash));
+
+inc_announce(Conn, Hash) when is_binary(Hash) ->
+    % mongodb-erlang does not support updating a single field of an object;
+    % `findAndModify' works, but it changes the `announce' datatype to double
+    Cmd = {findAndModify, ?COLLNAME, query, {'_id', Hash},
+        update, {'$inc', {announce, 1}},
+        new, true},
+    Ret = mongo_do(Conn, fun() ->
+        mongo:command(Cmd)
+    end),
+    case Ret of
+        {value, undefined, ok, 1.0} -> false;
+        {value, _Obj, lastErrorObject, {updatedExisting, true, n, 1}, ok, 1.0} -> true;
+        _ -> false
+    end.
+
+ensure_search_index(Conn) ->
+    Spec = {key, {?SEARCH_COL, <<"text">>}},
+    mongo_do(Conn, fun() ->
+        mongo:ensure_index(?COLLNAME, Spec)
+    end).
+
+% NOTE: this does not seem to take effect; start mongod with
+% `--setParameter textSearchEnabled=true' instead (see the README)
+enable_text_search(Conn) ->
+    Cmd = {setParameter, 1, textSearchEnabled, true},
+    mongo:do(safe, master, Conn, admin, fun() ->
+        mongo:command(Cmd)
+    end).
+
+create_torrent_desc(Hash, Name, Length, Announce, Files) ->
+    NameArray = case string_split:split(Name) of
+        {error, L, D} ->
+            ?E(?FMT("string split failed(error): ~p ~p", [L, D])),
+            [Name];
+        {incomplete, L, D} ->
+            ?E(?FMT("string split failed(incomplete): ~p ~p", [L, D])),
+            [Name];
+        {ok, R} -> R
+    end,
+    {'_id', list_to_binary(Hash),
+        name, list_to_binary(Name),
+        name_array, NameArray,
+        length, Length,
+        created_at, time_util:now_seconds(),
+        announce, Announce,
+        files, encode_file_list(Files)}.
+
+% {file1, {name, xx, length, xx}, file2, {name, xx, length, xx}}
+encode_file_list(Files) ->
+    Keys = ["file" ++ integer_to_list(Index) || Index <- lists:seq(1, length(Files))],
+    Generator = lists:zip(Keys, Files),
+    list_to_tuple(lists:flatten([[list_to_atom(Key), {name, list_to_binary(Name), length, Length}]
+        || {Key, {Name, Length}} <- Generator])).
+
+find_exist(Conn, Hash) ->
+    mongo_do(Conn, fun() ->
+        mongo:find_one(?COLLNAME, hash_selector(Hash))
+    end).
+
+mongo_do(Conn, Fun) ->
+    mongo:do(safe, master, Conn, ?DBNAME, Fun).
+
+% TODO: replace this with {'_id', ID}; `$where' runs JavaScript and is slow
+hash_selector(Hash) ->
+    Expr = lists:flatten(io_lib:format("this._id == '~s'", [Hash])),
+    {'$where', list_to_binary(Expr)}.
+
+decode_search_stats(Rets) ->
+    {Stats} = bson:lookup(stats, Rets),
+    {Found} = bson:lookup(nfound, Stats),
+    {Cost} = bson:lookup(timeMicros, Stats),
+    {Found, Cost}.
+
+decode_search(Rets) ->
+    case bson:lookup(results, Rets) of
+        {} ->
+            [];
+        {List} ->
+            [decode_ret_item(Item) || Item <- List]
+    end.
+
+decode_ret_item(Item) ->
+    {Torrent} = bson:lookup(obj, Item),
+    decode_torrent_item(Torrent).
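+
+% Shape reference (added for clarity, not in the original file):
+% decode_torrent_item/1 below returns one of
+%   {single, Hash, {Name, Length}, Announce, CreatedAt}
+%   {multi,  Hash, {Name, [{FileName, FileLength}, ...]}, Announce, CreatedAt}
+% which is exactly what http_handler's format_one_result/2 consumes.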
+
+decode_torrent_item(Torrent) ->
+    {BinHash} = bson:lookup('_id', Torrent),
+    Hash = binary_to_list(BinHash),
+    {BinName} = bson:lookup(name, Torrent),
+    Name = binary_to_list(BinName),
+    {Length} = bson:lookup(length, Torrent),
+    {CreatedT} = bson:lookup(created_at, Torrent),
+    ICreatedAt = round(CreatedT),
+    {Announce} = bson:lookup(announce, Torrent),
+    IA = round(Announce), % announce may come back as a double from mongodb
+    case bson:lookup(files, Torrent) of
+        {{}} ->
+            {single, Hash, {Name, Length}, IA, ICreatedAt};
+        {Files} ->
+            {multi, Hash, {Name, decode_files(tuple_to_list(Files))}, IA, ICreatedAt}
+    end.
+
+decode_files(Files) ->
+    decode_file(Files).
+
+% the flattened list alternates {fileN, FileDoc}; skip the key element
+decode_file([_|[File|Rest]]) ->
+    {BinName} = bson:lookup(name, File),
+    Name = binary_to_list(BinName),
+    {Length} = bson:lookup(length, File),
+    [{Name, Length}] ++ decode_file(Rest);
+
+decode_file([]) ->
+    [].
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+test_insert() ->
+    Conn = init(localhost, 27017),
+    insert(Conn, "7C6932E7EC1CF5B00AE991871E57B2375DADA5A0", "movie 1", 128, []),
+    insert(Conn, "AE94E340B5234C8410F37CFA7170F8C5657ECE5D", "another movie name", 0,
+        [{"subfile-a", 100}, {"subfile-b", 80}]),
+    insert(Conn, "0F1B5BE407E130AEEA8AB2964F5100190086ED93", "oh it work", 2456, []),
+    close(Conn).
+
+test_content(Fun) ->
+    Conn = init(localhost, 27017),
+    Ret = Fun(Conn),
+    close(Conn),
+    Ret.
+
+test_ensureidx() ->
+    test_content(fun(Conn) ->
+        enable_text_search(Conn),
+        ensure_search_index(Conn)
+    end).
+
+test_search(Key) ->
+    test_content(fun(Conn) ->
+        search(Conn, Key)
+    end).
+
+test_tmpsearch(Key) ->
+    test_content(fun(Conn) ->
+        BinColl = list_to_binary(atom_to_list(?COLLNAME)),
+        BinKey = list_to_binary(Key),
+        Ret = mongo_do(Conn, fun() ->
+            mongo:command({text, BinColl, search, BinKey})
+        end),
+        Ret
+    end).
+
+test_count() ->
+    test_content(fun(Conn) ->
+        count(Conn)
+    end).
+
+test_find_top() ->
+    test_content(fun(Conn) ->
+        search_announce_top(Conn, 2)
+    end).
+
+test_announce() ->
+    Hash = "F79ED3E2BF29A5C4358202E88C9983AB479D7722",
+    test_content(fun(Conn) ->
+        inc_announce(Conn, Hash)
+    end).
+
+test_index(Hash) ->
+    test_content(fun(Conn) ->
+        index(Conn, Hash)
+    end).
+
diff --git a/src/db_system.erl b/src/db_system.erl
new file mode 100644
index 0000000..b7d6291
--- /dev/null
+++ b/src/db_system.erl
@@ -0,0 +1,100 @@
+%%
+%% db_system.erl
+%% Kevin Lynx
+%% 06.28.2013
+%%
+-module(db_system).
+-export([load_batch_index/1,
+    inc_batch_rindex/1,
+    inc_batch_windex/1]).
+-export([stats_new_saved/1,
+    stats_updated/1,
+    stats_query_inserted/2,
+    stats_day_at/2,
+    stats_get_peers/1]).
+-define(DBNAME, dht_system).
+-define(COLLNAME, system).
+-define(HASH_BATCH_KEY, <<"hashbatch">>).
+-define(STATS_COLLNAME, stats).
+
+%% batch index
+inc_batch_rindex(Conn) ->
+    inc_batch_index(Conn, read_index).
+
+inc_batch_windex(Conn) ->
+    inc_batch_index(Conn, write_index).
+
+inc_batch_index(Conn, Col) ->
+    Cmd = {findAndModify, ?COLLNAME, query, {'_id', ?HASH_BATCH_KEY},
+        update, {'$inc', {Col, 1}}, new, true},
+    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        mongo:command(Cmd)
+    end).
+
+load_batch_index(Conn) ->
+    Doc = case find_exist_batch_index(Conn) of
+        {} ->
+            NewDoc = create_batch_index(0, 0),
+            mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+                mongo:insert(?COLLNAME, NewDoc)
+            end),
+            NewDoc;
+        {Exist} ->
+            Exist
+    end,
+    {RIndex} = bson:lookup(read_index, Doc),
+    {WIndex} = bson:lookup(write_index, Doc),
+    {RIndex, WIndex}.
+
+find_exist_batch_index(Conn) ->
+    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        mongo:find_one(?COLLNAME, {'_id', ?HASH_BATCH_KEY})
+    end).
+
+create_batch_index(RIndex, WIndex) ->
+    {'_id', ?HASH_BATCH_KEY, read_index, RIndex, write_index, WIndex}.
+
+%% stats collection
+stats_new_saved(Conn) ->
+    stats_inc_field(Conn, new_saved).
+
+stats_updated(Conn) ->
+    stats_inc_field(Conn, updated).
+
+% a get_peers query we have fully processed
+stats_get_peers(Conn) ->
+    stats_inc_field(Conn, get_peers).
+
+% all inserted queries, whether processed or not
+stats_query_inserted(Conn, Count) ->
+    stats_inc_field(Conn, get_peers_query, Count).
+
+stats_inc_field(Conn, Field) ->
+    stats_inc_field(Conn, Field, 1).
+
+stats_inc_field(Conn, Field, Inc) ->
+    TodaySecs = time_util:now_day_seconds(),
+    mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
+        Doc = stats_ensure_today(TodaySecs),
+        {Val} = bson:lookup(Field, Doc),
+        NewDoc = bson:update(Field, Val + Inc, Doc),
+        mongo:update(?STATS_COLLNAME, {'_id', TodaySecs}, NewDoc)
+    end).
+
+stats_day_at(Conn, DaySec) ->
+    mongo:do(safe, master, Conn, ?DBNAME, fun() ->
+        stats_ensure_today(DaySec)
+    end).
+
+stats_ensure_today(TodaySecs) ->
+    case mongo:find_one(?STATS_COLLNAME, {'_id', TodaySecs}) of
+        {} ->
+            NewDoc = {'_id', TodaySecs, get_peers, 0, get_peers_query, 0,
+                updated, 0, new_saved, 0},
+            mongo:insert(?STATS_COLLNAME, NewDoc),
+            NewDoc;
+        {Doc} ->
+            Doc
+    end.
+
diff --git a/src/dhtcrawler.app.src b/src/dhtcrawler.app.src
new file mode 100644
index 0000000..362ac10
--- /dev/null
+++ b/src/dhtcrawler.app.src
@@ -0,0 +1,9 @@
+{application, dhtcrawler, [
+    {description, "A DHT crawler to index magnet hash to torrent"},
+    {vsn, git},
+    {registered, [dht_crawler_sup]},
+    {applications, [kernel, stdlib, crypto,
+        public_key, ssl, inets, bson, mongodb]},
+    {mod, {crawler_app, []}}
+]}.
+
diff --git a/src/hash_reader/db_hash_reader.erl b/src/hash_reader/db_hash_reader.erl
new file mode 100644
index 0000000..4710bb6
--- /dev/null
+++ b/src/hash_reader/db_hash_reader.erl
@@ -0,0 +1,182 @@
+%%
+%% db_hash_reader.erl
+%% Kevin Lynx
+%% 06.28.2013
+%%
+-module(db_hash_reader).
+-compile(export_all).
+-include("vlog.hrl").
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/1,
+    stop/1]).
+-record(state, {downloader, dbpool, downloading = 0}).
+-include("db_common.hrl").
+% if there's no hash, wait some time
+-define(WAIT_TIME, 2*60*1000).
+% the max number of concurrent download tasks
+-define(MAX_DOWNLOAD, 50).
+
+start_link(DBPool) ->
+    gen_server:start_link(?MODULE, [DBPool], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+init([DBPool]) ->
+    {ok, DownPid} = tor_download:start_link(),
+    tor_download_stats:register(DownPid),
+    {ok, #state{dbpool = DBPool, downloader = DownPid}, 0}.
+
+terminate(_, State) ->
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
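+
+% A processing overview (added note, not in the original file): each reader
+% pops one doc from the `hash' collection (try_next/1); if the info-hash is
+% already indexed it only bumps the announce counter, otherwise it asks
+% tor_download to fetch the torrent. At most ?MAX_DOWNLOAD fetches run at
+% once; overflow docs are parked in the `wait_download' collection and
+% resumed from try_next_download/1 whenever a download finishes.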
+
+handle_info({got_torrent, failed, _Hash}, State) ->
+    #state{downloading = D} = State,
+    Conn = db_conn(State),
+    try_next_download(Conn),
+    {noreply, State#state{downloading = D - 1}};
+
+handle_info({got_torrent, ok, Hash, Content}, State) ->
+    #state{downloading = D} = State,
+    Conn = db_conn(State),
+    try_next_download(Conn),
+    % the case value is unused; got_torrent/4 works by side effect
+    case catch(torrent_file:parse(Content)) of
+        {'EXIT', _} ->
+            State;
+        {Type, Info} ->
+            got_torrent(State, Hash, Type, Info)
+    end,
+    {noreply, State#state{downloading = D - 1}};
+
+handle_info(timeout, State) ->
+    Conn = db_conn(State),
+    try_next(Conn),
+    {noreply, State};
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+handle_cast({process_hash, Doc, DownloadDoc}, State) ->
+    #state{downloader = DownPid} = State,
+    Conn = db_conn(State),
+    {Hash} = bson:lookup(hash, Doc),
+    ListHash = binary_to_list(Hash),
+    % avoid registering many timers when the hash collection is empty but the
+    % download collection is not; it also keeps the message queue small: every
+    % time this function is called, it removes one message and appends at most one
+    if DownloadDoc -> do_nothing; true -> try_next(Conn) end,
+    NewState = case db_store_mongo:inc_announce(Conn, ListHash) of
+        true ->
+            ?T(?FMT("inc_announce success ~s", [ListHash])),
+            on_updated(Conn),
+            State;
+        false ->
+            ?T(?FMT("start to download the torrent ~s", [ListHash])),
+            try_download(State, DownPid, ListHash, Doc)
+    end,
+    {noreply, NewState};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+try_download(State, Pid, Hash, Doc) ->
+    #state{downloading = D} = State,
+    Conn = db_conn(State),
+    NewDownloading = case D >= ?MAX_DOWNLOAD of
+        true -> % put it into the download queue
+            insert_to_download_wait(Conn, Doc),
+            D;
+        false -> % download it now
+            tor_download:download(Pid, Hash),
+            D + 1
+    end,
+    State#state{downloading = NewDownloading}.
+
+try_save(State, Hash, Name, Length, Files) ->
+    Conn = db_conn(State),
+    case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of
+        {'EXIT', Reason} ->
+            ?E(?FMT("save torrent failed ~p", [Reason]));
+        _ ->
+            on_saved(Conn)
+    end.
+
+on_saved(Conn) ->
+    % `get_peers' here means we have processed a request
+    db_system:stats_get_peers(Conn),
+    % increase the `new' counter
+    db_system:stats_new_saved(Conn),
+    hash_reader_stats:handle_insert().
+
+on_updated(Conn) ->
+    % `get_peers' here means we have processed a request
+    db_system:stats_get_peers(Conn),
+    % also increase the updated counter
+    db_system:stats_updated(Conn),
+    hash_reader_stats:handle_update().
+
+got_torrent(State, Hash, single, {Name, Length}) ->
+    try_save(State, Hash, Name, Length, []);
+
+got_torrent(State, Hash, multi, {Root, Files}) ->
+    try_save(State, Hash, Root, 0, Files).
+
+% insert the doc into the `wait_download' collection; when the
+% downloader is free, it will download this doc.
+insert_to_download_wait(Conn, Doc) ->
+    {ID} = bson:lookup('_id', Doc),
+    Sel = {'_id', ID},
+    mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        % may exist already
+        mongo:update(?HASH_DOWNLOAD_COLL, Sel, Doc, true)
+    end).
+
+try_next_download(Conn) ->
+    Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        D = mongo:find_one(?HASH_DOWNLOAD_COLL, {}),
+        delete_inner_doc(?HASH_DOWNLOAD_COLL, D),
+        D
+    end),
+    schedule_next(Doc, true).
+
+try_next(Conn) ->
+    Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
+        D = mongo:find_one(?HASH_COLLNAME, {}),
+        delete_inner_doc(?HASH_COLLNAME, D),
+        D
+    end),
+    schedule_next(Doc, false).
+
+delete_inner_doc(_Col, {}) ->
+    ok;
+
+delete_inner_doc(Col, {Doc}) ->
+    {ID} = bson:lookup('_id', Doc),
+    mongo:delete(Col, {'_id', ID}).
+
+schedule_next({}, true) ->
+    ok;
+
+schedule_next({}, false) ->
+    timer:send_after(?WAIT_TIME, timeout);
+
+schedule_next({Doc}, DownloadDoc) ->
+    gen_server:cast(self(), {process_hash, Doc, DownloadDoc}).
+
+db_conn(State) ->
+    #state{dbpool = DBPool} = State,
+    mongo_pool:get(DBPool).
+
diff --git a/src/hash_reader/db_hash_reader_sup.erl b/src/hash_reader/db_hash_reader_sup.erl
new file mode 100644
index 0000000..204bd8f
--- /dev/null
+++ b/src/hash_reader/db_hash_reader_sup.erl
@@ -0,0 +1,63 @@
+%%
+%% db_hash_reader_sup.erl
+%% Kevin Lynx
+%% 06.29.2013
+%%
+-module(db_hash_reader_sup).
+-behaviour(supervisor).
+-export([init/1]).
+-export([start_link/3,
+    start_dep_apps/0,
+    start_standalone/3,
+    start_standalone/1]).
+
+start_dep_apps() ->
+    code:add_path("deps/bson/ebin"),
+    code:add_path("deps/mongodb/ebin"),
+    code:add_path("deps/kdht/ebin"),
+    code:add_path("deps/ibrowse/ebin"),
+    Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
+    [application:start(App) || App <- Apps].
+
+start_standalone([IP, Port, Size]) ->
+    IPort = list_to_integer(Port),
+    ISize = list_to_integer(Size),
+    start_standalone(IP, IPort, ISize),
+    % block forever so a node started via `erl -run' keeps running
+    receive
+        stop_standalone -> ok
+    end.
+
+start_standalone(IP, Port, Size) ->
+    io:format("db: ~p:~p reader count ~p~n", [IP, Port, Size]),
+    filelib:ensure_dir("log/"),
+    start_dep_apps(),
+    tor_download:start_global(),
+    % NOTE:
+    Stats = {db_hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
+    DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
+    Log = {vlog, {vlog, start_link, ["log/hash_reader.log", 3]}, permanent, 2000, worker, [vlog]},
+    start_link(IP, Port, Size, [DownloadStats, Stats, Log]).
+
+start_link(IP, Port, Size) ->
+    start_link(IP, Port, Size, []).
+
+start_link(IP, Port, Size, OtherProcess) ->
+    PoolName = mongodb_conn_pool_name,
+    mongo_sup:start_pool(PoolName, 5, {IP, Port}),
+    supervisor:start_link({local, srv_name()}, ?MODULE, [PoolName, Size, OtherProcess]).
+
+srv_name() ->
+    ?MODULE.
+
+init([PoolName, Size, OtherProcess]) ->
+    Spec = {one_for_one, 1, 600},
+    Children = OtherProcess ++ [create_child(PoolName, Index) || Index <- lists:seq(1, Size)],
+    {ok, {Spec, Children}}.
+
+create_child(PoolName, Index) ->
+    {child_id(Index), {db_hash_reader, start_link, [PoolName]},
+        permanent, 1000, worker, dynamic}.
+
+child_id(Index) ->
+    list_to_atom(lists:flatten(io_lib:format("db_hash_reader_~p", [Index]))).
+
diff --git a/src/hash_reader/hash_reader_stats.erl b/src/hash_reader/hash_reader_stats.erl
new file mode 100644
index 0000000..4bd7cb8
--- /dev/null
+++ b/src/hash_reader/hash_reader_stats.erl
@@ -0,0 +1,136 @@
+%%
+%% hash_reader_stats.erl
+%% Kevin Lynx
+%% 06.29.2013
+%%
+-module(hash_reader_stats).
+-behaviour(gen_server).
+-export([init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3]).
+-export([start_link/2,
+    start_link/1,
+    stop/0,
+    handle_update/0,
+    handle_insert/0,
+    dump/0]).
+-record(state, {tref, count, start, name, updated = 0, inserted = 0}).
+-define(STATS_INTERVAL, 10*60*1000).
+-define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))).
+
+start_link(Count) ->
+    start_link(Count, "reader_stats.txt").
+
+start_link(Count, Name) ->
+    gen_server:start_link({local, srv_name()}, ?MODULE, [Count, Name], []).
+
+stop() ->
+    gen_server:cast(srv_name(), stop).
+
+dump() ->
+    gen_server:cast(srv_name(), dump).
+
+handle_update() ->
+    gen_server:cast(srv_name(), inc_updated).
+
+handle_insert() ->
+    gen_server:cast(srv_name(), inc_inserted).
+
+srv_name() ->
+    ?MODULE.
+
+init([Count, Name]) ->
+    {ok, TRef} = timer:send_interval(?STATS_INTERVAL, dump),
+    State = #state{tref = TRef, count = Count, name = Name, start = now()},
+    {ok, State}.
+
+terminate(_, State) ->
+    #state{tref = TRef} = State,
+    timer:cancel(TRef),
+    {ok, State}.
+
+code_change(_, _, State) ->
+    {ok, State}.
+
+handle_info(dump, State) ->
+    do_dump(State),
+    {noreply, State};
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+handle_cast(dump, State) ->
+    do_dump(State),
+    {noreply, State};
+
+handle_cast(inc_updated, State) ->
+    #state{updated = U} = State,
+    {noreply, State#state{updated = U + 1}};
+
+handle_cast(inc_inserted, State) ->
+    #state{inserted = I} = State,
+    {noreply, State#state{inserted = I + 1}};
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+do_dump(State) ->
+    Dir = "log/",
+    #state{name = Name} = State,
+    filelib:ensure_dir(Dir),
+    % gathering the download stats below may take a long time
+    DownloadStats = format_download_stats(),
+    {ok, FP} = file:open(Dir ++ Name, [append]),
+    io:format(FP, "~s~n", [date_string()]),
+    io:format(FP, "~p~n", [self()]),
+    io:format(FP, "~s~n", [format_stats(State)]),
+    io:format(FP, "~s~n", [DownloadStats]),
+    file:close(FP).
+
+stats_time(Start) ->
+    DiffSecs = timer:now_diff(now(), Start) div 1000 div 1000,
+    % {day, {hour, min, sec}}
+    calendar:seconds_to_daystime(DiffSecs).
+
+date_string() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
+    lists:flatten(io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
+        [Year, Month, Day, Hour, Min, Sec])).
+
+format_stats(State) ->
+    #state{count = C, start = Start, updated = U, inserted = I} = State,
+    {Day, {H, M, S}} = stats_time(Start),
+    Mins = Day * 24 * 60 + H * 60 + M,
+    TotalMins = if Mins > 0 -> Mins; true -> 1 end,
+    Speed = (U + I) div TotalMins,
+    ?TEXT("  stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n",
+        [Day, H, M, S]) ++
+    ?TEXT("  Reader count ~p~n", [C]) ++
+    ?TEXT("  Process speed ~p req/min~n", [Speed]) ++
+    ?TEXT("  Download torrents speed ~p tor/min~n", [I div TotalMins]) ++
+    ?TEXT("  Updated ~p~n", [U]) ++
+    ?TEXT("  Inserted ~p~n", [I]).
+
+format_download_stats() ->
+    Start = now(),
+    {ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount} =
+        tor_download_stats:stats(),
+    TotalSecs = TotalTime div 1000,
+    Used = timer:now_diff(now(), Start) div 1000,
+    % guard against division by zero before any torrent has been requested
+    AvgSecs = case HashSum of 0 -> 0; _ -> TotalSecs div HashSum end,
+    ?TEXT("  ==== Torrent download stats ====~n", []) ++
+    ?TEXT("  Stats used time ~p ms~n", [Used]) ++
+    ?TEXT("  Downloader count ~p~n", [ProcessCount]) ++
+    ?TEXT("  Request torrents ~p~n", [HashSum]) ++
+    ?TEXT("  Http requests ~p~n", [ReqSum]) ++
+    ?TEXT("  Total used time ~p secs~n", [TotalSecs]) ++
+    ?TEXT("  Average download time ~p secs/tor~n", [AvgSecs]) ++
+    ?TEXT("  Current wait requests ~p~n", [CurrentReqCount]).
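+
+%% Added usage note (not part of the original commit): the hash reader can
+%% run standalone; db_hash_reader_sup:start_standalone/1 takes its arguments
+%% as a list of strings, which matches the `erl -run' calling convention, e.g.:
+%%
+%%   erl -pa ebin -run db_hash_reader_sup start_standalone localhost 27017 10
+%%
+%% assuming the project and its deps are compiled into ebin/ and deps/.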
+ + + diff --git a/src/http_front/crawler_http.erl b/src/http_front/crawler_http.erl new file mode 100644 index 0000000..1c5b919 --- /dev/null +++ b/src/http_front/crawler_http.erl @@ -0,0 +1,81 @@ +%% +%% crawler_http.erl +%% Kevin Lynx +%% 06.15.2013 +%% +-module(crawler_http). +-behaviour(gen_server). +-export([init/1, + handle_info/2, + handle_cast/2, + handle_call/3, + code_change/3, + terminate/2]). +-export([start/0, + start/3, + page_temp/0, + stop/0]). +-record(state, {html_temp, httpid}). + +start(DBHost, DBPort, Port) -> + code:add_path("deps/bson/ebin"), + code:add_path("deps/mongodb/ebin"), + Apps = [crypto, public_key, ssl, inets, bson, mongodb], + [application:start(App) || App <- Apps], + gen_server:start({local, srv_name()}, ?MODULE, [DBHost, DBPort, Port], []). + +start() -> + start(localhost, 27017, 8000). + +stop() -> + gen_server:cast(srv_name(), stop). + +page_temp() -> + gen_server:call(srv_name(), get_page_temp). + +srv_name() -> + crawler_http. + +init([DBHost, DBPort, Port]) -> + torrent_index:start_link(DBHost, DBPort), + {ok, Pid} = inets:start(httpd, [ + {modules, [mod_alias, mod_auth, mod_esi, mod_actions, + mod_cgi, mod_dir, mod_get, mod_head, mod_log, mod_disk_log]}, + {port, Port}, + {bind_address, {0, 0, 0, 0}}, + {server_name, "crawler_http"}, + {document_root, "www"}, + {server_root, "."}, + {directory_index, ["index.html"]}, + {erl_script_alias, {"/e", [http_handler]}}, + {mime_types, [{"html","text/html"}, + {"css","text/css"}, {"js","application/x-javascript"}]}]), + {ok, B} = file:read_file("www/page.temp"), + Html = binary_to_list(B), + {ok, #state{html_temp = Html, httpid = Pid}}. + +handle_call(get_page_temp, _From, State) -> + #state{html_temp = Html} = State, + {reply, Html, State}; + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_cast(stop, State) -> + {stop, normal, State}; + +handle_cast(_, State) -> + {noreply, State}. + +terminate(_, State) -> + #state{httpid = Pid} = State, + torrent_index:stop(), + inets:stop(httpd, Pid), + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_info(_, State) -> + {noreply, State}. + diff --git a/src/http_front/http_handler.erl b/src/http_front/http_handler.erl new file mode 100644 index 0000000..2d1628a --- /dev/null +++ b/src/http_front/http_handler.erl @@ -0,0 +1,142 @@ +%% +%% http_handler.erl +%% Kevin Lynx +%% 06.18.2013 +%% +-module(http_handler). +-export([search/3, + test_search/1, + index/3, + stats/3, + recent/3, + top/3]). +-define(TEXT(Fmt, Args), lists:flatten(io_lib:format(Fmt, Args))). +-import(torrent_file, [size_string/1]). +-define(CONTENT_TYPE, "Content-Type: text/html\r\n\r\n"). + +search(SessionID, _Env, Input) -> + {K, Body} = case get_search_keyword(Input) of + [] -> + {"", "invalid input"}; + Key -> + {Key, do_search(Key)} + end, + Response = simple_html(K, Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +top(SessionID, _Env, _Input) -> + Rets = torrent_index:top(), + BodyList = format_search_result(Rets), + Body = ?TEXT("
    ~s
", [lists:flatten(BodyList)]), + Response = simple_html("top", Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +recent(SessionID, _Env, _Input) -> + Rets = torrent_index:recent(), + BodyList = format_search_result(Rets), + Body = ?TEXT("
    ~s
", [lists:flatten(BodyList)]), + Response = simple_html("recent", Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +stats(SessionID, _Env, _Input) -> + Body = ?TEXT("total ~p torrents", [torrent_index:count()]), + Response = simple_html("", Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +index(SessionID, _Env, Input) -> + Body = case get_index_hash(Input) of + [] -> + "invalid hash"; + Hash -> + format_view(Hash) + end, + Response = simple_html("", Body), + mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]). + +get_search_keyword(Input) -> + case string:equal(string:substr(Input, 1, 2), "q=") of + true -> + urldecode:decode(string:substr(Input, 3)); + false -> + [] + end. + +get_index_hash(Input) -> + case string:equal(string:substr(Input, 1, 2), "q=") of + true -> + string:substr(Input, 3); + false -> + [] + end. + +simple_html(Key, Body) -> + ?TEXT(crawler_http:page_temp(), [Key, Body]). + +test_search(Keyword) -> + Filename = ?TEXT("search_~s.html", [Keyword]), + Body = do_search(Keyword), + file:write_file(Filename, simple_html(Keyword, Body)). + +do_search(Keyword) -> + {Rets, Stats} = torrent_index:search(Keyword), + {_Found, Cost} = Stats, + Tip = ?TEXT("

<p>search ~s, ~b results, ~f seconds</p>

", + [Keyword, length(Rets), Cost / 1000 / 1000]), + BodyList = format_search_result(Rets), + Body = ?TEXT("
    <ul>~s</ul>
", [lists:flatten(BodyList)]), + Tip ++ Body. + +format_search_result(RetList) -> + [format_one_result(Result, false) || Result <- RetList]. + +format_one_result({single, Hash, {Name, Length}, Announce, CTime}, ShowAll) -> + format_one_result(Hash, Name, [{Name, Length}], Announce, CTime, ShowAll); + +format_one_result({multi, Hash, {Name, Files}, Announce, CTime}, ShowAll) -> + format_one_result(Hash, Name, Files, Announce, CTime, ShowAll). + +format_one_result(Hash, Name, Files, Announce, CTime, ShowAll) -> + ?TEXT("
    <li><a href='/e/http_handler:index?q=~s'>~s</a>
      <ul>~s</ul>
    ", + [Hash, Name, format_files(Files, ShowAll)]) ++ + ?TEXT("

    Index at: ~s | File count: ~p | Announce count: ~p
    <a href='~s'>Download</a></li>

    ", + [format_time_string(CTime), length(Files), Announce, format_magnet(Hash)]). + +format_files(Files, false) -> + Sub = case length(Files) > 3 of + true -> + lists:sublist(Files, 3) ++ [{more, length(Files) - 3}]; + false -> + Files + end, + lists:flatten([format_file(File) || File <- Sub]); + +format_files(Files, true) -> + lists:flatten([format_file(File) || File <- Files]). + +format_file({more, Len}) -> + ?TEXT("
    <li>...~b more files</li>
    ", [Len]); + +format_file({Name, Length}) -> + ?TEXT("
    <li>~s ~s</li>
    ", + [Name, size_string(Length)]). + +format_view(Hash) -> + case torrent_index:index(Hash) of + {} -> "not found"; + Torrent -> + format_torrent_detail(Torrent) + end. + +format_torrent_detail(Torrent) -> + format_one_result(Torrent, true). + +format_magnet(MagHash) -> + "magnet:?xt=urn:btih:" ++ MagHash. + +format_time_string(Secs) -> + {{Y, M, D}, {H, Min, Sec}} = time_util:seconds_to_local_time(Secs), + ?TEXT("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", + [Y, M, D, H, Min, Sec]). + diff --git a/src/http_front/urldecode.erl b/src/http_front/urldecode.erl new file mode 100644 index 0000000..acfa06c --- /dev/null +++ b/src/http_front/urldecode.erl @@ -0,0 +1,29 @@ +%% +%% urldecode.erl +%% +-module(urldecode). +-export([decode/1]). +-compile(export_all). + +decode([$%, Hi, Lo | Tail]) -> + Hex = unhexdigit(Hi, Lo), + [Hex | decode(Tail)]; +decode([$?|T]) -> + [$?|T]; +decode([H|T]) when is_integer(H) -> + [H | decode(T)]; +decode([]) -> + []; +decode([H|T]) when is_list(H) -> + [decode(H) | decode(T)]. + +unhexdigit(C) when C >= $0, C =< $9 -> C - $0; +unhexdigit(C) when C >= $a, C =< $f -> C - $a + 10; +unhexdigit(C) when C >= $A, C =< $F -> C - $A + 10. + +unhexdigit(HiCh, LoCh) -> + unhexdigit(LoCh) bor (unhexdigit(HiCh) bsl 4). + +test(S) -> + R = decode(S), + file:write_file("decode.txt", R). diff --git a/src/torrent/tor_download.erl b/src/torrent/tor_download.erl new file mode 100644 index 0000000..d0007ed --- /dev/null +++ b/src/torrent/tor_download.erl @@ -0,0 +1,186 @@ +%% +%% tor_download.erl +%% Kevin Lynx +%% 06.30.2013 +%% +-module(tor_download). +-include("vlog.hrl"). +-behaviour(gen_server). +-export([init/1, + handle_info/2, + handle_cast/2, + handle_call/3, + code_change/3, + terminate/2]). +-export([start_global/0, + start_link/0, + stop/1, + download/2, + stats/1]). +-export([test/2]). +-define(HTTP_SESSION, 5000). +-define(HTTP_PIPELINE, 1000). +-define(REQ_TIMEOUT, 60*1000). +-record(state, {start, hashSum = 0, reqSum = 0, totalTime = 0, reqs}). + +start_global() -> + ibrowse:start(), + Options = [{max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}], + % NOTE: setting per-destination options here does not seem to take effect + [ibrowse:set_dest(Host, 80, Options) || Host <- get_req_hosts()], + ok. + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +stop(Pid) -> + gen_server:cast(Pid, stop). + +download(Pid, MagHash) when is_list(MagHash), length(MagHash) == 40 -> + gen_server:cast(Pid, {download, MagHash, self()}). + +%{HashSum, ReqSum, TotalTime, CurrentReqCount} +stats(Pid) -> + gen_server:call(Pid, get_stats, infinity). + +init([]) -> + {ok, #state{start = now(), reqs = gb_trees:empty()}}. + +handle_cast({download, MagHash, From}, State) -> + #state{reqs = Reqs, hashSum = H, reqSum = R} = State, + NewReqs = create_download(Reqs, MagHash, From), + {noreply, State#state{reqs = NewReqs, hashSum = H + 1, reqSum = R + 1}}; + +handle_cast(stop, State) -> + {stop, normal, State}; + +handle_cast(_, State) -> + {noreply, State}. + +terminate(_, State) -> + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_response(State, ReqID, Body) -> + #state{reqSum = R, totalTime = T, reqs = Reqs} = State, + {MagHash, URLs, From, Start} = gb_trees:get(ReqID, Reqs), + NewT = T + timer:now_diff(now(), Start) div 1000, % milliseconds + {NewReqS, NewReqs} = case unzip_content(Body) of + error -> + handle_next_req(MagHash, URLs, From, R, ReqID, Reqs); + Content -> + {R, handle_ok_response(MagHash, Content, From, ReqID, Reqs)} + end, + State#state{reqSum = NewReqS, reqs = NewReqs, totalTime = NewT}.
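+%% Usage sketch (hypothetical caller; the message shapes come from
+%% handle_ok_response/5 and handle_next_req/6 below): after calling
+%% download(Pid, MagHash), the caller receives exactly one message per hash:
+%%   receive
+%%       {got_torrent, ok, MagHash, Content} -> save_torrent(MagHash, Content);
+%%       {got_torrent, failed, MagHash} -> give_up
+%%   end
+%% where save_torrent/2 stands in for whatever the caller does with the data.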
+ +handle_info({ibrowse_async_response, ReqID, Body}, State) -> + #state{reqs = Reqs} = State, + NewState = case gb_trees:is_defined(ReqID, Reqs) of + true -> + handle_response(State, ReqID, Body); + false -> + ?E(?FMT("req ~p not found, reqs count ~p", [ReqID, gb_trees:size(Reqs)])), + State + end, + {noreply, NewState}; + +handle_info(_, State) -> + {noreply, State}. + +handle_call(get_stats, _From, State) -> + #state{hashSum = H, reqSum = R, totalTime = T, reqs = Reqs} = State, + {reply, {H, R, T, gb_trees:size(Reqs)}, State}; + +handle_call(_, _From, State) -> + {noreply, State}. +%% +handle_next_req(MagHash, URLs, From, ReqSum, ReqID, Reqs) -> + DelReqs = gb_trees:delete(ReqID, Reqs), + case request_next(URLs) of + {error, empty} -> + From ! {got_torrent, failed, MagHash}, + {ReqSum, DelReqs}; + {ok, NewReqID, NewURLs, Time} -> + NewReq = {MagHash, NewURLs, From, Time}, + {ReqSum + 1, gb_trees:insert(NewReqID, NewReq, DelReqs)} + end. + +handle_ok_response(MagHash, Content, From, ReqID, Reqs) -> + From ! {got_torrent, ok, MagHash, Content}, + gb_trees:delete(ReqID, Reqs). + +create_download(Reqs, MagHash, From) -> + URLs = create_req_urls(MagHash), + case request_next(URLs) of + {ok, ReqID, NewURLs, Time} -> + Req = {MagHash, NewURLs, From, Time}, + gb_trees:insert(ReqID, Req, Reqs); + {error, empty} -> % exception + From ! {got_torrent, failed, MagHash}, + Reqs + end. + +request_next([]) -> + {error, empty}; + +request_next([URL|T]) -> + SSL = is_ssl_url(URL), + Options = [{is_ssl, SSL}, {ssl_options, []}, {stream_to, self()}, + {max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}], + case ibrowse:send_req(URL, [], get, [], Options, ?REQ_TIMEOUT) of + {ibrowse_req_id, ReqID} -> + {ok, ReqID, T, now()}; + Reason -> + ?E(?FMT("ibrowse send_req failed ~p", [Reason])), + request_next(T) + end. +%% +unzip_content(B) when is_list(B) -> + unzip_content(list_to_binary(B)); + +unzip_content(B) when is_binary(B), byte_size(B) > 0 -> + case catch(zlib:gunzip(B)) of + {'EXIT', _} -> error; + Res -> Res + end; + +unzip_content(_B) -> + error. + +%% http stuff +get_req_hosts() -> + ["http://bt.box.n0808.com", + "http://torcache.net", + "http://zoink.it"]. + +create_req_urls(MagHash) when is_list(MagHash), length(MagHash) == 40 -> + U1 = "http://torcache.net/torrent/" ++ MagHash ++ ".torrent", + U2 = format_btbox_url(MagHash), + % zoink.it supports https, but the ssl library seems to leak memory + U3 = "http://zoink.it/torrent/" ++ MagHash ++ ".torrent", + [U1, U2, U3]. + +is_ssl_url(URL) when is_list(URL), length(URL) > 4 -> + string:substr(URL, 1, 5) == "https". + +format_btbox_url(MagHash) -> + H = lists:sublist(MagHash, 2), + T = lists:nthtail(38, MagHash), + "http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent". + +%% +test(Pid, MagHash) -> + tor_download:download(Pid, MagHash), + filelib:ensure_dir("torrents/"), + Name = "torrents/" ++ MagHash ++ ".torrent", + receive + {got_torrent, ok, _, Content} -> + file:write_file(Name, Content); + {got_torrent, failed, _} -> + failed + after 8000 -> + timeout + end. + diff --git a/src/torrent/tor_download_stats.erl b/src/torrent/tor_download_stats.erl new file mode 100644 index 0000000..731f73a --- /dev/null +++ b/src/torrent/tor_download_stats.erl @@ -0,0 +1,70 @@ +%% +%% tor_download_stats.erl +%% Kevin Lynx +%% 06.30.2013 +%% +-module(tor_download_stats). +-behaviour(gen_server). +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]).
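+%% Usage sketch (hypothetical): stats/0 aggregates the per-process tuples
+%% returned by tor_download:stats/1. TotalTime is accumulated in milliseconds
+%% (see tor_download:handle_response/3), so an average fetch time could be
+%% derived as:
+%%   {_Procs, _Hashes, ReqSum, TotalTime, _Pending} = tor_download_stats:stats(),
+%%   AvgMs = TotalTime div max(ReqSum, 1).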
+ -export([start_link/0, + stop/0, + register/1, + stats/0]). +-record(state, {pids = []}). + +start_link() -> + gen_server:start_link({local, srv_name()}, ?MODULE, [], []). + +stop() -> + gen_server:cast(srv_name(), stop). + +% {ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount} +stats() -> + gen_server:call(srv_name(), stats, infinity). + +register(Pid) -> + gen_server:cast(srv_name(), {register, Pid}). + +srv_name() -> + ?MODULE. + +%% +init([]) -> + {ok, #state{}}. + +terminate(_, State) -> + {ok, State}. + +code_change(_, _, State) -> + {ok, State}. + +handle_info(_, State) -> + {noreply, State}. + +handle_cast({register, Pid}, State) -> + #state{pids = Pids} = State, + {noreply, State#state{pids = [Pid|Pids]}}; + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_call(stats, _From, State) -> + #state{pids = Pids} = State, + {reply, stats(Pids), State}; + +handle_call(_, _From, State) -> + {noreply, State}. + +stats(Pids) -> + % {ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount} + Info = {0, 0, 0, 0, 0}, + lists:foldl(fun(Pid, {PSum, HashSum, ReqSum, TimeSum, CurReqCount}) -> + {ThisH, ThisReq, ThisTime, ThisCurReq} = tor_download:stats(Pid), + {PSum + 1, HashSum + ThisH, ReqSum + ThisReq, TimeSum + ThisTime, CurReqCount + ThisCurReq} + end, Info, Pids). + diff --git a/src/torrent/torrent_file.erl b/src/torrent/torrent_file.erl new file mode 100644 index 0000000..6f84850 --- /dev/null +++ b/src/torrent/torrent_file.erl @@ -0,0 +1,94 @@ +%% +%% torrent_file.erl +%% Kevin Lynx +%% 06.12.2013 +%% parse a torrent file; depends on the bencode module. +%% +-module(torrent_file). +-export([parse_from_file/1, + parse/1, + size_brief/1, + size_string/1]). + +parse_from_file(File) -> + {ok, Data} = file:read_file(File), + parse(Data). + +% Format: +% {single, {name, length}} +% {multi, {root, [{path, length}, {path, length}]}} +parse(Content) -> + {ok, {dict, TD}} = bencode:decode(Content), + {ok, {dict, Info}} = dict:find(<<"info">>, TD), + case type(Info) of + single -> {single, parse_single(Info)}; + multi -> {multi, parse_multi(Info)} + end. + +size_brief(SizeInBytes) -> + BT = 1024, + KT = BT * 1024, + MT = KT * 1024, + GT = MT * 1024, + if + SizeInBytes < BT -> {byte, SizeInBytes}; + SizeInBytes < KT -> {kb, SizeInBytes div BT}; + SizeInBytes < MT -> {mb, SizeInBytes div KT}; + SizeInBytes < GT -> {gb, SizeInBytes div MT}; + true -> {byte, SizeInBytes} + end. + +size_string(SizeInBytes) -> + {T, N} = size_brief(SizeInBytes), + lists:flatten(io_lib:format("~b ~s", [N, atom_to_list(T)])). + +type(Info) -> + case dict:find(<<"files">>, Info) of + {ok, {list, _Files}} -> multi; + _ -> single + end. + +parse_single(Info) -> + Name = read_string("name", Info), + {ok, Length} = dict:find(<<"length">>, Info), + {Name, Length}. + +parse_multi(Info) -> + Root = read_string("name", Info), + {ok, {list, Files}} = dict:find(<<"files">>, Info), + FileInfo = [parse_file_item(Item) || {dict, Item} <- Files], + {Root, FileInfo}. + +parse_file_item(F) -> + {ok, Length} = dict:find(<<"length">>, F), + Path = read_path(F), + {Path, Length}. + +read_string(Key, Dict) -> + % prefer utf8 + case dict:find(list_to_binary(Key++".utf-8"), Dict) of + {ok, UTF8BS} -> + binary_to_list(UTF8BS); + _ -> + {ok, BS} = dict:find(list_to_binary(Key), Dict), + binary_to_list(BS) + end. + +read_path(F) -> + case dict:find(<<"path.utf-8">>, F) of + {ok, {list, Paths}} -> + concat_path(Paths); + _ -> + {ok, {list, Paths}} = dict:find(<<"path">>, F), + concat_path(Paths) + end.
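+%% Example (hypothetical erl shell session; the file name and contents are
+%% assumed, the result shape follows the Format comment near parse/1):
+%%   1> torrent_file:parse_from_file("sample.torrent").
+%%   {multi, {"Root", [{"sub/a.txt", 1024}, {"b.bin", 2048}]}}
+%%   2> torrent_file:size_string(123456789).
+%%   "117 mb"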
+ +concat_path(Paths) -> + AppendSlash = fun(BS, Acc) -> + case Acc of + [] -> binary_to_list(BS); + _ -> Acc ++ "/" ++ binary_to_list(BS) + end + end, + lists:foldl(AppendSlash, "", Paths). + diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 0000000..d18f843 --- /dev/null +++ b/tools/README.md @@ -0,0 +1,18 @@ +## dhtcrawler2 + +This git branch maintains pre-compiled Erlang files to start dhtcrawler2 directly. + +## Usage + +* install Erlang R16B or newer +* download mongodb and start it first + + mongod --dbpath your-database-path --setParameter textSearchEnabled=true + +* start **crawler**, on Windows, just click `win_start_crawler.bat` +* start **hash_reader**, on Windows, just click `win_start_hash.bat` +* start **httpd**, on Windows, just click `win_start_http.bat` +* wait several minutes and check out `localhost:8000` + + + diff --git a/tools/create_bin.bat b/tools/create_bin.bat new file mode 100644 index 0000000..a5b897f --- /dev/null +++ b/tools/create_bin.bat @@ -0,0 +1,16 @@ +cd .. +mkdir bin\deps\bson\ebin +mkdir bin\deps\mongodb\ebin +mkdir bin\deps\kdht\ebin +mkdir bin\deps\ibrowse\ebin +copy deps\bson\ebin\*.* bin\deps\bson\ebin\ +copy deps\mongodb\ebin\*.* bin\deps\mongodb\ebin\ +copy deps\kdht\ebin\*.* bin\deps\kdht\ebin\ +copy deps\ibrowse\ebin\*.* bin\deps\ibrowse\ebin\ +mkdir bin\www +copy www\*.* bin\www\ +copy tools\*.* bin\ +mkdir bin\priv +mkdir bin\ebin +copy ebin\*.* bin\ebin\ +pause diff --git a/tools/win_start_crawler.bat b/tools/win_start_crawler.bat new file mode 100644 index 0000000..c7cbd08 --- /dev/null +++ b/tools/win_start_crawler.bat @@ -0,0 +1,2 @@ +erl -pa ebin -noshell -s crawler_app start + diff --git a/tools/win_start_hash.bat b/tools/win_start_hash.bat new file mode 100644 index 0000000..67d86bf --- /dev/null +++ b/tools/win_start_hash.bat @@ -0,0 +1 @@ +erl -pa ebin -noshell -s db_hash_reader_sup start_standalone localhost 27017 10 diff --git a/tools/win_start_http.bat b/tools/win_start_http.bat new file mode 100644 index 0000000..caa6e2f --- /dev/null +++ b/tools/win_start_http.bat @@ -0,0 +1,2 @@ +erl -pa ebin -noshell -s crawler_http start + diff --git a/www/index.html b/www/index.html new file mode 100644 index 0000000..a2b9dbf --- /dev/null +++ b/www/index.html @@ -0,0 +1,66 @@
+<!-- The page markup was lost when this document was extracted; only rendered
+     text fragments survived. Recoverable content: the page is titled "Yet
+     Another Magnet Search Engine", contains a search form with a text input
+     named "q" submitting to /e/http_handler:search, and shows example search
+     links reading "Try AVI / American / Iron Man". -->
diff --git a/www/page.temp b/www/page.temp new file mode 100644 index 0000000..9836b3d --- /dev/null +++ b/www/page.temp @@ -0,0 +1,59 @@
+<!-- Markup lost in extraction. This file is the page shell that
+     http_handler:simple_html/2 fills via io_lib:format/2: same title as
+     index.html, with two ~s slots, the first for the search keyword and the
+     second for the rendered page body. A minimal stand-in is sketched after
+     this diff. -->
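Since the template markup could not be recovered, here is a minimal stand-in for www/page.temp. It assumes only what the Erlang side requires: mod_esi routes `/e/http_handler:search`, `get_search_keyword/1` expects a query string beginning with `q=`, and `simple_html/2` substitutes the keyword and the page body into the two `~s` slots in that order. The layout and exact tags are guesses:

    <html>
    <head><title>Yet Another Magnet Search Engine</title></head>
    <body>
    <form action="/e/http_handler:search" method="get">
      <input type="text" name="q" value="~s">
      <input type="submit" value="search">
    </form>
    ~s
    </body>
    </html>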