Skip to content

Commit

Permalink
Code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
catttam committed Jun 21, 2024
1 parent 6078852 commit 8bcce24
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 110 deletions.
124 changes: 47 additions & 77 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *

// Create input buckets
for _, in := range service.Input {
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(in.Provider), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}
provID, provName = getProviderInfo(in.Provider)

// Only allow input from MinIO and dCache
if provName != types.MinIOName && provName != types.WebDavName {
Expand Down Expand Up @@ -327,17 +318,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *

// Create output buckets
for _, out := range service.Output {
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(out.Provider), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}

provID, provName = getProviderInfo(out.Provider)
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
Expand Down Expand Up @@ -402,76 +383,50 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
}

if service.Mount.Provider != "" {
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(service.Mount.Provider), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}
provID, provName = getProviderInfo(service.Mount.Provider)

// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID)
}

path := strings.Trim(service.Mount.Path, " /")
// Split buckets and folders from path
splitPath := strings.SplitN(path, "/", 2)

switch provName {
case types.MinIOName, types.S3Name:
// Use the appropriate client
if provName == types.MinIOName {
s3Client = service.StorageProviders.MinIO[provID].GetS3Client()
} else {
s3Client = service.StorageProviders.S3[provID].GetS3Client()
}
// Create bucket
_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(splitPath[0]),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
log.Printf("The bucket \"%s\" already exists\n", splitPath[0])
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
// Currently only MinIO/S3 are supported
// Use the appropriate client
if provName == types.MinIOName {
s3Client = service.StorageProviders.MinIO[provID].GetS3Client()
} else {
s3Client = service.StorageProviders.S3[provID].GetS3Client()
}
// Create bucket
_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(splitPath[0]),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
log.Printf("The bucket \"%s\" already exists\n", splitPath[0])
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
} else {
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey := fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}
case types.OnedataName:
cdmiClient = service.StorageProviders.Onedata[provID].GetCDMIClient()
err := cdmiClient.CreateContainer(fmt.Sprintf("%s/%s", service.StorageProviders.Onedata[provID].Space, path), true)
}
// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey := fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
if err == cdmi.ErrBadRequest {
log.Printf("Error creating \"%s\" folder in Onedata. Error: %v\n", path, err)
} else {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
return fmt.Errorf("error connecting to Onedata's Oneprovider \"%s\". Error: %v", service.StorageProviders.Onedata[provID].OneproviderHost, err)
}
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}
}
Expand All @@ -494,6 +449,21 @@ func isStorageProviderDefined(storageName string, storageID string, providers *t
return ok
}

func getProviderInfo(rawInfo string) (string, string) {
var provID, provName string
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(rawInfo), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}
return provID, provName
}

func checkIdentity(service *types.Service, cfg *types.Config, authHeader string) error {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
rawToken := strings.TrimPrefix(authHeader, "Bearer ")
Expand All @@ -505,7 +475,7 @@ func checkIdentity(service *types.Service, cfg *types.Config, authHeader string)
}

if !hasVO {
return fmt.Errorf("This user isn't enrrolled on the vo: %v", service.VO)
return fmt.Errorf("this user isn't enrrolled on the vo: %v", service.VO)
}

service.Labels["vo"] = service.VO
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/expose.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func DeleteExpose(name string, kubeClientset kubernetes.Interface, cfg *Config)
}
err = kubeClientset.CoreV1().Pods(cfg.ServicesNamespace).DeleteCollection(context.TODO(), delete, listOpts)
if err != nil {
return fmt.Errorf("error deleting pods of exposed service mount '%s': %v", name, err)
return fmt.Errorf("error deleting pods of exposed service '%s': %v", name, err)
}
return nil
}
Expand Down
59 changes: 28 additions & 31 deletions pkg/types/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
)

