Skip to content

Commit

Permalink
separate the implementations of grpc and http clients and hide the im…
Browse files Browse the repository at this point in the history
…plementation with an interface
  • Loading branch information
xoxloviwan committed Dec 10, 2024
1 parent b3326fa commit 18273fa
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 198 deletions.
203 changes: 8 additions & 195 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
package main

import (
"bytes"
"compress/gzip"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/signal"
"sync"
Expand All @@ -20,18 +10,12 @@ import (

"log/slog"

"github.com/mailru/easyjson"
asc "github.com/xoxloviwan/go-monitor/internal/asymcrypto"
"github.com/xoxloviwan/go-monitor/internal/clients"
"github.com/xoxloviwan/go-monitor/internal/clients/base"
conf "github.com/xoxloviwan/go-monitor/internal/config_agent"
metrs "github.com/xoxloviwan/go-monitor/internal/metrics"
api "github.com/xoxloviwan/go-monitor/internal/metrics_types"

mcv "github.com/xoxloviwan/go-monitor/internal/metrics_convert"
pb "github.com/xoxloviwan/go-monitor/internal/metrics_types/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpcGzip "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
)

var (
Expand All @@ -40,131 +24,6 @@ var (
buildCommit = "N/A"
)

// send
//
// workerID - идентификатор потока
// adr - адрес сервера
// msgs - список метрик
// key - ключ подписи
// publicKey - RSA публичный ключ для шифрования сообщения
func send(workerID int, adr string, msgs api.MetricsList, key string, publicKey *asc.PublicKey, localIP string) (err error) {
cl := &http.Client{}

url := "http://" + adr + "/updates/"

var body []byte
body, err = easyjson.Marshal(&msgs)
if err != nil {
return err
}
var sessionKey []byte
if publicKey != nil {
var err error
sessionKey, body, err = asc.Encrypt(publicKey, body)
if err != nil {
return err
}
}
var sign string
if key != "" {
sign, err = getHash(body, key)
if err != nil {
return err
}
}
var gzbody []byte
gzbody, err = compressGzip(body)
if err != nil {
return err
}
var req *http.Request
req, err = http.NewRequest("POST", url, bytes.NewBuffer(gzbody))
if err != nil {
return err
}
// net.IPAddr

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Accept-Encoding", "gzip")
if localIP != "" {
req.Header.Set("X-Real-IP", localIP)
}
if sessionKey != nil {
req.Header.Set("X-Key", hex.EncodeToString(sessionKey))
}

if key != "" {
req.Header.Set("HashSHA256", sign)
}

var response *http.Response
retry := 0
response, err = cl.Do(req)
if err == nil && response.StatusCode != http.StatusOK {
slog.Warn("Unexpected status code", "worker", workerID, "status_code", response.StatusCode)
}
for err != nil && retry < 3 {
if response != nil {
response.Body.Close()
}
after := (retry+1)*2 - 1
time.Sleep(time.Duration(after) * time.Second)
slog.Warn("Retry attempt", "worker", workerID, "error", err, "retry", retry+1)
response, err = cl.Do(req)
retry++
}
if err != nil {
return err
}

defer func() {
if closeErr := response.Body.Close(); closeErr != nil {
closeErr = fmt.Errorf("could not close response body: %w", closeErr)
err = errors.Join(err, closeErr)
}
}()

_, err = io.Copy(io.Discard, response.Body)
if err != nil {
return err
}
return nil
}

// Compress сжимает слайс байт.
func compressGzip(data []byte) ([]byte, error) {
var b bytes.Buffer
// создаём переменную w — в неё будут записываться входящие данные,
// которые будут сжиматься и сохраняться в bytes.Buffer
w := gzip.NewWriter(&b)
// запись данных
_, err := w.Write(data)
if err != nil {
return nil, fmt.Errorf("failed write data to compress temporary buffer: %v", err)
}
// обязательно нужно вызвать метод Close() — в противном случае часть данных
// может не записаться в буфер b; если нужно выгрузить все упакованные данные
// в какой-то момент сжатия, используйте метод Flush()
err = w.Close()
if err != nil {
return nil, fmt.Errorf("failed compress data: %v", err)
}
// переменная b содержит сжатые данные
return b.Bytes(), nil
}

func getHash(data []byte, strkey string) (string, error) {
h := hmac.New(sha256.New, []byte(strkey))
_, err := h.Write(data)
if err != nil {
return "", err
}

sign := h.Sum(nil)
return hex.EncodeToString(sign), nil
}

// source - канал со снятыми метриками, содержит весь пакет
// poolSize - на сколько запросов/работников можно разделить пакет метрик
func SplitBatch(source <-chan api.Metrics, poolSize int) []<-chan api.Metrics {
Expand Down Expand Up @@ -196,7 +55,11 @@ func main() {
publicKey = nil
}
}
localIP, _ := getIP()
localIP, _ := base.GetIP()
if cfg.GRPC != "" {
cfg.Address = cfg.GRPC
}
sender := clients.NewSender(cfg.GRPC != "", cfg.Address, cfg.Key, localIP.String(), publicKey)
pollTicker := time.NewTicker(time.Duration(cfg.PollInterval) * time.Second)
defer pollTicker.Stop()
sendTicker := time.NewTicker(time.Duration(cfg.ReportInterval) * time.Second)
Expand Down Expand Up @@ -228,12 +91,7 @@ func main() {
}
if len(subbatch) > 0 {
slog.Info("Worker got task", "worker", worker, "subbatch", subbatch)
var err error
if cfg.GRPC != "" {
err = sendGRPC(worker, cfg.GRPC, subbatch, cfg.Key, localIP.String())
} else {
err = send(worker, cfg.Address, subbatch, cfg.Key, publicKey, localIP.String())
}
err := sender.Send(worker, subbatch)
if err != nil {
slog.Error("Send error", "worker", worker, "error", err)
}
Expand All @@ -249,48 +107,3 @@ func main() {
}
}
}

func getIP() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return []byte{}, err
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)

return localAddr.IP, nil
}

func sendGRPC(worker int, adr string, msgs api.MetricsList, key string, localIP string) error {
slog.Info("gRPC worker got task", "worker", worker)
// устанавливаем соединение с сервером
conn, err := grpc.NewClient(adr, grpc.WithTransportCredentials(insecure.NewCredentials()))

if err != nil {
return err
}
defer conn.Close()
// получаем переменную интерфейсного типа MetricsServiceClient,
// через которую будем отправлять сообщения
c := pb.NewMetricsServiceClient(conn)
md := metadata.New(map[string]string{
"X-Real-IP": localIP,
})
metrs := mcv.ConvMetrics(msgs)
if key != "" {
msg := metrs.String()
sign, err := getHash([]byte(msg), key)
if err != nil {
return err
}
md.Set("HashSHA256", sign)
}
ctx := metadata.NewOutgoingContext(context.Background(), md)
MetricsResponse, err := c.AddMetrics(ctx, metrs, grpc.UseCompressor(grpcGzip.Name))
if err != nil {
return err
}
slog.Info("gRPC worker got response", "worker", worker, "response", MetricsResponse)
return nil
}
12 changes: 12 additions & 0 deletions internal/clients/base/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package base

import (
asc "github.com/xoxloviwan/go-monitor/internal/asymcrypto"
)

type Client struct {
Addr string
Key string
LocalIP string
PublicKey *asc.PublicKey
}
59 changes: 59 additions & 0 deletions internal/clients/base/tools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package base

import (
"bytes"
"compress/gzip"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"net"
)

// GetHash calculates an HMAC-SHA256 hash of the provided data using the given secret key.
// The resulting hash is returned as a hexadecimal-encoded string.
func GetHash(data []byte, strkey string) (string, error) {
h := hmac.New(sha256.New, []byte(strkey))
_, err := h.Write(data)
if err != nil {
return "", err
}

sign := h.Sum(nil)
return hex.EncodeToString(sign), nil
}

// Compress сжимает слайс байт.
func CompressGzip(data []byte) ([]byte, error) {
var b bytes.Buffer
// создаём переменную w — в неё будут записываться входящие данные,
// которые будут сжиматься и сохраняться в bytes.Buffer
w := gzip.NewWriter(&b)
// запись данных
_, err := w.Write(data)
if err != nil {
return nil, fmt.Errorf("failed write data to compress temporary buffer: %v", err)
}
// обязательно нужно вызвать метод Close() — в противном случае часть данных
// может не записаться в буфер b; если нужно выгрузить все упакованные данные
// в какой-то момент сжатия, используйте метод Flush()
err = w.Close()
if err != nil {
return nil, fmt.Errorf("failed compress data: %v", err)
}
// переменная b содержит сжатые данные
return b.Bytes(), nil
}

// GetIP returns the local IP address of the machine.
func GetIP() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return []byte{}, err
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)

return localAddr.IP, nil
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main
package base

import (
"testing"
)

func Test_getIP(t *testing.T) {
got, err := getIP()
func Test_GetIP(t *testing.T) {
got, err := GetIP()
t.Logf("ip: %s", got)
if err != nil {
t.Errorf("getIP() error = %v, wantErr %v", err, nil)
Expand Down
29 changes: 29 additions & 0 deletions internal/clients/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package clients

import (
asc "github.com/xoxloviwan/go-monitor/internal/asymcrypto"
"github.com/xoxloviwan/go-monitor/internal/clients/grpc"
"github.com/xoxloviwan/go-monitor/internal/clients/http"
api "github.com/xoxloviwan/go-monitor/internal/metrics_types"
)

type Sender interface {
Send(worker int, msgs api.MetricsList) error
}

func NewSender(grpcFlag bool, addr string, key string, localIP string, publicKey *asc.PublicKey) Sender {
if grpcFlag {
return &grpc.Client{
Addr: addr,
Key: key,
LocalIP: localIP,
PublicKey: publicKey,
} // не знаю как избежать здесь дублирования при инициализации разных структур с одинаковым набором полей
}
return &http.Client{
Addr: addr,
Key: key,
LocalIP: localIP,
PublicKey: publicKey,
}
}
Loading

0 comments on commit 18273fa

Please sign in to comment.