Skip to content

Commit b9aa9c8

Browse files
replication fix
2 parents 80f59a3 + fc24959 commit b9aa9c8

File tree

3 files changed

+118
-85
lines changed

3 files changed

+118
-85
lines changed

internal/state/helpers.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package state
2+
3+
import (
4+
"fmt"
5+
6+
"container-registry.com/harbor-satellite/internal/config"
7+
"container-registry.com/harbor-satellite/internal/utils"
8+
"github.com/rs/zerolog"
9+
)
10+
11+
func processInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
12+
13+
if utils.IsValidURL(input) {
14+
return processURLInput(utils.FormatRegistryURL(input), username, password, log)
15+
}
16+
17+
log.Info().Msg("Input is not a valid URL, checking if it is a file path")
18+
if err := validateFilePath(input, log); err != nil {
19+
return nil, err
20+
}
21+
22+
return processFileInput(input, username, password, log)
23+
}
24+
25+
func validateFilePath(path string, log *zerolog.Logger) error {
26+
if utils.HasInvalidPathChars(path) {
27+
log.Error().Msg("Path contains invalid characters")
28+
return fmt.Errorf("invalid file path: %s", path)
29+
}
30+
if err := utils.GetAbsFilePath(path); err != nil {
31+
log.Error().Err(err).Msg("No file found")
32+
return fmt.Errorf("no file found: %s", path)
33+
}
34+
return nil
35+
}
36+
37+
func processURLInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
38+
log.Info().Msg("Input is a valid URL")
39+
config.SetRemoteRegistryURL(input)
40+
41+
stateArtifactFetcher := NewURLStateFetcher(input, username, password)
42+
43+
return stateArtifactFetcher, nil
44+
}
45+
46+
func processFileInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
47+
log.Info().Msg("Input is a valid file path")
48+
stateArtifactFetcher := NewFileStateFetcher(input, username, password)
49+
return stateArtifactFetcher, nil
50+
}

internal/state/replicator.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313

1414
type Replicator interface {
1515
// Replicate copies images from the source registry to the local registry.
16-
Replicate(ctx context.Context, replicationEntities []ArtifactReader) error
16+
Replicate(ctx context.Context, replicationEntities []Entity) error
1717
// DeleteReplicationEntity deletes the image from the local registry.
18-
DeleteReplicationEntity(ctx context.Context, replicationEntity []ArtifactReader) error
18+
DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error
1919
}
2020

2121
type BasicReplicator struct {
@@ -36,8 +36,28 @@ func NewBasicReplicator(username, password, zotURL, sourceRegistry string, useUn
3636
}
3737
}
3838

39+
// Entity represents an image or artifact which needs to be handled by the replicator
40+
type Entity struct {
41+
Name string
42+
Repository string
43+
Tag string
44+
Digest string
45+
}
46+
47+
func (e Entity) GetName() string {
48+
return e.Name
49+
}
50+
51+
func (e Entity) GetRepository() string {
52+
return e.Repository
53+
}
54+
55+
func (e Entity) GetTag() string {
56+
return e.Tag
57+
}
58+
3959
// Replicate replicates images from the source registry to the Zot registry.
40-
func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []ArtifactReader) error {
60+
func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []Entity) error {
4161
log := logger.FromContext(ctx)
4262
auth := authn.FromConfig(authn.AuthConfig{
4363
Username: r.username,
@@ -51,10 +71,10 @@ func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []A
5171

5272
for _, replicationEntity := range replicationEntities {
5373

54-
log.Info().Msgf("Pulling image %s from repository %s at registry %s with tag %s", replicationEntity.GetName(), replicationEntity.GetRepository(), r.sourceRegistry, replicationEntity.GetTags()[0])
74+
log.Info().Msgf("Pulling image %s from repository %s at registry %s with tag %s", replicationEntity.GetName(), replicationEntity.GetRepository(), r.sourceRegistry, replicationEntity.GetTag())
5575

5676
// Pull the image from the source registry
57-
srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTags()[0]), options...)
77+
srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), options...)
5878
if err != nil {
5979
log.Error().Msgf("Failed to pull image: %v", err)
6080
return err
@@ -64,7 +84,7 @@ func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []A
6484
ociImage := mutate.MediaType(srcImage, types.OCIManifestSchema1)
6585

6686
// Push the converted OCI image to the Zot registry
67-
err = crane.Push(ociImage, fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTags()[0]), options...)
87+
err = crane.Push(ociImage, fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), options...)
6888
if err != nil {
6989
log.Error().Msgf("Failed to push image: %v", err)
7090
return err
@@ -75,7 +95,7 @@ func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []A
7595
return nil
7696
}
7797

