Commit c58d1521 authored by Francesco Poldi

Added auto-index-creation

Making Elasticsearch setup easier
parent 40437688
......@@ -4,6 +4,10 @@ from time import strftime, localtime
import contextlib
import sys
# Per-scope flags holding the last result of createIndex for the tweet,
# follow and user indices. Tweet, Follow and UserProfile check their flag
# before bulk-indexing and call createIndex again for as long as it is False.
_index_tweet_status = False
_index_follow_status = False
_index_user_status = False
class RecycleObject(object):
    """Minimal file-like sink: everything written to it is discarded."""

    def write(self, junk):
        """Accept *junk* and throw it away."""
        return None

    def flush(self):
        """Do nothing; there is no buffer to flush."""
        return None
......@@ -15,6 +19,116 @@ def nostdout():
yield
sys.stdout = savestdout
def handleIndexResponse(response):
    """Interpret the reply of an index-creation request and report the outcome.

    Args:
        response: Mapping returned by the index-creation call. Error replies
            carry a "status" key; success replies are expected to carry
            "acknowledged", "shards_acknowledged" and "index".

    Returns:
        True when the reply has status 400 (createIndex passes ``ignore=400``,
        so such replies are not treated as failures here) or when both the
        index and its shards were acknowledged; False in every other case,
        including replies that lack the expected keys.
    """
    # NOTE(review): any 400 reply is accepted, presumably because it is taken
    # to mean "index already exists" - confirm other bad-request causes are
    # not being masked.
    if response.get("status") == 400:
        return True

    # An unacknowledged index is a failure regardless of what the shard flag
    # says, so stop here instead of falling through to the shard check.
    if not response.get("acknowledged"):
        print("[x] error index creation :: storage.elasticsearch.handleIndexCreation")
        return False
    # "index" may be absent from the reply; do not crash on a success path.
    print("[+] Index \"" + str(response.get("index", "")) + "\" created!")

    if response.get("shards_acknowledged"):
        print("[+] Shards acknowledged, everything is ready to be used!")
        return True

    print("[x] error with shards :: storage.elasticsearch.HandleIndexCreation")
    return False
def _index_body(properties):
    """Wrap field definitions in the body layout shared by every index here."""
    return {
        "mappings": {
            "items": {
                "properties": properties
            }
        },
        "settings": {
            "number_of_shards": 1
        }
    }


def _tweet_properties():
    """Return the field mapping used for the tweets index."""
    return {
        "id": {"type": "long"},
        "date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
        "timezone": {"type": "text"},
        "location": {"type": "text"},
        "hashtags": {"type": "text"},
        "tweet": {"type": "text"},
        "replies": {"type": "boolean"},
        "retweets": {"type": "boolean"},
        "likes": {"type": "boolean"},
        "user_id": {"type": "keyword"},
        "username": {"type": "keyword"},
        "day": {"type": "integer"},
        "hour": {"type": "integer"},
        "link": {"type": "text"},
        "retweet": {"type": "text"},
        "user_rt": {"type": "text"},
        "essid": {"type": "keyword"},
        "nlikes": {"type": "integer"},
        "nreplies": {"type": "integer"},
        "nretweets": {"type": "integer"},
        "search": {"type": "text"}
    }


def _follow_properties():
    """Return the field mapping used for the follow index."""
    return {
        "user": {"type": "keyword"},
        "follow": {"type": "keyword"},
        "essid": {"type": "keyword"}
    }


def _user_properties():
    """Return the field mapping used for the users index."""
    return {
        "id": {"type": "keyword"},
        "name": {"type": "keyword"},
        "username": {"type": "keyword"},
        "bio": {"type": "text"},
        "location": {"type": "keyword"},
        "url": {"type": "text"},
        "join_datetime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
        "join_date": {"type": "date", "format": "yyyy-MM-dd"},
        "join_time": {"type": "date", "format": "HH:mm:ss"},
        "tweets": {"type": "integer"},
        "following": {"type": "integer"},
        "followers": {"type": "integer"},
        "likes": {"type": "integer"},
        "media": {"type": "integer"},
        "private": {"type": "boolean"},
        "verified": {"type": "boolean"},
        "avatar": {"type": "text"},
        "essid": {"type": "keyword"}
    }


def createIndex(config, instance, **scope):
    """Create the index for the requested scope with an explicit mapping.

    Args:
        config: Object providing the index names through its Index_tweets,
            Index_follow and Index_users attributes.
        instance: Client object exposing ``indices.create``.
        **scope: Keyword arguments; only ``scope`` is read and it selects
            the index: "tweet", "follow" or "user".

    Returns:
        The result of handleIndexResponse for the creation reply, or False
        (after printing an error) when the scope is not recognised.
    """
    requested = scope.get("scope")

    if requested == "tweet":
        name_attr, properties = "Index_tweets", _tweet_properties()
    elif requested == "follow":
        name_attr, properties = "Index_follow", _follow_properties()
    elif requested == "user":
        name_attr, properties = "Index_users", _user_properties()
    else:
        print("[x] error index pre-creation :: storage.elasticsearch.createIndex")
        return False

    body = _index_body(properties)
    # The request runs with stdout silenced; a 400 reply is returned rather
    # than raised because of ignore=400.
    with nostdout():
        reply = instance.indices.create(index=getattr(config, name_attr), body=body, ignore=400)
    # Interpreted after the silenced section so its status messages are printed.
    return handleIndexResponse(reply)
def weekday(day):
weekdays = {
"Monday": 1,
......@@ -32,6 +146,7 @@ def hour(datetime):
return strftime("%H", localtime(datetime))
def Tweet(Tweet, config):
global _index_tweet_status
weekdays = {
"Monday": 1,
"Tuesday": 2,
......@@ -159,11 +274,15 @@ def Tweet(Tweet, config):
nRetweets += 1
es = Elasticsearch(config.Elasticsearch)
if not _index_tweet_status:
_index_tweet_status = createIndex(config, es, scope="tweet")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
def Follow(user, config):
global _index_follow_status
actions = []
j_data = {
......@@ -179,11 +298,14 @@ def Follow(user, config):
actions.append(j_data)
es = Elasticsearch(config.Elasticsearch)
if not _index_follow_status:
_index_follow_status = createIndex(config, es, scope="follow")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
def UserProfile(user, config):
global _index_user_status
actions = []
j_data = {
......@@ -214,6 +336,8 @@ def UserProfile(user, config):
actions.append(j_data)
es = Elasticsearch(config.Elasticsearch)
if not _index_user_status:
_index_user_status = createIndex(config, es, scope="user")
with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = []
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment