Skip to content

Commit

Permalink
add comment stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexis Guerville committed Sep 26, 2022
1 parent ba4f3e3 commit 1a0e676
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
9 changes: 9 additions & 0 deletions reddit/reddit.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,15 @@ type ListPostOptions struct {
Time string `url:"t,omitempty"`
}

// ListCommentsOptions defines possible options used when getting comments from a subreddit.
type ListCommentsOptions struct {
ListOptions
// One of: hour, day, week, month, year, all.
Time string `url:"t,omitempty"`
// One of: relevance, hot, top, new, comments.
Sort string `url:"sort,omitempty"`
}

// ListPostSearchOptions defines possible options used when searching for posts within a subreddit.
type ListPostSearchOptions struct {
ListPostOptions
Expand Down
91 changes: 91 additions & 0 deletions reddit/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type StreamService struct {
// - a channel into which new posts will be sent
// - a channel into which any errors will be sent
// - a function that the client can call once to stop the streaming and close the channels
//
// Because of the 100 post limit imposed by Reddit when fetching posts, some high-traffic
// streams might drop submissions between API requests, such as when streaming r/all.
func (s *StreamService) Posts(subreddit string, opts ...StreamOpt) (<-chan *Post, <-chan error, func()) {
Expand Down Expand Up @@ -93,11 +94,101 @@ func (s *StreamService) Posts(subreddit string, opts ...StreamOpt) (<-chan *Post
return postsCh, errsCh, stop
}

// Comments streams comments from the specified subreddit.
// It returns 2 channels and a function:
// - a channel into which new comments will be sent
// - a channel into which any errors will be sent
// - a function that the client can call once to stop the streaming and close the channels
//
// Because of the 100 post limit imposed by Reddit when fetching posts, some high-traffic
// streams might drop submissions between API requests, such as when streaming r/all.
func (s *StreamService) Comments(subreddit string, opts ...StreamOpt) (<-chan *Comment, <-chan error, func()) {
streamConfig := &streamConfig{
Interval: defaultStreamInterval,
DiscardInitial: false,
MaxRequests: 0,
}
for _, opt := range opts {
opt(streamConfig)
}

ticker := time.NewTicker(streamConfig.Interval)
commentsCh := make(chan *Comment)
errsCh := make(chan error)

var once sync.Once
stop := func() {
once.Do(func() {
ticker.Stop()
close(commentsCh)
close(errsCh)
})
}

// originally used the "before" parameter, but if that post gets deleted, subsequent requests
// would just return empty listings; easier to just keep track of all post ids encountered
ids := set{}

go func() {
defer stop()

var n int
infinite := streamConfig.MaxRequests == 0

for ; ; <-ticker.C {
n++

comments, err := s.getComments(subreddit)
if err != nil {
errsCh <- err
if !infinite && n >= streamConfig.MaxRequests {
break
}
continue
}

for _, comment := range comments {
id := comment.FullID

// if this post id is already part of the set, it means that it and the ones
// after it in the list have already been streamed, so break out of the loop
if ids.Exists(id) {
break
}
ids.Add(id)

// skip this post because we are discarding initial fetch of posts
if streamConfig.DiscardInitial {
continue
}

commentsCh <- comment
}

// setting discard initial to false, since we the loop has run once already
streamConfig.DiscardInitial = false

if !infinite && n >= streamConfig.MaxRequests {
break
}
}
}()

return commentsCh, errsCh, stop
}

func (s *StreamService) getPosts(subreddit string) ([]*Post, error) {
posts, _, err := s.client.Subreddit.NewPosts(context.Background(), subreddit, &ListOptions{Limit: 100})
return posts, err
}

func (s *StreamService) getComments(subreddit string) ([]*Comment, error) {
opts := &ListCommentsOptions{}
opts.Limit = 100
comments, _, err := s.client.Subreddit.NewComments(context.Background(), subreddit, opts)
return comments, err
}

type set map[string]struct{}

func (s set) Add(v string) {
Expand Down
22 changes: 22 additions & 0 deletions reddit/subreddit.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,18 @@ func (s *SubredditService) getPosts(ctx context.Context, sort string, subreddit
return l.Posts(), resp, nil
}

func (s *SubredditService) getComments(ctx context.Context, subreddit string, opts interface{}) ([]*Comment, *Response, error) {
path := ""
if subreddit != "" {
path = fmt.Sprintf("r/%s/comments.json", subreddit)
}
l, resp, err := s.client.getListing(ctx, path, opts)
if err != nil {
return nil, resp, err
}
return l.Comments(), resp, nil
}

// HotPosts returns the hottest posts from the specified subreddit.
// To search through multiple, separate the names with a plus (+), e.g. "golang+test".
// If none are defined, it returns the ones from your subscribed subreddits.
Expand Down Expand Up @@ -367,6 +379,16 @@ func (s *SubredditService) TopPosts(ctx context.Context, subreddit string, opts
return s.getPosts(ctx, "top", subreddit, opts)
}

// NewComments returns the newest comments from the specified subreddit.
// To search through multiple, separate the names with a plus (+), e.g. "golang+test".
// If none are defined, it returns the ones from your subscribed subreddits.
// To search through all, just specify "all".
// To search through all and filter out subreddits, provide "all-name1-name2".
func (s *SubredditService) NewComments(ctx context.Context, subreddit string, opts *ListCommentsOptions) ([]*Comment, *Response, error) {
opts.Sort = "new"
return s.getComments(ctx, subreddit, opts)
}

// Get a subreddit by name.
func (s *SubredditService) Get(ctx context.Context, name string) (*Subreddit, *Response, error) {
if name == "" {
Expand Down

0 comments on commit 1a0e676

Please sign in to comment.