CaesarAI / sendweb.py
CaesarCloudSync
CaesarAI Deployed
9d3162f
import cv2
import numpy as np
import requests
import base64
import time
import websockets
import asyncio
import cv2
import json
import os
import imutils
class CaesarSendWeb:
@classmethod
def send_video_https(self,uri = "http://192.168.0.10:7860/caesarobjectdetect"):
cap = cv2.VideoCapture(0)
while True:
_, image = cap.read()
#uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect"
response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]})
valresp = response.json()
imagebase64 = np.array(valresp["frame"])
image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3)
cv2.imshow("image", image)
if ord("q") == cv2.waitKey(1):
break
cap.release()
cv2.destroyAllWindows()
@classmethod
def send_video_websocket(self,uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws',jsondata=None,phone=False):
if phone == False:
camera = cv2.VideoCapture(0)
async def main():
# Connect to the server
#uri = 'wss://palondomus-caesarai.hf.space/sendvideows'
#uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws'
async with websockets.connect(uri) as ws:
while True:
if phone == False:
success, frame = camera.read()
elif phone == True:
img_resp = requests.get("http://192.168.0.14:8080/shot.jpg")
frame_arr = np.array(bytearray(img_resp.content),dtype=np.uint8)
frame = cv2.imdecode(frame_arr,-1)
frame = imutils.resize(frame,width=500,height=600)
success = True
#print(frame.shape)
#1080, 1920, 3)
#print(success)
#print(frame.shape)
if not success:
break
else:
#print("hi")
ret, buffer = cv2.imencode('.png', frame)
await ws.send(buffer.tobytes())
if jsondata:
await ws.send(json.dumps(jsondata))
contents = await ws.recv()
if type(contents) == bytes:
arr = np.frombuffer(contents, np.uint8)
frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
cv2.imshow('frame',frameobj)
cv2.waitKey(1)
if type(contents) == str:
print(json.loads(contents))
asyncio.run(main())
@classmethod
def send_image_recieve_text(self,uri ="http://192.168.0.10:7860/caesarocr",showimage=False):
cap = cv2.VideoCapture(0)
_, image = cap.read()
#uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect"
response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]})
messageresp = response.json()
if showimage == True:
cv2.imshow('frame',image)
cv2.waitKey(0)
# closing all open windows
cv2.destroyAllWindows()
return messageresp
@classmethod
def send_image_recieve_image(self,uri ="http://192.168.0.10:7860/caesarfacesnap",showimage=False,saveimage=False):
cap = cv2.VideoCapture(0)
_, image = cap.read()
#uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect"
response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]})
valresp = response.json()
if valresp["frame"] == "no face was detected.":
print("No face was detected.")
if valresp["frame"] != "no face was detected.":
imagebase64 = np.array(valresp["frame"])
image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3)
if showimage == True:
cv2.imshow('frame',image)
cv2.waitKey(0)
# closing all open windows
cv2.destroyAllWindows()
if saveimage == True:
filename = "croppedimage.png"
count = 0
while filename in os.listdir("CaesarFaceDetection/FaceDataPoints/"):
filename = f"{filename.replace('.png','').replace(f'{count-1}','')}{count}.png"
count +=1
cv2.imwrite(f"CaesarFaceDetection/FaceDataPoints/{filename}", image)
return image
if __name__ == "__main__":
#success, frame = camera.read()
#yolo_uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws'
#CaesarSendWeb.send_video_websocket(uri = yolo_uri)
#video_uri = 'wss://palondomus-caesarai.hf.space/sendvideows'
#CaesarSendWeb.send_video_websocket(uri = video_uri)
#extraction_uri = "wss://palondomus-caesarai.hf.space/caesarocrextractionws"
#CaesarSendWeb.send_video_websocket(uri = extraction_uri,jsondata={"target_words":["your","brain","power"]})
#ocrws_uri = "wss://palondomus-caesarai.hf.space/caesarocrws"
#CaesarSendWeb.send_video_websocket(uri = ocrws_uri)
#ocr_uri = "http://192.168.0.10:7860/caesarocr"
#message = CaesarSendWeb.send_image_recieve_text(uri = ocr_uri,showimage=True)
#print(message)
facedetect_uri = "wss://palondomus-caesarai.hf.space/caesarfacedetectws"#"wss://palondomus-caesarai.hf.space/caesarfacedetectws"
CaesarSendWeb.send_video_websocket(uri = facedetect_uri,phone=True)
#facesnap_uri = "http://192.168.0.10:7860/caesarfacesnap"
#cropped_image = CaesarSendWeb.send_image_recieve_image(uri = facesnap_uri,saveimage=True)
#print(cropped_image)
#https_uri = 'http://192.168.0.10:7860/caesarobjectdetect'
#CaesarSendWeb.send_video_https(uri = https_uri)