Skip to content

Commit ee8d821

Browse files
authored
imageutil: further containers/image consolidation (#1731)
1 parent 43dfc65 commit ee8d821

31 files changed

+1753
-2102
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ vendor/
2828

2929
# editor and IDE paraphernalia
3030
.idea/
31+
.run/
3132
*.swp
3233
*.swo
3334
*~

Diff for: catalogd/cmd/catalogd/main.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"crypto/tls"
2122
"errors"
2223
"flag"
2324
"fmt"
24-
"log"
2525
"net/url"
2626
"os"
2727
"path/filepath"
2828
"strings"
2929
"time"
3030

3131
"github.com/containers/image/v5/types"
32-
"github.com/go-logr/logr"
3332
"github.com/sirupsen/logrus"
3433
"github.com/spf13/pflag"
3534
corev1 "k8s.io/api/core/v1"
@@ -50,6 +49,7 @@ import (
5049
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
5150
"sigs.k8s.io/controller-runtime/pkg/client"
5251
"sigs.k8s.io/controller-runtime/pkg/healthz"
52+
"sigs.k8s.io/controller-runtime/pkg/log"
5353
"sigs.k8s.io/controller-runtime/pkg/metrics"
5454
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
5555
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -61,10 +61,10 @@ import (
6161
"github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection"
6262
catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
6363
"github.com/operator-framework/operator-controller/internal/catalogd/serverutil"
64-
"github.com/operator-framework/operator-controller/internal/catalogd/source"
6564
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
6665
"github.com/operator-framework/operator-controller/internal/catalogd/webhook"
6766
fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs"
67+
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
6868
"github.com/operator-framework/operator-controller/internal/shared/version"
6969
)
7070

@@ -177,7 +177,8 @@ func main() {
177177

178178
cw, err := certwatcher.New(certFile, keyFile)
179179
if err != nil {
180-
log.Fatalf("Failed to initialize certificate watcher: %v", err)
180+
setupLog.Error(err, "failed to initialize certificate watcher")
181+
os.Exit(1)
181182
}
182183

183184
tlsOpts := func(config *tls.Config) {
@@ -273,14 +274,16 @@ func main() {
273274
os.Exit(1)
274275
}
275276

276-
unpackCacheBasePath := filepath.Join(cacheDir, source.UnpackCacheDir)
277+
unpackCacheBasePath := filepath.Join(cacheDir, "unpack")
277278
if err := os.MkdirAll(unpackCacheBasePath, 0770); err != nil {
278279
setupLog.Error(err, "unable to create cache directory for unpacking")
279280
os.Exit(1)
280281
}
281-
unpacker := &source.ContainersImageRegistry{
282-
BaseCachePath: unpackCacheBasePath,
283-
SourceContextFunc: func(logger logr.Logger) (*types.SystemContext, error) {
282+
283+
imageCache := imageutil.CatalogCache(unpackCacheBasePath)
284+
imagePuller := &imageutil.ContainersImagePuller{
285+
SourceCtxFunc: func(ctx context.Context) (*types.SystemContext, error) {
286+
logger := log.FromContext(ctx)
284287
srcContext := &types.SystemContext{
285288
DockerCertPath: pullCasDir,
286289
OCICertPath: pullCasDir,
@@ -334,9 +337,10 @@ func main() {
334337
}
335338

336339
if err = (&corecontrollers.ClusterCatalogReconciler{
337-
Client: mgr.GetClient(),
338-
Unpacker: unpacker,
339-
Storage: localStorage,
340+
Client: mgr.GetClient(),
341+
ImageCache: imageCache,
342+
ImagePuller: imagePuller,
343+
Storage: localStorage,
340344
}).SetupWithManager(mgr); err != nil {
341345
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
342346
os.Exit(1)

Diff for: cmd/operator-controller/main.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"time"
3030

3131
"github.com/containers/image/v5/types"
32-
"github.com/go-logr/logr"
3332
"github.com/sirupsen/logrus"
3433
"github.com/spf13/pflag"
3534
corev1 "k8s.io/api/core/v1"
@@ -49,6 +48,7 @@ import (
4948
"sigs.k8s.io/controller-runtime/pkg/client"
5049
crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
5150
"sigs.k8s.io/controller-runtime/pkg/healthz"
51+
"sigs.k8s.io/controller-runtime/pkg/log"
5252
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
5353
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
5454

@@ -65,12 +65,12 @@ import (
6565
"github.com/operator-framework/operator-controller/internal/operator-controller/controllers"
6666
"github.com/operator-framework/operator-controller/internal/operator-controller/features"
6767
"github.com/operator-framework/operator-controller/internal/operator-controller/finalizers"
68-
"github.com/operator-framework/operator-controller/internal/operator-controller/httputil"
6968
"github.com/operator-framework/operator-controller/internal/operator-controller/resolve"
7069
"github.com/operator-framework/operator-controller/internal/operator-controller/rukpak/preflights/crdupgradesafety"
71-
"github.com/operator-framework/operator-controller/internal/operator-controller/rukpak/source"
7270
"github.com/operator-framework/operator-controller/internal/operator-controller/scheme"
7371
fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs"
72+
httputil "github.com/operator-framework/operator-controller/internal/shared/util/http"
73+
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
7474
"github.com/operator-framework/operator-controller/internal/shared/version"
7575
)
7676

@@ -315,13 +315,14 @@ func main() {
315315
os.Exit(1)
316316
}
317317

318-
unpacker := &source.ContainersImageRegistry{
319-
BaseCachePath: filepath.Join(cachePath, "unpack"),
320-
SourceContextFunc: func(logger logr.Logger) (*types.SystemContext, error) {
318+
imageCache := imageutil.BundleCache(filepath.Join(cachePath, "unpack"))
319+
imagePuller := &imageutil.ContainersImagePuller{
320+
SourceCtxFunc: func(ctx context.Context) (*types.SystemContext, error) {
321321
srcContext := &types.SystemContext{
322322
DockerCertPath: pullCasDir,
323323
OCICertPath: pullCasDir,
324324
}
325+
logger := log.FromContext(ctx)
325326
if _, err := os.Stat(authFilePath); err == nil && globalPullSecretKey != nil {
326327
logger.Info("using available authentication information for pulling image")
327328
srcContext.AuthFilePath = authFilePath
@@ -336,7 +337,7 @@ func main() {
336337

337338
clusterExtensionFinalizers := crfinalizer.NewFinalizers()
338339
if err := clusterExtensionFinalizers.Register(controllers.ClusterExtensionCleanupUnpackCacheFinalizer, finalizers.FinalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
339-
return crfinalizer.Result{}, unpacker.Cleanup(ctx, &source.BundleSource{Name: obj.GetName()})
340+
return crfinalizer.Result{}, imageCache.Delete(ctx, obj.GetName())
340341
})); err != nil {
341342
setupLog.Error(err, "unable to register finalizer", "finalizerKey", controllers.ClusterExtensionCleanupUnpackCacheFinalizer)
342343
os.Exit(1)
@@ -399,7 +400,8 @@ func main() {
399400
if err = (&controllers.ClusterExtensionReconciler{
400401
Client: cl,
401402
Resolver: resolver,
402-
Unpacker: unpacker,
403+
ImageCache: imageCache,
404+
ImagePuller: imagePuller,
403405
Applier: helmApplier,
404406
InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg},
405407
Finalizers: clusterExtensionFinalizers,

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/onsi/ginkgo/v2 v2.22.2
1919
github.com/onsi/gomega v1.36.2
2020
github.com/opencontainers/go-digest v1.0.0
21+
github.com/opencontainers/image-spec v1.1.0
2122
github.com/operator-framework/api v0.29.0
2223
github.com/operator-framework/helm-operator-plugins v0.8.0
2324
github.com/operator-framework/operator-registry v1.50.0
@@ -177,7 +178,6 @@ require (
177178
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
178179
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
179180
github.com/oklog/ulid v1.3.1 // indirect
180-
github.com/opencontainers/image-spec v1.1.0 // indirect
181181
github.com/opencontainers/runtime-spec v1.2.0 // indirect
182182
github.com/openshift/crd-schema-checker v0.0.0-20240404194209-35a9033b1d11 // indirect
183183
github.com/operator-framework/operator-lib v0.17.0 // indirect

Diff for: internal/catalogd/controllers/core/clustercatalog_controller.go

+52-34
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/containers/image/v5/docker/reference"
2728
"k8s.io/apimachinery/pkg/api/equality"
2829
"k8s.io/apimachinery/pkg/api/meta"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,8 +39,8 @@ import (
3839
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3940

4041
catalogdv1 "github.com/operator-framework/operator-controller/catalogd/api/v1"
41-
"github.com/operator-framework/operator-controller/internal/catalogd/source"
4242
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
43+
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
4344
)
4445

4546
const (
@@ -52,8 +53,11 @@ const (
5253
// ClusterCatalogReconciler reconciles a Catalog object
5354
type ClusterCatalogReconciler struct {
5455
client.Client
55-
Unpacker source.Unpacker
56-
Storage storage.Instance
56+
57+
ImageCache imageutil.Cache
58+
ImagePuller imageutil.Puller
59+
60+
Storage storage.Instance
5761

5862
finalizers crfinalizer.Finalizers
5963

@@ -66,8 +70,10 @@ type ClusterCatalogReconciler struct {
6670
}
6771

6872
type storedCatalogData struct {
73+
ref reference.Canonical
74+
lastUnpack time.Time
75+
lastSuccessfulPoll time.Time
6976
observedGeneration int64
70-
unpackResult source.Result
7177
}
7278

7379
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch;create;update;patch;delete
@@ -216,50 +222,58 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *catal
216222
case catalog.Generation != storedCatalog.observedGeneration:
217223
l.Info("unpack required: catalog generation differs from observed generation")
218224
needsUnpack = true
219-
case r.needsPoll(storedCatalog.unpackResult.LastSuccessfulPollAttempt.Time, catalog):
225+
case r.needsPoll(storedCatalog.lastSuccessfulPoll, catalog):
220226
l.Info("unpack required: poll duration has elapsed")
221227
needsUnpack = true
222228
}
223229

224230
if !needsUnpack {
225231
// No need to update the status because we've already checked
226232
// that it is set correctly. Otherwise, we'd be unpacking again.
227-
return nextPollResult(storedCatalog.unpackResult.LastSuccessfulPollAttempt.Time, catalog), nil
233+
return nextPollResult(storedCatalog.lastSuccessfulPoll, catalog), nil
234+
}
235+
236+
if catalog.Spec.Source.Type != catalogdv1.SourceTypeImage {
237+
err := reconcile.TerminalError(fmt.Errorf("unknown source type %q", catalog.Spec.Source.Type))
238+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
239+
return ctrl.Result{}, err
240+
}
241+
if catalog.Spec.Source.Image == nil {
242+
err := reconcile.TerminalError(fmt.Errorf("error parsing ClusterCatalog %q, image source is nil", catalog.Name))
243+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
244+
return ctrl.Result{}, err
228245
}
229246

230-
unpackResult, err := r.Unpacker.Unpack(ctx, catalog)
247+
fsys, canonicalRef, unpackTime, err := r.ImagePuller.Pull(ctx, catalog.Name, catalog.Spec.Source.Image.Ref, r.ImageCache)
231248
if err != nil {
232249
unpackErr := fmt.Errorf("source catalog content: %w", err)
233250
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), unpackErr)
234251
return ctrl.Result{}, unpackErr
235252
}
236253

237-
switch unpackResult.State {
238-
case source.StateUnpacked:
239-
// TODO: We should check to see if the unpacked result has the same content
240-
// as the already unpacked content. If it does, we should skip this rest
241-
// of the unpacking steps.
242-
err := r.Storage.Store(ctx, catalog.Name, unpackResult.FS)
243-
if err != nil {
244-
storageErr := fmt.Errorf("error storing fbc: %v", err)
245-
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
246-
return ctrl.Result{}, storageErr
247-
}
248-
baseURL := r.Storage.BaseURL(catalog.Name)
249-
250-
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
251-
updateStatusServing(&catalog.Status, *unpackResult, baseURL, catalog.GetGeneration())
252-
default:
253-
panic(fmt.Sprintf("unknown unpack state %q", unpackResult.State))
254+
// TODO: We should check to see if the unpacked result has the same content
255+
// as the already unpacked content. If it does, we should skip this rest
256+
// of the unpacking steps.
257+
if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil {
258+
storageErr := fmt.Errorf("error storing fbc: %v", err)
259+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
260+
return ctrl.Result{}, storageErr
254261
}
262+
baseURL := r.Storage.BaseURL(catalog.Name)
263+
264+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
265+
updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration())
255266

267+
lastSuccessfulPoll := time.Now()
256268
r.storedCatalogsMu.Lock()
257269
r.storedCatalogs[catalog.Name] = storedCatalogData{
258-
unpackResult: *unpackResult,
270+
ref: canonicalRef,
271+
lastUnpack: unpackTime,
272+
lastSuccessfulPoll: lastSuccessfulPoll,
259273
observedGeneration: catalog.GetGeneration(),
260274
}
261275
r.storedCatalogsMu.Unlock()
262-
return nextPollResult(unpackResult.LastSuccessfulPollAttempt.Time, catalog), nil
276+
return nextPollResult(lastSuccessfulPoll, catalog), nil
263277
}
264278

265279
func (r *ClusterCatalogReconciler) getCurrentState(catalog *catalogdv1.ClusterCatalog) (*catalogdv1.ClusterCatalogStatus, storedCatalogData, bool) {
@@ -272,7 +286,7 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *catalogdv1.ClusterCa
272286
// Set expected status based on what we see in the stored catalog
273287
clearUnknownConditions(expectedStatus)
274288
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
275-
updateStatusServing(expectedStatus, storedCatalog.unpackResult, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
289+
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
276290
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
277291
}
278292

@@ -325,13 +339,17 @@ func updateStatusProgressing(status *catalogdv1.ClusterCatalogStatus, generation
325339
meta.SetStatusCondition(&status.Conditions, progressingCond)
326340
}
327341

328-
func updateStatusServing(status *catalogdv1.ClusterCatalogStatus, result source.Result, baseURL string, generation int64) {
329-
status.ResolvedSource = result.ResolvedSource
330-
if status.URLs == nil {
331-
status.URLs = &catalogdv1.ClusterCatalogURLs{}
342+
func updateStatusServing(status *catalogdv1.ClusterCatalogStatus, ref reference.Canonical, modTime time.Time, baseURL string, generation int64) {
343+
status.ResolvedSource = &catalogdv1.ResolvedCatalogSource{
344+
Type: catalogdv1.SourceTypeImage,
345+
Image: &catalogdv1.ResolvedImageSource{
346+
Ref: ref.String(),
347+
},
348+
}
349+
status.URLs = &catalogdv1.ClusterCatalogURLs{
350+
Base: baseURL,
332351
}
333-
status.URLs.Base = baseURL
334-
status.LastUnpacked = ptr.To(metav1.NewTime(result.UnpackTime))
352+
status.LastUnpacked = ptr.To(metav1.NewTime(modTime.Truncate(time.Second)))
335353
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
336354
Type: catalogdv1.TypeServing,
337355
Status: metav1.ConditionTrue,
@@ -434,7 +452,7 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal
434452
return err
435453
}
436454
updateStatusNotServing(&catalog.Status, catalog.GetGeneration())
437-
if err := r.Unpacker.Cleanup(ctx, catalog); err != nil {
455+
if err := r.ImageCache.Delete(ctx, catalog.Name); err != nil {
438456
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
439457
return err
440458
}

0 commit comments

Comments
 (0)