This is a simple example written in Erlang of the real-time
group-collaboration algorithm Operational Transformation. OT is an
optimistic algorithm that lets clients apply operations to a document
immediately as they are created, and only synchronizes the changes with the
server after they have been made. If another client has altered the document
while the first client was making the first operation, we can transform each
operation so that the server and every client will be brought back to the
same document state.
Each operation is either an insert or a delete of a single character at an
index in a document. A document is simply a string. This is hardly fit for
real-world use: most documents are more complicated than plain text, and
working one character at a time will create an unnecessary number of
operations that require transformation. That volume of operations also
taxes bandwidth more than you would want in a real-world application.
I am using single-character operations for simplicity.
|
-module(ot).
-include_lib("eunit/include/eunit.hrl").
-export([start/0, stop/0, server/1, client/1, get_doc/0]).
|
Operations
TODO: expand on operations.
xform
The following description of the xform function is taken from
High-Latency, Low-Bandwidth Windowing in the Jupiter Collaboration
System.
The general tool for handling conflicting messages is a function, xform ,
that maps a pair of messages to the fixed up versions. We write
xform(c, s) = {c’, s’}
where c and s are the original client and server messages. The messages c’
and s’ must have the property that if the client applies c followed by s’ ,
and the server applies s followed by c’ , then the client and server will
wind up in the same final state.
In our simple example, there are only six possible pairs of operation types
which must be transformed against each other: nop/nop, delete/nop, insert/nop,
delete/delete, insert/insert, and insert/delete.
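The convergence property quoted above can be checked mechanically. What follows is only a minimal sketch and not part of the ot module: converge_sketch and converges/5 are hypothetical names, and the transform and apply functions are passed in as funs so that the block stands on its own.

```erlang
-module(converge_sketch).
-export([converges/5]).

%% Hypothetical helper, not part of the ot module.
%% Xform is a fun (C, S) -> {ok, {Cprime, Sprime}}.
%% Apply is a fun (Doc, Op) -> NewDoc.
converges(Xform, Apply, Doc, C, S) ->
    {ok, {Cprime, Sprime}} = Xform(C, S),
    ClientDoc = Apply(Apply(Doc, C), Sprime),   % client: c, then s'
    ServerDoc = Apply(Apply(Doc, S), Cprime),   % server: s, then c'
    ClientDoc =:= ServerDoc.
```

Inside the ot module, one might call it as converges(fun xform/2, fun apply_op/2, "erlang", {del, 4}, {del, 2}) to check a single pair of operations.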
|
|
*/nop
Transforming a pair of operations when one is nop (no operation) is simple:
nothing needs to be calculated, because the other operation passes through
unchanged.
|
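% xform(A, B) returns {ok, {A', B'}}: A' is applied after B, and B' after A.
xform(nop, nop) ->
    {ok, {nop, nop}};
xform(nop, {ins, X, A}) ->
    {ok, {nop, {ins, X, A}}};
xform({ins, X, A}, nop) ->
    {ok, {{ins, X, A}, nop}};
xform(nop, {del, X}) ->
    {ok, {nop, {del, X}}};
xform({del, X}, nop) ->
    {ok, {{del, X}, nop}};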
|
Delete/Delete
The Delete/Delete pair is the easiest case to transform, besides nop . If
both operations deleted the same character, then emit the nop operation
because they are already in the same state. Otherwise, just shift the index
of the delete with the larger index down by one.
|
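xform({del, X}, {del, X}) ->
    {ok, {nop, nop}};
xform({del, X}, {del, Y}) when X > Y ->
    % B removed a character to the left of A's target, so A shifts down.
    {ok, {{del, X-1}, {del, Y}}};
xform({del, X}, {del, Y}) when Y > X ->
    {ok, {{del, X}, {del, Y-1}}};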
|
Insert/Insert
Each insert operation has the form {ins, Index, Character} . We break ties
based on order of the characters, and always keep user data by shifting
insertion indices rather than creating deletion operations.
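A tie-break only works if both sides reach the same answer no matter which operation they call "theirs". The sketch below illustrates that symmetry in isolation. It assumes the smaller character is placed first, which is one of two equally valid choices, and tie_sketch and order/2 are hypothetical names.

```erlang
-module(tie_sketch).
-export([order/2]).

%% Hypothetical helper: the final left-to-right order of two characters
%% inserted at the same index. Assumes the smaller character goes first.
order(A, A) -> [A];                  % identical inserts collapse into one
order(A, B) when A < B -> [A, B];
order(A, B) -> [B, A].
```

Because order($a, $b) and order($b, $a) return the same list, a client and a server that see the two inserts in opposite roles still agree on the result.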
|
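xform({ins, X, A}, {ins, X, A}) ->
    {ok, {nop, nop}};
xform({ins, X, A}, {ins, X, B}) when A < B ->
    % Same index: the smaller character goes first.
    {ok, {{ins, X, A}, {ins, X+1, B}}};
xform({ins, X, A}, {ins, X, B}) when A > B ->
    {ok, {{ins, X+1, A}, {ins, X, B}}};
xform({ins, X, A}, {ins, Y, B}) when X > Y ->
    {ok, {{ins, X+1, A}, {ins, Y, B}}};
xform({ins, X, A}, {ins, Y, B}) when Y > X ->
    {ok, {{ins, X, A}, {ins, Y+1, B}}};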
|
Insert/Delete
What do we do when one operation is an insert and the other is a delete? We
always attempt to keep the insert so that we do not lose user data. It is
always better to force users to go back and delete data to clean a document
up than it is to lose data.
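To see how an insert survives a concurrent delete, it can help to work one case by hand. The sketch below uses hypothetical list helpers (insert_at/3 and delete_at/2, with 0-based indices) rather than the module's own functions, and assumes the insert is shifted left by one when the deleted character was to its left.

```erlang
-module(ins_del_sketch).
-export([demo/0]).

%% Hypothetical helpers using 0-based indices, as in the rest of the document.
insert_at(Doc, I, Ch) ->
    {Before, After} = lists:split(I, Doc),
    Before ++ [Ch | After].

delete_at(Doc, I) ->
    {Before, [_ | After]} = lists:split(I, Doc),
    Before ++ After.

demo() ->
    Doc = "cab",
    %% One side inserts $t at index 2 while the other deletes index 0.
    %% Insert first, then the delete unchanged (index 0 is left of the insert).
    Client = delete_at(insert_at(Doc, 2, $t), 0),
    %% Delete first, then the insert shifted left by one, to index 1.
    Server = insert_at(delete_at(Doc, 0), 1, $t),
    {Client, Server}.
```

Both elements of the tuple returned by demo() are "atb": the inserted character is kept and the two sides converge.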
|
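xform({ins, X, A}, {del, Y}) when X =< Y ->
    % The insert lands at or before the deleted character, pushing it right.
    {ok, {{ins, X, A}, {del, Y+1}}};
xform({ins, X, A}, {del, Y}) when X > Y ->
    {ok, {{ins, X-1, A}, {del, Y}}};
xform({del, D}, {ins, X, C}) ->
    % Reuse the insert/delete clauses above and swap the results back.
    {ok, {Bprime, Aprime}} = xform({ins, X, C}, {del, D}),
    {ok, {Aprime, Bprime}};
xform(A, B) ->
    {error, {"Do not know how to transform:", A, B}}.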
|
apply_op
apply_op takes two arguments: a document (which is currently just a string),
and an operation. It returns a new document which is identical to the first,
except with the modifications described in the operation applied to it.
|
apply_op(Doc, nop) ->
    Doc;
apply_op(Doc, {del, X}) ->
    % X is a 0-based index: keep the first X characters, skip one, keep the rest.
    Doc_Length = length(Doc),
    lists:sublist(Doc, X) ++ lists:sublist(Doc, X+2, Doc_Length);
apply_op(Doc, {ins, 0, Char}) ->
    [Char] ++ Doc;
apply_op([H|T], {ins, X, Char}) ->
    [H | apply_op(T, {ins, X-1, Char})].
|
Messages and Client/Server Communication
When passing our operations between client and server, we need to store some
metadata about the operation with it. We will call the combination of an
operation and its metadata a message.
What kind of metadata do we need to store? We need to know an operation's
parent document state: it only makes sense to apply an operation to a
document that is in the same state as the document the operation was
generated from.
The easiest way to represent a document's state is a hash of its
contents. Daniel Spiewak sings the praises of using a hash to represent a
document's state (although, for compatibility with Google Wave, he is forced
into using a different technique):
This scheme has some very nice advantages. Given an operation (and its
associated parent hash), we can determine instantly whether or not we have
the appropriate document state to apply said operation. Hashes also have the
very convenient property of converging exactly when the document states
converge.
Daniel's article on understanding Operational
Transformation is a must read for anyone interested in the
topic.
We will represent messages with the structure {msg, Parent, Operation} .
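As a rough illustration of that structure, the sketch below builds a message from a document and an operation and checks whether a message applies to a given document. The names msg_sketch, state_hash/1, make_msg/2 and applies_to/2 are hypothetical. The sketch also keeps the MD5 digest as a binary, which is an assumption made for brevity.

```erlang
-module(msg_sketch).
-export([make_msg/2, applies_to/2]).

%% Hypothetical helpers; the names are illustrative only.
state_hash(Doc) ->
    erlang:md5(Doc).   % 16-byte binary digest of the document contents

%% Tag an operation with the hash of the document it was generated against.
make_msg(Doc, Op) ->
    {msg, state_hash(Doc), Op}.

%% A message applies only to a document in the same state as its parent.
applies_to(Doc, {msg, Parent, _Op}) ->
    state_hash(Doc) =:= Parent.
```

For example, a message built with make_msg("cab", {ins, 2, $t}) applies to "cab" but not to "cat", because the two documents hash differently.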
|
hash(Doc) ->
    binary_to_list(erlang:md5(Doc)).
apply_message(Doc, {msg, Parent, Op}) ->
    case hash(Doc) == Parent of
        true ->
            {ok, apply_op(Doc, Op)};
        false ->
            {error, "Cannot apply this message to the document because the "
                    "operation was generated from a different document state."}
    end.
server(Doc) ->
    server(Doc, [], []).
server(Doc, Clients, History) ->
    receive
        stop ->
            ?debugMsg("Server stopping~n"),
            lists:foreach(fun (C) -> C ! stop end, Clients),
            ok;
        {doc, Pid} ->
            Pid ! Doc,
            server(Doc, Clients, History);
        {newclient, Pid} ->
            ?debugFmt("Server got new client ~p~n", [Pid]),
            Pid ! {doc, Doc},
            server(Doc, [Pid | Clients], History);
        {msg, Parent, Op} ->
            % TODO: look in history
            ?debugFmt("Server got new message with op ~p~n", [Op]),
            case apply_message(Doc, {msg, Parent, Op}) of
                {ok, New_Doc} ->
                    ?debugMsg(" message applied successfully~n"),
                    broadcast(Clients, {msg, Parent, Op}),
                    server(New_Doc,
                           Clients,
                           [{hash(New_Doc), New_Doc} | History]);
                _ ->
                    ?debugMsg(" message did not apply, ignoring it...~n"),
                    server(Doc, Clients, History)
            end
    end.
broadcast([], _) ->
    ok;
broadcast([C|Clients], Msg) ->
    ?debugFmt("broadcasting ~p to ~p~n", [Msg, C]),
    C ! Msg,
    broadcast(Clients, Msg).
client(Server) ->
    Server ! {newclient, self()},
    receive
        {doc, Doc} ->
            Server ! {msg, hash(Doc), nop},
            client(Server, Doc, [{nop, Doc}])
    end.
% TODO: outgoing should be pairs of {Op, Doc}...
client(Server, Doc, Outgoing) ->
    receive
        {user_op, Op} ->
            ?debugFmt("Client ~p received user op ~p~n", [self(), Op]),
            case Outgoing of
                [] ->
                    ?debugMsg(" sending the new operation~n"),
                    Server ! {msg, hash(Doc), Op},
                    client(Server, Doc, [{Op, apply_op(Doc, Op)}]);
                _ ->
                    ?debugMsg(" queueing the new operation~n"),
                    client(Server, Doc, Outgoing ++ [{Op, apply_op(Doc, Op)}])
            end;
        {msg, Hash, Op} ->
            ?debugFmt("Client ~p got message from server: ~p~n", [self(), {msg, Hash, Op}]),
            Hash = hash(Doc),
            New_Doc = apply_op(Doc, Op),
            case Outgoing of
                [{Op, New_Doc} | Rest] ->
                    ?debugMsg(" it was one that I sent~n"),
                    case Rest of
                        [{Next_Op, _} | _] ->
                            ?debugFmt("Client ~p sending buffered operation ~p to server~n", [self(), Next_Op]),
                            Server ! {msg, hash(New_Doc), Next_Op},
                            client(Server, New_Doc, Rest);
                        [] ->
                            client(Server, New_Doc, [])
                    end;
                _ ->
                    ?debugMsg(" it was one someone else sent~n"),
                    Outgoing1 = transform_each(Outgoing, Op),
                    client(Server, New_Doc, Outgoing1)
            end
    end.
transform_each([], _) ->
    [];
transform_each([{A, Doc} | Rest], B) ->
    {ok, {Aprime, Bprime}} = xform(A, B),
    [{Aprime, Doc} | transform_each(Rest, Bprime)].
|
Application
OTP Application interface to play nice with the Erlang ecosystem.
TODO: http://www.erlang.org/doc/design_principles/applications.html
|
start() ->
    register(ot_server, spawn_link(ot, server, [""])).
stop() ->
    ot_server ! stop.
get_doc() ->
    get_doc(ot_server).
get_doc(Server) ->
    Server ! {doc, self()},
    receive
        Doc ->
            Doc
    end.
|
Tests
|
identical_delete_test() ->
    A = B = {del, 3},
    {ok, {Ap, Bp}} = xform(A, B),
    "foo" = apply_op(apply_op("food", A), Bp),
    "foo" = apply_op(apply_op("food", B), Ap).
identical_insert_test() ->
    A = B = {ins, 3, $d},
    {ok, {Ap, Bp}} = xform(A, B),
    "food" = apply_op(apply_op("foo", A), Bp),
    "food" = apply_op(apply_op("foo", B), Ap).
two_deletes_a_test() ->
    A = {del, 4},
    B = {del, 2},
    {ok, {Ap, Bp}} = xform(A, B),
    "erag" = apply_op(apply_op("erlang", A), Bp),
    "erag" = apply_op(apply_op("erlang", B), Ap).
two_deletes_b_test() ->
    A = {del, 2},
    B = {del, 4},
    {ok, {Ap, Bp}} = xform(A, B),
    "erag" = apply_op(apply_op("erlang", A), Bp),
    "erag" = apply_op(apply_op("erlang", B), Ap).
two_inserts_a_test() ->
    A = {ins, 1, $c},
    B = {ins, 3, $t},
    {ok, {Ap, Bp}} = xform(A, B),
    "scooter" = apply_op(apply_op("sooer", A), Bp),
    "scooter" = apply_op(apply_op("sooer", B), Ap).
two_inserts_b_test() ->
    A = {ins, 3, $t},
    B = {ins, 1, $c},
    {ok, {Ap, Bp}} = xform(A, B),
    "scooter" = apply_op(apply_op("sooer", A), Bp),
    "scooter" = apply_op(apply_op("sooer", B), Ap).
insert_delete_a_test() ->
    A = {ins, 2, $t},
    B = {del, 2},
    {ok, {Ap, Bp}} = xform(A, B),
    "cat" = apply_op(apply_op("cab", A), Bp),
    "cat" = apply_op(apply_op("cab", B), Ap).
insert_delete_b_test() ->
    A = {ins, 2, $t},
    B = {del, 0},
    {ok, {Ap, Bp}} = xform(A, B),
    "atb" = apply_op(apply_op("cab", A), Bp),
    "atb" = apply_op(apply_op("cab", B), Ap).
insert_delete_c_test() ->
    A = {ins, 0, $t},
    B = {del, 2},
    {ok, {Ap, Bp}} = xform(A, B),
    "tca" = apply_op(apply_op("cab", A), Bp),
    "tca" = apply_op(apply_op("cab", B), Ap).
client_server_communication_test() ->
    Server = spawn_link(ot, server, [" is cool"]),
    Client = spawn_link(ot, client, [Server]),
    Client ! {user_op, {ins, 0, $n}},
    Client ! {user_op, {ins, 1, $i}},
    Client ! {user_op, {ins, 2, $c}},
    Client ! {user_op, {ins, 3, $k}},
    "nick" = get_doc(Server),
    Server ! stop.
|