套接字模式

主动模式(选项{active, true})一般让人很喜欢,非阻塞消息接收,但在系统无法应对超大流量请求时,客户端发送的数据快过服务器可以处理的速度,那么系统就可能会造成消息缓冲区被塞满,可能出现持续繁忙的流量的极端情况下,系统因请求而溢出,虚拟机造成内存不足的风险而崩溃。

使用被动模式(选项{active, false})的套接字,底层的TCP缓冲区可用于抑制请求,并拒绝客户端的消息,在接收数据的地方都会调用gen_tcp:recv,造成阻塞(单进程模式下就只能消极等待某一个具体的客户端套接字,很危险)。需要注意的是,操作系统可能还会做一些缓存允许客户端机器继续发送少量数据,然后才会将其阻塞,此时Erlang尚未调用recv函数。

混合型模式(半阻塞),使用选项{active, once}打开,主动仅针对一个消息,在控制进程发送完一个数据消息后,必须显示调用inet:setopts(Socket, [{active, once}])重新激活以便接受下一个消息(在此之前,系统处于阻塞状态)。可见,混合型模式综合了主动模式和被动模式的两者优势,可实现流量控制,防止服务器被过多消息淹没。

以下TCP Server代码,都是建立在混合型模式(半阻塞)基础上。

prim_inet相关说明

prim_inet没有官方文档,可以认为是对底层socket的直接包装。淘宝yufeng说,这是otp内部实现的细节 是针对Erlang库开发者的private module,底层模块,不推荐使用。但在Building a Non-blocking TCP server using OTP principles示范中演示了prim_inet操作Socket异步特性。

设计模式

一般来说,需要一个单独进程进行客户端套接字监听,每一个子进程进行处理来自具体客户端的socket请求。

Building a Non-blocking TCP server using OTP principles示范中,子进程使用gen_fsm处理,很巧妙的结合状态机和消息事件,值得学习。

Erlang: A Generalized TCP Server文章中,作者也是使用此模式,但子进程不符合OTP规范,因此个人认为不是一个很好的实践模式。

simple_one_for_one

简易的一对一监督进程,用来创建一组动态子进程。对于需要并发处理多个请求的服务器较为合适。比如socket 服务端接受新的客户端连接请求以后,需要动态创建一个新的socket连接处理子进程。若遵守OTP原则,那就是子监督进程。

TCP Server实现

基于标准API简单实现

也是基于{active, once}模式,但阻塞的等待下一个客户端连接的任务被抛给了子监督进程。

看一下入口tcp_server_app吧

module(tcp_server_app).
author(‘yongboy@gmail.com’).
behaviour(application).
export([start/2, stop/1]).
define(DEF_PORT, 2222).
start(_Type, _Args) ->
Opts = [binary, {packet, 2}, {reuseaddr, true},
{keepalive, true}, {backlog, 30}, {active, false}],
ListenPort = get_app_env(listen_port, ?DEF_PORT),
{ok, LSock} = gen_tcp:listen(ListenPort, Opts),
case tcp_server_sup:start_link(LSock) of
{ok, Pid} ->
tcp_server_sup:start_child(),
{ok, Pid};
Other ->
{error, Other}
end.
stop(_S) ->
ok.
get_app_env(Opt, Default) ->
case application:get_env(application:get_application(), Opt) of
{ok, Val} -> Val;
_ ->
case init:get_argument(Opt) of
[[Val | _]] -> Val;
error -> Default
end
end.

读取端口,然后启动主监督进程(此时还不会监听处理客户端socket请求),紧接着启动子监督进程,开始处理来自客户端的socket的连接。

监督进程tcp_server_sup也很简单:

module(tcp_server_sup).
author(‘yongboy@gmail.com’).
behaviour(supervisor).
export([start_link/1, start_child/0]).
export([init/1]).
define(SERVER, ?MODULE).
start_link(LSock) ->
supervisor:start_link({local, ?SERVER}, ?MODULE, [LSock]).
start_child() ->
supervisor:start_child(?SERVER, []).
init([LSock]) ->
Server = {tcp_server_handler, {tcp_server_handler, start_link, [LSock]},
temporary, brutal_kill, worker, [tcp_server_handler]},
Children = [Server],
RestartStrategy = {simple_one_for_one, 0, 1},
{ok, {RestartStrategy, Children}}.

