Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add continuous update of k8s job logs #19

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 78 additions & 25 deletions worker/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,39 +82,66 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
_, err = client.Create(ctx, job, metav1.CreateOptions{})

if err != nil {
return fmt.Errorf("creating job: %v", err)
return fmt.Errorf("error while creating job: %v", err)
}

waitForJobFinish(ctx, client, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})
err = waitForJobPodStart(ctx, kcmd.Namespace, fmt.Sprintf("%s-%d", taskId, kcmd.JobId))
if err != nil {
return fmt.Errorf("error while waiting for job pod to start: %v", err)
}

pods, err := clientset.CoreV1().Pods(kcmd.Namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})
go kcmd.streamLogs()

err = waitForJobFinish(ctx, client, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})
if err != nil {
return err
return fmt.Errorf("error while waiting for job to finish: %v", err)
}

for _, v := range pods.Items {
req := clientset.CoreV1().Pods(kcmd.Namespace).GetLogs(v.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(ctx)
return nil
}

if err != nil {
return err
}
func (kcmd KubernetesCommand) streamLogs() {
clientset, err := getKubernetesClientset()
if err != nil {
fmt.Println("Error while getting kubernetes clientset: ", err)
return
}

pods, err := clientset.CoreV1().Pods(kcmd.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", kcmd.TaskId, kcmd.JobId)})
if err != nil {
fmt.Println("Error while getting pods: ", err)
return
}

pod := pods.Items[0]
req := clientset.CoreV1().Pods(kcmd.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: true})
stream, err := req.Stream(context.TODO())

defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
fmt.Println("Error while opening log stream: ", err)
return
}

for {
buf := make([]byte, 512)
numBytes, err := stream.Read(buf)
if err != nil {
return err
if err == io.EOF {
fmt.Printf("Pod log stream closed.\n")
return
}

fmt.Printf("Error while reading logs for pod: %v\n", err)
return
}

var bytes = buf.Bytes()
_, err = kcmd.Stdout.Write(bytes)
_, err = kcmd.Stdout.Write(buf[:numBytes])

if err != nil {
return err
fmt.Printf("Error while writing logs: %v\n", err)
return
}
}

return nil
}

// Deletes the job running the task.
Expand All @@ -132,36 +159,62 @@ func (kcmd KubernetesCommand) Stop() error {
})

if err != nil {
return fmt.Errorf("deleting job: %v", err)
return fmt.Errorf("error while deleting job: %v", err)
}

return nil
}

func waitForJobPodStart(ctx context.Context, namespace string, jobName string) error {
clientset, err := getKubernetesClientset()
if err != nil {
return err
}

ticker := time.NewTicker(10 * time.Second)

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", jobName)})

if err != nil {
return err
}

if len(pods.Items) > 0 {
return nil
}
}
}
}

// Waits until the job finishes
func waitForJobFinish(ctx context.Context, client batchv1.JobInterface, listOptions metav1.ListOptions) {
func waitForJobFinish(ctx context.Context, client batchv1.JobInterface, listOptions metav1.ListOptions) error {
ticker := time.NewTicker(10 * time.Second)

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
jobs, err := client.List(ctx, listOptions)

if err != nil {
return
return err
}

if len(jobs.Items) == 0 {
// Should not happen
return
return fmt.Errorf("job not found")
}

// There should be always only one job
job := jobs.Items[0]
if job.Status.Succeeded > 0 || job.Status.Failed > 0 {
return
return nil
}
}
}
Expand Down