From d48ad866b9e60a9f4db334319e095b93e07e203f Mon Sep 17 00:00:00 2001 From: Jozef Flakus Date: Fri, 25 Jan 2019 20:37:19 +0100 Subject: [PATCH] feat(core, websockets): Effects output stream --- .../core/src/effects/effects.interface.ts | 3 ++ packages/core/src/http.interface.ts | 5 --- packages/core/src/listener/http.listener.ts | 43 +++++++++---------- .../response/responseContentType.factory.ts | 9 ++-- .../src/effects/ws-effects.interface.ts | 4 ++ .../src/listener/websocket.listener.ts | 5 ++- 6 files changed, 35 insertions(+), 34 deletions(-) diff --git a/packages/core/src/effects/effects.interface.ts b/packages/core/src/effects/effects.interface.ts index 8936b051..033ad85b 100644 --- a/packages/core/src/effects/effects.interface.ts +++ b/packages/core/src/effects/effects.interface.ts @@ -29,6 +29,9 @@ export interface ErrorEffect export interface ServerEffect extends Effect {} +export interface OutputEffect + extends Effect {} + export interface Effect< T = HttpRequest, U = EffectHttpResponse, diff --git a/packages/core/src/http.interface.ts b/packages/core/src/http.interface.ts index 742b0577..182d9201 100644 --- a/packages/core/src/http.interface.ts +++ b/packages/core/src/http.interface.ts @@ -44,11 +44,6 @@ export enum HttpMethodType { export type HttpMethod = keyof typeof HttpMethodType; -export type Http = { - req: HttpRequest; - res: HttpResponse; -}; - export enum HttpStatus { CONTINUE = 100, SWITCHING_PROTOCOLS = 101, diff --git a/packages/core/src/listener/http.listener.ts b/packages/core/src/listener/http.listener.ts index f7c2c183..a15f611e 100644 --- a/packages/core/src/listener/http.listener.ts +++ b/packages/core/src/listener/http.listener.ts @@ -1,9 +1,9 @@ import { IncomingMessage, OutgoingMessage } from 'http'; import { of, Subject } from 'rxjs'; -import { catchError, defaultIfEmpty, mergeMap, switchMap, tap, takeWhile } from 'rxjs/operators'; +import { catchError, defaultIfEmpty, mergeMap, tap, takeWhile } from 'rxjs/operators'; import { combineMiddlewares } from '../effects/effects.combiner'; -import { EffectHttpResponse, Middleware, ErrorEffect } from '../effects/effects.interface'; -import { Http, HttpRequest, HttpResponse, HttpStatus } from '../http.interface'; +import { EffectHttpResponse, Middleware, ErrorEffect, OutputEffect } from '../effects/effects.interface'; +import { HttpRequest, HttpResponse, HttpStatus } from '../http.interface'; import { handleResponse } from '../response/response.handler'; import { RouteEffect, RouteEffectGroup } from '../router/router.interface'; import { resolveRouting } from '../router/router.resolver'; @@ -16,39 +16,38 @@ export interface HttpListenerConfig { middlewares?: Middleware[]; effects: (RouteEffect | RouteEffectGroup)[]; error$?: ErrorEffect; + output$?: OutputEffect; } export const httpListener = ({ middlewares = [], effects, error$ = defaultError$, + output$ = out$ => out$, }: HttpListenerConfig) => { - const requestSubject$ = new Subject(); + const requestSubject$ = new Subject<{ req: HttpRequest; res: HttpResponse; }>(); const combinedMiddlewares = combineMiddlewares(...middlewares); const routing = factorizeRouting(effects); const injector = createStaticInjectionContainer(); const defaultMetadata = createEffectMetadata({ inject: injector.get }); const defaultResponse = { status: HttpStatus.NOT_FOUND } as EffectHttpResponse; - const effect$ = requestSubject$.pipe( - mergeMap(({ req, res }) => { - res.send = handleResponse(res)(req); - - return combinedMiddlewares(of(req), res, defaultMetadata).pipe( - takeWhile(() => !res.finished), - switchMap(resolveRouting(routing, defaultMetadata)(res)), - defaultIfEmpty(defaultResponse), - tap(res.send), - catchError(error => - error$(of(req), res, createEffectMetadata({ ...defaultMetadata, error })).pipe( - tap(res.send), - ), + requestSubject$.pipe( + tap(({ req, res }) => res.send = handleResponse(res)(req)), + mergeMap(({ req, res }) => combinedMiddlewares(of(req), res, defaultMetadata).pipe( + takeWhile(() => !res.finished), + mergeMap(resolveRouting(routing, defaultMetadata)(res)), + defaultIfEmpty(defaultResponse), + mergeMap(out => output$(of(out), res, defaultMetadata)), + tap(res.send), + catchError(error => + error$(of(req), res, createEffectMetadata({ ...defaultMetadata, error })).pipe( + mergeMap(out => output$(of(out), res, createEffectMetadata({ ...defaultMetadata, error }))), + tap(res.send), ), - ); - }), - ); - - effect$.subscribe(); + ), + )), + ).subscribe(); const httpServer = (req: IncomingMessage, res: OutgoingMessage) => requestSubject$.next({ req: req as HttpRequest, diff --git a/packages/core/src/response/responseContentType.factory.ts b/packages/core/src/response/responseContentType.factory.ts index 02f07f0e..c0b39428 100644 --- a/packages/core/src/response/responseContentType.factory.ts +++ b/packages/core/src/response/responseContentType.factory.ts @@ -7,12 +7,9 @@ export const DEFAULT_CONTENT_TYPE = ContentType.APPLICATION_JSON; export const getMimeType = (body: any, path: string) => { const mimeFromBuffer = Buffer.isBuffer(body) && fileType(body); - - if (mimeFromBuffer) { - return mimeFromBuffer.mime; - } - - return mime.getType(path) || DEFAULT_CONTENT_TYPE; + return mimeFromBuffer + ? mimeFromBuffer.mime + : mime.getType(path) || DEFAULT_CONTENT_TYPE; }; export const contentTypeFactory = (data: { diff --git a/packages/websockets/src/effects/ws-effects.interface.ts b/packages/websockets/src/effects/ws-effects.interface.ts index e14645a1..1905ae11 100644 --- a/packages/websockets/src/effects/ws-effects.interface.ts +++ b/packages/websockets/src/effects/ws-effects.interface.ts @@ -18,6 +18,10 @@ export interface WebSocketConnectionEffect< T extends http.IncomingMessage = http.IncomingMessage > extends WebSocketEffect {} +export interface WebSocketOutputEffect< + T extends Event = Event +> extends WebSocketEffect {} + export interface WebSocketEffect< T = Event, U = Event, diff --git a/packages/websockets/src/listener/websocket.listener.ts b/packages/websockets/src/listener/websocket.listener.ts index 853eab99..2471385f 100644 --- a/packages/websockets/src/listener/websocket.listener.ts +++ b/packages/websockets/src/listener/websocket.listener.ts @@ -34,6 +34,7 @@ export interface WebSocketListenerConfig { error$?: WSEffect.WebSocketErrorEffect; eventTransformer?: EventTransformer; connection$?: WSEffect.WebSocketConnectionEffect; + output$?: WSEffect.WebSocketOutputEffect; } export const webSocketListener = (config: WebSocketListenerConfig = {}) => { @@ -43,6 +44,7 @@ export const webSocketListener = (config: WebSocketListenerConfig = {}) => { middlewares = [], eventTransformer, connection$ = (req$: Observable) => req$, + output$ = (out$: Observable) => out$, } = config; const combinedMiddlewares = combineMiddlewares(...middlewares); @@ -81,9 +83,10 @@ export const webSocketListener = (config: WebSocketListenerConfig = {}) => { const decodedEvent$ = incomingEventSubject$.pipe(map(providedTransformer.decode)); const middlewares$ = combinedMiddlewares(decodedEvent$, client, defaultMetadata); const effects$ = combinedEffects(eventSubject$, client, defaultMetadata); + const effectsOutput$ = output$(effects$, client, defaultMetadata); let middlewaresSub = subscribeMiddlewares(middlewares$); - let effectsSub = subscribeEffects(effects$); + let effectsSub = subscribeEffects(effectsOutput$); client.on('message', onMessage); client.once('close', onClose);