-module(pubsub2).
-export([startDispatcher/0, startClient/0,
subscribe/2, publish/3]).
startClient() ->
Pid = spawn(fun clientLoop/0),
register(client, Pid).
clientLoop() ->
receive {Topic, Message} ->
io:fwrite("Received message ~w for topic ~w~n",
[Message, Topic]),
clientLoop()
end.
subscribe(Host, Topic) ->
{dispatcher, Host} ! {subscribe, node(), Topic}.
publish(Host, Topic, Message) ->
{dispatcher, Host} ! {publish, Topic, Message}.
startDispatcher() ->
Pid = spawn(fun dispatcherLoop/0),
register(dispatcher, Pid).
dispatcherLoop() ->
io:fwrite("Dispatcher started\n"),
dispatcherLoop([]).
dispatcherLoop(Interests) ->
receive
{subscribe, Client, Topic} ->
dispatcherLoop(addInterest(Interests, Client, Topic));
{publish, Topic, Message} ->
Destinations = computeDestinations(Topic, Interests),
send(Topic, Message, Destinations),
dispatcherLoop(Interests)
end.
computeDestinations(_, []) -> [];
computeDestinations(Topic, [{SelectedTopic, Clients}|T]) ->
if SelectedTopic == Topic -> Clients;
SelectedTopic =/= Topic -> computeDestinations(Topic, T)
end.
send(_, _, []) -> ok;
send(Topic, Message, [Client|T]) ->
{client, Client} ! {Topic, Message},
send(Topic, Message, T).
addInterest(Interests, Client, Topic) ->
addInterest(Interests, Client, Topic, []).
addInterest([], Client, Topic, Result) ->
Result ++ [{Topic, [Client]}];
addInterest([{SelectedTopic, Clients}|T], Client, Topic, Result) ->
if SelectedTopic == Topic ->
NewClients = Clients ++ [Client],
Result ++ [{Topic, NewClients}] ++ T;
SelectedTopic =/= Topic ->
addInterest(T, Client, Topic, Result ++ [{SelectedTopic, Clients}])
end.
This post has been edited by macosxnerd101: 12 April 2012 - 06:59 AM
Reason for edit:: Added code tags and moved to Functional Programming

New Topic/Question
Reply


MultiQuote


|