EasyDetect / pipeline /nltk /twitter /twitterclient.py
sunnychenxiwang's picture
update nltk
d916065
raw
history blame
19.9 kB
# Natural Language Toolkit: Twitter client
#
# Copyright (C) 2001-2023 NLTK Project
# Author: Ewan Klein <[email protected]>
# Lorenzo Rubio <[email protected]>
# URL: <https://www.nltk.org/>
# For license information, see LICENSE.TXT
"""
NLTK Twitter client
This module offers methods for collecting and processing Tweets. Most of the
functionality depends on access to the Twitter APIs, and this is handled via
the third party Twython library.
If one of the methods below returns an integer, it is probably a `Twitter
error code <https://dev.twitter.com/overview/api/response-codes>`_. For
example, the response of '420' means that you have reached the limit of the
requests you can currently make to the Twitter API. Currently, `rate limits
for the search API <https://dev.twitter.com/rest/public/rate-limiting>`_ are
divided into 15 minute windows.
"""
import datetime
import gzip
import itertools
import json
import os
import time
import requests
from twython import Twython, TwythonStreamer
from twython.exceptions import TwythonError, TwythonRateLimitError
from nltk.twitter.api import BasicTweetHandler, TweetHandlerI
from nltk.twitter.util import credsfromfile, guess_path
class Streamer(TwythonStreamer):
    """
    Retrieve data from the Twitter Streaming API.

    The streaming API requires
    `OAuth 1.0 <https://en.wikipedia.org/wiki/OAuth>`_ authentication.

    Incoming data is passed to a handler installed with :py:meth:`register`.
    The handler decides, via its ``do_continue()`` method, when streaming
    should stop.
    """

    def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret):
        """
        :param app_key: the application key
        :param app_secret: the application secret key
        :param oauth_token: OAuth 1 token
        :param oauth_token_secret: OAuth 1 token secret
        """
        self.handler = None
        self.do_continue = True
        TwythonStreamer.__init__(
            self, app_key, app_secret, oauth_token, oauth_token_secret
        )

    def register(self, handler):
        """
        Register a method for handling Tweets.

        Registering a handler also re-arms the streamer: once a previous
        handler has asked to stop, ``do_continue`` stays ``False`` and
        ``sample()`` / ``filter()`` would otherwise return immediately when
        the same ``Streamer`` instance is used again.

        :param TweetHandlerI handler: method for viewing
        """
        self.handler = handler
        self.do_continue = True

    def on_success(self, data):
        """
        Pass one item received from the stream to the registered handler.

        Only items containing a ``"text"`` key are counted and handled. Once
        the handler has asked to stop, the next call disconnects the stream
        and notifies the handler through ``on_finish()``.

        :param data: response from Twitter API
        :raises ValueError: if streaming is active but no handler has been
            registered
        """
        if not self.do_continue:
            self.disconnect()
            # Guard against a streamer that was stopped without ever having
            # had a handler registered.
            if self.handler is not None:
                self.handler.on_finish()
            return

        if self.handler is None:
            raise ValueError("No data handler has been registered.")

        if "text" in data:
            self.handler.counter += 1
            self.handler.handle(data)
            self.do_continue = self.handler.do_continue()

    def on_error(self, status_code, data):
        """
        Report an error by printing its status code.

        :param status_code: The status code returned by the Twitter API
        :param data: The response from Twitter API
        """
        print(status_code)

    def sample(self):
        """
        Wrapper for 'statuses / sample' API call
        """
        while self.do_continue:
            # Stream in an endless loop until limit is reached. See twython
            # issue 288: https://github.com/ryanmcgrath/twython/issues/288
            # colditzjb commented on 9 Dec 2014
            try:
                self.statuses.sample()
            except requests.exceptions.ChunkedEncodingError as e:
                # Report the error and let the loop reconnect.
                print(f"Error (stream will continue): {e}")

    def filter(self, track="", follow="", lang="en"):
        """
        Wrapper for 'statuses / filter' API call

        :param str track: keywords to track
        :param str follow: user IDs to follow
        :param str lang: language
        :raises ValueError: if neither ``track`` nor ``follow`` is supplied
        """
        # Validate once, before connecting, rather than on every reconnect.
        if track == "" and follow == "":
            msg = "Please supply a value for 'track', 'follow'"
            raise ValueError(msg)

        while self.do_continue:
            # Stream in an endless loop until limit is reached
            try:
                self.statuses.filter(track=track, follow=follow, lang=lang)
            except requests.exceptions.ChunkedEncodingError as e:
                # Report the error and let the loop reconnect.
                print(f"Error (stream will continue): {e}")
class Query(Twython):
    """
    Retrieve data from the Twitter REST API.
    """

    def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret):
        """
        :param app_key: (optional) Your applications key
        :param app_secret: (optional) Your applications secret key
        :param oauth_token: (optional) When using **OAuth 1**, combined with
            oauth_token_secret to make authenticated calls
        :param oauth_token_secret: (optional) When using **OAuth 1** combined
            with oauth_token to make authenticated calls
        """
        self.handler = None
        self.do_continue = True
        Twython.__init__(self, app_key, app_secret, oauth_token, oauth_token_secret)

    def register(self, handler):
        """
        Register a method for handling Tweets.

        :param TweetHandlerI handler: method for viewing or writing Tweets to a file.
        """
        self.handler = handler

    def expand_tweetids(self, ids_f, verbose=True):
        """
        Given a file object containing a list of Tweet IDs, fetch the
        corresponding full Tweets from the Twitter API.

        The API call `statuses/lookup` will fail to retrieve a Tweet if the
        user has deleted it.

        This call to the Twitter API is rate-limited. See
        <https://dev.twitter.com/rest/reference/get/statuses/lookup> for details.

        :param ids_f: input file object consisting of Tweet IDs, one to a line
        :param bool verbose: if `True`, print how many IDs were read
        :return: iterable of Tweet objects in JSON format
        """
        # Filter on the *stripped* value: a raw line from a file always
        # contains its newline and is therefore never falsy, so blank lines
        # would otherwise be sent to the API as empty IDs.
        stripped = (line.strip() for line in ids_f)
        ids = [tweet_id for tweet_id in stripped if tweet_id]

        if verbose:
            print(f"Counted {len(ids)} Tweet IDs in {ids_f}.")

        # The Twitter endpoint takes lists of up to 100 ids, so we chunk the
        # ids.
        id_chunks = [ids[i : i + 100] for i in range(0, len(ids), 100)]

        # Lazy: each chunk is only looked up when the result is iterated.
        chunked_tweets = (self.lookup_status(id=chunk) for chunk in id_chunks)

        return itertools.chain.from_iterable(chunked_tweets)

    def _search_tweets(self, keywords, limit=100, lang="en"):
        """
        Assumes that the handler has been informed. Fetches Tweets from
        search_tweets generator output and passes them to handler

        :param str keywords: A list of query terms to search for, written as
            a comma-separated string.
        :param int limit: Number of Tweets to process
        :param str lang: language
        """
        while True:
            tweets = self.search_tweets(
                keywords=keywords, limit=limit, lang=lang, max_id=self.handler.max_id
            )
            for tweet in tweets:
                self.handler.handle(tweet)
            # In this file only TweetWriter sets `repeat`; treat handlers
            # that do not define it as non-repeating instead of failing
            # with AttributeError.
            repeat = getattr(self.handler, "repeat", False)
            if not (self.handler.do_continue() and repeat):
                break
        self.handler.on_finish()

    def search_tweets(
        self,
        keywords,
        limit=100,
        lang="en",
        max_id=None,
        retries_after_twython_exception=0,
    ):
        """
        Call the REST API ``'search/tweets'`` endpoint with some plausible
        defaults. See `the Twitter search documentation
        <https://dev.twitter.com/rest/public/search>`_ for more information
        about admissible search parameters.

        :param str keywords: A list of query terms to search for, written as
            a comma-separated string
        :param int limit: Number of Tweets to process
        :param str lang: language
        :param int max_id: id of the last tweet fetched
        :param int retries_after_twython_exception: number of retries when
            searching Tweets before raising an exception
        :raises TwythonError: when a request fails and the allowed number of
            retries has been used up
        :rtype: python generator
        """
        if not self.handler:
            # if no handler is provided, `BasicTweetHandler` provides minimum
            # functionality for limiting the number of Tweets retrieved
            self.handler = BasicTweetHandler(limit=limit)

        count_from_query = 0
        if max_id:
            # Resume from a known position; skip the initial query.
            self.handler.max_id = max_id
        else:
            results = self.search(
                q=keywords, count=min(100, limit), lang=lang, result_type="recent"
            )
            count = len(results["statuses"])
            if count == 0:
                print("No Tweets available through REST API for those keywords")
                return
            count_from_query = count
            self.handler.max_id = results["statuses"][count - 1]["id"] - 1

            for result in results["statuses"]:
                yield result
                # The counter is bumped only after the consumer has taken
                # the Tweet and resumed the generator.
                self.handler.counter += 1
                if not self.handler.do_continue():
                    return

        # Pagination loop: keep fetching Tweets until the desired count is
        # reached while dealing with Twitter rate limits.
        retries = 0
        while count_from_query < limit:
            try:
                mcount = min(100, limit - count_from_query)
                results = self.search(
                    q=keywords,
                    count=mcount,
                    lang=lang,
                    max_id=self.handler.max_id,
                    result_type="recent",
                )
            except TwythonRateLimitError as e:
                print(f"Waiting for 15 minutes -{e}")
                time.sleep(15 * 60)  # wait 15 minutes
                continue
            except TwythonError as e:
                print(f"Fatal error in Twython request -{e}")
                if retries_after_twython_exception == retries:
                    raise
                retries += 1
                # Retry the request. Without this, execution would fall
                # through and reprocess the previous page held in `results`
                # (or fail on an unbound name if no page was fetched yet).
                continue

            count = len(results["statuses"])
            if count == 0:
                print("No more Tweets available through rest api")
                return
            count_from_query += count
            # the max_id is also present in the Tweet metadata
            # results['search_metadata']['next_results'], but as part of a
            # query and difficult to fetch. This is doing the equivalent
            # (last tweet id minus one)
            self.handler.max_id = results["statuses"][count - 1]["id"] - 1

            for result in results["statuses"]:
                yield result
                self.handler.counter += 1
                if not self.handler.do_continue():
                    return

    def user_info_from_id(self, userids):
        """
        Convert a list of userIDs into a variety of information about the users.

        See <https://dev.twitter.com/rest/reference/get/users/show>.

        :param list userids: A list of integer strings corresponding to Twitter userIDs
        :rtype: list(json)
        """
        return [self.show_user(user_id=userid) for userid in userids]

    def user_tweets(self, screen_name, limit, include_rts="false"):
        """
        Pass the most recent Tweets posted by the user to the registered
        handler. A handler must have been registered beforehand.

        :param str screen_name: The user's screen name; the initial '@' symbol
            should be omitted
        :param int limit: The number of Tweets to recover; 200 is the maximum allowed
        :param str include_rts: Whether to include statuses which have been
            retweeted by the user; possible values are 'true' and 'false'
        """
        data = self.get_user_timeline(
            screen_name=screen_name, count=limit, include_rts=include_rts
        )
        for item in data:
            self.handler.handle(item)
