From 9a30f29c9360c4af7c2c307278c404ee6da92b47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 1 Oct 2023 18:16:05 +0200 Subject: [PATCH] add pull status SSE to API. --- OpenApi.yml | 18 ++++++++++++ client/src/api/api.ts | 60 ++++++++++++++++++++++++++++++++++++++++ internal/api/api.go | 1 + internal/api/pull.go | 34 +++++++++++++++++++++++ internal/pull/manager.go | 53 ++++++++++++++++++++++++++++++++++- internal/types/pull.go | 1 + neko.go | 3 ++ 7 files changed, 169 insertions(+), 1 deletion(-) diff --git a/OpenApi.yml b/OpenApi.yml index a1eca03..8b63c3d 100644 --- a/OpenApi.yml +++ b/OpenApi.yml @@ -313,6 +313,24 @@ paths: description: OK '500': description: Internal server error + /api/pull/sse: + get: + tags: + - default + summary: Get pull status as SSE + operationId: pullStatusSSE + responses: + '200': + description: OK + content: + text/event-stream: + schema: + type: array + format: event-stream + items: + $ref: '#/components/schemas/PullLayer' + '500': + description: Internal server error components: schemas: diff --git a/client/src/api/api.ts b/client/src/api/api.ts index 579a029..892c6b3 100644 --- a/client/src/api/api.ts +++ b/client/src/api/api.ts @@ -792,6 +792,36 @@ export const DefaultApiAxiosParamCreator = function (configuration?: Configurati + setSearchParams(localVarUrlObj, localVarQueryParameter); + let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; + localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, + /** + * + * @summary Get pull status as SSE + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + pullStatusSSE: async (options: AxiosRequestConfig = {}): Promise => { + const localVarPath = `/api/pull/sse`; + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: 'GET', ...baseOptions, ...options}; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + setSearchParams(localVarUrlObj, localVarQueryParameter); let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; @@ -862,6 +892,16 @@ export const DefaultApiFp = function(configuration?: Configuration) { const localVarAxiosArgs = await localVarAxiosParamCreator.pullStatus(options); return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); }, + /** + * + * @summary Get pull status as SSE + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + async pullStatusSSE(options?: AxiosRequestConfig): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise>> { + const localVarAxiosArgs = await localVarAxiosParamCreator.pullStatusSSE(options); + return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); + }, /** * * @summary Stop existing pull in progress @@ -901,6 +941,15 @@ export const DefaultApiFactory = function (configuration?: Configuration, basePa pullStatus(options?: any): AxiosPromise { return localVarFp.pullStatus(options).then((request) => request(axios, basePath)); }, + /** + * + * @summary Get pull status as SSE + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + pullStatusSSE(options?: any): AxiosPromise> { + return localVarFp.pullStatusSSE(options).then((request) => request(axios, basePath)); + }, /** * * @summary Stop existing pull in progress @@ -943,6 +992,17 @@ export class DefaultApi extends BaseAPI { return DefaultApiFp(this.configuration).pullStatus(options).then((request) => request(this.axios, this.basePath)); } + /** + * + * @summary Get pull status as SSE + * @param {*} [options] Override http request option. + * @throws {RequiredError} + * @memberof DefaultApi + */ + public pullStatusSSE(options?: AxiosRequestConfig) { + return DefaultApiFp(this.configuration).pullStatusSSE(options).then((request) => request(this.axios, this.basePath)); + } + /** * * @summary Stop existing pull in progress diff --git a/internal/api/api.go b/internal/api/api.go index 7e19caf..b62b143 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -35,6 +35,7 @@ func (manager *ApiManagerCtx) Mount(r chi.Router) { r.Route("/pull", func(r chi.Router) { r.Get("/", manager.pullStatus) + r.Get("/sse", manager.pullStatusSSE) r.Post("/", manager.pullStart) r.Delete("/", manager.pullStop) }) diff --git a/internal/api/pull.go b/internal/api/pull.go index 7787a1a..51aa5c3 100644 --- a/internal/api/pull.go +++ b/internal/api/pull.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "fmt" "net/http" "github.com/m1k1o/neko-rooms/internal/types" @@ -32,6 +33,39 @@ func (manager *ApiManagerCtx) pullStatus(w http.ResponseWriter, r *http.Request) json.NewEncoder(w).Encode(response) } +func (manager *ApiManagerCtx) pullStatusSSE(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Connection does not support streaming", http.StatusBadRequest) + return + } + + sseChan := make(chan string) + unsubscribe := manager.pull.Subscribe(sseChan) + + for { + select { + case <-r.Context().Done(): + manager.logger.Debug().Msg("sse context done") + unsubscribe() + return + case data, ok := <-sseChan: + if !ok { + manager.logger.Debug().Msg("sse channel closed") + return + } + + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + } +} + func (manager *ApiManagerCtx) pullStop(w http.ResponseWriter, r *http.Request) { err := manager.pull.Stop() if err != nil { diff --git a/internal/pull/manager.go b/internal/pull/manager.go index 1f2074a..b210043 100644 --- a/internal/pull/manager.go +++ b/internal/pull/manager.go @@ -10,6 +10,7 @@ import ( "time" dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/registry" dockerClient "github.com/docker/docker/client" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -27,6 +28,9 @@ type PullManagerCtx struct { cancel func() status types.PullStatus layers map[string]int + + chansMu sync.Mutex + chans []chan<- string } func New(client *dockerClient.Client, nekoImages []string) *PullManagerCtx { @@ -83,7 +87,7 @@ func (manager *PullManagerCtx) Start(request types.PullStart) error { // handle registry auth var opts dockerTypes.ImagePullOptions if request.RegistryUser != "" && request.RegistryPass != "" { - authConfig := dockerTypes.AuthConfig{ + authConfig := registry.AuthConfig{ Username: request.RegistryUser, Password: request.RegistryPass, } @@ -109,6 +113,7 @@ func (manager *PullManagerCtx) Start(request types.PullStart) error { scanner := bufio.NewScanner(reader) for scanner.Scan() { data := scanner.Bytes() + manager.sendSSE(string(data)) layer := types.PullLayer{} if err := json.Unmarshal(data, &layer); err != nil { @@ -168,3 +173,49 @@ func (manager *PullManagerCtx) Status() types.PullStatus { return manager.status } + +func (manager *PullManagerCtx) sendSSE(status string) { + manager.chansMu.Lock() + defer manager.chansMu.Unlock() + + for _, ch := range manager.chans { + ch <- status + } +} + +func (manager *PullManagerCtx) Subscribe(ch chan<- string) func() { + manager.chansMu.Lock() + defer manager.chansMu.Unlock() + + // subscribe + manager.chans = append(manager.chans, ch) + + // unsubscribe + return func() { + manager.chansMu.Lock() + defer manager.chansMu.Unlock() + + for i, c := range manager.chans { + if c == ch { + manager.chans = append(manager.chans[:i], manager.chans[i+1:]...) + break + } + } + } +} + +func (manager *PullManagerCtx) Shutdown() error { + manager.chansMu.Lock() + for _, ch := range manager.chans { + close(ch) + } + manager.chansMu.Unlock() + + manager.mu.Lock() + if manager.cancel != nil { + manager.cancel() + } + manager.mu.Unlock() + + return nil +} diff --git a/internal/types/pull.go b/internal/types/pull.go index 3037433..6900df5 100644 --- a/internal/types/pull.go +++ b/internal/types/pull.go @@ -30,4 +30,5 @@ type PullManager interface { Start(request PullStart) error Stop() error Status() PullStatus + Subscribe(ch chan<- string) func() } diff --git a/neko.go b/neko.go index 9fb3822..f4fe711 100644 --- a/neko.go +++ b/neko.go @@ -166,6 +166,9 @@ func (main *MainCtx) Shutdown() { err = main.proxyManager.Shutdown() main.logger.Err(err).Msg("proxy manager shutdown") + + err = main.pullManager.Shutdown() + main.logger.Err(err).Msg("pull manager shutdown") } func (main *MainCtx) ServeCommand(cmd *cobra.Command, args []string) {