import time
import copy
import os
from flask import Flask, render_template, request
import json
from main import bing_serach, extract_web
import asyncio
import requests
from langchain_text_splitters import RecursiveCharacterTextSplitter
import numpy as np
from pymongo import MongoClient
# Module-level MongoDB handles used by the request handlers in this file.
# The URI targets a server on localhost:27017; edit it to point elsewhere.
client = MongoClient('mongodb://localhost:27017/')
# Database "webdata" and its "data" collection.
db = client.get_database('webdata')
collection = db.get_collection('data')
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity between two vectors.

    Args:
        vec1: First vector (any sequence or array ``numpy`` can consume).
        vec2: Second vector, with a shape ``numpy.dot`` accepts alongside
            ``vec1``.

    Returns:
        The dot product divided by the product of the two norms.  When either
        vector has zero magnitude the cosine is undefined; ``0.0`` is returned
        in that case instead of the ``nan`` (plus RuntimeWarning) that the
        division by zero would otherwise produce.

    Raises:
        ValueError: Propagated from ``numpy.dot`` when the shapes do not align.
    """
    dot_product = np.dot(vec1, vec2)
    magnitude = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if magnitude == 0:
        # Zero-length vector: no direction to compare against.
        return 0.0
    return dot_product / magnitude
def percentage_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors rescaled to a percentage.

    The cosine similarity range ``[-1, 1]`` is shifted and scaled to
    ``[0, 100]``.

    Args:
        vec1: First vector.
        vec2: Second vector.

    Returns:
        The similarity as a percentage, or ``0`` when the cosine similarity
        cannot be computed for the given inputs.
    """
    try:
        cosine_sim = cosine_similarity(vec1, vec2)
    except Exception:
        # Best-effort scoring: inputs that cannot be compared score 0.
        # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return 0
    return (cosine_sim + 1) / 2 * 100
# Shared splitter that cuts page text into chunks before embedding: chunks are
# capped at 2000 units as measured by len(), consecutive chunks overlap by 100,
# and separators are treated as literal strings rather than regexes.
text_splitter = RecursiveCharacterTextSplitter(
    is_separator_regex=False,
    length_function=len,
    chunk_overlap=100,
    chunk_size=2000,
)

# Flask application object; the handlers below register on it via @app.route.
app = Flask(__name__)
@app.route("/status", methods=['GET'])
def status():
    """Answer ``GET /status`` with the fixed body ``OK``."""
    return "OK"
# Help texts returned by websearch().  Their line breaks are written as
# explicit "\n" escapes because a quoted literal cannot span physical lines.
_INVALID_IFEXTRACT_MESSAGE = (
    "\nInvalid Value of ifextract\n"
    "it can Two Value either 0 or 1\n"
    "for 1 it will provide Webpage Extracted"
)
_USAGE_MESSAGE = (
    "Enter Valid Query\n"
    "GET parameters\n"
    "1. q(query) = Search query in quote_plus ex: Is+Mango+Sweet\n"
    "1. ifextract(ifextract) = 0,1 for 1 it will provide extracted webpage for suitable websites\n"
    "2. startIndex(start) =Optional Ender the start index of search query"
)


@app.route("/", methods=['GET','POST'])
def websearch():
    """Run a search for the query given in the URL parameters.

    Query-string parameters:
        q: The search query.  When absent or empty the usage text is returned.
        ifextract: ``'1'`` to call ``bing_serach`` with ``ifextract=True``,
            ``'0'`` for ``ifextract=False``.  Any other value returns an
            explanatory message.
        start: Optional integer start index; defaults to 0 when missing or
            not parseable as an integer.

    Returns:
        Whatever ``bing_serach`` returns, a help string for invalid input, or
        an error dict (``type``/``message``/``detail``) if anything raises.
    """
    try:
        query = request.args.get('q')
        if not query:
            return _USAGE_MESSAGE

        ifextract = request.args.get('ifextract')
        try:
            start = int(request.args.get('start'))
        except (TypeError, ValueError):
            # TypeError: parameter missing (None); ValueError: not a number.
            start = 0

        if ifextract not in ('0', '1'):
            return _INVALID_IFEXTRACT_MESSAGE

        return asyncio.run(
            bing_serach(query, collection, ifextract=(ifextract == '1'), start=start)
        )
    except Exception as e:
        # Request boundary: report the failure as a structured payload.
        return {'type':'error','message':'Unexpected Error',"detail":str(e)}
