Skip to content

Commit

Permalink
composable ray resource logger
Browse files Browse the repository at this point in the history
fix

fix
  • Loading branch information
simotw committed Feb 19, 2025
1 parent d8fe937 commit 41d5c97
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 82 deletions.
82 changes: 0 additions & 82 deletions ray-operator/test/sampleyaml/support.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package sampleyaml

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"

. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -113,81 +109,3 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg
}
}
}

func WithRayJobResourceLogger(t Test) types.GomegaTestingT {
return &RayJobResourceLogger{t: t}
}

type RayJobResourceLogger struct {
t Test
}

func (l *RayJobResourceLogger) Helper() {
l.t.T().Helper()
}

func (l *RayJobResourceLogger) Fatalf(format string, args ...interface{}) {
l.Helper()
var sb strings.Builder

// Log the original failure message
fmt.Fprintf(&sb, format, args...)

if pods, err := l.t.Client().Core().CoreV1().Pods("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(&sb, "\n=== Pods across all namespaces ===\n")
for _, pod := range pods.Items {
podJSON, err := json.MarshalIndent(pod, "", " ")
if err != nil {
fmt.Fprintf(&sb, "Error marshaling pod %s/%s: %v\n", pod.Namespace, pod.Name, err)
continue
}
fmt.Fprintf(&sb, "---\n# Pod: %s/%s\n%s\n", pod.Namespace, pod.Name, string(podJSON))
}
} else {
fmt.Fprintf(&sb, "Failed to get pods: %v\n", err)
}

if jobs, err := l.t.Client().Core().BatchV1().Jobs("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(&sb, "\n=== Jobs across all namespaces ===\n")
for _, job := range jobs.Items {
jobJSON, err := json.MarshalIndent(job, "", " ")
if err != nil {
fmt.Fprintf(&sb, "Error marshaling job %s/%s: %v\n", job.Namespace, job.Name, err)
continue
}
fmt.Fprintf(&sb, "---\n# Job: %s/%s\n%s\n", job.Namespace, job.Name, string(jobJSON))
}
} else {
fmt.Fprintf(&sb, "Failed to get jobs: %v\n", err)
}

if services, err := l.t.Client().Core().CoreV1().Services("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(&sb, "\n=== Services across all namespaces ===\n")
for _, svc := range services.Items {
serviceJSON, err := json.MarshalIndent(svc, "", " ")
if err != nil {
fmt.Fprintf(&sb, "Error marshaling service %s/%s: %v\n", svc.Namespace, svc.Name, err)
continue
}
fmt.Fprintf(&sb, "---\n# Service: %s/%s\n%s\n", svc.Namespace, svc.Name, string(serviceJSON))
}
} else {
fmt.Fprintf(&sb, "Failed to get services: %v\n", err)
}

if rayJobs, err := l.t.Client().Ray().RayV1().RayJobs("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(&sb, "\n=== RayJobs across all namespaces ===\n")
for _, rayJob := range rayJobs.Items {
rayJobJSON, err := json.MarshalIndent(rayJob, "", " ")
if err != nil {
fmt.Fprintf(&sb, "Error marshaling rayjob %s/%s: %v\n", rayJob.Namespace, rayJob.Name, err)
continue
}
fmt.Fprintf(&sb, "---\n# RayJob: %s/%s\n%s\n", rayJob.Namespace, rayJob.Name, string(rayJobJSON))
}
} else {
fmt.Fprintf(&sb, "Failed to get rayjobs: %v\n", err)
}

l.t.T().Fatal(sb.String())
}
134 changes: 134 additions & 0 deletions ray-operator/test/support/resource_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package support

import (
"encoding/json"
"fmt"
"strings"

"github.com/onsi/gomega/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
PODS_RESOURCE = iota
JOBS_RESOURCE
SERVICES_RESOURCE
RAYJOBS_RESOURCE
)

type ResourceLoggerFunc = func(sb *strings.Builder)

func WithRayJobResourceLogger(t Test) types.GomegaTestingT {
resources := []int{PODS_RESOURCE, JOBS_RESOURCE, SERVICES_RESOURCE, RAYJOBS_RESOURCE}
return &RayResourceLogger{t: t, resources: resources}
}

type RayResourceLogger struct {
t Test
resources []int
}

func (l *RayResourceLogger) Helper() {
l.t.T().Helper()
}

