Crypto-Analyst / main.py
menikev's picture
Update main.py
17349b1 verified
import asyncio
from crewai import Crew, Process
from textwrap import dedent
import json
from crypto_analysis_agents import CryptoAnalysisAgents
from crypto__analysis_tasks import CryptoAnalysisTasks
class CryptoCrew:
def __init__(self, crypto):
self.crypto = crypto
def run(self):
return asyncio.run(self.run_async())
async def run_async(self):
agents = CryptoAnalysisAgents()
tasks = CryptoAnalysisTasks()
market_analyst_agent = agents.market_analyst()
technical_analyst_agent = agents.technical_analyst()
crypto_advisor_agent = agents.crypto_advisor()
market_research_task = tasks.market_research(market_analyst_agent, self.crypto)
technical_analysis_task = tasks.technical_analysis(technical_analyst_agent, self.crypto)
sentiment_analysis_task = tasks.sentiment_analysis(market_analyst_agent, self.crypto)
recommend_task = tasks.recommend(crypto_advisor_agent, self.crypto)
crew = Crew(
agents=[
market_analyst_agent,
technical_analyst_agent,
crypto_advisor_agent
],
tasks=[
market_research_task,
technical_analysis_task,
sentiment_analysis_task,
recommend_task
],
verbose=True,
process=Process.sequential,
max_iterations=100,
task_timeout=600
)
try:
result = await asyncio.to_thread(crew.kickoff)
return self.parse_result(result)
except Exception as e:
return {"summary": f"Analysis failed: {str(e)}. Please try again."}
def parse_result(self, result):
parsed = {
"summary": str(result),
"sentiment": {
"overall": "Neutral",
"social_media": "Neutral",
"news": "Neutral",
"community": "Neutral"
}
}
# Add your parsing logic here
return parsed
if __name__ == "__main__":
print("## Welcome to Crypto Analysis Crew")
print('-------------------------------')
crypto = input(dedent("""
What is the cryptocurrency you want to analyze?
"""))
crypto_crew = CryptoCrew(crypto)
result = crypto_crew.run()
print("\n\n########################")
print("## Here is the Report")
print("########################\n")
print(json.dumps(result, indent=2))