1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| import cv2
import io
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
# Capture vidéo
capture = cv2.VideoCapture(0) # Changez l'index si nécessaire
# Vérifiez que la caméra est ouverte
if not capture.isOpened():
print("Erreur : Impossible d'ouvrir la caméra.")
exit(1)
class CamHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path.endswith('.mjpg'):
self.send_response(200)
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=--jpgboundary')
self.end_headers()
while True:
try:
# Lire une image depuis la caméra
ret, frame = capture.read()
if not ret:
print("Erreur lors de la capture de l'image.")
break
# Convertir l'image en JPEG
_, jpeg = cv2.imencode('.jpg', frame)
image_data = jpeg.tobytes()
# Envoyer l'image au client
self.wfile.write(b"--jpgboundary\r\n")
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(image_data)))
self.end_headers()
self.wfile.write(image_data)
self.wfile.write(b"\r\n")
time.sleep(0.1) # Ajustez cette valeur si nécessaire
except Exception as e:
print(f"Erreur lors de l'envoi du flux MJPEG : {e}")
break
return
if self.path.endswith('.html'):
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.end_headers()
html = '''
<html>
<head></head>
<body>
<h1>Flux MJPEG</h1>
<img src="/cam.mjpg" alt="Flux de la caméra"/>
</body>
</html>
'''
self.wfile.write(html.encode('utf-8'))
return
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Gère les requêtes dans un thread séparé."""
def run_server():
server = ThreadedHTTPServer(('0.0.0.0', 8081), CamHandler)
print("Serveur démarré à http://127.0.0.1:8081")
server.serve_forever()
if __name__ == '__main__':
# Lancer le serveur dans un thread séparé
server_thread = threading.Thread(target=run_server)
server_thread.start()
try:
while True:
time.sleep(1) # Maintenir le script en cours d'exécution
except KeyboardInterrupt:
print("Arrêt du serveur...")
capture.release()
server_thread.join() |
Partager