Skip to content

Commit

Permalink
feat: dangling node detection for graph.Memory
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 committed Nov 29, 2023
1 parent 79a08b4 commit 407d871
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 12 deletions.
4 changes: 1 addition & 3 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ func (s *Store) Delete(ctx context.Context, target ocispec.Descriptor) error {
untagged = true
}
}
if err := s.graph.Remove(ctx, target); err != nil {
return err
}
s.graph.Remove(ctx, target)
if untagged && s.AutoSaveIndex {
err := s.saveIndex()
if err != nil {
Expand Down
34 changes: 30 additions & 4 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,23 @@ import (

// Memory is a memory based PredecessorFinder.
type Memory struct {
nodes map[descriptor.Descriptor]ocispec.Descriptor // nodes saves the map keys of ocispec.Descriptor
// properties and behaviors of the fields:
// - Memory.nodes:
// 1. a node exists in Memory.nodes if and only if it exists in the memory
// 2. Memory.nodes saves the ocispec.Descriptor map keys, which are used by
// the other fields.
// - Memory.predecessors:
// 1. a node exists in Memory.predecessors if it has at least one predecessor
// in the memory, regardless of whether or not the node itself exists in
// the memory.
// 2. a node does not exist in Memory.nodes, if it doesn't have any predecessors
// in the memory.
// - Memory.successors:
// 1. a node exists in Memory.successors if and only if it exists in the memory.
// 2. A node's entry in Memory.successors is always consistent with the actual
// content of the node, regardless of whether or not each successor exists
// in the memory.
nodes map[descriptor.Descriptor]ocispec.Descriptor
predecessors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
successors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
lock sync.RWMutex
Expand Down Expand Up @@ -101,12 +117,14 @@ func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]oci
return res, nil
}

// Remove removes the node from its predecessors and successors.
func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) error {
// Remove removes the node from its predecessors and successors, and returns the
// dangling nodes caused by the deletion.
func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) []ocispec.Descriptor {
m.lock.Lock()
defer m.lock.Unlock()

nodeKey := descriptor.FromOCI(node)
danglings := []ocispec.Descriptor{}
// remove the node from its successors' predecessor list
for successorKey := range m.successors[nodeKey] {
predecessorEntry := m.predecessors[successorKey]
Expand All @@ -116,11 +134,12 @@ func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) error {
// predecessors entry. Otherwise, we do not remove the entry.
if len(predecessorEntry) == 0 {
delete(m.predecessors, successorKey)
danglings = append(danglings, m.nodes[successorKey])
}
}
delete(m.successors, nodeKey)
delete(m.nodes, nodeKey)
return nil
return danglings
}

// index indexes predecessors for each direct successor of the given node.
Expand Down Expand Up @@ -152,3 +171,10 @@ func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispe
}
return successors, nil
}

func (m *Memory) isDanglingNode(desc ocispec.Descriptor) bool {
key := descriptor.FromOCI(desc)
_, existsInMemory := m.nodes[key]
_, existsInPredecessors := m.predecessors[key]
return existsInMemory && !existsInPredecessors
}
157 changes: 152 additions & 5 deletions internal/graph/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"io"
"reflect"
"sort"
"testing"

