Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update message thread on events #79

Merged
merged 5 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ jobs:
steps:
- name: Checkout service-agent
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup node v20
uses: actions/setup-node@v4
Expand Down
2 changes: 1 addition & 1 deletion packages/main/src/controllers/message/MessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ export class MessageService {
content: {},
tags: { messageId: message.id, connectionId: message.connectionId },
})
this.logger.debug!(`messageId: ${messageId}`)
this.logger.debug!(`messageId saved: ${messageId}`)
return { id: messageId ?? utils.uuid() } // TODO: persistant mapping between AFJ records and Service Agent flows. Support external message id setting
} catch (error) {
this.logger.error(`Error: ${error.stack}`)
Expand Down
29 changes: 19 additions & 10 deletions packages/main/src/events/MessageEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
timestamp: new Date(), // It can take also 'sentTime' to be related to the origin
})

if (msg.threadId) msg.threadId = await getRecordId(agent, msg.threadId)
await sendMessageReceivedEvent(agent, msg, msg.timestamp, config)
}

Expand Down Expand Up @@ -148,7 +149,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
if (message.type === CallOfferMessage.type.messageTypeUri) {
const callOffer = message as CallOfferMessage
const msg = new CallOfferRequestMessage({
id: message.id,
id: await getRecordId(agent, message.id),
connectionId: connection.id,
offerExpirationTime: callOffer.offerExpirationTime ?? undefined,
offerStartTime: callOffer.offerStartTime ?? undefined,
Expand All @@ -164,7 +165,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
if (message.type === CallEndMessage.type.messageTypeUri) {
const thread = (message as CallEndMessage).thread
const msg = new CallEndRequestMessage({
id: message.id,
id: await getRecordId(agent, message.id),
connectionId: connection.id,
threadId: thread?.threadId,
timestamp: new Date(),
Expand All @@ -176,7 +177,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
if (message.type === CallAcceptMessage.type.messageTypeUri) {
const parameters = (message as CallAcceptMessage).parameters
const msg = new CallAcceptRequestMessage({
id: message.id,
id: await getRecordId(agent, message.id),
connectionId: connection.id,
parameters: parameters,
threadId: message.thread?.threadId,
Expand All @@ -189,7 +190,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
if (message.type === CallRejectMessage.type.messageTypeUri) {
const thread = (message as CallEndMessage).thread
const msg = new CallRejectRequestMessage({
id: message.id,
id: await getRecordId(agent, message.id),
connectionId: connection.id,
threadId: thread?.threadId,
timestamp: new Date(),
Expand Down Expand Up @@ -221,7 +222,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
],
connectionId: record.connectionId!,
id: message.id,
threadId: record.threadId,
threadId: await getRecordId(agent, record.threadId),
timestamp: record.updatedAt,
})

Expand Down Expand Up @@ -312,7 +313,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
],
connectionId: record.connectionId!,
id: message.id,
threadId: record.threadId,
threadId: await getRecordId(agent, record.threadId),
timestamp: record.updatedAt,
})

Expand All @@ -330,7 +331,6 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
config.logger.debug(`CredentialStateChangedEvent received. Record id:
${JSON.stringify(payload.credentialRecord.id)}, state: ${JSON.stringify(payload.credentialRecord.state)}`)
const record = payload.credentialRecord
const flowRecord = await agent.genericRecords.findById(record.threadId)

if (record.state === CredentialState.ProposalReceived) {
const credentialProposalMessage = await agent.credentials.findProposalMessage(record.id)
Expand All @@ -348,14 +348,15 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
timestamp: record.createdAt,
})

if (message.threadId) message.threadId = await getRecordId(agent, message.threadId)
await sendMessageReceivedEvent(agent, message, message.timestamp, config)
} else if (
[CredentialState.Declined, CredentialState.Done, CredentialState.Abandoned].includes(record.state)
) {
const message = new CredentialReceptionMessage({
connectionId: record.connectionId!,
id: record.id,
threadId: (flowRecord?.getTag('messageId') as string) ?? record.threadId,
threadId: await getRecordId(agent, record.threadId),
state:
record.errorMessage === 'issuance-abandoned: e.msg.refused'
? CredentialState.Declined
Expand Down Expand Up @@ -400,6 +401,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
})),
})

if (message.threadId) message.threadId = await getRecordId(agent, message.threadId)
await sendMessageReceivedEvent(agent, message, message.timestamp, config)
}
}
Expand Down Expand Up @@ -468,6 +470,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
mrzData,
})

msg.id = await getRecordId(agent, msg.id)
await sendMessageReceivedEvent(agent, msg, msg.timestamp, config)
})

Expand All @@ -481,6 +484,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
dataGroups,
})

msg.id = await getRecordId(agent, msg.id)
await sendMessageReceivedEvent(agent, msg, msg.timestamp, config)
})

Expand All @@ -505,6 +509,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
threadId,
state: stateMap[description.code as MrtdProblemReportReason],
})
msg.id = await getRecordId(agent, msg.id)
await sendMessageReceivedEvent(agent, msg, msg.timestamp, config)
} else if (
[MrtdProblemReportReason.MrzRefused, MrtdProblemReportReason.MrzTimeout].includes(
Expand All @@ -516,6 +521,7 @@ export const messageEvents = async (agent: ServiceAgent, config: ServerConfig) =
threadId,
state: stateMap[description.code as MrtdProblemReportReason],
})
msg.id = await getRecordId(agent, msg.id)
await sendMessageReceivedEvent(agent, msg, msg.timestamp, config)
}
})
Expand All @@ -529,8 +535,6 @@ const sendMessageReceivedEvent = async (
timestamp: Date,
config: ServerConfig,
) => {
const recordId = await agent.genericRecords.findById(message.id)
if (recordId?.getTag('messageId') as string) message.id = recordId?.getTag('messageId') as string
const body = new MessageReceived({
timestamp,
message: message,
Expand Down Expand Up @@ -558,3 +562,8 @@ const sendMessageStateUpdatedEvent = async (options: {
})
await sendWebhookEvent(config.webhookUrl + '/message-state-updated', body, config.logger)
}

const getRecordId = async (agent: ServiceAgent, id: string): Promise<string> => {
const record = await agent.genericRecords.findById(id)
return (record?.getTag('messageId') as string) ?? id
}