Skip to content

Commit 738d93d

Browse files
committed
refactoring
1 parent e866442 commit 738d93d

File tree

20 files changed

+676
-446
lines changed

20 files changed

+676
-446
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package consumer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
9+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
10+
"github.com/ydb-platform/terraform-provider-ydb/internal/helpers/topic"
11+
tbl "github.com/ydb-platform/terraform-provider-ydb/internal/table"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
14+
)
15+
16+
const (
17+
ydbTopicDefaultMaxPartitionWriteSpeed = 1048576
18+
)
19+
20+
func (h *handler) Create(ctx context.Context, d *schema.ResourceData, _ interface{}) diag.Diagnostics {
21+
tableResource, err := tableResourceSchemaToTableResource(d)
22+
if err != nil {
23+
return diag.FromErr(err)
24+
}
25+
if tableResource == nil {
26+
return diag.Diagnostics{
27+
diag.Diagnostic{
28+
Severity: diag.Error,
29+
Summary: "got nil resource, unreachable code",
30+
},
31+
}
32+
}
33+
db, err := tbl.CreateDBConnection(ctx, tbl.ClientParams{
34+
DatabaseEndpoint: tableResource.DatabaseEndpoint,
35+
Token: h.token,
36+
})
37+
if err != nil {
38+
return diag.Diagnostics{
39+
diag.Diagnostic{
40+
Severity: diag.Error,
41+
Summary: "failed to initialize table client",
42+
Detail: err.Error(),
43+
},
44+
}
45+
}
46+
defer func() {
47+
_ = db.Close(ctx)
48+
}()
49+
50+
var supportedCodecs []topictypes.Codec
51+
if gotCodecs, ok := d.GetOk("supported_codecs"); !ok {
52+
supportedCodecs = topic.YDBTopicDefaultCodecs
53+
} else {
54+
for _, c := range gotCodecs.([]interface{}) {
55+
cod := c.(string)
56+
supportedCodecs = append(supportedCodecs, topic.YDBTopicCodecNameToCodec[cod])
57+
}
58+
}
59+
60+
err = db.Topic().Create(ctx, d.Get("name").(string),
61+
topicoptions.CreateWithSupportedCodecs(supportedCodecs...),
62+
topicoptions.CreateWithPartitionWriteBurstBytes(ydbTopicDefaultMaxPartitionWriteSpeed),
63+
topicoptions.CreateWithPartitionWriteSpeedBytesPerSecond(ydbTopicDefaultMaxPartitionWriteSpeed),
64+
topicoptions.CreateWithRetentionPeriod(time.Duration(d.Get("starting_message_timestamp_ms").(int))*time.Millisecond),
65+
)
66+
if err != nil {
67+
return diag.FromErr(fmt.Errorf("failed to initialize ydb-topic-consumer control plane client: %w", err))
68+
}
69+
70+
topicPath := d.Get("topic_path").(string)
71+
d.SetId(d.Get("connection_string").(string) + "&path=" + topicPath)
72+
73+
return h.Read(ctx, d, nil)
74+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package consumer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
8+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
9+
tbl "github.com/ydb-platform/terraform-provider-ydb/internal/table"
10+
)
11+
12+
func (h *handler) Delete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
13+
tableResource, err := tableResourceSchemaToTableResource(d)
14+
if err != nil {
15+
return diag.FromErr(err)
16+
}
17+
if tableResource == nil {
18+
return diag.Diagnostics{
19+
diag.Diagnostic{
20+
Severity: diag.Error,
21+
Summary: "got nil resource, unreachable code",
22+
},
23+
}
24+
}
25+
26+
db, err := tbl.CreateDBConnection(ctx, tbl.ClientParams{
27+
DatabaseEndpoint: tableResource.DatabaseEndpoint,
28+
Token: h.token,
29+
})
30+
if err != nil {
31+
return diag.Errorf("failed to initialize table client: %s", err)
32+
}
33+
defer func() {
34+
_ = db.Close(ctx)
35+
}()
36+
37+
topicName := tableResource.Entity.GetEntityPath()
38+
err = db.Topic().Drop(ctx, topicName)
39+
if err != nil {
40+
return diag.FromErr(fmt.Errorf("failed to delete topic: %w", err))
41+
}
42+
43+
return nil
44+
}

sdk/terraform/topic/consumer_structures_manual.go renamed to internal/resources/topic/consumer/handler.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package topic
1+
package consumer
22

33
import (
44
"fmt"
@@ -7,12 +7,30 @@ import (
77

88
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
99
"github.com/ydb-platform/terraform-provider-ydb/internal/helpers/topic"
10+
"github.com/ydb-platform/terraform-provider-ydb/internal/resources"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
1213
)
1314

15+
type handler struct {
16+
token string
17+
}
18+
19+
func NewHandler(token string) resources.Handler {
20+
return &handler{
21+
token: token,
22+
}
23+
}
24+
25+
const (
26+
ydbTopicCodecGZIP = "gzip"
27+
ydbTopicCodecRAW = "raw"
28+
ydbTopicCodecZSTD = "zstd"
29+
)
30+
1431
func flattenYDBTopicConsumerDescription(d *schema.ResourceData, desc topictypes.TopicDescription) error {
1532
_ = d.Set("name", d.Get("name").(string)) // NOTE(shmel1k@): TopicService SDK does not return path for stream.
33+
_ = d.Set("topic_path", d.Get("topic_path").(string))
1634

1735
supportedCodecs := make([]string, 0, len(desc.SupportedCodecs))
1836
for _, v := range desc.SupportedCodecs {
@@ -31,7 +49,7 @@ func flattenYDBTopicConsumerDescription(d *schema.ResourceData, desc topictypes.
3149
return err
3250
}
3351

34-
return d.Set("database_endpoint", d.Get("database_endpoint").(string))
52+
return d.Set("connection_string", d.Get("connection_string").(string))
3553
}
3654

3755
func prepareYDBTopicConsumerAlterSettings(
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package consumer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
9+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
10+
11+
tbl "github.com/ydb-platform/terraform-provider-ydb/internal/table"
12+
)
13+
14+
func (h *handler) Read(ctx context.Context, d *schema.ResourceData, _ interface{}) diag.Diagnostics {
15+
tableResource, err := tableResourceSchemaToTableResource(d)
16+
if err != nil {
17+
return diag.FromErr(err)
18+
}
19+
if tableResource == nil {
20+
return diag.Diagnostics{
21+
diag.Diagnostic{
22+
Severity: diag.Error,
23+
Summary: "got nil resource, unreachable code",
24+
},
25+
}
26+
}
27+
28+
db, err := tbl.CreateDBConnection(ctx, tbl.ClientParams{
29+
DatabaseEndpoint: tableResource.DatabaseEndpoint,
30+
Token: h.token,
31+
})
32+
if err != nil {
33+
return diag.Errorf("failed to initialize table client: %s", err)
34+
}
35+
defer func() {
36+
_ = db.Close(ctx)
37+
}()
38+
39+
topicName := tableResource.Entity.GetEntityPath()
40+
description, err := db.Topic().Describe(ctx, topicName)
41+
if err != nil {
42+
if strings.Contains(err.Error(), "does not exist") {
43+
d.SetId("") // marking as non-existing resource.
44+
return nil
45+
}
46+
return diag.FromErr(fmt.Errorf("resource: failed to describe topic: %w", err))
47+
}
48+
49+
err = flattenYDBTopicConsumerDescription(d, description)
50+
if err != nil {
51+
return diag.FromErr(fmt.Errorf("failed to flatten topic description: %w", err))
52+
}
53+
54+
return nil
55+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package consumer
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
7+
"github.com/ydb-platform/terraform-provider-ydb/internal/helpers"
8+
)
9+
10+
type Resource struct {
11+
Entity *helpers.YDBEntity
12+
13+
FullPath string
14+
Path string
15+
DatabaseEndpoint string
16+
}
17+
18+
func tableResourceSchemaToTableResource(d *schema.ResourceData) (*Resource, error) {
19+
var entity *helpers.YDBEntity
20+
var err error
21+
if d.Id() != "" {
22+
entity, err = helpers.ParseYDBEntityID(d.Id())
23+
if err != nil {
24+
return nil, fmt.Errorf("failed to parse table entity: %w", err)
25+
}
26+
}
27+
28+
var databaseEndpoint string
29+
if entity != nil {
30+
databaseEndpoint = entity.PrepareFullYDBEndpoint()
31+
} else {
32+
// NOTE(shmel1k@): resource is not initialized yet.
33+
databaseEndpoint = d.Get("connection_string").(string)
34+
}
35+
36+
return &Resource{
37+
Entity: entity,
38+
DatabaseEndpoint: databaseEndpoint,
39+
}, nil
40+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package consumer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
9+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
10+
tbl "github.com/ydb-platform/terraform-provider-ydb/internal/table"
11+
)
12+
13+
func (h *handler) performYDBTopicConsumerUpdate(ctx context.Context, d *schema.ResourceData) diag.Diagnostics {
14+
tableResource, err := tableResourceSchemaToTableResource(d)
15+
if err != nil {
16+
return diag.FromErr(err)
17+
}
18+
if tableResource == nil {
19+
return diag.Diagnostics{
20+
diag.Diagnostic{
21+
Severity: diag.Error,
22+
Summary: "got nil resource, unreachable code",
23+
},
24+
}
25+
}
26+
27+
db, err := tbl.CreateDBConnection(ctx, tbl.ClientParams{
28+
DatabaseEndpoint: tableResource.DatabaseEndpoint,
29+
Token: h.token,
30+
})
31+
if err != nil {
32+
return diag.Errorf("failed to initialize table client: %s", err)
33+
}
34+
defer func() {
35+
_ = db.Close(ctx)
36+
}()
37+
38+
if d.HasChange("name") {
39+
// Creating new topic
40+
return h.Create(ctx, d, nil)
41+
}
42+
43+
topicName := tableResource.Entity.GetEntityPath()
44+
desc, err := db.Topic().Describe(ctx, topicName)
45+
if err != nil {
46+
if strings.Contains(err.Error(), "does not exist") {
47+
return h.Create(ctx, d, nil)
48+
}
49+
return diag.FromErr(fmt.Errorf("failed to get description for topic %q", topicName))
50+
}
51+
52+
opts := prepareYDBTopicConsumerAlterSettings(d, desc)
53+
err = db.Topic().Alter(ctx, topicName, opts...)
54+
if err != nil {
55+
return diag.FromErr(fmt.Errorf("got error when tried to alter topic: %w", err))
56+
}
57+
58+
return h.Read(ctx, d, nil)
59+
}
60+
61+
func (h *handler) Update(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
62+
_ = meta
63+
return h.performYDBTopicConsumerUpdate(ctx, d)
64+
}

0 commit comments

Comments
 (0)