#Coding4Fun – How to control your #drone with 20 lines of code! (12/N)

Hi!

Today’s code objective is very simple, based on a request I received from the internet:

The drone is flying very happily, but if the camera detects a face, the drone will flip out!

Let’s take a look at the program working:

This one is very similar to the previous one. I also realized that I may need a better camera to record the live action side by side with the drone footage, but I think you get the idea. The command to make the drone flip is “flip x”, where “x” is the direction. For example:

"flip l" # flip left
"flip r" # flip right
"flip f" # flip forward
"flip b" # flip back
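
As a small illustration of the four commands above, here is a minimal sketch of a helper that builds the command string from a readable direction name. The names `FLIP_DIRECTIONS` and `flip_command` are hypothetical and not part of the original program; only the "flip x" strings come from the post.

```python
# Hypothetical helper (not in the original program): maps a readable
# direction name to one of the "flip x" command strings listed above.
FLIP_DIRECTIONS = {"left": "l", "right": "r", "forward": "f", "back": "b"}


def flip_command(direction):
    """Return the 'flip x' command for a direction name such as 'left'."""
    key = direction.strip().lower()
    if key not in FLIP_DIRECTIONS:
        raise ValueError(f"unknown flip direction: {direction!r}")
    return "flip " + FLIP_DIRECTIONS[key]


if __name__ == "__main__":
    for name in ("left", "right", "forward", "back"):
        print(name, "->", flip_command(name))
```

The returned string could then be passed to the `sendCommand` function used in the program.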

Here is the code:

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV and display FPS
# detect faces
# launch the drone with key T, and land with key L
# if the drone is flying, and a face is detected, the drone will flip left
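import cv2
import socket
import time
import threading


def receiveData():
    # background thread: keep the latest reply from the drone in `response`
    global response
    while True:
        try:
            data, _ = clientSocket.recvfrom(1024)
            # recvfrom returns bytes; decode so the 'ok' checks below can match
            response = data.decode('utf-8', errors='ignore').strip()
        except Exception:
            break


def readStates():
    # background thread: read the state string and extract the battery level
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            # decode first: raw bytes never compare equal to the str 'ok'
            response_state = response_state.decode('ASCII')
            if response_state != 'ok':
                fields = response_state.replace(';', ':').split(':')
                battery = int(fields[21])  # index 21 holds the battery value
        except Exception:
            break


def sendCommand(command):
    # `response` holds the most recent reply; wait up to 5 seconds if none has arrived yet
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response


def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except Exception:
        pass
    return response


def sendControlCommand(command):
    # retry up to 5 times until the drone answers 'ok'
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False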
# ———————————————–
# Main program
# ———————————————–
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
# enable face and smile detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open
drone_flying = False
i = 0
while True:
    i = i + 1
    start_time = time.time()
    fpsInfo = ''
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (480, 360))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # detect faces
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            # label next to the top-left corner of the box
            cv2.putText(frame, 'face', (x + 6, y - 6), font, 0.7, (255, 255, 255), 1)
        if len(faces) > 0 and drone_flying:
            msg = "flip l"
            sendCommand(msg)
        # display fps
        elapsed = time.time() - start_time
        if elapsed > 0:
            fpsInfo = "FPS: " + str(1.0 / elapsed)  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'flying: {drone_flying} - battery: {battery} % - i: {i} - {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
    # read the keyboard once per frame, so a key press is not consumed by another check
    key = cv2.waitKey(1) & 0xFF
    if key == ord('t'):
        drone_flying = True
        msg = "takeoff"
        sendCommand(msg)
    if key == ord('l'):
        drone_flying = False
        msg = "land"
        sendCommand(msg)
        time.sleep(5)
    if key == ord('q'):
        break
msg = "land"
sendCommand(msg)  # land
response = sendControlCommand("streamoff")
print(f'streamoff response: {response}')
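
The keyboard handling at the end of the loop (T to take off, L to land, Q to quit) can also be sketched as a small function that takes the value returned by `cv2.waitKey` and decides what to do. This is only an illustration: `KEY_COMMANDS` and `handle_key` are hypothetical names, and the sketch does not talk to the drone or to OpenCV, it just returns the command string to send.

```python
# Hypothetical key map. The keys and commands come from the post; the
# function name and the returned tuple are assumptions for illustration.
KEY_COMMANDS = {
    ord('t'): ("takeoff", True),   # (command to send, new drone_flying value)
    ord('l'): ("land", False),
}


def handle_key(key_code, drone_flying):
    """Return (command or None, drone_flying, quit_requested) for one key code."""
    key = key_code & 0xFF          # same mask the program applies to cv2.waitKey()
    if key == ord('q'):
        return None, drone_flying, True
    if key in KEY_COMMANDS:
        command, flying = KEY_COMMANDS[key]
        return command, flying, False
    # no key pressed (cv2.waitKey returns -1) or an unmapped key
    return None, drone_flying, False


if __name__ == "__main__":
    print(handle_key(ord('t'), False))
    print(handle_key(-1, True))
```

Keeping the decision in one place means the key is read once per frame and every check sees the same value.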

As I promised last time, in the next posts I’ll analyze in more detail how this works, along with a couple of improvements that I can implement.

Happy coding!

Greetings

El Bruno


#Coding4Fun – How to control your #drone with 20 lines of code! (11/N)

Hi!

Today’s code objective is very simple:

The drone is flying very happily, but if the camera detects a banana, the drone must land!

Let’s take a look at the program working:

drone flying and when detect a banana lands

And a couple of notes regarding the app

  • I still use Haar Cascades for object detection. I found an article with an XML file to detect bananas, so I’m working with that one (see references).
  • Haar Cascades are not the best technique for object detection. During testing I found a lot of false positives, mostly small portions of the frame that were detected as bananas. One solution was to limit the size of the detected objects using OpenCV (I’ll write more about this in the future).
  • As you can see in the animation, when the drone is a few meters away, the video feed becomes messy. And because the object detection is performed locally, it takes some time to detect the banana.
  • I also implemented some code to make the drone take off when the user presses the key ‘T’, and land when the user presses the key ‘L’.
  • The code is starting to become a mess, so a refactoring is needed.
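
To illustrate the size limit mentioned in the notes above: the program passes `minSize=(150, 50)` to `detectMultiScale`, so OpenCV discards small candidates. The sketch below shows the same idea as a plain Python filter over `(x, y, w, h)` boxes. The function name `filter_by_min_size` and the sample boxes are assumptions made up for this example.

```python
def filter_by_min_size(detections, min_width, min_height):
    """Keep only the (x, y, w, h) boxes that are at least min_width x min_height."""
    return [
        (x, y, w, h)
        for (x, y, w, h) in detections
        if w >= min_width and h >= min_height
    ]


if __name__ == "__main__":
    # Made-up sample boxes: one small false positive and one banana-sized box.
    sample = [(10, 10, 40, 20), (100, 80, 200, 90)]
    print(filter_by_min_size(sample, 150, 50))
```

With the sample data, only the second box is kept, because the first one is narrower than 150 pixels and lower than 50 pixels.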

Here is the code

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV
# display FPS
# detect faces and bananas
# launch the drone with key T, and land with key L
## if the drone is flying, and a banana is detected, land the drone
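import cv2
import socket
import time
import threading


def receiveData():
    # background thread: keep the latest reply from the drone in `response`
    global response
    while True:
        try:
            data, _ = clientSocket.recvfrom(1024)
            # recvfrom returns bytes; decode so the 'ok' checks below can match
            response = data.decode('utf-8', errors='ignore').strip()
        except Exception:
            break


def readStates():
    # background thread: read the state string and extract the battery level
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            # decode first: raw bytes never compare equal to the str 'ok'
            response_state = response_state.decode('ASCII')
            if response_state != 'ok':
                fields = response_state.replace(';', ':').split(':')
                battery = int(fields[21])  # index 21 holds the battery value
        except Exception:
            break


def sendCommand(command):
    # `response` holds the most recent reply; wait up to 5 seconds if none has arrived yet
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response


def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except Exception:
        pass
    return response


def sendControlCommand(command):
    # retry up to 5 times until the drone answers 'ok'
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False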
# ———————————————–
# Main program
# ———————————————–
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
# enable face and smile detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
banana_cascade = cv2.CascadeClassifier('banana_classifier.xml')
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open
banana_detected = False
drone_flying = False
i = 0
while True:
    i = i + 1
    start_time = time.time()
    fpsInfo = ''
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (480, 360))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # detect faces
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            # label next to the top-left corner of the box
            cv2.putText(frame, 'face', (x + 6, y - 6), font, 0.7, (255, 255, 255), 1)
        # detect banana; minSize discards small false positives
        bananas = banana_cascade.detectMultiScale(gray,
                                                  scaleFactor=1.3,
                                                  minNeighbors=5,
                                                  minSize=(150, 50))
        for (x, y, w, h) in bananas:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 255, 0), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            cv2.putText(frame, 'bananas', (x + 6, y - 6), font, 0.7, (255, 255, 255), 1)
        banana_detected = len(bananas) > 0
        # fly logic: land and stop as soon as a banana is seen while flying
        if drone_flying and banana_detected:
            drone_flying = False
            msg = "land"
            sendCommand(msg)
            time.sleep(5)
            break
        # display fps
        elapsed = time.time() - start_time
        if elapsed > 0:
            fpsInfo = "FPS: " + str(1.0 / elapsed)  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'banana: {banana_detected} - flying: {drone_flying} - battery: {battery} % - i: {i} - {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
    # read the keyboard once per frame, so a key press is not consumed by another check
    key = cv2.waitKey(1) & 0xFF
    if key == ord('t'):
        drone_flying = True
        msg = "takeoff"
        sendCommand(msg)
    if key == ord('l'):
        drone_flying = False
        msg = "land"
        sendCommand(msg)
        time.sleep(5)
    if key == ord('q'):
        break
msg = "land"
sendCommand(msg)  # land
response = sendControlCommand("streamoff")
print(f'streamoff response: {response}')

In the next posts, I’ll analyze in more detail how this works, along with a couple of improvements that I can implement.

Happy coding!

Greetings

El Bruno


References

My Posts

#Coding4Fun – How to control your #drone with 20 lines of code! (10/N)

Hi!

Back to some drone posts! I was kind of busy during the last few weeks, and now I can get back to writing about the drone.

OK, in the last posts I described how to connect and work with the drone camera feed using OpenCV. Now with 2 extra lines of code we can also detect faces. Let’s take a look at the final sample.

drone camera and camera view performing face detection

In the previous image we can see 2 camera feeds. My computer webcam, where you can see how I hold the drone with the drone camera pointing to my face. And the drone camera feed, presented using OpenCV and drawing a frame over each detected face.

Let’s share some code insights:

  • As usual, I resize the camera feed to 320 x 240
  • The average processing rate is between 40 and 70 FPS
  • I use a Haar cascade classifier to detect the faces in each frame

Note: I need to write about Haar Cascades as part of my face detection post series.

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV
# display FPS
# detect faces
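import cv2
import socket
import time
import threading


def receiveData():
    # background thread: keep the latest reply from the drone in `response`
    global response
    while True:
        try:
            data, _ = clientSocket.recvfrom(1024)
            # recvfrom returns bytes; decode so the 'ok' checks below can match
            response = data.decode('utf-8', errors='ignore').strip()
        except Exception:
            break


def readStates():
    # background thread: read the state string and extract the battery level
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            # decode first: raw bytes never compare equal to the str 'ok'
            response_state = response_state.decode('ASCII')
            if response_state != 'ok':
                fields = response_state.replace(';', ':').split(':')
                battery = int(fields[21])  # index 21 holds the battery value
        except Exception:
            break


def sendCommand(command):
    # `response` holds the most recent reply; wait up to 5 seconds if none has arrived yet
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response


def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except Exception:
        pass
    return response


def sendControlCommand(command):
    # retry up to 5 times until the drone answers 'ok'
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False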
# ———————————————–
# Main program
# ———————————————–
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))
# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()
# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')
# drone information
battery = 0
# enable face detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
# open
i = 0
while True:
    i = i + 1
    start_time = time.time()
    fpsInfo = ''
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (320, 240))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        # each detection is (x, y, width, height)
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
        # display fps
        elapsed = time.time() - start_time
        if elapsed > 0:
            fpsInfo = "FPS: " + str(1.0 / elapsed)  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'battery: {battery} % - i: {i} - {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
response = sendControlCommand("streamoff")
print(f'streamoff response: {response}')
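
One detail of the code above that may be worth spelling out: `readStates` picks the battery level by position (index 21 after splitting the state string on `;` and `:`). A minimal sketch of a name-based alternative follows. The sample state string is an assumption, written so that its field order is consistent with the index 21 used in the post, and `parse_state` is a hypothetical helper, not part of the original program.

```python
def parse_state(raw):
    """Turn a 'name:value;name:value;...' state message (bytes) into a dict of strings."""
    text = raw.decode('ascii', errors='ignore').strip()
    state = {}
    for pair in text.split(';'):
        if ':' in pair:                      # skips the empty piece after the last ';'
            name, value = pair.split(':', 1)
            state[name] = value
    return state


if __name__ == "__main__":
    # Assumed sample message, for illustration only.
    sample = (b"pitch:0;roll:0;yaw:0;vgx:0;vgy:0;vgz:0;templ:60;temph:62;"
              b"tof:10;h:0;bat:87;baro:120.5;time:0;agx:1.00;agy:-2.00;agz:-998.00;\r\n")
    state = parse_state(sample)
    print("battery:", int(state["bat"]))
```

Looking the value up by name keeps working if fields are added or reordered, while the positional lookup depends on the exact layout of the message.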

In my next posts, I’ll add some drone specific behaviors for each face detected.

Happy coding!

Greetings

El Bruno


#Coding4Fun – How to control your #drone with 20 lines of code! (9/N)

Hi!

Let’s take some Frames Per Second measurements on the UDP and OpenCV connection. It seems that, with simple movements, the values range between 30 and 60 FPS.

showing FPS information with the drone camera

I just added a couple of lines in the main while loop to calculate the FPS.

# open
i = 0
while True:
    i = i + 1
    start_time = time.time()

    sendReadCommand('battery?')
    print(f'battery: {battery} % - i: {i}')

    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (640, 480))

        if (time.time() - start_time ) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)

        cv2.imshow('@elbruno - DJI Tello Camera', img)
    except Exception as e:
        print(f'exc: {e}')
        pass

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

As a final note, I ran some tests using different camera resolutions and the FPS averages are similar. I tested with 640 x 480 pixels and 1024 x 768 pixels.
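
The loop above computes the FPS from a single iteration, so the number can jump around from frame to frame. As a hedged sketch of one way to get a steadier reading, the helper below averages over a moving window of recent frames instead. This is a swapped-in technique, not what the post does; the class name `FpsMeter`, the window size, and the injectable `clock` parameter are all assumptions for illustration.

```python
import time


class FpsMeter:
    """Averages the frame rate over the last `window` frame timestamps."""

    def __init__(self, window=30, clock=time.perf_counter):
        self.window = window
        self.clock = clock      # injectable so the meter can be tested without waiting
        self.stamps = []

    def tick(self):
        """Record one frame and return the FPS estimate (0.0 until two frames exist)."""
        self.stamps.append(self.clock())
        if len(self.stamps) > self.window:
            self.stamps.pop(0)
        if len(self.stamps) < 2:
            return 0.0
        elapsed = self.stamps[-1] - self.stamps[0]
        # N timestamps span N - 1 frame intervals
        return (len(self.stamps) - 1) / elapsed if elapsed > 0 else 0.0


if __name__ == "__main__":
    meter = FpsMeter(window=10)
    for _ in range(5):
        time.sleep(0.01)        # stands in for reading and processing a frame
        print(f"FPS: {meter.tick():.1f}")
```

In the drone loop, `meter.tick()` would be called once per frame, right after the frame has been processed.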

Next posts, let’s do some Face Detection and rock some AI with the drone!

Happy coding!

Greetings

El Bruno


#WinML – #CustomVision, object recognition using Onnx in Windows10, calculating FPS

Hi!

Today’s post is a quick one. It is a kind of memory aid, because every time I need to show information about frames processed per second, I have to dig through my previous applications.

In this case I’ll add this information to image recognition with an Onnx model exported from Custom Vision. In the UWP app that I created in previous posts, I’ll show a label with the date and time, and the FPS information.

01 custom vision uwp frame analysis using onnx fps

The code is very simple; pay special attention to line 10, which builds the message with the FPS value

private Stopwatch _stopwatch;

private async Task LoadAndEvaluateModelAsync(VideoFrame videoFrame)
{
    _stopwatch = Stopwatch.StartNew();
    _predictions = await _objectDetection.PredictImageAsync(videoFrame);
    _stopwatch.Stop();
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        var message = $"{DateTime.Now.ToLongTimeString()} - {1000f / _stopwatch.ElapsedMilliseconds,4:f1} fps";
        TextBlockResults.Text = message;
        DrawFrames();
    });
}

Brain Backup done!

The full app can be seen in https://github.com/elbruno/events/tree/master/2019%2001%2010%20CodeMash%20CustomVision/CSharp/CustomVisionMarvelConsole01

Happy Coding!

Greetings @ Burlington

El Bruno

#WinML – #CustomVision, object recognition using Onnx in Windows10, calculate FPS

Hi !

Quick post today. It’s mostly a reminder to myself of how to perform a Frames Per Second calculation when we are analyzing images using an ONNX model. In the final UWP app, I added a top right label displaying the current date and time, and the processed FPS.

01 custom vision uwp frame analysis using onnx fps

And the code behind all this is very simple, especially line 10, which builds the message with the FPS value

private Stopwatch _stopwatch;

private async Task LoadAndEvaluateModelAsync(VideoFrame videoFrame)
{
    _stopwatch = Stopwatch.StartNew();
    _predictions = await _objectDetection.PredictImageAsync(videoFrame);
    _stopwatch.Stop();
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        var message = $"{DateTime.Now.ToLongTimeString()} - {1000f / _stopwatch.ElapsedMilliseconds,4:f1} fps";
        TextBlockResults.Text = message;
        DrawFrames();
    });
}

So, I’ll search for my sample next time I need to display this.

The full app can be seen in https://github.com/elbruno/events/tree/master/2019%2001%2010%20CodeMash%20CustomVision/CSharp/CustomVisionMarvelConsole01

Happy Coding!

Greetings @ Burlington

El Bruno
