|
| 1 | +package v1 |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + sirenv1beta1 "github.com/odpf/siren/api/proto/odpf/siren/v1beta1" |
| 6 | + "github.com/odpf/siren/domain" |
| 7 | + "go.uber.org/zap" |
| 8 | + "google.golang.org/grpc/codes" |
| 9 | + "google.golang.org/grpc/status" |
| 10 | + "google.golang.org/protobuf/types/known/emptypb" |
| 11 | + "google.golang.org/protobuf/types/known/timestamppb" |
| 12 | + "strings" |
| 13 | +) |
| 14 | + |
| 15 | +func (s *GRPCServer) ListSubscriptions(_ context.Context, _ *emptypb.Empty) (*sirenv1beta1.ListSubscriptionsResponse, error) { |
| 16 | + subscriptions, err := s.container.SubscriptionService.ListSubscriptions() |
| 17 | + if err != nil { |
| 18 | + s.logger.Error("handler", zap.Error(err)) |
| 19 | + return nil, status.Errorf(codes.Internal, err.Error()) |
| 20 | + } |
| 21 | + |
| 22 | + res := &sirenv1beta1.ListSubscriptionsResponse{ |
| 23 | + Subscriptions: make([]*sirenv1beta1.Subscription, 0), |
| 24 | + } |
| 25 | + for _, subscription := range subscriptions { |
| 26 | + item := &sirenv1beta1.Subscription{ |
| 27 | + Id: subscription.Id, |
| 28 | + Urn: subscription.Urn, |
| 29 | + Namespace: subscription.Namespace, |
| 30 | + Match: subscription.Match, |
| 31 | + Receivers: getReceiverMetadataListFromDomainObject(subscription.Receivers), |
| 32 | + CreatedAt: timestamppb.New(subscription.CreatedAt), |
| 33 | + UpdatedAt: timestamppb.New(subscription.UpdatedAt), |
| 34 | + } |
| 35 | + res.Subscriptions = append(res.Subscriptions, item) |
| 36 | + } |
| 37 | + return res, nil |
| 38 | +} |
| 39 | + |
| 40 | +func (s *GRPCServer) CreateSubscription(_ context.Context, req *sirenv1beta1.CreateSubscriptionRequest) (*sirenv1beta1.Subscription, error) { |
| 41 | + subscription, err := s.container.SubscriptionService.CreateSubscription(&domain.Subscription{ |
| 42 | + Namespace: req.GetNamespace(), |
| 43 | + Urn: req.GetUrn(), |
| 44 | + Receivers: getReceiverMetadataListInDomainObject(req.GetReceivers()), |
| 45 | + Match: req.GetMatch(), |
| 46 | + }) |
| 47 | + if err != nil { |
| 48 | + s.logger.Error("handler", zap.Error(err)) |
| 49 | + return nil, status.Errorf(codes.Internal, err.Error()) |
| 50 | + } |
| 51 | + |
| 52 | + receivers := make([]*sirenv1beta1.ReceiverMetadata, 0) |
| 53 | + for _, receiverMetadataItem := range subscription.Receivers { |
| 54 | + item := getReceiverMetadataFromDomainObject(&receiverMetadataItem) |
| 55 | + receivers = append(receivers, &item) |
| 56 | + } |
| 57 | + return &sirenv1beta1.Subscription{ |
| 58 | + Id: subscription.Id, |
| 59 | + Urn: subscription.Urn, |
| 60 | + Namespace: subscription.Namespace, |
| 61 | + Match: subscription.Match, |
| 62 | + Receivers: receivers, |
| 63 | + CreatedAt: timestamppb.New(subscription.CreatedAt), |
| 64 | + UpdatedAt: timestamppb.New(subscription.UpdatedAt), |
| 65 | + }, nil |
| 66 | +} |
| 67 | + |
| 68 | +func (s *GRPCServer) GetSubscription(_ context.Context, req *sirenv1beta1.GetSubscriptionRequest) (*sirenv1beta1.Subscription, error) { |
| 69 | + subscription, err := s.container.SubscriptionService.GetSubscription(req.GetId()) |
| 70 | + if err != nil { |
| 71 | + s.logger.Error("handler", zap.Error(err)) |
| 72 | + return nil, status.Errorf(codes.Internal, err.Error()) |
| 73 | + } |
| 74 | + if subscription == nil { |
| 75 | + return nil, status.Errorf(codes.NotFound, "subscription not found") |
| 76 | + } |
| 77 | + |
| 78 | + receivers := make([]*sirenv1beta1.ReceiverMetadata, 0) |
| 79 | + for _, receiverMetadataItem := range subscription.Receivers { |
| 80 | + item := getReceiverMetadataFromDomainObject(&receiverMetadataItem) |
| 81 | + receivers = append(receivers, &item) |
| 82 | + } |
| 83 | + |
| 84 | + return &sirenv1beta1.Subscription{ |
| 85 | + Id: subscription.Id, |
| 86 | + Urn: subscription.Urn, |
| 87 | + Namespace: subscription.Namespace, |
| 88 | + Match: subscription.Match, |
| 89 | + Receivers: receivers, |
| 90 | + CreatedAt: timestamppb.New(subscription.CreatedAt), |
| 91 | + UpdatedAt: timestamppb.New(subscription.UpdatedAt), |
| 92 | + }, nil |
| 93 | +} |
| 94 | + |
| 95 | +func (s *GRPCServer) UpdateSubscription(_ context.Context, req *sirenv1beta1.UpdateSubscriptionRequest) (*sirenv1beta1.Subscription, error) { |
| 96 | + subscription, err := s.container.SubscriptionService.UpdateSubscription(&domain.Subscription{ |
| 97 | + Id: req.GetId(), |
| 98 | + Namespace: req.GetNamespace(), |
| 99 | + Urn: req.GetUrn(), |
| 100 | + Receivers: getReceiverMetadataListInDomainObject(req.GetReceivers()), |
| 101 | + Match: req.GetMatch(), |
| 102 | + }) |
| 103 | + if err != nil { |
| 104 | + if strings.Contains(err.Error(), `violates unique constraint "urn_provider_id_unique"`) { |
| 105 | + return nil, status.Errorf(codes.InvalidArgument, "urn and provider pair already exist") |
| 106 | + } |
| 107 | + s.logger.Error("handler", zap.Error(err)) |
| 108 | + return nil, status.Errorf(codes.Internal, err.Error()) |
| 109 | + } |
| 110 | + |
| 111 | + receivers := make([]*sirenv1beta1.ReceiverMetadata, 0) |
| 112 | + for _, receiverMetadataItem := range subscription.Receivers { |
| 113 | + item := getReceiverMetadataFromDomainObject(&receiverMetadataItem) |
| 114 | + receivers = append(receivers, &item) |
| 115 | + } |
| 116 | + |
| 117 | + return &sirenv1beta1.Subscription{ |
| 118 | + Id: subscription.Id, |
| 119 | + Urn: subscription.Urn, |
| 120 | + Namespace: subscription.Namespace, |
| 121 | + Match: subscription.Match, |
| 122 | + Receivers: receivers, |
| 123 | + CreatedAt: timestamppb.New(subscription.CreatedAt), |
| 124 | + UpdatedAt: timestamppb.New(subscription.UpdatedAt), |
| 125 | + }, nil |
| 126 | +} |
| 127 | + |
| 128 | +func (s *GRPCServer) DeleteSubscription(_ context.Context, req *sirenv1beta1.DeleteSubscriptionRequest) (*emptypb.Empty, error) { |
| 129 | + err := s.container.SubscriptionService.DeleteSubscription(req.GetId()) |
| 130 | + if err != nil { |
| 131 | + s.logger.Error("handler", zap.Error(err)) |
| 132 | + return nil, status.Errorf(codes.Internal, err.Error()) |
| 133 | + } |
| 134 | + return &emptypb.Empty{}, nil |
| 135 | +} |
| 136 | + |
| 137 | +func getReceiverMetadataFromDomainObject(item *domain.ReceiverMetadata) sirenv1beta1.ReceiverMetadata { |
| 138 | + return sirenv1beta1.ReceiverMetadata{ |
| 139 | + Id: item.Id, |
| 140 | + Configuration: item.Configuration, |
| 141 | + } |
| 142 | +} |
| 143 | + |
| 144 | +func getReceiverMetadataInDomainObject(item *sirenv1beta1.ReceiverMetadata) domain.ReceiverMetadata { |
| 145 | + return domain.ReceiverMetadata{ |
| 146 | + Id: item.Id, |
| 147 | + Configuration: item.Configuration, |
| 148 | + } |
| 149 | +} |
| 150 | + |
| 151 | +func getReceiverMetadataListInDomainObject(domainReceivers []*sirenv1beta1.ReceiverMetadata) []domain.ReceiverMetadata { |
| 152 | + receivers := make([]domain.ReceiverMetadata, 0) |
| 153 | + for _, receiverMetadataItem := range domainReceivers { |
| 154 | + receivers = append(receivers, getReceiverMetadataInDomainObject(receiverMetadataItem)) |
| 155 | + } |
| 156 | + return receivers |
| 157 | +} |
| 158 | + |
| 159 | +func getReceiverMetadataListFromDomainObject(domainReceivers []domain.ReceiverMetadata) []*sirenv1beta1.ReceiverMetadata { |
| 160 | + receivers := make([]*sirenv1beta1.ReceiverMetadata, 0) |
| 161 | + for _, receiverMetadataItem := range domainReceivers { |
| 162 | + item := getReceiverMetadataFromDomainObject(&receiverMetadataItem) |
| 163 | + receivers = append(receivers, &item) |
| 164 | + } |
| 165 | + return receivers |
| 166 | +} |
0 commit comments