Commit eea1e6ea authored by Francesco Poldi's avatar Francesco Poldi

Added global logger

parent 7edc6ec8
import logging
class _logger:
    """Shared file logger for twint modules.

    Each module does ``logme = _logger(__name__)``; records are appended to
    ``test.log`` at DEBUG level in ``levelname:asctime:name:message`` format.
    """

    def __init__(self, loggerName):
        self._level = logging.DEBUG
        self._output_fn = 'test.log'
        self.logger = logging.getLogger(loggerName)
        self.logger.setLevel(self._level)
        self.formatter = logging.Formatter('%(levelname)s:%(asctime)s:%(name)s:%(message)s')
        self.fileHandler = logging.FileHandler(self._output_fn)
        self.fileHandler.setLevel(self._level)
        self.fileHandler.setFormatter(self.formatter)
        # logging.getLogger returns the same logger object for a given name,
        # so constructing _logger twice with one name would attach two file
        # handlers and duplicate every log line. Only attach when none exist.
        if not self.logger.handlers:
            self.logger.addHandler(self.fileHandler)

    def critical(self, message):
        """Log *message* at CRITICAL level."""
        self.logger.critical(message)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)
\ No newline at end of file
......@@ -3,10 +3,10 @@
Twint.py - Twitter Intelligence Tool (formerly known as Tweep).
See wiki on Github for in-depth details.
https://github.com/haccer/twint/wiki
https://github.com/twintproject/twint/wiki
Licensed under MIT License
Copyright (c) 2018 Cody Zacharias
Copyright (c) 2018 The Twint Project
'''
import sys
import os
......
import datetime
#import logging
from . import _logme
logme = _logme._logger(__name__)
class Datelock:
_until = None
......@@ -8,7 +10,7 @@ class Datelock:
_since_def_user = None
def Set(Until, Since):
#logging.info("[<] " + str(datetime.datetime.now()) + ':: datelock+Set')
logme.debug('Set')
d = Datelock()
if Until:
......
from bs4 import BeautifulSoup
from re import findall
from json import loads
#import logging
#from datetime import datetime
from . import _logme
logme = _logme._logger(__name__)
def Follow(response):
    """Parse a mobile followers/following page.

    Returns (follow, cursor): the screen-name table cells and the pagination
    cursor string. On the last page no "more" button exists, so the regex
    match fails and the raw node list is returned as the cursor unchanged
    (original behavior, relied on by callers to detect the end).
    """
    logme.debug('Follow')
    soup = BeautifulSoup(response, "html.parser")
    follow = soup.find_all("td", "info fifty screenname")
    cursor = soup.find_all("div", "w-button-more")
    try:
        cursor = findall(r'cursor=(.*?)">', str(cursor))[0]
    except IndexError:
        # No cursor on the last page. (Removed a redundant `pass` that
        # preceded this log call.)
        logme.critical('Follow:IndexError')
    return follow, cursor
def Mobile(response):
    """Parse a mobile timeline page into (tweets, max_id) for pagination."""
    logme.debug('Mobile')
    parsed = BeautifulSoup(response, "html.parser")
    tweets = parsed.find_all("span", "metadata")
    max_id = parsed.find_all("div", "w-button-more")
    try:
        # Extract the pagination id; on the last page there is no "more"
        # button and the raw node list is returned unchanged.
        max_id = findall(r'max_id=(.*?)">', str(max_id))[0]
    except Exception as e:
        print(str(e) + " [x] feed.Mobile")
        logme.critical('Mobile:' + str(e))
    return tweets, max_id
def profile(response):
#logging.info("[<] " + str(datetime.now()) + ':: feed+profile')
logme.debug('profile')
json_response = loads(response)
html = json_response["items_html"]
soup = BeautifulSoup(html, "html.parser")
......@@ -38,7 +40,7 @@ def profile(response):
return feed, feed[-1]["data-item-id"]
def Json(response):
#logging.info("[<] " + str(datetime.now()) + ':: feed+Json')
logme.debug('Json')
json_response = loads(response)
html = json_response["items_html"]
soup = BeautifulSoup(html, "html.parser")
......
#import logging
#from datetime import datetime
from . import _logme
logme = _logme._logger(__name__)
def Tweet(config, t):
#logging.info("[<] " + str(datetime.now()) + ':: format+Tweet')
if config.Format:
logme.debug('Tweet:Format')
output = config.Format.replace("{id}", t.id_str)
output = output.replace("{date}", t.datestamp)
output = output.replace("{time}", t.timestamp)
......@@ -20,6 +21,7 @@ def Tweet(config, t):
output = output.replace("{is_retweet}", str(t.retweet))
output = output.replace("{mentions}", str(t.mentions))
else:
logme.debug('Tweet:notFormat')
output = f"{t.id_str} {t.datestamp} {t.timestamp} {t.timezone} "
if t.retweet == 1:
......@@ -38,8 +40,8 @@ def Tweet(config, t):
return output
def User(_format, u):
#logging.info("[<] " + str(datetime.now()) + ':: format+User')
if _format:
logme.debug('User:Format')
output = _format.replace("{id}", u.id)
output += output.replace("{name}", u.name)
output += output.replace("{username}", u.username)
......@@ -57,6 +59,7 @@ def User(_format, u):
output += output.replace("{verified}", str(u.is_verified))
output += output.replace("{avatar}", u.avatar)
else:
logme.debug('User:notFormat')
output = f"{u.id} | {u.name} | @{u.username} | Private: "
output += f"{u.is_private} | Verified: {u.is_verified} |"
output += f" Bio: {u.bio} | Location: {u.location} | Url: "
......
......@@ -13,12 +13,14 @@ from aiohttp_socks import SocksConnector, SocksVer
from . import url
from .output import Tweets, Users
from .user import inf
from . import _logme
#import logging
logme = _logme._logger(__name__)
httpproxy = None
def get_connector(config):
logme.debug('get_connector')
_connector = None
if config.Proxy_host is not None:
if config.Proxy_host.lower() == "tor":
......@@ -37,6 +39,7 @@ def get_connector(config):
httpproxy = "http://" + config.Proxy_host + ":" + str(config.Proxy_port)
return _connector
else:
logme.critical("get_connector:proxy-type-error")
print("Error: Proxy types allowed are: http, socks5 and socks4. No https.")
sys.exit(1)
_connector = SocksConnector(
......@@ -45,10 +48,12 @@ def get_connector(config):
port=config.Proxy_port,
rdns=True)
else:
logme.critical('get_connector:proxy-port-type-error')
print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")
sys.exit(1)
else:
if config.Proxy_port or config.Proxy_type:
logme.critical('get_connector:proxy-host-arg-error')
print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")
sys.exit(1)
......@@ -56,25 +61,31 @@ def get_connector(config):
async def RequestUrl(config, init, headers = []):
#loggin.info("[<] " + str(datetime.now()) + ':: get+requestURL')
logme.debug('RequestUrl')
_connector = get_connector(config)
if config.Profile:
if config.Profile_full:
logme.debug('RequestUrl:Profile_full')
_url = await url.MobileProfile(config.Username, init)
response = await MobileRequest(_url, connector=_connector)
else:
logme.debug('RequestUrl:notProfile_full')
_url = await url.Profile(config.Username, init)
response = await Request(_url, connector=_connector, headers=headers)
elif config.TwitterSearch:
logme.debug('RequestUrl:TwitterSearch')
_url, params = await url.Search(config, init)
response = await Request(_url, params=params, connector=_connector, headers=headers)
else:
if config.Following:
logme.debug('RequestUrl:Following')
_url = await url.Following(config.Username, init)
elif config.Followers:
logme.debug('RequestUrl:Followers')
_url = await url.Followers(config.Username, init)
else:
logme.debug('RequestUrl:Favorites')
_url = await url.Favorites(config.Username, init)
response = await MobileRequest(_url, connector=_connector)
......@@ -84,47 +95,53 @@ async def RequestUrl(config, init, headers = []):
return response
async def MobileRequest(url, **options):
    """Fetch *url* with aiohttp, honoring an optional proxy connector."""
    conn = options.get("connector")
    if conn:
        logme.debug('MobileRequest:Connector')
        session_kwargs = {"connector": conn}
    else:
        logme.debug('MobileRequest:notConnector')
        session_kwargs = {}
    async with aiohttp.ClientSession(**session_kwargs) as session:
        return await Response(session, url)
def ForceNewTorIdentity(config):
    """Ask the local Tor control port for a new circuit (SIGNAL NEWNYM).

    Connects to 127.0.0.1 on config.Tor_control_port, authenticates with
    config.Tor_control_password, and warns on stderr if Tor does not
    acknowledge both commands with "250 OK".
    """
    logme.debug('ForceNewTorIdentity')
    try:
        # Use the socket as a context manager so the control connection is
        # closed on every path; the original leaked it.
        with socket.create_connection(('127.0.0.1', config.Tor_control_port)) as tor_c:
            tor_c.send('AUTHENTICATE "{}"\r\nSIGNAL NEWNYM\r\n'.format(config.Tor_control_password).encode())
            response = tor_c.recv(1024)
        if response != b'250 OK\r\n250 OK\r\n':
            sys.stderr.write('Unexpected response from Tor control port: {}\n'.format(response))
            logme.critical('ForceNewTorIdentity:unexpectedResponse')
    except Exception as e:
        logme.debug('ForceNewTorIdentity:errorConnectingTor')
        sys.stderr.write('Error connecting to Tor control port: {}\n'.format(repr(e)))
        sys.stderr.write('If you want to rotate Tor ports automatically - enable Tor control port\n')
async def Request(url, connector=None, params=None, headers=None):
    """GET *url* and return the response body text.

    params/headers previously used mutable list defaults; they now default
    to None and are normalized below, so callers see no difference.
    """
    params = [] if params is None else params
    headers = [] if headers is None else headers
    if connector:
        logme.debug('Request:Connector')
        async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
            return await Response(session, url, params)
    logme.debug('Request:notConnector')
    # Bug fix: headers were silently dropped when no proxy connector was
    # configured, so e.g. the rotated User-Agent never reached Twitter.
    async with aiohttp.ClientSession(headers=headers) as session:
        return await Response(session, url, params)
async def Response(session, url, params=None):
    """Issue the GET on *session* (30s timeout) and return the body text.

    Uses the module-level ``httpproxy`` set by get_connector for plain-HTTP
    proxying. ``params`` previously had a mutable list default; normalized
    here so callers see no difference.
    """
    logme.debug('Response')
    params = [] if params is None else params
    with timeout(30):
        async with session.get(url, ssl=False, params=params, proxy=httpproxy) as response:
            return await response.text()
async def RandomUserAgent():
    """Return a random real-browser User-Agent string from the fake-useragent dataset."""
    logme.debug('RandomUserAgent')
    source = "https://fake-useragent.herokuapp.com/browsers/0.1.8"
    body = await Request(source)
    browsers = loads(body)['browsers']
    family = random.choice(list(browsers))
    return random.choice(browsers[family])
async def Username(_id):
#loggin.info("[<] " + str(datetime.now()) + ':: get+Username')
logme.debug('Username')
url = f"https://twitter.com/intent/user?user_id={_id}&lang=en"
r = await Request(url)
soup = BeautifulSoup(r, "html.parser")
......@@ -132,7 +149,7 @@ async def Username(_id):
return soup.find("a", "fn url alternate-context")["href"].replace("/", "")
async def Tweet(url, config, conn):
#loggin.info("[<] " + str(datetime.now()) + ':: Tweet')
logme.debug('Tweet')
try:
response = await Request(url)
soup = BeautifulSoup(response, "html.parser")
......@@ -141,27 +158,28 @@ async def Tweet(url, config, conn):
tweets = soup.find_all("div", "tweet")
await Tweets(tweets, location, config, conn, url)
except Exception as e:
logme.critical('Tweet:' + str(e))
print(str(e) + " [x] get.Tweet")
async def User(url, config, conn, user_id = False):
    """Fetch a profile page, store the parsed user, and optionally return its numeric id."""
    logme.debug('User')
    proxy_connector = get_connector(config)
    try:
        page = await Request(url, connector=proxy_connector)
        markup = BeautifulSoup(page, "html.parser")
        await Users(markup, config, conn)
        if user_id:
            return int(inf(markup, "id"))
    except Exception as e:
        logme.critical('User:' + str(e))
        print(str(e) + " [x] get.User")
def Limit(Limit, count):
    """Return True once *count* has reached the configured limit.

    Limit may be None (no limit) or a numeric value/string; returns None
    (falsy) when the limit has not been reached.
    """
    # Entry trace at debug level — the original logged this routine call at
    # CRITICAL, unlike every other function in this module.
    logme.debug('Limit')
    if Limit is not None and count >= int(Limit):
        return True
async def Multi(feed, config, conn):
#loggin.info("[<] " + str(datetime.now()) + ':: get+Multi')
logme.debug('Multi')
count = 0
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
......@@ -170,28 +188,34 @@ async def Multi(feed, config, conn):
for tweet in feed:
count += 1
if config.Favorites or config.Profile_full:
logme.debug('Multi:Favorites-profileFull')
link = tweet.find("a")["href"]
url = f"https://twitter.com{link}&lang=en"
elif config.User_full:
logme.debug('Multi:userFull')
username = tweet.find("a")["name"]
url = f"http://twitter.com/{username}?lang=en"
else:
logme.debug('Multi:else-url')
link = tweet.find("a", "tweet-timestamp js-permalink js-nav js-tooltip")["href"]
url = f"https://twitter.com{link}?lang=en"
if config.User_full:
logme.debug('Multi:user-full-Run')
futures.append(loop.run_in_executor(executor, await User(url,
config, conn)))
else:
logme.debug('Multi:notUser-full-Run')
futures.append(loop.run_in_executor(executor, await Tweet(url,
config, conn)))
logme.debug('Multi:asyncioGather')
await asyncio.gather(*futures)
except Exception as e:
# TODO: fix error not error
# print(str(e) + " [x] get.Multi")
# will return "'NoneType' object is not callable"
# but still works
logme.critical('Multi:' + str(e))
pass
return count
from datetime import datetime
from . import format, get
from .tweet import Tweet
from .user import User
from datetime import datetime
from .storage import db, elasticsearch, write, panda
from . import _logme
#import logging
logme = _logme._logger(__name__)
follow_object = {}
tweets_object = []
......@@ -16,35 +18,41 @@ author_list.pop()
_follow_list = []
def clean_follow_list():
    """Rebind the module-level follow list to a fresh empty list before a new run."""
    logme.debug('clean_follow_list')
    global _follow_list
    _follow_list = []
def datecheck(datestamp, config):
    """Return False when a tweet's datestamp falls before config.Since.

    Only applies when both Since and Until are configured; otherwise every
    tweet passes.
    """
    logme.debug('datecheck')
    if config.Since and config.Until:
        logme.debug('datecheck:dateRangeTrue')
        # Compare YYYYMMDD integers built by stripping the dashes.
        stamp = int(datestamp.replace("-", ""))
        since = int(config.Since.replace("-", ""))
        if stamp < since:
            return False
    logme.debug('datecheck:dateRangeFalse')
    return True
def is_tweet(tw):
    """Return True when *tw* is a real tweet node (has a data-item-id attribute)."""
    try:
        tw["data-item-id"]
        logme.debug('is_tweet:True')
        return True
    except (KeyError, TypeError):
        # Narrowed from a bare `except:`: a missing attribute raises KeyError
        # on a Tag, and indexing a non-Tag node raises TypeError. The bare
        # form also swallowed KeyboardInterrupt/SystemExit.
        logme.critical('is_tweet:False')
        return False
def _output(obj, output, config, **extra):
#logging.info("[<] " + str(datetime.now()) + ':: output+_output')
logme.debug('_output')
if config.Lowercase:
if isinstance(obj, str):
logme.debug('_output:Lowercase:username')
obj = obj.lower()
elif obj.__class__.__name__ == "user":
logme.debug('_output:Lowercase:user')
pass
elif obj.__class__.__name__ == "tweet":
logme.debug('_output:Lowercase:tweet')
obj.username = obj.username.lower()
author_list.update({obj.username})
for i in range(len(obj.mentions)):
......@@ -52,42 +60,52 @@ def _output(obj, output, config, **extra):
for i in range(len(obj.hashtags)):
obj.hashtags[i] = obj.hashtags[i].lower()
else:
logme.info('_output:Lowercase:hiddenTweetFound')
print("[x] Hidden tweet found, account suspended due to violation of TOS")
return
if config.Output != None:
if config.Store_csv:
try :
try:
write.Csv(obj, config)
logme.debug('_output:CSV')
except Exception as e:
logme.critical('_output:CSV:Error:' + str(e))
print(str(e) + " [x] output._output")
elif config.Store_json:
write.Json(obj, config)
logme.debug('_output:JSON')
else:
write.Text(output, config.Output)
logme.debug('_output:Text')
if config.Pandas and obj.type == "user":
logme.debug('_output:Pandas+user')
panda.update(obj, config)
if extra.get("follow_list"):
logme.debug('_output:follow_list')
follow_object.username = config.Username
follow_object.action = config.Following*"following" + config.Followers*"followers"
follow_object.users = _follow_list
panda.update(follow_object, config.Essid)
if config.Elasticsearch:
logme.debug('_output:Elasticsearch')
print("", end=".", flush=True)
else:
if not config.Hide_output:
try:
print(output)
except UnicodeEncodeError:
logme.critical('_output:UnicodeEncodeError')
print("unicode error [x] output._output")
async def checkData(tweet, location, config, conn):
usernames = []
logme.debug('checkData')
copyright = tweet.find("div", "StreamItemContent--withheld")
if copyright is None and is_tweet(tweet):
tweet = Tweet(tweet, location, config)
if not tweet.datestamp:
logme.critical('checkData:hiddenTweetFound')
print("[x] Hidden tweet found, account suspended due to violation of TOS")
return
......@@ -95,44 +113,56 @@ async def checkData(tweet, location, config, conn):
output = format.Tweet(config, tweet)
if config.Database:
logme.debug('checkData:Database')
db.tweets(conn, tweet, config)
if config.Pandas:
logme.debug('checkData:Pandas')
panda.update(tweet, config)
if config.Store_object:
logme.debug('checkData:Store_object')
tweets_object.append(tweet)
if config.Elasticsearch:
logme.debug('checkData:Elasticsearch')
elasticsearch.Tweet(tweet, config)
_output(tweet, output, config)
logme.critical('checkData:copyrightedTweet')
async def Tweets(tweets, location, config, conn, url=''):
    """Route scraped tweet markup into checkData, depending on the scrape mode."""
    logme.debug('Tweets')
    if (config.Profile_full or config.Location) and config.Get_replies:
        logme.debug('Tweets:full+loc+replies')
        for node in tweets:
            await checkData(node, location, config, conn)
    elif config.Favorites or config.Profile_full or config.Location:
        logme.debug('Tweets:fav+full+loc')
        # Only keep the tweet whose id matches the one encoded in the URL.
        wanted_id = url.split('?')[0].split('/')[-1]
        for node in tweets:
            if node['data-item-id'] == wanted_id:
                await checkData(node, location, config, conn)
    elif config.TwitterSearch:
        logme.debug('Tweets:TwitterSearch')
        await checkData(tweets, location, config, conn)
    else:
        logme.debug('Tweets:else')
        if int(tweets["data-user-id"]) == config.User_id:
            await checkData(tweets, location, config, conn)
async def Users(u, config, conn):
#logging.info("[<] " + str(datetime.now()) + ':: output+Users')
logme.debug('User')
global user_object
user = User(u)
output = format.User(config.Format, user)
if config.Database:
logme.debug('User:Database')
db.user(conn, config, user)
if config.Elasticsearch:
logme.debug('User:Elasticsearch')
_save_date = user.join_date
_save_time = user.join_time
user.join_date = str(datetime.strptime(user.join_date, "%d %b %Y")).split()[0]
......@@ -142,27 +172,32 @@ async def Users(u, config, conn):
user.join_time = _save_time
if config.Store_object:
logme.debug('User:Store_object')
user_object.append(user) # twint.user.user
_output(user, output, config)
async def Username(username, config, conn):
    """Persist one followed/following username to every enabled backend."""
    logme.debug('Username')
    global follow_object
    # "following" or "followers", driven by whichever boolean flag is set.
    follow_var = config.Following*"following" + config.Followers*"followers"
    if config.Database:
        logme.debug('Username:Database')
        db.follow(conn, config.Username, config.Followers, username)
    if config.Elasticsearch:
        logme.debug('Username:Elasticsearch')
        elasticsearch.Follow(username, config)
    if config.Store_object or config.Pandas:
        logme.debug('Username:object+pandas')
        if follow_var not in follow_object.get(config.Username, {}):
            # Deliberately replaces the whole per-user dict, matching the
            # original try/except-KeyError behavior.
            follow_object.update({config.Username: {follow_var: []}})
        follow_object[config.Username][follow_var].append(username)
        if config.Pandas_au:
            logme.debug('Username:object+pandas+au')
            panda.update(follow_object[config.Username], config)
    _output(username, username, config, follow_list=_follow_list)
from . import datelock, feed, get, output, verbose, storage
import sys
from asyncio import get_event_loop, TimeoutError
from datetime import timedelta, datetime
from . import datelock, feed, get, output, verbose, storage
from .storage import db
import sys
from . import _logme
#import logging
logme = _logme._logger(__name__)
class Twint:
def __init__(self, config):
#logging.info("[<] " + str(datetime.now()) + ':: run+Twint+__init__')
logme.debug('Twint:__init__')
if config.Resume is not None and config.TwitterSearch:
logme.debug('Twint:__init__:Resume')
self.init = f"TWEET-{config.Resume}-0"
else:
self.init = -1
......@@ -23,19 +26,23 @@ class Twint:
verbose.Elastic(config.Elasticsearch)
if self.config.Store_object:
logme.debug('Twint:__init__:clean_follow_list')
output.clean_follow_list()
if self.config.Pandas_clean:
logme.debug('Twint:__init__:pandas_clean')
storage.panda.clean()
if not self.config.Timedelta:
if (self.d._until - self.d._since).days > 30:
self.config.Timedelta = 30
logme.debug('Twint:__init__:timedelta_fixed')
else:
logme.debug('Twint:__init__:timedelta_unfixed')
self.config.Timedelta = (self.d._until - self.d._since).days
async def Feed(self):
#logging.info("[<] " + str(datetime.now()) + ':: run+Twint+Feed')
logme.debug('Twint:Feed')
consecutive_errors_count = 0
while True:
response = await get.RequestUrl(self.config, self.init, headers=[("User-Agent", self.user_agent)])
......@@ -60,6 +67,7 @@ class Twint:
if self.config.Proxy_host.lower() == "tor":
print("[?] Timed out, changing Tor identity...")
if self.config.Tor_control_password is None:
logme.critical('Twint:Feed:tor-password')
sys.stderr.write("Error: config.Tor_control_password must be set for proxy autorotation!\r\n")
sys.stderr.write("Info: What is it? See https://stem.torproject.org/faq.html#can-i-interact-with-tors-controller-interface-directly\r\n")
break
......@@ -67,67 +75,73 @@ class Twint:
get.ForceNewTorIdentity(self.config)
continue
else:
logme.critical('Twint:Feed:' + str(e))
print(str(e))
break
except Exception as e:
logme.critical('Twint:Feed:noData' + str(e))
# Sometimes Twitter says there is no data. But it's a lie.
consecutive_errors_count += 1
if consecutive_errors_count < self.config.Retries_count:
# Change disguise
self.user_agent = await get.RandomUserAgent()
continue
logme.critical('Twint:Feed:Tweets_known_error:' + str(e))
print(str(e) + " [x] run.Feed")
print("[!] if get this error but you know for sure that more tweets exist, please open an issue and we will investigate it!")
break
async def follow(self):
    """Fetch one page of followers/following and record each username."""
    await self.Feed()
    if not self.config.User_full:
        logme.debug('Twint:follow:notUserFull')
        for entry in self.feed:
            self.count += 1
            name = entry.find("a")["name"]
            await output.Username(name, self.config, self.conn)
    else:
        logme.debug('Twint:follow:userFull')
        self.count += await get.Multi(self.feed, self.config, self.conn)
async def favorite(self):
    """Fetch one page of liked tweets and resolve each one fully."""
    logme.debug('Twint:favorite')
    await self.Feed()
    fetched = await get.Multi(self.feed, self.config, self.conn)
    self.count += fetched
async def profile(self):
    """Fetch one page of a user's timeline and record its tweets."""
    await self.Feed()
    if not self.config.Profile_full:
        logme.debug('Twint:notProfileFull')
        for item in self.feed:
            self.count += 1
            await output.Tweets(item, "", self.config, self.conn)
    else:
        logme.debug('Twint:profileFull')
        self.count += await get.Multi(self.feed, self.config, self.conn)
async def tweets(self):
    """Fetch one page of search results and record its tweets."""
    await self.Feed()
    if not self.config.Location:
        logme.debug('Twint:tweets:notLocation')
        for item in self.feed:
            self.count += 1
            await output.Tweets(item, "", self.config, self.conn)
    else:
        logme.debug('Twint:tweets:location')
        self.count += await get.Multi(self.feed, self.config, self.conn)
async def main(self):
self.user_agent = await get.RandomUserAgent()
#logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main')
if self.config.User_id is not None:
logme.debug('Twint:main:user_id')
self.config.Username = await get.Username(self.config.User_id)
#if self.config.Username is not None:
# url = f"http://twitter.com/{self.config.Username}?lang=en"
# self.config.User_id = await get.User(url, self.config, self.conn, True)
# TODO: keep this or not!?
if self.config.Username is not None:
logme.debug('Twint:main:username')
url = f"http://twitter.com/{self.config.Username}?lang=en"
self.config.User_id = await get.User(url, self.config, self.conn, True)
if self.config.TwitterSearch and self.config.Since and self.config.Until:
logme.debug('Twint:main:search+since+until')
_days = timedelta(days=int(self.config.Timedelta))
while self.d._since < self.d._until:
self.config.Since = str(self.d._until - _days)
......@@ -135,45 +149,52 @@ class Twint:
if len(self.feed) > 0:
await self.tweets()
else:
logme.debug('Twint:main:gettingNewTweets')
self.d._until = self.d._until - _days
self.feed = [-1]
#logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit1')
if get.Limit(self.config.Limit, self.count):
self.d._until = self.d._until - _days
self.feed = [-1]
else:
logme.debug('Twint:main:not-search+since+until')
while True:
if len(self.feed) > 0:
if self.config.Followers or self.config.Following:
logme.debug('Twint:main:follow')
await self.follow()
elif self.config.Favorites:
logme.debug('Twint:main:favorites')
await self.favorite()
elif self.config.Profile:
logme.debug('Twint:main:profile')
await self.profile()
elif self.config.TwitterSearch:
logme.debug('Twint:main:twitter-search')
await self.tweets()
else:
logme.debug('Twint:main:no-more-tweets')
break
#logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit2')
if get.Limit(self.config.Limit, self.count):
logme.debug('Twint:main:reachedLimit')
break
if self.config.Count:
verbose.Count(self.count, self.config)
def run(config):
    """Drive one complete scrape for *config* on the default event loop."""
    logme.debug('run')
    loop = get_event_loop()
    loop.run_until_complete(Twint(config).main())
def Favorites(config):
    """Public entry point: scrape the configured user's liked tweets."""
    logme.debug('Favorites')
    # Flip the mode flag, then hand off to the shared driver.
    config.Favorites = True
    run(config)
def Followers(config):
#logging.info("[<] " + str(datetime.now()) + ':: run+Followers')
logme.debug('Followers')
output.clean_follow_list()
config.Followers = True
config.Following = False
......@@ -186,7 +207,7 @@ def Followers(config):
storage.panda.clean()
def Following(config):
#logging.info("[<] " + str(datetime.now()) + ':: run+Following')
logme.debug('Following')
output.clean_follow_list()
config.Following = True
config.Followers = False
......@@ -199,17 +220,17 @@ def Following(config):
storage.panda.clean()
def Lookup(config):
    """Public entry point: fetch and store a single user's profile info."""
    logme.debug('Lookup')
    profile_url = f"http://twitter.com/{config.Username}?lang=en"
    loop = get_event_loop()
    loop.run_until_complete(get.User(profile_url, config, db.Conn(config.Database)))
def Profile(config):
    """Public entry point: scrape a user's timeline."""
    logme.debug('Profile')
    # Flip the mode flag, then hand off to the shared driver.
    config.Profile = True
    run(config)
def Search(config):
#logging.info("[<] " + str(datetime.now()) + ':: run+Search')
logme.debug('Search')
config.TwitterSearch = True
config.Following = False
config.Followers = False
......
from time import strftime, localtime
import json
#from datetime import datetime
#import logging
from . import _logme
logme = _logme._logger(__name__)
class tweet:
"""Define Tweet class
......@@ -12,9 +14,9 @@ class tweet:
pass
def getMentions(tw):
#logging.info("[<] " + str(datetime.now()) + ':: tweet+getMentions')
"""Extract ment from tweet
"""
logme.debug('getMentions')
try:
mentions = tw["data-mentions"].split(" ")
except:
......@@ -23,9 +25,9 @@ def getMentions(tw):
return mentions
def getQuoteURL(tw):
#logging.info("[<] " + str(datetime.now()) + ':: tweet+getQuoteInfo')
"""Extract quote from tweet
"""
logme.debug('getQuoteURL')
base_twitter = "https://twitter.com"
quote_url = ""
try:
......@@ -37,9 +39,9 @@ def getQuoteURL(tw):
return quote_url
def getText(tw):
#logging.info("[<] " + str(datetime.now()) + ':: tweet+getText')
"""Replace some text
"""
logme.debug('getText')
text = tw.find("p", "tweet-text").text
text = text.replace("\n", " ")
text = text.replace("http", " http")
......@@ -50,19 +52,21 @@ def getText(tw):
def getStat(tw, _type):
    """Return the hidden count for one tweet action (e.g. reply/retweet/favorite)."""
    logme.debug('getStat')
    css_class = f"ProfileTweet-action--{_type} u-hiddenVisually"
    counter = tw.find("span", css_class).find("span")
    return counter["data-tweet-stat-count"]
def getRetweet(profile, username, user):
    """Return 1 when a profile-scrape tweet was authored by someone other
    than the profile owner (i.e. it is a retweet); otherwise return None."""
    logme.debug('getRetweet')
    if profile:
        # Case-insensitive author comparison; only evaluated in profile mode.
        if username.lower() != user.lower():
            return 1
def Tweet(tw, location, config):
"""Create Tweet object
"""
##logging.info("[<] " + str(datetime.now()) + ':: tweet+Tweet')
logme.debug('Tweet')
t = tweet()
t.id = int(tw["data-item-id"])
t.id_str = tw["data-item-id"]
......
#from datetime import datetime
#import logging
from . import _logme
logme = _logme._logger(__name__)
mobile = "https://mobile.twitter.com"
base = "https://twitter.com/i"
async def Favorites(username, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+Favorites')
logme.debug('Favorites')
url = f"{mobile}/{username}/favorites?lang=en"
if init != -1:
......@@ -14,7 +14,7 @@ async def Favorites(username, init):
return url
async def Followers(username, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+Followers')
logme.debug('Followers')
url = f"{mobile}/{username}/followers?lang=en"
if init != -1:
......@@ -23,7 +23,7 @@ async def Followers(username, init):
return url
async def Following(username, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+Following')
logme.debug('Following')
url = f"{mobile}/{username}/following?lang=en"
if init != -1:
......@@ -32,7 +32,7 @@ async def Following(username, init):
return url
async def MobileProfile(username, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+MobileProfile')
logme.debug('MobileProfile')
url = f"{mobile}/{username}?lang=en"
if init != -1:
......@@ -41,7 +41,7 @@ async def MobileProfile(username, init):
return url
async def Profile(username, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+Profile')
logme.debug('Profile')
url = f"{base}/profiles/show/{username}/timeline/tweets?include_"
url += "available_features=1&lang=en&include_entities=1"
url += "&include_new_items_bar=true"
......@@ -52,7 +52,7 @@ async def Profile(username, init):
return url
async def Search(config, init):
#logging.info("[<] " + str(datetime.now()) + ':: url+Search')
logme.debug('Search')
url = f"{base}/search/timeline"
params = [
('f', 'tweets'),
......
from . import _logme
logme = _logme._logger(__name__)
class user:
type = "user"
......@@ -5,6 +9,7 @@ class user:
pass
def inf(ur, _type):
logme.debug('inf')
try:
group = ur.find("div", "user-actions btn-group not-following")
if group == None :
......@@ -28,6 +33,7 @@ def inf(ur, _type):
return ret
def card(ur, _type):
logme.debug('card')
if _type == "bio":
try:
ret = ur.find("p", "ProfileHeaderCard-bio u-dir").text.replace("\n", " ")
......@@ -48,33 +54,36 @@ def card(ur, _type):
return ret
def join(ur):
    """Split the profile's join-date tooltip text on ' - ' into its parts."""
    logme.debug('join')
    tooltip = ur.find("span", "ProfileHeaderCard-joinDateText js-tooltip u-dir")
    return tooltip["title"].split(" - ")
def convertToInt(x):
    """Parse a Twitter counter like '1,234', '12k', '3.4m' or '1b' into an int.

    Returns 0 when the value cannot be parsed in either form.
    """
    # Log tag fixed: the original logged the typo 'contertToInt'.
    logme.debug('convertToInt')
    multDict = {
        "k": 1000,
        "m": 1000000,
        "b": 1000000000,
    }
    # Plain integer, possibly with thousands separators.
    try:
        if ',' in x:
            x = x.replace(',', '')
        return int(x)
    except (ValueError, TypeError):
        # Narrowed from bare `except:`: non-str x raises TypeError on the
        # `in` test; a non-numeric string raises ValueError from int().
        pass
    # Abbreviated form: numeric prefix plus a k/m/b suffix.
    try:
        y = float(str(x)[:-1])
        return int(y * multDict[str(x)[-1:].lower()])
    except (ValueError, KeyError):
        # ValueError: non-numeric prefix; KeyError: unknown/empty suffix.
        pass
    return 0
def stat(ur, _type):
logme.debug('stat')
_class = f"ProfileNav-item ProfileNav-item--{_type}"
stat = ur.find("li", _class)
try :
......@@ -84,6 +93,7 @@ def stat(ur, _type):
return r
def media(ur):
logme.debug('media')
try:
media_count = ur.find("a", "PhotoRail-headingWithCount js-nav").text.strip().split(" ")[0]
media_count = convertToInt(media_count)
......@@ -93,6 +103,7 @@ def media(ur):
return media_count
def verified(ur):
logme.debug('verified')
try:
is_verified = ur.find("span", "ProfileHeaderCard-badges").text
if "Verified account" in is_verified:
......@@ -105,6 +116,7 @@ def verified(ur):
return is_verified
def User(ur):
logme.debug('User')
u = user()
for img in ur.findAll("img", "Emoji Emoji--forText"):
img.replaceWith(img["alt"])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment