Skip to content

Commit

Permalink
Propagate ingest/search pipeline errors standalone in inspector
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Feb 7, 2025
1 parent 10b725d commit 1edfc23
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 79 deletions.
22 changes: 15 additions & 7 deletions public/pages/workflow_detail/tools/errors/errors.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
*/

import React from 'react';
import { EuiCodeBlock, EuiEmptyPrompt } from '@elastic/eui';
import { isEmpty } from 'lodash';
import { EuiCodeBlock, EuiEmptyPrompt, EuiSpacer } from '@elastic/eui';

interface ErrorsProps {
errorMessage: string;
errorMessages: string[];
}

/**
Expand All @@ -18,12 +17,21 @@ interface ErrorsProps {
export function Errors(props: ErrorsProps) {
return (
<>
{isEmpty(props.errorMessage) ? (
{props.errorMessages?.length === 0 ? (
<EuiEmptyPrompt title={<h2>No errors</h2>} titleSize="s" />
) : (
<EuiCodeBlock fontSize="m" isCopyable={false}>
{props.errorMessage}
</EuiCodeBlock>
<>
{props.errorMessages.map((errorMessage, idx) => {
return (
<>
<EuiSpacer size="m" />
<EuiCodeBlock fontSize="m" isCopyable={false} paddingSize="s">
{errorMessage}
</EuiCodeBlock>
</>
);
})}
</>
)}
</>
);
Expand Down
11 changes: 0 additions & 11 deletions public/pages/workflow_detail/tools/query/query.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ import {
import {
AppState,
searchIndex,
setOpenSearchError,
setSearchPipelineErrors,
useAppDispatch,
} from '../../../../store';
import {
containsEmptyValues,
containsSameValues,
formatSearchPipelineErrors,
getDataSourceId,
getPlaceholdersFromQuery,
getSearchPipelineErrors,
Expand Down Expand Up @@ -225,15 +223,6 @@ export function Query(props: QueryProps) {
errors: searchPipelineErrors,
})
);
if (!isEmpty(searchPipelineErrors)) {
dispatch(
setOpenSearchError({
error: `Error running search pipeline. ${formatSearchPipelineErrors(
searchPipelineErrors
)}`,
})
);
}
} else {
setSearchPipelineErrors({ errors: {} });
}
Expand Down
56 changes: 38 additions & 18 deletions public/pages/workflow_detail/tools/tools.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,55 @@ const PANEL_TITLE = 'Inspect flows';
* The base Tools component for performing ingest and search, viewing resources, and debugging.
*/
export function Tools(props: ToolsProps) {
// error message state
// error message states. Error may come from several different sources.
const { opensearch, workflows } = useSelector((state: AppState) => state);
const opensearchError = opensearch.errorMessage;
const workflowsError = workflows.errorMessage;
const [curErrorMessage, setCurErrorMessage] = useState<string>('');
const {
ingestPipeline: ingestPipelineErrors,
searchPipeline: searchPipelineErrors,
} = useSelector((state: AppState) => state.errors);
const [curErrorMessages, setCurErrorMessages] = useState<string[]>([]);

// auto-navigate to errors tab if a new error has been set as a result of
// executing OpenSearch or Flow Framework workflow APIs, or from the workflow state
// (note that if provision/deprovision fails, there is no concrete exception returned at the API level -
// it is just set in the workflow's error field when fetching workflow state)
// Propagate any errors coming from opensearch API calls, including ingest/search pipeline verbose calls.
useEffect(() => {
setCurErrorMessage(opensearchError);
if (!isEmpty(opensearchError)) {
props.setSelectedTabId(INSPECTOR_TAB_ID.ERRORS);
if (
!isEmpty(opensearchError) ||
!isEmpty(ingestPipelineErrors) ||
!isEmpty(searchPipelineErrors)
) {
if (!isEmpty(opensearchError)) {
setCurErrorMessages([opensearchError]);
} else if (!isEmpty(ingestPipelineErrors)) {
setCurErrorMessages([
'Data not ingested. Errors found with the following ingest processor(s):',
...Object.values(ingestPipelineErrors).map((value) => value.errorMsg),
]);
} else if (!isEmpty(searchPipelineErrors)) {
setCurErrorMessages([
'Errors found with the following search processor(s)',
...Object.values(searchPipelineErrors).map((value) => value.errorMsg),
]);
}
} else {
setCurErrorMessages([]);
}
}, [opensearchError]);
}, [opensearchError, ingestPipelineErrors, searchPipelineErrors]);

// Propagate any errors coming from the workflow, either runtime from API call, or persisted in the indexed workflow itself.
useEffect(() => {
setCurErrorMessage(workflowsError);
if (!isEmpty(workflowsError)) {
props.setSelectedTabId(INSPECTOR_TAB_ID.ERRORS);
}
setCurErrorMessages(!isEmpty(workflowsError) ? [workflowsError] : []);
}, [workflowsError]);
useEffect(() => {
setCurErrorMessage(props.workflow?.error || '');
if (!isEmpty(props.workflow?.error)) {
setCurErrorMessages(props.workflow?.error ? [props.workflow.error] : []);
}, [props.workflow?.error]);

// auto-navigate to errors tab if new errors have been found
useEffect(() => {
if (curErrorMessages.length > 0) {
props.setSelectedTabId(INSPECTOR_TAB_ID.ERRORS);
}
}, [props.workflow?.error]);
}, [curErrorMessages]);

