Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hepa: add third party ozone support to automod #894

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions automod/consumer/ozone.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (oc *OzoneConsumer) Run(ctx context.Context) error {
"", // subjectType string
nil, // types []string
)

if err != nil {
oc.Logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String())
time.Sleep(period)
Expand Down
36 changes: 36 additions & 0 deletions automod/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type EngineConfig struct {
QuotaModTakedownDay int
// number of misc actions automod can do per day, for all subjects combined (circuit breaker)
QuotaModActionDay int
// whether hepa is running in `authority` or `labeler` mode
Mode string
}

// Entrypoint for external code pushing #identity events in to the engine.
Expand Down Expand Up @@ -390,3 +392,37 @@ func (e *Engine) CanonicalLogLineNotification(c *NotificationContext) {
"reject", c.effects.RejectEvent,
)
}

func (e *Engine) RunRefreshSession(ctx context.Context) {
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if e.OzoneClient != nil && e.OzoneClient.Auth.AccessJwt != "" {
// copy the ozone client. we dont have a mutex to lock, and we don't want to rugpull the
// access jwt while its trying to make requests. this isnt perfect...
oc := &*e.OzoneClient

// set the access jwt to the existing refresh jwt
oc.Auth.AccessJwt = oc.Auth.RefreshJwt

res, err := comatproto.ServerRefreshSession(ctx, oc)
if err != nil {
e.Logger.Error("failed refreshing ozone session", "err", err)
continue
}

// update the existing clients auth
e.OzoneClient.Auth.AccessJwt = res.AccessJwt
e.OzoneClient.Auth.RefreshJwt = res.RefreshJwt
}

case <-ctx.Done():
return
}
}
}()
}
2 changes: 1 addition & 1 deletion automod/engine/fetch_account_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) (
}

// first attempt to fetch private account metadata from Ozone
if e.OzoneClient != nil {
if e.Config.Mode == "authority" && e.OzoneClient != nil {
rd, err := toolsozone.ModerationGetRepo(ctx, e.OzoneClient, ident.DID.String())
if err != nil {
logger.Warn("failed to fetch private account metadata from Ozone", "err", err)
Expand Down
29 changes: 29 additions & 0 deletions cmd/hepa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func run(args []string) error {
}

app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "mode",
Usage: "mode to run in: 'authority' (default) or `labeler`",
Value: "authority",
EnvVars: []string{"HEPA_MODE"},
},
&cli.StringFlag{
Name: "atp-relay-host",
Usage: "hostname and port of Relay to subscribe to",
Expand Down Expand Up @@ -75,6 +81,21 @@ func run(args []string) error {
Usage: "admin authentication password for mod service",
EnvVars: []string{"HEPA_OZONE_AUTH_ADMIN_TOKEN", "HEPA_MOD_AUTH_ADMIN_TOKEN"},
},
&cli.StringFlag{
Name: "ozone-mod-password",
Usage: "authentication password for mod service account. used when not supplying an admin authentication token.",
EnvVars: []string{"HEPA_OZONE_MOD_PASS"},
},
&cli.StringFlag{
Name: "ozone-mod-service",
Usage: "service the mod account is hosted on",
EnvVars: []string{"HEPA_OZONE_MOD_SERVICE"},
},
&cli.StringFlag{
Name: "ozone-service-did",
Usage: "did of the ozone service. only required when running in \"labeler\" mode.",
EnvVars: []string{"HEPA_OZONE_SERVICE_DID"},
},
&cli.StringFlag{
Name: "atp-pds-host",
Usage: "method, hostname, and port of PDS (or entryway) for admin account info; uses admin auth",
Expand Down Expand Up @@ -261,11 +282,15 @@ var runCmd = &cli.Command{
dir,
Config{
Logger: logger,
Mode: cctx.String("mode"),
RelayHost: cctx.String("atp-relay-host"), // DEPRECATED
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
OzoneModPassword: cctx.String("ozone-mod-password"),
OzoneModService: cctx.String("ozone-mod-service"),
OzoneServiceDid: cctx.String("ozone-service-did"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
Expand Down Expand Up @@ -361,11 +386,15 @@ func configEphemeralServer(cctx *cli.Context) (*Server, error) {
dir,
Config{
Logger: logger,
Mode: cctx.String("mode"),
RelayHost: cctx.String("atp-relay-host"),
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
OzoneModPassword: cctx.String("ozone-mod-password"),
OzoneModService: cctx.String("ozone-mod-service"),
OzoneServiceDid: cctx.String("ozone-service-did"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
Expand Down
62 changes: 56 additions & 6 deletions cmd/hepa/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod"
Expand Down Expand Up @@ -37,11 +38,15 @@ type Server struct {

type Config struct {
Logger *slog.Logger
Mode string
RelayHost string // DEPRECATED
BskyHost string
OzoneHost string
OzoneDID string
OzoneAdminToken string
OzoneModPassword string
OzoneModService string
OzoneServiceDid string
PDSHost string
PDSAdminToken string
SetsFileJSON string
Expand Down Expand Up @@ -69,23 +74,63 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
}))
}

if config.Mode == "labeler" && (config.OzoneServiceDid == "" || config.OzoneModPassword == "" || config.OzoneModService == "") {
return nil, fmt.Errorf("must provide ozone service DID and password for labeler mode")
} else if config.Mode == "authority" && config.OzoneAdminToken == "" {
return nil, fmt.Errorf("must provide ozone admin token for authority mode")
}

relayws := config.RelayHost
if !strings.HasPrefix(relayws, "ws") {
return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'")
}

var ozoneClient *xrpc.Client
if config.OzoneAdminToken != "" && config.OzoneDID != "" {
if config.OzoneDID != "" {
var host string
if config.Mode == "authority" {
host = config.OzoneHost
} else if config.Mode == "labeler" {
host = config.OzoneModService
}

ozoneClient = &xrpc.Client{
Client: util.RobustHTTPClient(),
Host: config.OzoneHost,
AdminToken: &config.OzoneAdminToken,
Auth: &xrpc.AuthInfo{},
Client: util.RobustHTTPClient(),
Host: host,
Headers: make(map[string]string),
}

if config.Mode == "authority" {
ozoneClient.AdminToken = &config.OzoneAdminToken
ozoneClient.Auth = &xrpc.AuthInfo{}
} else if config.Mode == "labeler" {
res, err := atproto.ServerCreateSession(context.TODO(), ozoneClient, &atproto.ServerCreateSession_Input{
Identifier: config.OzoneDID,
Password: config.OzoneModPassword,
})

if err != nil {
return nil, fmt.Errorf("creating ozone session: %v", err)
}

ozoneClient.Auth = &xrpc.AuthInfo{
Did: res.Did,
Handle: res.Handle,
AccessJwt: res.AccessJwt,
RefreshJwt: res.RefreshJwt,
}

logger.Info("created ozone session", "did", res.Did)
}

if config.RatelimitBypass != "" {
ozoneClient.Headers = make(map[string]string)
ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
}

if config.Mode == "labeler" {
ozoneClient.Headers["atproto-proxy"] = fmt.Sprintf("%s#atproto_labeler", config.OzoneServiceDid)
}

od, err := syntax.ParseDID(config.OzoneDID)
if err != nil {
return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err)
Expand Down Expand Up @@ -229,6 +274,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
QuotaModReportDay: config.QuotaModReportDay,
QuotaModTakedownDay: config.QuotaModTakedownDay,
QuotaModActionDay: config.QuotaModActionDay,
Mode: config.Mode,
},
}

Expand All @@ -240,6 +286,10 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
RedisClient: rdb,
}

if config.Mode == "labeler" {
s.Engine.RunRefreshSession(context.TODO())
}

return s, nil
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/ipfs/go-ipld-cbor v0.1.0
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-libipfs v0.7.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4
github.com/ipld/go-car/v2 v2.13.1
github.com/jackc/pgx/v5 v5.5.0
Expand Down Expand Up @@ -64,6 +65,7 @@ require (
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.14.0
Expand All @@ -89,7 +91,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand All @@ -103,7 +104,6 @@ require (
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
)

Expand Down
Loading