Commit c0fb33e2 authored by nanahira's avatar nanahira

first

parents
__pycache__
*.pyc
*.pyo
venv
*.http
.venv
/accounts*
/cookie*
.git*
Dockerfile
.dockerignore
\ No newline at end of file
__pycache__
*.pyc
*.pyo
venv
*.http
.venv
/accounts*
/cookie*
\ No newline at end of file
stages:
- build
- deploy
variables:
GIT_DEPTH: "1"
before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
.build-image:
stage: build
script:
- docker build --pull -t $TARGET_IMAGE .
- docker push $TARGET_IMAGE
build-x86:
extends: .build-image
tags:
- docker
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86
build-arm:
extends: .build-image
tags:
- docker-arm
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
.deploy:
stage: deploy
tags:
- docker
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
- docker manifest create $TARGET_IMAGE --amend $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86 --amend
$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
- docker manifest push $TARGET_IMAGE
deploy_latest:
extends: .deploy
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:latest
only:
- master
deploy_branch:
extends: .deploy
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N802" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10 (shadowban-revive)" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/shadowban-revive.iml" filepath="$PROJECT_DIR$/.idea/shadowban-revive.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (shadowban-revive)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
FROM python:3.10
WORKDIR /app
COPY ./requirements.txt ./
RUN pip install --no-cache -r ./requirements.txt
COPY . ./
ENTRYPOINT ["python"]
CMD ["main.py"]
from __future__ import annotations
import asyncio
import os
import sys
import time
import json
import random
import threading
from typing import Any, Dict, List, Optional, Union
import httpx
import redis
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
twitterKeys = [
'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'AAAAAAAAAAAAAAAAAAAAAFXzAwAAAAAAMHCxpeSDG1gLNLghVe8d74hl6k4%3DRUMF4xAQLsbeBhTSRrCiQpJtxoGWeyHrDb5te2jpGskWDFW82F',
'AAAAAAAAAAAAAAAAAAAAAAj4AQAAAAAAPraK64zCZ9CSzdLesbE7LB%2Bw4uE%3DVJQREvQNCZJNiz3rHO7lOXlkVOQkzzdsgu6wWgcazdMUaGoUGm',
]
privateKey = os.getenv('TWITTER_AUTH_KEY')
if privateKey:
twitterKeys = privateKey.split(" ")
cookies = [os.getenv('COOKIE')]
cookie_file = os.getenv('COOKIE_FILE')
if cookie_file:
with open(cookie_file, 'r') as f:
cookies = str.strip(f.read()).split("\n")
# ---------------------------------------------------------------------------
# Constants that haven’t changed
# ---------------------------------------------------------------------------
ENDPOINT: Dict[str, str] = {
"UserTweetsAndReplies": "CwLU7qTfeu0doqhSr6tW4A",
"TweetDetail": "BoHLKeBvibdYDiJON1oqTg",
}
# <<< COPY `FeaturesDict` DEFINITION FROM OLD SCRIPT >>>
FeaturesDict = {
"responsive_web_uc_gql_enabled": False,
"dont_mention_me_view_api_enabled": False,
"interactive_text_enabled": False,
"responsive_web_edit_tweet_api_enabled": False,
"standardized_nudges_for_misinfo_nudges_enabled": False,
"vibe_tweet_context_enabled": False,
"standardized_nudges_misinfo": False,
"responsive_web_enhance_cards_enabled": False,
"vibe_api_enabled": False,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": False,
"responsive_web_text_conversations_enabled": False,
"unified_cards_ad_metadata_container_dynamic_card_content_query_enabled": False,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": False,
"tweetypie_unmention_optimization_enabled": False,
"verified_phone_label_enabled": False,
"responsive_web_twitter_blue_verified_badge_is_enabled": False,
"view_counts_public_visibility_enabled": False,
"view_counts_everywhere_api_enabled": False,
"longform_notetweets_consumption_enabled": False,
}
FeaturesJson = json.dumps(FeaturesDict)
# Thread‑safe health flag
_health_lock = threading.Lock()
_is_healthy: bool = True
# ---------------------------------------------------------------------------
# Redis (same sync client—the API is blocking so FastAPI’s default thread‑pool
# worker is fine.)
# ---------------------------------------------------------------------------
redis_client: Optional[redis.Redis] = None
if os.getenv("REDIS_HOST"):
redis_pool = redis.ConnectionPool(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT", 6379)),
password=os.getenv("REDIS_PASSWORD", None),
db=int(os.getenv("REDIS_DB", 0)),
)
redis_client = redis.Redis(connection_pool=redis_pool)
# ---------------------------------------------------------------------------
# FastAPI app & CORS
# ---------------------------------------------------------------------------
app = FastAPI(title="Shadow‑ban checker API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Pydantic models matching the JSON produced by the original script
# ---------------------------------------------------------------------------
class GhostModel(BaseModel):
ban: Optional[bool] = None
tweet: Optional[str] = None
in_reply_to: Optional[str] = Field(None, alias="in_reply_to")
class MoreRepliesModel(BaseModel):
ban: Optional[bool] = None
tweet: Optional[str] = None
in_reply_to: Optional[str] = Field(None, alias="in_reply_to")
class TestsModel(BaseModel):
search: Union[bool, str, None]
typeahead: Union[bool, str, None]
ghost: GhostModel
more_replies: MoreRepliesModel
class ProfileModel(BaseModel):
exists: bool = False
error: Optional[Union[str, Dict[str, Any]]] = None
screen_name: str
id: Optional[str] = None
protected: Optional[bool] = None
suspended: Optional[bool] = None
has_tweets: Optional[bool] = None
class ShadowBanResponse(BaseModel):
timestamp: float
profile: ProfileModel
tests: Optional[TestsModel] = None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def getTwitterHeaders(cookie: str | None = None) -> Dict[str, str]:
"""Builds the headers used for *every* twitter/x.com call."""
hdrs: Dict[str, str] = {
"Authorization": f"Bearer {random.choice(twitterKeys)}",
"user-agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
),
}
if cookie is None:
cookie = random.choice(cookies) if cookies else None
if cookie:
hdrs["cookie"] = cookie
hdrs["x-csrf-token"] = cookie.split("ct0=")[1].split(";")[0]
return hdrs
def getTwitterSession(cookie: str | None = None) -> httpx.AsyncClient:
"""A *new* AsyncClient with the right default headers set."""
return httpx.AsyncClient(headers=getTwitterHeaders(cookie))
async def checkCookies():
good = []
bad = []
for cookie in cookies:
print("Checking cookie: " + cookie, file=sys.stderr)
async with getTwitterSession(cookie) as client:
response = await client.get("https://api.twitter.com/1.1/guest/activate.json")
if response.status_code == 200:
return True
usertlurl = 'https://x.com/i/api/graphql/oUZZZ8Oddwxs8Cd3iW3UEA/UserByScreenName'
screen_name = 'Sena_n_Karin'
params = {
'variables': json.dumps({
'screen_name': screen_name,
'withSafetyModeUserFields': True,
}),
'features': json.dumps({
"hidden_profile_likes_enabled": False,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"subscriptions_verification_info_verified_since_enabled": True,
"highlights_tweets_tab_ui_enabled": True,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": True
})
}
usertl_b = await client.get(usertlurl, params=params)
usertl = usertl_b
usertl_json = usertl.json()
possibleErrors = [
'Could not authenticate you',
'Authorization: Denied by access control: To protect our users from spam and other malicious activity, this account is temporarily locked. Please log in to https://twitter.com to unlock your account.'
]
matchError = False
if "errors" in usertl_json:
for error in possibleErrors:
for e in usertl_json["errors"]:
if e['message'].startswith(error):
matchError = True
break
if matchError:
break
if usertl.status_code == 200 or not matchError:
print(f"Good cookie {usertl.status_code}: " + cookie + " " + usertl.text, file=sys.stderr)
good.append(cookie)
else:
print(f"Bad cookie {usertl.status_code}: " + cookie + " " + usertl.text, file=sys.stderr)
bad.append(cookie)
return good, bad
def extract_tweet(tweet):
if 'rest_id' in tweet:
return tweet
if 'tweet' in tweet and 'rest_id' in tweet['tweet']:
return tweet['tweet']
return None
async def doSearch(screen_name: str) -> Optional[List[dict]]:
"""Port of `doSearch`; unchanged except for async/await & httpx."""
searchurl_v2 = (
"https://x.com/i/api/graphql/"
"nK1dw4oV3k4w5TdtcAdSww/SearchTimeline"
)
params_v2 = {
"features": json.dumps({
"rweb_lists_timeline_redesign_enabled": True,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_timeline_navigation_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"tweetypie_unmention_optimization_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"responsive_web_twitter_article_tweet_consumption_enabled": False,
"tweet_awards_web_tipping_enabled": False,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
"longform_notetweets_rich_text_read_enabled": True,
"longform_notetweets_inline_media_enabled": True,
"responsive_web_media_download_video_enabled": False,
"responsive_web_enhance_cards_enabled": False
}),
"variables": json.dumps(
{
"count": 20,
"querySource": "typed_query",
"product": "Latest",
"rawQuery": f"from:@{screen_name}",
}
),
"fieldToggles": json.dumps({"withArticleRichContentState": False}),
}
async with getTwitterSession() as client:
r = await client.get(searchurl_v2, params=params_v2)
try:
data = r.json()
except ValueError:
print("Search Error parsing JSON:", r.text)
return None
global _is_healthy
_is_healthy = "data" in data
if not _is_healthy:
print("Search error:", r.text)
return None
return data["data"]["search_by_raw_query"]["search_timeline"]["timeline"][
"instructions"
]
# <<< PORT THE *ENTIRE* LOGIC of `searchban` (now async function below).
# The body is IDENTICAL to the original except:
# * replace every requests call with `await client.get(...)` etc.
# * use helper `getTwitterSession()` to create per‑call clients.
# * at the end, wrap the dict in `ShadowBanResponse(**returnjson)`
# before returning. >>>
async def searchbanLogic(screen_name: str) -> tuple[dict, bool]:
"""Converted one‑shot worker that returns the exact same JSON structure
as the original Flask view, but wrapped in the Pydantic model.
"""
# <<< COPY THE ORIGINAL BODY OF `searchban` FUNCTION HERE, then replace
# `twitter_b = getTwitterSession()` with `client = await getTwitterSession()`
# and change *every* `.get()` / `.post()` to `await client.get()` / `await client.post()`.
# Keep *all* print‑debugging so the behaviour stays transparent. >>>
# IMPORTANT: ensure you finish with something like ↓ and *nothing* else
print("Checking {}".format(screen_name))
returnjson = {
"timestamp": time.time(),
"profile": {
# "id": "7080152",
# "screenName": "TwitterJP",
# "protected": False,
# "suspended": False,
# "has_tweets": True,
"exists": False,
"error": None,
"screen_name": screen_name,
},
# "check": {
# "search": 1484727214419628037,
# "suggest": True,
# "ghost": {"ban": True},
# "reply": {"ban": False, "tweet": "1480819689898987523", "in_reply_to": "1369626114381901828"}
# }
}
async with (getTwitterSession() as client):
usertlurl = 'https://x.com/i/api/graphql/oUZZZ8Oddwxs8Cd3iW3UEA/UserByScreenName'
params = {
'variables': json.dumps({
'screen_name': screen_name,
'withSafetyModeUserFields': True,
}),
'features': json.dumps({
"hidden_profile_likes_enabled": False,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"subscriptions_verification_info_verified_since_enabled": True,
"highlights_tweets_tab_ui_enabled": True,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": True
})
}
usertl_b = await client.get(usertlurl, params=params)
usertl = usertl_b
usertl_json = usertl.json()
print("user", usertl_json)
if 'errors' in usertl_json:
returnjson["profile"]["error"] = usertl_json["errors"][0]['message']
return returnjson, True
if len(usertl_json) == 0 or 'user' not in usertl_json['data']:
returnjson["profile"]["has_tweets"] = False
return returnjson, True
returnjson["profile"]["has_tweets"] = True
if usertl.status_code == 200:
if 'reason' in usertl_json['data']["user"]["result"] and usertl_json['data']["user"]["result"]['reason'] == 'Suspended':
returnjson["profile"]["suspended"] = True
return returnjson, True
returnjson["profile"]["exists"] = True
returnjson["profile"]["id"] = usertl_json['data']["user"]["result"]["rest_id"]
returnjson["profile"]["screen_name"] = usertl_json['data']["user"]["result"]["legacy"]["screen_name"]
# returnjson["profile"]["protected"] = usertl_json["protected"]
elif usertl.status_code == 403:
returnjson["profile"]["suspended"] = True
return returnjson, True
else:
print("Profile error: " + usertl.text)
if "error" in usertl_json and usertl_json["error"] == "Not authorized.":
returnjson["profile"]["protected"] = True
returnjson["profile"]["suspended"] = True
returnjson["profile"]["has_tweets"] = False
return returnjson, True
returnjson["profile"]["error"] = usertl_json
return returnjson, True
# if usertl_json["protected"] == True:
# returnjson["profile"]["protected"] = True ## how do you determen protected and suspended
# return returnjson
# if usertl_json["statuses_count"] == 0:
# returnjson["profile"]["has_tweets"] = False
# return returnjson
# else:
# returnjson["profile"]["has_tweets"] = True
returnjson["tests"] = {
"search": True, ## Search ban
"typeahead": True, ## suggest ban
"ghost": {"ban": None},
"more_replies": {"ban": False, "tweet": "-1", "in_reply_to": "-1"}
}
hasFailure = False
# searchurl = "https://api.twitter.com/1.1/users/search.json"
# params = {"q": "from:@{}".format(screen_name), "count": 1}
# search = twitter_b.get(searchurl, params=params).json()
# print(search)
# if len(search) == 0:
# returnjson["test"]["search"] = "ban"
# return returnjson
# else:
# return returnjson
print("Checking search suggestion banned of {}".format(screen_name))
suggestions_req = await client.get("https://x.com/i/api/1.1/search/typeahead.json?include_ext_is_blue_verified=1&include_ext_verified_type=1&include_ext_profile_image_shape=1&src=search_box&result_type=users&q=@" + screen_name)
suggestions = suggestions_req.json()
print("Suggestions: " + str(suggestions))
try:
returnjson["tests"]["typeahead"] = len([1 for user in suggestions["users"] if user["screen_name"].lower() == screen_name.lower()]) > 0
except:
hasFailure = True
print("Error checking typeahead of {}".format(screen_name))
returnjson["tests"]["typeahead"] = '_error'
if returnjson["tests"]["typeahead"] != True:
print("{} is search suggestion banned or errored, checking search ban.".format(screen_name))
search_result = await doSearch(screen_name)
if search_result is None:
returnjson["tests"]["search"] = '_error'
hasFailure = True
else:
found_id = None
for item in search_result:
if item['type'] != 'TimelineAddEntries':
continue
for entry_item in item['entries']:
if not entry_item['entryId'].startswith('tweet-'):
continue
tweet = extract_tweet(entry_item['content']['itemContent']['tweet_results']['result'])
if tweet is None:
continue
found_id = tweet['rest_id']
if found_id:
break
if found_id:
returnjson["tests"]["search"] = found_id
print("{} is not search banned: {}".format(screen_name, found_id))
else:
returnjson["tests"]["search"] = False
print("{} is search banned.".format(screen_name))
#elif search_tweets == {}:
# returnjson["tests"]["search"] = False
# print("{} is search banned.".format(screen_name))
# # returnjson["tests"]["typeahead"] = False
#else:
# returnjson["tests"]["search"] = str(search_tweets[list(search_tweets.keys())[0]]["id"])
# print("{} is not search banned.".format(screen_name))
else:
returnjson["tests"]["search"] = "_implied_good"
print("{} is not search suggestion banned, skipped search ban check.".format(screen_name))
## get replies
## Start GraphQL
guest_session = await client.post("https://api.x.com/1.1/guest/activate.json")
print("Guest session: " + guest_session.text)
client.headers["x-guest-token"] = guest_session.json()["guest_token"]
user_id = returnjson["profile"]["id"]
reply = None
print("User ID of {}: {}".format(screen_name, user_id))
get_reply_vars = {
"count": 200, "userId": user_id,
"includePromotedContent": False, "withSuperFollowsUserFields": False, "withBirdwatchPivots": False,
"withDownvotePerspective": False, "withReactionsMetadata": False,
"withReactionsPerspective": False, "withSuperFollowsTweetFields": False, "withVoice": False, "withV2Timeline": False,
}
get_reply_param = param = {"variables": json.dumps(get_reply_vars), "features": FeaturesJson}
replies = None
try:
replies = await client.get("https://x.com/i/api/graphql/{}/{}".format(ENDPOINT["UserTweetsAndReplies"], "UserTweetsAndReplies"), params=get_reply_param)
ghostban = True
ghostTweetId = None
ghostReplyId = None
showmore = False
showmoreTweetId = None
showmoreReplyId = None
print("Replies text: " + replies.text)
repliesJson = replies.json()
maindata = repliesJson["data"]["user"]["result"]["timeline"]["timeline"]["instructions"]
checkedTweets = set()
for d in maindata:
if not ghostban:
# all checks done
break
if d["type"] == "TimelineAddEntries":
for ent in d["entries"]:
if not ghostban:
# all checks done
break
if ent["entryId"].startswith("tweet") and "legacy" in extract_tweet(ent["content"]["itemContent"]["tweet_results"]["result"]):
tmp = extract_tweet(ent["content"]["itemContent"]["tweet_results"]["result"])["legacy"]
if "in_reply_to_status_id_str" in tmp and tmp["in_reply_to_status_id_str"] not in checkedTweets:
reply = tmp
tweetId = reply["in_reply_to_status_id_str"]
checkedTweets.add(tweetId)
if ghostTweetId is None:
ghostTweetId = tweetId
ghostReplyId = reply["id_str"]
if showmoreTweetId is None:
showmoreTweetId = tweetId
showmoreReplyId = reply["id_str"]
tweet_detail_vars = {
"focalTweetId": tweetId,
"includePromotedContent":False,
"withBirdwatchNotes":False,
"withSuperFollowsUserFields":False,
"withDownvotePerspective":False,
"withReactionsMetadata":False,
"withReactionsPerspective":False,
"withSuperFollowsTweetFields":False,
"withVoice":False,
}
tweetdetails = await client.get("https://x.com/i/api/graphql/{}/{}".format(ENDPOINT["TweetDetail"], "TweetDetail"), params={"variables": json.dumps(tweet_detail_vars), "features": FeaturesJson})
tweetData = tweetdetails.json()["data"]
if "threaded_conversation_with_injections" not in tweetData:
continue
insts = tweetdetails.json()["data"]["threaded_conversation_with_injections"]["instructions"]
for inst in insts:
if not ghostban:
# all checks done
break
if inst["type"] == "TimelineAddEntries":
for ent in inst["entries"]:
if not ghostban:
# all checks done
break
print("Current entry of {} is: {}".format(screen_name, ent["entryId"]))
if ent["entryId"].startswith("conversationthread") and ghostban:
for item in ent["content"]["items"]:
if "tweet_results" in item["item"]["itemContent"] and "legacy" in extract_tweet(item["item"]["itemContent"]["tweet_results"]["result"]) and extract_tweet(item["item"]["itemContent"]["tweet_results"]["result"])["legacy"]["user_id_str"] == user_id:
replyId = extract_tweet(item["item"]["itemContent"]["tweet_results"]["result"])["legacy"]["id_str"]
returnjson["tests"]["ghost"] = {"ban": False, "tweet": tweetId, "in_reply_to": replyId}
ghostban = False
print("Found valid reply {} => {}, so {} is not ghost banned.".format(tweetId, replyId, screen_name))
break
if ent["entryId"].startswith("cursor-showmorethreadsprompt") and not showmore:
# showmore = True
cursor_vars = tweet_detail_vars
cursor_vars["cursor"] = ent["content"]["itemContent"]["value"]
cursor = await client.get("https://x.com/i/api/graphql/{}/{}".format(ENDPOINT["TweetDetail"], "TweetDetail"), params={"variables": json.dumps(cursor_vars), "features": FeaturesJson})
cursor_insts = cursor.json()["data"]["threaded_conversation_with_injections"]["instructions"]
for c_i in cursor_insts:
if c_i["type"] == "TimelineAddEntries":
if len(c_i["entries"]) == 0:
returnjson["tests"]["more_replies"] = {"ban": True}
showmore = True
break
for c_ent in c_i["entries"]:
if c_ent["entryId"].startswith("conversationthread"):
print("Checking more contents of {} by {}".format(tweetId, screen_name))
for c_item in c_ent["content"]["items"]:
if "legacy" in extract_tweet(c_item["item"]["itemContent"]["tweet_results"]["result"]) and extract_tweet(c_item["item"]["itemContent"]["tweet_results"]["result"])["legacy"]["user_id_str"] == user_id:
replyId = extract_tweet(c_item["item"]["itemContent"]["tweet_results"]["result"])["legacy"]["id_str"]
returnjson["tests"]["more_replies"] = {"ban": True, "in_reply_to": replyId, "tweet": tweetId}
print("{} is reply deboosted because of {} => {}.".format(screen_name, tweetId, replyId))
showmore = True
break
if len(checkedTweets) > 0:
if ghostban:
print("{} is ghost banned.".format(screen_name))
returnjson["tests"]["ghost"] = {
"ban": True,
"tweet": ghostTweetId,
"in_reply_to": ghostReplyId
}
if not showmore:
print("{} is not reply deboosted.".format(screen_name))
returnjson["tests"]["more_replies"] = {
"ban": False,
"in_reply_to": showmoreReplyId,
"tweet": showmoreTweetId
}
# No search ban || more replies => No ghost ban
#if returnjson["tests"]["search"] and returnjson["tests"]["search"] != "_error" and "ban" in returnjson["tests"]["ghost"] and returnjson["tests"]["ghost"]["ban"] == True or "ban" in returnjson["tests"]["more_replies"] and returnjson["tests"]["more_replies"]["ban"] == True:
# returnjson["tests"]["ghost"] = {
# "ban": False,
# "tweet": ghostTweetId,
# "in_reply_to": ghostReplyId
# }
# No ghost ban && unknown more replies => No more replies ban
if "ban" not in returnjson["tests"]["more_replies"] and "ban" in returnjson["tests"]["ghost"] and returnjson["tests"]["ghost"]["ban"] == False:
returnjson["tests"]["more_replies"] = {
"ban": False,
"in_reply_to": showmoreReplyId,
"tweet": showmoreTweetId
}
else:
print("No replies found for {}.".format(screen_name))
returnjson["tests"]["ghost"] = {}
returnjson["tests"]["more_replies"] = {}
except KeyError as e:
print("Errored testing {}".format(screen_name))
print(e)
if replies is not None:
print("Response: " + replies.text)
returnjson["tests"]["ghost"] = {}
returnjson["tests"]["more_replies"] = {}
hasFailure = True
return returnjson, hasFailure
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
class HealthyResponse(BaseModel):
healthy: bool
@app.get("/_healthy", response_model=HealthyResponse)
async def healthy():
# The old behaviour always answered 200 unless the global flag failed.
# Keeping that.
return HealthyResponse(healthy=True)
@app.get("/{screen_name}", response_model=ShadowBanResponse)
async def check_shadow_ban(screen_name: str):
if redis_client is not None:
cached = await redis_client.get(f"shadowban:{screen_name}")
if cached is not None:
return json.loads(cached)
try:
result, hasFailure = await searchbanLogic(screen_name)
except Exception as exc:
print("Unhandled exception:", exc)
raise HTTPException(status_code=500, detail="Internal error") from exc
# Cache for 10 minutes if the run completed without failure flags
if redis_client and ['tests'] in result and not hasFailure:
await redis_client.set(
f"shadowban:{screen_name}", json.dumps(result), ex=600
)
return result
# ---------------------------------------------------------------------------
# Convenience CLI entry‑point (just like the old `if __name__ == "__main__"`)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
if os.getenv('CHECK_COOKIE'):
async def check_cookie_process():
good, bad = await checkCookies()
#print("Good cookies:")
for g in good:
print(g)
#print("Bad cookies:")
#for b in bad:
# print(b)
# quit
sys.exit(0)
asyncio.run(check_cookie_process())
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=int(os.environ.get("PORT", 3000)),
reload=True,
)
# Test your FastAPI endpoints
GET http://127.0.0.1:8000/
Accept: application/json
###
GET http://127.0.0.1:8000/hello/User
Accept: application/json
###
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment