Skip to content

Commit 07d22dd

Browse files
author
Lyubo Kamenov
authored
Copy connectors/pipelines from instance when removing them in provisioning config files (#2314)
1 parent 0b29e76 commit 07d22dd

1 file changed

Lines changed: 19 additions & 9 deletions

File tree

pkg/provisioning/import_actions.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -237,25 +237,35 @@ func (a updatePipelineAction) update(ctx context.Context, cfg config.Pipeline) e
237237

238238
// update connector IDs
239239
if !a.isEqualConnectors(p.ConnectorIDs, cfg.Connectors) {
240-
// recreate all connector IDs
241-
for _, procID := range p.ConnectorIDs {
242-
_, err = a.pipelineService.RemoveConnector(ctx, cfg.ID, procID)
240+
// Make a copy of the pipeline connectors, the instance value
241+
// will be modified during removal and can cause side effects.
242+
connectorIDs := make([]string, len(p.ConnectorIDs))
243+
_ = copy(connectorIDs, p.ConnectorIDs)
244+
245+
// truncate pipeline connectors and add connectors from the pipeline config.
246+
for _, connID := range connectorIDs {
247+
_, err = a.pipelineService.RemoveConnector(ctx, cfg.ID, connID)
243248
if err != nil {
244-
return cerrors.Errorf("failed to remove connector %v: %w", procID, err)
249+
return cerrors.Errorf("failed to remove connector %v: %w", connID, err)
245250
}
246251
}
247-
for _, proc := range cfg.Connectors {
248-
_, err = a.pipelineService.AddConnector(ctx, cfg.ID, proc.ID)
252+
for _, conn := range cfg.Connectors {
253+
_, err = a.pipelineService.AddConnector(ctx, cfg.ID, conn.ID)
249254
if err != nil {
250-
return cerrors.Errorf("failed to add connector %v: %w", proc.ID, err)
255+
return cerrors.Errorf("failed to add connector %v: %w", conn.ID, err)
251256
}
252257
}
253258
}
254259

255260
// update processor IDs
256261
if !a.isEqualProcessors(p.ProcessorIDs, cfg.Processors) {
257-
// recreate all processor IDs
258-
for _, procID := range p.ProcessorIDs {
262+
// Make a copy of the pipeline processors, the instance value
263+
// will be modified during removal and can cause side effects.
264+
processorIDs := make([]string, len(p.ProcessorIDs))
265+
_ = copy(processorIDs, p.ProcessorIDs)
266+
267+
// truncate pipeline processors and add processors from the pipeline config.
268+
for _, procID := range processorIDs {
259269
_, err = a.pipelineService.RemoveProcessor(ctx, cfg.ID, procID)
260270
if err != nil {
261271
return cerrors.Errorf("failed to remove processor %v: %w", procID, err)

0 commit comments

Comments
 (0)