Skip to content

Commit

Permalink
Add object interfaces for better testability (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Aug 2, 2022
1 parent 9f09e55 commit cee0c49
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 45 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func main() {
log.Fatal(err)
}
ctx := context.Backgroud()
endpoints, err := client.Get[*client.Endpoints](kc, ctx, "/api/v1/namespaces/kube-system/endpoints/kubelet", client.GetOptions{})
endpointsOperator := client.NewEndpointsOperator(kc)
endpoints, err := endpointsOperator.Get(ctx, "kube-system", "kubelet", client.GetOptions{})
if err != nil {
log.Fatal(err)
}
Expand Down
46 changes: 18 additions & 28 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ type Interface interface {
Token() string
}

// ObjectGetter is generic object getter.
type ObjectGetter[T Object] interface {
Get(ctx context.Context, namespace, name string, _ GetOptions) (T, error)
}

// ObjectWatcher is generic object watcher.
type ObjectWatcher[T Object] interface {
Watch(ctx context.Context, namespace, name string, _ ListOptions) (WatchInterface[T], error)
}

// ObjectOperator wraps all operations on object.
type ObjectOperator[T Object] interface {
ObjectGetter[T]
ObjectWatcher[T]
}

// NewInCluster creates Client if it is inside Kubernetes.
func NewInCluster() (*Client, error) {
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
Expand Down Expand Up @@ -142,7 +158,7 @@ func (kc *Client) Token() string {
return kc.token
}

func Get[T Object](kc *Client, ctx context.Context, reqURL string, _ GetOptions) (T, error) {
func get[T Object](kc *Client, ctx context.Context, reqURL string, _ GetOptions) (T, error) {
var t T
u, err := url.Parse(reqURL)
if err != nil {
Expand All @@ -167,7 +183,7 @@ func Get[T Object](kc *Client, ctx context.Context, reqURL string, _ GetOptions)
return t, err
}

func Watch[T Object](kc *Client, ctx context.Context, reqURL string, _ ListOptions) (WatchInterface[T], error) {
func watch[T Object](kc *Client, ctx context.Context, reqURL string, _ ListOptions) (WatchInterface[T], error) {
u, err := url.Parse(reqURL)
if err != nil {
return nil, err
Expand All @@ -188,29 +204,3 @@ func Watch[T Object](kc *Client, ctx context.Context, reqURL string, _ ListOptio

return newStreamWatcher[T](resp.Body, kc.Logger, kc.ResponseDecoderFunc(resp.Body)), nil
}

func GetEndpoints(kc *Client, ctx context.Context, namespace, name string, opts GetOptions) (*Endpoints, error) {
reqURL := fmt.Sprintf("%s/api/v1/namespaces/%s/endpoints/%s", kc.Host, namespace, name)
return Get[*Endpoints](kc, ctx, reqURL, opts)
}

func WatchEndpoints(kc *Client, ctx context.Context, namespace, name string, _ ListOptions) (WatchInterface[*Endpoints], error) {
u, err := url.Parse(fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s", kc.Host, namespace, name))
if err != nil {
return nil, err
}
req, err := kc.GetRequest(ctx, u.String())
if err != nil {
return nil, err
}
resp, err := kc.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
errmsg, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("invalid response code %d for service %s in namespace %s: %s", resp.StatusCode, name, namespace, string(errmsg))
}
return newStreamWatcher[*Endpoints](resp.Body, kc.Logger, kc.ResponseDecoderFunc(resp.Body)), nil
}
22 changes: 8 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,22 @@ func ExampleClient() {

ctx := context.Background()

// Generic get.
endpoints, err := client.Get[*client.Endpoints](kc, ctx, "/api/v1/namespaces/kube-system/endpoints/kubelet", client.GetOptions{})
// Getter example.
endpointsOperator := client.NewEndpointsOperator(kc)
endpoints, err := endpointsOperator.Get(ctx, "kube-system", "kubelet", client.GetOptions{})
if err != nil {
// Handle err
return
}
fmt.Printf("%+v\n", endpoints)

// Typed methods. Simple wrapper for Get.
endpoints, err = client.GetEndpoints(kc, ctx, "kube-system", "kubelet", client.GetOptions{})
// Watcher example.
events, err := endpointsOperator.Watch(ctx, "kube-system", "kubelet", client.ListOptions{})
if err != nil {
// Handle err
return
}
fmt.Printf("%+v", endpoints)

// Watch support.
events, err := client.Watch[*client.Endpoints](kc, ctx, "/api/v1/namespaces/kube-system/endpoints/kubelet", client.ListOptions{})
if err != nil {
// Handle err
return
}
for event := range events.ResultChan() {
fmt.Println(event.Type, event.Object)
for e := range events.ResultChan() {
fmt.Printf("%s: %+v\n", e.Type, e.Object)
}
}
6 changes: 4 additions & 2 deletions e2e/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ func testEndpoints(nativeClient *kubernetes.Clientset, kc *client.Client) error
return fmt.Errorf("creating test endpoint: %w", err)
}

endpointsOperator := client.NewEndpointsOperator(kc)

a, err := nativeClient.CoreV1().Endpoints(testEndpoints.Namespace).Get(context.Background(), testEndpoints.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("getting endpoint using native client: %w", err)
}

b, err := client.GetEndpoints(kc, context.Background(), testEndpoints.Namespace, testEndpoints.Name, client.GetOptions{})
b, err := endpointsOperator.Get(context.Background(), testEndpoints.Namespace, testEndpoints.Name, client.GetOptions{})
if err != nil {
return fmt.Errorf("getting endpoint using client: %w", err)
}
Expand Down Expand Up @@ -103,7 +105,7 @@ func testEndpoints(nativeClient *kubernetes.Clientset, kc *client.Client) error
})

errg.Go(func() error {
e, err := client.WatchEndpoints(kc, context.Background(), testEndpoints.Namespace, testEndpoints.Name, client.ListOptions{})
e, err := endpointsOperator.Watch(context.Background(), testEndpoints.Namespace, testEndpoints.Name, client.ListOptions{})
if err != nil {
return fmt.Errorf("watching using native client: %w", err)
}
Expand Down
28 changes: 28 additions & 0 deletions endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package client

import (
"context"
"fmt"
)

var _ ObjectOperator[*Endpoints] = (*EndpointsOperator)(nil)

func NewEndpointsOperator(kc *Client) *EndpointsOperator {
return &EndpointsOperator{
kc: kc,
}
}

type EndpointsOperator struct {
kc *Client
}

func (e *EndpointsOperator) Get(ctx context.Context, namespace, name string, opts GetOptions) (*Endpoints, error) {
reqURL := fmt.Sprintf("%s/api/v1/namespaces/%s/endpoints/%s", e.kc.Host, namespace, name)
return get[*Endpoints](e.kc, ctx, reqURL, opts)
}

func (e *EndpointsOperator) Watch(ctx context.Context, namespace, name string, opts ListOptions) (WatchInterface[*Endpoints], error) {
reqURL := fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s", e.kc.Host, namespace, name)
return watch[*Endpoints](e.kc, ctx, reqURL, opts)
}

0 comments on commit cee0c49

Please sign in to comment.