import os
import base64
import requests
import numpy as np
import faiss
import re
import logging
from pathlib import Path
# For local development, load environment variables from a .env file.
# In HuggingFace Spaces, secrets are automatically available as environment variables.
from dotenv import load_dotenv
load_dotenv()
from sentence_transformers import SentenceTransformer, CrossEncoder
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
# Optionally import BM25 for sparse retrieval.
try:
from rank_bm25 import BM25Okapi
except ImportError:
BM25Okapi = None
# ---------------------------
# Environment Variables & Setup
# ---------------------------
# GitHub API key (required for GitHub API calls)
GITHUB_API_KEY = os.getenv("GITHUB_API_KEY")
# GROQ API key (if required by ChatGroq)
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# HuggingFace token (if you need it to load private models from HuggingFace)
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
CROSS_ENCODER_MODEL = os.getenv("CROSS_ENCODER_MODEL", "cross-encoder/ms-marco-MiniLM-L-6-v2")
# Set up a persistent session for GitHub API requests.
session = requests.Session()
session.headers.update({
"Authorization": f"token {GITHUB_API_KEY}",
"Accept": "application/vnd.github.v3+json"
})
# ---------------------------
# Langchain Groq Setup for Search Tag Conversion
# ---------------------------
llm = ChatGroq(
model="deepseek-r1-distill-llama-70b",
temperature=0.3,
max_tokens=512,
max_retries=3,
api_key=GROQ_API_KEY # Pass GROQ_API_KEY if the ChatGroq library supports it.
)
prompt = ChatPromptTemplate.from_messages([
("system",
"""You are a GitHub search optimization expert.
Your job is to:
1. Read a user's query about tools, research, or tasks.
2. Detect if the query mentions a specific programming language other than Python (for example, JavaScript or JS). If so, record that language as the target language.
3. Think iteratively and generate your internal chain-of-thought enclosed in ... tags.
4. After your internal reasoning, output up to five GitHub-style search tags or library names that maximize repository discovery.
Use as many tags as necessary based on the query's complexity, but never more than five.
5. If you detected a non-Python target language, append an additional tag at the end in the format target-[language] (e.g., target-javascript).
If no specific language is mentioned, do not include any target tag.
Output Format:
tag1:tag2[:tag3[:tag4[:tag5[:target-language]]]]
Rules:
- Use lowercase and hyphenated keywords (e.g., image-augmentation, chain-of-thought).
- Use terms commonly found in GitHub repo names, topics, or descriptions.
- Avoid generic terms like "python", "ai", "tool", "project".
- Do NOT use full phrases or vague words like "no-code", "framework", or "approach".
- Prefer real tools, popular methods, or dataset names when mentioned.
- If your output does not strictly match the required format, correct it after your internal reasoning.
- Choose high-signal keywords to ensure the search yields the most relevant GitHub repositories.
Excellent Examples:
Input: "No code tool to augment image and annotation"
Output: image-augmentation:albumentations
Input: "Repos around chain of thought prompting mainly for finetuned models"
Output: chain-of-thought:finetuned-llm
Input: "Find repositories implementing data augmentation pipelines in JavaScript"
Output: data-augmentation:target-javascript
Output must be ONLY the search tags separated by colons. Do not include any extra text, bullet points, or explanations.
"""),
("human", "{query}")
])
chain = prompt | llm
def valid_tags(tags: str) -> bool:
pattern = r'^[a-z0-9-]+(?::[a-z0-9-]+){1,5}$'
return re.match(pattern, tags) is not None
def parse_search_tags(response: str) -> str:
# Remove any text inside ... blocks.
cleaned = re.sub(r'.*?', '', response, flags=re.DOTALL)
pattern = r'([a-z0-9-]+(?::[a-z0-9-]+){1,5})'
match = re.search(pattern, cleaned)
if match:
return match.group(1).strip()
return cleaned.strip()
def iterative_convert_to_search_tags(query: str, max_iterations: int = 2) -> str:
print(f"\n🧠 [iterative_convert_to_search_tags] Input Query: {query}")
refined_query = query
tags_output = ""
for iteration in range(max_iterations):
print(f"\n🔄 Iteration {iteration+1}")
response = chain.invoke({"query": refined_query})
full_output = response.content.strip()
tags_output = parse_search_tags(full_output)
print(f"Output Tags: {tags_output}")
if valid_tags(tags_output):
print("✅ Valid tags format detected.")
return tags_output
else:
print("⚠️ Invalid tags format. Requesting refinement...")
refined_query = f"{query}\nPlease refine your answer so that the output strictly matches the format: tag1:tag2[:tag3[:tag4[:tag5[:target-language]]]]."
print("Final output (may be invalid):", tags_output)
return tags_output
# ---------------------------
# GitHub API Helper Functions
# ---------------------------
def fetch_readme_content(repo_full_name: str) -> str:
readme_url = f"https://api.github.com/repos/{repo_full_name}/readme"
response = session.get(readme_url)
if response.status_code == 200:
readme_data = response.json()
try:
return base64.b64decode(readme_data.get('content', '')).decode('utf-8', errors='replace')
except Exception:
return ""
return ""
def fetch_markdown_contents(repo_full_name: str) -> str:
url = f"https://api.github.com/repos/{repo_full_name}/contents"
response = session.get(url)
contents = ""
if response.status_code == 200:
items = response.json()
for item in items:
if item.get("type") == "file" and item.get("name", "").lower().endswith(".md"):
file_url = item.get("download_url")
if file_url:
file_resp = requests.get(file_url)
if file_resp.status_code == 200:
contents += "\n" + file_resp.text
return contents
def fetch_all_markdown(repo_full_name: str) -> str:
readme = fetch_readme_content(repo_full_name)
other_md = fetch_markdown_contents(repo_full_name)
return readme + "\n" + other_md
def fetch_github_repositories(query: str, max_results: int = 10) -> list:
url = "https://api.github.com/search/repositories"
params = {
"q": query,
"per_page": max_results
}
response = session.get(url, params=params)
if response.status_code != 200:
print(f"Error {response.status_code}: {response.json().get('message')}")
return []
repo_list = []
for repo in response.json().get('items', []):
repo_link = repo.get('html_url')
description = repo.get('description') or ""
combined_markdown = fetch_all_markdown(repo.get('full_name'))
combined_text = (description + "\n" + combined_markdown).strip()
repo_list.append({
"title": repo.get('name', 'No title available'),
"link": repo_link,
"combined_text": combined_text
})
return repo_list
# ---------------------------
# Dense Retrieval Model Setup
# ---------------------------
try:
# If using a GPU-enabled model, the HuggingFace token can be used for private models.
model = SentenceTransformer('all-mpnet-base-v2', device='cuda')
except Exception as e:
print("Error initializing GPU for SentenceTransformer; falling back to CPU:", e)
model = SentenceTransformer('all-mpnet-base-v2', device='cpu')
def robust_min_max_norm(scores: np.ndarray) -> np.ndarray:
min_val = scores.min()
max_val = scores.max()
if max_val - min_val < 1e-10:
return np.ones_like(scores)
return (scores - min_val) / (max_val - min_val)
# ---------------------------
# Cross-Encoder Re-Ranking Function
# ---------------------------
def cross_encoder_rerank_candidates(candidates: list, query: str, model_name: str, top_n: int = 10) -> list:
try:
cross_encoder = CrossEncoder(model_name, device='cuda')
except Exception as e:
print("Error initializing CrossEncoder on GPU; falling back to CPU:", e)
cross_encoder = CrossEncoder(model_name, device='cpu')
CHUNK_SIZE = 2000
MAX_DOC_LENGTH = 5000
MIN_DOC_LENGTH = 200
def split_text(text: str, chunk_size: int = CHUNK_SIZE) -> list:
return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
for candidate in candidates:
doc = candidate.get("combined_text", "")
if len(doc) > MAX_DOC_LENGTH:
doc = doc[:MAX_DOC_LENGTH]
try:
if len(doc) < MIN_DOC_LENGTH:
score = cross_encoder.predict([[query, doc]])
candidate["cross_encoder_score"] = float(score[0])
else:
chunks = split_text(doc)
pairs = [[query, chunk] for chunk in chunks]
scores = cross_encoder.predict(pairs)
max_score = np.max(scores) if scores else 0.0
avg_score = np.mean(scores) if scores else 0.0
candidate["cross_encoder_score"] = float(0.5 * max_score + 0.5 * avg_score)
except Exception as e:
logging.error(f"Error scoring candidate {candidate.get('link', 'unknown')}: {e}")
candidate["cross_encoder_score"] = 0.0
all_scores = [candidate["cross_encoder_score"] for candidate in candidates]
if all_scores:
min_score = min(all_scores)
if min_score < 0:
for candidate in candidates:
candidate["cross_encoder_score"] += -min_score
return candidates
# ---------------------------
# Main Ranking Function with Hybrid Retrieval and Combined Scoring
# ---------------------------
def run_repository_ranking(query: str) -> str:
# Step 1: Generate search tags from the query.
search_tags = iterative_convert_to_search_tags(query)
tag_list = [tag.strip() for tag in search_tags.split(":") if tag.strip()]
# Step 2: Handle target language extraction.
if any(tag.startswith("target-") for tag in tag_list):
target_tag = next(tag for tag in tag_list if tag.startswith("target-"))
lang_query = f"language:{target_tag.replace('target-', '')}"
tag_list = [tag for tag in tag_list if not tag.startswith("target-")]
else:
lang_query = "language:python"
# Step 3: Build advanced search qualifiers.
advanced_qualifier = "in:name,description,readme"
all_repositories = []
for tag in tag_list:
github_query = f"{tag} {advanced_qualifier} {lang_query}"
print("GitHub Query:", github_query)
repos = fetch_github_repositories(github_query, max_results=15)
all_repositories.extend(repos)
combined_query = " OR ".join(tag_list)
combined_query = f"({combined_query}) {advanced_qualifier} {lang_query}"
print("Combined GitHub Query:", combined_query)
repos = fetch_github_repositories(combined_query, max_results=15)
all_repositories.extend(repos)
unique_repositories = {}
for repo in all_repositories:
if repo["link"] not in unique_repositories:
unique_repositories[repo["link"]] = repo
else:
existing_text = unique_repositories[repo["link"]]["combined_text"]
unique_repositories[repo["link"]]["combined_text"] = existing_text + "\n" + repo["combined_text"]
repositories = list(unique_repositories.values())
if not repositories:
return "No repositories found for your query."
# Step 4: Prepare documents.
docs = [repo.get("combined_text", "") for repo in repositories]
# Step 5: Dense retrieval.
doc_embeddings = model.encode(docs, convert_to_numpy=True, show_progress_bar=True, batch_size=16)
if doc_embeddings.ndim == 1:
doc_embeddings = doc_embeddings.reshape(1, -1)
norms = np.linalg.norm(doc_embeddings, axis=1, keepdims=True)
norm_doc_embeddings = doc_embeddings / (norms + 1e-10)
query_embedding = model.encode(query, convert_to_numpy=True)
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
norm_query_embedding = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
dim = norm_doc_embeddings.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(norm_doc_embeddings)
k = norm_doc_embeddings.shape[0]
D, I = index.search(norm_query_embedding, k)
dense_scores = D.squeeze()
norm_dense_scores = robust_min_max_norm(dense_scores)
# Step 6: BM25 scoring.
if BM25Okapi is not None:
tokenized_docs = [re.findall(r'\w+', doc.lower()) for doc in docs]
bm25 = BM25Okapi(tokenized_docs)
query_tokens = re.findall(r'\w+', query.lower())
bm25_scores = np.array(bm25.get_scores(query_tokens))
norm_bm25_scores = robust_min_max_norm(bm25_scores)
else:
norm_bm25_scores = np.zeros_like(norm_dense_scores)
# Step 7: Combine scores (dense score weighted higher).
alpha = 0.8
combined_scores = alpha * norm_dense_scores + (1 - alpha) * norm_bm25_scores
for idx, repo in enumerate(repositories):
repo["combined_score"] = float(combined_scores[idx])
# Step 8: Initial ranking by combined score.
ranked_repositories = sorted(repositories, key=lambda x: x.get("combined_score", 0), reverse=True)
# Step 9: Compute cross-encoder scores for the top candidates.
top_candidates = ranked_repositories[:100] if len(ranked_repositories) > 100 else ranked_repositories
cross_encoder_rerank_candidates(top_candidates, query, model_name=CROSS_ENCODER_MODEL, top_n=len(top_candidates))
# Combine both metrics: final_score = w1 * combined_score + w2 * cross_encoder_score.
w1 = 0.7
w2 = 0.3
for candidate in top_candidates:
candidate["final_score"] = w1 * candidate.get("combined_score", 0) + w2 * candidate.get("cross_encoder_score", 0)
final_ranked = sorted(top_candidates, key=lambda x: x.get("final_score", 0), reverse=True)[:10]
# Step 10: Format final output with scores as percentages.
output = "\n=== Ranked Repositories ===\n"
for rank, repo in enumerate(final_ranked, 1):
output += f"Final Rank: {rank}\n"
output += f"Title: {repo['title']}\n"
output += f"Link: {repo['link']}\n"
output += f"Combined Score: {repo.get('combined_score', 0) * 100:.2f}%\n"
output += f"Cross-Encoder Score: {repo.get('cross_encoder_score', 0) * 100:.2f}%\n"
output += f"Final Score: {repo.get('final_score', 0) * 100:.2f}%\n"
snippet = repo['combined_text'][:300].replace('\n', ' ')
output += f"Snippet: {snippet}...\n"
output += '-' * 80 + "\n"
output += "\n=== End of Results ==="
return output
# ---------------------------
# Main Entry Point for Testing
# ---------------------------
if __name__ == "__main__":
test_query = "Chain of thought prompting for reasoning models"
result = run_repository_ranking(test_query)
print(result)