# ejemplo requiere la biblioteca websocket-client:
# pip instala el cliente websocket
importar sistema operativo
importar json
importar websocket
URL = "ws://yossapi.com/v1/realtime?model=gpt-4o-realtime-preview"
encabezados = [
"Autorización: Portador sk-xxx",
"OpenAI-Beta: tiempo real=v1"
]
def decode_unicode_string(s):
"""Decodifica cadenas Unicode para que el chino sea legible"""
prueba:
# Primero intenta decodificar directamente
si es instancia(s, bytes):
devolver s.decode('utf-8')
# Si es una cadena confusa, codifíquela primero y luego decodifíquela.
devolver s.encode('latin1').decode('utf-8')
excepto excepción como e:
print(f"Error de decodificación: {e}")
regresar
def on_error(ws, error):
imprimir("\n=== mensaje de error ===")
imprimir(f"{error}")
def on_close(ws, close_status_code, close_msg):
print("\n=== conexión cerrada ===")
print(f"Código de estado: {close_status_code}")
print(f"Cerrar mensaje: {close_msg}")
def on_open(ws):
print("\n=== Conexión establecida ===")
evento = {
"tipo": "respuesta.crear",
"respuesta": {
"modalidades": ["texto"],
"instructions": "Por favor ayude al usuario."
}
}
print("\n=== Enviar mensaje ===")
print(json.dumps(evento, asegurar_ascii=Falso, sangría=2))
ws.send(json.dumps(evento))
def on_message(ws, mensaje):
print("\n=== mensaje recibido ===")
prueba:
datos = json.loads (mensaje)
# Si hay un mensaje de error, decodifica y muestra chino
si 'error' en datos y 'mensaje' en datos['error']:
datos['error']['mensaje'] = decode_unicode_string(datos['error']['mensaje'])
print(json.dumps(datos, asegurar_ascii=Falso, sangría=2))
# Si es un mensaje de error, mostrar adicionalmente una versión simplificada
si data.get('tipo') == 'error':
print("\n=== Resumen de errores ===")
print(f"Tipo de error: {datos['error'].get('tipo', 'desconocido')}")
print(f"Código de error: {datos['error'].get('código', 'desconocido')}")
mensaje_error = decode_unicode_string(datos['error'].get('mensaje', 'desconocido'))
print(f"Mensaje de error: {mensaje_error}")
excepto json.JSONDecodeError como e:
print(f"Error al analizar el mensaje: {mensaje}")
imprimir(f"Error: {e}")
# Habilitar registros de websocket, pero solo mostrar información clave
websocket.enableTrace (Falso)
ws = websocket.WebSocketApp(
dirección URL,
encabezado = encabezados,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
print("\n=== Iniciar conexión ===")
print(f"Dirección de conexión: {url}")
ws.run_forever()