class Twitter:
    """
    Wrapper class with restricted functionality and fewer options.
    """

    def __init__(self):
        """
        Load credentials with ``credsfromfile()`` and build one ``Streamer``
        and one ``Query`` client from them.
        """
        self._oauth = credsfromfile()
        self.streamer = Streamer(**self._oauth)
        self.query = Query(**self._oauth)

    def tweets(
        self,
        keywords="",
        follow="",
        to_screen=True,
        stream=True,
        limit=100,
        date_limit=None,
        lang="en",
        repeat=False,
        gzip_compress=False,
    ):
        """
        Process some Tweets in a simple manner.

        :param str keywords: Keywords to use for searching or filtering
        :param list follow: UserIDs to use for filtering Tweets from the public stream
        :param bool to_screen: If `True`, display the tweet texts on the screen,
            otherwise print to a file
        :param bool stream: If `True`, use the live public stream,
            otherwise search past public Tweets
        :param int limit: The number of data items to process in the current
            round of processing.
        :param tuple date_limit: The date at which to stop collecting
            new data. This should be entered as a tuple which can serve as the
            argument to `datetime.datetime`.
            E.g. `date_limit=(2015, 4, 1, 12, 40)` for 12:40 pm on April 1 2015.
            Note that, in the case of streaming, this is the maximum date, i.e.
            a date in the future; if not, it is the minimum date, i.e. a date
            in the past
        :param str lang: language
        :param bool repeat: A flag to determine whether multiple files should
            be written. If `True`, the length of each file will be set by the
            value of `limit`. Use only if `to_screen` is `False`. See also
            :py:func:`handle`.
        :param gzip_compress: if `True`, output files are compressed with gzip.
        :raises ValueError: if ``stream`` is `False` and no keywords are given
        """
        # Reject an unusable search request before any handler is built, so
        # that an invalid call has no side effects (constructing a
        # TweetWriter computes an output file name and may create the
        # output directory).
        if not stream and keywords == "":
            raise ValueError("Please supply at least one keyword to search for.")

        # Streamed Tweets are bounded by an upper date limit, searched
        # Tweets by a lower one.
        if stream:
            upper_date_limit = date_limit
            lower_date_limit = None
        else:
            upper_date_limit = None
            lower_date_limit = date_limit

        # Build the handler exactly once so that `date_limit` is honoured
        # for on-screen output as well as for file output.
        if to_screen:
            handler = TweetViewer(
                limit=limit,
                upper_date_limit=upper_date_limit,
                lower_date_limit=lower_date_limit,
            )
        else:
            handler = TweetWriter(
                limit=limit,
                upper_date_limit=upper_date_limit,
                lower_date_limit=lower_date_limit,
                repeat=repeat,
                gzip_compress=gzip_compress,
            )

        if stream:
            self.streamer.register(handler)
            if keywords == "" and follow == "":
                self.streamer.sample()
            else:
                self.streamer.filter(track=keywords, follow=follow, lang=lang)
        else:
            self.query.register(handler)
            self.query._search_tweets(keywords, limit=limit, lang=lang)
