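'''Fetch a user's recent tweets from the Twitter REST API and annotate each
one with a reply count scraped from the twitter.com web pages, since the
standard v1.1 API did not expose reply counts.

Usage: python tweetratio.py <screen_name>
'''
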
import backoff
import bs4
import json
import logging
import os
import requests
import sys
import tqdm
import traceback
import tweepy

# Twitter credentials are read from the environment; all four must be set.
auth = tweepy.OAuthHandler(
    os.environ['consumer_key'],
    os.environ['consumer_secret']
)
auth.set_access_token(
    os.environ['access_token'],
    os.environ['access_token_secret']
)
# wait_on_rate_limit_notify is a tweepy 3.x argument (removed in tweepy 4).
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

# Log to a file; keep tweepy's own logging quieter than the script's.
logging.basicConfig(filename='log.log',
                    format='%(levelname)s:%(asctime)s:%(message)s')
logging.getLogger().setLevel(logging.INFO)
logging.getLogger('tweepy').setLevel(logging.WARNING)


def get_tweets(user, ntweets=3200):
    '''Get up to ntweets of the user's most recent tweets, keyed by id_str.'''
    # 3200 is the most history the user_timeline endpoint returns; 200 is
    # its maximum page size.
    cursor = tweepy.Cursor(api.user_timeline, id=user, include_rts=False,
                           count=min(ntweets, 200))
    tweets = {}
    for tweet in cursor.items(ntweets):
        tweets[tweet._json['id_str']] = tweet._json
    return tweets
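
# A sketch of the mapping returned by get_tweets above (IDs hypothetical;
# the 'id_str' and 'user' fields are part of the v1.1 Tweet JSON that the
# rest of this script relies on):
#   {'9876543210': {'id_str': '9876543210',
#                   'user': {'screen_name': 'someone', ...},
#                   ...},
#    ...}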


# Retry with exponential backoff. AttributeError is retried because bs4's
# find() returns None when the expected markup is missing (e.g. a throttled
# or placeholder page), which surfaces as an AttributeError on the chained
# call below.
@backoff.on_exception(backoff.expo,
                      (requests.exceptions.Timeout,
                       requests.exceptions.ConnectionError,
                       AttributeError),
                      max_tries=8)
def scrape_reply_count_tweet(tweet):
    '''Scrape the reply count from a tweet's permalink page.'''
    url = (f"http://twitter.com/{tweet['user']['screen_name']}"
           f"/status/{tweet['id_str']}")
    try:
        page = bs4.BeautifulSoup(requests.get(url).text, 'lxml')
        span = (page
                .find("div", {"class": 'permalink-tweet'})
                .find("span", {"class": "ProfileTweet-actionCount"}))
        count = int(span.get('data-tweet-stat-count'))
    except Exception:
        logging.error(f'Failed to scrape {url}')
        raise
    return count


def scrape_reply_counts_timeline_slice(timeline):
    '''
    Scrape all reply counts from a bs4 object representing a slice of a user
    timeline. Returns a list of (tweet_id, reply_count) pairs.
    '''
    counts = []
    for tweet in timeline.find_all("div", {"class": "tweet"}):
        tweet_id = tweet.get('data-item-id')
        reply_count = int(tweet
                          .find('span', {'class': 'ProfileTweet-actionCount'})
                          .get('data-tweet-stat-count'))
        counts.append((tweet_id, reply_count))
    return counts
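
# The legacy timeline markup targeted by the selectors above looks roughly
# like this (reconstructed from the selectors, not an exact copy):
#   <div class="tweet" data-item-id="123456789">
#     ...
#     <span class="ProfileTweet-actionCount" data-tweet-stat-count="42">
#     ...
#   </div>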


def scrape_reply_counts_timeline(user):
    '''Scrape all the reply counts possible from a user's timeline.'''
    counts = []
    root_url = f"https://twitter.com/i/profiles/show/{user}/timeline/tweets"
    url = root_url
    with tqdm.tqdm(total=1000) as pbar:  # 1000 ~= maximum timeline length
        while True:
            try:
                json_payload = json.loads(requests.get(url).text)
                if not json_payload["has_more_items"]:
                    return counts
                else:
                    html = json_payload['items_html']
            except json.decoder.JSONDecodeError:
                logging.exception(f'Failed at {url}')
                return counts
            timeline = bs4.BeautifulSoup(html, 'lxml')
            newcounts = scrape_reply_counts_timeline_slice(timeline)
            if not newcounts:
                # Defensive: an empty slice would otherwise loop forever,
                # and counts[-1] below would fail on the first iteration.
                return counts
            counts += newcounts
            pbar.update(len(newcounts))
            # Page by passing the oldest tweet id seen as max_position.
            oldest_tweet_seen = counts[-1][0]
            url = root_url + f"?max_position={oldest_tweet_seen}"
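
# The timeline endpoint used above responds with JSON shaped roughly like
# (inferred from the fields the function reads):
#   {"has_more_items": true,
#    "items_html": "<div class=\"tweet\" ...> ..."}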


def count_reply_counts(tweets):
    '''Return the number of tweets that have reply_count set.'''
    return sum(('reply_count' in tweet) and (tweet['reply_count'] is not None)
               for tweet in tweets.values())


def add_reply_counts(tweets):
    '''Add 'reply_count' to each tweet in tweets.'''
    user = next(iter(tweets.values()))['user']['screen_name']
    print('--- Scraping timeline')
    reply_counts = scrape_reply_counts_timeline(user)
    for tweet_id, reply_count in reply_counts:
        try:
            tweets[tweet_id]['reply_count'] = reply_count
        except KeyError:
            # The timeline can include tweets the API call did not return.
            pass
    logging.info(f'Added {count_reply_counts(tweets)}'
                 ' reply counts from timeline')
    print('--- Getting missing tweets')
    for tweet_id, tweet in tqdm.tqdm(tweets.items()):
        if 'reply_count' not in tweet:
            try:
                tweet['reply_count'] = scrape_reply_count_tweet(tweet)
            except Exception:
                # Give up and return the partial results collected so far.
                traceback.print_exc()
                return tweets
    logging.info(f'Acquired {count_reply_counts(tweets)} reply counts')
    return tweets


def load_json(jsonf):
    '''Load JSON data from jsonf.'''
    with open(jsonf) as f:
        return json.load(f)


def save_json(data, jsonf):
    '''Save data to jsonf as JSON.'''
    with open(jsonf, 'w') as f:
        json.dump(data, f)


def scrape_user(user):
    '''Fetch a user's tweets from the API and add scraped reply counts.'''
    tweets = get_tweets(user)
    logging.info(f'Got {len(tweets)} tweets from API')
    tweets = add_reply_counts(tweets)
    return tweets


def get_user(user, save=True, rescrape=True):
    '''Get tweets for user, scraping afresh or loading a previous scrape.'''
    logging.info(f'Get @{user}')
    rawf = f'raw/{user}.json'
    if rescrape or not os.path.exists(rawf):
        logging.info(f'Scrape @{user}')
        tweets = scrape_user(user)
        # Save only if scraping fresh data (the raw/ directory must exist).
        if save:
            logging.info(f'Save @{user}')
            save_json(tweets, rawf)
    else:
        logging.info(f'Load @{user}')
        tweets = load_json(rawf)
    return tweets


if __name__ == '__main__':
    get_user(sys.argv[1])
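
# Example invocation (assumes the four credential variables used above are
# exported in the environment and that a raw/ directory exists for output):
#   $ consumer_key=... consumer_secret=... access_token=... \
#     access_token_secret=... python tweetratio.py jack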