Sunday, February 8, 2009

Erlang 分布式爬虫

下面是主模块,后面两段代码分别是数据库模块和一个附属功能模块。

%%%-------------------------------------------------------------------
%%% File : Distributed-Crawler
%%% Designer : free.Wang
%%% Description :
%%% Archived : Jan 28, 2009
%%%-------------------------------------------------------------------
-module(crawler_server).
-behaviour(gen_server).
%% API
-export([start/1,login/0,crawl/0,check/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).


-define(MasterNode,'aaa@219.141.12.84').
-define(Downloadpath,'/Users/winfree/Project/distriawler/urlsea').
%-define(Mastername,).
% If this node is the master node, comment out the row below.
%-define(Selfname,changjiangyihao).

%=====================================================================
% gen_server : {global,node()}
% send : mnesia:lookup(pid_table,node)
%=====================================================================

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------

%% globalname(Mode) -> atom()
%% Returns the name under which a crawler server is registered globally.
%%   slave  -> the part of the local node name before the "@"
%%   master -> the part of ?MasterNode before the "@"
%% Any other Mode fails with function_clause.
globalname(slave) ->
    node_short_name(node());
globalname(master) ->
    node_short_name(?MasterNode).

%% Returns the part of a node name that precedes the "@", as an atom.
%% Crashes with badmatch unless the node name splits into exactly two
%% tokens around "@".
node_short_name(Node) ->
    [Name, _Host] = string:tokens(atom_to_list(Node), "@"),
    list_to_atom(Name).




%% start(Mode) -> {ok, Pid} | ignore | {error, Reason} | LoginReply
%% Starts the crawler gen_server, registered globally under globalname(Mode).
%%   master -> returns the result of gen_server:start_link/4.
%%   slave  -> connects to ?MasterNode, starts the local server and then
%%             logs in to the master; returns the reply of login/0, or
%%             {error, Reason} if the local server could not be started.
start(master) ->
    gen_server:start_link({global, globalname(master)}, ?MODULE, [], []);
start(slave) ->
    %% Fail right here when the master is unreachable instead of exiting
    %% later with noproc inside login/0.
    pong = net_adm:ping(?MasterNode),
    %% The global name registry is not necessarily up to date right after
    %% the nodes connect; wait for it so that the master's global name
    %% resolves when login/0 calls it.
    ok = global:sync(),
    case gen_server:start_link({global, globalname(slave)}, ?MODULE, [], []) of
        {ok, _Pid} ->
            login();
        {error, {already_started, _Pid}} ->
            %% A server is already registered under this name; logging in
            %% again is still meaningful.
            login();
        {error, _Reason} = Error ->
            Error
    end.




%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% init/1 -- gen_server callback.
%% Starts the database module (its result is ignored) and returns the
%% initial server state, the one-element list ["success"].
%%--------------------------------------------------------------------
init([]) ->
    database:start(),
    InitialState = ["success"],
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------

%handle_call(_Request, _From, State) ->
% Reply = term(),
% {reply,ok,State};

%% login() -> Reply
%% Sends a synchronous {login, {SlaveGlobalName, Node}} request to the
%% server registered globally under the master's name and returns its reply.
login() ->
    MasterRef = {global, globalname(master)},
    Request = {login, {globalname(slave), node()}},
    gen_server:call(MasterRef, Request).


%% handle_call/3 -- gen_server callback for synchronous requests.
%%
%% {login, {Globalname, Node}}
%%     Splits Node at "@" into id and hostname, stores the address row via
%%     the database module and replies with every row of the address table.
%%     login/0 sends this request to the master's global name.
%%
%% {crawl, Urls}
%%     Spawns one process per URL that downloads it with curl, and replies
%%     ok immediately without waiting for the downloads.
handle_call({login, {Globalname, Node}}, From, State) ->
    io:format("starting handling form ~p~n", [From]),
    [Id, Hostname] = string:tokens(atom_to_list(Node), "@"),
    database:insert_address(Id, Hostname, Globalname),
    Reply = database:select_all(address),
    {reply, Reply, State};
handle_call({crawl, Urls}, _From, State) ->
    lists:foreach(
        fun(Url) ->
            io:format("~p~n", [Url]),
            spawn(fun() -> download(Url) end)
        end,
        Urls
    ),
    {reply, ok, State}.

%% Downloads Url with curl into a file named after the URL's host part,
%% relative to the current working directory.
download(Url) ->
    %% string:tokens/2 treats "//" as a set of separator characters, so this
    %% splits on every "/" and the second token is the host.
    File = lists:nth(2, string:tokens(Url, "//")),
    %% Quote both arguments: characters such as "&", "?", ";" or spaces in a
    %% URL would otherwise be interpreted by the shell.
    Command = "curl " ++ shell_quote(Url) ++ " > " ++ shell_quote(File),
    io:format(" downloading :~p~n", [Command]),
    os:cmd(Command).

%% Wraps Arg in single quotes for a POSIX shell, escaping embedded quotes.
shell_quote(Arg) ->
    "'" ++ lists:flatmap(fun($') -> "'\\''"; (C) -> [C] end, Arg) ++ "'".


%%--------------------------------------------------------------------
%% handle_cast/2 -- gen_server callback for asynchronous requests.
%% Every cast is ignored; the state is returned unchanged.
%%--------------------------------------------------------------------
handle_cast(_Request, ServerState) ->
    {noreply, ServerState}.

%%--------------------------------------------------------------------
%% handle_info/2 -- gen_server callback for messages that are neither
%% calls nor casts. Every such message is dropped; the state is returned
%% unchanged.
%%--------------------------------------------------------------------
handle_info(_Message, ServerState) ->
    {noreply, ServerState}.

%%--------------------------------------------------------------------
%% terminate/2 -- gen_server callback invoked when the server is about
%% to stop. No cleanup is performed; always returns ok.
%%--------------------------------------------------------------------
terminate(_Why, _ServerState) ->
    ok.

%%--------------------------------------------------------------------
%% code_change/3 -- gen_server callback invoked on a code upgrade.
%% The state needs no conversion and is returned as is.
%%--------------------------------------------------------------------
code_change(_FromVersion, ServerState, _ExtraArgs) ->
    {ok, ServerState}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------


%% allurls() -> [string()]
%% The fixed list of URLs handed out to the crawler nodes.
allurls() ->
    Seeds = [
        "http://www.g.cn",
        "http://www.sina.com.cn",
        "http://www.xiaonei.com",
        "http://www.erlang.org",
        "http://www.microsoft.com",
        "http://www.yahoo.com",
        "http://www.google.com",
        "http://www.dell.com",
        "http://www.doban.com"
    ],
    Seeds.

% communication with others.





%% check() -> ok
%% Debug helper: prints every row of the address table followed by the
%% URL groups produced by tools:seperate_list/3.
%% NOTE(review): the number of registered nodes is passed as the page size
%% here, whereas crawl/0 passes "URL count div node count" -- confirm which
%% one is intended.
check() ->
    Addresses = database:select_all(address),
    PageSize = length(Addresses),
    Groups = tools:seperate_list(allurls(), PageSize, []),
    io:format("~p~n------~p~n", [Addresses, Groups]).

%% crawl() -> ok | {error, no_nodes}
%% Run on the master node: splits allurls() into one group per registered
%% node and asks each node's server, asynchronously, to crawl its group.
%%
%% The groups are computed here so that there is always exactly one group
%% per node. The previous computation raised badarith with no registered
%% nodes or with more nodes than URLs, and crashed in lists:zip/2 whenever
%% the number of groups differed from the number of nodes (for example
%% 9 URLs and 5 nodes).
crawl() ->
    case database:select_all(address) of
        [] ->
            {error, no_nodes};
        Nodes ->
            UrlGroups = partition_urls(allurls(), length(Nodes)),
            lists:foreach(fun dispatch/1, lists:zip(Nodes, UrlGroups))
    end.

%% Sends one URL group to the server named in an address row.
%% Nodes that received no URLs are skipped.
dispatch({_Node, []}) ->
    ok;
dispatch({{_, _, _, Globalname}, UrlGroup}) ->
    io:format("executed by ~p~n", [Globalname]),
    %% Call from a separate process so that a slow or dead node does not
    %% hold up the remaining nodes.
    spawn(fun() ->
        gen_server:call({global, Globalname}, {crawl, UrlGroup})
    end).

%% Splits Urls into exactly GroupCount consecutive groups whose sizes differ
%% by at most one; trailing groups are empty when there are fewer URLs than
%% groups.
partition_urls(Urls, GroupCount) when GroupCount > 0 ->
    Total = length(Urls),
    Base = Total div GroupCount,
    Extra = Total rem GroupCount,
    Sizes = lists:duplicate(Extra, Base + 1) ++
        lists:duplicate(GroupCount - Extra, Base),
    take_groups(Urls, Sizes).

%% Cuts consecutive chunks of the given sizes off the front of the list.
take_groups(_Rest, []) ->
    [];
take_groups(Urls, [Size | Sizes]) ->
    {Group, Rest} = lists:split(Size, Urls),
    [Group | take_groups(Rest, Sizes)].


下面是数据库模块:


-module(database).
-import(lists, [foreach/2]).
-compile(export_all).
-include_lib("stdlib/include/qlc.hrl").

%% API
-export([start/0,select_all/1,delete/2,insert_address/3,
insert_urltable/2]).



% address: {Globalname,id@hostname} to send Message.
% urltable:{"http://www.freeis.cn/new/year/","freeis.cn"}
-record(address, {id,hostname,globalname}).
-record(urltable, {url,domain}).



%% ready() -> Result of mnesia:stop/0
%% One-time preparation: creates a schema on the local node, starts mnesia,
%% creates the address and urltable tables from their record definitions
%% and stops mnesia again. The results of the individual steps are ignored.
ready() ->
    mnesia:create_schema([node()]),
    mnesia:start(),
    TableSpecs = [
        {address, record_info(fields, address)},
        {urltable, record_info(fields, urltable)}
    ],
    lists:foreach(
        fun({Name, Fields}) ->
            mnesia:create_table(Name, [{attributes, Fields}])
        end,
        TableSpecs
    ),
    mnesia:stop().


%% start() -> Result of mnesia:wait_for_tables/2
%% Prepares schema and tables via ready/0, starts mnesia and waits for the
%% address and urltable tables, passing 20000 as the timeout.
start() ->
    ready(),
    mnesia:start(),
    Tables = [address, urltable],
    Timeout = 20000,
    mnesia:wait_for_tables(Tables, Timeout).

%% select_all(Table) -> [Row]
%% Returns every row of the given mnesia table, read inside a transaction
%% through do/1.
select_all(Table) ->
    Query = qlc:q([Row || Row <- mnesia:table(Table)]),
    do(Query).

%% do(Q) -> Result
%% Evaluates the qlc query handle Q inside a mnesia transaction and returns
%% the query result. Crashes with badmatch if the transaction is aborted.
do(Q) ->
    {atomic, Result} = mnesia:transaction(fun() -> qlc:e(Q) end),
    Result.

%% example_tables() -> [tuple()]
%% Sample rows used by reset_tables/0: three address rows followed by four
%% urltable rows.
example_tables() ->
    AddressRows = [
        {address, "jack", "10.1.225.117", "<0.33>"},
        {address, "allen", "abc.example.com", "<7.313>"},
        {address, "Foy", "chat.gogogo.com", "<1.214>"}
    ],
    UrlRows = [
        {urltable, "http://gogogo.sina.com", "sina.com"},
        {urltable, "http://A.sohu.com", "sohu.com"},
        {urltable, "http://B.avial.com", "avial.com"},
        {urltable, "http://C.wgi.com", "wgi.com"}
    ],
    AddressRows ++ UrlRows.

%% insert_address(Id, Hostname, Globalname) -> Result of mnesia:transaction/1
%% Writes one address record inside a transaction.
insert_address(Id, Hostname, Globalname) ->
    Record = #address{id = Id, hostname = Hostname, globalname = Globalname},
    mnesia:transaction(fun() -> mnesia:write(Record) end).

%% insert_urltable(Url, Domain) -> Result of mnesia:transaction/1
%% Writes one urltable record inside a transaction.
insert_urltable(Url, Domain) ->
    Record = #urltable{url = Url, domain = Domain},
    mnesia:transaction(fun() -> mnesia:write(Record) end).

%% delete_address(Item) -> Result of mnesia:transaction/1
%% Deletes the address row whose key (the id field) is Item.
%% The table name used to be misspelled as 'adress', so the delete never
%% targeted the address table.
delete_address(Item) ->
    Oid = {address, Item},
    F = fun() ->
        mnesia:delete(Oid)
    end,
    mnesia:transaction(F).

%% delete_urltable(Item) -> Result of mnesia:transaction/1
%% Deletes the urltable row whose key (the url field) is Item.
delete_urltable(Item) ->
    mnesia:transaction(fun() -> mnesia:delete({urltable, Item}) end).

%% delete(Table, Item) -> Result of mnesia:transaction/1
%% Deletes the row with key Item from Table, which must be either urltable
%% or address; any other table name fails with function_clause.
delete(Table, Item) when Table =:= urltable; Table =:= address ->
    mnesia:transaction(fun() -> mnesia:delete({Table, Item}) end).

%% reset_tables() -> Result of mnesia:transaction/1
%% Clears the urltable and address tables, then writes the rows returned by
%% example_tables/0 in a single transaction.
reset_tables() ->
    mnesia:clear_table(urltable),
    mnesia:clear_table(address),
    mnesia:transaction(fun() ->
        lists:foreach(fun(Row) -> mnesia:write(Row) end, example_tables())
    end).

%% get_plan(PlanId) -> Result of mnesia:transaction/1
%% Reads the rows stored under key PlanId in the design table.
%% The terminating period was missing, which made the module fail to compile.
%% NOTE(review): no design table is created anywhere in this module --
%% confirm that it exists before relying on this function.
get_plan(PlanId) ->
    F = fun() -> mnesia:read({design, PlanId}) end,
    mnesia:transaction(F).




这段是附加功能模块:

-module(tools).
-export([seperate_list/3,check/0]).

%% allurls() -> [string()]
%% The fixed list of sample URLs used by check/0.
allurls() ->
    Seeds = [
        "http://www.g.cn",
        "http://www.sina.com.cn",
        "http://www.xiaonei.com",
        "http://www.erlang.org",
        "http://www.microsoft.com",
        "http://www.yahoo.com",
        "http://www.google.com",
        "http://www.dell.com",
        "http://www.doban.com"
    ],
    Seeds.

% seperate_list(List, NOP, LatestList) -> [Group]
% Splits List into groups of NOP (Number of One Page) elements and pushes
% them onto LatestList. When length(List) is not a multiple of NOP, the
% remainder is added to the FIRST group taken; because groups are pushed
% onto the accumulator, the result lists them in reverse order:
%   seperate_list([1,2,3,4,5],2,[])   ---> [ [4,5],[1,2,3] ].
%   seperate_list([1,2,3,4,5,6],2,[]) ---> [ [5,6],[3,4],[1,2] ].
%   seperate_list([1],2,[])           ---> [ [1] ].
% NOP must be a positive integer unless List is empty.
seperate_list([], _NOP, LatestList) ->
    LatestList;
seperate_list(List, NOP, LatestList) when is_integer(NOP), NOP > 0 ->
    Len = length(List),
    % Take one page plus the remainder, but never more than what is left.
    Take = min(NOP + Len rem NOP, Len),
    % lists:split/2 yields the group and the rest in one pass.
    {Grp, Ext} = lists:split(Take, List),
    seperate_list(Ext, NOP, [Grp | LatestList]).


%% check() -> [Group]
%% Demo helper: runs seperate_list/3 over allurls() with a page size of 2,
%% starting from an empty accumulator.
check() ->
    seperate_list(allurls(), 2, []).

Followers