class TweetViewer(TweetHandlerI):
    """
    Handle data by sending it to the terminal.
    """

    def handle(self, data):
        """
        Print the text of a Tweet and check it against the date limit.

        Nothing is returned; within this module, callers decide whether to
        carry on by calling ``do_continue()`` on the handler.

        :param data: Tweet object returned by Twitter API
        :raises KeyError: if ``data`` has no ``"text"`` entry
        """
        print(data["text"])
        self.check_date_limit(data)

    def on_finish(self):
        """
        Print the number of Tweets processed.
        """
        print(f"Written {self.counter} Tweets")
class TweetWriter(TweetHandlerI):
    """
    Handle data by writing it to a file.
    """

    def __init__(
        self,
        limit=2000,
        upper_date_limit=None,
        lower_date_limit=None,
        fprefix="tweets",
        subdir="twitter-files",
        repeat=False,
        gzip_compress=False,
    ):
        """
        The difference between the upper and lower date limits depends on
        whether Tweets are coming in an ascending date order (i.e. when
        streaming) or descending date order (i.e. when searching past Tweets).

        :param int limit: number of data items to process in the current
            round of processing.
        :param tuple upper_date_limit: The date at which to stop collecting new
            data. This should be entered as a tuple which can serve as the
            argument to `datetime.datetime`. E.g. `upper_date_limit=(2015, 4,
            1, 12, 40)` for 12:40 pm on April 1 2015.
        :param tuple lower_date_limit: The date at which to stop collecting new
            data. See `upper_date_limit` for formatting.
        :param str fprefix: The prefix to use in creating file names for Tweet
            collections.
        :param str subdir: The name of the directory where Tweet collection
            files should be stored.
        :param bool repeat: flag to determine whether multiple files should be
            written. If `True`, the length of each file will be set by the value
            of `limit`. See also :py:func:`handle`.
        :param gzip_compress: if `True`, output files are compressed with gzip.
        """
        self.fprefix = fprefix
        self.subdir = guess_path(subdir)
        self.gzip_compress = gzip_compress
        # Needs fprefix, subdir and gzip_compress to be set already.
        self.fname = self.timestamped_file()
        self.repeat = repeat
        self.output = None
        TweetHandlerI.__init__(self, limit, upper_date_limit, lower_date_limit)

    def timestamped_file(self):
        """
        Build the name of a new output file, creating the output directory
        if it does not exist yet.

        The name has the form ``<subdir>/<fprefix>.<YYYYmmdd-HHMMSS>.json``,
        with ``.gz`` appended when gzip compression is enabled. The timestamp
        has one-second resolution.

        :return: timestamped file name
        :rtype: str
        """
        subdir = self.subdir
        fprefix = self.fprefix
        if subdir:
            # exist_ok avoids the check-then-create race and also copes with
            # nested directories.
            os.makedirs(subdir, exist_ok=True)

        fname = os.path.join(subdir, fprefix)
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        suffix = ".gz" if self.gzip_compress else ""
        return f"{fname}.{timestamp}.json{suffix}"

    def handle(self, data):
        """
        Write Twitter data as line-delimited JSON into one or more files.

        The output file is opened lazily, on the first item handled after
        start-up or after a file restart. Nothing is returned; within this
        module, callers decide whether to carry on by calling
        ``do_continue()`` on the handler.

        :param data: tweet object returned by Twitter API
        """
        if self.startingup:
            if self.gzip_compress:
                self.output = gzip.open(self.fname, "w")
            else:
                self.output = open(self.fname, "w")
            print(f"Writing to {self.fname}")
            # Clear the flag as soon as the file is open. Otherwise a stop
            # signalled on the first item would leave it set, and a caller
            # that keeps feeding data would reopen (and truncate) the file
            # on every call.
            self.startingup = False

        line = json.dumps(data) + "\n"
        if self.gzip_compress:
            # gzip.open(..., "w") is a binary stream.
            self.output.write(line.encode("utf-8"))
        else:
            self.output.write(line)

        self.check_date_limit(data)

    def on_finish(self):
        """
        Print the number of Tweets written and close the output file, if any.
        """
        print(f"Written {self.counter} Tweets")
        if self.output:
            self.output.close()

    def do_continue(self):
        """
        Decide whether processing should carry on.

        Without ``repeat`` this defers to ``TweetHandlerI.do_continue``.
        With ``repeat``, reaching ``limit`` does not stop processing: the
        current file is closed and a new one is started instead.

        :rtype: bool
        """
        if not self.repeat:
            return TweetHandlerI.do_continue(self)

        if self.do_stop:
            # stop for a functional cause (e.g. date limit)
            return False

        if self.counter == self.limit:
            # repeat is True, thus close output file and
            # create a new one
            self._restart_file()
        return True

    def _restart_file(self):
        """
        Close the current output file and prepare a fresh one; the new file
        is opened by the next call to :py:meth:`handle`.
        """
        self.on_finish()
        self.fname = self.timestamped_file()
        self.startingup = True
        self.counter = 0