Skip to content

Commit

Permalink
update database scheme and kafka reader log
Browse files Browse the repository at this point in the history
  • Loading branch information
suslmk-lee committed Nov 25, 2024
1 parent b7345ba commit ec642a8
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 12 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
)

// StartAPIServer - 간단한 API 서버 시작 함수
// StartAPIServer - 간단한 API 서버 시작 함수 / health, data
func StartAPIServer() {
http.HandleFunc("/health", healthHandler)
http.HandleFunc("/data", dataHandler)
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func CreateKafkaReader() *kafka.Reader {
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 오프셋 커밋 간격
MaxWait: 500 * time.Millisecond, // 메시지 대기 시간
Logger: log.New(log.Writer(), "DEBUG: ", log.LstdFlags),
//Logger: log.New(log.Writer(), "DEBUG: ", log.LstdFlags),
})

log.Println("Kafka reader successfully created and ready to consume messages")
Expand Down
13 changes: 5 additions & 8 deletions db/db.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// db/db.go
package db

import (
Expand All @@ -11,7 +10,6 @@ import (
"zim-kafka-comsum/common"
)

// ConnectDB - 데이터베이스 연결 풀 생성 함수
func ConnectDB(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.Connect(ctx, dbURL)
if err != nil {
Expand All @@ -20,26 +18,25 @@ func ConnectDB(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
return pool, nil
}

// InsertBatch - 배치 삽입 함수
func InsertBatch(ctx context.Context, pool *pgxpool.Pool, batch []common.IoTData) {
if len(batch) == 0 {
return
}

// 트랜잭션 시작
tx, err := pool.Begin(ctx)
if err != nil {
log.Printf("Failed to begin transaction: %v\n", err)
return
}
defer tx.Rollback(ctx)

// SQL 쿼리 동적 생성
sql := `
INSERT INTO IoT_Data (
Device, Timestamp, ProVer, MinorVer, SN, Model, TYield, DYield, PF, PMax, PAC, SAC,
UAB, UBC, UCA, IA, IB, IC, Freq, TMod, TAmb, Mode, QAC, BusCapacitance,
ACCapacitance, PDC, PMaxLim, SMaxLim, IsSent
device, timestamp, pro_ver, minor_ver, sn, model,
tyield, dyield, pf, pmax, pac, sac,
uab, ubc, uca, ia, ib, ic,
freq, tmod,tamb, mode, qac, bus_capacitance,
ac_capacitance, pdc, pmax_lim, smax_lim, is_sent
) VALUES
`
valueStrings := make([]string, 0, len(batch))
Expand Down
1 change: 1 addition & 0 deletions deployment/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ data:
KAFKA_HOST: "133.186.209.209"
KAFKA_PORT: "31092"
KAFKA_TOPIC: "cp-db-topic"
KAFKA_GROUP_ID: "zim-consumer-group"
INIT_DB_SCHEMA: |
CREATE TABLE IF NOT EXISTS iot_data (
device VARCHAR(50),
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func main() {

go api.StartAPIServer()

messageChan := make(chan common.IoTData, 1000) // 버퍼 크기는 필요에 따라 조정
workerCount := 5 // 워커 수는 시스템 자원에 따라 조정
messageChan := make(chan common.IoTData, 1000) // 버퍼 크기
workerCount := 5 // 워커
for i := 0; i < workerCount; i++ {
go consumer.Worker(ctx, pool, messageChan)
}
Expand Down

0 comments on commit ec642a8

Please sign in to comment.