Monday, December 8, 2008

Erlang Mapreduce分析

最近在研究Erlang上的MapReduce框架. 本段代码摘自<>20章 Multicores Programming.

开始----


% 代码来自phofs.erl 模块. phofs is short for Parrallel Highrt-Order Function.
-module(phofs).
-export([mapreduce/4]).

-import(lists, [foreach/2]).

%% F1(Pid, X) -> sends {Key,Val} messages to Pid
%% F2(Key, [Val], AccIn) -> AccOut


mapreduce(F1, F2, Acc0, L) ->
S = self(),
Pid = spawn(fun() -> run(S, F1, F2, Acc0, L) end),
receive
{Pid, Result} ->
Result
end.

% 本段代码的主函数 mapreduce由4个参数组成.
% F1 - Map函数
% F2 - Reduce函数
% Acc0 - 累加器的初始值, 函数最后会返回Acc0的最后累加结果
% L - 运算数据,装载在一个序列中.
run(Parent, F1, F2, Acc0, L) ->
process_flag(trap_exit, true),
ReducePid = self(),

%% 启动 Map运算
%% 对 序列L 中的每一个单元e 进行F1(e)运算. 并为每个单元创建一个进程
%% 此步是ERLANG优势的最大体现.最大化计算性能
foreach(fun(X) ->
spawn_link(fun() -> do_job(ReducePid, F1, X) end)
end, L),

% 获得 序列L 的元素量N.也就是总进程数N,并等待来自N个进程的结果,存到Dict0.
N = length(L),
Dict0 = dict:new(),
Dict1 = collect_replies(N, Dict0),

%% 启动Reduce运算,F2 代表某种Reduce 法则.用户可以自己选择并归的方案.
%% 对Dict1 哈希序列的key-value元素e 运行F2(e)并返回Acc0的累加器.
%% dict:fold函数我研究了好久,后来不断的查看文档,搜结果,讨论(可能本身愚钝:P), 其相当于lists:foldr
%% 也就是相当于python的reduce函数 e.g: reduce(lambda x,y:x+y,range(5)) ===> 1+2+3+4+5.运算集合来自Dict1,运算法则来自F2.
Acc = dict:fold(F2, Acc0, Dict1),

%% 最后把结果返回主函数mapreduce. 并打印.
Parent ! {self(), Acc}.

%% collect_replies 函数 负责处理Map进程的返回结果.并存入某个容器(这里用的是进程字典,同样,Ets,Dts都是可选容器)
%% 当某个Map 任务完成,会自动结束起进程proc,并发送'EXIT',这时候collect_replies会把这个proc 从等待进程序列删除.
>%% 当N=0时,所有map任务完毕,就会把所有相同key值的元素整合到同一个序列中作为新value,并重新对应相应的key.
%% 最后形成一个完整的 key-value Hash Table, 并返回到上面的 Dict1.

collect_replies(0, Dict) ->
Dict;
collect_replies(N, Dict) ->
receive
{Key, Val} ->
case dict:is_key(Key, Dict) of
true ->
Dict1 = dict:append(Key, Val, Dict),
collect_replies(N, Dict1);
false ->
Dict1 = dict:store(Key,[Val], Dict),
collect_replies(N, Dict1)
end;
{'EXIT', _, _} ->
collect_replies(N-1, Dict)
end.

%% Call F(Pid, X)
%% F must send {Key, Value} messsages to Pid
%% and then terminate

do_job(ReducePid, F, X) ->
F(ReducePid, X).


-----完

提示:鉴于对MapReduce函数的设计,Reduce必须等待所有相关Map进程返回才能开始启动.所以合理的配置Map-Reduce节点是关键,避免时间冗余,最小化等待时间,是M/R 架构师能力的重要体现


No comments:

Followers