Skip to content

Commit 29faf81

Browse files
authored
Refactor ingestion of conversations to aim fixing worker timeouts (#1025)
* Factor out webhook handling * Refactor ingestClosedConversation on installation to use tasks * Fix api auth errors in task handler as installation context is not passed * Fix function typo * Fix typo * Add changeset * review: fix error message
1 parent 501fc25 commit 29faf81

File tree

10 files changed

+390
-156
lines changed

10 files changed

+390
-156
lines changed

.changeset/few-meals-sell.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@gitbook/integration-intercom-conversations': minor
3+
---
4+
5+
Fix intercom-conversations integration worker timeouts

bun.lock

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@
124124
},
125125
"integrations/freshdesk": {
126126
"name": "@gitbook/integration-freshdesk",
127-
"version": "0.1.0",
127+
"version": "1.0.0",
128128
"dependencies": {
129129
"@gitbook/api": "*",
130130
"@gitbook/runtime": "*",
@@ -318,6 +318,7 @@
318318
"@gitbook/api": "*",
319319
"@gitbook/runtime": "*",
320320
"intercom-client": "^6.3.0",
321+
"itty-router": "^4.0.27",
321322
"p-map": "^7.0.3",
322323
},
323324
"devDependencies": {
@@ -2650,6 +2651,8 @@
26502651

26512652
"@gitbook/integration-hubspot-conversations/itty-router": ["[email protected]", "", {}, "sha512-hIPHtXGymCX7Lzb2I4G6JgZFE4QEEQwst9GORK7sMYUpJvLfy4yZJr95r04e8DzoAnj6HcxM2m4TbK+juu+18g=="],
26522653

2654+
"@gitbook/integration-intercom-conversations/itty-router": ["[email protected]", "", {}, "sha512-KegPW0l9SNPadProoFT07AB84uOqLUwzlXQ7HsqkS31WUrxkjdhcemRpTDUuetbMJ89uBtWeQSVoiEmUAu31uw=="],
2655+
26532656
"@gitbook/integration-jira/itty-router": ["[email protected]", "", {}, "sha512-hIPHtXGymCX7Lzb2I4G6JgZFE4QEEQwst9GORK7sMYUpJvLfy4yZJr95r04e8DzoAnj6HcxM2m4TbK+juu+18g=="],
26542657

26552658
"@gitbook/integration-lucid/itty-router": ["[email protected]", "", {}, "sha512-KegPW0l9SNPadProoFT07AB84uOqLUwzlXQ7HsqkS31WUrxkjdhcemRpTDUuetbMJ89uBtWeQSVoiEmUAu31uw=="],

integrations/intercom-conversations/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"@gitbook/runtime": "*",
77
"@gitbook/api": "*",
88
"p-map": "^7.0.3",
9+
"itty-router": "^4.0.27",
910
"intercom-client": "^6.3.0"
1011
},
1112
"devDependencies": {

integrations/intercom-conversations/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { IntercomClient } from 'intercom-client';
2-
import { IntercomRuntimeContext, IntercomMeResponse } from './types';
2+
import type { IntercomRuntimeContext, IntercomMeResponse } from './types';
33
import { ExposableError, getOAuthToken, Logger, OAuthConfig } from '@gitbook/runtime';
44

55
const logger = Logger('intercom-conversations:client');

integrations/intercom-conversations/src/config.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createComponent, InstallationConfigurationProps } from '@gitbook/runtime';
2-
import { IntercomRuntimeContext, IntercomRuntimeEnvironment } from './types';
2+
import type { IntercomRuntimeContext, IntercomRuntimeEnvironment } from './types';
33

44
/**
55
* Configuration component for the Intercom integration.

integrations/intercom-conversations/src/conversations.ts

Lines changed: 38 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import { ConversationInput } from '@gitbook/api';
22
import { Logger } from '@gitbook/runtime';
3-
import { Intercom, IntercomClient } from 'intercom-client';
4-
import pMap from 'p-map';
3+
import { Intercom } from 'intercom-client';
54
import { getIntercomClient } from './client';
6-
import { IntercomRuntimeContext } from './types';
5+
import type { IntercomIntegrationTask, IntercomRuntimeContext } from './types';
6+
import { queueIntercomIntegrationTask } from './tasks';
7+
import pMap from 'p-map';
78

8-
const logger = Logger('intercom-conversations');
9+
const logger = Logger('intercom-conversations:ingest');
910

1011
/**
1112
* Ingest the last closed conversations from Intercom.
1213
*/
13-
export async function ingestConversations(context: IntercomRuntimeContext) {
14+
export async function ingestLastClosedIntercomConversations(context: IntercomRuntimeContext) {
1415
const { installation } = context.environment;
1516
if (!installation) {
1617
throw new Error('Installation not found');
@@ -21,7 +22,7 @@ export async function ingestConversations(context: IntercomRuntimeContext) {
2122
let pageIndex = 0;
2223
const perPage = 100;
2324
const maxPages = 7; // Keep under ~1000 subrequest limit. Calc: 7 pages * 100 items ≈ 700 detail calls + 7 search page calls ≈ ~707 Intercom calls (+7 GitBook ingests ≈ ~714 total).
24-
let totalProcessed = 0;
25+
let totalConvsToIngest = 0;
2526

2627
let page = await intercomClient.conversations.search(
2728
{
@@ -47,43 +48,20 @@ export async function ingestConversations(context: IntercomRuntimeContext) {
4748
`Conversation ingestion started. A maximum of ${maxPages * perPage} conversations will be processed.`,
4849
);
4950

51+
const tasks: Array<IntercomIntegrationTask> = [];
5052
while (pageIndex < maxPages) {
5153
pageIndex += 1;
5254

53-
// Process conversations with fail-safe error handling
54-
const gitbookConversations = (
55-
await pMap(
56-
page.data,
57-
async (conversation) => {
58-
try {
59-
return await parseConversationAsGitBook(intercomClient, conversation);
60-
} catch {
61-
return null;
62-
}
63-
},
64-
{
65-
concurrency: 3,
66-
},
67-
)
68-
).filter((conversation) => conversation !== null);
69-
70-
// Ingest conversations to GitBook
71-
if (gitbookConversations.length > 0) {
72-
try {
73-
await context.api.orgs.ingestConversation(
74-
installation.target.organization,
75-
gitbookConversations,
76-
);
77-
totalProcessed += gitbookConversations.length;
78-
logger.info(
79-
`Successfully ingested ${gitbookConversations.length} conversations from page ${pageIndex}`,
80-
);
81-
} catch (error) {
82-
logger.error(
83-
`Failed to ingest ${gitbookConversations.length} conversations from page ${pageIndex}: ${error}`,
84-
);
85-
}
86-
}
55+
const intercomConversations = page.data.map((conversation) => conversation.id);
56+
totalConvsToIngest += intercomConversations.length;
57+
tasks.push({
58+
type: 'ingest:closed-conversations',
59+
payload: {
60+
organization: installation.target.organization,
61+
installation: installation.id,
62+
conversations: intercomConversations,
63+
},
64+
});
8765

8866
if (!page.hasNextPage()) {
8967
break;
@@ -92,51 +70,47 @@ export async function ingestConversations(context: IntercomRuntimeContext) {
9270
page = await page.getNextPage();
9371
}
9472

95-
logger.info(`Conversation ingestion completed. Processed ${totalProcessed} conversations`);
73+
await pMap(tasks, async (task) => queueIntercomIntegrationTask(context, task), {
74+
concurrency: 3,
75+
});
76+
77+
logger.info(
78+
`Dispatched ${tasks.length} tasks to ingest a total of ${totalConvsToIngest} intercom closed conversations`,
79+
);
9680
}
9781

9882
/**
99-
* Fetch the the full conversation details and parse it into a GitBook conversation format.
83+
* Parse a fetched intercom conversation into a GitBook conversation format.
10084
*/
101-
export async function parseConversationAsGitBook(
102-
intercom: IntercomClient,
103-
partialConversation: Intercom.Conversation,
104-
): Promise<ConversationInput> {
105-
if (partialConversation.state !== 'closed') {
106-
throw new Error(`Conversation ${partialConversation.id} is not closed`);
85+
export function parseIntercomConversationAsGitBook(
86+
conversation: Intercom.Conversation,
87+
): ConversationInput {
88+
if (conversation.state !== 'closed') {
89+
throw new Error(`Conversation ${conversation.id} is not closed`);
10790
}
10891

10992
const resultConversation: ConversationInput = {
110-
id: partialConversation.id,
93+
id: conversation.id,
11194
metadata: {
112-
url: `https://app.intercom.com/a/inbox/_/inbox/conversation/${partialConversation.id}`,
95+
url: `https://app.intercom.com/a/inbox/_/inbox/conversation/${conversation.id}`,
11396
attributes: {},
114-
createdAt: new Date(partialConversation.created_at * 1000).toISOString(),
97+
createdAt: new Date(conversation.created_at * 1000).toISOString(),
11598
},
11699
parts: [],
117100
};
118101

119-
if (partialConversation.source.subject) {
120-
resultConversation.subject = partialConversation.source.subject;
102+
if (conversation.source.subject) {
103+
resultConversation.subject = conversation.source.subject;
121104
}
122105

123-
if (partialConversation.source.body) {
106+
if (conversation.source.body) {
124107
resultConversation.parts.push({
125108
type: 'message',
126109
role: 'user',
127-
body: partialConversation.source.body,
110+
body: conversation.source.body,
128111
});
129112
}
130113

131-
// Fetch full conversation details
132-
const conversation = await intercom.conversations.find(
133-
{ conversation_id: partialConversation.id },
134-
{
135-
headers: { Accept: 'application/json' },
136-
timeoutInSeconds: 3,
137-
},
138-
);
139-
140114
for (const part of conversation.conversation_parts?.conversation_parts ?? []) {
141115
if (part.author.type === 'bot') {
142116
continue;

integrations/intercom-conversations/src/index.ts

Lines changed: 70 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,90 @@
1-
import { createIntegration, createOAuthHandler, ExposableError, Logger } from '@gitbook/runtime';
2-
import { Intercom } from 'intercom-client';
3-
import { getIntercomClient, getIntercomOAuthConfig } from './client';
1+
import { Router } from 'itty-router';
2+
3+
import {
4+
createIntegration,
5+
createOAuthHandler,
6+
ExposableError,
7+
Logger,
8+
verifyIntegrationRequestSignature,
9+
} from '@gitbook/runtime';
10+
import { getIntercomOAuthConfig } from './client';
411
import { configComponent } from './config';
5-
import { ingestConversations, parseConversationAsGitBook } from './conversations';
6-
import { IntercomRuntimeContext } from './types';
12+
import { ingestLastClosedIntercomConversations } from './conversations';
13+
import type { IntercomIntegrationTask, IntercomRuntimeContext } from './types';
14+
import { handleIntercomWebhookRequest } from './intercom-webhooks';
15+
import { handleIntercomIntegrationTask } from './tasks';
716

817
const logger = Logger('intercom-conversations');
918

10-
/**
11-
* https://developers.intercom.com/docs/references/webhooks/webhook-models#webhook-notification-object
12-
*/
13-
type IntercomWebhookPayload = {
14-
type: 'notification_event';
15-
// This is the workspace ID
16-
app_id: string;
17-
topic: 'conversation.admin.closed';
18-
data: {
19-
item: Intercom.Conversation;
20-
};
21-
};
22-
2319
export default createIntegration<IntercomRuntimeContext>({
2420
fetch: async (request, context) => {
25-
const url = new URL(request.url);
21+
const { environment } = context;
22+
23+
const router = Router({
24+
base: new URL(
25+
environment.installation?.urls.publicEndpoint ||
26+
environment.integration.urls.publicEndpoint,
27+
).pathname,
28+
});
2629

2730
/*
28-
* Webhook to ingest conversations when they are closed.
31+
* OAuth flow.
2932
*/
30-
if (url.pathname.endsWith('/webhook')) {
31-
const payload = await request.json<IntercomWebhookPayload>();
32-
33-
if (payload.topic === 'conversation.admin.closed') {
34-
const appId = payload.app_id;
35-
36-
// Find all installations matching this Intercom workspace (externalId = app_id)
37-
const {
38-
data: { items: installations },
39-
} = await context.api.integrations.listIntegrationInstallations(
40-
context.environment.integration.name,
41-
{
42-
externalId: appId,
43-
},
44-
);
45-
46-
if (installations.length === 0) {
47-
throw new Error(`No installations found for Intercom workspace: ${appId}`);
48-
}
33+
router.get(
34+
'/oauth',
35+
createOAuthHandler(getIntercomOAuthConfig(context), {
36+
replace: false,
37+
}),
38+
);
4939

50-
const conversation = payload.data.item;
51-
logger.info(
52-
`Webhook received with topic '${payload.topic}' for conversation id ${conversation.id}. Processing for installations ${installations.join(' ')} `,
53-
);
40+
/*
41+
* Webhook handler to ingest conversations when they are closed.
42+
*/
43+
router.post('/webhook', async (request) => {
44+
return handleIntercomWebhookRequest(request, context);
45+
});
5446

55-
for (const installation of installations) {
56-
try {
57-
const installationContext: IntercomRuntimeContext = {
58-
...context,
59-
environment: {
60-
...context.environment,
61-
installation,
62-
},
63-
};
47+
/**
48+
* Integration tasks handler.
49+
*/
50+
router.post('/tasks', async (request) => {
51+
const verified = await verifyIntegrationRequestSignature(request, environment);
6452

65-
const intercomClient = await getIntercomClient(installationContext);
53+
if (!verified) {
54+
const message = `Invalid signature for integration task`;
55+
logger.error(message);
56+
throw new ExposableError(message);
57+
}
6658

67-
const gitbookConversation = await parseConversationAsGitBook(
68-
intercomClient,
69-
conversation,
70-
);
59+
const { task } = JSON.parse(await request.text()) as { task: IntercomIntegrationTask };
60+
logger.debug('Verified & received integration task', task);
7161

72-
const installationApiClient = await context.api.createInstallationClient(
73-
context.environment.integration.name,
74-
installation.id,
75-
);
62+
context.waitUntil(
63+
(async () => {
64+
await handleIntercomIntegrationTask(context, task);
65+
})(),
66+
);
7667

77-
await installationApiClient.orgs.ingestConversation(
78-
installation.target.organization,
79-
[gitbookConversation],
80-
);
81-
} catch (error) {
82-
logger.error('Failed processing Intercom webhook for installation', {
83-
installationId: installation.id,
84-
error: error instanceof Error ? error.message : String(error),
85-
});
86-
}
87-
}
88-
} else {
89-
throw new ExposableError(`Unknown webhook received: ${payload.topic}`);
68+
return new Response(JSON.stringify({ acknowledged: true }), {
69+
status: 200,
70+
headers: { 'content-type': 'application/json' },
71+
});
72+
});
73+
74+
try {
75+
const response = await router.handle(request, context);
76+
if (!response) {
77+
return new Response(`No route matching ${request.method} ${request.url}`, {
78+
status: 404,
79+
});
9080
}
91-
92-
return new Response('OK', { status: 200 });
93-
}
94-
95-
/*
96-
* OAuth flow.
97-
*/
98-
if (url.pathname.endsWith('/oauth')) {
99-
const oauthHandler = createOAuthHandler(getIntercomOAuthConfig(context), {
100-
replace: false,
81+
return response;
82+
} catch (error: any) {
83+
logger.error(`error handling request ${error.message} ${error.stack}`);
84+
return new Response('Unexpected error', {
85+
status: 500,
10186
});
102-
return oauthHandler(request, context);
10387
}
104-
105-
return new Response('Not found', { status: 404 });
10688
},
10789
components: [configComponent],
10890
events: {
@@ -112,7 +94,7 @@ export default createIntegration<IntercomRuntimeContext>({
11294
installation_setup: async (_, context) => {
11395
const { installation } = context.environment;
11496
if (installation?.configuration.oauth_credentials) {
115-
await ingestConversations(context);
97+
await ingestLastClosedIntercomConversations(context);
11698
}
11799
},
118100
},

0 commit comments

Comments
 (0)