Commit dc70c89b authored by Francesco Poldi

Upgrade index for better data visualization

Added some loops around the indexing step. It seems like that could slow things down, but it doesn't: Elasticsearch can index about 6k items/s, so performance is not affected (in most cases).
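In other words, instead of indexing one document per tweet with integer counters, each tweet is fanned out into one document per like, reply, and retweet, each carrying a boolean flag that matches the new mapping. A minimal sketch of that fan-out; the names (fan_out, tweet) are illustrative, not part of the commit:

    def fan_out(tweet):
        # One input document with integer counts becomes count-many output
        # documents, each flagged with a boolean, so Kibana can aggregate
        # likes/replies/retweets by simply counting documents.
        docs = []
        for field in ("likes", "replies", "retweets"):
            for n in range(int(tweet[field])):
                doc = {k: v for k, v in tweet.items()
                       if k not in ("likes", "replies", "retweets")}
                doc[field] = True
                doc["_id"] = "{}_{}_{}".format(tweet["tweetid"], field, n)
                docs.append(doc)
        return docs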
parent 06b00406
@@ -7,9 +7,9 @@ PUT tweep
         "datestamp": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
         "timezone": {"type": "text"},
         "hashtags": {"type": "text"},
-        "replies": {"type": "integer"},
-        "retweets": {"type": "integer"},
-        "likes": {"type": "integer"},
+        "replies": {"type": "boolean"},
+        "retweets": {"type": "boolean"},
+        "likes": {"type": "boolean"},
         "username": {"type": "keyword"},
         "day": {"type": "keyword"},
         "hour": {"type": "keyword"}
...
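For reference, the updated mapping recreated through the Python client. This is a sketch assuming the fields visible in the hunk plus the tweetid and text fields the script indexes; the localhost URL, the surrounding request body, and the pre-7.x doc_type layout are assumptions:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")
    es.indices.create(index="tweep", body={
        "mappings": {
            "items": {  # the doc_type the script writes to
                "properties": {
                    "tweetid":   {"type": "keyword"},   # assumed, not in the hunk
                    "datestamp": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                    "timezone":  {"type": "text"},
                    "text":      {"type": "text"},      # assumed, not in the hunk
                    "hashtags":  {"type": "text"},
                    "replies":   {"type": "boolean"},
                    "retweets":  {"type": "boolean"},
                    "likes":     {"type": "boolean"},
                    "username":  {"type": "keyword"},
                    "day":       {"type": "keyword"},
                    "hour":      {"type": "keyword"}
                }
            }
        }
    })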
@@ -1,11 +1,12 @@
 #!/usr/bin/python3
 from bs4 import BeautifulSoup
-from elasticsearch import Elasticsearch
+from elasticsearch import Elasticsearch, helpers
 from time import gmtime, strftime
 import argparse
 import aiohttp
 import asyncio
 import async_timeout
+import contextlib
 import csv
 import datetime
 import hashlib
@@ -14,6 +15,17 @@ import re
 import sys
 import sqlite3
 
+## clean some output
+class RecycleObject(object):
+    def write(self, junk): pass
+
+@contextlib.contextmanager
+def nostdout():
+    savestdout = sys.stdout
+    sys.stdout = RecycleObject()
+    yield
+    sys.stdout = savestdout
+
 def initdb(db):
     '''
     Creates a new SQLite database or connects to it if exists
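The new nostdout() helper temporarily swaps sys.stdout for a sink that discards writes; it is used further down to keep the bulk-indexing call quiet. A usage sketch:

    with nostdout():
        print("discarded")      # swallowed by RecycleObject.write
    print("visible again")      # stdout restored after the block

Note that because the restore is not wrapped in try/finally, an exception inside the block would leave stdout redirected; the standard library's contextlib.redirect_stdout gives the same effect with that case handled.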
@@ -204,43 +216,99 @@ async def outTweet(tweet):
     except sqlite3.IntegrityError: # this happens if the tweet is already in the db
         return ""
-    day = d.strftime("%A")
-    if day == "Monday":
-        _day = 1
-    elif day == "Tuesday":
-        _day = 2
-    elif day == "Wednesday":
-        _day = 3
-    elif day == "Thursday":
-        _day = 4
-    elif day == "Friday":
-        _day = 5
-    elif day == "Saturday":
-        _day = 6
-    elif day == "Sunday":
-        _day = 7
-    else:
-        print("[x] Something is going wrong!")
-        sys.exit(1)
     if arg.elasticsearch:
-        jObject = {
-            "tweetid": tweetid,
-            "datestamp": date + " " + time,
-            "timezone": timezone,
-            "text": text,
-            "hashtags": re.findall(r'(?i)\#\w+', text, flags=re.UNICODE),
-            "replies": replies,
-            "retweets": retweets,
-            "likes": likes,
-            "username": username,
-            "day": _day,
-            "hour": time.split(":")[0]
-        }
+        day = d.strftime("%A")
+        if day == "Monday":
+            _day = 1
+        elif day == "Tuesday":
+            _day = 2
+        elif day == "Wednesday":
+            _day = 3
+        elif day == "Thursday":
+            _day = 4
+        elif day == "Friday":
+            _day = 5
+        elif day == "Saturday":
+            _day = 6
+        elif day == "Sunday":
+            _day = 7
+        else:
+            print("[x] Something is going wrong!")
+            sys.exit(1)
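Aside: the chain above reproduces ISO weekday numbering (Monday = 1 … Sunday = 7), so, assuming d is a datetime object as d.strftime suggests, it collapses to one line:

    _day = d.isoweekday()   # Monday == 1 ... Sunday == 7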
+        hashtags = re.findall(r'(?i)\#\w+', text, flags=re.UNICODE)
+        actions = []
+        nLikes = 0
+        nReplies = 0
+        nRetweets = 0
+        for l in range(int(likes)):
+            jObject = {
+                "tweetid": tweetid,
+                "datestamp": date + " " + time,
+                "timezone": timezone,
+                "text": text,
+                "hashtags": hashtags,
+                "likes": True,
+                "username": username,
+                "day": _day,
+                "hour": time.split(":")[0]
+            }
+            j_data = {
+                "_index": "tweep",
+                "_type": "items",
+                "_id": tweetid + "_likes_" + str(nLikes),
+                "_source": jObject
+            }
+            actions.append(j_data)
+            nLikes += 1
+        for rep in range(int(replies)):
+            jObject = {
+                "tweetid": tweetid,
+                "datestamp": date + " " + time,
+                "timezone": timezone,
+                "text": text,
+                "hashtags": hashtags,
+                "replies": True,
+                "username": username,
+                "day": _day,
+                "hour": time.split(":")[0]
+            }
+            j_data = {
+                "_index": "tweep",
+                "_type": "items",
+                "_id": tweetid + "_replies_" + str(nReplies),
+                "_source": jObject
+            }
+            actions.append(j_data)
+            nReplies += 1
+        for rep in range(int(retweets)):
+            jObject = {
+                "tweetid": tweetid,
+                "datestamp": date + " " + time,
+                "timezone": timezone,
+                "text": text,
+                "hashtags": hashtags,
+                "retweets": True,
+                "username": username,
+                "day": _day,
+                "hour": time.split(":")[0]
+            }
+            j_data = {
+                "_index": "tweep",
+                "_type": "items",
+                "_id": tweetid + "_retweets_" + str(nRetweets),
+                "_source": jObject
+            }
+            actions.append(j_data)
+            nRetweets += 1
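The three loops differ only in the field name and the counter. A hedged refactor sketch; build_actions and base are illustrative, not part of the commit:

    def build_actions(base, counts):
        # base: the common document fields; counts: {"likes": likes, ...}
        actions = []
        for field, count in counts.items():
            for n in range(int(count)):
                actions.append({
                    "_index": "tweep",
                    "_type": "items",
                    "_id": "{}_{}_{}".format(base["tweetid"], field, n),
                    "_source": dict(base, **{field: True}),
                })
        return actions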
         es = Elasticsearch(arg.elasticsearch)
-        es.index(index="tweep", doc_type="items", id=tweetid, body=json.dumps(jObject))
+        with nostdout():
+            helpers.bulk(es, actions, chunk_size=2000, request_timeout=200)
+        actions = []
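helpers.bulk() splits actions into _bulk requests of chunk_size documents (2000 here) and returns the number of successfully executed actions along with error details. A sketch of surfacing failures instead of silencing them, assuming the same es client and actions list:

    success, errors = helpers.bulk(es, actions, chunk_size=2000,
                                   request_timeout=200, raise_on_error=False)
    if errors:
        print("[x] {} documents failed to index".format(len(errors)))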
output = "" output = ""
elif arg.users: elif arg.users:
output = username output = username
...