""" ## Documentation Quickstart: https://github.com/google-gemini/cookbook/blob/main/quickstarts/Get_started_LiveAPI.py ## Setup To install the dependencies for this script, run: ``` pip install google-genai opencv-python pyaudio pillow mss ``` """ import os import asyncio import base64 import io import traceback import cv2 import pyaudio import PIL.Image import mss import argparse from google import genai from google.genai import types FORMAT = pyaudio.paInt16 CHANNELS = 1 SEND_SAMPLE_RATE = 16000 RECEIVE_SAMPLE_RATE = 24000 CHUNK_SIZE = 1024 MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025" DEFAULT_MODE = "camera" client = genai.Client( http_options={"api_version": "v1beta"}, api_key=os.environ.get("GEMINI_API_KEY"), ) CONFIG = types.LiveConnectConfig( response_modalities=[ "AUDIO", ], media_resolution="MEDIA_RESOLUTION_MEDIUM", speech_config=types.SpeechConfig( voice_config=types.VoiceConfig( prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr") ) ), context_window_compression=types.ContextWindowCompressionConfig( trigger_tokens=25600, sliding_window=types.SlidingWindow(target_tokens=12800), ), system_instruction=types.Content( parts=[types.Part.from_text(text="You are a stromg of mind AI who says it as it is")], role="user" ), ) pya = pyaudio.PyAudio() class AudioLoop: def __init__(self, video_mode=DEFAULT_MODE): self.video_mode = video_mode self.audio_in_queue = None self.out_queue = None self.session = None self.send_text_task = None self.receive_audio_task = None self.play_audio_task = None async def send_text(self): while True: text = await asyncio.to_thread( input, "message > ", ) if text.lower() == "q": break await self.session.send(input=text or ".", end_of_turn=True) def _get_frame(self, cap): # Read the frameq ret, frame = cap.read() # Check if the frame was read successfully if not ret: return None # Fix: Convert BGR to RGB color space # OpenCV captures in BGR but PIL expects RGB format # This prevents the blue tint in the video feed frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = PIL.Image.fromarray(frame_rgb) # Now using RGB frame img.thumbnail([1024, 1024]) image_io = io.BytesIO() img.save(image_io, format="jpeg") image_io.seek(0) mime_type = "image/jpeg" image_bytes = image_io.read() return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()} async def get_frames(self): # This takes about a second, and will block the whole program # causing the audio pipeline to overflow if you don't to_thread it. 
    async def get_frames(self):
        # Opening the camera takes about a second and blocks the whole
        # program, causing the audio pipeline to overflow if you don't
        # run it in a worker thread with to_thread.
        cap = await asyncio.to_thread(
            cv2.VideoCapture, 0
        )  # 0 represents the default camera
        while True:
            frame = await asyncio.to_thread(self._get_frame, cap)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

        # Release the VideoCapture object
        cap.release()

    def _get_screen(self):
        sct = mss.mss()
        monitor = sct.monitors[0]

        i = sct.grab(monitor)

        mime_type = "image/jpeg"
        # mss returns raw pixels; serialize them to PNG first, then
        # re-encode to JPEG via PIL to shrink the payload.
        image_bytes = mss.tools.to_png(i.rgb, i.size)
        img = PIL.Image.open(io.BytesIO(image_bytes))

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        image_io.seek(0)

        image_bytes = image_io.read()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}

    async def get_screen(self):
        while True:
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

    async def send_realtime(self):
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)

    async def listen_audio(self):
        mic_info = pya.get_default_input_device_info()
        self.audio_stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        if __debug__:
            kwargs = {"exception_on_overflow": False}
        else:
            kwargs = {}
        while True:
            data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})

    async def receive_audio(self):
        "Background task that reads from the websocket and writes PCM chunks to the output queue."
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    self.audio_in_queue.put_nowait(data)
                    continue
                if text := response.text:
                    print(text, end="")

            # If you interrupt the model, it sends a turn_complete.
            # For interruptions to work, we need to stop playback,
            # so empty out the audio queue, because it may have buffered
            # much more audio than has been played yet.
            while not self.audio_in_queue.empty():
                self.audio_in_queue.get_nowait()

    async def play_audio(self):
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await self.audio_in_queue.get()
            await asyncio.to_thread(stream.write, bytestream)

    async def run(self):
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                self.out_queue = asyncio.Queue(maxsize=5)

                send_text_task = tg.create_task(self.send_text())
                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                if self.video_mode == "camera":
                    tg.create_task(self.get_frames())
                elif self.video_mode == "screen":
                    tg.create_task(self.get_screen())
                tg.create_task(self.receive_audio())
                tg.create_task(self.play_audio())

                await send_text_task
                raise asyncio.CancelledError("User requested exit")

        except asyncio.CancelledError:
            pass
        except ExceptionGroup as EG:
            self.audio_stream.close()
            traceback.print_exception(EG)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode",
        type=str,
        default=DEFAULT_MODE,
        help="pixels to stream from",
        choices=["camera", "screen", "none"],
    )
    args = parser.parse_args()
    main = AudioLoop(video_mode=args.mode)
    asyncio.run(main.run())
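
# Example invocations, assuming this file is saved as Get_started_LiveAPI.py
# and GEMINI_API_KEY is set in the environment:
#   python Get_started_LiveAPI.py                 # default: stream the camera
#   python Get_started_LiveAPI.py --mode screen   # stream the screen instead
#   python Get_started_LiveAPI.py --mode none     # audio/text only, no video
# Type "q" at the "message > " prompt to exit.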