|
|
|
import os |
|
import asyncio |
|
import base64 |
|
import io |
|
import traceback |
|
|
|
import cv2 |
|
import pyaudio |
|
import PIL.Image |
|
import mss |
|
import gradio as gr |
|
|
|
from google import genai |
|
from google.genai import types |
|
|
|
|
|
# --- Audio capture/playback parameters ---
FORMAT = pyaudio.paInt16          # 16-bit signed PCM samples
CHANNELS = 1                      # mono capture
SEND_SAMPLE_RATE = 16000          # mic -> model sample rate (Hz)
RECEIVE_SAMPLE_RATE = 24000       # model -> speaker sample rate (Hz)
CHUNK_SIZE = 1024                 # frames per PyAudio read

# Live-capable Gemini model used for the bidirectional session.
MODEL = "models/gemini-2.0-flash-live-001"

# Shared API client. NOTE(review): if GEMINI_API_KEY is unset this passes
# api_key=None and fails only at connect time — confirm that is acceptable.
client = genai.Client(
    http_options={"api_version": "v1beta"},
    api_key=os.environ.get("GEMINI_API_KEY"),
)

# Session configuration: audio-only responses, spoken with the prebuilt
# "Puck" voice, and a system instruction establishing the persona.
CONFIG = types.LiveConnectConfig(
    response_modalities=["audio"],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
        )
    ),
    system_instruction=types.Content(
        parts=[types.Part.from_text(text="You are Puck..." )]
    ),
)
|
|
|
|
|
class AudioLoop:
    """Bridge local microphone/camera media into a Gemini Live session.

    Producer tasks push base64-encoded JPEG frames and raw PCM chunks onto
    ``out_queue``; ``send_realtime`` forwards them to the live session, and
    ``run`` yields ``(text, audio)`` tuples as model responses arrive.
    """

    def __init__(self, mode="camera"):
        # "camera" streams webcam frames alongside audio; any other value
        # sends audio only (see run()).
        self.mode = mode
        self.audio_in_queue = None  # reserved for playback; created in run()
        self.out_queue = None       # outbound media destined for the model
        self.session = None         # live session handle, set once connected
        # One shared PyAudio instance for the whole loop.  The original code
        # referenced an undefined global `pya` (NameError at runtime) and
        # opened a second, never-terminated PyAudio() just for the stream.
        self._pya = pyaudio.PyAudio()

    async def _get_frame(self, cap):
        """Grab one frame from `cap` and return it as JPEG bytes.

        Returns None when the capture device reports no frame (EOF/error).
        """
        # cap.read() blocks; keep it off the event loop like the other
        # blocking calls in this class.
        ret, frame = await asyncio.to_thread(cap.read)
        if not ret:
            return None
        # OpenCV delivers BGR; PIL expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        img.thumbnail((640, 480))  # bound frame size to keep payloads small
        buf = io.BytesIO()
        img.save(buf, format="JPEG")
        return buf.getvalue()

    async def _video_stream(self):
        """Capture webcam frames (~10 fps) and enqueue them for sending."""
        cap = await asyncio.to_thread(cv2.VideoCapture, 0)
        try:
            while True:
                frame = await self._get_frame(cap)
                if frame is None:
                    break
                await self.out_queue.put(
                    {"mime_type": "image/jpeg",
                     "data": base64.b64encode(frame).decode()}
                )
                await asyncio.sleep(0.1)
        finally:
            cap.release()

    async def _audio_stream(self):
        """Read raw PCM from the default microphone and enqueue it."""
        mic_info = await asyncio.to_thread(self._pya.get_default_input_device_info)
        stream = await asyncio.to_thread(
            self._pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info['index'],
            frames_per_buffer=CHUNK_SIZE,
        )
        try:
            while True:
                # Second arg is exception_on_overflow=False: drop overflowed
                # audio instead of raising when the loop falls behind.
                data = await asyncio.to_thread(stream.read, CHUNK_SIZE, False)
                await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
        finally:
            # The original leaked the stream when this task was cancelled.
            stream.close()

    async def send_realtime(self):
        """Forward queued media chunks to the live session, in order."""
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)

    async def receive_audio(self):
        """Yield (text, audio) tuples from the session, turn after turn.

        Audio chunks arrive as (None, bytes); text as (str, None).
        """
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    yield (None, data)
                if text := response.text:
                    yield (text, None)

    async def run(self):
        """Connect and stream; yields (text, audio) pairs until cancelled."""
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            self.session = session
            self.audio_in_queue = asyncio.Queue()
            # Bounded so producers back off instead of buffering stale media.
            self.out_queue = asyncio.Queue(maxsize=5)

            tasks = [asyncio.create_task(self._audio_stream())]
            if self.mode == "camera":
                tasks.append(asyncio.create_task(self._video_stream()))
            tasks.append(asyncio.create_task(self.send_realtime()))

            try:
                async for text, audio in self.receive_audio():
                    yield text, audio
            finally:
                # Cancel producers even if the consumer loop raises or this
                # generator is closed early; the original skipped cleanup on
                # those paths.
                for t in tasks:
                    t.cancel()
|
|
|
|
|
async def chat(mode="camera"):
    """Open a live session and relay (text, audio) tuples as they arrive."""
    session_loop = AudioLoop(mode=mode)
    async for pair in session_loop.run():
        yield pair
|
|
|
# Minimal Gradio front-end: pick a video source, then start/stop the session.
with gr.Blocks() as demo:
    gr.Markdown("# Gemini Live API Web Chat\nUse your microphone and camera directly from the browser.")
    # NOTE(review): only "camera" has a code path in AudioLoop.run(); "screen"
    # currently behaves like "none" (mss is imported but unused) — confirm.
    mode = gr.Radio(choices=["camera", "screen", "none"], value="camera", label="Video Source")
    chatbot = gr.Chatbot()
    with gr.Row():
        start = gr.Button("Start")
        # NOTE(review): the Stop button has no click handler wired — it does
        # nothing; verify whether cancellation was meant to be hooked up here.
        stop = gr.Button("Stop")
    # NOTE(review): `_js` was removed/renamed to `js` in Gradio 4+, and the
    # lambda returns an async generator that this JS shim only logs to the
    # console rather than streaming into the Chatbot — confirm against the
    # installed Gradio version.
    start.click(lambda m: chat(m), inputs=[mode], outputs=[chatbot], _js="(fn, inputs) => {fn(inputs).then(data => console.log(data));}")
# share=True exposes a public tunnel URL in addition to binding 0.0.0.0.
demo.launch(server_name="0.0.0.0", share=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|