func (l *RayResourceLogger) Fatalf(format string, args ...interface{}) {
l.Helper()
loggers := l.GetLoggers()
var sb strings.Builder
// Log the original failure message
fmt.Fprintf(&sb, format, args...)
for _, logger := range loggers {
logger(&sb)
}
l.t.T().Fatal(sb.String())
}

func (l *RayResourceLogger) FprintfPods(sb *strings.Builder) {
if pods, err := l.t.Client().Core().CoreV1().Pods("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(sb, "\n=== Pods across all namespaces ===\n")
for _, pod := range pods.Items {
podJSON, err := json.MarshalIndent(pod, "", " ")
if err != nil {
fmt.Fprintf(sb, "Error marshaling pod %s/%s: %v\n", pod.Namespace, pod.Name, err)
continue
}
fmt.Fprintf(sb, "---\n# Pod: %s/%s\n%s\n", pod.Namespace, pod.Name, string(podJSON))
}
} else {
fmt.Fprintf(sb, "Failed to get pods: %v\n", err)
}
}

func (l *RayResourceLogger) FprintfJobs(sb *strings.Builder) {
if jobs, err := l.t.Client().Core().BatchV1().Jobs("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(sb, "\n=== Jobs across all namespaces ===\n")
for _, job := range jobs.Items {
jobJSON, err := json.MarshalIndent(job, "", " ")
if err != nil {
fmt.Fprintf(sb, "Error marshaling job %s/%s: %v\n", job.Namespace, job.Name, err)
continue
}
fmt.Fprintf(sb, "---\n# Job: %s/%s\n%s\n", job.Namespace, job.Name, string(jobJSON))
}
} else {
fmt.Fprintf(sb, "Failed to get jobs: %v\n", err)
}
}

func (l *RayResourceLogger) FprintServices(sb *strings.Builder) {
if services, err := l.t.Client().Core().CoreV1().Services("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(sb, "\n=== Services across all namespaces ===\n")
for _, svc := range services.Items {
serviceJSON, err := json.MarshalIndent(svc, "", " ")
if err != nil {
fmt.Fprintf(sb, "Error marshaling service %s/%s: %v\n", svc.Namespace, svc.Name, err)
continue
}
fmt.Fprintf(sb, "---\n# Service: %s/%s\n%s\n", svc.Namespace, svc.Name, string(serviceJSON))
}
} else {
fmt.Fprintf(sb, "Failed to get services: %v\n", err)
}
}

func (l *RayResourceLogger) FprintRayJobs(sb *strings.Builder) {
if rayJobs, err := l.t.Client().Ray().RayV1().RayJobs("").List(l.t.Ctx(), metav1.ListOptions{}); err == nil {
fmt.Fprintf(sb, "\n=== RayJobs across all namespaces ===\n")
for _, rayJob := range rayJobs.Items {
rayJobJSON, err := json.MarshalIndent(rayJob, "", " ")
if err != nil {
fmt.Fprintf(sb, "Error marshaling rayjob %s/%s: %v\n", rayJob.Namespace, rayJob.Name, err)
continue
}
fmt.Fprintf(sb, "---\n# RayJob: %s/%s\n%s\n", rayJob.Namespace, rayJob.Name, string(rayJobJSON))
}
} else {
fmt.Fprintf(sb, "Failed to get rayjobs: %v\n", err)
}
}

func (l *RayResourceLogger) MakeFprintUnsupportedResource(resource int) func(sb *strings.Builder) {
return func(sb *strings.Builder) {
fmt.Fprintf(sb, "Error: Unsupported resource: %d for RayResourceLogger\n", resource)
}
}

func (l *RayResourceLogger) GetLoggers() []ResourceLoggerFunc {
loggers := []ResourceLoggerFunc{}
for _, resource := range l.resources {
switch resource {
case PODS_RESOURCE:
loggers = append(loggers, l.FprintfPods)
case JOBS_RESOURCE:
loggers = append(loggers, l.FprintfJobs)
case SERVICES_RESOURCE:
loggers = append(loggers, l.FprintServices)
case RAYJOBS_RESOURCE:
loggers = append(loggers, l.FprintRayJobs)
default:
loggers = append(loggers, l.MakeFprintUnsupportedResource(resource))
}
}
return loggers
}

0 comments on commit 41d5c97

Please sign in to comment.