Commit e236c412 authored by nanahira's avatar nanahira

rxjs support command and runLayersWith

parent 87323190
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"lodash": "^4.17.21", "lodash": "^4.17.21",
"mustache": "^4.2.0", "mustache": "^4.2.0",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"rxjs": "^7.5.5",
"typed-reflector": "^1.0.10" "typed-reflector": "^1.0.10"
}, },
"devDependencies": { "devDependencies": {
...@@ -5843,6 +5844,19 @@ ...@@ -5843,6 +5844,19 @@
"queue-microtask": "^1.2.2" "queue-microtask": "^1.2.2"
} }
}, },
"node_modules/rxjs": {
"version": "7.5.5",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.5.5.tgz",
"integrity": "sha512-sy+H0pQofO95VDmFLzyaw9xNJU4KTRSwQIGM6+iG3SypAtCiLDzpeG8sJrNCWn2Up9km+KhkvTdbkrdy+yzZdw==",
"dependencies": {
"tslib": "^2.1.0"
}
},
"node_modules/rxjs/node_modules/tslib": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.0.tgz",
"integrity": "sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ=="
},
"node_modules/safe-buffer": { "node_modules/safe-buffer": {
"version": "5.1.2", "version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
...@@ -11344,6 +11358,21 @@ ...@@ -11344,6 +11358,21 @@
"queue-microtask": "^1.2.2" "queue-microtask": "^1.2.2"
} }
}, },
"rxjs": {
"version": "7.5.5",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.5.5.tgz",
"integrity": "sha512-sy+H0pQofO95VDmFLzyaw9xNJU4KTRSwQIGM6+iG3SypAtCiLDzpeG8sJrNCWn2Up9km+KhkvTdbkrdy+yzZdw==",
"requires": {
"tslib": "^2.1.0"
},
"dependencies": {
"tslib": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.0.tgz",
"integrity": "sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ=="
}
}
},
"safe-buffer": { "safe-buffer": {
"version": "5.1.2", "version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
......
...@@ -63,6 +63,7 @@ ...@@ -63,6 +63,7 @@
"lodash": "^4.17.21", "lodash": "^4.17.21",
"mustache": "^4.2.0", "mustache": "^4.2.0",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"rxjs": "^7.5.5",
"typed-reflector": "^1.0.10" "typed-reflector": "^1.0.10"
}, },
"peerDependencies": { "peerDependencies": {
......
...@@ -4,6 +4,7 @@ import { ...@@ -4,6 +4,7 @@ import {
CommandDefinitionFun, CommandDefinitionFun,
CommandLocaleDef, CommandLocaleDef,
CommandOptionConfigWithDescription, CommandOptionConfigWithDescription,
CommandReturnType,
ContextCallbackLayer, ContextCallbackLayer,
EventName, EventName,
KoishiCommandDefinition, KoishiCommandDefinition,
...@@ -17,7 +18,6 @@ import { ...@@ -17,7 +18,6 @@ import {
import 'reflect-metadata'; import 'reflect-metadata';
import { import {
Argv, Argv,
Awaitable,
BeforeEventMap, BeforeEventMap,
Command, Command,
Dict, Dict,
...@@ -34,6 +34,7 @@ import { ...@@ -34,6 +34,7 @@ import {
applyOptionToCommand, applyOptionToCommand,
registerTemplate, registerTemplate,
} from '../utility'; } from '../utility';
import { Observable } from 'rxjs';
// Register method // Register method
...@@ -54,22 +55,16 @@ export const UsePlugin = () => DoRegister.plugin(); ...@@ -54,22 +55,16 @@ export const UsePlugin = () => DoRegister.plugin();
export function UseCommand<D extends string>( export function UseCommand<D extends string>(
def: D, def: D,
config?: CommandConfigExtended, config?: CommandConfigExtended,
): TypedMethodDecorator< ): TypedMethodDecorator<(...args: any[]) => CommandReturnType>;
(...args: any[]) => Awaitable<string | void | undefined>
>;
export function UseCommand<D extends string>( export function UseCommand<D extends string>(
def: D, def: D,
desc: string, desc: string,
config?: CommandConfigExtended, config?: CommandConfigExtended,
): TypedMethodDecorator< ): TypedMethodDecorator<(...args: any[]) => CommandReturnType>;
(...args: any[]) => Awaitable<string | void | undefined>
>;
export function UseCommand( export function UseCommand(
def: string, def: string,
...args: [CommandConfigExtended?] | [string, CommandConfigExtended?] ...args: [CommandConfigExtended?] | [string, CommandConfigExtended?]
): TypedMethodDecorator< ): TypedMethodDecorator<(...args: any[]) => CommandReturnType> {
(...args: any[]) => Awaitable<string | void | undefined>
> {
const desc = typeof args[0] === 'string' ? (args.shift() as string) : ''; const desc = typeof args[0] === 'string' ? (args.shift() as string) : '';
const config = args[0] as CommandConfigExtended; const config = args[0] as CommandConfigExtended;
return (obj, key: string, des) => { return (obj, key: string, des) => {
......
...@@ -12,6 +12,7 @@ import { ...@@ -12,6 +12,7 @@ import {
import type { DefaultContext, DefaultState, ParameterizedContext } from 'koa'; import type { DefaultContext, DefaultState, ParameterizedContext } from 'koa';
import type { RouterParamContext } from '@koa/router'; import type { RouterParamContext } from '@koa/router';
import { CommandPut } from '../registry'; import { CommandPut } from '../registry';
import { Observable } from 'rxjs';
export interface Type<T = any> extends Function { export interface Type<T = any> extends Function {
new (...args: any[]): T; new (...args: any[]): T;
...@@ -208,3 +209,7 @@ export type ContextCallbackLayer<T = any> = ( ...@@ -208,3 +209,7 @@ export type ContextCallbackLayer<T = any> = (
ctx: Context, ctx: Context,
cb: ContextFunction<void>, cb: ContextFunction<void>,
) => T; ) => T;
export type CommandReturnType = Awaitable<
string | void | undefined | Observable<string | void | undefined>
>;
...@@ -14,12 +14,12 @@ import { ...@@ -14,12 +14,12 @@ import {
} from './utility'; } from './utility';
import { DoRegister } from './registry'; import { DoRegister } from './registry';
import _ from 'lodash'; import _ from 'lodash';
import { isObservable, Observable } from 'rxjs';
export interface DoRegisterResult<T> extends DoRegister.Config { export interface DoRegisterResult<T> extends DoRegister.Config {
key: keyof T & string; key: keyof T & string;
result: any; result: any;
} }
export class Registrar<T = any> { export class Registrar<T = any> {
constructor( constructor(
private obj: T, private obj: T,
...@@ -82,20 +82,34 @@ export class Registrar<T = any> { ...@@ -82,20 +82,34 @@ export class Registrar<T = any> {
ctx: Context, ctx: Context,
cb: ContextFunction<R>, cb: ContextFunction<R>,
layers: ContextCallbackLayer[], layers: ContextCallbackLayer[],
): Promise<Awaited<R>> { ) {
const rest = [...layers]; const rest = [...layers];
const layer = rest.pop(); const layer = rest.pop();
return new Promise<any>((resolve) => { return new Observable<R extends Observable<infer U> ? U : Awaited<R>>(
layer(ctx, async (nextCtx) => { (subscriber) => {
let result: R; layer(ctx, async (nextCtx) => {
if (!rest.length) { if (!rest.length) {
result = await cb(nextCtx); const result = cb(nextCtx);
} else { if (result instanceof Promise) {
result = await this.runLayersWith(nextCtx, cb, rest); try {
} subscriber.next(await result);
resolve(result); } catch (e) {
}); subscriber.error(e);
}); }
} else if (isObservable(result)) {
(result as Observable<any>).subscribe({
next: (value) => subscriber.next(value),
error: (error) => subscriber.error(error),
});
} else {
subscriber.next(result as any);
}
} else {
this.runLayersWith(nextCtx, cb, rest).subscribe(subscriber);
}
});
},
);
} }
runLayers<R>(ctx: Context, cb: ContextFunction<R>, key?: keyof T) { runLayers<R>(ctx: Context, cb: ContextFunction<R>, key?: keyof T) {
......
...@@ -13,6 +13,7 @@ import { ...@@ -13,6 +13,7 @@ import {
BeforeEventName, BeforeEventName,
BeforeEventNameAndPrepend, BeforeEventNameAndPrepend,
CommandRegisterConfig, CommandRegisterConfig,
CommandReturnType,
EventName, EventName,
EventNameAndPrepend, EventNameAndPrepend,
KoaContext, KoaContext,
...@@ -31,6 +32,8 @@ import { CommandPut } from './command-put'; ...@@ -31,6 +32,8 @@ import { CommandPut } from './command-put';
import { applySelector, generateRenderer } from '../../utility'; import { applySelector, generateRenderer } from '../../utility';
import { Next as KoaNext } from 'koa'; import { Next as KoaNext } from 'koa';
import { IncomingMessage } from 'http'; import { IncomingMessage } from 'http';
import { isObservable, Observable } from 'rxjs';
import { sessionRxToPromise } from '../../utility/rxjs-session';
// eslint-disable-next-line @typescript-eslint/no-namespace // eslint-disable-next-line @typescript-eslint/no-namespace
export namespace DoRegister { export namespace DoRegister {
...@@ -88,7 +91,7 @@ export namespace DoRegister { ...@@ -88,7 +91,7 @@ export namespace DoRegister {
onEvent: PickEventFunction<Events, EventName>; onEvent: PickEventFunction<Events, EventName>;
beforeEvent: PickEventFunction<BeforeEventMap, BeforeEventName>; beforeEvent: PickEventFunction<BeforeEventMap, BeforeEventName>;
plugin(): Awaitable<PluginDefinition | undefined>; plugin(): Awaitable<PluginDefinition | undefined>;
command(...args: any[]): Awaitable<string | void | undefined>; command(...args: any[]): CommandReturnType;
route(ctx: KoaContext, Next: KoaNext): any; route(ctx: KoaContext, Next: KoaNext): any;
ws(socket: WebSocket, request: IncomingMessage): any; ws(socket: WebSocket, request: IncomingMessage): any;
formatter: I18n.Formatter; formatter: I18n.Formatter;
...@@ -197,7 +200,7 @@ export namespace DoRegister { ...@@ -197,7 +200,7 @@ export namespace DoRegister {
view, view,
); );
} }
command.action((argv: Argv, ...args: any[]) => { command.action(async (argv: Argv, ...args: any[]) => {
const params = data.putOptions.map((o, i) => const params = data.putOptions.map((o, i) =>
CommandPut.registry.execute( CommandPut.registry.execute(
o, o,
...@@ -207,7 +210,15 @@ export namespace DoRegister { ...@@ -207,7 +210,15 @@ export namespace DoRegister {
view, view,
), ),
); );
return obj[key](...params); const ret = await obj[key](...params);
if (isObservable(ret)) {
return sessionRxToPromise(
argv.session,
ret as Observable<string>,
);
} else {
return ret;
}
}); });
} }
} }
......
import { Session } from 'koishi';
import { Observable } from 'rxjs';
export function sessionRxToPromise(session: Session, obs: Observable<string>) {
return new Promise<string>((resolve, reject) => {
let lastValue: string;
obs.subscribe({
next: async (value) => {
if (lastValue && session.send) {
await session.send(lastValue);
}
lastValue = value;
},
error: async (error) => {
if (lastValue && session.send) {
await session.send(lastValue);
}
reject(error);
},
complete: () => {
resolve(lastValue);
},
});
});
}
...@@ -12,6 +12,7 @@ import { ...@@ -12,6 +12,7 @@ import {
import { App, Command, Next, Session } from 'koishi'; import { App, Command, Next, Session } from 'koishi';
import { Registrar } from '../src/register'; import { Registrar } from '../src/register';
import { EventNameAndPrepend } from '../src/def'; import { EventNameAndPrepend } from '../src/def';
import { of } from 'rxjs';
class SkirtArg { class SkirtArg {
@PutArg(0) @PutArg(0)
...@@ -39,8 +40,8 @@ class MyClass { ...@@ -39,8 +40,8 @@ class MyClass {
@UseCommand('echo', 'hi') @UseCommand('echo', 'hi')
@CommandUsage('foo') @CommandUsage('foo')
async onEcho(@PutOption('content', '-c <content>') content: string) { onEcho(@PutOption('content', '-c <content>') content: string) {
return `bot: ${content}`; return of(`bot: ${content}`);
} }
@UseCommand('count <count>') @UseCommand('count <count>')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment