diff --git a/public/pages/workflow_detail/tools/query/query.tsx b/public/pages/workflow_detail/tools/query/query.tsx index 60c9c749..514d9e6a 100644 --- a/public/pages/workflow_detail/tools/query/query.tsx +++ b/public/pages/workflow_detail/tools/query/query.tsx @@ -36,6 +36,7 @@ import { containsEmptyValues, containsSameValues, getDataSourceId, + getEffectiveVersion, getPlaceholdersFromQuery, getSearchPipelineErrors, injectParameters, @@ -64,6 +65,17 @@ const SEARCH_OPTIONS = [ export function Query(props: QueryProps) { const dispatch = useAppDispatch(); const dataSourceId = getDataSourceId(); + const [dataSourceVersion, setDataSourceVersion] = useState< + string | undefined + >(undefined); + useEffect(() => { + async function getVersion() { + if (dataSourceId !== undefined) { + setDataSourceVersion(await getEffectiveVersion(dataSourceId)); + } + } + getVersion(); + }, [dataSourceId]); const { loading } = useSelector((state: AppState) => state.opensearch); @@ -204,6 +216,7 @@ export function Query(props: QueryProps) { : '_none', }, dataSourceId, + dataSourceVersion, verbose: includePipeline, }) ) diff --git a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx index 2bc0a9da..f43a7c55 100644 --- a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx @@ -29,8 +29,11 @@ import { WorkflowConfig, WorkflowFormValues, } from '../../../../common'; -import { formikToUiConfig, getDataSourceFromURL } from '../../../utils'; -import { getEffectiveVersion } from '../../../pages/workflows/new_workflow/new_workflow'; +import { + formikToUiConfig, + getDataSourceFromURL, + getEffectiveVersion, +} from '../../../utils'; import { CollapseProcessor, CopyIngestProcessor, diff --git a/public/pages/workflow_detail/workflow_inputs/search_inputs/search_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/search_inputs/search_inputs.tsx index 67bd359b..cdb3c0ab 100644 --- a/public/pages/workflow_detail/workflow_inputs/search_inputs/search_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/search_inputs/search_inputs.tsx @@ -15,8 +15,7 @@ import { WorkflowConfig, } from '../../../../../common'; import { catIndices, useAppDispatch } from '../../../../store'; -import { getDataSourceId } from '../../../../utils'; -import { getEffectiveVersion } from '../../../workflows/new_workflow/new_workflow'; +import { getDataSourceId, getEffectiveVersion } from '../../../../utils'; interface SearchInputsProps { uiConfig: WorkflowConfig; @@ -42,8 +41,10 @@ export function SearchInputs(props: SearchInputsProps) { useEffect(() => { const checkVersion = async () => { try { - const version = await getEffectiveVersion(dataSourceId); - setShowTransformQuery(semver.gte(version, '2.19.0')); + if (dataSourceId !== undefined) { + const version = await getEffectiveVersion(dataSourceId); + setShowTransformQuery(semver.gte(version, '2.19.0')); + } } catch (error) { console.error('Error checking version:', error); setShowTransformQuery(true); diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index eaf18c0c..0e19226e 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -6,6 +6,7 @@ import React, { useEffect, useState } from 'react'; import { getIn, useFormikContext } from 'formik'; import { isEmpty, isEqual } from 'lodash'; +import semver from 'semver'; import { EuiSmallButton, EuiSmallButtonEmpty, @@ -23,6 +24,7 @@ import { import { CONFIG_STEP, CachedFormikState, + MINIMUM_FULL_SUPPORTED_VERSION, SimulateIngestPipelineResponseVerbose, TemplateNode, WORKFLOW_STEP_TYPE, @@ -56,6 +58,8 @@ import { getDataSourceId, prepareDocsForSimulate, getIngestPipelineErrors, + getEffectiveVersion, + sleep, } from '../../../utils'; import { BooleanField } from './input_fields'; import '../workspace/workspace-styles.scss'; @@ -97,6 +101,21 @@ export function WorkflowInputs(props: WorkflowInputsProps) { } = useFormikContext(); const dispatch = useAppDispatch(); const dataSourceId = getDataSourceId(); + const [dataSourceVersion, setDataSourceVersion] = useState< + string | undefined + >(undefined); + useEffect(() => { + async function getVersion() { + if (dataSourceId !== undefined) { + setDataSourceVersion(await getEffectiveVersion(dataSourceId)); + } + } + getVersion(); + }, [dataSourceId]); + const isPreV219 = + dataSourceVersion !== undefined + ? semver.lt(dataSourceVersion, MINIMUM_FULL_SUPPORTED_VERSION) + : false; // transient running states const [isUpdatingSearchPipeline, setIsUpdatingSearchPipeline] = useState< @@ -390,10 +409,16 @@ export function WorkflowInputs(props: WorkflowInputsProps) { reprovision: true, }, dataSourceId, + dataSourceVersion, }) ) .unwrap() .then(async (result) => { + // if the datasource < 2.19, only async provisioning/reprovisioning is supported. + // so, we manually wait some time before trying to fetch the updated workflow + if (isPreV219) { + await sleep(1000); + } props.setUnsavedIngestProcessors(false); props.setUnsavedSearchProcessors(false); success = true; @@ -426,6 +451,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ) .unwrap() .then(async (result) => { + await sleep(100); await dispatch( updateWorkflow({ apiBody: { @@ -438,16 +464,24 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ) .unwrap() .then(async (result) => { + await sleep(100); props.setUnsavedIngestProcessors(false); props.setUnsavedSearchProcessors(false); await dispatch( provisionWorkflow({ workflowId: updatedWorkflow.id as string, dataSourceId, + dataSourceVersion, }) ) .unwrap() .then(async (result) => { + await sleep(100); + // if the datasource < 2.19, only async provisioning/reprovisioning is supported. + // so, we manually wait some time before trying to fetch the updated workflow + if (isPreV219) { + await sleep(1000); + } await dispatch( getWorkflow({ workflowId: updatedWorkflow.id as string, diff --git a/public/pages/workflows/new_workflow/new_workflow.tsx b/public/pages/workflows/new_workflow/new_workflow.tsx index 0294885e..1974e4e5 100644 --- a/public/pages/workflows/new_workflow/new_workflow.tsx +++ b/public/pages/workflows/new_workflow/new_workflow.tsx @@ -27,11 +27,13 @@ import { searchConnectors, } from '../../../store'; import { enrichPresetWorkflowWithUiMetadata } from './utils'; -import { getDataSourceId, isDataSourceReady } from '../../../utils'; +import { + getDataSourceId, + isDataSourceReady, + getEffectiveVersion, +} from '../../../utils'; import { getDataSourceEnabled } from '../../../services'; import semver from 'semver'; -import { DataSourceAttributes } from '../../../../../../src/plugins/data_source/common/data_sources'; -import { getSavedObjectsClient } from '../../../../public/services'; import { WORKFLOW_TYPE, MIN_SUPPORTED_VERSION, @@ -40,27 +42,6 @@ import { interface NewWorkflowProps {} -export const getEffectiveVersion = async ( - dataSourceId: string | undefined -): Promise => { - try { - if (dataSourceId === undefined) { - throw new Error('Data source is required'); - } - - const dataSource = await getSavedObjectsClient().get( - 'data-source', - dataSourceId - ); - const version = - dataSource?.attributes?.dataSourceVersion || MIN_SUPPORTED_VERSION; - return version; - } catch (error) { - console.error('Error getting version:', error); - return MIN_SUPPORTED_VERSION; - } -}; - const filterPresetsByVersion = async ( workflows: WorkflowTemplate[], dataSourceId: string | undefined diff --git a/public/route_service.ts b/public/route_service.ts index 180b1f19..922dab14 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -60,11 +60,13 @@ export interface RouteService { workflowTemplate: WorkflowTemplate, updateFields: boolean, reprovision: boolean, - dataSourceId?: string + dataSourceId?: string, + dataSourceVersion?: string ) => Promise; provisionWorkflow: ( workflowId: string, - dataSourceId?: string + dataSourceId?: string, + dataSourceVersion?: string ) => Promise; deprovisionWorkflow: ({ workflowId, @@ -96,12 +98,14 @@ export interface RouteService { index, body, dataSourceId, + dataSourceVersion, searchPipeline, verbose, }: { index: string; body: {}; dataSourceId?: string; + dataSourceVersion?: string; searchPipeline?: string; verbose?: boolean; }) => Promise; @@ -205,7 +209,8 @@ export function configureRoutes(core: CoreStart): RouteService { workflowTemplate: WorkflowTemplate, updateFields: boolean, reprovision: boolean, - dataSourceId?: string + dataSourceId?: string, + dataSourceVersion?: string ) => { try { const url = dataSourceId @@ -215,6 +220,9 @@ export function configureRoutes(core: CoreStart): RouteService { `${url}/${workflowId}/${updateFields}/${reprovision}`, { body: JSON.stringify(workflowTemplate), + query: { + data_source_version: dataSourceVersion, + }, } ); return response; @@ -222,13 +230,22 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, - provisionWorkflow: async (workflowId: string, dataSourceId?: string) => { + provisionWorkflow: async ( + workflowId: string, + dataSourceId?: string, + dataSourceVersion?: string + ) => { try { const url = dataSourceId ? `${BASE_NODE_API_PATH}/${dataSourceId}/workflow/provision` : PROVISION_WORKFLOW_NODE_API_PATH; const response = await core.http.post<{ respString: string }>( - `${url}/${workflowId}` + `${url}/${workflowId}`, + { + query: { + data_source_version: dataSourceVersion, + }, + } ); return response; } catch (e: any) { @@ -323,12 +340,14 @@ export function configureRoutes(core: CoreStart): RouteService { index, body, dataSourceId, + dataSourceVersion, searchPipeline, verbose, }: { index: string; body: {}; dataSourceId?: string; + dataSourceVersion?: string; searchPipeline?: string; verbose?: boolean; }) => { @@ -344,6 +363,7 @@ export function configureRoutes(core: CoreStart): RouteService { body: JSON.stringify(body), query: { verbose: verbose ?? false, + data_source_version: dataSourceVersion, }, }); return response; diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 2983919d..5a917f71 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -110,10 +110,12 @@ export const searchIndex = createAsyncThunk( { apiBody, dataSourceId, + dataSourceVersion, verbose, }: { apiBody: { index: string; body: {}; searchPipeline?: string }; dataSourceId?: string; + dataSourceVersion?: string; verbose?: boolean; }, { rejectWithValue } @@ -123,6 +125,7 @@ export const searchIndex = createAsyncThunk( index, body, dataSourceId, + dataSourceVersion, searchPipeline, verbose, }); diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index 06f23120..ff03d38a 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -116,6 +116,7 @@ export const updateWorkflow = createAsyncThunk( { apiBody, dataSourceId, + dataSourceVersion, }: { apiBody: { workflowId: string; @@ -124,6 +125,7 @@ export const updateWorkflow = createAsyncThunk( reprovision?: boolean; }; dataSourceId?: string; + dataSourceVersion?: string; }, { rejectWithValue } ) => { @@ -135,7 +137,8 @@ export const updateWorkflow = createAsyncThunk( workflowTemplate, updateFields || false, reprovision || false, - dataSourceId + dataSourceId, + dataSourceVersion ); if (response instanceof HttpFetchError) { return rejectWithValue( @@ -150,14 +153,23 @@ export const updateWorkflow = createAsyncThunk( export const provisionWorkflow = createAsyncThunk( PROVISION_WORKFLOW_ACTION, async ( - { workflowId, dataSourceId }: { workflowId: string; dataSourceId?: string }, + { + workflowId, + dataSourceId, + dataSourceVersion, + }: { + workflowId: string; + dataSourceId?: string; + dataSourceVersion?: string; + }, { rejectWithValue } ) => { const response: | any | HttpFetchError = await getRouteService().provisionWorkflow( workflowId, - dataSourceId + dataSourceId, + dataSourceVersion ); if (response instanceof HttpFetchError) { return rejectWithValue( diff --git a/public/utils/config_to_template_utils.ts b/public/utils/config_to_template_utils.ts index 6dff2f53..2b0c6552 100644 --- a/public/utils/config_to_template_utils.ts +++ b/public/utils/config_to_template_utils.ts @@ -194,7 +194,7 @@ export function processorConfigsToTemplateProcessors( let processor = { ml_inference: { - model_id: model.id, + model_id: model?.id || '', }, } as MLInferenceProcessor; @@ -378,11 +378,11 @@ export function processorConfigsToTemplateProcessors( ); }); // remove the model field, update to just the required model ID - const model = finalFormValues.model; + const model = finalFormValues?.model; delete finalFormValues.model; finalFormValues = { ...finalFormValues, - model_id: model.id, + model_id: model?.id || '', }; // add the field map config obj diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 51802b8a..6101034b 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -40,8 +40,13 @@ import { LABEL_FIELD_PATTERN, MODEL_ID_PATTERN, WORKFLOW_TYPE, + MIN_SUPPORTED_VERSION, } from '../../common'; -import { getCore, getDataSourceEnabled } from '../services'; +import { + getCore, + getDataSourceEnabled, + getSavedObjectsClient, +} from '../services'; import { Connector, IngestPipelineErrors, @@ -858,3 +863,25 @@ export function getFieldValue(jsonObj: {}, fieldName: string): any | undefined { } return undefined; } + +// Get the version from the selected data source +export const getEffectiveVersion = async ( + dataSourceId: string | undefined +): Promise => { + try { + if (dataSourceId === undefined) { + throw new Error('Data source is required'); + } + + const dataSource = await getSavedObjectsClient().get( + 'data-source', + dataSourceId + ); + const version = + dataSource?.attributes?.dataSourceVersion || MIN_SUPPORTED_VERSION; + return version; + } catch (error) { + console.error('Error getting version:', error); + return MIN_SUPPORTED_VERSION; + } +}; diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts index 78e10066..29a86017 100644 --- a/server/cluster/flow_framework_plugin.ts +++ b/server/cluster/flow_framework_plugin.ts @@ -110,6 +110,24 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { method: 'PUT', }); + flowFramework.updateAndReprovisionWorkflowAsync = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=true`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + update_fields: { + type: 'boolean', + required: true, + }, + }, + }, + needBody: true, + method: 'PUT', + }); + flowFramework.provisionWorkflow = ca({ url: { fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision?wait_for_completion_timeout=${PROVISION_TIMEOUT}`, @@ -123,6 +141,19 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { method: 'POST', }); + flowFramework.provisionWorkflowAsync = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'POST', + }); + flowFramework.deprovisionWorkflow = ca({ url: { fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_deprovision`, diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts index 4897868e..2e03e9b0 100644 --- a/server/routes/flow_framework_routes_service.ts +++ b/server/routes/flow_framework_routes_service.ts @@ -5,6 +5,7 @@ import fs from 'fs'; import path from 'path'; +import semver from 'semver'; import { schema } from '@osd/config-schema'; import { IRouter, @@ -21,6 +22,7 @@ import { GET_PRESET_WORKFLOWS_NODE_API_PATH, GET_WORKFLOW_NODE_API_PATH, GET_WORKFLOW_STATE_NODE_API_PATH, + MINIMUM_FULL_SUPPORTED_VERSION, PROVISION_WORKFLOW_NODE_API_PATH, SEARCH_WORKFLOWS_NODE_API_PATH, SearchHit, @@ -171,6 +173,9 @@ export function registerFlowFrameworkRoutes( reprovision: schema.boolean(), }), body: schema.any(), + query: schema.object({ + data_source_version: schema.maybe(schema.string()), + }), }, }, flowFrameworkRoutesService.updateWorkflow @@ -196,6 +201,9 @@ export function registerFlowFrameworkRoutes( workflow_id: schema.string(), data_source_id: schema.string(), }), + query: schema.object({ + data_source_version: schema.maybe(schema.string()), + }), }, }, flowFrameworkRoutesService.provisionWorkflow @@ -468,6 +476,13 @@ export class FlowFrameworkRoutesService { const workflowTemplate = req.body as WorkflowTemplate; try { const { data_source_id = '' } = req.params as { data_source_id?: string }; + const { data_source_version } = req.query as { + data_source_version?: string; + }; + const isPreV219 = + data_source_version !== undefined + ? semver.lt(data_source_version, MINIMUM_FULL_SUPPORTED_VERSION) + : false; const callWithRequest = getClientBasedOnDataSource( context, this.dataSourceEnabled, @@ -476,16 +491,25 @@ export class FlowFrameworkRoutesService { this.client ); if (reprovision) { - await callWithRequest('flowFramework.updateAndReprovisionWorkflow', { - workflow_id, - // default update_fields to false if not explicitly set otherwise - update_fields, - body: workflowTemplate, - }); + if (isPreV219) { + await callWithRequest( + 'flowFramework.updateAndReprovisionWorkflowAsync', + { + workflow_id, + update_fields, + body: workflowTemplate, + } + ); + } else { + await callWithRequest('flowFramework.updateAndReprovisionWorkflow', { + workflow_id, + update_fields, + body: workflowTemplate, + }); + } } else { await callWithRequest('flowFramework.updateWorkflow', { workflow_id, - // default update_fields to false if not explicitly set otherwise update_fields, body: workflowTemplate, }); @@ -508,6 +532,13 @@ export class FlowFrameworkRoutesService { const { workflow_id } = req.params as { workflow_id: string }; try { const { data_source_id = '' } = req.params as { data_source_id?: string }; + const { data_source_version } = req.query as { + data_source_version?: string; + }; + const isPreV219 = + data_source_version !== undefined + ? semver.lt(data_source_version, MINIMUM_FULL_SUPPORTED_VERSION) + : false; const callWithRequest = getClientBasedOnDataSource( context, this.dataSourceEnabled, @@ -515,9 +546,16 @@ export class FlowFrameworkRoutesService { data_source_id, this.client ); - await callWithRequest('flowFramework.provisionWorkflow', { - workflow_id, - }); + if (isPreV219) { + await callWithRequest('flowFramework.provisionWorkflowAsync', { + workflow_id, + }); + } else { + await callWithRequest('flowFramework.provisionWorkflow', { + workflow_id, + }); + } + return res.ok(); } catch (err: any) { return generateCustomError(res, err); diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index f983d930..6af9cf64 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -5,6 +5,7 @@ import { schema } from '@osd/config-schema'; import { isEmpty } from 'lodash'; +import semver from 'semver'; import { IRouter, IOpenSearchDashboardsResponse, @@ -25,6 +26,7 @@ import { IndexResponse, IngestPipelineConfig, IngestPipelineResponse, + MINIMUM_FULL_SUPPORTED_VERSION, SEARCH_INDEX_NODE_API_PATH, SEARCH_PIPELINE_NODE_API_PATH, SIMULATE_PIPELINE_NODE_API_PATH, @@ -138,6 +140,7 @@ export function registerOpenSearchRoutes( body: schema.any(), query: schema.object({ verbose: schema.boolean(), + data_source_version: schema.maybe(schema.string()), }), }, }, @@ -171,6 +174,7 @@ export function registerOpenSearchRoutes( body: schema.any(), query: schema.object({ verbose: schema.boolean(), + data_source_version: schema.maybe(schema.string()), }), }, }, @@ -479,9 +483,14 @@ export class OpenSearchRoutesService { search_pipeline: string | undefined; }; const { data_source_id = '' } = req.params as { data_source_id?: string }; - const { verbose = false } = req.query as { + const { verbose = false, data_source_version = undefined } = req.query as { verbose?: boolean; + data_source_version?: string; }; + const isPreV219 = + data_source_version !== undefined + ? semver.lt(data_source_version, MINIMUM_FULL_SUPPORTED_VERSION) + : false; const body = req.body; try { const callWithRequest = getClientBasedOnDataSource( @@ -491,13 +500,22 @@ export class OpenSearchRoutesService { data_source_id, this.client ); - - const response = await callWithRequest('search', { - index, - body, - search_pipeline, - verbose_pipeline: verbose, - }); + let response; + // If verbose is false/undefined, or the version isn't eligible, omit the verbose param when searching. + if (!verbose || isPreV219) { + response = await callWithRequest('search', { + index, + body, + search_pipeline, + }); + } else { + response = await callWithRequest('search', { + index, + body, + search_pipeline, + verbose_pipeline: verbose, + }); + } return res.ok({ body: response }); } catch (err: any) {