78-
func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicationEntity []ArtifactReader) error {
98+
func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error {
7999
log := logger.FromContext(ctx)
80100
auth := authn.FromConfig(authn.AuthConfig{
81101
Username: r.username,
@@ -88,9 +108,9 @@ func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicati
88108
}
89109

90110
for _, entity := range replicationEntity {
91-
log.Info().Msgf("Deleting image %s from repository %s at registry %s with tag %s", entity.GetName(), entity.GetRepository(), r.remoteRegistryURL, entity.GetTags()[0])
111+
log.Info().Msgf("Deleting image %s from repository %s at registry %s with tag %s", entity.GetName(), entity.GetRepository(), r.remoteRegistryURL, entity.GetTag())
92112

93-
err := crane.Delete(fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, entity.GetRepository(), entity.GetName(), entity.GetTags()[0]), options...)
113+
err := crane.Delete(fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, entity.GetRepository(), entity.GetName(), entity.GetTag()), options...)
94114
if err != nil {
95115
log.Error().Msgf("Failed to delete image: %v", err)
96116
return err

internal/state/state_process.go

Lines changed: 39 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package state
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"strings"
87
"sync"
98

10-
"container-registry.com/harbor-satellite/internal/config"
119
"container-registry.com/harbor-satellite/internal/notifier"
1210
"container-registry.com/harbor-satellite/internal/scheduler"
1311
"container-registry.com/harbor-satellite/internal/utils"
@@ -41,14 +39,15 @@ type FetchAndReplicateStateProcess struct {
4139
}
4240

4341
type StateMap struct {
44-
url string
45-
State StateReader
42+
url string
43+
State StateReader
44+
Entities []Entity
4645
}
4746

4847
func NewStateMap(url []string) []StateMap {
4948
var stateMap []StateMap
5049
for _, u := range url {
51-
stateMap = append(stateMap, StateMap{url: u, State: nil})
50+
stateMap = append(stateMap, StateMap{url: u, State: nil, Entities: nil})
5251
}
5352
return stateMap
5453
}
@@ -98,7 +97,7 @@ func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error {
9897
return err
9998
}
10099
log.Info().Msgf("State fetched successfully for %s", f.stateMap[i].url)
101-
deleteEntity, replicateEntity, newState := f.GetChanges(newStateFetched, log, f.stateMap[i].State)
100+
deleteEntity, replicateEntity, newState := f.GetChanges(newStateFetched, log, f.stateMap[i].Entities)
102101
f.LogChanges(deleteEntity, replicateEntity, log)
103102
if err := f.notifier.Notify(); err != nil {
104103
log.Error().Err(err).Msg("Error sending notification")
@@ -117,51 +116,52 @@ func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error {
117116
}
118117
// Update the state directly in the slice
119118
f.stateMap[i].State = newState
119+
f.stateMap[i].Entities = FetchEntitiesFromState(newState)
120120
}
121121
return nil
122122
}
123123

124-
func (f *FetchAndReplicateStateProcess) GetChanges(newState StateReader, log *zerolog.Logger, oldState StateReader) ([]ArtifactReader, []ArtifactReader, StateReader) {
124+
func (f *FetchAndReplicateStateProcess) GetChanges(newState StateReader, log *zerolog.Logger, oldEntites []Entity) ([]Entity, []Entity, StateReader) {
125125
log.Info().Msg("Getting changes")
126126
// Remove artifacts with null tags from the new state
127127
newState = f.RemoveNullTagArtifacts(newState)
128+
newEntites := FetchEntitiesFromState(newState)
128129

129-
var entityToDelete []ArtifactReader
130-
var entityToReplicate []ArtifactReader
130+
var entityToDelete []Entity
131+
var entityToReplicate []Entity
131132

132-
if oldState == nil {
133-
log.Warn().Msg("Old state is nil")
134-
return entityToDelete, newState.GetArtifacts(), newState
133+
if oldEntites == nil {
134+
log.Warn().Msg("Old state has zero entites, replicating the complete state")
135+
return entityToDelete, newEntites, newState
135136
}
136137

137138
// Create maps for quick lookups
138-
oldArtifactsMap := make(map[string]ArtifactReader)
139-
for _, oldArtifact := range oldState.GetArtifacts() {
140-
tag := oldArtifact.GetTags()[0]
141-
oldArtifactsMap[oldArtifact.GetName()+"|"+tag] = oldArtifact
139+
oldEntityMap := make(map[string]Entity)
140+
for _, oldEntity := range oldEntites {
141+
oldEntityMap[oldEntity.Name+"|"+oldEntity.Tag] = oldEntity
142142
}
143143

144144
// Check new artifacts and update lists
145-
for _, newArtifact := range newState.GetArtifacts() {
146-
nameTagKey := newArtifact.GetName() + "|" + newArtifact.GetTags()[0]
147-
oldArtifact, exists := oldArtifactsMap[nameTagKey]
145+
for _, newEntity := range newEntites {
146+
nameTagKey := newEntity.Name + "|" + newEntity.Tag
147+
oldEntity, exists := oldEntityMap[nameTagKey]
148148

149149
if !exists {
150150
// New artifact doesn't exist in old state, add to replication list
151-
entityToReplicate = append(entityToReplicate, newArtifact)
152-
} else if newArtifact.GetDigest() != oldArtifact.GetDigest() {
151+
entityToReplicate = append(entityToReplicate, newEntity)
152+
} else if newEntity.Digest != oldEntity.Digest {
153153
// Artifact exists but has changed, add to both lists
154-
entityToReplicate = append(entityToReplicate, newArtifact)
155-
entityToDelete = append(entityToDelete, oldArtifact)
154+
entityToReplicate = append(entityToReplicate, newEntity)
155+
entityToDelete = append(entityToDelete, oldEntity)
156156
}
157157

158158
// Remove processed old artifact from map
159-
delete(oldArtifactsMap, nameTagKey)
159+
delete(oldEntityMap, nameTagKey)
160160
}
161161

162162
// Remaining artifacts in oldArtifactsMap should be deleted
163-
for _, oldArtifact := range oldArtifactsMap {
164-
entityToDelete = append(entityToDelete, oldArtifact)
163+
for _, oldEntity := range oldEntityMap {
164+
entityToDelete = append(entityToDelete, oldEntity)
165165
}
166166

167167
return entityToDelete, entityToReplicate, newState
@@ -239,17 +239,6 @@ func (f *FetchAndReplicateStateProcess) RemoveNullTagArtifacts(state StateReader
239239
return state
240240
}
241241

242-
func PrintPrettyJson(info interface{}, log *zerolog.Logger, message string) error {
243-
log.Warn().Msg("Printing pretty JSON")
244-
stateJSON, err := json.MarshalIndent(info, "", " ")
245-
if err != nil {
246-
log.Error().Err(err).Msg("Error marshalling state to JSON")
247-
return err
248-
}
249-
log.Info().Msgf("%s: %s", message, stateJSON)
250-
return nil
251-
}
252-
253242
func ProcessState(state *StateReader) (*StateReader, error) {
254243
for _, artifact := range (*state).GetArtifacts() {
255244
repo, image, err := utils.GetRepositoryAndImageNameFromArtifact(artifact.GetRepository())
@@ -274,50 +263,24 @@ func (f *FetchAndReplicateStateProcess) FetchAndProcessState(fetcher StateFetche
274263
return state, nil
275264
}
276265

277-
func (f *FetchAndReplicateStateProcess) LogChanges(deleteEntity, replicateEntity []ArtifactReader, log *zerolog.Logger) {
266+
func (f *FetchAndReplicateStateProcess) LogChanges(deleteEntity, replicateEntity []Entity, log *zerolog.Logger) {
278267
log.Warn().Msgf("Total artifacts to delete: %d", len(deleteEntity))
279268
log.Warn().Msgf("Total artifacts to replicate: %d", len(replicateEntity))
280269
}
281270

282-
func processInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
283-
284-
if utils.IsValidURL(input) {
285-
return processURLInput(utils.FormatRegistryURL(input), username, password, log)
286-
}
287-
288-
log.Info().Msg("Input is not a valid URL, checking if it is a file path")
289-
if err := validateFilePath(input, log); err != nil {
290-
return nil, err
291-
}
292-
293-
return processFileInput(input, username, password, log)
294-
}
295-
296-
func validateFilePath(path string, log *zerolog.Logger) error {
297-
if utils.HasInvalidPathChars(path) {
298-
log.Error().Msg("Path contains invalid characters")
299-
return fmt.Errorf("invalid file path: %s", path)
300-
}
301-
if err := utils.GetAbsFilePath(path); err != nil {
302-
log.Error().Err(err).Msg("No file found")
303-
return fmt.Errorf("no file found: %s", path)
271+
func FetchEntitiesFromState(state StateReader) []Entity {
272+
var entities []Entity
273+
for _, artifact := range state.GetArtifacts() {
274+
for _, tag := range artifact.GetTags() {
275+
entities = append(entities, Entity{
276+
Name: artifact.GetName(),
277+
Repository: artifact.GetRepository(),
278+
Tag: tag,
279+
Digest: artifact.GetDigest(),
280+
})
281+
}
304282
}
305-
return nil
306-
}
307-
308-
func processURLInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
309-
log.Info().Msg("Input is a valid URL")
310-
config.SetRemoteRegistryURL(input)
311-
312-
stateArtifactFetcher := NewURLStateFetcher(input, username, password)
313-
314-
return stateArtifactFetcher, nil
315-
}
316-
317-
func processFileInput(input, username, password string, log *zerolog.Logger) (StateFetcher, error) {
318-
log.Info().Msg("Input is a valid file path")
319-
stateArtifactFetcher := NewFileStateFetcher(input, username, password)
320-
return stateArtifactFetcher, nil
283+
return entities
321284
}
322285

323286
func (f *FetchAndReplicateStateProcess) AddEventBroker(eventBroker *scheduler.EventBroker, ctx context.Context) {

0 commit comments

Comments
 (0)