const (
Rclone_containerName = "rclone-container"
rclone_containerImage = "rclone/rclone"
rclone_commandImage = `mkdir -p $MNT_POINT/$MINIO_BUCKET
rcloneContainerName = "rclone-container"
rcloneContainerImage = "rclone/rclone"
rcloneStartCommand = `mkdir -p $MNT_POINT/$MINIO_BUCKET
rclone config create minio s3 provider=Minio access_key_id=$AWS_ACCESS_KEY_ID secret_access_key=$AWS_SECRET_ACCESS_KEY endpoint=$MINIO_ENDPOINT acl=public-read-write
rclone mount minio:/$MINIO_BUCKET $MNT_POINT/$MINIO_BUCKET --dir-cache-time 10s --allow-other --allow-non-empty --umask 0007 --uid 1000 --gid 100 --allow-other --no-checksum &
pid=$!
Expand All @@ -36,39 +36,39 @@ while true; do
fi
sleep 5
done`
rclone_folder_mount = "/mnt"
rclone_volume_name = "shared-data"
ephemeral_volume_name = "ephemeral-data"
ephemeral_volume_mount = "/tmpfolder"
rcloneFolderMount = "/mnt"
rcloneVolumeName = "shared-data"
ephemeralVolumeName = "ephemeral-data"
ephemeralVolumeMount = "/tmpfolder"
)

func SetMount(podSpec *v1.PodSpec, service Service, cfg *Config) {
podSpec.Containers = append(podSpec.Containers, secondPodSpec(service, cfg))
podSpec.Containers = append(podSpec.Containers, sidecarPodSpec(service))
termination := int64(5)
podSpec.TerminationGracePeriodSeconds = &termination
addVolume(podSpec, service, cfg)
addVolume(podSpec)
}

func addVolume(podSpec *v1.PodSpec, service Service, cfg *Config) {
func addVolume(podSpec *v1.PodSpec) {
hostToContainer := v1.MountPropagationHostToContainer
volumeMountShare := v1.VolumeMount{
Name: rclone_volume_name,
MountPath: rclone_folder_mount,
Name: rcloneVolumeName,
MountPath: rcloneFolderMount,
MountPropagation: &hostToContainer,
}
volumeshare := v1.Volume{
Name: rclone_volume_name,
Name: rcloneVolumeName,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
ephemeralvolumeMountShare := v1.VolumeMount{
Name: ephemeral_volume_name,
MountPath: ephemeral_volume_mount,
Name: ephemeralVolumeName,
MountPath: ephemeralVolumeMount,
MountPropagation: &hostToContainer,
}
ephemeralvolumeshare := v1.Volume{
Name: ephemeral_volume_name,
Name: ephemeralVolumeName,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
Expand All @@ -79,55 +79,52 @@ func addVolume(podSpec *v1.PodSpec, service Service, cfg *Config) {
podSpec.Volumes = append(podSpec.Volumes, ephemeralvolumeshare)
}

func secondPodSpec(service Service, cfg *Config) v1.Container {
func sidecarPodSpec(service Service) v1.Container {
bidirectional := v1.MountPropagationBidirectional
var ptr *bool // Uninitialized pointer
value := true
ptr = &value
container := v1.Container{
Name: Rclone_containerName,
Image: rclone_containerImage,
Name: rcloneContainerName,
Image: rcloneContainerImage,
Command: []string{"/bin/sh"},
Args: []string{"-c", rclone_commandImage},
Args: []string{"-c", rcloneStartCommand},
Ports: []v1.ContainerPort{
{
Name: "",
ContainerPort: 9000,
},
},
SecurityContext: &v1.SecurityContext{Privileged: ptr},
Env: []v1.EnvVar{
{
Name: "MNT_POINT",
Value: rclone_folder_mount,
Value: rcloneFolderMount,
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: rclone_volume_name,
MountPath: rclone_folder_mount,
Name: rcloneVolumeName,
MountPath: rcloneFolderMount,
MountPropagation: &bidirectional,
},
{
Name: ephemeral_volume_name,
MountPath: ephemeral_volume_mount,
Name: ephemeralVolumeName,
MountPath: ephemeralVolumeMount,
MountPropagation: &bidirectional,
},
},
}

provider := strings.Split(service.Mount.Provider, ".")
if provider[0] == MinIOName {
credentialsValue := setCredentialsMinIO(service, cfg, provider[1])
for index := 0; index < len(credentialsValue); index++ {
container.Env = append(container.Env, credentialsValue[index])
}
MinIOEnvVars := setMinIOEnvVars(service, provider[1])
container.Env = append(container.Env, MinIOEnvVars...)
}
return container

}

func setCredentialsMinIO(service Service, cfg *Config, providerId string) []v1.EnvVar {
func setMinIOEnvVars(service Service, providerId string) []v1.EnvVar {
//service.Mount.Provider
credentials := []v1.EnvVar{
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ type Service struct {
// Optional (If the list is empty we asume the visibility is public for all cluster users)
AllowedUsers []string `json:"allowed_users"`

// Output StorageIOConfig slice with the output service configuration
// Configuration to create a storage provider as a volume inside the service container
// Optional
Mount StorageIOConfig `json:"mount"`
}
Expand Down

0 comments on commit 8bcce24

Please sign in to comment.