// auto-navigate to ingest tab if a populated value has been set, indicating ingest has been ran
useEffect(() => {
Expand Down Expand Up @@ -136,7 +156,7 @@ export function Tools(props: ToolsProps) {
/>
)}
{props.selectedTabId === INSPECTOR_TAB_ID.ERRORS && (
<Errors errorMessage={curErrorMessage} />
<Errors errorMessages={curErrorMessages} />
)}
{props.selectedTabId === INSPECTOR_TAB_ID.RESOURCES && (
<Resources workflow={props.workflow} />
Expand Down
10 changes: 0 additions & 10 deletions public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import {
getWorkflow,
provisionWorkflow,
setIngestPipelineErrors,
setOpenSearchError,
simulatePipeline,
updateWorkflow,
useAppDispatch,
Expand All @@ -57,7 +56,6 @@ import {
getDataSourceId,
prepareDocsForSimulate,
getIngestPipelineErrors,
formatIngestPipelineErrors,
} from '../../../utils';
import { BooleanField } from './input_fields';
import '../workspace/workspace-styles.scss';
Expand Down Expand Up @@ -609,14 +607,6 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
);
if (isEmpty(ingestPipelineErrors)) {
bulkIngest(ingestDocsObjs);
} else {
dispatch(
setOpenSearchError({
error: `Data not ingested. ${formatIngestPipelineErrors(
ingestPipelineErrors
)}`,
})
);
}
})
.catch((error: any) => {
Expand Down
11 changes: 0 additions & 11 deletions public/store/reducers/opensearch_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export const INITIAL_OPENSEARCH_STATE = {
};

const OPENSEARCH_PREFIX = 'opensearch';
const SET_OPENSEARCH_ERROR = `${OPENSEARCH_PREFIX}/setError`;
const CAT_INDICES_ACTION = `${OPENSEARCH_PREFIX}/catIndices`;
const GET_MAPPINGS_ACTION = `${OPENSEARCH_PREFIX}/mappings`;
const SEARCH_INDEX_ACTION = `${OPENSEARCH_PREFIX}/search`;
Expand All @@ -43,13 +42,6 @@ const GET_INGEST_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getIngestPipeline`;
const GET_SEARCH_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getSearchPipeline`;
const GET_INDEX_ACTION = `${OPENSEARCH_PREFIX}/getIndex`;

export const setOpenSearchError = createAsyncThunk(
SET_OPENSEARCH_ERROR,
async ({ error }: { error: string }, { rejectWithValue }) => {
return error;
}
);

export const catIndices = createAsyncThunk(
CAT_INDICES_ACTION,
async (
Expand Down Expand Up @@ -329,9 +321,6 @@ const opensearchSlice = createSlice({
state.loading = true;
state.errorMessage = '';
})
.addCase(setOpenSearchError.fulfilled, (state, action) => {
state.errorMessage = action.payload;
})
.addCase(catIndices.fulfilled, (state, action) => {
const indicesMap = new Map<string, Index>();
action.payload.forEach((index: Index) => {
Expand Down
24 changes: 2 additions & 22 deletions public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,24 +215,14 @@ export function getIngestPipelineErrors(
if (processorResult.error?.reason !== undefined) {
ingestPipelineErrors[idx] = {
processorType: processorResult.processor_type,
errorMsg: processorResult.error.reason,
errorMsg: `Type: ${processorResult.processor_type}. Error: ${processorResult.error.reason}`,
};
}
});
});
return ingestPipelineErrors;
}

export function formatIngestPipelineErrors(
errors: IngestPipelineErrors
): string {
let msg = 'Errors found with the following ingest processor(s):\n\n';
Object.values(errors || {}).forEach((processorError, idx) => {
msg += `Processor type: ${processorError.processorType}. Error: ${processorError.errorMsg}\n\n`;
});
return msg;
}

export function getSearchPipelineErrors(
searchResponseVerbose: SearchResponseVerbose
): SearchPipelineErrors {
Expand All @@ -241,23 +231,13 @@ export function getSearchPipelineErrors(
if (processorResult?.error !== undefined) {
searchPipelineErrors[idx] = {
processorType: processorResult.processor_name,
errorMsg: processorResult.error,
errorMsg: `Type: ${processorResult.processor_name}. Error: ${processorResult.error}`,
};
}
});
return searchPipelineErrors;
}

export function formatSearchPipelineErrors(
errors: IngestPipelineErrors
): string {
let msg = 'Errors found with the following search processor(s):\n\n';
Object.values(errors || {}).forEach((processorError, idx) => {
msg += `Processor type: ${processorError.processorType}. Error: ${processorError.errorMsg}\n\n`;
});
return msg;
}

// ML inference processors will use standard dot notation or JSONPath depending on the input.
// We follow the same logic here to generate consistent results.
export function generateTransform(
Expand Down

0 comments on commit 1edfc23

Please sign in to comment.