Skip to content

Commit

Permalink
Merge pull request #6 from surcyf123/updates-0.0.25
Browse files Browse the repository at this point in the history
Updates 0.0.25
  • Loading branch information
Chkhikvadze authored Jan 24, 2024
2 parents 40a275a + fa26c9d commit 2215933
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 66 deletions.
2 changes: 1 addition & 1 deletion neurons/miners/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

OpenAI.api_key = os.environ.get('OPENAI_API_KEY')
if not OpenAI.api_key:
raise ValueError("Please set the OPENAI_API_KEY environment variable.")
raise ValueError("Please set the OPENAI_API_KEY environment variable. See here: https://github.com/surcyf123/smart-scrape/blob/main/docs/env_variables.md")

netrc_path = pathlib.Path.home() / '.netrc'
wandb_api_key = os.getenv('WANDB_API_KEY')
Expand Down
2 changes: 1 addition & 1 deletion neurons/validators/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def update_scores(self, moving_averaged_scores, wandb_data):
if self.config.wandb_on:
wandb.log(wandb_data)

iterations_per_set_weights = 1
iterations_per_set_weights = 2
iterations_until_update = iterations_per_set_weights - ((self.steps_passed + 1) % iterations_per_set_weights)
bt.logging.info(f"Updating weights in {iterations_until_update} iterations.")

Expand Down
2 changes: 1 addition & 1 deletion template/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


