Skip to content

Commit

Permalink
add pull status SSE to API.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1k1o committed Oct 1, 2023
1 parent c620c70 commit 9a30f29
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 1 deletion.
18 changes: 18 additions & 0 deletions OpenApi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,24 @@ paths:
description: OK
'500':
description: Internal server error
/api/pull/sse:
get:
tags:
- default
summary: Get pull status as SSE
operationId: pullStatusSSE
responses:
'200':
description: OK
content:
text/event-stream:
schema:
type: array
format: event-stream
items:
$ref: '#/components/schemas/PullLayer'
'500':
description: Internal server error

components:
schemas:
Expand Down
60 changes: 60 additions & 0 deletions client/src/api/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,36 @@ export const DefaultApiAxiosParamCreator = function (configuration?: Configurati



setSearchParams(localVarUrlObj, localVarQueryParameter);
let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {};
localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers};

return {
url: toPathString(localVarUrlObj),
options: localVarRequestOptions,
};
},
/**
*
* @summary Get pull status as SSE
* @param {*} [options] Override http request option.
* @throws {RequiredError}
*/
pullStatusSSE: async (options: AxiosRequestConfig = {}): Promise<RequestArgs> => {
const localVarPath = `/api/pull/sse`;
// use dummy base URL string because the URL constructor only accepts absolute URLs.
const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL);
let baseOptions;
if (configuration) {
baseOptions = configuration.baseOptions;
}

const localVarRequestOptions = { method: 'GET', ...baseOptions, ...options};
const localVarHeaderParameter = {} as any;
const localVarQueryParameter = {} as any;



setSearchParams(localVarUrlObj, localVarQueryParameter);
let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {};
localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers};
Expand Down Expand Up @@ -862,6 +892,16 @@ export const DefaultApiFp = function(configuration?: Configuration) {
const localVarAxiosArgs = await localVarAxiosParamCreator.pullStatus(options);
return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration);
},
/**
*
* @summary Get pull status as SSE
* @param {*} [options] Override http request option.
* @throws {RequiredError}
*/
async pullStatusSSE(options?: AxiosRequestConfig): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise<Array<PullLayer>>> {
const localVarAxiosArgs = await localVarAxiosParamCreator.pullStatusSSE(options);
return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration);
},
/**
*
* @summary Stop existing pull in progress
Expand Down Expand Up @@ -901,6 +941,15 @@ export const DefaultApiFactory = function (configuration?: Configuration, basePa
pullStatus(options?: any): AxiosPromise<PullStatus> {
return localVarFp.pullStatus(options).then((request) => request(axios, basePath));
},
/**
*
* @summary Get pull status as SSE
* @param {*} [options] Override http request option.
* @throws {RequiredError}
*/
pullStatusSSE(options?: any): AxiosPromise<Array<PullLayer>> {
return localVarFp.pullStatusSSE(options).then((request) => request(axios, basePath));
},
/**
*
* @summary Stop existing pull in progress
Expand Down Expand Up @@ -943,6 +992,17 @@ export class DefaultApi extends BaseAPI {
return DefaultApiFp(this.configuration).pullStatus(options).then((request) => request(this.axios, this.basePath));
}

/**
*
* @summary Get pull status as SSE
* @param {*} [options] Override http request option.
* @throws {RequiredError}
* @memberof DefaultApi
*/
public pullStatusSSE(options?: AxiosRequestConfig) {
return DefaultApiFp(this.configuration).pullStatusSSE(options).then((request) => request(this.axios, this.basePath));
}

/**
*
* @summary Stop existing pull in progress
Expand Down
1 change: 1 addition & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (manager *ApiManagerCtx) Mount(r chi.Router) {

r.Route("/pull", func(r chi.Router) {
r.Get("/", manager.pullStatus)
r.Get("/sse", manager.pullStatusSSE)
r.Post("/", manager.pullStart)
r.Delete("/", manager.pullStop)
})
Expand Down
34 changes: 34 additions & 0 deletions internal/api/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"fmt"
"net/http"

"github.com/m1k1o/neko-rooms/internal/types"
Expand Down Expand Up @@ -32,6 +33,39 @@ func (manager *ApiManagerCtx) pullStatus(w http.ResponseWriter, r *http.Request)
json.NewEncoder(w).Encode(response)
}

func (manager *ApiManagerCtx) pullStatusSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Connection does not support streaming", http.StatusBadRequest)
return
}

sseChan := make(chan string)
unsubscribe := manager.pull.Subscribe(sseChan)

for {
select {
case <-r.Context().Done():
manager.logger.Debug().Msg("sse context done")
unsubscribe()
return
case data, ok := <-sseChan:
if !ok {
manager.logger.Debug().Msg("sse channel closed")
return
}

fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
}
}

func (manager *ApiManagerCtx) pullStop(w http.ResponseWriter, r *http.Request) {
err := manager.pull.Stop()
if err != nil {
Expand Down
53 changes: 52 additions & 1 deletion internal/pull/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

dockerTypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/registry"
dockerClient "github.com/docker/docker/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -27,6 +28,9 @@ type PullManagerCtx struct {
cancel func()
status types.PullStatus
layers map[string]int

chansMu sync.Mutex
chans []chan<- string
}

func New(client *dockerClient.Client, nekoImages []string) *PullManagerCtx {
Expand Down Expand Up @@ -83,7 +87,7 @@ func (manager *PullManagerCtx) Start(request types.PullStart) error {
// handle registry auth
var opts dockerTypes.ImagePullOptions
if request.RegistryUser != "" && request.RegistryPass != "" {
authConfig := dockerTypes.AuthConfig{
authConfig := registry.AuthConfig{
Username: request.RegistryUser,
Password: request.RegistryPass,
}
Expand All @@ -109,6 +113,7 @@ func (manager *PullManagerCtx) Start(request types.PullStart) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
data := scanner.Bytes()
manager.sendSSE(string(data))

layer := types.PullLayer{}
if err := json.Unmarshal(data, &layer); err != nil {
Expand Down Expand Up @@ -168,3 +173,49 @@ func (manager *PullManagerCtx) Status() types.PullStatus {

return manager.status
}

func (manager *PullManagerCtx) sendSSE(status string) {
manager.chansMu.Lock()
defer manager.chansMu.Unlock()

for _, ch := range manager.chans {
ch <- status
}
}

func (manager *PullManagerCtx) Subscribe(ch chan<- string) func() {
manager.chansMu.Lock()
defer manager.chansMu.Unlock()

// subscribe
manager.chans = append(manager.chans, ch)

// unsubscribe
return func() {
manager.chansMu.Lock()
defer manager.chansMu.Unlock()

for i, c := range manager.chans {
if c == ch {
manager.chans = append(manager.chans[:i], manager.chans[i+1:]...)
break
}
}
}
}

func (manager *PullManagerCtx) Shutdown() error {
manager.chansMu.Lock()
for _, ch := range manager.chans {
close(ch)
}
manager.chansMu.Unlock()

manager.mu.Lock()
if manager.cancel != nil {
manager.cancel()
}
manager.mu.Unlock()

return nil
}
1 change: 1 addition & 0 deletions internal/types/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ type PullManager interface {
Start(request PullStart) error
Stop() error
Status() PullStatus
Subscribe(ch chan<- string) func()
}
3 changes: 3 additions & 0 deletions neko.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func (main *MainCtx) Shutdown() {

err = main.proxyManager.Shutdown()
main.logger.Err(err).Msg("proxy manager shutdown")

err = main.pullManager.Shutdown()
main.logger.Err(err).Msg("pull manager shutdown")
}

func (main *MainCtx) ServeCommand(cmd *cobra.Command, args []string) {
Expand Down

0 comments on commit 9a30f29

Please sign in to comment.