Showing posts with label erlang. Show all posts
Showing posts with label erlang. Show all posts

Tuesday, March 24, 2009

erlang和python交互

最近开发 Erlang ,对其字符串处理能力无言至极,于是决定把它和python联合起来,打造一个强力的分布式系统,等将来需要系统级开发时,我再把 C++/C组合进来.

首先参考了 Erlang 官方文档和 http://blog.developers.api.sina.com.cn/?tag=erlang 以及 http://kazmier.net/computer/port-howto/ .

研读了将近24个小时, 才终于完全把问题解决. 开始 python 一直是broken pipe.后来才知道是管道中断导致的问题.

接着我排除了python的问题, 在erlang上面发现了Timeout, 感谢litaocheng的提醒,翻看了文档,把gen_server:call 的超时控制为infinity, 这样完全排除了超时的错误.

后来发现执行 combine() 之后,系统一直在等待. 我重新参考了前面的2个成功案例,发现数据流的结尾应该
加上"\n"(windows为"\r\n"),并且erlang端需要把字符串转换成二进制,于是就是

String = list_to_binary("combine|"++_String++"\n"),

python端就成功收到了Stream.
这里combine只是我自己定的一个通信协议 合成请求|String1,String2 => string1+string2.








%%%-------------------------------------------------------------------  

%%% File    : gen_server_template.full  

%%% Designer  : free.Won <freefis@Gmail.com>  

%%% Description :  

%%%     Communication between Python and Erlang via Stdio.  

%%% Archieved :  Mar 24 2009 by  free.Wang <freefis@Gmail.com>  

%%%-------------------------------------------------------------------  

-module(comm).  

-behaviour(gen_server).  

%% API  

-export([start/0,combine/1]).  

%% gen_server callbacks  

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,  

     terminate/2, code_change/3]).  

-record(state, {port}).

  

%% @doc Start the port server linked to the caller and register it
%% globally under the module name.
start() ->
    ServerName = {global, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% gen_server callback: trap exits, then launch the Python helper as a port.
%% The port is opened in line mode (lines up to 1024 bytes) and kept in the
%% #state{} record.
init([]) ->
    process_flag(trap_exit, true),
    PortSettings = [stream, {line, 1024}],
    Command = "python -u /home/freefis/Desktop/comm.py",
    {ok, #state{port = open_port({spawn, Command}, PortSettings)}}.

%% gen_server callback: send one request line to the Python port and reply
%% with the payload of the next data message the port delivers.
%%
%% String is the already-encoded request (see combine/1). The call blocks
%% until the port answers.
handle_call({combine, String}, _From, #state{port = Port} = State) ->
    port_command(Port, String),
    receive
        {Port, {data, {_Flag, Data}}} ->
            io:format("receiving:~p~n", [Data]),
            sleep(2000),
            %% Keep the #state{} record as the server state. Returning the
            %% bare port here made every later call fail to match this
            %% clause head.
            {reply, Data, State}
    end.

%% gen_server callback: casts are not part of the protocol and are ignored.
handle_cast(_Request, State) -> {noreply, State}.

%% gen_server callback for raw messages.
%%
%% init/1 sets trap_exit, so when the Python program behind the port exits
%% the port's exit signal arrives here as {'EXIT', Port, Reason}. Stop the
%% server in that case instead of carrying on with a dead port. Every other
%% message is ignored.
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State};
handle_info(_Info, State) ->
    {noreply, State}.

%% gen_server callback: nothing is cleaned up explicitly on shutdown.
terminate(_Reason, _State) -> ok.

%% gen_server callback: the state needs no conversion across code upgrades.
code_change(_FromVsn, State, _Extra) -> {ok, State}.

  

%%--------------------------------------------------------------------  

%%% Internal functions  

%%--------------------------------------------------------------------

%% Suspend the calling process for T milliseconds and return true.
%% Implemented as a receive with no clauses, so no message is consumed.
sleep(T) ->
    receive after T -> true end.

%% @doc Ask the Python side to concatenate comma-separated strings.
%%
%% Wraps the argument in the "combine|...\n" wire format, converts it to a
%% binary and calls the globally registered server with no timeout.
combine(Payload) ->
    Request = list_to_binary("combine|" ++ Payload ++ "\n"),
    gen_server:call({global, ?MODULE}, {combine, Request}, infinity).








#!/usr/bin/env python

# -*- coding:utf-8 -*-

#-------------------------------------------

# Designer  : Free.Wang

# E-mail    : freefis@Gmail.com

# Licence   : License on GPL Licence

# Archieved : Dec 27 2008

#-------------------------------------------







import sys



def handle(_string):
    """Execute one protocol request and return the reply line.

    A request of the form "combine|a,b,c" yields the comma-separated parts
    joined together ("abc"). Any other request yields an error marker
    instead of raising, so the caller can still write a reply line back.
    """
    prefix = "combine|"
    if _string.startswith(prefix):
        return "".join(_string[len(prefix):].split(","))
    # Previously `string` was unbound on this path (UnboundLocalError).
    return "error|unknown request"







# Serve requests line by line until stdin reaches EOF.
while True:
    # Receive one request line on standard input.
    _stream = sys.stdin.readline()
    if not _stream:
        break
    # Drop the trailing line terminator to get the bare request.
    inString = _stream.strip("\r\n")
    # Execute the request.
    outString = handle(inString)
    # Reply on standard output, one line per request, and flush so the
    # reader is not left waiting on a buffered reply.
    sys.stdout.write("%s\n" % (outString,))
    sys.stdout.flush()

# Exit only after the loop ends; exiting inside the loop stopped the script
# after the very first request.
sys.exit(0)


Sunday, February 8, 2009

Erlang 分布式爬虫

下面这个是主模块,后面2段代码分别是数据库模块,和一个附属功能模块.

%%%-------------------------------------------------------------------
%%% File : Distributed-Crawler
%%% Designer : free.Wang
%%% Description :
%%% Archieved : Jan 28, 2009
%%%-------------------------------------------------------------------
-module(crawler_server).
-behaviour(gen_server).
%% API
-export([start/1,login/0,crawl/0,check/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).


-define(MasterNode,'aaa@219.141.12.84').
-define(Downloadpath,'/Users/winfree/Project/distriawler/urlsea').
%-define(Mastername,).
% If this node is the master node, comment out the row below.
%-define(Selfname,changjiangyihao).

%=====================================================================
% gen_server : {global,node()}
% send : mnesia:lookup(pid_table,node)
%=====================================================================

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------

%% Derive the global registration name for a cluster role.
%%
%% `slave'  -> the part before "@" of the local node name.
%% `master' -> the part before "@" of the node configured in ?MasterNode.
%% Any other mode fails with function_clause.
globalname(slave) ->
    short_name(node());
globalname(master) ->
    short_name(?MasterNode).

%% Return the part of a node name before "@" as an atom.
%% The names come from node() and a compile-time macro, not from external
%% input, so list_to_atom/1 is acceptable here.
short_name(Node) ->
    [Name, _Host] = string:tokens(atom_to_list(Node), "@"),
    list_to_atom(Name).




%% @doc Start the crawler server in the given role.
%%
%% `master': start the server under the master's global name and return the
%% result of gen_server:start_link/4.
%% `slave': connect to the master node, start the local server under this
%% node's global name, then register with the master. The return value is
%% the reply of login/0, not the start_link result.
%% Any other mode fails with function_clause.
start(master) ->
    gen_server:start_link({global, globalname(master)}, ?MODULE, [], []);
start(slave) ->
    net_adm:ping(?MasterNode),
    gen_server:start_link({global, globalname(slave)}, ?MODULE, [], []),
    login().




%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% gen_server callback init/1.
%% Brings up the database module and uses the list ["success"] as the
%% initial server state.
%%--------------------------------------------------------------------
init([]) ->
    database:start(),
    InitialState = ["success"],
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------

%handle_call(_Request, _From, State) ->
% Reply = term(),
% {reply,ok,State};

%% @doc Register this node with the master server.
%% Sends {login, {GlobalName, Node}} to the master's globally registered
%% server and returns its reply.
login() ->
    Master = {global, globalname(master)},
    Identity = {globalname(slave), node()},
    gen_server:call(Master, {login, Identity}).


%% {login, {Globalname, Node}}: record the calling node in the address table
%% (the database work runs on whichever node owns this server) and reply
%% with all rows of that table.
handle_call({login, {Globalname, Node}}, Caller, State) ->
    io:format("starting handling form ~p~n", [Caller]),
    [Id, Hostname] = string:tokens(atom_to_list(Node), "@"),
    database:insert_address(Id, Hostname, Globalname),
    {reply, database:select_all(address), State};

%% {crawl, Urls}: executed on a slave node. Spawns one process per URL that
%% shells out to curl and redirects the output to a file named after the
%% second "/"-separated token of the URL. Replies ok without waiting for
%% the downloads.
%% NOTE(review): the URL is interpolated into a shell command line; confirm
%% that Urls can only come from trusted nodes.
handle_call({crawl, Urls}, _From, State) ->
    Download =
        fun(Url) ->
            Command = " curl " ++ Url ++ " > " ++ lists:nth(2, string:tokens(Url, "//")),
            os:cmd(Command),
            io:format(" downloading :~p~n", [Command])
        end,
    lists:foreach(
        fun(Url) ->
            io:format("~p~n", [Url]),
            spawn(fun() -> Download(Url) end)
        end,
        Urls),
    {reply, ok, State}.


%%--------------------------------------------------------------------
%% gen_server callback handle_cast/2.
%% No cast messages are defined; every cast is ignored and the state is
%% kept as is.
%%--------------------------------------------------------------------
handle_cast(_Request, State) -> {noreply, State}.

%%--------------------------------------------------------------------
%% gen_server callback handle_info/2.
%% Messages that are neither calls nor casts are ignored; the state is
%% kept as is.
%%--------------------------------------------------------------------
handle_info(_Message, State) -> {noreply, State}.

%%--------------------------------------------------------------------
%% gen_server callback terminate/2.
%% Called when the server is about to stop. Nothing is cleaned up here;
%% the function always returns ok.
%%--------------------------------------------------------------------
terminate(_Why, _State) -> ok.

%%--------------------------------------------------------------------
%% gen_server callback code_change/3.
%% The state needs no conversion on a code upgrade and is returned
%% unchanged.
%%--------------------------------------------------------------------
code_change(_FromVsn, State, _Extra) -> {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------


%% The fixed list of seed URLs handed out to the crawler nodes.
allurls() ->
    [
        "http://www.g.cn",
        "http://www.sina.com.cn",
        "http://www.xiaonei.com",
        "http://www.erlang.org",
        "http://www.microsoft.com",
        "http://www.yahoo.com",
        "http://www.google.com",
        "http://www.dell.com",
        "http://www.doban.com"
    ].

% communication with others.





%% @doc Debug helper: print the registered nodes and the URL groups obtained
%% by passing the node count as the group size to tools:seperate_list/3.
check() ->
    Nodes = database:select_all(address),
    UrlGroups = tools:seperate_list(allurls(), length(Nodes), []),
    io:format("~p~n------~p~n", [Nodes, UrlGroups]).

%% @doc Run on the master node: distribute the seed URLs over all
%% registered nodes and ask each node's server to crawl its share.
%%
%% The URL list is cut into exactly one group per node, so the pairing with
%% lists:zip/2 always lines up (the previous chunking could produce more
%% groups than nodes and crash). With no registered nodes nothing is
%% dispatched, where the old code failed with a division by zero.
%% Each request is sent from its own process, so this function does not wait
%% for the crawls. Returns ok.
crawl() ->
    case database:select_all(address) of
        [] ->
            io:format("no crawler nodes registered, nothing dispatched~n");
        Nodes ->
            UrlGroups = split_evenly(allurls(), length(Nodes)),
            lists:foreach(
                fun({{_Tag, _Id, _Hostname, Globalname}, UrlGroup}) ->
                    io:format("executed by ~p~n", [Globalname]),
                    spawn(fun() ->
                        gen_server:call({global, Globalname}, {crawl, UrlGroup})
                    end)
                end,
                lists:zip(Nodes, UrlGroups))
    end.

%% Split List into exactly Parts consecutive groups whose sizes differ by at
%% most one; the leading groups take the extra elements. Parts must be >= 1.
split_evenly(List, Parts) ->
    Total = length(List),
    split_evenly(List, Total div Parts, Total rem Parts, Parts, []).

split_evenly(_Rest, _Base, _Extra, 0, Acc) ->
    lists:reverse(Acc);
split_evenly(List, Base, Extra, Parts, Acc) when Extra > 0 ->
    {Group, Rest} = lists:split(Base + 1, List),
    split_evenly(Rest, Base, Extra - 1, Parts - 1, [Group | Acc]);
split_evenly(List, Base, 0, Parts, Acc) ->
    {Group, Rest} = lists:split(Base, List),
    split_evenly(Rest, Base, 0, Parts - 1, [Group | Acc]).


下面是数据库模块:


-module(database).
-import(lists, [foreach/2]).
-compile(export_all).
-include_lib("stdlib/include/qlc.hrl").

%% API
-export([start/0,select_all/1,delete/2,insert_address/3,
insert_urltable/2]).



% address: {Globalname,id@hostname} to send Message.
% urltable:{"http://www.freeis.cn/new/year/","freeis.cn"}
-record(address, {id,hostname,globalname}).
-record(urltable, {url,domain}).



%% Prepare Mnesia: create the schema on the local node, start Mnesia, create
%% the address and urltable tables from their record definitions, then stop
%% Mnesia again. The results of the individual steps are not checked.
ready() ->
    mnesia:create_schema([node()]),
    mnesia:start(),
    lists:foreach(
        fun({Table, Fields}) ->
            mnesia:create_table(Table, [{attributes, Fields}])
        end,
        [{address, record_info(fields, address)},
         {urltable, record_info(fields, urltable)}]),
    mnesia:stop().


%% @doc Set up the tables via ready/0, start Mnesia and wait up to 20
%% seconds for both tables. Returns the result of mnesia:wait_for_tables/2.
start() ->
    ready(),
    mnesia:start(),
    Tables = [address, urltable],
    mnesia:wait_for_tables(Tables, 20000).

%% @doc Return every row of Table, read inside a transaction via do/1.
select_all(Table) ->
    Query = qlc:q([Row || Row <- mnesia:table(Table)]),
    do(Query).

%% Evaluate the QLC query Q inside a Mnesia transaction and return its
%% result. Fails with badmatch if the transaction aborts.
do(Q) ->
    {atomic, Rows} = mnesia:transaction(fun() -> qlc:e(Q) end),
    Rows.

%% Sample rows used by reset_tables/0: three address rows followed by four
%% urltable rows, each already in record-tuple form.
example_tables() ->
    Addresses = [
        {"jack", "10.1.225.117", "<0.33>"},
        {"allen", "abc.example.com", "<7.313>"},
        {"Foy", "chat.gogogo.com", "<1.214>"}
    ],
    Urls = [
        {"http://gogogo.sina.com", "sina.com"},
        {"http://A.sohu.com", "sohu.com"},
        {"http://B.avial.com", "avial.com"},
        {"http://C.wgi.com", "wgi.com"}
    ],
    [{address, Id, Host, Name} || {Id, Host, Name} <- Addresses] ++
        [{urltable, Url, Domain} || {Url, Domain} <- Urls].

%% @doc Write one address row inside a transaction.
%% Returns the result of mnesia:transaction/1.
insert_address(Id, Hostname, Globalname) ->
    mnesia:transaction(fun() ->
        mnesia:write(#address{id = Id, hostname = Hostname, globalname = Globalname})
    end).

%% @doc Write one urltable row inside a transaction.
%% Returns the result of mnesia:transaction/1.
insert_urltable(Url, Domain) ->
    mnesia:transaction(fun() ->
        mnesia:write(#urltable{url = Url, domain = Domain})
    end).

%% @doc Delete the address row whose key is Item, inside a transaction.
%% Returns the result of mnesia:transaction/1.
delete_address(Item) ->
    %% The table is named `address'; the former spelling `adress' referred
    %% to a table that is never created, so the transaction always aborted.
    Oid = {address, Item},
    mnesia:transaction(fun() -> mnesia:delete(Oid) end).

%% @doc Delete the urltable row whose key is Item, inside a transaction.
%% Returns the result of mnesia:transaction/1.
delete_urltable(Item) ->
    mnesia:transaction(fun() -> mnesia:delete({urltable, Item}) end).

%% @doc Delete the row with key Item from Table, inside a transaction.
%%
%% Table must be `urltable' or `address'; any other value fails with
%% function_clause. Returns the result of mnesia:transaction/1.
delete(Table, Item) when Table =:= urltable; Table =:= address ->
    %% Both tables use {Table, Key} as object id, so a guard replaces the
    %% duplicated `if' branches that exported Oid.
    Oid = {Table, Item},
    mnesia:transaction(fun() -> mnesia:delete(Oid) end).

%% @doc Empty both tables, then reload the sample rows from example_tables/0
%% in a single transaction. Returns the result of that transaction.
reset_tables() ->
    mnesia:clear_table(urltable),
    mnesia:clear_table(address),
    mnesia:transaction(fun() ->
        lists:foreach(fun mnesia:write/1, example_tables())
    end).

%% @doc Read the rows stored under key PlanId in the `design' table, inside
%% a transaction. Returns the result of mnesia:transaction/1.
%% NOTE(review): this module never creates a `design' table; confirm that
%% another part of the system does, otherwise the transaction aborts.
get_plan(PlanId) ->
    %% The terminating period was missing, which made the module fail to
    %% compile.
    mnesia:transaction(fun() -> mnesia:read({design, PlanId}) end).




这段是附加功能模块:

-module(tools).
-export([seperate_list/3,check/0]).

%% The fixed list of sample URLs used by check/0.
allurls() ->
    [
        "http://www.g.cn",
        "http://www.sina.com.cn",
        "http://www.xiaonei.com",
        "http://www.erlang.org",
        "http://www.microsoft.com",
        "http://www.yahoo.com",
        "http://www.google.com",
        "http://www.dell.com",
        "http://www.doban.com"
    ].

% Cut List into consecutive groups of NOP (number of elements per page) and
% push each group onto LatestList.
%
% The leftover elements (length rem NOP) are folded into the first group
% that is cut, and because groups are pushed onto the accumulator the result
% lists the groups in reverse order of the input:
%   seperate_list([1,2,3,4,5],2,[])   ---> [ [4,5],[1,2,3] ].
%   seperate_list([1,2,3,4,5,6],2,[]) ---> [ [5,6],[3,4],[1,2] ].
seperate_list(List, NOP, LatestList) ->
    Rem = length(List) rem NOP,
    case lists:sublist(List, NOP + Rem) of
        [] ->
            % Nothing left to cut: the accumulator is the result.
            LatestList;
        Grp ->
            % Grp is a prefix of List, so the remainder is the tail after it.
            Ext = lists:nthtail(length(Grp), List),
            seperate_list(Ext, NOP, [Grp | LatestList])
    end.


%% @doc Demo: split the sample URLs into groups of two with seperate_list/3.
check() ->
    seperate_list(allurls(), 2, []).

Wednesday, December 17, 2008

分布式gen_server结构

在programming in Erlang中 16章最后给出了my_bank.erl的一个gen_server 实现. 当时一直在琢磨分布式的问题.把C/S分离.后来做了测试. gen_server实现的6个必要条件是 start_link , init , handle_call , handle_cast , handle_info , terminate.
换句话说,这是最基本的gen_server实现. 那么我们只需要把这6个函数放在一个执行在服务器端的模块里.
然后其他逻辑就可以转移到客户端节点了.

而客户端的主要逻辑是:
gen_server:call(Name,Request).

这样一个部署逻辑就实现了. 在具体code过程中,
server:start_link 的Name 要用 {global,arbitrary_atom}.

client:gen_server:call 的Name 要用 {global,arbitrary_atom}.


最后集群启动 erl -name id@domain
发送简单的 net_adm:ping(server_id@domain).
(其实server ping client 也行,但是对于伸缩性集群来说,新节点加进来的时候,当然第一行动是它发出请求,而不是server.所以这里应当养成良好习惯)

这一步的作用是建立了节点之间的通信,在完成之后,节点间会把global注册了的全局名称发布到集群中.

gen_server的分布式结构就完成了. 当我们构建一个强大多应用的服务器时,就可以把gen_server的逻辑分成N部分,然后部署到N个节点中.

这样,ERLANG的巨大威力就完美的发挥了.

Monday, December 8, 2008

Erlang Mapreduce分析

最近在研究Erlang上的MapReduce框架. 本段代码摘自《Programming Erlang》第20章 Programming Multicore CPUs.

开始----


% This code comes from the phofs.erl module. phofs is short for Parallel Higher-Order Functions.
-module(phofs).
-export([mapreduce/4]).

-import(lists, [foreach/2]).

%% F1(Pid, X) -> sends {Key,Val} messages to Pid
%% F2(Key, [Val], AccIn) -> AccOut


%% @doc Run a map/reduce job over the list L and return the reduced value.
%%
%% The work happens in a separate process running run/5; the caller blocks
%% until that process sends back {Pid, Result}.
mapreduce(F1, F2, Acc0, L) ->
    Caller = self(),
    Runner = spawn(fun() -> run(Caller, F1, F2, Acc0, L) end),
    receive
        {Runner, Result} -> Result
    end.

% Body of the reduce process started by mapreduce/4.
%   Parent - process that receives the final {self(), Acc} message
%   F1     - map function, called as F1(ReducePid, Element)
%   F2     - reduce function, passed to dict:fold/3
%   Acc0   - initial accumulator for the fold
%   L      - list of input elements
run(Parent, F1, F2, Acc0, L) ->
    % Trap exits so that each finishing map process shows up as an 'EXIT'
    % message that collect_replies/2 can count.
    process_flag(trap_exit, true),
    Reducer = self(),

    % Map phase: one linked process per element of L.
    lists:foreach(
        fun(Item) ->
            spawn_link(fun() -> do_job(Reducer, F1, Item) end)
        end,
        L),

    % Gather the {Key, Val} messages until all length(L) map processes have
    % exited.
    Gathered = collect_replies(length(L), dict:new()),

    % Reduce phase: fold F2 over the gathered dictionary, then hand the
    % result back to the parent.
    Parent ! {self(), dict:fold(F2, Acc0, Gathered)}.

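%% collect_replies handles the results sent back by the map processes and
%% stores them in a container (a dict here; ETS or DETS would work as well).
%% When a map job finishes, its process exits and an 'EXIT' message arrives;
%% collect_replies then decrements the number of processes it is waiting for.
%% All values sent under the same key are gathered into one list stored under
%% that key. When N reaches 0 every map job is done and the complete
%% key -> [value] dictionary is returned (Dict1 in run/5).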

%% Collect messages until N 'EXIT' messages have been seen, then return the
%% dictionary. Every {Key, Val} message appends Val to the list stored under
%% Key (dict:append/3 starts a new list when the key is not present yet).
collect_replies(0, Collected) ->
    Collected;
collect_replies(Pending, Collected) ->
    receive
        {Key, Val} ->
            collect_replies(Pending, dict:append(Key, Val, Collected));
        {'EXIT', _From, _Reason} ->
            collect_replies(Pending - 1, Collected)
    end.

%% Body of one map process: apply the map function to a single element.
%% F is expected to send its {Key, Value} messages to ReducePid and then
%% return, which ends the process.
do_job(ReducePid, MapFun, Element) ->
    MapFun(ReducePid, Element).


-----完

提示:鉴于对MapReduce函数的设计,Reduce必须等待所有相关Map进程返回才能开始启动.所以合理的配置Map-Reduce节点是关键,避免时间冗余,最小化等待时间,是M/R 架构师能力的重要体现


Followers