From eda4d3e1a095c73f2a3d039a151b12449db8508e Mon Sep 17 00:00:00 2001 From: Ulbrich Robert Date: Tue, 10 Sep 2024 14:26:58 +0200 Subject: [PATCH] Implemented connection reuse for smtp emailer Signed-off-by: Ulbrich Robert --- .../implementations/smtp_emailer.go | 53 ++++++++++++------- flyteadmin/pkg/server/service.go | 3 +- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go b/flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go index dce1adfd00..94fa82577f 100644 --- a/flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go +++ b/flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go @@ -6,6 +6,9 @@ import ( "net/smtp" "strings" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" @@ -13,8 +16,6 @@ import ( "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" - "golang.org/x/net/context" - "google.golang.org/grpc/codes" ) type SMTPEmailer struct { @@ -22,45 +23,65 @@ type SMTPEmailer struct { systemMetrics emailMetrics tlsConf *tls.Config auth *smtp.Auth + smtpClient *smtp.Client } -func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error { - +func (s *SMTPEmailer) createClient(ctx context.Context) (*smtp.Client, error) { newClient, err := smtp.Dial(s.config.EmailerConfig.SMTPServer + ":" + s.config.EmailerConfig.SMTPPort) if err != nil { - return s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err)) + return nil, s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err)) } - defer newClient.Close() - if err = newClient.Hello("localhost"); err != nil { - return s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err)) + return nil, s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err)) } if ok, _ := newClient.Extension("STARTTLS"); ok { if err = newClient.StartTLS(s.tlsConf); err != nil { - return err + return nil, err } } if ok, _ := newClient.Extension("AUTH"); ok { if err = newClient.Auth(*s.auth); err != nil { - return s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err)) + return nil, s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err)) } } - if err = newClient.Mail(email.SenderEmail); err != nil { + return newClient, nil +} + +func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error { + + if s.smtpClient == nil || s.smtpClient.Noop() != nil { + + if s.smtpClient != nil { + err := s.smtpClient.Close() + if err != nil { + logger.Info(ctx, err) + } + } + smtpClient, err := s.createClient(ctx) + + if err != nil { + return s.emailError(ctx, fmt.Sprintf("Error creating SMTP email client: %s", err)) + } + + s.smtpClient = smtpClient + } + + if err := s.smtpClient.Mail(email.SenderEmail); err != nil { return s.emailError(ctx, fmt.Sprintf("Error creating email instance: %s", err)) } for _, recipient := range email.RecipientsEmail { - if err = newClient.Rcpt(recipient); err != nil { + if err := s.smtpClient.Rcpt(recipient); err != nil { logger.Errorf(ctx, "Error adding email recipient: %s", err) } } - writer, err := newClient.Data() + writer, err := s.smtpClient.Data() if err != nil { return s.emailError(ctx, fmt.Sprintf("Error adding email recipient: %s", err)) @@ -78,12 +99,6 @@ func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) e return s.emailError(ctx, fmt.Sprintf("Error closing mail body: %s", err)) } - err = newClient.Quit() - - if err != nil { - return s.emailError(ctx, fmt.Sprintf("Error quitting mail agent: %s", err)) - } - s.systemMetrics.SendSuccess.Inc() return nil } diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 32b63c9273..840d0d9f17 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -9,8 +9,6 @@ import ( "strings" "time" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/gorilla/handlers" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth" @@ -45,6 +43,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/clients/go/assets" grpcService "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/gateway/flyteidl/service" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/secretmanager" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger"