Commit bbde381a authored by Francesco Poldi's avatar Francesco Poldi

Added auto-index-creation

Making Elasticsearch setup easier
parent 0ca63b35
...@@ -146,7 +146,7 @@ async def Multi(feed, config, conn): ...@@ -146,7 +146,7 @@ async def Multi(feed, config, conn):
else: else:
link = tweet.find("a", "tweet-timestamp js-permalink js-nav js-tooltip")["href"] link = tweet.find("a", "tweet-timestamp js-permalink js-nav js-tooltip")["href"]
url = f"https://twitter.com{link}?lang=en" url = f"https://twitter.com{link}?lang=en"
if config.User_full: if config.User_full:
futures.append(loop.run_in_executor(executor, await User(url, futures.append(loop.run_in_executor(executor, await User(url,
config, conn))) config, conn)))
......
...@@ -88,7 +88,7 @@ async def Tweets(tw, location, config, conn): ...@@ -88,7 +88,7 @@ async def Tweets(tw, location, config, conn):
if config.Database: if config.Database:
db.tweets(conn, tweet, config) db.tweets(conn, tweet, config)
if config.Pandas: if config.Pandas:
panda.update(tweet, config) panda.update(tweet, config)
......
...@@ -4,6 +4,10 @@ from time import strftime, localtime ...@@ -4,6 +4,10 @@ from time import strftime, localtime
import contextlib import contextlib
import sys import sys
_index_tweet_status = False
_index_follow_status = False
_index_user_status = False
class RecycleObject(object): class RecycleObject(object):
def write(self, junk): pass def write(self, junk): pass
def flush(self): pass def flush(self): pass
...@@ -15,6 +19,116 @@ def nostdout(): ...@@ -15,6 +19,116 @@ def nostdout():
yield yield
sys.stdout = savestdout sys.stdout = savestdout
def handleIndexResponse(response):
try:
if response["status"] == 400:
return True
except KeyError:
pass
if response["acknowledged"]:
print("[+] Index \"" + response["index"] + "\" created!")
else:
print("[x] error index creation :: storage.elasticsearch.handleIndexCreation")
if response["shards_acknowledged"]:
print("[+] Shards acknowledged, everything is ready to be used!")
return True
else:
print("[x] error with shards :: storage.elasticsearch.HandleIndexCreation")
return False
def createIndex(config, instance, **scope):
if scope.get("scope") == "tweet":
tweets_body = {
"mappings": {
"items": {
"properties": {
"id": {"type": "long"},
"date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"timezone": {"type": "text"},
"location": {"type": "text"},
"hashtags": {"type": "text"},
"tweet": {"type": "text"},
"replies": {"type": "boolean"},
"retweets": {"type": "boolean"},
"likes": {"type": "boolean"},
"user_id": {"type": "keyword"},
"username": {"type": "keyword"},
"day": {"type": "integer"},
"hour": {"type": "integer"},
"link": {"type": "text"},
"retweet": {"type": "text"},
"user_rt": {"type": "text"},
"essid": {"type": "keyword"},
"nlikes": {"type": "integer"},
"nreplies": {"type": "integer"},
"nretweets": {"type": "integer"},
"search": {"type": "text"}
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_tweets, body=tweets_body, ignore=400)
return handleIndexResponse(resp)
elif scope.get("scope") == "follow":
follow_body = {
"mappings": {
"items": {
"properties": {
"user": {"type": "keyword"},
"follow": {"type": "keyword"},
"essid": {"type": "keyword"}
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_follow, body=follow_body, ignore=400)
return handleIndexResponse(resp)
elif scope.get("scope") == "user":
user_body = {
"mappings": {
"items": {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "keyword"},
"username": {"type": "keyword"},
"bio": {"type": "text"},
"location": {"type": "keyword"},
"url": {"type": "text"},
"join_datetime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"join_date": {"type": "date", "format": "yyyy-MM-dd"},
"join_time": {"type": "date", "format": "HH:mm:ss"},
"tweets": {"type": "integer"},
"following": {"type": "integer"},
"followers": {"type": "integer"},
"likes": {"type": "integer"},
"media": {"type": "integer"},
"private": {"type": "boolean"},
"verified": {"type": "boolean"},
"avatar": {"type": "text"},
"essid": {"type": "keyword"}
}
}
},
"settings": {
"number_of_shards": 1
}
}
with nostdout():
resp = instance.indices.create(index=config.Index_users, body=user_body, ignore=400)
return handleIndexResponse(resp)
else:
print("[x] error index pre-creation :: storage.elasticsearch.createIndex")
return False
def weekday(day): def weekday(day):
weekdays = { weekdays = {
"Monday": 1, "Monday": 1,
...@@ -32,6 +146,7 @@ def hour(datetime): ...@@ -32,6 +146,7 @@ def hour(datetime):
return strftime("%H", localtime(datetime)) return strftime("%H", localtime(datetime))
def Tweet(Tweet, config): def Tweet(Tweet, config):
global _index_tweet_status
weekdays = { weekdays = {
"Monday": 1, "Monday": 1,
"Tuesday": 2, "Tuesday": 2,
...@@ -159,11 +274,15 @@ def Tweet(Tweet, config): ...@@ -159,11 +274,15 @@ def Tweet(Tweet, config):
nRetweets += 1 nRetweets += 1
es = Elasticsearch(config.Elasticsearch) es = Elasticsearch(config.Elasticsearch)
if not _index_tweet_status:
_index_tweet_status = createIndex(config, es, scope="tweet")
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
def Follow(user, config): def Follow(user, config):
global _index_follow_status
actions = [] actions = []
j_data = { j_data = {
...@@ -179,11 +298,14 @@ def Follow(user, config): ...@@ -179,11 +298,14 @@ def Follow(user, config):
actions.append(j_data) actions.append(j_data)
es = Elasticsearch(config.Elasticsearch) es = Elasticsearch(config.Elasticsearch)
if not _index_follow_status:
_index_follow_status = createIndex(config, es, scope="follow")
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
def UserProfile(user, config): def UserProfile(user, config):
global _index_user_status
actions = [] actions = []
j_data = { j_data = {
...@@ -214,6 +336,8 @@ def UserProfile(user, config): ...@@ -214,6 +336,8 @@ def UserProfile(user, config):
actions.append(j_data) actions.append(j_data)
es = Elasticsearch(config.Elasticsearch) es = Elasticsearch(config.Elasticsearch)
if not _index_user_status:
_index_user_status = createIndex(config, es, scope="user")
with nostdout(): with nostdout():
helpers.bulk(es, actions, chunk_size=2000, request_timeout=200) helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
actions = [] actions = []
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment