From 543d52b8b49d6e37767d1a7d98cc361c4cccfa58 Mon Sep 17 00:00:00 2001 From: Hailey Date: Wed, 1 Jan 2025 17:50:15 -0800 Subject: [PATCH 1/3] add support for normal auth w/ ozone --- automod/engine/fetch_account_meta.go | 2 +- cmd/hepa/main.go | 5 ++ cmd/hepa/server.go | 79 ++++++++++++++++++++++++++-- go.mod | 4 +- 4 files changed, 82 insertions(+), 8 deletions(-) diff --git a/automod/engine/fetch_account_meta.go b/automod/engine/fetch_account_meta.go index 39b4df1eb..803666c54 100644 --- a/automod/engine/fetch_account_meta.go +++ b/automod/engine/fetch_account_meta.go @@ -112,7 +112,7 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) ( } // first attempt to fetch private account metadata from Ozone - if e.OzoneClient != nil { + if e.OzoneClient != nil && e.OzoneClient.AdminToken != nil && *e.OzoneClient.AdminToken != "" { rd, err := toolsozone.ModerationGetRepo(ctx, e.OzoneClient, ident.DID.String()) if err != nil { logger.Warn("failed to fetch private account metadata from Ozone", "err", err) diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 9e3029b6c..5d54edb36 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -75,6 +75,11 @@ func run(args []string) error { Usage: "admin authentication password for mod service", EnvVars: []string{"HEPA_OZONE_AUTH_ADMIN_TOKEN", "HEPA_MOD_AUTH_ADMIN_TOKEN"}, }, + &cli.StringFlag{ + Name: "ozone-password", + Usage: "authentication password for mod service account. used when not supplying an admin authentication token.", + EnvVars: []string{"HEPA_OZONE_PASSWORD"}, + }, &cli.StringFlag{ Name: "atp-pds-host", Usage: "method, hostname, and port of PDS (or entryway) for admin account info; uses admin auth", diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index a4ca252d1..159cebe2a 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" @@ -42,6 +43,7 @@ type Config struct { OzoneHost string OzoneDID string OzoneAdminToken string + OzonePassword string PDSHost string PDSAdminToken string SetsFileJSON string @@ -75,13 +77,35 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { } var ozoneClient *xrpc.Client - if config.OzoneAdminToken != "" && config.OzoneDID != "" { + if config.OzoneDID != "" { ozoneClient = &xrpc.Client{ - Client: util.RobustHTTPClient(), - Host: config.OzoneHost, - AdminToken: &config.OzoneAdminToken, - Auth: &xrpc.AuthInfo{}, + Client: util.RobustHTTPClient(), + Host: config.OzoneHost, } + + if config.OzoneAdminToken != "" { + ozoneClient.AdminToken = &config.OzoneAdminToken + ozoneClient.Auth = &xrpc.AuthInfo{} + } else if config.OzonePassword != "" { + res, err := atproto.ServerCreateSession(context.TODO(), ozoneClient, &atproto.ServerCreateSession_Input{ + Identifier: config.OzoneDID, + Password: config.OzonePassword, + }) + + if err != nil { + return nil, fmt.Errorf("creating ozone session: %v", err) + } + + ozoneClient.Auth = &xrpc.AuthInfo{ + Did: res.Did, + Handle: res.Handle, + AccessJwt: res.AccessJwt, + RefreshJwt: res.RefreshJwt, + } + + logger.Info("created ozone session", "did", res.Did) + } + if config.RatelimitBypass != "" { ozoneClient.Headers = make(map[string]string) ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass @@ -240,6 +264,10 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { RedisClient: rdb, } + if config.OzonePassword != "" && config.OzoneAdminToken == "" { + s.runRefreshSession(context.TODO()) + } + return s, nil } @@ -247,3 +275,44 @@ func (s *Server) RunMetrics(listen string) error { http.Handle("/metrics", promhttp.Handler()) return http.ListenAndServe(listen, nil) } + +func (s *Server) runRefreshSession(ctx context.Context) { + go func() { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if s.Engine.OzoneClient != nil && s.Engine.OzoneClient.Auth.AccessJwt != "" { + // create a new ozone client since we dont have a mutex to lock + oc := &xrpc.Client{ + Client: util.RobustHTTPClient(), + Host: s.Engine.OzoneClient.Host, + Auth: &xrpc.AuthInfo{ + Did: s.Engine.OzoneClient.Auth.Did, + Handle: s.Engine.OzoneClient.Auth.Handle, + AccessJwt: s.Engine.OzoneClient.Auth.RefreshJwt, // Use the refresh jwt + RefreshJwt: s.Engine.OzoneClient.Auth.RefreshJwt, + }, + } + + res, err := atproto.ServerRefreshSession(ctx, oc) + if err != nil { + s.logger.Error("failed refreshing ozone session", "err", err) + continue + } + + // update the auth and client + oc.Auth.AccessJwt = res.AccessJwt + oc.Auth.RefreshJwt = res.RefreshJwt + + s.Engine.OzoneClient = oc + } + + case <-ctx.Done(): + return + } + } + }() +} diff --git a/go.mod b/go.mod index ff1c8ee77..fb2df5ce1 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/ipfs/go-ipld-cbor v0.1.0 github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-libipfs v0.7.0 + github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 github.com/ipld/go-car/v2 v2.13.1 github.com/jackc/pgx/v5 v5.5.0 @@ -64,6 +65,7 @@ require ( go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/automaxprocs v1.5.3 + go.uber.org/zap v1.26.0 golang.org/x/crypto v0.21.0 golang.org/x/sync v0.7.0 golang.org/x/text v0.14.0 @@ -89,7 +91,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/compress v1.17.3 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -103,7 +104,6 @@ require ( github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect - go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect ) From 839eb0407bdb835b24662f026b8419d5bb24339d Mon Sep 17 00:00:00 2001 From: Hailey Date: Wed, 1 Jan 2025 17:52:08 -0800 Subject: [PATCH 2/3] fix flag --- cmd/hepa/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 5d54edb36..243a3a91e 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -271,6 +271,7 @@ var runCmd = &cli.Command{ OzoneHost: cctx.String("atp-ozone-host"), OzoneDID: cctx.String("ozone-did"), OzoneAdminToken: cctx.String("ozone-admin-token"), + OzonePassword: cctx.String("ozone-password"), PDSHost: cctx.String("atp-pds-host"), PDSAdminToken: cctx.String("pds-admin-token"), SetsFileJSON: cctx.String("sets-json-path"), @@ -371,6 +372,7 @@ func configEphemeralServer(cctx *cli.Context) (*Server, error) { OzoneHost: cctx.String("atp-ozone-host"), OzoneDID: cctx.String("ozone-did"), OzoneAdminToken: cctx.String("ozone-admin-token"), + OzonePassword: cctx.String("ozone-password"), PDSHost: cctx.String("atp-pds-host"), PDSAdminToken: cctx.String("pds-admin-token"), SetsFileJSON: cctx.String("sets-json-path"), From 626231444684af3b4a5ff6f8d0cb17e83436fb47 Mon Sep 17 00:00:00 2001 From: Hailey Date: Wed, 1 Jan 2025 19:52:22 -0800 Subject: [PATCH 3/3] logic forks --- automod/consumer/ozone.go | 1 + automod/engine/engine.go | 36 +++++++++++++ automod/engine/fetch_account_meta.go | 2 +- cmd/hepa/main.go | 30 +++++++++-- cmd/hepa/server.go | 81 +++++++++++----------------- 5 files changed, 95 insertions(+), 55 deletions(-) diff --git a/automod/consumer/ozone.go b/automod/consumer/ozone.go index 2211cf21a..749a22f04 100644 --- a/automod/consumer/ozone.go +++ b/automod/consumer/ozone.go @@ -77,6 +77,7 @@ func (oc *OzoneConsumer) Run(ctx context.Context) error { "", // subjectType string nil, // types []string ) + if err != nil { oc.Logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String()) time.Sleep(period) diff --git a/automod/engine/engine.go b/automod/engine/engine.go index 933987072..7565d679a 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -60,6 +60,8 @@ type EngineConfig struct { QuotaModTakedownDay int // number of misc actions automod can do per day, for all subjects combined (circuit breaker) QuotaModActionDay int + // whether hepa is running in `authority` or `labeler` mode + Mode string } // Entrypoint for external code pushing #identity events in to the engine. @@ -390,3 +392,37 @@ func (e *Engine) CanonicalLogLineNotification(c *NotificationContext) { "reject", c.effects.RejectEvent, ) } + +func (e *Engine) RunRefreshSession(ctx context.Context) { + go func() { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if e.OzoneClient != nil && e.OzoneClient.Auth.AccessJwt != "" { + // copy the ozone client. we dont have a mutex to lock, and we don't want to rugpull the + // access jwt while its trying to make requests. this isnt perfect... + oc := &*e.OzoneClient + + // set the access jwt to the existing refresh jwt + oc.Auth.AccessJwt = oc.Auth.RefreshJwt + + res, err := comatproto.ServerRefreshSession(ctx, oc) + if err != nil { + e.Logger.Error("failed refreshing ozone session", "err", err) + continue + } + + // update the existing clients auth + e.OzoneClient.Auth.AccessJwt = res.AccessJwt + e.OzoneClient.Auth.RefreshJwt = res.RefreshJwt + } + + case <-ctx.Done(): + return + } + } + }() +} diff --git a/automod/engine/fetch_account_meta.go b/automod/engine/fetch_account_meta.go index 803666c54..80e79141e 100644 --- a/automod/engine/fetch_account_meta.go +++ b/automod/engine/fetch_account_meta.go @@ -112,7 +112,7 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) ( } // first attempt to fetch private account metadata from Ozone - if e.OzoneClient != nil && e.OzoneClient.AdminToken != nil && *e.OzoneClient.AdminToken != "" { + if e.Config.Mode == "authority" && e.OzoneClient != nil { rd, err := toolsozone.ModerationGetRepo(ctx, e.OzoneClient, ident.DID.String()) if err != nil { logger.Warn("failed to fetch private account metadata from Ozone", "err", err) diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 243a3a91e..abb3b9f3b 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -41,6 +41,12 @@ func run(args []string) error { } app.Flags = []cli.Flag{ + &cli.StringFlag{ + Name: "mode", + Usage: "mode to run in: 'authority' (default) or `labeler`", + Value: "authority", + EnvVars: []string{"HEPA_MODE"}, + }, &cli.StringFlag{ Name: "atp-relay-host", Usage: "hostname and port of Relay to subscribe to", @@ -76,9 +82,19 @@ func run(args []string) error { EnvVars: []string{"HEPA_OZONE_AUTH_ADMIN_TOKEN", "HEPA_MOD_AUTH_ADMIN_TOKEN"}, }, &cli.StringFlag{ - Name: "ozone-password", + Name: "ozone-mod-password", Usage: "authentication password for mod service account. used when not supplying an admin authentication token.", - EnvVars: []string{"HEPA_OZONE_PASSWORD"}, + EnvVars: []string{"HEPA_OZONE_MOD_PASS"}, + }, + &cli.StringFlag{ + Name: "ozone-mod-service", + Usage: "service the mod account is hosted on", + EnvVars: []string{"HEPA_OZONE_MOD_SERVICE"}, + }, + &cli.StringFlag{ + Name: "ozone-service-did", + Usage: "did of the ozone service. only required when running in \"labeler\" mode.", + EnvVars: []string{"HEPA_OZONE_SERVICE_DID"}, }, &cli.StringFlag{ Name: "atp-pds-host", @@ -266,12 +282,15 @@ var runCmd = &cli.Command{ dir, Config{ Logger: logger, + Mode: cctx.String("mode"), RelayHost: cctx.String("atp-relay-host"), // DEPRECATED BskyHost: cctx.String("atp-bsky-host"), OzoneHost: cctx.String("atp-ozone-host"), OzoneDID: cctx.String("ozone-did"), OzoneAdminToken: cctx.String("ozone-admin-token"), - OzonePassword: cctx.String("ozone-password"), + OzoneModPassword: cctx.String("ozone-mod-password"), + OzoneModService: cctx.String("ozone-mod-service"), + OzoneServiceDid: cctx.String("ozone-service-did"), PDSHost: cctx.String("atp-pds-host"), PDSAdminToken: cctx.String("pds-admin-token"), SetsFileJSON: cctx.String("sets-json-path"), @@ -367,12 +386,15 @@ func configEphemeralServer(cctx *cli.Context) (*Server, error) { dir, Config{ Logger: logger, + Mode: cctx.String("mode"), RelayHost: cctx.String("atp-relay-host"), BskyHost: cctx.String("atp-bsky-host"), OzoneHost: cctx.String("atp-ozone-host"), OzoneDID: cctx.String("ozone-did"), OzoneAdminToken: cctx.String("ozone-admin-token"), - OzonePassword: cctx.String("ozone-password"), + OzoneModPassword: cctx.String("ozone-mod-password"), + OzoneModService: cctx.String("ozone-mod-service"), + OzoneServiceDid: cctx.String("ozone-service-did"), PDSHost: cctx.String("atp-pds-host"), PDSAdminToken: cctx.String("pds-admin-token"), SetsFileJSON: cctx.String("sets-json-path"), diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index 159cebe2a..9b092f1c7 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -38,12 +38,15 @@ type Server struct { type Config struct { Logger *slog.Logger + Mode string RelayHost string // DEPRECATED BskyHost string OzoneHost string OzoneDID string OzoneAdminToken string - OzonePassword string + OzoneModPassword string + OzoneModService string + OzoneServiceDid string PDSHost string PDSAdminToken string SetsFileJSON string @@ -71,6 +74,12 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { })) } + if config.Mode == "labeler" && (config.OzoneServiceDid == "" || config.OzoneModPassword == "" || config.OzoneModService == "") { + return nil, fmt.Errorf("must provide ozone service DID and password for labeler mode") + } else if config.Mode == "authority" && config.OzoneAdminToken == "" { + return nil, fmt.Errorf("must provide ozone admin token for authority mode") + } + relayws := config.RelayHost if !strings.HasPrefix(relayws, "ws") { return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'") @@ -78,18 +87,26 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { var ozoneClient *xrpc.Client if config.OzoneDID != "" { + var host string + if config.Mode == "authority" { + host = config.OzoneHost + } else if config.Mode == "labeler" { + host = config.OzoneModService + } + ozoneClient = &xrpc.Client{ - Client: util.RobustHTTPClient(), - Host: config.OzoneHost, + Client: util.RobustHTTPClient(), + Host: host, + Headers: make(map[string]string), } - if config.OzoneAdminToken != "" { + if config.Mode == "authority" { ozoneClient.AdminToken = &config.OzoneAdminToken ozoneClient.Auth = &xrpc.AuthInfo{} - } else if config.OzonePassword != "" { + } else if config.Mode == "labeler" { res, err := atproto.ServerCreateSession(context.TODO(), ozoneClient, &atproto.ServerCreateSession_Input{ Identifier: config.OzoneDID, - Password: config.OzonePassword, + Password: config.OzoneModPassword, }) if err != nil { @@ -107,9 +124,13 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { } if config.RatelimitBypass != "" { - ozoneClient.Headers = make(map[string]string) ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass } + + if config.Mode == "labeler" { + ozoneClient.Headers["atproto-proxy"] = fmt.Sprintf("%s#atproto_labeler", config.OzoneServiceDid) + } + od, err := syntax.ParseDID(config.OzoneDID) if err != nil { return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err) @@ -253,6 +274,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { QuotaModReportDay: config.QuotaModReportDay, QuotaModTakedownDay: config.QuotaModTakedownDay, QuotaModActionDay: config.QuotaModActionDay, + Mode: config.Mode, }, } @@ -264,8 +286,8 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { RedisClient: rdb, } - if config.OzonePassword != "" && config.OzoneAdminToken == "" { - s.runRefreshSession(context.TODO()) + if config.Mode == "labeler" { + s.Engine.RunRefreshSession(context.TODO()) } return s, nil @@ -275,44 +297,3 @@ func (s *Server) RunMetrics(listen string) error { http.Handle("/metrics", promhttp.Handler()) return http.ListenAndServe(listen, nil) } - -func (s *Server) runRefreshSession(ctx context.Context) { - go func() { - ticker := time.NewTicker(1 * time.Hour) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if s.Engine.OzoneClient != nil && s.Engine.OzoneClient.Auth.AccessJwt != "" { - // create a new ozone client since we dont have a mutex to lock - oc := &xrpc.Client{ - Client: util.RobustHTTPClient(), - Host: s.Engine.OzoneClient.Host, - Auth: &xrpc.AuthInfo{ - Did: s.Engine.OzoneClient.Auth.Did, - Handle: s.Engine.OzoneClient.Auth.Handle, - AccessJwt: s.Engine.OzoneClient.Auth.RefreshJwt, // Use the refresh jwt - RefreshJwt: s.Engine.OzoneClient.Auth.RefreshJwt, - }, - } - - res, err := atproto.ServerRefreshSession(ctx, oc) - if err != nil { - s.logger.Error("failed refreshing ozone session", "err", err) - continue - } - - // update the auth and client - oc.Auth.AccessJwt = res.AccessJwt - oc.Auth.RefreshJwt = res.RefreshJwt - - s.Engine.OzoneClient = oc - } - - case <-ctx.Done(): - return - } - } - }() -}