# version must stay on line 22
__version__ = "0.0.24"
__version__ = "0.0.25"
version_split = __version__.split(".")
__spec_version__ = (
(1000 * int(version_split[0]))
Expand Down
149 changes: 86 additions & 63 deletions template/services/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
from template.utils import call_openai, tweet_prompts
from template.protocol import TwitterPromptAnalysisResult
import bittensor as bt
from typing import List
from typing import List, Dict, Any
from urllib.parse import urlparse

BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')
if not BEARER_TOKEN:
raise ValueError("Please set the TWITTER_BEARER_TOKEN environment variable. See here: https://github.com/surcyf123/smart-scrape/blob/main/docs/env_variables.md")

VALID_DOMAINS = ["twitter.com", "x.com"]
twitter_api_query_example = {
'query': '(from:twitterdev -is:retweet) OR #twitterdev',
Expand Down Expand Up @@ -80,7 +83,7 @@ def get_query_gen_prompt(prompt, is_accuracy=True):
else:
accuracy_text = f"""
RULES:
1. Generate keywords, hashtags, and mentions that are closely related to the user's prompt and after generate Twitter API query
1. Similiar Generate keywords, hashtags, and mentions that are closely related to the user's prompt and after generate Twitter API query
"""
content = f"""
Given the specific User's prompt: '{prompt}', please perform the following tasks and provide the results in a JSON object format:
Expand Down Expand Up @@ -124,14 +127,31 @@ def get_query_gen_prompt(prompt, is_accuracy=True):
bt.logging.info("get_query_gen_prompt End ==============================")
return content

def get_fix_query_prompt(prompt, prompt_analysis, error):
task = get_query_gen_prompt(prompt=prompt, is_accuracy=False)
content = F"""That was my task for you: "{task}",
That was your result: {prompt_analysis}
That was Twitter API's error: "{error}"
Please, make a new better output to get better result from Twitter API.
Output must be as Output example.
def get_fix_query_prompt(prompt, old_query, error, is_accuracy= True):
task = get_query_gen_prompt(prompt=prompt, is_accuracy=is_accuracy)

old_query_text = ""
if old_query:
old_query_text = f"""Your previous query was:
<OLD_QUERY>
{old_query}
</OLD_QUERY>
which did not return any results, Please analyse it and make better query."""
content = F"""That was task for you:
<TASK>
{task}
<Task>,
That was user's promot:
<PROMPT>
{prompt}
</PROMPT>
{old_query_text}
That was Twitter API's result: "{error}"
Please, make a new better Output to get better result from Twitter API.
Output must be as Output example as described in <TASK>.
"""
return content

Expand All @@ -153,29 +173,34 @@ def bearer_oauth(self, r):

def connect_to_endpoint(self, url, params):
response = requests.get(url, auth=self.bearer_oauth, params=params)
print(response.status_code)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
return response.json()


if response.status_code in [401, 403]:
bt.logging.error(f"Critical Twitter API Ruquest error occurred: {response.text}")
os._exit(1)

return response

def get_tweet_by_id(self, tweet_id):
tweet_url = f"https://api.twitter.com/2/tweets/{tweet_id}"
json_response = self.connect_to_endpoint(tweet_url, {})
return json_response
response = self.connect_to_endpoint(tweet_url, {})
if response.status_code != 200:
return None
return response.json()


def get_tweets_by_ids(self, tweet_ids):
ids = ','.join(tweet_ids) # Combine all tweet IDs into a comma-separated string
tweets_url = f"https://api.twitter.com/2/tweets?ids={ids}"
json_response = self.connect_to_endpoint(tweets_url, {})
return json_response
response = self.connect_to_endpoint(tweets_url, {})
if response.status_code != 200:
return []
return response.json()

def get_recent_tweets(self, query_params):
search_url = "https://api.twitter.com/2/tweets/search/recent"
json_response = self.connect_to_endpoint(search_url, query_params)
return json.dumps(json_response, indent=4, sort_keys=True)


response = self.connect_to_endpoint(search_url, query_params)
return response

async def generate_query_params_from_prompt(self, prompt, is_accuracy = True):
"""
This function utilizes OpenAI's API to analyze the user's query and extract relevant information such
Expand All @@ -188,42 +213,63 @@ async def generate_query_params_from_prompt(self, prompt, is_accuracy = True):
response_dict = json.loads(res)
bt.logging.info("generate_query_params_from_prompt Content: ", response_dict)
return response_dict
async def fix_twitter_query(self, prompt, query, error):

async def fix_twitter_query(self, prompt, query, error, is_accuracy = True):
"""
This method refines the user's initial query by leveraging OpenAI's API
to parse and enhance the query with more precise keywords, hashtags, and user mentions,
aiming to improve the search results from the Twitter API.
"""
try:
content = get_fix_query_prompt(prompt=prompt,
prompt_analysis=query,
error=error)
old_query=query,
error=error,
is_accuracy=is_accuracy)
messages = [{'role': 'user', 'content': content }]
bt.logging.info(content)
res = await call_openai(messages, 0.2, "gpt-4-1106-preview", None, {"type": "json_object"})
res = await call_openai(messages, 0.5, "gpt-4-1106-preview", None, {"type": "json_object"})
response_dict = json.loads(res)
bt.logging.info("generate_query_params_from_prompt Content: ", response_dict)
bt.logging.info("fix_twitter_query Content: ", response_dict)
return response_dict
except Exception as e:
bt.logging.info(e)
return [], None

async def analyse_prompt_and_fetch_tweets(self, prompt):
try:
result = {}
query, prompt_analysis = await self.generate_and_analyze_query(prompt)
result = self.get_recent_tweets(prompt_analysis.api_params)
response = self.get_recent_tweets(prompt_analysis.api_params)

if response.status_code in [429, 502, 503, 504]:
bt.logging.warning("analyse_prompt_and_fetch_tweets ===================================================, {response.text}")
await asyncio.sleep(20) # Wait for 20 seconds before retrying
response = self.get_recent_tweets(prompt_analysis.api_params) # Retry fetching tweets

result_json = json.loads(result) # Parse the JSON response
if result_json.get('meta', {}).get('result_count', 0) == 0:
result, prompt_analysis = await self.retry_with_fixed_query(prompt, query)
if response.status_code == 400:
bt.logging.warning("analyse_prompt_and_fetch_tweets: Try to fix bad tweets Query ===================================================, {response.text}")
response, prompt_analysis = await self.retry_with_fixed_query(prompt=prompt, old_query=prompt_analysis, error=response.text)

self.log_fetched_tweets(result)
return result, prompt_analysis
if response.status_code != 200:
bt.logging.error("Tweets Query ===================================================, {response.text}")
raise Exception(F"analyse_prompt_and_fetch_tweets: {response.text}")

result_json = response.json()
if result_json.get('meta', {}).get('result_count', 0) == 0:
bt.logging.info("analyse_prompt_and_fetch_tweets: No tweets found, attempting next query.")
response, prompt_analysis = await self.retry_with_fixed_query(prompt, old_query=prompt_analysis, is_accuracy=False)
result_json = response.json()

bt.logging.info("Tweets fetched ===================================================")
bt.logging.info(result)
bt.logging.info("================================================================")


return result_json, prompt_analysis
except Exception as e:
return await self.handle_exceptions(e, prompt, query)

bt.logging.error("analyse_prompt_and_fetch_tweets, {e}")
raise e

async def generate_and_analyze_query(self, prompt):
query = await self.generate_query_params_from_prompt(prompt)
prompt_analysis = TwitterPromptAnalysisResult()
Expand All @@ -237,37 +283,14 @@ async def generate_and_analyze_query(self, prompt):
def set_max_results(self, api_params, max_results=10):
api_params['max_results'] = max_results

async def retry_with_fixed_query(self, prompt, query, error= None):
if not error:
new_query = await self.generate_query_params_from_prompt(prompt, is_accuracy=False)
else:
new_query = await self.fix_twitter_query(prompt=prompt, query=query, error=error)
async def retry_with_fixed_query(self, prompt, old_query, error= None, is_accuracy=True):
new_query = await self.fix_twitter_query(prompt=prompt, query=old_query, error=error, is_accuracy=is_accuracy)
prompt_analysis = TwitterPromptAnalysisResult()
prompt_analysis.fill(new_query)
self.set_max_results(prompt_analysis.api_params)
result = self.get_recent_tweets(prompt_analysis.api_params)
return result, prompt_analysis

def log_fetched_tweets(self, result):
bt.logging.info("Tweets fetched ===================================================")
bt.logging.info(result)
bt.logging.info("================================================================")

async def handle_exceptions(self, e, prompt, query):
if hasattr(e, 'status') and e.status == 401:
bt.logging.info("Unauthorized access, check API credentials.")
else:
bt.logging.info(e)
return await self.attempt_fix_and_fetch(prompt=prompt, query=query, error=e)
return [], None

async def attempt_fix_and_fetch(self, prompt, query, error):
try:
return await self.retry_with_fixed_query(prompt, query, error)
except Exception as e:
bt.logging.info(e)
return [], None

@staticmethod
def extract_tweet_id(url: str) -> str:
"""
Expand Down Expand Up @@ -341,4 +364,4 @@ def find_twitter_links(self, text: str) -> List[str]:
# # if len(result) > 0
# print(result)

# client.get_recent_tweets(query_params)
# client.get_recent_tweets(query_params)

0 comments on commit 2215933

Please sign in to comment.