Commit d998edb1 authored by Cody Zacharias, committed by GitHub

Rewritten for Python3

parent db08c905
-#!/usr/bin/env python
+#!/usr/bin/python3
 from bs4 import BeautifulSoup
 from time import gmtime, strftime
-from PIL import Image
-from io import BytesIO
 import argparse
+import aiohttp
+import asyncio
+import async_timeout
 import datetime
 import json
-import os
-import Queue
-import re
-import requests
 import sys
-import threading
-
-q = Queue.Queue()
-
-class tweep:
-    def __init__(self):
-        self.min = -1
-        self.author = arg.u
-        self.search = arg.s
-        self.year = arg.year
-        self.feed = [-1]
-        self.tweets = 0
-        self.tweet_urls = []
-        self.pic_count = 0
-
-    def get_url(self):
-        url_1 = "https://twitter.com/search?f=tweets&vertical=default&lang=en&q="
-        url_2 = "https://twitter.com/i/search/timeline?f=tweets&vertical=default"
-        url_2 += "&lang=en&include_available_features=1&include_entities=1&reset_error_state=false&src=typd"
-        url = url_1 if self.min == -1 else "{0}&max_position={1.min}&q=".format(url_2, self)
-        if self.author != None:
-            url += "from%3A{0.author}".format(self)
-        if self.search != None:
-            search = self.search.replace(' ', '%20').replace('#', '%23')
-            url += "%20{}".format(search)
-        if self.year != None:
-            url += "%20until%3A{0.year}-1-1".format(self)
-        if arg.pics:
-            url += "%20filter%3Aimages"
-        if arg.fruit:
-            url += "%20myspace.com%20OR%20last.fm%20OR"
-            url += "%20mail%20OR%20email%20OR%20gmail%20OR%20e-mail"
-            url += "%20OR%20phone%20OR%20call%20me%20OR%20text%20me"
-            url += "%20OR%20keybase"
-        if arg.verified:
-            url += "%20filter%3Averified"
-        return url
+# Build the search URL. The first request hits the HTML search page;
+# subsequent requests page through the JSON timeline via max_position.
+async def getUrl(Min):
+    if Min == -1:
+        url = "https://twitter.com/search?f=tweets&vertical=default&lang=en&q="
+    else:
+        url = "https://twitter.com/i/search/timeline?f=tweets&vertical=default"
+        url += "&lang=en&include_available_features=1&include_entities=1&reset_"
+        url += "error_state=false&src=typd&max_position={}&q=".format(Min)
+    if arg.u != None:
+        url += "from%3A{0.u}".format(arg)
+    if arg.s != None:
+        arg.s = arg.s.replace(" ", "%20").replace("#", "%23")
+        url += "%20{0.s}".format(arg)
+    if arg.year != None:
+        url += "%20until%3A{0.year}-1-1".format(arg)
+    if arg.fruit:
+        url += "%20myspace.com%20OR%20last.fm%20OR"
+        url += "%20mail%20OR%20email%20OR%20gmail%20OR%20e-mail"
+        url += "%20OR%20phone%20OR%20call%20me%20OR%20text%20me"
+        url += "%20OR%20keybase"
+    if arg.verified:
+        url += "%20filter%3Averified"
+    return url
-
-    def get_feed(self):
-        r = requests.get(self.get_url(), headers=agent)
-        self.feed = []
-        try:
-            if self.min == -1:
-                html = r.text
-            else:
-                json_response = json.loads(r.text)
-                html = json_response['items_html']
-            soup = BeautifulSoup(html, "lxml")
-            self.feed = soup.find_all('li', 'js-stream-item')
-            lastid = self.feed[-1]['data-item-id']
-            firstid = self.feed[0]['data-item-id']
-            if self.min == -1:
-                self.min = "TWEET-{}-{}".format(lastid, firstid)
-            else:
-                minsplit = json_response['min_position'].split('-')
-                minsplit[1] = lastid
-                self.min = "-".join(minsplit)
-        except: pass
-        return self.feed
+
+async def fetch(session, url):
+    with async_timeout.timeout(30):
+        async with session.get(url) as response:
+            return await response.text()
+
+# Fetch one page of results and compute the next max_position cursor.
+async def getFeed(Min):
+    async with aiohttp.ClientSession() as session:
+        r = await fetch(session, await getUrl(Min))
+    feed = []
+    try:
+        if Min == -1:
+            html = r
+        else:
+            json_response = json.loads(r)
+            html = json_response["items_html"]
+        soup = BeautifulSoup(html, "html.parser")
+        feed = soup.find_all("li", "js-stream-item")
+        if Min == -1:
+            Min = "TWEET-{}-{}".format(feed[-1]["data-item-id"], feed[0]["data-item-id"])
+        else:
+            minsplit = json_response["min_position"].split("-")
+            minsplit[1] = feed[-1]["data-item-id"]
+            Min = "-".join(minsplit)
+    except:
+        pass
+    return feed, Min
-
-    def get_tweets(self):
-        for tweet in self.get_feed():
-            self.tweets += 1
-            tweetid = tweet['data-item-id']
-            datestamp = tweet.find('a', 'tweet-timestamp')['title'].rpartition(' - ')[-1]
-            d = datetime.datetime.strptime(datestamp, '%d %b %Y')
-            date = d.strftime('%Y-%m-%d')
-            timestamp = str(datetime.timedelta(seconds=int(tweet.find('span', '_timestamp')['data-time']))).rpartition(', ')[-1]
-            t = datetime.datetime.strptime(timestamp, '%H:%M:%S')
-            time = t.strftime('%H:%M:%S')
-            username = tweet.find('span', 'username').text.encode('utf8').replace('@', '')
-            timezone = strftime("%Z", gmtime())
-            text = tweet.find('p', 'tweet-text').text.encode('utf8').replace('\n', ' ')
-            try:
-                mentions = tweet.find("div", "js-original-tweet")['data-mentions'].split(" ")
-                for i in range(len(mentions)):
-                    text = "@{} {}".format(mentions[i], text)
-            except: pass
-            if arg.pics:
-                tweet_url = "https://twitter.com/{0}/status/{1}/photo/1".format(username, tweetid)
-                self.tweet_urls.append(tweet_url)
-            else:
-                if arg.users:
-                    print(username)
-                elif arg.tweets:
-                    print(text)
-                else:
-                    print("{} {} {} {} <{}> {}".format(tweetid, date, time, timezone, username, text))
+
+# Parse each tweet on the page, then print it and/or append it to a file.
+async def getTweets(Min):
+    feed, Min = await getFeed(Min)
+    for tweet in feed:
+        tweetid = tweet["data-item-id"]
+        datestamp = tweet.find("a", "tweet-timestamp")["title"].rpartition(" - ")[-1]
+        d = datetime.datetime.strptime(datestamp, "%d %b %Y")
+        date = d.strftime("%Y-%m-%d")
+        timestamp = str(datetime.timedelta(seconds=int(tweet.find("span", "_timestamp")["data-time"]))).rpartition(", ")[-1]
+        t = datetime.datetime.strptime(timestamp, "%H:%M:%S")
+        time = t.strftime("%H:%M:%S")
+        username = tweet.find("span", "username").text.replace("@", "")
+        timezone = strftime("%Z", gmtime())
+        text = tweet.find("p", "tweet-text").text.replace("\n", " ")
+        try:
+            mentions = tweet.find("div", "js-original-tweet")["data-mentions"].split(" ")
+            for i in range(len(mentions)):
+                mention = "@{}".format(mentions[i])
+                if mention not in text:
+                    text = "{} {}".format(mention, text)
+        except:
+            pass
+        if arg.users:
+            output = username
+        elif arg.tweets:
+            output = text
+        else:
+            output = "{} {} {} {} <{}> {}".format(tweetid, date, time, timezone, username, text)
+        if arg.o != None:
+            print(output, file=open(arg.o, "a"))
+        print(output)
+    return feed, Min
-
-    def save_pic(self, picture):
-        if not os.path.exists('tweep_img'):
-            os.makedirs('tweep_img')
-        if not os.path.exists('tweep_img/{0.author}'.format(self)):
-            os.makedirs('tweep_img/{0.author}'.format(self))
-        filename = picture[len('https://pbs.twimg.com/media/'):]
-        save_dir = 'tweep_img/{0.author}'.format(self)
-        if not os.path.isfile('{}/{}'.format(save_dir, filename)):
-            r = requests.get(picture, headers=agent)
-            i = Image.open(BytesIO(r.content))
-            i.save(os.path.join(save_dir, filename))
-            print(" Downloading: {}".format(filename))
-            self.pic_count += 1
-
-    def get_pics(self, tweet_url):
-        r = requests.get(tweet_url, headers=agent)
-        soup = BeautifulSoup(r.text, "lxml")
-        picture = soup.find('div', 'AdaptiveMedia-photoContainer js-adaptive-photo ')
-        if picture is not None:
-            picture = picture['data-image-url'].replace(' ', '')
-            self.save_pic(picture)
-
-    def fetch_pics(self):
-        while True:
-            tweet_url = q.get()
-            self.get_pics(tweet_url)
-            q.task_done()
-
-    def main(self):
-        if arg.pics:
-            print("[+] Searching Tweets For Photos.")
-        while True if (self.tweets < float('inf')) and len(self.feed) > 0 else False:
-            self.get_tweets()
-        if arg.pics:
-            total = len(self.tweet_urls) - 1
-            print("[+] {} pictures found. Collecting Pictures.".format(total))
-            for i in range(10):
-                t = threading.Thread(target=self.fetch_pics)
-                t.daemon = True
-                t.start()
-            for tweet_url in self.tweet_urls:
-                q.put(tweet_url)
-            q.join()
-            print("[+] Done. {t.pic_count} pictures saved from {t.author}.".format(t=self))
-
-def check():
-    if arg.u is not None:
-        if arg.users:
-            print("Please use --users in combination with -s.")
-            sys.exit(0)
-        if arg.verified:
-            print("Please use --verified in combination with -s.")
-            sys.exit(0)
-    if arg.tweets and arg.users:
-        print("--users and --tweets cannot be used together.")
-        sys.exit(0)
+
+# Keep requesting pages until a page comes back empty.
+async def main():
+    feed = [-1]
+    Min = -1
+    while True:
+        if len(feed) > 0:
+            feed, Min = await getTweets(Min)
+        else:
+            break
 
-if __name__ == '__main__':
-    agent = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
-    ap = argparse.ArgumentParser(prog='tweep.py', usage='python %(prog)s [options]', description="tweep.py - An Advanced Twitter Scraping Tool")
-    ap.add_argument('-u', help="User's tweets you want to scrape.")
-    ap.add_argument('-s', help='Search for tweets containing this word or phrase.')
-    ap.add_argument('--year', help='Filter tweets before specified year.')
-    ap.add_argument('--pics', help='Save pictures.', action='store_true')
-    ap.add_argument('--fruit', help='Display "low-hanging-fruit" tweets.', action='store_true')
-    ap.add_argument('--tweets', help='Display tweets only.', action='store_true')
-    ap.add_argument('--verified', help='Display Tweets only from verified users (Use with -s).', action='store_true')
-    ap.add_argument('--users', help='Display users only (Use with -s).', action='store_true')
-    arg = ap.parse_args()
-    check()
-    tweep().main()
+if __name__ == "__main__":
+    ap = argparse.ArgumentParser(prog="tweep.py", usage="python3 %(prog)s [options]", description="tweep.py - An Advanced Twitter Scraping Tool")
+    ap.add_argument("-u", help="User's tweets you want to scrape.")
+    ap.add_argument("-s", help="Search for tweets containing this word or phrase.")
+    ap.add_argument("-o", help="Save output to a file.")
+    ap.add_argument("--year", help="Filter tweets before specified year.")
+    ap.add_argument("--fruit", help="Display 'low-hanging-fruit' tweets.", action="store_true")
+    ap.add_argument("--tweets", help="Display tweets only.", action="store_true")
+    ap.add_argument("--verified", help="Display Tweets only from verified users (Use with -s).", action="store_true")
+    ap.add_argument("--users", help="Display users only (Use with -s).", action="store_true")
+    arg = ap.parse_args()
+    if arg.u is not None:
+        if arg.users:
+            print("[-] Contradicting Args: Please use --users in combination with -s.")
+            sys.exit(0)
+        if arg.verified:
+            print("[-] Contradicting Args: Please use --verified in combination with -s.")
+            sys.exit(0)
+    if arg.tweets and arg.users:
+        print("[-] Contradicting Args: --users and --tweets cannot be used together.")
+        sys.exit(0)
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main())
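
For reference, a few example invocations of the rewritten script. They assume Python 3 with aiohttp, async_timeout, and beautifulsoup4 installed; the username and search phrase below are placeholders, not values taken from the commit:

    python3 tweep.py -u someuser --year 2017
    python3 tweep.py -s "open source" --verified
    python3 tweep.py -s "open source" --users -o users.txt

Each run pages through Twitter's search timeline until an empty page comes back, printing one line per tweet (or one username per line with --users) and appending the same output to the -o file when one is given.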