"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -253,15 +254,15 @@ func TestMemory_IndexAndRemove(t *testing.T) {
// still exist, since its predecessor A still exists
predecessorsB, exists = testMemory.predecessors[nodeKeyB]
if !exists {
t.Errorf("testDeletableMemory.predecessors should still contain the entry of %v", "B")
t.Errorf("testDeletableMemory.predecessors should still contain the entry of %s", "B")
}
if !predecessorsB.Contains(nodeKeyA) {
t.Errorf("predecessors of %s should still contain %s", "B", "A")
}
// 3. verify B' successors info: B's entry in testMemory.successors should no
// longer exist
if _, exists := testMemory.successors[nodeKeyB]; exists {
t.Errorf("testDeletableMemory.successors should not contain the entry of %v", "B")
t.Errorf("testDeletableMemory.successors should not contain the entry of %s", "B")
}
// 4. verify B' predecessors' successors info: B should still exist in A's
// successors
Expand Down Expand Up @@ -291,7 +292,7 @@ func TestMemory_IndexAndRemove(t *testing.T) {
// 2. verify A' successors info: A's entry in testMemory.successors should no
// longer exist
if _, exists := testMemory.successors[nodeKeyA]; exists {
t.Errorf("testDeletableMemory.successors should not contain the entry of %v", "A")
t.Errorf("testDeletableMemory.successors should not contain the entry of %s", "A")
}
// 3. verify A' successors' predecessors info: D's entry in testMemory.predecessors
// should no longer exist, since all predecessors of D are already deleted
Expand Down Expand Up @@ -567,7 +568,7 @@ func TestMemory_IndexAllAndPredecessors(t *testing.T) {
t.Errorf("successors of %s should still contain %s", "A", "C")
}
if _, exists := testMemory.successors[nodeKeyC]; exists {
t.Errorf("testMemory.successors should not contain the entry of %v", "C")
t.Errorf("testMemory.successors should not contain the entry of %s", "C")
}
if _, exists := testMemory.predecessors[nodeKeyC]; !exists {
t.Errorf("entry %s in predecessors should still exists since it still has at least one predecessor node present", "C")
Expand All @@ -585,7 +586,7 @@ func TestMemory_IndexAllAndPredecessors(t *testing.T) {
t.Errorf("entry %s in predecessors should no longer exists", "D")
}
if _, exists := testMemory.successors[nodeKeyA]; exists {
t.Errorf("testDeletableMemory.successors should not contain the entry of %v", "A")
t.Errorf("testDeletableMemory.successors should not contain the entry of %s", "A")
}

// check that the Predecessors of node D is empty
Expand All @@ -610,3 +611,149 @@ func TestMemory_IndexAllAndPredecessors(t *testing.T) {
t.Errorf("incorrect predecessor result")
}
}

// This test is for intermediate testing and may be removed when the pull request
// is finalized.
// +-----------------------------------------------+
// | |
// | +-----------+ |
// | |A(manifest)| |
// | +-----+-----+ |
// | | |
// | +------------+------------+ |
// | | | | |
// | | | | |
// | v v v |
// | +---------+ +--------+ +--------+ |
// | |B(config)| |C(layer)| |D(layer)| |
// | +---------+ +--------+ +--------+ |
// | |
// | |
// +-----------------------------------------------+
func TestMemory_isDanglingNode(t *testing.T) {
testFetcher := cas.NewMemory()
testMemory := NewMemory()
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) ocispec.Descriptor {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
return descs[len(descs)-1]
}
descB := appendBlob("layer node B", []byte("Node B is a config")) // blobs[0], layer "B"
descC := appendBlob("layer node C", []byte("Node C is a layer")) // blobs[1], layer "C"
descD := appendBlob("layer node D", []byte("Node D is a layer")) // blobs[2], layer "D"

generateManifest := func(layers ...ocispec.Descriptor) ocispec.Descriptor {
manifest := ocispec.Manifest{
Config: descB,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
return appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}

descA := generateManifest(descs[1:3]...) // blobs[3], manifest "A"

// prepare the content in the fetcher, so that it can be used to test Index
testContents := []ocispec.Descriptor{descB, descC, descD, descA}
for i := 0; i < len(blobs); i++ {
testFetcher.Push(ctx, testContents[i], bytes.NewReader(blobs[i]))
}

// make sure that testFetcher works
rc, err := testFetcher.Fetch(ctx, descA)
if err != nil {
t.Errorf("testFetcher.Fetch() error = %v", err)
}
got, err := io.ReadAll(rc)
if err != nil {
t.Errorf("testFetcher.Fetch().Read() error = %v", err)
}
err = rc.Close()
if err != nil {
t.Errorf("testFetcher.Fetch().Close() error = %v", err)
}
if !bytes.Equal(got, blobs[3]) {
t.Errorf("testFetcher.Fetch() = %v, want %v", got, blobs[4])
}

nodeKeyA := descriptor.FromOCI(descA)
nodeKeyB := descriptor.FromOCI(descB)
nodeKeyC := descriptor.FromOCI(descC)
nodeKeyD := descriptor.FromOCI(descD)
nodeKeys := []descriptor.Descriptor{nodeKeyA, nodeKeyB, nodeKeyC, nodeKeyD}

// index the nodes and verify the information
testMemory.IndexAll(ctx, testFetcher, descA)

// check that all nodes exist in the memory
for i := 0; i < len(nodeKeys); i++ {
if _, exists := testMemory.nodes[nodeKeys[i]]; !exists {
t.Errorf("nodes entry of %v should exist", nodeKeys[i])
}
}

// check that predecessor of B,C,D is A
for i := 1; i < len(nodeKeys); i++ {
if len(testMemory.predecessors[nodeKeys[i]]) != 1 || !testMemory.predecessors[nodeKeys[i]].Contains(nodeKeyA) {
t.Errorf("the predecessor of %v should be A", nodeKeys[i])
}
}

// remove node A and verify the information
danglings := testMemory.Remove(ctx, descA)

// check that A is no longer in the memory, while B,C,D still exist in memory
if _, exists := testMemory.nodes[nodeKeyA]; exists {
t.Errorf("nodes entry of %s should not exist", "A")
}
for i := 1; i < len(nodeKeys); i++ {
if _, exists := testMemory.nodes[nodeKeys[i]]; !exists {
t.Errorf("nodes entry of %v should exist", nodeKeys[i])
}
}

// check that B,C,D have no predecessors and are dangling nodes
if len(testMemory.predecessors) != 0 {
t.Errorf("testMemory.predecessors should be empty")
}

// check that B,C,D are all garbage
for i := 1; i < len(nodeKeys); i++ {
if isGarbage := testMemory.isDanglingNode(testMemory.nodes[nodeKeys[i]]); !isGarbage {
t.Errorf("%v should be considered a dangling node", nodeKeys[i])
}
}

// check that danglings has the correct content
expectedDanglings := []ocispec.Descriptor{}
for i := 1; i < len(nodeKeys); i++ {
expectedDanglings = append(expectedDanglings, testMemory.nodes[nodeKeys[i]])
}

sort.SliceStable(expectedDanglings, func(i, j int) bool { return expectedDanglings[i].Digest <= expectedDanglings[j].Digest })
sort.SliceStable(danglings, func(i, j int) bool { return danglings[i].Digest <= danglings[j].Digest })

if len(danglings) != len(expectedDanglings) {
t.Errorf("danglings has length = %d, want length %d", len(danglings), len(expectedDanglings))
}
for i := 0; i < len(expectedDanglings); i++ {
if !reflect.DeepEqual(danglings, expectedDanglings) {
t.Errorf("danglings[i] = %v, want %v", danglings[i], expectedDanglings[i])
}
if isGarbage := testMemory.isDanglingNode(danglings[i]); !isGarbage {
t.Errorf("%v should be considered a dangling node", danglings[i])
}
}
}

0 comments on commit 407d871

Please sign in to comment.