Visión por computadora como sensor de movimiento para SmartThings
Usando una Raspberry Pi 3 y una PiCam, este sensor de visión por computadora detecta rostros y envía datos de presencia a través de LAN - UPNP a SmartThings.
Comenzaré asumiendo que tiene una Raspberry Pi 3 con una cámara que funciona y Open CV instalado. Si no, te recomiendo este tutorial 🙂
Creación de un controlador de dispositivo SmartThings personalizado
En SmartThings IDE, creamos un nuevo controlador de dispositivo para nuestro sensor de movimiento de visión por computadora.
Vaya a la sección "Mi controlador de dispositivo", haga clic en "+ Crear nuevo controlador de dispositivo" en la esquina superior derecha.
En este caso lo crearemos a partir de código. Haga clic en la segunda pestaña "Desde el código", pegue el código adjunto "Administrador de dispositivos" y haga clic en "Crear".
En la página siguiente, haga clic en "Publicar" para que el dispositivo esté disponible para usted.
Cómo escribir la aplicación SmartThings
Al igual que en el controlador de dispositivos, iremos a la sección "Mis SmartApps", haga clic en "+ Crear nuevas SmartApps" en la esquina superior derecha.
También lo crearemos a partir del código. Haga clic en la segunda pestaña "Desde Código", pegue el código adjunto " SmartApp" y haga clic en "Crear".
En la página siguiente, haga clic en "Publicar".
Preparando la Raspberry Pi
Ahora tenemos que agregar el script de Python que obtendrá imágenes de la cámara, detectando rostros y reportando a SmartThings.
Primero d Cargue e instale imutils y twisted
Si aún no tiene el paquete imutils instalado, querrá obtenerlo de GitHub o instalarlo a través de:
pip install imutils
Para retorcido:
sudo apt-get install python-twisted-web
Ahora que todo está listo, ve a / home / pi y crea un directorio para almacenar el script
cámara mkdir cameracd
Cree el archivo de secuencia de comandos:
sudo nano ssdpcamera.py
Pega el código adjunto " Secuencia de comandos de Python de la cámara ” y guarde presionando “control + x” y luego “y” e ingrese.
Pruebe el script escribiendo python ssdpcamera.py, debería ver algo como esto:
Descubriendo y emparejando la Raspberry Pi
Desde la aplicación móvil SmartThings debemos ir a Marketplace en la esquina inferior derecha, hacer clic en la pestaña "SmartApps" y finalmente buscar en "+ Mis aplicaciones" para "Computer Vision (Connect)"
Asegúrese de que la Raspberry Pi esté encendida y la secuencia de comandos de Python se esté ejecutando.
La SmartApp iniciará el proceso de descubrimiento y, una vez encontrada, haga clic en el cuadro de diálogo de selección, seleccione el dispositivo y haga clic en "Listo".
Esto creará el dispositivo en su cuenta y comenzará a recibir actualizaciones.
Inicio automático
Finalmente, si desea ejecutar el script de Python automáticamente cuando enciende la Raspberry Pi, puede editar /etc/rc.local y agregar la siguiente línea.
(dormir 10; python /home/pi/camera/ssdpcamera.py)&
El () hace que ambos comandos se ejecuten en segundo plano.
Código
#! / usr / bin / python2.7 "" "Cámara de visión por computadora para SmartThings Copyright 2016 Juan Pablo Risso <[email protected]> Dependencias:python-twisted, cv2, pyimagesearch Con licencia bajo la Licencia Apache, Versión 2.0 (la "Licencia"); no puede usar este archivo excepto en cumplimiento con la Licencia. Puede obtener una copia de la Licencia en:http://www.apache.org/licenses/LICENSE-2.0A menos que requerido por la ley aplicable o acordado por escrito, el software distribuido bajo la Licencia se distribuye "TAL CUAL", SIN GARANTÍAS NI CONDICIONES DE NINGÚN TIPO, ya sean expresas o implícitas. Consulte la Licencia para conocer el lenguaje específico que rige los permisos y limitaciones de la Licencia. "" "importar argparseimport loggingimport cv2import urllib2import imutilsfrom time import timefrom picamera.array import PiRGBArrayfrom picamera import PiCamerafrom twisted.web import server, resourcefrom twisted.internet import reactorfrom twisted.internet diferir la importación éxito de twisted.inter net.protocol import DatagramProtocolf de twisted.web.client import Agentfrom twisted.web.http_headers import Headersfrom twisted.web.iweb import IBodyProducer de twisted.web.newclient import ResponseFailed de zope.interface import implements -9220-11e4-96fa-123b93f75cba'SEARCH_RESPONSE ='HTTP / 1.1 200 OK \ r \ nCACHE-CONTROL:max-age =30 \ r \ nEXT:\ r \ nLOCATION:% s \ r \ nSERVER:Linux, UPnP / 1.0, Pi_Garage / 1.0 \ r \ nST:% s \ r \ nUSN:uuid:% s ::% s '# inicializa la cámara y toma una referencia a la cámara sin procesar # capturecamera =PiCamera () camera.resolution =(640 , 480) camera.framerate =32rawCapture =PiRGBArray (camera, size =(640, 480)) auxcount =0 # construye el detector facial y deja que la cámara se caliente upfd =FaceDetector ("cascades / haarcascade_frontalface_default.xml") time.sleep (0.1) def determine_ip_for_host (host):"" "Determine la dirección IP local utilizada para comunicarse con un host en particular" "" test_sock =DatagramProtocol () test_sock_listener =reactor.lis tenUDP (0, test_sock) # pylint:disable =no-membertest_sock.transport.connect (host, 1900) my_ip =test_sock.transport.getHost (). hosttest_sock_listener.stopListening () return my_ipclass StringProducer (objeto):"" "Escribe un cadena en memoria a una solicitud Twisted "" "implementa (IBodyProducer) def __init __ (self, body):self.body =bodyself.length =len (body) def startProducing (self, consumer):# pylint:disable =invalid- nombre "" "Comienza a producir la cadena suministrada para el consumidor especificado" "" consumer.write (self.body) return exitoso (Ninguno) def pauseProducing (self):# pylint:disable =invalid-name "" "Pausa de producción - no op "" "passdef stopProducing (self):# pylint:disable =invalid-name" "" Dejar de producir - no op "" "passclass SSDPServer (DatagramProtocol):" "" Recibir y responder a solicitudes de descubrimiento de M-SEARCH desde SmartThings hub " "" def __init __ (self, interface ='', status_port =0, device_target =''):self.interface =interfaceself.device_target =device_targetself.status_port =status_portself.port =reactor.listenMulticas t (SSDP_PORT, self, listenMultiple =True) # pylint:disable =no-memberself.port.joinGroup (SSDP_ADDR, interface =interface) reactor.addSystemEventTrigger ('before', 'shutdown', self.stop) # pylint:disable =no-memberdef datagramReceived (self, data, (host, port)):try:header, _ =data.split (b '\ r \ n \ r \ n') [:2] excepto ValueError:returnlines =header.split ('\ r \ n') cmd =lines.pop (0) .split ('') líneas =[x.replace (':', ':', 1) para x en líneas] líneas =[x para x en líneas si len (x)> 0] encabezados =[x.split (':', 1) para x en líneas] encabezados =dict ([(x [0] .lower (), x [1]) para x en encabezados]) logging.debug ('Comando SSDP% s% s - de% s:% d con encabezados% s', cmd [0], cmd [1], host, puerto, encabezados) search_target ='' if ' st 'en encabezados:search_target =headers [' st '] if cmd [0] ==' M-SEARCH 'y cmd [1] ==' * 'y search_target en self.device_target:logging.info (' Recibido% s % s para% s de% s:% d ', cmd [0], cmd [1], objetivo_búsqueda, host, puerto) url =' http://% s:% d / status '% (determine_ip_for_host (host) , self.status_port) respuesta =SEARCH_R ESPONSE% (url, search_target, UUID, self.device_target) self.port.write (respuesta, (host, puerto)) else:logging.debug ('Comando SSDP ignorado% s% s', cmd [0], cmd [ 1]) def stop (self):"" "Salir del grupo de multidifusión y dejar de escuchar" "" self.port.leaveGroup (SSDP_ADDR, interface =self.interface) self.port.stopListening () class StatusServer (resource.Resource):"" "Servidor HTTP que envía el estado de la cámara al concentrador de SmartThings" "" isLeaf =Truedef __init __ (self, device_target, subscription_list, garage_door_status):self.device_target =device_targetself.subscription_list =subscription_listself.garage_door_status =garage_door_estus =self) def render_SUBSCRIBE (self, request):# pylint:disable =inválido-nombre "" "Manejar solicitudes de suscripción desde el concentrador ST - el concentrador desea ser notificado de las actualizaciones de estado de la puerta del garaje" "" headers =request.getAllHeaders () logging.debug ("SUBSCRIBE:% s", encabezados) si 'callback' en encabezados:cb_url =headers ['callback'] [1:-1] si no cb_url en self.subscription_list:self.subs cription_list [cb_url] ={} # reactor.stop () logging.info ('Suscripción% s agregada', cb_url) self.subscription_list [cb_url] ['expiration'] =time () + 24 * 3600return "" def render_GET ( self, request):# pylint:disable =invalid-name "" "Manejar solicitudes de sondeo desde ST hub" "" if request.path =='/ status':if self.garage_door_status ['last_state'] =='inactive' :cmd ='status-inactive'else:cmd =' status-active'msg =''% (cmd, UUID, self.device_target) logging.info ("Solicitud de sondeo de% s para% s - devuelto% s", request.getClientIP (), request.path, cmd) return msgelse:logging.info ( "Se recibió una solicitud falsa de% s para% s", request.getClientIP (), request.path) return "" class MonitorCamera (object):"" "Monitorea el estado de la cámara, generando notificaciones cada vez que su estado cambia" "" def __init __ ( self, device_target, subscription_list, camera_status):# pylint:disable =demasiados-argumentosself.device_target =device_targetself.subscription_list =subscription_listself.camera_stat us =camera_statuscurrent_state ='inactive'reactor.callLater (0, self.check_garage_state, current_state, auxcount) # pylint:disable =no-memberdef check_garage_state (self, current_state, auxcount):self.current_state =current_stateselfcount =auxcount rawCapture, format ="bgr", use_video_port =True) # agarre la matriz NumPy sin procesar que representa el imageframe =rawCapture.array # cambie el tamaño del marco y conviértalo a grayscaleframe =imutils.resize (frame, width =640) gray =cv2.cvtColor (frame, cv2.COLOR_BGR2GRAY) # detecta caras en la imagen y luego clona el frame # para que podamos dibujar sobre élfaceRects =fd.detect (gray, scaleFactor =1.1, minNeighbors =10, minSize =(30, 30)) frameClone =frame.copy () if faceRects! =():auxcount =0 if current_state =='inactive':current_state ='active' logging.info ('El estado cambió de% sa% s', self.camera_status ['last_state '], current_state) self.camera_status [' last_state '] =current_stateself.notify_hubs () else:auxcount =auxcount + 1if auxcount ==60:current_state ='inactive' logging.info ('El estado cambió de% sa% s', self.camera_status ['last_state'], current_state) self.camera_status ['last_state'] =current_stateself.notify_hubs () # loop over los cuadros delimitadores de caras y dibujarlos para (fX, fY, fW, fH) en faceRects:cv2.rectangle (frameClone, (fX, fY), (fX + fW, fY + fH), (0, 255, 0), 2 ) # mostrar la transmisión de video en una nueva ventana GUI # cv2.imshow ("Face", videorotate) rawCapture.truncate (0) # Programar siguiente checkreactor.callLater (0, self.check_garage_state, current_state, auxcount) # pylint:disable =no-memberdef notify_hubs (self):"" "Notifica a los hubs de SmartThings suscritos que se ha producido un cambio de estado" "" if self.camera_status ['last_state'] =='inactive':cmd ='status-inactive'else:cmd ='estado activo' para la suscripción en self.subscription_list:if self.subscription_list [suscripción] ['expiration']> time ():logging.info ("Centro de notificación% s", suscripción) msg =' % s uuid:% s ::% s '% (cmd, UUID, self.device_target) cuerpo =StringProducer (msg) agent =Agent (reactor) req =agent.request ('POST', subscription, Headers ({'CONTENT-LENGTH':[len (msg)]}), body) req.addCallback (self.handle_response ) req.addErrback (self.handle_error) def handle_response (self, response):# pylint:disable =no-self-use "" "Maneja el hub SmartThings devolviendo un código de estado al POST. Esto es realmente inesperado - normalmente se cierra la conexión para POST / PUT sin dar un código de respuesta. "" "if response.code ==202:logging.info (" Actualización de estado aceptada ") else:logging.error (" Código de respuesta inesperado:% s ", código de respuesta ) def handle_error (self, response):# pylint:disable =no-self-use "" "Maneja los errores que generan al realizar el NOTIFY. No parece haber una manera de evitar ResponseFailed:SmartThings Hub no genera un código de respuesta adecuado para POST o PUT, y si se usa NOTIFY, ignora el cuerpo. "" "If isinstance (response.value, ResponseFailed):logging.debug ("Respuesta fallida (esperada)") else:logging.error ("Respuesta inesperada:% s", respuesta) def main ():"" "Función principal para manejar el uso desde la línea de comando" "" arg_proc =argparse .ArgumentParser (description ='Proporciona el estado activo / inactivo de la cámara a un concentrador SmartThings') arg_proc.add_argument ('- httpport', dest ='http_port', help ='Número de puerto HTTP', predeterminado =8080, type =int) arg_proc.add_argument ('- deviceindex', dest ='device_index', help ='Device index', default =1, type =int) arg_proc.add_argument ('- pollingfreq', dest ='polling_freq', help =' Número de segundos entre el estado de la cámara de sondeo ', predeterminado =5, tipo =int) arg_proc.add_argument (' - debug ', dest =' debug ', help =' Habilitar mensajes de depuración ', predeterminado =False, action =' store_true ' ) opciones =arg_proc.parse_args () device_target ='urn:esquema s-upnp-org:dispositivo:RPi_Computer_Vision:% d '% (options.device_index) log_level =logging.INFOif options.debug:log_level =logging.DEBUGlogging.basicConfig (format ='% (asctime) -15s% (nombre de nivel) - 8s% (message) s ', level =log_level) subscription_list ={} camera_status ={' last_state ':' unknown '} logging.info (' Inicializando la cámara ') # Servidor SSDP para manejar el descubrimientoSSDPServer (status_port =options.http_port, device_target =device_target) # sitio HTTP para manejar suscripciones / pollingstatus_site =server.Site (StatusServer (device_target, subscription_list, camera_status)) reactor.listenTCP (options.http_port, status_site) # pylint:disable =no-memberlogging.info ('Inicialización completa' ) # Monitorear el estado de la cámara y enviar notificaciones sobre el cambio de estadoMonitorCamera (device_target =device_target, subscription_list =subscription_list, camera_status =camera_status) reactor.run () # pylint:disable =no-memberif __name__ =="__main __":main () Fuente:Computadora Visión como sensor de movimiento para SmartThings % s uuid:% s ::% s
Proceso de manufactura
- ST:sensor de movimiento con aprendizaje automático para un seguimiento de la actividad de alta precisión y fácil de usar
- Sensor de movimiento, alarma, grabación de video en HA en Raspberry Pi
- Sistema de sensor de movimiento por infrarrojos de bricolaje para Raspberry Pi
- Sensor de movimiento con Raspberry Pi
- Tutorial de Contador Geiger - Placa de sensor de radiación para Raspberry Pi
- Raspberry Pi GPIO con sensor de movimiento PIR:el mejor tutorial
- Construcción de una red de sensores para un molino de molinos del siglo XVIII
- Interfaz del sensor de movimiento PIR HC-SR501 con Raspberry Pi
- Construye robots Raspberry Pi:el mejor tutorial para principiantes
- 7 aplicaciones de la visión artificial
- Visión artificial