Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Jan 21, 2024
1 parent 5220276 commit 96461d4
Show file tree
Hide file tree
Showing 25 changed files with 346 additions and 251 deletions.
1 change: 1 addition & 0 deletions packages/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ export {
machineStatusCommandSchema,
} from './src/machine-status-command.js'
export * from './src/async.js'
export * from './src/type-guards.js'
export { ProcessOutputBuffers, orderedOutput, OrderedOutput } from './src/process-output-buffers.js'
export { generateSchemaErrorMessage } from './src/schema.js'
1 change: 0 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"@types/ssh2": "^1.11.8",
"@typescript-eslint/eslint-plugin": "6.14.0",
"@typescript-eslint/parser": "6.14.0",
"esbuild": "^0.19.9",
"eslint": "^8.36.0",
"husky": "^8.0.0",
"jest": "29.7.0",
Expand Down
13 changes: 13 additions & 0 deletions packages/common/src/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,16 @@ export const asyncMapValues = async <T extends object, TResult>(
Object.entries(obj).map(async ([key, value]) => [key, await callback(value, key, obj)])
)
)

export function timeoutPromise(ms: number): Promise<void>
export function timeoutPromise<T>(ms: number, value: T): Promise<T>
export function timeoutPromise<T>(ms: number, value?: T) {
return new Promise(resolve => { setTimeout(() => resolve(value), ms) })
}

export function withTimeout<Val, TimeoutVal = Val>(p: PromiseLike<Val>, ms: number, timeoutValue?: TimeoutVal) {
return Promise.race([
p,
timeoutPromise(ms, timeoutValue),
])
}
2 changes: 1 addition & 1 deletion packages/common/src/ssh/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export const sshBaseConnectionConfigSchema = z.object({
hostname: z.string(),
port: z.number(),
username: z.string(),
clientPrivateKey: z.string(),
clientPrivateKey: z.union([z.string(), z.instanceof(Buffer)]),
insecureSkipVerify: z.boolean().default(false),
knownServerPublicKeys: z.array(z.union([z.string(), z.instanceof(Buffer)])).default([]),
})
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/type-guards.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const isDefined = <T>(val: T | undefined): val is T => val !== undefined
1 change: 1 addition & 0 deletions packages/compose-tunnel-agent/.eslintignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/.eslintrc.cjs
/dist
/out
/build.mjs
146 changes: 9 additions & 137 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,151 +1,23 @@
import { rimraf } from 'rimraf'
import {
formatPublicKey,
MachineStatusCommand,
parseSshUrl,
SshConnectionConfig,
tunnelNameResolver,
} from '@preevy/common'
import { inspect } from 'util'
import { omit } from 'lodash-es'
import { createApp } from './src/api-server/index.js'
import { sshClient as createSshClient } from './src/ssh/index.js'
import { readConfig, Config, Plugin } from './src/configuration/index.js'
import { createLog } from './src/log.js'
import { Forward } from './src/forwards.js'
import { aggregator } from './src/aggregator.js'
import { loadPlugins } from './src/plugins.js'
import { main } from './src/main.js'

const fastifyListenArgsFromConfig = async (config: Pick<Config, 'listen'>) => {
const portOrPath = config.listen
const portNumber = Number(portOrPath)
if (typeof portOrPath === 'string' && Number.isNaN(portNumber)) {
await rimraf(portOrPath)
return { path: portOrPath }
}
return { port: portNumber, host: '0.0.0.0' }
}

const findMachineStatusCommandRunner = (
plugins: Plugin[],
spec: MachineStatusCommand | undefined,
) => {
if (!spec) {
return undefined
}
const runner = plugins.map(p => p.machineStatusCommands?.[spec.recipe.type]).find(x => x)
if (!runner) {
throw new Error(`no handler found in plugins for machine status command with type "${spec.recipe.type}"`)
}
return async () => ({
data: await runner(spec.recipe),
contentType: spec.contentType,
})
}

let log = createLog(process)
const SHUTDOWN_TIMEOUT = 5000
const exitSignals = ['SIGTERM', 'SIGINT', 'uncaughtException'] as const
const SHUTDOWN_TIMEOUT = 5000

const main = async () => {
const config = await readConfig(process)

log = createLog(process, config)

let endRequested = false
const disposables = new AsyncDisposableStack()
const end = async () => {
endRequested = true
await disposables.disposeAsync()
}

const {
server: serverUrl,
envId,
machineStatusCommand: machineStatusCommandSpec,
} = config

const connectionConfig: SshConnectionConfig = {
...parseSshUrl(serverUrl),
clientPrivateKey: await config.privateKey,
username: envId,
knownServerPublicKeys: config.serverKey,
insecureSkipVerify: Boolean(config.insecureSkipVerify),
tlsServerName: config.tlsServerName,
}

log.debug('ssh config: %j', {
...omit<SshConnectionConfig, 'clientPrivateKey'>(connectionConfig, 'clientPrivateKey'),
clientPublicKey: formatPublicKey(connectionConfig.clientPrivateKey),
})

const sshLog = log.child({ name: 'ssh' })
sshLog.info('ssh client connecting to %j', serverUrl)
const sshClient = disposables.use(await createSshClient({
log: sshLog,
connectionConfig,
defaultAccess: config.defaultAccess,
globalInjects: config.globalInjects,
}))

sshLog.info('ssh client connected to %j', serverUrl)
sshClient.ssh.on('close', async () => {
if (!endRequested) {
log.error('ssh client closed unexpectedly')
await end()
process.exit(1)
}
log.info('ssh client closed')
})

const plugins = await loadPlugins(config, p => ({ log: log.child({ name: `plugin-${p}` }) }))

const app = disposables.use(await createApp({
log: log.child({ name: 'api' }),
currentSshState: () => sshClient.state(),
machineStatus: findMachineStatusCommandRunner(Object.values(plugins), machineStatusCommandSpec),
envMetadata: config.envMetadata,
}))

const forwardsAggregator = aggregator<Forward>(f => f.externalName)
await sshClient.updateForwards(forwardsAggregator(Symbol('staticConfig'), config.forwards))

await Promise.all(Object.entries(plugins).map(async ([pluginName, plugin]) => {
if (plugin.forwardsEmitter) {
disposables.use(await plugin.forwardsEmitter({
tunnelNameResolver: tunnelNameResolver({ envId }),
})).on('forwards', async forwards => {
await sshClient.updateForwards(forwardsAggregator(pluginName, forwards))
})
}

if (plugin.fastifyPlugin) {
await app.register(plugin.fastifyPlugin)
}
}))

void app.listen({ ...await fastifyListenArgsFromConfig(config) })
app.server.unref()

void main(process).then(({ log, [Symbol.asyncDispose]: dispose }) => {
exitSignals.forEach(signal => {
process.once(signal, async (...args) => {
const argsStr = args ? args.map(arg => inspect(arg)).join(', ') : undefined
const logLevel = signal === 'uncaughtException' ? 'error' : 'warn'
const isError = signal === 'uncaughtException'
const logLevel = isError ? 'error' : 'warn'
const argsStr = args.map(arg => inspect(arg)).join(', ')
log[logLevel](`shutting down on ${[signal, argsStr].filter(Boolean).join(': ')}`)
if (!await Promise.race([
end().then(() => true),
dispose().then(() => true),
new Promise<void>(resolve => { setTimeout(resolve, SHUTDOWN_TIMEOUT) }),
])) {
log.error(`timed out while waiting ${SHUTDOWN_TIMEOUT}ms for server to close, exiting`)
}
process.exit(1)
process.exit(isError ? 1 : 0)
})
})
}

void main().catch(
err => {
log.error(err)
process.exit(1)
}
)
})
2 changes: 1 addition & 1 deletion packages/compose-tunnel-agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"@types/ssh2": "^1.11.8",
"@typescript-eslint/eslint-plugin": "6.14.0",
"@typescript-eslint/parser": "6.14.0",
"esbuild": "^0.19.9",
"esbuild": "^0.19.11",
"eslint": "^8.36.0",
"husky": "^8.0.0",
"jest": "29.7.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/compose-tunnel-agent/src/api-server/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export const env: FastifyPluginAsync<{
machineStatus?: () => Promise<{ data: Buffer; contentType: string }>
envMetadata?: Record<string, unknown>
}> = async (app, { currentSshState, machineStatus, envMetadata }) => {
app.get('/forwards', async () => await currentSshState())
app.get('/state', async () => await currentSshState())

if (machineStatus) {
const limit = plimit(1)
Expand Down
10 changes: 9 additions & 1 deletion packages/compose-tunnel-agent/src/configuration/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ import yaml from 'yaml'
import { describe, it, expect, beforeEach, afterEach, test, beforeAll, afterAll } from '@jest/globals'
import path from 'path'
import { rimraf } from 'rimraf'
import { Config, ConfigParseResult, mergedConfig } from './index.js'
import { Config, ConfigParseResult, mergedConfig as mc } from './index.js'
import { PluginOpts, pluginFactories } from '../plugins/index.js'
import { Opts } from './opts.js'
import { PluginFactory } from '../plugin-definition.js'

const mergedConfig = (argv: string[] | string) => mc(
pluginFactories as unknown as Record<string, PluginFactory<Opts & PluginOpts>>,
argv,
) as Promise<ConfigParseResult<PluginOpts>>

type Env = Record<string, string | undefined>
const setupEnv = (envOrEnvFactory: Env | (() => Env | Promise<Env>)) => {
Expand Down
51 changes: 36 additions & 15 deletions packages/compose-tunnel-agent/src/configuration/index.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,53 @@
import { camelCase, pickBy } from 'lodash-es'
import { hideBin } from 'yargs/helpers'
import yaml from 'yaml'
import { InferredOptionTypes } from 'yargs'
import { CamelCasedProperties } from 'type-fest'
import { inspect } from 'util'
import { pluginOptsFor } from '../plugins.js'
import { mergeParse } from './yargs-helpers.js'
import { ConfigParseResult, opts } from './opts.js'
import { OptionsObject, ParseResult, mergeParse } from './yargs-helpers.js'
import { opts, Opts } from './opts.js'
import { PluginFactory } from '../plugin-definition.js'

export { Config, ConfigParseResult } from './opts.js'
export { Plugin } from './plugins.js'
export type Config<O extends OptionsObject = {}> = CamelCasedProperties<InferredOptionTypes<Opts & O>>
export type ConfigParseResult<O extends OptionsObject = {}> = ParseResult<Config<O>>

export const mergedConfig = async (argv: string[] | string) => {
const pluginOptsFor = <
YargsOpts extends OptionsObject,
T extends Record<string, PluginFactory<YargsOpts>>
>(plugins: T, keys: (keyof T)[]) => Object.assign({}, ...keys.map(k => plugins[k].yargsOpts))

export const mergedConfig = async <
YargsOpts extends OptionsObject,
>(plugins: Record<string, PluginFactory<YargsOpts>>, argv: string[] | string) => {
const parsedMerged = await mergeParse(
opts,
'CTA',
config => config.config ?? [],
config => pluginOptsFor(config.plugin ?? []),
argv,
) as ConfigParseResult
{
options: opts(Object.keys(plugins)),
envPrefix: 'CTA',
extractConfigFiles: config => config.config ?? [],
extractExtraOptions: config => pluginOptsFor<YargsOpts, Record<string, PluginFactory<YargsOpts>>>(
plugins,
config.plugin ?? [],
),
},
argv
) as ConfigParseResult<YargsOpts>

if ('result' in parsedMerged && parsedMerged.result.printConfig) {
const stringifier = parsedMerged.result.printConfig === 'yaml' ? yaml : JSON
return { output: stringifier.stringify(parsedMerged.result) }
return { output: stringifier.stringify(pickBy(
parsedMerged.result,
(_v, k) => k !== '_' && k !== 'printConfig' && !k.startsWith('$') && k === camelCase(k),
)) }
}

return parsedMerged
}

export const readConfig = async (process: Pick<NodeJS.Process, 'stderr' | 'stdout' | 'argv' | 'exit'>) => {
const configParseResult = await mergedConfig(hideBin(process.argv))
export const readConfig = async <YargsOpts extends Opts>(
plugins: Record<string, PluginFactory<YargsOpts>>,
process: Pick<NodeJS.Process, 'stderr' | 'stdout' | 'argv' | 'exit'>,
) => {
const configParseResult = await mergedConfig<YargsOpts>(plugins, hideBin(process.argv))
if ('output' in configParseResult) {
const error = ('error' in configParseResult) ? configParseResult.error : undefined
const outStream = error ? process.stderr : process.stdout
Expand Down
22 changes: 8 additions & 14 deletions packages/compose-tunnel-agent/src/configuration/opts.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import fs from 'fs'
import { InferredOptionTypes } from 'yargs'
import { CamelCasedProperties } from 'type-fest'
import z from 'zod'
import { ScriptInjection, generateSchemaErrorMessage, machineStatusCommandSchema, parseScriptInjection } from '@preevy/common'
import { inspect } from 'util'
import { Forward, forwardSchema } from '../forwards.js'
import { PluginOpts, Plugins, plugins } from '../plugins.js'
import { mergeDeep } from '../merge.js'
import { OptionsObject, ParseResult, splitCommaSeparatedStringArrays } from './yargs-helpers.js'
import { splitCommaSeparatedStringArrays } from './yargs-helpers.js'

export const accessSchema = z.union([z.literal('private'), z.literal('public')])
export type Access = z.infer<typeof accessSchema>
Expand All @@ -16,14 +13,14 @@ export const logLevelSchema = z.union([z.literal('debug'), z.literal('info'), z.
const tunnelServerGroup = 'Tunnel server connection'
const logGroup = 'Logging'

export const opts = {
export const opts = <PluginNames extends string[] = string[]>(pluginNames: PluginNames) => ({
config: {
array: true,
string: true,
description: 'Load config from specified YAML/JSON file',
coerce: splitCommaSeparatedStringArrays,
},
printConfig: {
'print-config': {
description: 'Print config in specified format (default: JSON) and exit',
choices: [true, 'json', 'yaml'],
},
Expand Down Expand Up @@ -57,7 +54,7 @@ export const opts = {
default: process.stderr.isTTY,
defaultDescription: 'if stderr is a TTY',
},
defaultAccess: {
'default-access': {
choices: accessSchema.options.map(o => o.value),
coerce: (v: string) => v as Access,
default: 'public',
Expand Down Expand Up @@ -155,13 +152,10 @@ export const opts = {
},
plugin: {
array: true,
choices: Object.keys(plugins),
coerce: (o: string[]) => splitCommaSeparatedStringArrays(o) as (keyof Plugins)[],
choices: pluginNames,
coerce: (o: string[]) => splitCommaSeparatedStringArrays(o) as PluginNames,
default: [],
},
} as const
} as const)

export type Opts = typeof opts

export type Config<O extends OptionsObject = PluginOpts> = CamelCasedProperties<InferredOptionTypes<Opts & O>>
export type ConfigParseResult = ParseResult<Config>
export type Opts = ReturnType<typeof opts>
Loading

0 comments on commit 96461d4

Please sign in to comment.