Spaces:
Sleeping
Sleeping
import asyncio | |
from crewai import Crew, Process | |
from textwrap import dedent | |
import json | |
from crypto_analysis_agents import CryptoAnalysisAgents | |
from crypto__analysis_tasks import CryptoAnalysisTasks | |
class CryptoCrew: | |
def __init__(self, crypto): | |
self.crypto = crypto | |
def run(self): | |
return asyncio.run(self.run_async()) | |
async def run_async(self): | |
agents = CryptoAnalysisAgents() | |
tasks = CryptoAnalysisTasks() | |
market_analyst_agent = agents.market_analyst() | |
technical_analyst_agent = agents.technical_analyst() | |
crypto_advisor_agent = agents.crypto_advisor() | |
market_research_task = tasks.market_research(market_analyst_agent, self.crypto) | |
technical_analysis_task = tasks.technical_analysis(technical_analyst_agent, self.crypto) | |
sentiment_analysis_task = tasks.sentiment_analysis(market_analyst_agent, self.crypto) | |
recommend_task = tasks.recommend(crypto_advisor_agent, self.crypto) | |
crew = Crew( | |
agents=[ | |
market_analyst_agent, | |
technical_analyst_agent, | |
crypto_advisor_agent | |
], | |
tasks=[ | |
market_research_task, | |
technical_analysis_task, | |
sentiment_analysis_task, | |
recommend_task | |
], | |
verbose=True, | |
process=Process.sequential, | |
max_iterations=100, | |
task_timeout=600 | |
) | |
try: | |
result = await asyncio.to_thread(crew.kickoff) | |
return self.parse_result(result) | |
except Exception as e: | |
return {"summary": f"Analysis failed: {str(e)}. Please try again."} | |
def parse_result(self, result): | |
parsed = { | |
"summary": str(result), | |
"sentiment": { | |
"overall": "Neutral", | |
"social_media": "Neutral", | |
"news": "Neutral", | |
"community": "Neutral" | |
} | |
} | |
# Add your parsing logic here | |
return parsed | |
if __name__ == "__main__": | |
print("## Welcome to Crypto Analysis Crew") | |
print('-------------------------------') | |
crypto = input(dedent(""" | |
What is the cryptocurrency you want to analyze? | |
""")) | |
crypto_crew = CryptoCrew(crypto) | |
result = crypto_crew.run() | |
print("\n\n########################") | |
print("## Here is the Report") | |
print("########################\n") | |
print(json.dumps(result, indent=2)) |