Spaces:
Sleeping
Sleeping
File size: 2,520 Bytes
095163f 24a1671 3af55de d5baf5f ba582f4 3af55de 3078761 871ea65 d5baf5f 3af55de 24a1671 3af55de 24a1671 3af55de 5933bbd 24a1671 3af55de 7f2d346 b873140 17349b1 3af55de 24a1671 095163f 3078761 5933bbd 3078761 8afb178 3af55de 9bb5a46 3af55de 095163f 3af55de 871ea65 3af55de 871ea65 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
import asyncio
from crewai import Crew, Process
from textwrap import dedent
import json
from crypto_analysis_agents import CryptoAnalysisAgents
from crypto__analysis_tasks import CryptoAnalysisTasks
class CryptoCrew:
def __init__(self, crypto):
self.crypto = crypto
def run(self):
return asyncio.run(self.run_async())
async def run_async(self):
agents = CryptoAnalysisAgents()
tasks = CryptoAnalysisTasks()
market_analyst_agent = agents.market_analyst()
technical_analyst_agent = agents.technical_analyst()
crypto_advisor_agent = agents.crypto_advisor()
market_research_task = tasks.market_research(market_analyst_agent, self.crypto)
technical_analysis_task = tasks.technical_analysis(technical_analyst_agent, self.crypto)
sentiment_analysis_task = tasks.sentiment_analysis(market_analyst_agent, self.crypto)
recommend_task = tasks.recommend(crypto_advisor_agent, self.crypto)
crew = Crew(
agents=[
market_analyst_agent,
technical_analyst_agent,
crypto_advisor_agent
],
tasks=[
market_research_task,
technical_analysis_task,
sentiment_analysis_task,
recommend_task
],
verbose=True,
process=Process.sequential,
max_iterations=100,
task_timeout=600
)
try:
result = await asyncio.to_thread(crew.kickoff)
return self.parse_result(result)
except Exception as e:
return {"summary": f"Analysis failed: {str(e)}. Please try again."}
def parse_result(self, result):
parsed = {
"summary": str(result),
"sentiment": {
"overall": "Neutral",
"social_media": "Neutral",
"news": "Neutral",
"community": "Neutral"
}
}
# Add your parsing logic here
return parsed
if __name__ == "__main__":
print("## Welcome to Crypto Analysis Crew")
print('-------------------------------')
crypto = input(dedent("""
What is the cryptocurrency you want to analyze?
"""))
crypto_crew = CryptoCrew(crypto)
result = crypto_crew.run()
print("\n\n########################")
print("## Here is the Report")
print("########################\n")
print(json.dumps(result, indent=2)) |