Commit 2df62ff8 authored by Francesco Poldi's avatar Francesco Poldi

Update Elasticsearch

Index creation
parent 0586e21a
......@@ -4,10 +4,136 @@ from time import strftime, localtime
import contextlib
import sys
_index_tweet_status = False
_index_follow_status = False
_index_user_status = False
class RecycleObject(object):
def write(self, junk): pass
def flush(self): pass
def handleIndexResponse(response):
try:
if response["status"] == 400:
return True
except KeyError:
pass
if response["acknowledged"]:
print("[+] Index \"" + response["index"] + "\" created!")
else:
print("[x] error index creation :: storage.elasticsearch.handleIndexCreation")
if response["shards_acknowledged"]:
print("[+] Shards acknowledged, everything is ready to be used!")
return True
else:
print("[x] error with shards :: storage.elasticsearch.HandleIndexCreation")
return False
def createIndex(config, instance, **scope):
if scope.get("scope") == "tweet":
tweets_body = {
"mappings": {
"items": {
"properties": {
"id": {"type": "long"},
"conversation_id": {"type": "long"},
"created_at": {"type": "long"},
"date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"timezone": {"type": "keyword"},
"place": {"type": "keyword"},
"location": {"type": "keyword"},
"tweet": {"type": "text"},
"hashtags": {"type": "keyword"},
"user_id": {"type": "long"},
"user_id_str": {"type": "keyword"},
"username": {"type": "keyword"},
"name": {"type": "text"},
"profile_image_url": {"type": "text"},
"day": {"type": "integer"},
"hour": {"type": "integer"},
"link": {"type": "text"},
"gif_url": {"type": "text"},
"gif_thumb": {"type": "text"},
"video_url": {"type": "text"},
"video_thumb": {"type": "text"},
"is_reply_to": {"type": "long"},
"has_parent_tweet": {"type": "long"},
"retweet": {"type": "text"},
"essid": {"type": "keyword"},
"nlikes": {"type": "integer"},
"nreplies": {"type": "integer"},
"nretweets": {"type": "integer"},
"is_quote_status": {"type": "long"},
"quote_id": {"type": "long"},
"quote_id_str": {"type": "text"},
"quote_url": {"type": "text"},
"search": {"type": "text"},
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_tweets, body=tweets_body, ignore=400)
return handleIndexResponse(resp)
elif scope.get("scope") == "follow":
follow_body = {
"mappings": {
"items": {
"properties": {
"user": {"type": "keyword"},
"follow": {"type": "keyword"},
"essid": {"type": "keyword"}
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_follow, body=follow_body, ignore=400)
return handleIndexResponse(resp)
elif scope.get("scope") == "user":
user_body = {
"mappings": {
"items": {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "keyword"},
"username": {"type": "keyword"},
"bio": {"type": "text"},
"location": {"type": "keyword"},
"url": {"type": "text"},
"join_datetime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"join_date": {"type": "date", "format": "yyyy-MM-dd"},
"join_time": {"type": "date", "format": "HH:mm:ss"},
"tweets": {"type": "integer"},
"following": {"type": "integer"},
"followers": {"type": "integer"},
"likes": {"type": "integer"},
"media": {"type": "integer"},
"private": {"type": "integer"},
"verified": {"type": "integer"},
"avatar": {"type": "text"},
"background_image": {"type": "text"},
"session": {"type": "keyword"}
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_users, body=user_body, ignore=400)
return handleIndexResponse(resp)
else:
print("[x] error index pre-creation :: storage.elasticsearch.createIndex")
return False
@contextlib.contextmanager
def nostdout():
savestdout = sys.stdout
......@@ -32,6 +158,7 @@ def hour(datetime):
return strftime("%H", localtime(datetime))
def Tweet(Tweet, config):
global _index_tweet_status
weekdays = {
"Monday": 1,
"Tuesday": 2,
......@@ -44,9 +171,6 @@ def Tweet(Tweet, config):
day = weekdays[strftime("%A", localtime(Tweet.datetime))]
actions = []
nLikes = 1
nReplies = 1
nRetweets = 1
dt = f"{Tweet.datestamp} {Tweet.timestamp}"
......@@ -92,150 +216,15 @@ def Tweet(Tweet, config):
}
actions.append(j_data)
if config.ES_count["likes"]:
for l in range(int(Tweet.likes)):
j_data = {
"_index": config.Index_tweets,
"_type": config.Index_type,
"_id": str(Tweet.id) + "_like_" + str(nLikes) + config.Essid,
"_source": {
"id": str(Tweet.id),
"conversation_id": Tweet.conversation_id,
"created_at": Tweet.datetime,
"date": dt,
"timezone": Tweet.timezone,
"place": Tweet.place,
"location": Tweet.location,
"tweet": Tweet.tweet,
"hashtags": Tweet.hashtags,
"user_id": Tweet.user_id,
"user_id_str": Tweet.user_id_str,
"username": Tweet.username,
"name": Tweet.name,
"profile_image_url": Tweet.profile_image_url,
"day": day,
"hour": hour(Tweet.datetime),
"link": Tweet.link,
"gif_url": Tweet.gif_url,
"gif_thumb": Tweet.gif_thumb,
"video_url": Tweet.video_url,
"video_thumb": Tweet.video_thumb,
"is_reply_to": Tweet.is_reply_to,
"has_parent_tweet": Tweet.has_parent_tweet,
"retweet": Tweet.retweet,
"essid": config.Essid,
"nlikes": int(Tweet.likes_count),
"nreplies": int(Tweet.replies_count),
"nretweets": int(Tweet.retweets_count),
"is_quote_status": Tweet.is_quote_status,
"quote_id": Tweet.quote_id,
"quote_id_str": Tweet.quote_id_str,
"quote_url": Tweet.quote_url,
"search": str(config.Search),
"likes": True
}
}
actions.append(j_data)
nLikes += 1
if config.ES_count["replies"]:
for rep in range(int(Tweet.replies)):
j_data = {
"_index": config.Index_tweets,
"_type": config.Index_type,
"_id": str(Tweet.id) + "_reply_" + str(nReplies) + config.Essid,
"_source": {
"id": str(Tweet.id),
"conversation_id": Tweet.conversation_id,
"created_at": Tweet.datetime,
"date": dt,
"timezone": Tweet.timezone,
"place": Tweet.place,
"location": Tweet.location,
"tweet": Tweet.tweet,
"hashtags": Tweet.hashtags,
"user_id": Tweet.user_id,
"user_id_str": Tweet.user_id_str,
"username": Tweet.username,
"name": Tweet.name,
"profile_image_url": Tweet.profile_image_url,
"day": day,
"hour": hour(Tweet.datetime),
"link": Tweet.link,
"gif_url": Tweet.gif_url,
"gif_thumb": Tweet.gif_thumb,
"video_url": Tweet.video_url,
"video_thumb": Tweet.video_thumb,
"is_reply_to": Tweet.is_reply_to,
"has_parent_tweet": Tweet.has_parent_tweet,
"retweet": Tweet.retweet,
"essid": config.Essid,
"nlikes": int(Tweet.likes_count),
"nreplies": int(Tweet.replies_count),
"nretweets": int(Tweet.retweets_count),
"is_quote_status": Tweet.is_quote_status,
"quote_id": Tweet.quote_id,
"quote_id_str": Tweet.quote_id_str,
"quote_url": Tweet.quote_url,
"search": str(config.Search),
"replies": True
}
}
actions.append(j_data)
nReplies += 1
if config.ES_count["retweets"]:
for ret in range(int(Tweet.retweets)):
j_data = {
"_index": config.Index_tweets,
"_type": config.Index_type,
"_id": str(Tweet.id) + "_retweet_" + str(nRetweets) + config.Essid,
"_source": {
"id": str(Tweet.id),
"conversation_id": Tweet.conversation_id,
"created_at": Tweet.datetime,
"date": dt,
"timezone": Tweet.timezone,
"place": Tweet.place,
"location": Tweet.location,
"tweet": Tweet.tweet,
"hashtags": Tweet.hashtags,
"user_id": Tweet.user_id,
"user_id_str": Tweet.user_id_str,
"username": Tweet.username,
"name": Tweet.name,
"profile_image_url": Tweet.profile_image_url,
"day": day,
"hour": hour(Tweet.datetime),
"link": Tweet.link,
"gif_url": Tweet.gif_url,
"gif_thumb": Tweet.gif_thumb,
"video_url": Tweet.video_url,
"video_thumb": Tweet.video_thumb,
"is_reply_to": Tweet.is_reply_to,
"has_parent_tweet": Tweet.has_parent_tweet,
"retweet": Tweet.retweet,
"essid": config.Essid,
"nlikes": int(Tweet.likes_count),
"nreplies": int(Tweet.replies_count),
"nretweets": int(Tweet.retweets_count),
"is_quote_status": Tweet.is_quote_status,
"quote_id": Tweet.quote_id,
"quote_id_str": Tweet.quote_id_str,
"quote_url": Tweet.quote_url,
"search": str(config.Search),
"retweets": True
}
}
actions.append(j_data)
nRetweets += 1
es = Elasticsearch(config.Elasticsearch)
if not _index_tweet_status:
_index_tweet_status = createIndex(config, es, scope="tweet")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
def Follow(user, config):
global _index_follow_status
actions = []
j_data = {
......@@ -251,11 +240,14 @@ def Follow(user, config):
actions.append(j_data)
es = Elasticsearch(config.Elasticsearch)
if not _index_follow_status:
_index_follow_status = createIndex(config, es, scope="follow")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
def UserProfile(user, config):
global _index_user_status
actions = []
j_data = {
......@@ -287,6 +279,8 @@ def UserProfile(user, config):
actions.append(j_data)
es = Elasticsearch(config.Elasticsearch)
if not _index_user_status:
_index_user_status = createIndex(config, es, scope="user")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment