Skip to content

Commit

Permalink
posthog tracking opts (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
diego-escobedo authored Mar 23, 2023
1 parent 9963b96 commit f908380
Showing 1 changed file with 64 additions and 23 deletions.
87 changes: 64 additions & 23 deletions go/event-guidance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import (
"github.com/uselotus/lotus/go/pkg/types"
)

const batchSize = 1000
const batchSize = 2

type StreamEvents struct {
OrganizationID int64 `json:"organization_id"`
Event *types.VerifiedEvent `json:"event"`
}

type batch struct {
type insertBatch struct {
tx *sql.Tx
insertStatement *sql.Stmt
count int
}

func (b *batch) addRecord(event *types.VerifiedEvent) (bool, error) {
func (b *insertBatch) addRecord(event *types.VerifiedEvent) (bool, error) {
propertiesJSON, errJSON := json.Marshal(event.Properties)
if errJSON != nil {
log.Printf("Error encoding properties to JSON: %s\n", errJSON)
Expand Down Expand Up @@ -202,49 +202,74 @@ func main() {
panic(err)
}

batch := &batch{
batch := &insertBatch{
tx: tx,
insertStatement: insertStatement,
count: 0,
}

lastOrgID := int64(0)
lastEventCount := 0

fetches.EachRecord(func(r *kgo.Record) {
log.Printf("Received record: %s\n", r.Value)
//extract event from kafka message
var streamEvents StreamEvents
err := json.Unmarshal(r.Value, &streamEvents)

if err != nil {
log.Printf("Error unmarshalling event: %s\n", err)
// since we check in the previous statement that the event has the correct format, an error unmarshalling should be a fatal error
return
// since we check in the previous step in the pipeline that the event has the correct format, an error unmarshalling should be a fatal error
panic(err)
}

if streamEvents.Event == nil {
log.Printf("event from OrganizationID %d is empty", streamEvents.OrganizationID)
panic(fmt.Errorf("event from OrganizationID %d is empty", streamEvents.OrganizationID))
}

event := streamEvents.Event

// if its the first event, set the lastOrgID
if lastOrgID == 0 {
lastOrgID = streamEvents.OrganizationID
}

// commit the record
if committed, err := batch.addRecord(event); err != nil {
//only thing that can go wrong in batch is either bugs in the code or a serious database failure/network partition of some kind. Because the usual referential integrity issues are already dealt with (on conflict do nothing), all that's left is bad stuff.
log.Printf("Error inserting event: %s\n", err)
panic(err)
} else {
if phWorks {
phClient.Enqueue(posthog.Capture{
DistinctId: fmt.Sprintf("%d (API Key)", event.OrganizationID),
Event: "track_event",
Properties: posthog.NewProperties().
Set("customer", event.CustID),
})
}
log.Printf("Committed record: %s\n committed? %t", r.Value, committed)
if committed {
// commit offsets
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
// this is a fatal error
log.Printf("commit records failed: %v", err)
panic(fmt.Errorf("commit records failed: %w", err))
}

// start a new transaction and reset the batch
tx, err := db.Begin()
if err != nil {
log.Printf("Error starting transaction: %s\n", err)
panic(err)
}
batch.tx = tx
}

// send posthog event if orgID has changed or we've reached batchSize (and set batch.count to 0)
if event.OrganizationID != lastOrgID || batch.count == 0 {
numEvents := 0
if batch.count > 0 {
// this is in the case haven't reached batchSize yet
numEvents = batch.count - lastEventCount
} else {
numEvents = batchSize - lastEventCount
}
if phWorks {
posthogTrack(phClient, lastOrgID, numEvents)
}
// either update lastEventCount or if we committed reset to 0
lastEventCount = batch.count
lastOrgID = event.OrganizationID
}
}

Expand All @@ -255,13 +280,29 @@ func main() {
// again, this should be a fatal error
log.Printf("Error inserting events into database: %s\n", err)
panic(err)
}
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
// this is a fatal error
log.Printf("commit records failed: %v", err)
panic(fmt.Errorf("commit records failed: %w", err))
} else {
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
// this is a fatal error
log.Printf("commit records failed: %v", err)
panic(fmt.Errorf("commit records failed: %w", err))
}

// send posthog event
if phWorks {
posthogTrack(phClient, lastOrgID, batch.count-lastEventCount)
}
}
}

}
}

func posthogTrack(phClient posthog.Client, organizationID int64, numEvents int) {
// send posthog event
phClient.Enqueue(posthog.Capture{
DistinctId: fmt.Sprintf("%d (API Key)", organizationID),
Event: "track_event",
Properties: posthog.NewProperties().
Set("num_events", numEvents),
})
}

0 comments on commit f908380

Please sign in to comment.