Skip to content

Commit d0bea22

Browse files
authored
Merge pull request #8 from ora-io/refactor/flow
Refactor/flow
2 parents f4b6d4d + 23355d3 commit d0bea22

File tree

29 files changed

+1062
-116
lines changed

29 files changed

+1062
-116
lines changed

packages/orap/README.md

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,6 @@ orap.event(eventSignalParam)
101101
.ttl({ taskTtl: 120000, doneTtl: 60000 })
102102
.handle(handle2)
103103

104-
// set logger before listen
105-
orap.logger(logger)
106-
107104
// start signal listeners
108105
orap.listen(
109106
{
@@ -130,8 +127,6 @@ Each `new Orap()` starts a `Orap Flow`
130127
- optional: httpProvider, for crosscheck only, since crosscheck is based on getLogs
131128
- `onListenFn`: customized hook when listener started.
132129

133-
**.logger(logger)**
134-
- set which logger to use across this orap
135130

136131
#### Event Flow
137132

@@ -302,4 +297,4 @@ etc.
302297
### StorageManager
303298
- a wrap class designed for caching tasks in Orap
304299
- `store`: the store entity, currently provides 2 options: use memory or redis, checkout `orap/store`
305-
- `queryDelay`: when doing retry-able operations, e.g. get all keys with the given prefix, this defines the interval between retries.
300+
- `queryDelay`: when doing retry-able operations, e.g. get all keys with the given prefix, this defines the interval between retries.

packages/orap/examples/customDemo/app.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { TransferTask } from './taskTransfer'
88

99
// new orap
1010
const orap = new Orap()
11-
orap.logger(logger)
1211

1312
let store: any
1413
let sm: any

packages/orap/examples/declarativeDemo/app.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
5050
.ttl({ taskTtl: 20000, doneTtl: 20000 })
5151
.handle(handleTask_2)
5252

53-
// set logger before listen
54-
orap.logger(logger)
55-
5653
// start signal listener
5754
orap.listen(
5855
options,

packages/orap/examples/raplizeSample/app.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ orap.event(eventSignalParam)
3434
.ttl({ taskTtl: 120000, doneTtl: 60000 })
3535
.handle(handle)
3636

37-
// set logger before listen
38-
orap.logger(logger)
39-
4037
// start signal listeners
4138
orap.listen(
4239
{

packages/orap/src/beat/event.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import type { Logger } from '@ora-io/utils'
21
import type { AutoCrossCheckParam, Providers } from '@ora-io/reku'
32
import type { EventSignalCallback, EventSignalRegisterParams } from '../signal'
43
import { EventSignal } from '../signal'
@@ -11,12 +10,11 @@ export class EventBeat extends EventSignal {
1110
constructor(
1211
params: EventSignalRegisterParams,
1312
callback: EventSignalCallback,
14-
logger: Logger,
1513
crosscheckOptions: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'> | undefined,
1614
private subscribeProvider: Providers,
1715
private crosscheckProvider: Providers | undefined,
1816
) {
19-
super(params, callback, logger, crosscheckOptions)
17+
super(params, callback, crosscheckOptions)
2018
}
2119

2220
drop() {

packages/orap/src/flow/event.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { ethers } from 'ethers'
2+
import { beforeEach, describe, expect, it } from 'vitest'
3+
import { OrapFlow } from './orap'
4+
import { EventFlow } from './event'
5+
6+
describe('EventFlow', () => {
7+
let orapFlow: OrapFlow
8+
let eventFlow: EventFlow
9+
10+
beforeEach(() => {
11+
orapFlow = new OrapFlow()
12+
eventFlow = new EventFlow(orapFlow)
13+
})
14+
15+
it('should create a task flow', () => {
16+
const taskFlow = eventFlow.task()
17+
expect(taskFlow).toBeDefined()
18+
expect(eventFlow.taskFlows).toContain(taskFlow)
19+
})
20+
21+
it('should set the subscribe provider', () => {
22+
const provider = new ethers.JsonRpcProvider()
23+
eventFlow.setSubscribeProvider(provider)
24+
expect(eventFlow.subscribeProvider).toBe(provider)
25+
})
26+
27+
it('should set the crosscheck provider', () => {
28+
const provider = new ethers.JsonRpcProvider()
29+
eventFlow.setCrosscheckProvider(provider)
30+
expect(eventFlow.crosscheckProvider).toBe(provider)
31+
})
32+
33+
it('should set the handle function', () => {
34+
const handleFn = () => true
35+
eventFlow.handle(handleFn)
36+
expect(eventFlow.handleFn).toBe(handleFn)
37+
})
38+
39+
it('should assemble the EventVerse', () => {
40+
const taskFlow = eventFlow.task()
41+
const eventVerse = eventFlow.assemble()
42+
expect(eventVerse).toBeDefined()
43+
expect(taskFlow).toBeDefined()
44+
expect(eventFlow.taskFlows).toContain(taskFlow)
45+
})
46+
47+
it('should return the parent OrapFlow', () => {
48+
const parentFlow = eventFlow.another()
49+
expect(parentFlow).toBe(orapFlow)
50+
})
51+
})

packages/orap/src/flow/event.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type { Flow, HandleFn } from './interface'
99
import type { OrapFlow } from './orap'
1010

1111
export class EventFlow implements Flow {
12-
private taskFlows: TaskFlow[] = []
12+
private _taskFlows: TaskFlow[] = []
1313

1414
handleFn: HandleFn
1515
partialCrosscheckOptions?: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'>
@@ -24,14 +24,12 @@ export class EventFlow implements Flow {
2424
) {
2525
// Default handleFn
2626
this.handleFn = handleFn ?? (async (..._args: Array<any>) => {
27-
const _contractEventPayload = _args.pop()
28-
this.logger.debug('handle event signal', _contractEventPayload.log.transactionHash)
2927
return true
3028
})
3129
}
3230

33-
get logger() {
34-
return this.parentFlow!._logger
31+
get taskFlows() {
32+
return this._taskFlows
3533
}
3634

3735
crosscheck(options?: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'>) {
@@ -46,7 +44,7 @@ export class EventFlow implements Flow {
4644
tf.cache(sm)
4745
if (context)
4846
tf.context(context)
49-
this.taskFlows.push(tf)
47+
this._taskFlows.push(tf)
5048
return tf
5149
}
5250

@@ -74,7 +72,7 @@ export class EventFlow implements Flow {
7472
}
7573

7674
private _assembleTaskFlows(): TaskVerse[] {
77-
return this.taskFlows.map(flow => flow.assemble())
75+
return this._taskFlows.map(flow => flow.assemble())
7876
}
7977

8078
// TODO: use _assemble? for ux?

packages/orap/src/flow/orap.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { ethers } from 'ethers'
2+
import { beforeEach, describe, expect, it, vi } from 'vitest'
3+
import { SEPOLIA_HTTP, SEPOLIA_WSS } from '../../tests/config'
4+
import { OrapFlow } from './orap'
5+
6+
describe('OrapFlow', () => {
7+
let wsProvider: any
8+
let httpProvider: any
9+
let orapFlow: OrapFlow
10+
11+
beforeEach(() => {
12+
wsProvider = new ethers.WebSocketProvider(SEPOLIA_WSS)
13+
httpProvider = new ethers.JsonRpcProvider(SEPOLIA_HTTP)
14+
orapFlow = new OrapFlow()
15+
})
16+
17+
it('should create an event flow', () => {
18+
const eventFlow = orapFlow.event()
19+
expect(eventFlow).toBeDefined()
20+
expect(orapFlow.eventFlows).toContain(eventFlow)
21+
})
22+
23+
it('should listen for events', () => {
24+
const onListenFn = vi.fn()
25+
orapFlow.listen({ wsProvider, httpProvider }, onListenFn)
26+
expect(orapFlow.onListenFn).toBe(onListenFn)
27+
})
28+
29+
it('should assemble the OrapVerse', () => {
30+
const eventFlow = orapFlow.event()
31+
const orapVerse = orapFlow.assemble()
32+
expect(orapVerse).toBeDefined()
33+
expect(eventFlow).toBeDefined()
34+
expect(orapFlow.eventFlows).toContain(eventFlow)
35+
})
36+
})

packages/orap/src/flow/orap.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import type { Logger } from '@ora-io/utils'
2-
import { logger } from '@ora-io/utils'
31
import type { Providers } from '@ora-io/reku'
42
import { OrapVerse } from '../verse/orap'
53
import type { EventSignalRegisterParams } from '../signal'
@@ -20,11 +18,9 @@ export class OrapFlow implements Flow {
2018
} = { event: [] }
2119

2220
onListenFn: any = () => { }
23-
_logger: Logger = logger
2421

25-
logger(logger: Logger) {
26-
this._logger = logger
27-
return this
22+
get eventFlows() {
23+
return this.subflows.event
2824
}
2925

3026
event(options?: EventSignalRegisterParams, handler?: any): EventFlow {
@@ -52,8 +48,7 @@ export class OrapFlow implements Flow {
5248
}
5349

5450
assemble(): OrapVerse {
55-
// const es = new EventSignal(options, fn, this.logger)
56-
const eventVerses = this.subflows.event.map(flow => flow.assemble(), { logger: this.logger })
51+
const eventVerses = this.subflows.event.map(flow => flow.assemble())
5752
return new OrapVerse(this).setEventVerses(eventVerses)
5853
// this.routes.event.push(es)
5954
}

packages/orap/src/flow/task.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
import { memoryStore } from '@ora-io/utils'
3+
import { StoreManager } from '../store'
4+
import { EventFlow, TaskFlow } from '.'
5+
6+
describe('TaskFlow', () => {
7+
let parentFlow: any
8+
let taskFlow: TaskFlow
9+
10+
beforeEach(() => {
11+
parentFlow = new EventFlow()
12+
taskFlow = new TaskFlow(parentFlow)
13+
})
14+
15+
it('should set cache', () => {
16+
const sm = new StoreManager(memoryStore())
17+
const result = taskFlow.cache(sm)
18+
expect(result).toBe(taskFlow)
19+
expect(taskFlow.sm).toBe(sm)
20+
})
21+
22+
it('should set context', () => {
23+
const ctx = { key: 'value' }
24+
const result = taskFlow.context(ctx)
25+
expect(result).toBe(taskFlow)
26+
expect(taskFlow.ctx).toBe(ctx)
27+
})
28+
29+
it('should set prefix', () => {
30+
const taskPrefix = 'Task:'
31+
const donePrefix = 'Done-Task:'
32+
const result = taskFlow.prefix(taskPrefix, donePrefix)
33+
expect(result).toBe(taskFlow)
34+
expect(taskFlow.taskPrefix).toBe(taskPrefix)
35+
expect(taskFlow.donePrefix).toBe(donePrefix)
36+
})
37+
38+
it('should set TTL', () => {
39+
const TTLs = { taskTtl: 1000, doneTtl: 2000 }
40+
const result = taskFlow.ttl(TTLs)
41+
expect(result).toBe(taskFlow)
42+
expect(taskFlow.taskTtl).toBe(TTLs.taskTtl)
43+
expect(taskFlow.doneTtl).toBe(TTLs.doneTtl)
44+
})
45+
46+
it('should set key', () => {
47+
const toKey = () => 'key'
48+
const result = taskFlow.key(toKey)
49+
expect(result).toBe(taskFlow)
50+
expect(taskFlow.toKeyFn).toBe(toKey)
51+
})
52+
53+
it('should set handle', () => {
54+
const handler = vi.fn()
55+
const result = taskFlow.handle(handler)
56+
expect(result).toBe(taskFlow)
57+
expect(taskFlow.handleFn).toBe(handler)
58+
})
59+
60+
it('should set success', () => {
61+
const onSuccess = vi.fn()
62+
const result = taskFlow.success(onSuccess)
63+
expect(result).toBe(taskFlow)
64+
expect(taskFlow.successFn).toBe(onSuccess)
65+
})
66+
67+
it('should set fail', () => {
68+
const onFail = vi.fn()
69+
const result = taskFlow.fail(onFail)
70+
expect(result).toBe(taskFlow)
71+
expect(taskFlow.failFn).toBe(onFail)
72+
})
73+
74+
it('should get another EventFlow', () => {
75+
const result = taskFlow.another()
76+
expect(result).toBe(parentFlow)
77+
})
78+
79+
it('should assemble the TaskVerse', () => {
80+
const taskVerse = taskFlow.assemble()
81+
expect(taskVerse).toBeDefined()
82+
expect(taskVerse.flow).toBe(taskFlow)
83+
})
84+
})

0 commit comments

Comments
 (0)