From b5fe0c5e673eafee67d51e007fe4f5b5219c76f2 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 24 Mar 2023 16:08:36 -0500 Subject: [PATCH] Add pipeline, scope out entrypoint.example.ts --- entrypoint.example.ts | 20 +++++++++++++++++++- mod.ts | 3 +++ src/{stdin.ts => io.ts} | 9 +++++++-- src/pipeline.ts | 20 ++++++++++++++++++-- 4 files changed, 47 insertions(+), 5 deletions(-) rename src/{stdin.ts => io.ts} (54%) diff --git a/entrypoint.example.ts b/entrypoint.example.ts index 0cd6f3b..5898a94 100644 --- a/entrypoint.example.ts +++ b/entrypoint.example.ts @@ -1,4 +1,22 @@ #!/bin/sh //bin/true; exec deno run -A "$0" "$@" +import { + antiDuplicationPolicy, + hellthreadPolicy, + noopPolicy, + pipeline, + rateLimitPolicy, + readStdin, + writeStdout, +} from './mod.ts'; -// TODO +const msg = await readStdin(); + +const result = await pipeline(msg, [ + noopPolicy, + hellthreadPolicy, + antiDuplicationPolicy, + rateLimitPolicy, +]); + +writeStdout(result); diff --git a/mod.ts b/mod.ts index bebdbdd..e00c0d2 100644 --- a/mod.ts +++ b/mod.ts @@ -3,3 +3,6 @@ export { default as hellthreadPolicy } from './src/policies/hellthread-policy.ts export { default as noopPolicy } from './src/policies/noop-policy.ts'; export { default as rateLimitPolicy } from './src/policies/rate-limit-policy.ts'; export { default as readOnlyPolicy } from './src/policies/read-only-policy.ts'; + +export { readStdin, writeStdout } from './src/io.ts'; +export { default as pipeline } from './src/pipeline.ts'; diff --git a/src/stdin.ts b/src/io.ts similarity index 54% rename from src/stdin.ts rename to src/io.ts index 3305d19..d62aa8c 100644 --- a/src/stdin.ts +++ b/src/io.ts @@ -1,6 +1,6 @@ import { readLines } from './deps.ts'; -import type { InputMessage } from './types.ts'; +import type { InputMessage, OutputMessage } from './types.ts'; /** * Get the first line from stdin. @@ -11,4 +11,9 @@ async function readStdin(): Promise { return JSON.parse(value); } -export { readStdin }; +/** Writes the output message to stdout. */ +function writeStdout(msg: OutputMessage): void { + console.log(JSON.stringify(msg)); +} + +export { readStdin, writeStdout }; diff --git a/src/pipeline.ts b/src/pipeline.ts index df068ab..70a0260 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,3 +1,19 @@ -import { readStdin } from './stdin.ts'; +import { InputMessage, OutputMessage, Policy } from './types.ts'; -console.log(await readStdin()); +/** Processes messages through multiple policies, bailing early on rejection. */ +async function pipeline(msg: InputMessage, policies: Policy[]): Promise { + for (const policy of policies) { + const result = await policy(msg); + if (result.action !== 'accept') { + return result; + } + } + + return { + id: msg.event.id, + action: 'accept', + msg: '', + }; +} + +export default pipeline;