From f5e05c65ff1fd86cd26923c8b5f89be964e6c0be Mon Sep 17 00:00:00 2001 From: gpestana Date: Mon, 18 Jan 2021 19:44:06 +0000 Subject: [PATCH 1/4] clean slate for v2 --- Makefile | 12 --- clock/clock.go | 116 -------------------- clock/clock_test.go | 99 ----------------- e2e/e2e-1_test.go | 77 -------------- e2e/e2e-2_test.go | 122 --------------------- e2e/e2e-3_test.go | 116 -------------------- e2e/e2e-4_test.go | 144 ------------------------- go.mod | 3 + node/node.go | 204 ------------------------------------ node/node_test.go | 60 ----------- operation/cursor.go | 56 ---------- operation/cursor_test.go | 24 ----- operation/operation.go | 61 ----------- operation/operation_test.go | 20 ---- rdoc.go | 150 -------------------------- rdoc_test.go | 87 --------------- utils.go | 54 ---------- 17 files changed, 3 insertions(+), 1402 deletions(-) delete mode 100644 Makefile delete mode 100644 clock/clock.go delete mode 100644 clock/clock_test.go delete mode 100644 e2e/e2e-1_test.go delete mode 100644 e2e/e2e-2_test.go delete mode 100644 e2e/e2e-3_test.go delete mode 100644 e2e/e2e-4_test.go create mode 100644 go.mod delete mode 100644 node/node.go delete mode 100644 node/node_test.go delete mode 100644 operation/cursor.go delete mode 100644 operation/cursor_test.go delete mode 100644 operation/operation.go delete mode 100644 operation/operation_test.go delete mode 100644 rdoc_test.go delete mode 100644 utils.go diff --git a/Makefile b/Makefile deleted file mode 100644 index 770d8c8..0000000 --- a/Makefile +++ /dev/null @@ -1,12 +0,0 @@ -all: test build -ci: pre-build test - -pre-build: - go get . - -build: - go build . - -test: - go vet . - go test ./... -cover diff --git a/clock/clock.go b/clock/clock.go deleted file mode 100644 index 4924c58..0000000 --- a/clock/clock.go +++ /dev/null @@ -1,116 +0,0 @@ -// Lamport timestamp implementation. Every operation has an unique ID in the -// network. Lamport timestamps ensure that if two operarations in different -// network nodes have occurred concurrently, their order is arbitrary but -// deterministic -package clock - -import ( - "hash/adler32" - "strconv" - "strings" -) - -const ( - BASE = 10 -) - -type Clock struct { - seed int64 - count int64 -} - -// Initializes a clock. The `seed` is a string which uniquely identifies the -// clock in the network -func New(seed []byte) Clock { - s := adler32.Checksum(seed) - return Clock{ - seed: int64(s), - count: 1, - } -} - -func (c *Clock) ID() string { - return strconv.FormatInt(c.seed, 10) -} - -// Increments Clock count -func (c *Clock) Tick() { - c.count++ -} - -// Returns Timestamp that uniquely identifies the state (clock and count) in the -// network -func (c Clock) Timestamp() string { - return c.String() -} - -// Updates a Clock based on another clock or string representation. If the -// current Clock count.seed value is higher, no changes are done. Othwerise, the -// clock updates to the upper count -func (c *Clock) Update(rcv interface{}) error { - var err error - rcvC := Clock{} - switch t := rcv.(type) { - case Clock: - rcvC = t - - case string: - rcvC, err = strToClock(t) - if err != nil { - return err - } - } - - rcvCan, err := rcvC.canonical() - if err != nil { - return err - } - - currCan, err := c.canonical() - if err != nil { - return err - } - - if rcvCan > currCan { - c.count = rcvC.count - } - return nil -} - -// Returns the canonical value of clock. The canonical value of the logical -// clock is a float64 type in the form of .. The -// Clock.seed value must be unique per Clock in the network. -func (c Clock) canonical() (float64, error) { - fc, err := strconv.ParseFloat(c.String(), 10) - return fc, err -} - -// Converts string to Clock. The input string is expected to have format -// "counter>." -func strToClock(s string) (Clock, error) { - c := Clock{} - str := strings.Split(s, ".") - count, err := strconv.Atoi(str[0]) - if err != nil { - return c, err - } - seed, err := strconv.Atoi(str[1]) - if err != nil { - return c, err - } - - c.count = int64(count) - c.seed = int64(seed) - return c, nil -} - -func (c *Clock) String() string { - cnt := strconv.FormatInt(c.count, BASE) - sd := strconv.FormatInt(c.seed, BASE) - return cnt + "." + sd -} - -// Convert string to Clock -func ConvertString(c string) (Clock, error) { - return strToClock(c) -} diff --git a/clock/clock_test.go b/clock/clock_test.go deleted file mode 100644 index 333b818..0000000 --- a/clock/clock_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package clock - -import ( - "fmt" - "testing" -) - -func TestConstructor(t *testing.T) { - seed := []byte("clock_1") - clk := New(seed) - initialCount := 1 - seedHash := int64(187368093) - - if clk.count != 1 { - t.Error(fmt.Sprintf("Constructor: Clock should be initialized with count=%v", initialCount)) - } - if clk.seed != seedHash { - t.Error(fmt.Sprintf("Constructor: Clock should be initialized with seed=%v", seedHash)) - } -} - -func TestClockTick(t *testing.T) { - clk := New([]byte("clk")) - clk.Tick() - clk.Tick() - clk.Tick() - if clk.count != 4 { - t.Error("Tick: Clock count should be 4") - } -} - -func TestTimestamp(t *testing.T) { - clk := New([]byte("clock_1")) - expectedTs := "1.187368093" - actualTs := clk.Timestamp() - - if actualTs != expectedTs { - t.Error(fmt.Sprintf("Timestamp: actual ts is %v, should be %v", actualTs, expectedTs)) - } - -} - -func TestString(t *testing.T) { - clk := New([]byte("clock_1")) - expectedStr := "1.187368093" - actualStr := clk.String() - - if actualStr != expectedStr { - t.Error(fmt.Sprintf("String: actual string is %v, should be %v", actualStr, expectedStr)) - } -} - -func TestUpdateClock(t *testing.T) { - clk1 := New([]byte("clk1")) - - clk1.Update("123.321") - if clk1.count != 123 { - t.Error(fmt.Sprintf("Clock count should be: 123, had %v", clk1.count)) - } -} - -func TestUpdateClockString(t *testing.T) { - clk1 := New([]byte("clk1")) - clk2 := New([]byte("clk2")) - - clk1.Tick() - clk2.Update(clk1) - if clk2.count != 2 { - t.Error(fmt.Sprintf("Clock 2 should have same count as Clock 1: %v != %v", clk1.count, clk2.count)) - } - - clk2.Tick() - clk2.Tick() - clk1.Update(clk2) - // test also idempotent update - clk1.Update(clk2) - clk1.Update(clk2) - if clk1.count != 4 { - t.Error(fmt.Sprintf("Clock 2 should have same count as Clock 1: %v != %v", clk1.count, clk2.count)) - } -} - -func TestConvertString(t *testing.T) { - c := "21" - expC := int64(21) - s := "31231233" - expS := int64(31231233) - - clk, err := ConvertString(fmt.Sprintf("%v.%v", c, s)) - if err != nil { - t.Fatal(err) - } - if clk.count != expC { - t.Error(fmt.Sprintf("ConvertString: count is %v, should be %v", clk.count, expC)) - } - if clk.seed != expS { - t.Error(fmt.Sprintf("ConvertString: seed is %v, should be %v", clk.seed, expS)) - } -} diff --git a/e2e/e2e-1_test.go b/e2e/e2e-1_test.go deleted file mode 100644 index 3b17291..0000000 --- a/e2e/e2e-1_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// e2e tests map to the examples of the original paper -package rdoc - -import ( - "github.com/gpestana/rdoc" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" - "testing" -) - -// Case A: different value assignment of a register in different replicas -func TestCaseA(t *testing.T) { - id1 := "1" - doc1 := rdoc.Init(id1) - - id2 := "2" - doc2 := rdoc.Init(id2) - emptyC := op.NewEmptyCursor() - - // contructs operation to initially populate the docs - nmap1 := n.New("op-doc1") - mut1, _ := op.NewMutation(op.Assign, "key", nmap1) - op1, _ := op.New(id1+".0", []string{}, emptyC, mut1) // using id1 means that the operation was generated by id1 - - nmap2 := n.New("op-doc2") - mut2, _ := op.NewMutation(op.Assign, "key", nmap2) - op2, _ := op.New(id1+".0", []string{}, emptyC, mut2) - - _, err := doc1.ApplyOperation(*op1) - if err != nil { - t.Fatal(err) - } - _, err = doc2.ApplyRemoteOperation(*op2) - if err != nil { - t.Fatal(err) - } - - // constructs and applies locally operation from replica 1 - mut3, _ := op.NewMutation(op.Assign, nil, "B") - cur3 := op.NewCursor("key", op.MapKey{"key"}) - op3, _ := op.New(id1+".1", []string{id1 + ".0"}, cur3, mut3) - doc1.ApplyOperation(*op3) - - // constructs and applies locally operation for replica 2 - mut4, _ := op.NewMutation(op.Assign, nil, "C") - cur4 := op.NewCursor("key", op.MapKey{"key"}) - op4, _ := op.New(id2+".1", []string{id1 + ".0"}, cur4, mut4) - doc2.ApplyOperation(*op4) - - // at this moment, we have: - // doc1: {"key": MVR{1.1: "B"}} - // doc2: {"key": MVR{2.1: "C"}} - - // network communication: cross-apply operations in replica 1 and 2 - doc1.ApplyRemoteOperation(*op4) - doc2.ApplyRemoteOperation(*op3) - - // after network communication, we have: - // doc1: {"key": MVR{1.1: "B", 2.1: "C"}} - // doc2: {"key": MVR{2.1: "C", 1.1: "b"}} - - if len(doc1.Head.Map().Values()) != 1 { - t.Error("In doc1, lenght of Head.Map should be 1") - } - - if len(doc2.Head.Map().Values()) != 1 { - t.Error("In doc2, lenght of Head.Map should be 2") - } - - if len(nmap1.GetMVRegister()) != 2 { - t.Error("In doc1, lenght of MVRegister should be 2") - } - - if len(nmap2.GetMVRegister()) != 2 { - t.Error("In doc2, lenght of MVRegister should be 2") - } -} diff --git a/e2e/e2e-2_test.go b/e2e/e2e-2_test.go deleted file mode 100644 index eb2a1b2..0000000 --- a/e2e/e2e-2_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// e2e tests map to the examples of the original paper -package rdoc - -import ( - "fmt" - "github.com/gpestana/rdoc" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" - "testing" -) - -// Case B: Modifying the contents of a nested map while concurrently the entire -// map is overwritten. -func TestCaseB(t *testing.T) { - id1, id2 := "1", "2" - doc1, doc2 := rdoc.Init(id1), rdoc.Init(id2) - - // doc1: initial state: {"colors": { "blue": "#0000ff" }} - curDoc1 := op.NewCursor("colors", op.MapKey{"colors"}) - mutDoc1, _ := op.NewMutation(op.Noop, nil, nil) - opColors1, _ := op.New(id1+".1", []string{}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opColors1) - - curDoc1 = op.NewCursor("blue", op.MapKey{"colors"}) - mutDoc1, _ = op.NewMutation(op.Insert, "blue", "#0000ff") - opBlue1, _ := op.New(id1+".2", []string{id1 + ".1"}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opBlue1) - - // doc2: initial state: {"colors": { "blue": "#0000ff" }} - curDoc2 := op.NewCursor("colors", op.MapKey{"colors"}) - mutDoc2, _ := op.NewMutation(op.Noop, nil, nil) - opColors2, _ := op.New(id2+".1", []string{}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opColors2) - - curDoc2 = op.NewCursor("blue", op.MapKey{"colors"}) - mutDoc2, _ = op.NewMutation(op.Insert, "blue", "#0000ff") - opBlue2, _ := op.New(id2+".2", []string{id2 + ".1"}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opBlue2) - - // sync - doc1.ApplyRemoteOperation(*opColors2) - doc1.ApplyRemoteOperation(*opBlue2) - - doc2.ApplyRemoteOperation(*opColors1) - doc2.ApplyRemoteOperation(*opBlue1) - - // doc1: insert KV {"red": "#ff0000"} to map - curDoc1 = op.NewCursor("red", op.MapKey{"colors"}) - mutDoc1, _ = op.NewMutation(op.Insert, "red", "#ff0000") - opRed, _ := op.New(id1+".3", []string{id1 + ".1", id1 + ".2", id2 + ".1", id2 + ".2"}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opRed) - - // doc2: 1) clear map "colors"; 2) insert {"green": "#00ff00"} to "colors" map - nEmpty := n.New("colors") - curDoc2 = op.NewEmptyCursor() - mutDoc2, _ = op.NewMutation(op.Assign, "color", nEmpty) - opClear, _ := op.New(id2+".3", []string{id1 + ".1", id1 + ".2", id2 + ".1", id2 + ".2"}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opClear) - - curDoc2 = op.NewCursor("colors", op.MapKey{"colors"}) - mutDoc2, _ = op.NewMutation(op.Insert, "green", "#00ff00") - opGreen, _ := op.New(id2+".4", []string{id1 + ".1", id1 + ".2", id2 + ".1", id2 + ".2", id2 + ".3"}, curDoc2, mutDoc2) - - doc2.ApplyOperation(*opGreen) - - // sync again - doc1.ApplyRemoteOperation(*opClear) - doc1.ApplyRemoteOperation(*opGreen) - doc2.ApplyRemoteOperation(*opRed) - - // doc1: verifications - doc1ColorsIf, _ := doc1.Head.Map().Get("colors") - doc1Colors := doc1ColorsIf.(*n.Node) - - if doc1Colors.Map().Size() != 3 { - t.Error(fmt.Printf("doc1.colors should have 3 elements, got %v", doc1Colors.Map().Size())) - } - - doc1Blue, _ := doc1Colors.Map().Get("blue") - doc1BlueDeps := doc1Blue.(*n.Node).Deps() - if len(doc1BlueDeps) != 0 { - t.Error("doc1: dependencies of 'blue' must be all cleared, got ", doc1BlueDeps) - } - - doc1Red, _ := doc1Colors.Map().Get("red") - doc1RedDeps := doc1Red.(*n.Node).Deps() - if len(doc1RedDeps) == 0 { - t.Error("doc1: dependencies of 'red' must NOT be all cleared, got ", doc1RedDeps) - } - - doc1Green, _ := doc1Colors.Map().Get("green") - doc1GreenDeps := doc1Green.(*n.Node).Deps() - if len(doc1GreenDeps) == 0 { - t.Error("doc1: dependencies of 'green' must NOT be all cleared, got ", doc1GreenDeps) - } - - // doc2: verifications - doc2ColorsIf, _ := doc2.Head.Map().Get("colors") - doc2Colors := doc2ColorsIf.(*n.Node) - - if doc2Colors.Map().Size() != 3 { - t.Error(fmt.Printf("doc2.colors should have 3 elements, got %v", doc2Colors.Map().Size())) - } - - doc2Blue, _ := doc2Colors.Map().Get("blue") - doc2BlueDeps := doc2Blue.(*n.Node).Deps() - if len(doc2BlueDeps) != 0 { - t.Error("doc2: dependencies of 'blue' must be all cleared, got ", doc2BlueDeps) - } - - doc2Red, _ := doc2Colors.Map().Get("red") - doc2RedDeps := doc2Red.(*n.Node).Deps() - if len(doc2RedDeps) == 0 { - t.Error("doc2: dependencies of 'red' must NOT be all cleared, got ", doc2RedDeps) - } - - doc2Green, _ := doc2Colors.Map().Get("green") - doc2GreenDeps := doc2Green.(*n.Node).Deps() - if len(doc2GreenDeps) == 0 { - t.Error("doc2: dependencies of 'green' must NOT be all cleared, got ", doc2GreenDeps) - } -} diff --git a/e2e/e2e-3_test.go b/e2e/e2e-3_test.go deleted file mode 100644 index a0ece32..0000000 --- a/e2e/e2e-3_test.go +++ /dev/null @@ -1,116 +0,0 @@ -// e2e tests map to the examples of the original paper -package rdoc - -import ( - "fmt" - "github.com/gpestana/rdoc" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" - "testing" -) - -//Case C: Two replicas concurrently create ordered lists under the same map keu -func TestCaseC(t *testing.T) { - id1, id2 := "1", "2" - doc1 := rdoc.Init(id1) - doc2 := rdoc.Init(id2) - - // doc1 adds key "grosseries" to map at root level - curDoc1 := op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc1, _ := op.NewMutation(op.Noop, nil, nil) - opList1, _ := op.New(id1+".1", []string{}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opList1) - - // doc1 adds "eggs" and "ham" entries to list - curDoc1 = op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc1, _ = op.NewMutation(op.Insert, 0, "eggs") - opEggs, _ := op.New(id1+".2", []string{id1 + ".1"}, curDoc1, mutDoc1) - _, err := doc1.ApplyOperation(*opEggs) - if err != nil { - t.Error(err) - } - - curDoc1 = op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc1, _ = op.NewMutation(op.Insert, 1, "ham") - opHam, _ := op.New(id1+".3", []string{id1 + ".1", id1 + ".2"}, curDoc1, mutDoc1) - _, err = doc1.ApplyOperation(*opHam) - if err != nil { - t.Error(err) - } - - // doc2 adds key "grosseries" to map at root level - curDoc2 := op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc2, _ := op.NewMutation(op.Noop, nil, nil) - opList2, _ := op.New(id2+".1", []string{}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opList2) - - // doc2 adds "milk" and "flour" entries to list - curDoc2 = op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc2, _ = op.NewMutation(op.Insert, 0, "milk") - opMilk, _ := op.New(id2+".2", []string{id2 + ".1"}, curDoc2, mutDoc2) - _, err = doc2.ApplyOperation(*opMilk) - if err != nil { - t.Error(err) - } - - curDoc2 = op.NewCursor("groceries", op.MapKey{"groceries"}) - mutDoc2, _ = op.NewMutation(op.Insert, 1, "flour") - opFlour, _ := op.New(id2+".3", []string{id2 + ".1", id2 + ".2"}, curDoc2, mutDoc2) - _, err = doc2.ApplyOperation(*opFlour) - if err != nil { - t.Error(err) - } - - // applies remote operations in both replicas - doc1.ApplyRemoteOperation(*opList2) - _, err = doc1.ApplyRemoteOperation(*opMilk) - if err != nil { - t.Fatal(err) - } - _, err = doc1.ApplyRemoteOperation(*opFlour) - if err != nil { - t.Fatal(err) - } - - doc2.ApplyRemoteOperation(*opList1) - _, err = doc2.ApplyRemoteOperation(*opEggs) - if err != nil { - t.Fatal(err) - } - _, err = doc2.ApplyRemoteOperation(*opHam) - if err != nil { - t.Fatal(err) - } - - // verifications - doc1If, _ := doc1.Head.Map().Get("groceries") - doc1Groceries := doc1If.(*n.Node).List() - if doc1Groceries.Size() != 4 { - t.Error(fmt.Sprintf("Doc1 grosseries list should have 4 items after applying remote operations, got %v", doc1Groceries.Size())) - } - - doc2If, _ := doc2.Head.Map().Get("groceries") - doc2Groceries := doc2If.(*n.Node).List() - if doc2Groceries.Size() != 4 { - t.Error(fmt.Sprintf("Doc2 grosseries list should have 4 items after applying remote operations, got %v", doc2Groceries.Size())) - } - - if doc1Groceries.Size() != doc2Groceries.Size() { - t.Error(fmt.Sprintf("List must have same number of elements in doc1 (got: %v) and doc2 (got: %v)", doc1Groceries.Size(), doc2Groceries.Size())) - } - - // compares list elements order - var list1Keys string - var list2Keys string - for i := 0; i < doc1Groceries.Size(); i++ { - el1, _ := doc1Groceries.Get(i) - el2, _ := doc2Groceries.Get(i) - el1Keys := el1.(*n.Node).Reg().Keys() - el2Keys := el2.(*n.Node).Reg().Keys() - list1Keys = fmt.Sprintf("%v %v", list1Keys, el1Keys[0].(string)) - list2Keys = fmt.Sprintf("%v %v", list2Keys, el2Keys[0].(string)) - } - if list1Keys != list2Keys { - t.Error(fmt.Sprintf("Final list state did not converge in both replicas: %v != %v", list1Keys, list2Keys)) - } -} diff --git a/e2e/e2e-4_test.go b/e2e/e2e-4_test.go deleted file mode 100644 index 6c890e0..0000000 --- a/e2e/e2e-4_test.go +++ /dev/null @@ -1,144 +0,0 @@ -// e2e tests map to the examples of the original paper -package rdoc - -import ( - "fmt" - "github.com/emirpasic/gods/lists/arraylist" - "github.com/gpestana/rdoc" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" - "testing" -) - -// Case D: 4. Concurrent editing of an ordered list of characters (i.e., a text -// document). -func TestCaseD(t *testing.T) { - id1, id2 := "1", "2" - doc1, doc2 := rdoc.Init(id1), rdoc.Init(id2) - - // doc1: populates head of doc with ["a", "b", "c"] - curDoc1 := op.NewEmptyCursor() - mutDoc1, _ := op.NewMutation(op.Insert, 0, "a") - opInsert1a, _ := op.New(id1+".1", []string{}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opInsert1a) - - mutDoc1, _ = op.NewMutation(op.Insert, 1, "b") - opInsert1b, _ := op.New(id1+".2", []string{id1 + ".1"}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opInsert1b) - - mutDoc1, _ = op.NewMutation(op.Insert, 2, "c") - opInsert1c, _ := op.New(id1+".3", []string{id1 + ".1", id1 + ".2"}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opInsert1c) - - // doc2: populates head of doc with ["a", "b", "c"] (through sync so that both - // replicas have the same state) - doc2.ApplyRemoteOperation(*opInsert1a) - doc2.ApplyRemoteOperation(*opInsert1b) - doc2.ApplyRemoteOperation(*opInsert1c) - - // doc1: delete element position 1 ("b") - curDel := op.NewCursor(1, op.ListKey{1}) - mutDoc1, _ = op.NewMutation(op.Delete, nil, nil) - opDelete1b, _ := op.New(id1+".4", []string{id1 + ".1", id1 + ".2", id1 + ".3"}, curDel, mutDoc1) - doc1.ApplyOperation(*opDelete1b) - - // doc1: insert element "x" position 1 - mutDoc1, _ = op.NewMutation(op.Insert, 1, "x") - opInsert1x, _ := op.New(id1+".5", []string{id1 + ".1", id1 + ".2", id1 + ".3", id1 + ".4"}, curDoc1, mutDoc1) - doc1.ApplyOperation(*opInsert1x) - - // doc1: initial verifications - list1 := doc1.Head.List() - - elA1, _ := list1.Get(0) - elA1Val, _ := elA1.(*n.Node).Reg().Get("1.1") - elA1DepsLen := len(elA1.(*n.Node).Deps()) - if elA1Val != "a" { - t.Error("doc1: element 0 must be value 'a', got ", elA1Val) - } - if elA1DepsLen != 1 { - t.Error("doc1: element '0:a' must have 1 dependency, got", elA1DepsLen) - } - - elX1, _ := list1.Get(1) - elX1Val, _ := elX1.(*n.Node).Reg().Get("1.5") - elX1DepsLen := len(elX1.(*n.Node).Deps()) - if elX1Val != "x" { - t.Error("doc1: element 1 must be value 'x', got ", elX1Val) - } - if elX1DepsLen != 5 { - t.Error("doc1: element '1:b' must have 0 dependencies, got", elX1DepsLen) - } - - elB1, _ := list1.Get(2) - elB1Val, _ := elB1.(*n.Node).Reg().Get("1.2") - elB1DepsLen := len(elB1.(*n.Node).Deps()) - if elB1Val != "b" { - t.Error("doc1: element 2 must be value 'b', got ", elB1Val) - } - if elB1DepsLen != 0 { - t.Error("doc1: element '2:b' must have 0 dependencies, got", elB1DepsLen) - } - - elC1, _ := list1.Get(3) - elC1Val, _ := elC1.(*n.Node).Reg().Get("1.3") - elC1DepsLen := len(elC1.(*n.Node).Deps()) - if elC1Val != "c" { - t.Error("doc1: element 3 must be value 'c', got ", elC1Val) - } - if elC1DepsLen != 3 { - t.Error("doc1: element '3:c' must have 3 dependencies, got", elC1DepsLen) - } - - // doc2: insert element "y" position 0 - curDoc2 := op.NewEmptyCursor() - mutDoc2, _ := op.NewMutation(op.Insert, 0, "y") - opInsert2y, _ := op.New(id2+".4", []string{id1 + ".1", id1 + ".2", id1 + ".3"}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opInsert2y) - - // doc2: insert element "z" position 3 - mutDoc2, _ = op.NewMutation(op.Insert, 2, "z") - opInsert2z, _ := op.New(id2+".5", []string{id1 + ".1", id1 + ".2", id1 + ".3", id2 + ".4"}, curDoc2, mutDoc2) - doc2.ApplyOperation(*opInsert2z) - - // sync - doc1.ApplyRemoteOperation(*opInsert2y) - doc1.ApplyRemoteOperation(*opInsert2z) - doc2.ApplyRemoteOperation(*opDelete1b) - doc2.ApplyRemoteOperation(*opInsert1x) - - // verifications - doc1List := doc1.Head.List() - doc2List := doc2.Head.List() - - if doc1List.Size() != 6 { - t.Error("doc1: size of list must be 6, got ", doc1List.Size()) - } - - if doc2List.Size() != 6 { - t.Error("doc2: size of list must be 6, got ", doc2List.Size()) - } - - doc1Vals := getListValues(doc1List) - doc2Vals := getListValues(doc2List) - - if len(doc1Vals) != len(doc2Vals) { - t.Fatal(fmt.Sprintf("Lenght of doc1 and doc2 lists must be the same, got %v vs %v", doc1Vals, doc2Vals)) - } - - for i := 0; i < len(doc1Vals); i++ { - if doc1Vals[i] != doc2Vals[i] { - //t.Error(fmt.Sprintf("Elements should be ordered equally, got: (%v:%v) vs (%v, %v) l1: %v; l2: %v", i, doc1Vals[i], i, doc2Vals[i], doc1Vals, doc2Vals)) - } - } -} - -func getListValues(l *arraylist.List) []string { - vals := []string{} - for i := 0; i < l.Size(); i++ { - e, _ := l.Get(i) - el := e.(*n.Node) - vals = append(vals, el.Reg().Values()[0].(string)) - } - return vals -} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2808b1e --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/gpestana/rdoc + +go 1.14 diff --git a/node/node.go b/node/node.go deleted file mode 100644 index d93f47d..0000000 --- a/node/node.go +++ /dev/null @@ -1,204 +0,0 @@ -// mainly by rdoc.Mutate() when applying mutations and rdoc.Traverse() when -// traversing the tree -package node - -import ( - "errors" - "fmt" - "github.com/emirpasic/gods/lists/arraylist" - "github.com/emirpasic/gods/maps/hashmap" - "github.com/gpestana/rdoc/clock" - "reflect" - _ "strconv" -) - -type Node struct { - deps []string - // operation id that originated the node - opId string - // node may be a map - hmap *hashmap.Map - // node may be a list - list *arraylist.List - // node may be a register - reg *hashmap.Map -} - -func New(opId string) *Node { - return &Node{ - deps: []string{}, - opId: opId, - hmap: hashmap.New(), - list: arraylist.New(), - reg: hashmap.New(), - } -} - -func (n *Node) AddDependency(dep string) { - n.deps = append(n.deps, dep) -} - -func (n *Node) ClearDependency(dep string) { - n.deps = filter(n.deps, dep) -} - -// returns a child node which is part of the list or map -func (n *Node) GetChild(k interface{}) (*Node, bool, error) { - switch key := k.(type) { - case string: - ni, exists := n.hmap.Get(key) - if exists { - n := ni.(*Node) - return n, exists, nil - } - case int: - ni, exists := n.list.Get(key) - if exists { - n := ni.(*Node) - return n, exists, nil - } - default: - return nil, false, errors.New("Node child is stored in list or map, key must be int or string") - } - // child with key `k` does not exist - return nil, false, nil -} - -// returns map with all values associated to a given key. the map is indexed by -// operation ID - the operation that created the KV pair in first place -func (n *Node) GetMVRegister() map[string]interface{} { - keys := n.reg.Keys() - mvrMap := make(map[string]interface{}) - for _, k := range keys { - v, _ := n.reg.Get(k) - mvrMap[k.(string)] = v - } - return mvrMap -} - -// adds a value to the node -func (n *Node) Add(k interface{}, v interface{}, opId string) (*Node, error) { - var err error - var ok bool - var node *Node - switch key := k.(type) { - case string: - // adds to map - node, ok = v.(*Node) - if !ok { - node, err = newNodeWithRegisterValue(v, opId) - if err != nil { - return node, err - } - } - n.hmap.Put(key, node) - case int: - // adds to list - node, ok = v.(*Node) - if !ok { - node, err = newNodeWithRegisterValue(v, opId) - if err != nil { - return node, err - } - } - - // checks if element in position already exists - // if so, calculates proper position for new elemnt in the list - _, exists := n.list.Get(key) - if exists { - key = calculatePositionInsert(n.List(), node, key) - } - n.list.Insert(key, node) - - case nil: - // adds to mvregister - n.reg.Put(opId, v) - default: - return nil, errors.New("Key type must be of type string (map element), int (list element) or nil (register)") - } - - return node, nil -} - -// returns all direct non-leaf children (maps and lists) from node -func (n *Node) GetChildren() []*Node { - var ich []interface{} - ich = append(ich, n.list.Values()...) - ich = append(ich, n.hmap.Values()...) - - ch := make([]*Node, len(ich)) - - for i, c := range ich { - ch[i] = c.(*Node) - } - return ch -} - -func (n *Node) Deps() []string { - return n.deps -} - -func (n *Node) SetDeps(deps []string) { - n.deps = deps -} - -// testing purposes only -func (n *Node) Reg() *hashmap.Map { - return n.reg -} - -// testing purposes only -func (n *Node) Map() *hashmap.Map { - return n.hmap -} - -// testing purposes only -func (n *Node) List() *arraylist.List { - return n.list -} - -func filter(deps []string, dep string) []string { - ndeps := []string{} - for _, d := range deps { - if d != dep { - ndeps = append(ndeps, d) - } - } - return ndeps -} - -// creates a new node with value in register (string or int) -func newNodeWithRegisterValue(v interface{}, opId string) (*Node, error) { - switch v.(type) { - case string: - case int: - default: - return nil, errors.New(fmt.Sprintf("register value must be int or string, got %v", reflect.TypeOf(v))) - } - n := New(opId) - n.reg.Put(opId, v) - return n, nil -} - -// the real position to insert the element is relative to all operations from -// the same replica. all operations from same replica are in ascendent order -func calculatePositionInsert(list *arraylist.List, new *Node, key int) int { - newClock, _ := clock.ConvertString(new.opId) - var baseIndex int - - calculateBaseIndex := func() int { - for baseIndex := 0; baseIndex < list.Size(); baseIndex++ { - eif, _ := list.Get(baseIndex) - elClock, _ := clock.ConvertString(eif.(*Node).opId) - - // calculate base index (index relative to the other operation elements) - if newClock.Timestamp() > elClock.Timestamp() { - return baseIndex - } - } - return list.Size() - } - - baseIndex = calculateBaseIndex() - return baseIndex + key -} diff --git a/node/node_test.go b/node/node_test.go deleted file mode 100644 index e8c04ca..0000000 --- a/node/node_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package node - -import ( - "fmt" - "testing" -) - -func TestNodeConstruction(t *testing.T) { - n1, n2, n3 := New("n1"), New("n2"), New("n3") - - // { "hello": ["world"] } - _, err := n1.Add("hello", n2, "op1") - checkError(err, t) - _, err = n2.Add(0, n3, "op2") - checkError(err, t) - _, err = n3.Add(nil, "world", "op3") - checkError(err, t) - - n2r, exists, err := n1.GetChild("hello") - checkError(err, t) - if !exists { - t.Error("Key 'hello' should exist in n1 map") - } - - n3r, exists, err := n2r.GetChild(0) - checkError(err, t) - if !exists { - t.Error("Key 0 should exist in n2 list") - } - - mvr := n3r.GetMVRegister() - if mvr["op3"] != "world" { - t.Error(fmt.Sprintf("Register should be map['op3']:world, got %v", mvr)) - } -} - -func TestDependenciesMgmt(t *testing.T) { - n := New("node") - d1, d2, d3 := "d1", "d2", "d3" - n.AddDependency(d1) - n.AddDependency(d2) - n.AddDependency(d3) - - if len(n.deps) != 3 { - t.Error(fmt.Sprintf("Node should have 3 dependencies, got %v", len(n.deps))) - } - - n.ClearDependency(d1) - n.ClearDependency(d2) - - if len(n.deps) != 1 { - t.Error(fmt.Sprintf("Node should have 1 dependency after clearing, got %v", len(n.deps))) - } -} - -func checkError(err error, t *testing.T) { - if err != nil { - t.Error("Error should not happen in this case: ", err) - } -} diff --git a/operation/cursor.go b/operation/cursor.go deleted file mode 100644 index 469e89b..0000000 --- a/operation/cursor.go +++ /dev/null @@ -1,56 +0,0 @@ -package operation - -const ( - MapT = iota - ListT - RegT -) - -// A cursor identifies unambiguous a position in the JSON document by describing -// the path from the root until the leaf/node selected and the element ID -type Cursor struct { - Path []CursorElement - Key interface{} -} - -func NewCursor(key interface{}, path ...CursorElement) Cursor { - c := Cursor{} - for _, e := range path { - c.Path = append(c.Path, e) - } - c.Key = key - return c -} - -func NewEmptyCursor() Cursor { - return Cursor{} -} - -type CursorElement interface { - Get() interface{} - Type() int -} - -type MapKey struct { - Key string -} - -func (k MapKey) Get() interface{} { - return k.Key -} - -func (k MapKey) Type() int { - return MapT -} - -type ListKey struct { - Key int -} - -func (k ListKey) Get() interface{} { - return k.Key -} - -func (l ListKey) Type() int { - return ListT -} diff --git a/operation/cursor_test.go b/operation/cursor_test.go deleted file mode 100644 index 9bafde8..0000000 --- a/operation/cursor_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package operation - -import ( - "testing" -) - -func TestCursor(t *testing.T) { - k := "some_key" - // c points at: map.anothermap[0] - c := NewCursor( - k, - MapKey{"map"}, - MapKey{"another_map"}, - ListKey{0}, - ) - - if c.Path[1].Type() != MapT { - t.Error("Cursor should be of type MapT") - } - - if c.Path[2].Type() != ListT { - t.Error("Cursor should be of type ListT") - } -} diff --git a/operation/operation.go b/operation/operation.go deleted file mode 100644 index 880357f..0000000 --- a/operation/operation.go +++ /dev/null @@ -1,61 +0,0 @@ -// Implements CRDT operations. -// Everytime the state of the JSON document changes, an operations that -// describes the mutation is generated. The operation is kept for conflict -// resolution. The document state is the set of all operations ran against -// itself. -package operation - -import ( - "strings" -) - -const ( - Insert = iota - Delete - Assign - Noop -) - -type Operation struct { - // Lamport timestamp (implemented in clock.Clock) which uniquely identifies - // the operation in the network - ID string - // Set of casual dependencies of the operation (all operations that - // happened before the current operation) - Deps []string - // Ambiguously identifies the position in the JSON object to apply the - // operation by describing a path from the root of the document tree to some - // branch or leaf node - Cursor Cursor - // Mutation requested at the specific operation's position - Mutation Mutation -} - -// Returns new Operation object -func New(id string, deps []string, cursor Cursor, m Mutation) (*Operation, error) { - return &Operation{ - ID: id, - Deps: deps, - Cursor: cursor, - Mutation: m, - }, nil -} - -// Returns ID of the node which generated the operation -func (op Operation) NodeID() string { - splId := strings.Split(op.ID, ".") - seed := splId[1] - return seed -} - -type Mutation struct { - // Type of the mutation. Can be one of {insert(v), delete, assign(v)} - Type int - Key interface{} - Value interface{} -} - -// Returns new Mutation -func NewMutation(typ int, k interface{}, v interface{}) (Mutation, error) { - return Mutation{Type: typ, Key: k, Value: v}, nil -} diff --git a/operation/operation_test.go b/operation/operation_test.go deleted file mode 100644 index 5615b90..0000000 --- a/operation/operation_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package operation - -import ( - "fmt" - "testing" -) - -func TestNodeID(t *testing.T) { - nid := "123123123" - cursor := NewCursor(0) - op, err := New("10."+nid, []string{}, cursor, Mutation{}) - if err != nil { - t.Fatal(err) - } - - actualNid := op.NodeID() - if actualNid != nid { - t.Error(fmt.Sprintf("Expected Node ID %v, had %v", nid, actualNid)) - } -} diff --git a/rdoc.go b/rdoc.go index 9ea878b..71b83ae 100644 --- a/rdoc.go +++ b/rdoc.go @@ -1,151 +1 @@ package rdoc - -import ( - "fmt" - "github.com/gpestana/rdoc/clock" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" -) - -type Doc struct { - Id string - Clock clock.Clock - OperationsId []string - Head *n.Node - OperationsBuffer []op.Operation -} - -// Returns a new rdoc data structure. It receives an ID which must be -// unique in the context of the network. -func Init(id string) *Doc { - headNode := n.New("") - c := clock.New([]byte(id)) - return &Doc{ - Id: id, - Clock: c, - OperationsId: []string{}, - Head: headNode, - OperationsBuffer: []op.Operation{}, - } -} - -func (d *Doc) ApplyRemoteOperation(o op.Operation) (*Doc, error) { - // if operation has been applied already, skip - if containsId(d.OperationsId, o.ID) { - return d, nil - } - // if operation dependencies havent been all applied in the document, buffer - // the operation - missingOp := diff(o.Deps, d.OperationsId) - if len(missingOp) != 0 { - d.OperationsBuffer = append(d.OperationsBuffer, o) - return d, nil - } - return d.ApplyOperation(o) -} - -func (d *Doc) ApplyOperation(o op.Operation) (*Doc, error) { - nPtr, travNodes, createdNodes := d.traverse(o.Cursor, o.ID) - - // updates dependencies of traversed and created nodes - var deps []*n.Node - deps = append(deps, travNodes...) - deps = append(deps, createdNodes...) - for _, n := range deps { - n.AddDependency(o.ID) - } - - //TODO: how to rollback side effects of traverse if Mutate() fails? - err := Mutate(nPtr, o) - if err != nil { - return d, err - } - - d.OperationsId = append(d.OperationsId, o.ID) - return d, nil -} - -// Traverses the document from root element to the node indicated by the cursor -// input. When a path does not exist in the current document, create the node -// and link it to the document. -// The traverse function returns a pointer to the last node, a list of pointers -// of nodes traversed and a list of pointers of nodes created -func (d *Doc) traverse(cursor op.Cursor, opId string) (*n.Node, []*n.Node, []*n.Node) { - var nPtr *n.Node - var travNodes []*n.Node - var createdNodes []*n.Node - - // traverse starts from headNode - nPtr = d.Head - - // TODO: refactor - for _, c := range cursor.Path { - switch c.Type() { - case op.MapT: - k := c.Get().(string) - nn, exists, _ := nPtr.GetChild(k) - if !exists { - nn = n.New(opId) - nPtr.Add(k, nn, opId) - createdNodes = append(createdNodes, nn) - } else { - travNodes = append(travNodes, nPtr) - } - nPtr = nn - case op.ListT: - k := c.Get().(int) - nn, exists, _ := nPtr.GetChild(k) - if !exists { - nn = n.New(opId) - nPtr.Add(k, nn, opId) - createdNodes = append(createdNodes, nn) - } else { - travNodes = append(travNodes, nPtr) - } - nPtr = nn - } - } - - return nPtr, travNodes, createdNodes -} - -func Mutate(node *n.Node, o op.Operation) error { - mut := o.Mutation - - switch mut.Type { - case op.Noop: - return nil - case op.Delete: - chs := allChildren(node) - // nodes to clear are children and node itself - all := append(chs, node) - clearDeps(all, o.Deps) - return nil - case op.Assign: - chs := allChildren(node) - // nodes to clear are children and node itself - all := append(chs, node) - clearDeps(all, o.Deps) - // continue to insertion - } - - // Insert - newNode, err := node.Add(mut.Key, mut.Value, o.ID) - - if newNode != nil { - // adds dependencies - newNode.AddDependency(o.ID) - for _, dep := range o.Deps { - newNode.AddDependency(dep) - } - } - - return err -} - -func (d Doc) String() string { - ids := fmt.Sprintf("ID: %v; ClockId: %v", d.Id, d.Clock) - ops := fmt.Sprintf("Operations: applied: %v, buffered: %v", d.OperationsId, d.OperationsBuffer) - node := fmt.Sprintf("Head: %v", d.Head) - return fmt.Sprintf("%v\n%v\n%v\n", ids, ops, node) -} diff --git a/rdoc_test.go b/rdoc_test.go deleted file mode 100644 index a4fdf48..0000000 --- a/rdoc_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package rdoc - -import ( - "fmt" - n "github.com/gpestana/rdoc/node" - op "github.com/gpestana/rdoc/operation" - "testing" -) - -func TestTraverseSimple(t *testing.T) { - docId := "doc1" - doc := Init(docId) - - cursor1 := op.NewCursor( - 0, // cursor's key - op.MapKey{Key: "root"}, - op.MapKey{Key: "sub-node"}, - op.ListKey{Key: 0}, - ) - - n1, trvN, crtN := doc.traverse(cursor1, "oId") - - if len(crtN) != 3 { - t.Error(fmt.Sprintf("There should be 3 created nodes after first traversal, got %v", len(crtN))) - } - - if len(trvN) != 0 { - t.Error(fmt.Sprintf("There should be 0 traversed nodes after first traversal, got %v", len(trvN))) - } - - n2, trvN, crtN := doc.traverse(cursor1, "opId") - - if len(crtN) != 0 { - t.Error(fmt.Sprintf("There should be 0 created nodes after second traversal, got %v", len(crtN))) - } - - if len(trvN) != 3 { - t.Error(fmt.Sprintf("There should be 3 traversed nodes after second traversal, got %v", len(trvN))) - } - - if n1 != n2 { - t.Error(fmt.Sprintf("Both traverses should have finished in the same node, finished at: %v, %v instead", n1, n2)) - } -} - -func TestMutateInsert(t *testing.T) { - opId := "opIdTest" - v := "hello world" - mut, _ := op.NewMutation(op.Insert, nil, v) - op, _ := op.New(opId, []string{}, op.Cursor{}, mut) - - node := n.New("") - - err := Mutate(node, *op) - if err != nil { - t.Fatal(err) - } - - mvr := node.GetMVRegister() - if mvr[opId] != v { - t.Error(fmt.Sprintf("MVR should be {opIdTest: 'hello world', got %v", mvr)) - } -} - -func TestClearDeps(t *testing.T) { - initDeps := []string{"1", "2", "3", "4"} - removeDeps := []string{"2", "4"} - node := n.New("test") - node.SetDeps(initDeps) - - clearDeps([]*n.Node{node}, removeDeps) - - finalDeps := node.Deps() - - if len(finalDeps) != 2 { - t.Error(fmt.Sprintf("Dependency set should have length 2 after clearing, got %v", len(finalDeps))) - } - - if finalDeps[0] != initDeps[0] { - t.Error(fmt.Sprintf("Dependency set element 0 should be %v after clearing, got %v", initDeps[0], finalDeps[0])) - } - - if finalDeps[1] != initDeps[2] { - t.Error(fmt.Sprintf("Dependency set element 1 should be %v after clearing, got %v", initDeps[1], finalDeps[2])) - } - -} diff --git a/utils.go b/utils.go deleted file mode 100644 index d04b5c5..0000000 --- a/utils.go +++ /dev/null @@ -1,54 +0,0 @@ -package rdoc - -import ( - n "github.com/gpestana/rdoc/node" -) - -// Returns all subsequent nodes from a particular Node -func allChildren(node *n.Node) []*n.Node { - var children []*n.Node - var tmp []*n.Node - tmp = append(tmp, node.GetChildren()...) - - for { - if len(tmp) == 0 { - break - } - nextTmp := tmp[:1] - tmp = tmp[1:] - - c := nextTmp[0] - tmp = append(tmp, c.GetChildren()...) - children = append(children, c) - } - - return children -} - -func clearDeps(nodes []*n.Node, deps []string) { - for _, node := range nodes { - node.SetDeps(diff(node.Deps(), deps)) - } -} - -// checks if `sl` stice contains `id` string -func containsId(sl []string, id string) bool { - for i, _ := range sl { - if sl[i] == id { - return true - } - } - return false -} - -// returns all strings in `base` slice which do not exist in `subset` -func diff(base []string, subset []string) []string { - var diff []string - for i, _ := range base { - contains := containsId(subset, base[i]) - if !contains { - diff = append(diff, base[i]) - } - } - return diff -} From 4501e44a9ac9bbe0c249c3f49e77b23bf768b3a4 Mon Sep 17 00:00:00 2001 From: gpestana Date: Thu, 21 Jan 2021 09:16:12 +0000 Subject: [PATCH 2/4] Adds logical clocks to Document and Operations; lint --- Makefile | 5 ++ go.mod | 5 ++ go.sum | 7 +++ lclock/lclock.go | 133 ++++++++++++++++++++++++++++++++++++++++++ lclock/lclock_test.go | 118 +++++++++++++++++++++++++++++++++++++ rdoc.go | 98 +++++++++++++++++++++++++++++++ rdoc_test.go | 22 +++++++ 7 files changed, 388 insertions(+) create mode 100644 Makefile create mode 100644 go.sum create mode 100644 lclock/lclock.go create mode 100644 lclock/lclock_test.go create mode 100644 rdoc_test.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..277b352 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +tests: + go test ./... -v -cover + +lint: + golangci-lint run -E gofmt -E golint --exclude-use-default=false diff --git a/go.mod b/go.mod index 2808b1e..266fdb3 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/gpestana/rdoc go 1.14 + +require ( + github.com/evanphx/json-patch v0.5.2 + github.com/lafikl/hlc v0.0.0-20170703083803-0610e7fd8181 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..999d4fe --- /dev/null +++ b/go.sum @@ -0,0 +1,7 @@ +github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= +github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/lafikl/hlc v0.0.0-20170703083803-0610e7fd8181 h1:vYJjPjoqLvEt/yI2p0h4Y3xTZb62yR+xpt2JfocUrNw= +github.com/lafikl/hlc v0.0.0-20170703083803-0610e7fd8181/go.mod h1:DW4KbeN1bUFkHdmRAI/M3NCHnVTP377hTvRwdG8rsDo= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/lclock/lclock.go b/lclock/lclock.go new file mode 100644 index 0000000..c9552a1 --- /dev/null +++ b/lclock/lclock.go @@ -0,0 +1,133 @@ +// Package lclock implements Lamport logical clocks, with helps to be used in +// the context of rdocEvery operation has an unique ID in the +// network. Lamport timestamps ensure that if two operations in different +// network nodes have occurred concurrently, their order is arbitrary but +// deterministic +package lclock + +import ( + "fmt" + "hash/adler32" + "strconv" + "strings" +) + +const ( + base = 10 +) + +// Clock holds a Lamport logical clock +type Clock struct { + seed int64 + count int64 +} + +// New initializes and returns a new clock. The `seed` is a string which +// uniquely identifies the clock in the network +func New(seed []byte) Clock { + s := adler32.Checksum(seed) + return Clock{ + seed: int64(s), + count: 1, + } +} + +// ID returns the id of the clock +func (c *Clock) ID() string { + return strconv.FormatInt(c.seed, 10) +} + +// Tick increments the clock counter +func (c *Clock) Tick() { + c.count++ +} + +// Timestamp returns a timestamp that uniquely identifies the state (id and +// counter) in the network +func (c Clock) Timestamp() string { + return c.String() +} + +// Update performs a clock update based on another clock or string +// representation. If the current Clock count.seed value is higher, no +// changes are done. Othwerise, the clock updates to the upper count +func (c *Clock) Update(rcv interface{}) error { + var err error + rcvC := Clock{} + switch t := rcv.(type) { + case Clock: + rcvC = t + + case string: + rcvC, err = strToClock(t) + if err != nil { + return err + } + } + + rcvCan, err := rcvC.canonical() + if err != nil { + return err + } + + currCan, err := c.canonical() + if err != nil { + return err + } + + if rcvCan > currCan { + c.count = rcvC.count + } + return nil +} + +// CheckTick checks if tick belongs to the clock, or if tick representation is +// invalid +func (c Clock) CheckTick(tick string) (bool, error) { + tickClock, err := strToClock(tick) + if err != nil { + return false, + fmt.Errorf("Operation ID invalid. Expected ., got %v", tick) + } + + return c.ID() == tickClock.ID(), nil +} + +// Returns the canonical value of clock. The canonical value of the logical +// clock is a float64 type in the form of .. The +// Clock.seed value must be unique per Clock in the network. +func (c Clock) canonical() (float64, error) { + fc, err := strconv.ParseFloat(c.String(), 10) + return fc, err +} + +// Converts string to Clock. The input string is expected to have format +// "counter>." +func strToClock(s string) (Clock, error) { + c := Clock{} + str := strings.Split(s, ".") + count, err := strconv.Atoi(str[0]) + if err != nil { + return c, err + } + seed, err := strconv.Atoi(str[1]) + if err != nil { + return c, err + } + + c.count = int64(count) + c.seed = int64(seed) + return c, nil +} + +func (c *Clock) String() string { + cnt := strconv.FormatInt(c.count, base) + sd := strconv.FormatInt(c.seed, base) + return cnt + "." + sd +} + +// ConvertString converts a string to a clock representation, or returns an +// error if string representation is invalid +func ConvertString(c string) (Clock, error) { + return strToClock(c) +} diff --git a/lclock/lclock_test.go b/lclock/lclock_test.go new file mode 100644 index 0000000..017dce6 --- /dev/null +++ b/lclock/lclock_test.go @@ -0,0 +1,118 @@ +package lclock + +import ( + "fmt" + "testing" +) + +func TestConstructor(t *testing.T) { + seed := []byte("clock_1") + clk := New(seed) + initialCount := 1 + seedHash := int64(187368093) + + if clk.count != 1 { + t.Error(fmt.Sprintf("Constructor: Clock should be initialized with count=%v", initialCount)) + } + if clk.seed != seedHash { + t.Error(fmt.Sprintf("Constructor: Clock should be initialized with seed=%v", seedHash)) + } +} + +func TestClockTick(t *testing.T) { + clk := New([]byte("clk")) + clk.Tick() + clk.Tick() + clk.Tick() + if clk.count != 4 { + t.Error("Tick: Clock count should be 4") + } +} + +func TestTimestamp(t *testing.T) { + clk := New([]byte("clock_1")) + expectedTs := "1.187368093" + actualTs := clk.Timestamp() + + if actualTs != expectedTs { + t.Error(fmt.Sprintf("Timestamp: actual ts is %v, should be %v", actualTs, expectedTs)) + } + +} + +func TestString(t *testing.T) { + clk := New([]byte("clock_1")) + expectedStr := "1.187368093" + actualStr := clk.String() + + if actualStr != expectedStr { + t.Error(fmt.Sprintf("String: actual string is %v, should be %v", actualStr, expectedStr)) + } +} + +func TestUpdateClock(t *testing.T) { + clk1 := New([]byte("clk1")) + + err := clk1.Update("123.321") + if err != nil { + t.Error(err) + } + if clk1.count != 123 { + t.Error(fmt.Sprintf("Clock count should be: 123, had %v", clk1.count)) + } +} + +func TestUpdateClockString(t *testing.T) { + clk1 := New([]byte("clk1")) + clk2 := New([]byte("clk2")) + + clk1.Tick() + err := clk2.Update(clk1) + if err != nil { + t.Error(err) + } + + if clk2.count != 2 { + t.Error(fmt.Sprintf("Clock 2 should have same count as Clock 1: %v != %v", clk1.count, clk2.count)) + } + + clk2.Tick() + clk2.Tick() + + err = clk1.Update(clk2) + if err != nil { + t.Error(err) + } + + err = clk1.Update(clk2) + if err != nil { + t.Error(err) + } + + err = clk1.Update(clk2) + if err != nil { + t.Error(err) + } + + if clk1.count != 4 { + t.Error(fmt.Sprintf("Clock 2 should have same count as Clock 1: %v != %v", clk1.count, clk2.count)) + } +} + +func TestConvertString(t *testing.T) { + c := "21" + expC := int64(21) + s := "31231233" + expS := int64(31231233) + + clk, err := ConvertString(fmt.Sprintf("%v.%v", c, s)) + if err != nil { + t.Fatal(err) + } + if clk.count != expC { + t.Error(fmt.Sprintf("ConvertString: count is %v, should be %v", clk.count, expC)) + } + if clk.seed != expS { + t.Error(fmt.Sprintf("ConvertString: seed is %v, should be %v", clk.seed, expS)) + } +} diff --git a/rdoc.go b/rdoc.go index 71b83ae..55981a5 100644 --- a/rdoc.go +++ b/rdoc.go @@ -1 +1,99 @@ package rdoc + +import ( + "encoding/json" + "fmt" + "strings" + + jpatch "github.com/evanphx/json-patch" + "github.com/gpestana/rdoc/lclock" +) + +// Doc represents a JSON CRDT document +type Doc struct { + id string + clock lclock.Clock + operations []Operation + bufferedOperations []Operation +} + +// Init returns a new JSON CRDT document +func Init(id string) *Doc { + + return &Doc{ + id: id, + clock: lclock.New([]byte(id)), + operations: []Operation{}, + bufferedOperations: []Operation{}, + } +} + +// Apply applies a valid json patch on the document +func (doc Doc) Apply(rawPatch []byte) error { + patch, err := jpatch.DecodePatch(rawPatch) + if err != nil { + return err + } + + for _, opRaw := range patch { + op, err := operationFromPatch(opRaw) + if err != nil { + return err + } + + isFromSameClock, err := doc.clock.CheckTick(op.id) + if err != nil { + return err + } + if !isFromSameClock { + fmt.Println("Remote operation") + } else { + fmt.Println("Local operation") + } + + fmt.Println(op) + } + + return nil +} + +// MarshalJSON marshals a buffer into a crdt doc +func (doc Doc) MarshalJSON() ([]byte, error) { + return nil, nil +} + +// Operation represents the CRDT operations +type Operation struct { + id string + deps []string + raw jpatch.Operation +} + +func operationFromPatch(rawOp jpatch.Operation) (*Operation, error) { + rawID := rawOp["id"] + if rawID == nil { + return nil, + fmt.Errorf("Operation must have an associated id, got: %v", rawID) + } + id := string(*rawID) + id = strings.TrimSuffix(id, "\"") + id = strings.TrimPrefix(id, "\"") + + rawDeps := rawOp["deps"] + if rawDeps == nil { + return nil, + fmt.Errorf("Operation must have an associated dependency, got: %v", rawDeps) + } + + deps := new([]string) + err := json.Unmarshal(*rawDeps, deps) + if err != nil { + return nil, err + } + + return &Operation{ + id: id, + deps: *deps, + raw: rawOp, + }, nil +} diff --git a/rdoc_test.go b/rdoc_test.go new file mode 100644 index 0000000..0407946 --- /dev/null +++ b/rdoc_test.go @@ -0,0 +1,22 @@ +package rdoc + +import ( + "testing" +) + +func TestApply(t *testing.T) { + + patch := []byte(`[ +{"op": "add", "path": "/", "value": "user", "id":"1.1", "deps": [] }, +{"op": "add", "path": "/name", "value": "Jane", "id":"2.1", "deps": ["1.1"] }, +{"op": "add", "path": "/name", "value": "Jane", "id":"1.380503024", "deps": [""] } +]`) + + doc := Init("document_1") + + err := doc.Apply(patch) + if err != nil { + t.Error("Applying valid patch should not err, got ", err) + } + +} From ecf365e57ce8e7479d08ab41bc5a50f23943597f Mon Sep 17 00:00:00 2001 From: gpestana Date: Fri, 22 Jan 2021 09:54:47 +0000 Subject: [PATCH 3/4] Implements Marshaling for Doc --- rdoc.go | 35 ++++++++++++++++++++++++++++++++--- rdoc_test.go | 23 +++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/rdoc.go b/rdoc.go index 55981a5..3f9214e 100644 --- a/rdoc.go +++ b/rdoc.go @@ -29,7 +29,7 @@ func Init(id string) *Doc { } // Apply applies a valid json patch on the document -func (doc Doc) Apply(rawPatch []byte) error { +func (doc *Doc) Apply(rawPatch []byte) error { patch, err := jpatch.DecodePatch(rawPatch) if err != nil { return err @@ -51,7 +51,8 @@ func (doc Doc) Apply(rawPatch []byte) error { fmt.Println("Local operation") } - fmt.Println(op) + // when/where to append? + doc.operations = append(doc.operations, *op) } return nil @@ -59,7 +60,35 @@ func (doc Doc) Apply(rawPatch []byte) error { // MarshalJSON marshals a buffer into a crdt doc func (doc Doc) MarshalJSON() ([]byte, error) { - return nil, nil + type operationNoDeps struct { + ID string `json:"id"` + Op string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` + } + + buffer := []operationNoDeps{} + + for _, operation := range doc.operations { + path, err := operation.raw.Path() + if err != nil { + return nil, err + } + value, err := operation.raw.ValueInterface() + if err != nil { + return nil, err + } + + opNoDeps := operationNoDeps{ + ID: operation.id, + Op: operation.raw.Kind(), + Path: path, + Value: value, + } + + buffer = append(buffer, opNoDeps) + } + return json.Marshal(buffer) } // Operation represents the CRDT operations diff --git a/rdoc_test.go b/rdoc_test.go index 0407946..bee2045 100644 --- a/rdoc_test.go +++ b/rdoc_test.go @@ -1,7 +1,10 @@ package rdoc import ( + "encoding/json" "testing" + + jsonp "github.com/evanphx/json-patch" ) func TestApply(t *testing.T) { @@ -12,6 +15,8 @@ func TestApply(t *testing.T) { {"op": "add", "path": "/name", "value": "Jane", "id":"1.380503024", "deps": [""] } ]`) + expectedPatchAfterMarshaling := []byte(`[{"id":"1.1","op":"add","path":"/","value":"user"},{"id":"2.1","op":"add","path":"/name","value":"Jane"},{"id":"1.380503024","op":"add","path":"/name","value":"Jane"}]`) + doc := Init("document_1") err := doc.Apply(patch) @@ -19,4 +24,22 @@ func TestApply(t *testing.T) { t.Error("Applying valid patch should not err, got ", err) } + buffer, err := json.Marshal(*doc) + if err != nil { + t.Error(err) + } + + if len(buffer) < 10 { + t.Error("Error encoding doc:", string(buffer)) + } + + if string(buffer) != string(expectedPatchAfterMarshaling) { + t.Error("Expected marshaling mismatch:", + string(buffer), string(expectedPatchAfterMarshaling)) + } + + _, err = jsonp.DecodePatch(buffer) + if err != nil { + t.Error(err) + } } From 43a0e28d757a0975e786aaeccc1e56c5750bc4d9 Mon Sep 17 00:00:00 2001 From: gpestana Date: Fri, 22 Jan 2021 18:49:48 +0000 Subject: [PATCH 4/4] Implements local/remote Apply --- idset/idset.go | 33 +++++++++++++++++++ rdoc.go | 88 +++++++++++++++++++++++++++++++++++++++++++++----- rdoc_test.go | 8 ++--- 3 files changed, 117 insertions(+), 12 deletions(-) create mode 100644 idset/idset.go diff --git a/idset/idset.go b/idset/idset.go new file mode 100644 index 0000000..c8b6309 --- /dev/null +++ b/idset/idset.go @@ -0,0 +1,33 @@ +package idset + +// Set is a set of IDs +type Set struct { + ids map[string]struct{} +} + +// New returns an empty set +func New() *Set { + return &Set{ids: map[string]struct{}{}} +} + +// Add inserts a new id to the set +func (set *Set) Add(id string) { + set.ids[id] = struct{}{} +} + +// Exists checks whether an ID exists in the set +func (set Set) Exists(id string) bool { + _, exists := set.ids[id] + return exists +} + +// Diff returns all strings in the base doc that do not exist in the Doc +func (set Set) Diff(ids []string) []string { + var diff []string + for _, id := range ids { + if !set.Exists(id) { + diff = append(diff, id) + } + } + return diff +} diff --git a/rdoc.go b/rdoc.go index 3f9214e..6d76d90 100644 --- a/rdoc.go +++ b/rdoc.go @@ -6,15 +6,17 @@ import ( "strings" jpatch "github.com/evanphx/json-patch" + "github.com/gpestana/rdoc/idset" "github.com/gpestana/rdoc/lclock" ) // Doc represents a JSON CRDT document type Doc struct { id string + appliedIDs *idset.Set clock lclock.Clock operations []Operation - bufferedOperations []Operation + bufferedOperations map[string]Operation } // Init returns a new JSON CRDT document @@ -22,9 +24,10 @@ func Init(id string) *Doc { return &Doc{ id: id, + appliedIDs: idset.New(), clock: lclock.New([]byte(id)), operations: []Operation{}, - bufferedOperations: []Operation{}, + bufferedOperations: map[string]Operation{}, } } @@ -35,6 +38,8 @@ func (doc *Doc) Apply(rawPatch []byte) error { return err } + appliedRemoteOperations := false + for _, opRaw := range patch { op, err := operationFromPatch(opRaw) if err != nil { @@ -45,20 +50,51 @@ func (doc *Doc) Apply(rawPatch []byte) error { if err != nil { return err } - if !isFromSameClock { - fmt.Println("Remote operation") + + // apply local operations and continues + if isFromSameClock { + doc.applyOperation(*op) + continue + } + + // attempts to apply remote operations by checking if all operation + // dependencies have been applied on the doc + if len(doc.appliedIDs.Diff(op.deps)) != 0 { + doc.bufferedOperations[op.id] = *op } else { - fmt.Println("Local operation") + appliedRemoteOperations = true + delete(doc.bufferedOperations, op.id) // remove buffered operation in case it was buffered + doc.applyOperation(*op) } + } - // when/where to append? - doc.operations = append(doc.operations, *op) + // if remote operation hasbeen applied, attemps to apply buffered operations + if appliedRemoteOperations { + doc.tryBufferedOperations() } return nil } -// MarshalJSON marshals a buffer into a crdt doc +func (doc *Doc) applyOperation(operation Operation) { + doc.appliedIDs.Add(operation.id) + doc.operations = append(doc.operations, operation) +} + +func (doc *Doc) tryBufferedOperations() { + buffer, err := doc.MarshalFullJSON() + if err != nil { + panic(fmt.Sprintf("Buffered operations are not valid -- this should never happen: %v\n", err)) + } + + err = doc.Apply(buffer) + if err != nil { + panic(fmt.Sprintf("Error applying buffered operations -- this should never happen: %v\n", err)) + } +} + +// MarshalJSON marshals a Doc into a buffer, excluding the deps field on each +// operation func (doc Doc) MarshalJSON() ([]byte, error) { type operationNoDeps struct { ID string `json:"id"` @@ -91,6 +127,42 @@ func (doc Doc) MarshalJSON() ([]byte, error) { return json.Marshal(buffer) } +// MarshalFullJSON marshals a Doc into a buffer, including the dependencies +// field +func (doc Doc) MarshalFullJSON() ([]byte, error) { + type operationNoDeps struct { + ID string `json:"id"` + Op string `json:"op"` + Path string `json:"path"` + Deps []string `json:"deps"` + Value interface{} `json:"value"` + } + + buffer := []operationNoDeps{} + + for _, operation := range doc.operations { + path, err := operation.raw.Path() + if err != nil { + return nil, err + } + value, err := operation.raw.ValueInterface() + if err != nil { + return nil, err + } + + opNoDeps := operationNoDeps{ + ID: operation.id, + Deps: operation.deps, + Op: operation.raw.Kind(), + Path: path, + Value: value, + } + + buffer = append(buffer, opNoDeps) + } + return json.Marshal(buffer) +} + // Operation represents the CRDT operations type Operation struct { id string diff --git a/rdoc_test.go b/rdoc_test.go index bee2045..7c55204 100644 --- a/rdoc_test.go +++ b/rdoc_test.go @@ -7,15 +7,15 @@ import ( jsonp "github.com/evanphx/json-patch" ) -func TestApply(t *testing.T) { +func TestApplyAndMarshall(t *testing.T) { patch := []byte(`[ -{"op": "add", "path": "/", "value": "user", "id":"1.1", "deps": [] }, +{"op": "add", "path": "/", "value": "user", "id":"1.380503024", "deps": [] }, {"op": "add", "path": "/name", "value": "Jane", "id":"2.1", "deps": ["1.1"] }, -{"op": "add", "path": "/name", "value": "Jane", "id":"1.380503024", "deps": [""] } +{"op": "add", "path": "/name", "value": "Jane", "id":"2.380503024", "deps": ["1.380503024"] } ]`) - expectedPatchAfterMarshaling := []byte(`[{"id":"1.1","op":"add","path":"/","value":"user"},{"id":"2.1","op":"add","path":"/name","value":"Jane"},{"id":"1.380503024","op":"add","path":"/name","value":"Jane"}]`) + expectedPatchAfterMarshaling := []byte(`[{"id":"1.380503024","op":"add","path":"/","value":"user"},{"id":"2.380503024","op":"add","path":"/name","value":"Jane"}]`) doc := Init("document_1")