@app.route("/adv",methods=["POST","GET"])
def adv_make():
    """Search with a short query, then rank page chunks against a long query.

    Expects a JSON object body with:
        short_query: Passed to ``bing_serach`` (with ``ifextract=True``).
        long_query: Embedded and compared against every page chunk.

    Steps:
        1. Run the search and dump its ``result`` list to ``r.json``.
        2. For pages that have no ``embedding_data`` yet and whose extraction
           did not fail, split the page text into chunks and request
           embeddings for them.
        3. Attach each page's chunk embeddings as ``embedding_data`` and write
           every page back to the Mongo collection (matched on ``URL``).
        4. Score every chunk of every page against the long-query embedding
           and keep those scoring above 80, best first.

    Returns:
        The search result dict with an added ``extracts`` list (chunk entries
        carrying a ``similairy`` score, with their ``embedding`` removed), or
        a JSON error string when the body is invalid or the embedding API
        answers with a non-200 status.
    """
    args = request.get_json()
    # A missing or non-object body is treated like missing keys rather than
    # crashing on the membership test.
    if not isinstance(args, dict) or not all(
        key in args for key in ('long_query', 'short_query')
    ):
        return(json.dumps({"type":'error','message':"long_query and short_query is not in request"},indent=4))

    extraction_failed = "Some Error while Extracting"
    embedding_api = "https://mangoman7002-flash-embedding.hf.space"

    dataz = asyncio.run(bing_serach(args["short_query"], collection, ifextract=True))
    pages = dataz['result']
    # NOTE(review): looks like a debugging snapshot of the raw results — confirm
    # it is still wanted; it is rewritten on every request.
    with open("r.json",'w') as f:
        f.write(json.dumps(pages,indent=4))

    # Pages that still need embeddings, with their chunk documents kept
    # side by side so vectors can be handed back to the right page later.
    pending = [
        page for page in pages
        if "embedding_data" not in page and page['webpage'] != extraction_failed
    ]
    chunk_groups = [text_splitter.create_documents([page['webpage']]) for page in pending]
    chunk_texts = [doc.page_content for group in chunk_groups for doc in group]
    print("Length of Documents")
    print(len(chunk_texts))

    if chunk_texts:
        response = requests.post(embedding_api, json={"text": chunk_texts})
        if response.status_code != 200:
            return json.dumps({"type":"error","message":f"error With API {response.status_code}"},indent=4)
        new_vectors = response.json()['result']
    else:
        new_vectors = []

    query_response = requests.post(embedding_api, json={"text": [args['long_query']]})
    if query_response.status_code != 200:
        # Report the status of *this* request (the earlier response object has
        # already been consumed and must not be consulted here).
        return json.dumps({"type":"error","message":f"error With API {query_response.status_code}"},indent=4)
    query_vector = query_response.json()['result'][0]['embedding']

    # Assumes the API returns one entry per submitted text, in order, so each
    # page owns a contiguous slice as long as its chunk list — TODO confirm.
    offset = 0
    for page, group in zip(pending, chunk_groups):
        page['embedding_data'] = new_vectors[offset:offset + len(group)]
        offset += len(group)

    for page in pages:
        collection.update_one({"URL": page['URL']}, {"$set": copy.deepcopy(page)})

    # Gather each chunk exactly once, from the pages themselves (cached and
    # freshly embedded alike).  Deep copies keep the scoring fields and the
    # removal of "embedding" below out of the page data that is returned.
    candidates = []
    for page in pages:
        candidates.extend(copy.deepcopy(page.get('embedding_data') or []))

    for chunk in candidates:
        chunk['similairy'] = percentage_similarity(chunk['embedding'], query_vector)

    extracts = sorted(
        (chunk for chunk in candidates if chunk['similairy'] > 80),
        key=lambda chunk: chunk['similairy'],
        reverse=True,
    )
    for chunk in extracts:
        chunk.pop("embedding")

    dataz['extracts'] = extracts
    return dataz
@app.route("/webpage",methods=["POST","GET"])
def webpage():
    """Return the extracted text and chunk embeddings for a single URL.

    Expects a JSON object body with a ``url`` field.

    A stored record for the URL is reused when its ``time`` stamp is less than
    86400 seconds (one day) old; otherwise the page is extracted again with
    ``extract_web``.  If the record has no ``embedding_data`` and extraction
    did not fail, the page text is split into chunks and embedded.

    Returns:
        A JSON string of the record (``URL``, ``time``, ``webpage`` and, when
        available, ``embedding_data``), or a JSON error string when ``url`` is
        missing or the embedding API answers with a non-200 status.
    """
    args = request.get_json()
    # A missing or non-object body counts as "no url" instead of crashing.
    url = args.get("url") if isinstance(args, dict) else None
    if url is None:
        return(json.dumps({'type':'error','message':'url is not provided'},indent=4))

    extraction_failed = "Some Error while Extracting"
    max_age_seconds = 86400

    cached = collection.find_one({"URL":url})
    if cached is not None and time.time() - cached['time'] < max_age_seconds:
        result = cached
    else:
        # Nothing stored, or the stored copy is stale: extract afresh.
        result = {'URL': url, 'time': time.time()}
        result['webpage'] = asyncio.run(extract_web(result))

    if "embedding_data" not in result and result['webpage'] != extraction_failed:
        chunk_texts = [
            doc.page_content
            for doc in text_splitter.create_documents([result['webpage']])
        ]
        if chunk_texts:
            response = requests.post(
                "https://mangoman7002-flash-embedding.hf.space",
                json={"text": chunk_texts},
            )
            if response.status_code != 200:
                return json.dumps({"type":"error","message":f"error With API {response.status_code}"},indent=4)
            result['embedding_data'] = response.json()['result']
        else:
            # Nothing to embed; skip the API round trip.
            result['embedding_data'] = []

    # Records loaded from Mongo carry an "_id" that json.dumps cannot encode.
    result.pop("_id", None)
    return(json.dumps(result))
# Start Flask's built-in server, with debug mode off, only when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    app.run(debug=False)