需要注意的是,只有调用start_child函数时,才真正调用tcp_server_handler:start_link([LSock])函数。

tcp_server_handler的代码也不复杂:

module(tcp_server_handler).
behaviour(gen_server).
export([start_link/1]).
export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
record(state, {lsock, socket, addr}).
start_link(LSock) ->
gen_server:start_link(?MODULE, [LSock], []).
init([Socket]) ->
inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
{ok, #state{lsock = Socket}, 0}.
handle_call(Msg, _From, State) ->
{reply, {ok, Msg}, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info({tcp, Socket, Data}, State) ->
inet:setopts(Socket, [{active, once}]),
io:format(~p got message ~p\n, [self(), Data]),
ok = gen_tcp:send(Socket, <<Echo back : , Data/binary>>),
{noreply, State};
handle_info({tcp_closed, Socket}, #state{addr=Addr} = StateData) ->
error_logger:info_msg(~p Client ~p disconnected.\n, [self(), Addr]),
{stop, normal, StateData};
handle_info(timeout, #state{lsock = LSock} = State) ->
{ok, ClientSocket} = gen_tcp:accept(LSock),
{ok, {IP, _Port}} = inet:peername(ClientSocket),
tcp_server_sup:start_child(),
{noreply, State#state{socket=ClientSocket, addr=IP}};
handle_info(_Info, StateData) ->
{noreply, StateData}.
terminate(_Reason, #state{socket=Socket}) ->
(catch gen_tcp:close(Socket)),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

代码很精巧,有些小技巧在里面。子监督进程调用start_link函数,init会返回{ok, #state{lsock = Socket}, 0}. 数字0代表了timeout数值,意味着gen_server马上调用handle_info(timeout, #state{lsock = LSock} = State)函数,执行客户端socket监听,阻塞于此,但不会影响在此模式下其它函数的调用。直到有客户端进来,然后启动一个新的子监督进程tcp_server_handler,当前子监督进程解除阻塞。

 

基于prim_inet实现

这个实现师从于Non-blocking TCP server using OTP principles一文,但子进程改为了gen_server实现。

看一看入口,很简单的:

module(tcp_server_app).
author(‘yongboy@gmail.com’).
behaviour(application).
export([start_client/1]).
export([start/2, stop/1]).
define(DEF_PORT, 2222).
%% A startup function for spawning new client connection handling FSM.
%% To be called by the TCP listener process.
start_client(Socket) ->
tcp_server_sup:start_child(Socket).
start(_Type, _Args) ->
ListenPort = get_app_env(listen_port, ?DEF_PORT),
tcp_server_sup:start_link(ListenPort, tcp_client_handler).
stop(_S) ->
ok.
get_app_env(Opt, Default) ->
case application:get_env(application:get_application(), Opt) of
{ok, Val} -> Val;
_ ->
case init:get_argument(Opt) of
[[Val | _]] -> Val;
error -> Default
end
end.

监督进程代码:

module(tcp_server_sup).
author(‘yongboy@gmail.com’).
behaviour(supervisor).
export([start_child/1, start_link/2, init/1]).
define(SERVER, ?MODULE).
define(CLIENT_SUP, tcp_client_sup).
define(MAX_RESTART, 5).
define(MAX_TIME, 60).
start_child(Socket) ->
supervisor:start_child(?CLIENT_SUP, [Socket]).
start_link(ListenPort, HandleMoudle) ->
supervisor:start_link({local, ?SERVER}, ?MODULE, [ListenPort, HandleMoudle]).
init([Port, Module]) ->
TcpListener = {tcp_server_sup, % Id = internal id
{tcp_listener, start_link, [Port, Module]}, % StartFun = {M, F, A}
permanent, % Restart = permanent | transient | temporary
2000, % Shutdown = brutal_kill | int() >= 0 | infinity
worker, % Type = worker | supervisor
[tcp_listener] % Modules = [Module] | dynamic
},
TcpClientSupervisor = {?CLIENT_SUP,
{supervisor, start_link, [{local, ?CLIENT_SUP}, ?MODULE, [Module]]},
permanent,
infinity,
supervisor,
[]
},
{ok,
{{one_for_one, ?MAX_RESTART, ?MAX_TIME},
[TcpListener, TcpClientSupervisor]
}
};
init([Module]) ->
{ok,
{_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
% TCP Client
{ undefined, % Id = internal id
{Module, start_link, []}, % StartFun = {M, F, A}
temporary, % Restart = permanent | transient | temporary
2000, % Shutdown = brutal_kill | int() >= 0 | infinity
worker, % Type = worker | supervisor
[] % Modules = [Module] | dynamic
}
]
}
}.

策略不一样,one_for_one包括了一个监听进程tcp_listener,还包含了一个tcp_client_sup进程树(simple_one_for_one策略)

tcp_listener单独一个进程用于监听来自客户端socket的连接:

module(tcp_listener).
author(‘saleyn@gmail.com’).
behaviour(gen_server).
export([start_link/2]).
export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
record(state, {
listener, % Listening socket
acceptor, % Asynchronous acceptor’s internal reference
module % FSM handling module
}).
start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).
init([Port, Module]) ->
process_flag(trap_exit, true),
Opts = [binary, {packet, 2}, {reuseaddr, true},
{keepalive, true}, {backlog, 30}, {active, false}],
case gen_tcp:listen(Port, Opts) of
{ok, Listen_socket} ->
%%Create first accepting process
{ok, Ref} = prim_inet:async_accept(Listen_socket, 1),
{ok, #state{listener = Listen_socket,
acceptor = Ref,
module = Module}};
{error, Reason} ->
{stop, Reason}
end.
handle_call(Request, _From, State) ->
{stop, {unknown_call, Request}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
#state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
try
case set_sockopt(ListSock, CliSocket) of
ok -> ok;
{error, Reason} -> exit({set_sockopt, Reason})
end,
%% New client connected – spawn a new process using the simple_one_for_one
%% supervisor.
{ok, Pid} = tcp_server_app:start_client(CliSocket),
gen_tcp:controlling_process(CliSocket, Pid),
%% Signal the network driver that we are ready to accept another connection
case prim_inet:async_accept(ListSock, 1) of
{ok, NewRef} -> ok;
{error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
end,
{noreply, State#state{acceptor=NewRef}}
catch exit:Why ->
error_logger:error_msg(Error in async accept: ~p.\n, [Why]),
{stop, Why, State}
end;
handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->
error_logger:error_msg(Error in socket acceptor: ~p.\n, [Error]),
{stop, Error, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
gen_tcp:close(State#state.listener),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Taken from prim_inet. We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
true = inet_db:register_socket(CliSocket, inet_tcp),
case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
{ok, Opts} ->
case prim_inet:setopts(CliSocket, Opts) of
ok -> ok;
Error -> gen_tcp:close(CliSocket), Error
end;
Error ->
gen_tcp:close(CliSocket), Error
end.
view rawtcp_listener.erl hosted with ❤ by GitHub

很显然,接收客户端的连接之后,转交给tcp_client_handler模块进行处理:

module(tcp_client_handler).
behaviour(gen_server).
export([start_link/1]).
export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
record(state, {socket, addr}).
define(TIMEOUT, 120000).
start_link(Socket) ->
gen_server:start_link(?MODULE, [Socket], []).
init([Socket]) ->
inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
{ok, {IP, _Port}} = inet:peername(Socket),
{ok, #state{socket=Socket, addr=IP}}.
handle_call(Request, From, State) ->
{noreply, ok, State}.
handle_cast(Msg, State) ->
{noreply, State}.
handle_info({tcp, Socket, Data}, State) ->
inet:setopts(Socket, [{active, once}]),
io:format(~p got message ~p\n, [self(), Data]),
ok = gen_tcp:send(Socket, <<Echo back : , Data/binary>>),
{noreply, State};
handle_info({tcp_closed, Socket}, #state{addr=Addr} = StateData) ->
error_logger:info_msg(~p Client ~p disconnected.\n, [self(), Addr]),
{stop, normal, StateData};
handle_info(_Info, StateData) ->
{noreply, StateData}.
terminate(_Reason, #state{socket=Socket}) ->
(catch gen_tcp:close(Socket)),
ok.
code_change(OldVsn, State, Extra) ->
{ok, State}.

和标准API对比一下,可以感受到异步IO的好处。

小结

通过不同的模式,简单实现一个基于Erlang OTP的TCP服务器,也是学习总结,不至于忘记。

您若有更好的建议,欢迎告知,谢谢。

link: http://www.blogjava.net/yongboy/archive/2012/10/24/390185.html

为了研究怎么用Erlang写一个游戏服务器,我很幸运的下到了一份英雄远征的服
务器Erlang源码,这两天花了点时间看代码,其中看到做TCP的accept动作时,它
是用的一个函数prim_inet:async_accept/2,这个可跟书上说的不一样(一般来
说书上教的是用gen_tcp:accept/1),于是我google了一下,发现找不到文档,
再翻一下发现已经有不少人问为什么这是一个undocumented的函数,也就是说
Erlang就没想让你去用这个函数,所以文档自然没提供。一般来说undocumented
的函数你是最好别用,因为下一次Erlang更新的时候没准就没这个函数了,或者
参数变了,或者行为变了。总之各种不靠谱的事都可以发生。这个事情可以由
这个帖子 看到。不过,这个帖子还特地说了:However, you might find
prim_inet:async_accept/2 useful.这样又把我们带到了 这个帖子 。在这里,
楼主说看起来这个函数很有趣,很有用,对此提了2个问题,1是为什么这个函数
还是一个undocumented,2是用这个函数安全吗?楼主还给了一篇讲如何用OTP原
理打造一个非阻塞的TCP服务器的文章。

这篇文章中说,虽然prim_inet:async_accept/2是一个undocumented的函数,但
因为他要写一个非阻塞的accept,所以还是会冒险去挖掘它的潜能。因为普通的
函数gen_tcp:accept/1是阻塞的,而现在需要做一个非阻塞的accept,只好用这
个undocumented的函数。

考虑一下这种异步的accept实现可以比同步的accept快多少?当同时有100个并发
的连接请求时,如果同样是有10个进程在做accept,异步的情况能让这100个请求
同时开始被处理。而同步的实现则需要让90个请求等待,先和10个请求accept完
再处理。最撮的那10个请求将要等待9次。(注:这只是简化的思考,实际上有可
能某个请求要等上几十次)再考虑每次的accept,这个我不太清楚,但我想应该
就是建立TCP连接的过程,也就是说client和server之间来回要跑3个包。假设平
均的延时是40ms,那么来回3次是120ms,等待10次差不多是要1秒多。考虑更差的
情况下,可能要等上几十秒,这种就已经是不能忍受的了。从这个角度来说,异
步的accept还是有价值的。

但是,同时有大量并发的连接请求的情况并不会经常出现。以游戏为例,只在刚
开服的时候会遇到这样的问题。正常运行的服务器很少再遇到大量的并发连接请
求。我想说的是,如果我们用100个,甚至1000个阻塞性的accept进程来代替这种
非正式的异步实现,也未尝不可。毕竟1000个进程对于erlang来说还是小case,
而对一个游戏服务器已经够用了。

最后总结一下,一,prim_inet:async_accept/2实现的异步OTP的TCP server在处
理大量并发的情况有优势;二,这种情况可以通过多开一些同步的阻塞性accept
进程在一定程度上克服;三,调用这个函数理论上来说毕竟还是不能完全放心的,
用不用看你的选择了。

最后,如果有时间,可以考虑做一些测试,对这2种情况实际对比。

有利的部分

erlang是好的,吸引了很多人的兴趣。不少人甚至将erlang体系的影响带到了自己所在的其他语言项目中,构建一些类似erlang的基础设施。包括我。

  1. 随意挥霍的process。 端游从早期的单进程支持一个游戏服,发展到现在的多进程支持一个游戏服。其中涉及多个进程协作的逻辑越来越多,异步任务相互之间由于时间差带来的问题也越来越频繁。
    通常我总会利用状态机或者类似的办法管理异步任务,但并不能顾及到每一处,而且状态机增加了理解代码的难度。
    erlang轻量的process使我能为每个异步任务创建一个独立的执行过程,同时变量不可变的机制几乎消除了多线程中的资源共用的问题。
  2. 发送消息太简单。
    用C++,发送消息从socket干起,还只能发给一个真正的系统进程,然后switch分发处理。
    在异步任务逐渐增多的环境下,有时候很希望语法糖是:能将消息发送给某个异步任务。需要消息平台是一种逻辑上的抽象,而不是底层的socket。
    这个可以C++包装一个,而erlang提供了一个极其完善的面向process的消息系统,process又可以映射到异步任务中。
  3. ETS,还有Mnesia,it’s great
    在多进程支持一个游戏功能的情况下,有大把的数据需要在不同的进程之间同步共享。也用C++包装一个类似的系统能降低很多重复的数据同步工作。
    erlang提供了ETS甚至Mnesia,几乎无条件的为我们提供了数据的分布、持久和恢复。

困难的部分

用erlang开发游戏最困难的地方就是实现场景逻辑。这里没有异步任务,这里是效率、指针和角色相互操作的天堂,但是对erlang来说可能是地狱。
常见的是player/NPC角色之间的立即相互修改,从地图区域查询一批player/NPC角色等。在指针环境下这些操作非常有效率。
一个实时战斗的MMORPG要求核心逻辑要达到25帧/S,区域地图在线1000人。

  1. 如果我们为每个player/NPC映射到process用于交互,那么他们必然使用消息去操作访问对方。 这很美,但是效率不行。因为player/NPC的操作访问实在太频繁了,这一来erlang就变成一个消息繁忙的系统,真正的游戏计算反而耽误了。
  2. 每个场景一个process,场景中所有的player/NPC集中保存到tuple,list中,通过参数传递。
    这样具体操作player/NPC的数据确实快了,但是erlang的原生list类型不是利于搜索的类型,这一趟趟查下来也就将计算时间花光光了。
    另外通过参数传递也会导致每个相关的子函数都要留一个口子。
  3. 每个场景一个process,场景中的所有player/NPC都保存到进程字典中。
    进程字典的存取速度在erlang中算快了,插入100ns,查询40ns。ETS的速度是us。参见进程字典到底有多快

游戏的三种逻辑

如果要用erlang来做MMORPG,那规划一下游戏的基本逻辑模块,根据模块的特性选择不同的erlang模式,是有意义的。像C++那样一种模式就行得通的做法,并不适用于erlang。

  1. 自我修改逻辑
    比如升级,强化之类,就是改改自己的属性,改改自己的技能。
    这种逻辑erlang可以为每个player/NPC创建process负责,也可以一个process处理多个player/NPC的类似逻辑。客户端对于这种操作的延时总能容忍1,2秒,慢慢来,性能没关系。
  2. 低频角色交互逻辑
    比如组队,领取任务,NPC思考等等,角色需要相互操作对方数据。同样也是延时容忍较高,可以尽情发挥erlang在这种交互逻辑中的异步任务支持能力。
  3. 高频角色交互逻辑
    同一个场景中,角色交互频率最高的就是移动和战斗。
    移动影响相互间的视野,战斗则直接读取修改彼此的数据。不要忘了,1000人25帧/S的计算。

移动和战斗

这一块比较纯粹,由一个process为一个场景的player/NPC提供服务,基本上有下面几样内容就搞定计算:

  1. pos,角色坐标
  2. atb,角色属性
  3. skill,技能列表
  4. state,状态列表
  5. 同屏表
  6. 地图数据
  7. 事件列表

前4条属于角色数据,简单操作即可。
同屏表通过四叉树管理角色的空间,为各种类型的群攻判定提供快目标的快速查询。
地图数据为角色检测技能释放的空间合法性,以及战斗寻路。
事件列表则每帧更新技能、状态等数值的计算。

效率优化

仅仅是纯粹的技能效果计算,erlang在虚拟机上的效率肯定不及C++。不过属性计算这块要求的性能并不高,猜测还是能扛下这个计算。
费时、难写的计算可能有两块,同屏表和地图数据。
erlang的数据类型很难高效的处理关系型结构,空间四叉树和地图三角关系用erlang实现起来相当凑合。那么,同屏表和地图的功能到底作为场景战斗process的一个功能,还是以独立process的形式服务呢?
process发送消息的性能大概是1us。
通常一个角色的一次战斗计算需要访问同屏表和地图功能各一次,4us。1000人的话是4ms。1帧40ms,那么每帧大概有30ms左右的时间用于完整的战斗计算,包括同屏计算和地图计算。
为了进一步提高同屏计算和地图计算的性能,可以用C++实现erlang的内置函数。此外,原先逐个角色串行计算同屏功能和地图功能,可以改并行:

  1. 先收集全部角色的同屏计算需求
  2. 将全部计算需求均匀发送到多个同屏表process进行计算
  3. 一边收集结果一边继续计算剩下的战斗部分。

地图计算也采用类似的并行处理,即可提高效率。