Commit 7bcf1c58 authored by Francesco Poldi

Extended options Elasticsearch

parent 9e48de7e
...@@ -112,7 +112,10 @@ def initialize(args): ...@@ -112,7 +112,10 @@ def initialize(args):
c.Profile_full = args.profile_full c.Profile_full = args.profile_full
c.Store_pandas = args.store_pandas c.Store_pandas = args.store_pandas
c.Pandas_type = args.pandas_type c.Pandas_type = args.pandas_type
c.search_name = args.search_name c.Search_name = args.search_name
c.Index_tweets = args.index_tweets
c.Index_follow = args.index_follow
c.Index_users = args.index_users
return c return c
def options(): def options():
...@@ -132,7 +135,7 @@ def options(): ...@@ -132,7 +135,7 @@ def options():
ap.add_argument("--since", help="Filter Tweets sent since date (Example: 2017-12-27).") ap.add_argument("--since", help="Filter Tweets sent since date (Example: 2017-12-27).")
ap.add_argument("--until", help="Filter Tweets sent until date (Example: 2017-12-27).") ap.add_argument("--until", help="Filter Tweets sent until date (Example: 2017-12-27).")
ap.add_argument("--fruit", help="Display 'low-hanging-fruit' Tweets.", action="store_true") ap.add_argument("--fruit", help="Display 'low-hanging-fruit' Tweets.", action="store_true")
ap.add_argument("--verified", help="Display Tweets only from verified users (Use with -s).", ap.add_argument("--verified", help="Display Tweets only from verified users (Use with -s).",
action="store_true") action="store_true")
ap.add_argument("--csv", help="Write as .csv file.", action="store_true") ap.add_argument("--csv", help="Write as .csv file.", action="store_true")
ap.add_argument("--json", help="Write as .json file", action="store_true") ap.add_argument("--json", help="Write as .json file", action="store_true")
...@@ -160,14 +163,17 @@ def options(): ...@@ -160,14 +163,17 @@ def options():
ap.add_argument("--format", help="Custom output format (See wiki for details).") ap.add_argument("--format", help="Custom output format (See wiki for details).")
ap.add_argument("--user-full", help="Collect all user information (Use with followers or following only).", ap.add_argument("--user-full", help="Collect all user information (Use with followers or following only).",
action="store_true") action="store_true")
ap.add_argument("--profile-full", ap.add_argument("--profile-full",
help="Slow, but effective method of collecting a user's Tweets (Including Retweets).", help="Slow, but effective method of collecting a user's Tweets (Including Retweets).",
action="store_true") action="store_true")
ap.add_argument("--store-pandas", help="Save Tweets in a DataFrame (Pandas) file.") ap.add_argument("--store-pandas", help="Save Tweets in a DataFrame (Pandas) file.")
ap.add_argument("--pandas-type", help="Specify HDF5 or Pickle (HDF5 as default)") ap.add_argument("--pandas-type", help="Specify HDF5 or Pickle (HDF5 as default)")
ap.add_argument("--search_name", help="name for identify the search like -3dprinter stuff- only for mysql") ap.add_argument("--search_name", help="Name for identify the search like -3dprinter stuff- only for mysql")
ap.add_argument("-it", "--index-tweets", help="Custom Elasticsearch Index name for Tweets.")
ap.add_argument("-if", "--index-follow", help="Custom Elasticsearch Index name for Follows.")
ap.add_argument("-iu", "--index-users", help="Custom Elasticsearch Index name for Users.")
args = ap.parse_args() args = ap.parse_args()
return args return args
def main(): def main():
...@@ -176,10 +182,19 @@ def main(): ...@@ -176,10 +182,19 @@ def main():
if args.userlist: if args.userlist:
args.username = loadUserList(args.userlist, "search") args.username = loadUserList(args.userlist, "search")
if not args.pandas_type: if not args.pandas_type:
args.pandas_type = "HDF5" args.pandas_type = "HDF5"
if not args.index_tweets:
args.index_tweets = "twint"
if not args.index_follow:
args.index_follow = "twintGraph"
if not args.index_users:
args.index_users = "twintUser"
c = initialize(args) c = initialize(args)
if args.favorites: if args.favorites:
......
...@@ -20,7 +20,7 @@ class Config: ...@@ -20,7 +20,7 @@ class Config:
Show_hashtags = False Show_hashtags = False
Limit = None Limit = None
Count = None Count = None
Stats = False Stats = False
hostname = None #mysql hostname = None #mysql
Database = None Database = None
DB_user = None #mysql DB_user = None #mysql
...@@ -41,4 +41,7 @@ class Config: ...@@ -41,4 +41,7 @@ class Config:
Store_pandas = False Store_pandas = False
Pandas_type = None Pandas_type = None
Pandas = False Pandas = False
search_name = "-" #for identify a records in mysql with the search it provides from. it cannot be null for DB requirements. a tweet must be in several search so the PK are tweet ID and search_name Search_name = "-" #for identify a records in mysql with the search it provides from. it cannot be null for DB requirements. a tweet must be in several search so the PK are tweet ID and search_name
Index_tweets = None
Index_follow = None
Index_users = None
...@@ -30,7 +30,7 @@ def weekday(day): ...@@ -30,7 +30,7 @@ def weekday(day):
def hour(datetime): def hour(datetime):
return strftime("%H", localtime(datetime)) return strftime("%H", localtime(datetime))
def Tweet(Tweet, es, session): def Tweet(Tweet, config):
day = weekday(strftime("%A", localtime(Tweet.datetime))) day = weekday(strftime("%A", localtime(Tweet.datetime)))
actions = [] actions = []
...@@ -41,9 +41,9 @@ def Tweet(Tweet, es, session): ...@@ -41,9 +41,9 @@ def Tweet(Tweet, es, session):
dt = "{} {}".format(Tweet.datestamp, Tweet.timestamp) dt = "{} {}".format(Tweet.datestamp, Tweet.timestamp)
j_data = { j_data = {
"_index": "twint", "_index": config.Index_tweets,
"_type": "items", "_type": "items",
"_id": Tweet.id + "_raw_" + str(session), "_id": Tweet.id + "_raw_" + config.Essid,
"_source": { "_source": {
"id": Tweet.id, "id": Tweet.id,
"date": dt, "date": dt,
...@@ -58,16 +58,16 @@ def Tweet(Tweet, es, session): ...@@ -58,16 +58,16 @@ def Tweet(Tweet, es, session):
"link": Tweet.link, "link": Tweet.link,
"retweet": Tweet.retweet, "retweet": Tweet.retweet,
"user_rt": Tweet.user_rt, "user_rt": Tweet.user_rt,
"essid": str(session) "essid": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
for l in range(int(Tweet.likes)): for l in range(int(Tweet.likes)):
j_data = { j_data = {
"_index": "twint", "_index": config.Index_tweets,
"_type": "items", "_type": "items",
"_id": Tweet.id + "_likes_" + str(nLikes) + "_" + str(session), "_id": Tweet.id + "_likes_" + str(nLikes) + "_" + config.Essid,
"_source": { "_source": {
"id": Tweet.id, "id": Tweet.id,
"date": dt, "date": dt,
...@@ -83,7 +83,7 @@ def Tweet(Tweet, es, session): ...@@ -83,7 +83,7 @@ def Tweet(Tweet, es, session):
"link": Tweet.link, "link": Tweet.link,
"retweet": Tweet.retweet, "retweet": Tweet.retweet,
"user_rt": Tweet.user_rt, "user_rt": Tweet.user_rt,
"essid": str(session) "essid": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
...@@ -91,9 +91,9 @@ def Tweet(Tweet, es, session): ...@@ -91,9 +91,9 @@ def Tweet(Tweet, es, session):
for rep in range(int(Tweet.replies)): for rep in range(int(Tweet.replies)):
j_data = { j_data = {
"_index": "twint", "_index": config.Index_tweets,
"_type": "items", "_type": "items",
"_id": Tweet.id + "_replies_" + str(nReplies) + "_" + str(session), "_id": Tweet.id + "_replies_" + str(nReplies) + "_" + config.Essid,
"_source": { "_source": {
"id": Tweet.id, "id": Tweet.id,
"date": dt, "date": dt,
...@@ -109,7 +109,7 @@ def Tweet(Tweet, es, session): ...@@ -109,7 +109,7 @@ def Tweet(Tweet, es, session):
"link": Tweet.link, "link": Tweet.link,
"retweet": Tweet.retweet, "retweet": Tweet.retweet,
"user_rt": Tweet.user_rt, "user_rt": Tweet.user_rt,
"essid": str(session) "essid": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
...@@ -117,9 +117,9 @@ def Tweet(Tweet, es, session): ...@@ -117,9 +117,9 @@ def Tweet(Tweet, es, session):
for ret in range(int(Tweet.retweets)): for ret in range(int(Tweet.retweets)):
j_data = { j_data = {
"_index": "twint", "_index": config.Index_tweets,
"_type": "items", "_type": "items",
"_id": Tweet.id + "_retweets_" + str(nRetweets) + "_" + str(session), "_id": Tweet.id + "_retweets_" + str(nRetweets) + "_" + config.Essid,
"_source": { "_source": {
"id": Tweet.id, "id": Tweet.id,
"date": dt, "date": dt,
...@@ -135,44 +135,44 @@ def Tweet(Tweet, es, session): ...@@ -135,44 +135,44 @@ def Tweet(Tweet, es, session):
"link": Tweet.link, "link": Tweet.link,
"retweet": Tweet.retweet, "retweet": Tweet.retweet,
"user_rt": Tweet.user_rt, "user_rt": Tweet.user_rt,
"essid": str(session) "essid": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
nRetweets += 1 nRetweets += 1
es = Elasticsearch(es) es = Elasticsearch(config.Elasticsearch)
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
def Follow(es, user, follow, session): def Follow(user, config):
actions = [] actions = []
j_data = { j_data = {
"_index": "twintGraph", "_index": config.Index_follow,
"_type": "items", "_type": "items",
"_id": user + "_" + follow + "_" + str(session), "_id": user + "_" + config.Username + "_" + config.Essid,
"_source": { "_source": {
"user": user, "user": user,
"follow": follow, "follow": config.Username,
"essid": str(session) "essid": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
es = Elasticsearch(es) es = Elasticsearch(config.Elasticsearch)
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
def UserProfile(es, user, follow, session): def UserProfile(user, config):
actions = [] actions = []
j_data = { j_data = {
"_index": "twintUser", "_index": config.Index_users,
"_type": "items", "_type": "items",
"_id": user.id + "_" + user.join_date + "_" + user.join_time + "_" + str(session), "_id": user.id + "_" + user.join_date + "_" + user.join_time + "_" + config.Essid,
"_source": { "_source": {
"id": user.id, "id": user.id,
"name": user.name, "name": user.name,
...@@ -191,12 +191,12 @@ def UserProfile(es, user, follow, session): ...@@ -191,12 +191,12 @@ def UserProfile(es, user, follow, session):
"private": user.is_private, "private": user.is_private,
"verified": user.is_verified, "verified": user.is_verified,
"avatar": user.avatar, "avatar": user.avatar,
"session": str(session) "session": config.Essid
} }
} }
actions.append(j_data) actions.append(j_data)
es = Elasticsearch(es) es = Elasticsearch(config.Elasticsearch)
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
...@@ -50,13 +50,13 @@ async def Tweets(tw, location, config, conn): ...@@ -50,13 +50,13 @@ async def Tweets(tw, location, config, conn):
tweet = Tweet(tw, location, config) tweet = Tweet(tw, location, config)
if datecheck(tweet.datestamp, config): if datecheck(tweet.datestamp, config):
output = format.Tweet(config, tweet) output = format.Tweet(config, tweet)
if config.hostname: if config.hostname:
dbmysql.tweets(conn, tweet, config) dbmysql.tweets(conn, tweet, config)
elif config.Database: elif config.Database:
db.tweets(conn, tweet, config) db.tweets(conn, tweet, config)
if config.Elasticsearch: if config.Elasticsearch:
elasticsearch.Tweet(tweet, config.Elasticsearch, config.Essid) elasticsearch.Tweet(tweet, config)
_output(tweet, output, config) _output(tweet, output, config)
async def Users(u, config, conn): async def Users(u, config, conn):
...@@ -73,11 +73,10 @@ async def Users(u, config, conn): ...@@ -73,11 +73,10 @@ async def Users(u, config, conn):
_save_time = user.join_time _save_time = user.join_time
user.join_date = str(datetime.strptime(user.join_date, "%d %b %Y")).split()[0] user.join_date = str(datetime.strptime(user.join_date, "%d %b %Y")).split()[0]
user.join_time = str(datetime.strptime(user.join_time, "%I:%M %p")).split()[1] user.join_time = str(datetime.strptime(user.join_time, "%I:%M %p")).split()[1]
elasticsearch.UserProfile(config.Elasticsearch, user, elasticsearch.UserProfile(user, config)
config.Username, config.Essid)
user.join_date = _save_date user.join_date = _save_date
user.join_time = _save_time user.join_time = _save_time
_output(user, output, config) _output(user, output, config)
async def Username(username, config, conn): async def Username(username, config, conn):
...@@ -87,7 +86,6 @@ async def Username(username, config, conn): ...@@ -87,7 +86,6 @@ async def Username(username, config, conn):
db.follow(conn, config.Username, config.Followers, username) db.follow(conn, config.Username, config.Followers, username)
if config.Elasticsearch: if config.Elasticsearch:
elasticsearch.Follow(config.Elasticsearch, username, elasticsearch.Follow(username, config)
config.Username, config.Essid)
_output(username, username, config) _output(username, username, config)
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment