#Coding4Fun – How to control your #drone with 20 lines of code! (21/N)


Hi !

In this post series I already wrote about how to detect faces with a camera and OpenCV. A drone can also be moved on command, so let's write some lines to detect a face, and calculate the orientation and distance of the detected face from the center of the camera frame.

In order to do this, first let's draw a grid in the camera frame, and once a face is detected, show its distance and orientation from the center.

face detected on camera, with position calculated from the center

Let’s start with a Grid. The idea is to create a 3×3 grid in the camera frame, and use the center cell as reference for the detected objects. The code to create a 3×3 grid is this one:

def displayGrid(frame):
    # Add a 3x3 Grid
    cv2.line(frame, (int(camera_Width/2)-centerZone, 0)     , (int(camera_Width/2)-centerZone, camera_Heigth)    , lineColor, lineThickness)
    cv2.line(frame, (int(camera_Width/2)+centerZone, 0)     , (int(camera_Width/2)+centerZone, camera_Heigth)    , lineColor, lineThickness)
    cv2.line(frame, (0, int(camera_Heigth / 2) - centerZone), (camera_Width, int(camera_Heigth / 2) - centerZone), lineColor, lineThickness)
    cv2.line(frame, (0, int(camera_Heigth / 2) + centerZone), (camera_Width, int(camera_Heigth / 2) + centerZone), lineColor, lineThickness)

# Camera Settings
camera_Width  = 1024 # 1280 # 640
camera_Heigth = 780  # 960  # 480
centerZone    = 100

# GridLine color green and thickness
lineColor = (0, 255, 0) 
lineThickness = 2

We use the line() function from OpenCV, and do some calculations to get the start and end points for the 4 lines of the grid: 2 vertical lines and 2 horizontal lines. For this demo, I'll implement this using my main webcam.

drone 3x3 grid in the camera frame

Based on my face detection samples and other samples on GitHub (see references), I'll now calculate the position of the detected face (given by x, y, h, w) relative to the center of the camera:

def calculatePositionForDetectedFace(frame, x, y, h , w):
    # calculate direction and relative position of the face
    cx = int(x + (w / 2))  # Center X of the Face
    cy = int(y + (h / 2))  # Center Y of the Face

    if (cx <int(camera_Width/2) - centerZone):
        cv2.putText  (frame, " LEFT " , (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1 , colorGreen, 2)
        dir = 1
    elif (cx > int(camera_Width / 2) + centerZone):
        cv2.putText(frame, " RIGHT ", (20, 50), cv2.FONT_HERSHEY_COMPLEX,1,colorGreen, 3)
        dir = 2
    elif (cy < int(camera_Heigth / 2) - centerZone):
        cv2.putText(frame, " UP ", (20, 50), cv2.FONT_HERSHEY_COMPLEX,1,colorGreen, 3)
        dir = 3
    elif (cy > int(camera_Heigth / 2) + centerZone):
        cv2.putText(frame, " DOWN ", (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1,colorGreen, 3)
        dir = 4
    else: dir=0

    # display detected face frame, line from center and direction to go
    cv2.line     (frame, (int(camera_Width/2),int(camera_Heigth/2)), (cx,cy), colorRed, messageThickness)
    cv2.rectangle(frame, (x, y), (x + w, y + h), colorBlue, messageThickness)
    cv2.putText  (frame, str(int(x)) + " " + str(int(y)), (x - 20, y - 45), cv2.FONT_HERSHEY_COMPLEX,0.7, colorRed, messageThickness)

The output is similar to this one:

And now, with the base code completed, it's time to add this logic to the drone samples!

Bonus: the complete code.

# Bruno Capuano 2020
# display the camera feed using OpenCV
# display a 3x3 Grid
# detect faces using openCV and haar cascades
# calculate the relative position for the face from the center of the camera
import os
import time
import cv2

def displayGrid(frame):
    # Add a 3x3 Grid
    cv2.line(frame, (int(camera_Width/2)-centerZone, 0)     , (int(camera_Width/2)-centerZone, camera_Heigth)    , lineColor, lineThickness)
    cv2.line(frame, (int(camera_Width/2)+centerZone, 0)     , (int(camera_Width/2)+centerZone, camera_Heigth)    , lineColor, lineThickness)
    cv2.line(frame, (0, int(camera_Heigth / 2) - centerZone), (camera_Width, int(camera_Heigth / 2) - centerZone), lineColor, lineThickness)
    cv2.line(frame, (0, int(camera_Heigth / 2) + centerZone), (camera_Width, int(camera_Heigth / 2) + centerZone), lineColor, lineThickness)

def calculatePositionForDetectedFace(frame, x, y, h, w):
    # calculate direction and relative position of the face
    cx = int(x + (w / 2))  # Center X of the Face
    cy = int(y + (h / 2))  # Center Y of the Face

    if (cx < int(camera_Width/2) - centerZone):
        cv2.putText(frame, " LEFT " , (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1, colorGreen, 2)
        dir = 1
    elif (cx > int(camera_Width / 2) + centerZone):
        cv2.putText(frame, " RIGHT ", (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1, colorGreen, 3)
        dir = 2
    elif (cy < int(camera_Heigth / 2) - centerZone):
        cv2.putText(frame, " UP ", (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1, colorGreen, 3)
        dir = 3
    elif (cy > int(camera_Heigth / 2) + centerZone):
        cv2.putText(frame, " DOWN ", (20, 50), cv2.FONT_HERSHEY_COMPLEX, 1, colorGreen, 3)
        dir = 4
    else:
        dir = 0

    # display detected face frame, line from center and direction to go
    cv2.line(frame, (int(camera_Width/2), int(camera_Heigth/2)), (cx, cy), colorRed, messageThickness)
    cv2.rectangle(frame, (x, y), (x + w, y + h), colorBlue, messageThickness)
    cv2.putText(frame, str(int(x)) + " " + str(int(y)), (x - 20, y - 45), cv2.FONT_HERSHEY_COMPLEX, 0.7, colorRed, messageThickness)

# Camera Settings
camera_Width  = 1024 # 1280 # 640
camera_Heigth = 780  # 960  # 480
centerZone    = 100

# GridLine color green and thickness
lineColor = (0, 255, 0)
lineThickness = 2

# message color and thickness
colorBlue = (255, 0, 0)
colorGreen = (0, 255, 0)
colorRed = (0, 0, 255) #red
messageThickness = 2

dsize = (camera_Width, camera_Heigth)
video_capture = cv2.VideoCapture(1)
time.sleep(2.0)

# enable face detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

i = 0
while True:
    i = i + 1
    ret, frameOrig = video_capture.read()
    frame = cv2.resize(frameOrig, dsize)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    displayGrid(frame)

    # detect faces
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    for (x, y, w, h) in faces:
        # display face in grid
        calculatePositionForDetectedFace(frame, x, y, h, w)

    cv2.imshow('@ElBruno - Follow Faces', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video_capture.release()
cv2.destroyAllWindows()

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (20/N)


Hi !

We already have the drone camera feed ready to process, so let's do some Image Segmentation today. As usual, let's start with the formal definition of Image Segmentation:

In digital image processing and computer vision, image segmentation is the process of partitioning a digital image into multiple segments (sets of pixels, also known as image objects). The goal of segmentation is to simplify and/or change the representation of an image into something that is more meaningful and easier to analyze.[1][2] Image segmentation is typically used to locate objects and boundaries (lines, curves, etc.) in images. More precisely, image segmentation is the process of assigning a label to every pixel in an image such that pixels with the same label share certain characteristics.

The result of image segmentation is a set of segments that collectively cover the entire image, or a set of contours extracted from the image (see edge detection). Each of the pixels in a region are similar with respect to some characteristic or computed property, such as color, intensity, or texture. Adjacent regions are significantly different with respect to the same characteristic(s).[1] When applied to a stack of images, typical in medical imaging, the resulting contours after image segmentation can be used to create 3D reconstructions with the help of interpolation algorithms like marching cubes.[3]

Wikipedia, Image Segmentation

The technique is amazing, and once it's attached to the drone camera, we can get something like this:

I used a Python library to do most of the work: PixelLib. It was created by an amazing set of colleagues, so please check the references and take a look at the project description.

PixelLib: is a library built for an easy implementation of Image Segmentation in real life problems. PixelLib is a flexible library that can be integrated into software solutions that require the application of Image Segmentation.

PixelLib

Once I had all the pieces together, I submitted a Pull Request with a single change to allow the use of OpenCV and webcam camera frames, and I got a basic demo up and running.
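Stripped down to the essentials, the webcam version of the demo is just a handful of lines. This is a minimal sketch, assuming pixellib is installed and the mask_rcnn_coco.h5 weights file from the PixelLib docs sits in the working directory:

import cv2
from pixellib.instance import instance_segmentation

# load the pre-trained Mask R-CNN model once
instance_seg = instance_segmentation()
instance_seg.load_model("mask_rcnn_coco.h5")

cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    # segment the frame and draw the bounding boxes on it
    segmask, frame = instance_seg.segmentFrame(frame, show_bboxes=True)
    cv2.imshow('PixelLib Instance Segmentation', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()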

Let’s review the code

  • Line 147. That's it: a single line that performs the instance segmentation and also displays the bounding boxes.

Sample Code

# Bruno Capuano
# enable drone video camera
# display video camera using OpenCV
# display FPS
# add a bottom image overlay, using a background image
# key D enable / disable instance segmentation detection
# save a local video with the camera recorded
import pixellib
from pixellib.instance import instance_segmentation
import socket
import time
import threading
import os
import cv2

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open video writer to save video
vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("cam_video.mp4", vid_cod, 20.0, (640, 480))
dsize = (640, 480)

# load bottom img
background = cv2.imread('Bottom03.png')
background = cv2.resize(background, dsize)

# load model
instance_seg = instance_segmentation()
instance_seg.load_model("mask_rcnn_coco.h5")

# main app
detectionEnabled = False
i = 0
while True:
    i = i + 1
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - i: {i}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (640, 480))

        if (detectionEnabled):
            # save image to disk and open it
            imgNumber = str(i).zfill(5)
            frameImageFileName = str(f'tmp\image{imgNumber}.png')
            outputImageName = str(f'tmp\image{imgNumber}Out.png')
            if os.path.exists(frameImageFileName):
                os.remove(frameImageFileName)
            cv2.imwrite(frameImageFileName, img)
            segmask, img = instance_seg.segmentFrame(img, show_bboxes=True)
            cv2.imwrite(outputImageName, img)

        # overlay background
        img = cv2.addWeighted(background, 1, img, 1, 0)

        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)

        cv2.imshow('@elbruno - DJI Tello Camera', img)
        vid_output.write(img)
    except Exception as e:
        print(f'exc: {e}')
        pass

    # key controller
    key = cv2.waitKey(1) & 0xFF
    if key == ord("d"):
        if (detectionEnabled == True):
            detectionEnabled = False
        else:
            detectionEnabled = True
    if key == ord("q"):
        break

# release resources
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()

I'll show a couple of live demos of this in my next Global AI Community Drone AI sessions. Check my next events section!

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (19/N)


Hi !

Today I faced another challenge: I needed to overlay an image on top of another. Something like this:

camera overlay images with python

Lucky for me, and as usual, OpenCV allows us to do this with a few lines of code. Let's take a look.

  • Line 8. Define a custom size for all the images: background image and camera feed frame.
  • Lines 10-12. Load and resize background image.
  • Line 21. Overlay the camera frame and the background image.
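A note on that overlay call: cv2.addWeighted computes a per-pixel blend, dst = src1*alpha + src2*beta + gamma, saturating at 255. With alpha = beta = 1, as in this sample, the two images are simply added, so the black pixels of the background image leave the camera frame untouched:

# dst = background*1 + frame*1 + 0, saturated to the 0-255 range
img = cv2.addWeighted(background, 1, frame, 1, 0)

# for a translucent overlay instead, use weights below 1, e.g.:
# img = cv2.addWeighted(background, 0.3, frame, 0.7, 0)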

Sample Code

# Bruno Capuano 2020
# display the camera feed using OpenCV
# add a bottom image overlay, using a background image
import time
import cv2

dsize = (640, 480)

# load bottom img
background = cv2.imread('Bottom03.png')
background = cv2.resize(background, dsize)

video_capture = cv2.VideoCapture(0)
time.sleep(2.0)

while True:
    ret, frameOrig = video_capture.read()
    frame = cv2.resize(frameOrig, dsize)
    img = cv2.addWeighted(background, 1, frame, 1, 0)
    cv2.imshow('Video', img)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video_capture.release()
cv2.destroyAllWindows()

And from here, I’ll update some posts with the drone camera.

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (18/N)


Hi !

Today I'll step back a couple of posts, and add 2 simple lines that allow me to save a video file from the drone camera. This was a request, and it makes a lot of sense to record a file from the drone camera.

The video will later contain detected objects and more, so let's go with the code. All the magic happens here:

  • Lines 97-103. Open the drone camera stream, and also open a video output stream to save the video file.
  • Lines 123-124. Display the camera feed and add the camera frame into the output video file.
  • Lines 136-139. Dispose objects, and close the video output file.
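Isolated from the full listing, the save logic boils down to three calls. A minimal sketch (note that the 20.0 FPS value and the 640x480 frame size must match the frames you actually write):

# create the writer once: codec, target FPS and frame size
vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("videos/cam_video.mp4", vid_cod, 20.0, (640, 480))

# inside the main loop, append each processed frame
vid_output.write(img)

# on exit, close the video file
vid_output.release()

And the full listing: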
# Bruno Capuano
# enable drone video camera
# display video camera using OpenCV
# display FPS
import socket
import time
import threading
import cv2

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

vid_cod = cv2.VideoWriter_fourcc(*'XVID')
vid_output = cv2.VideoWriter("videos/cam_video.mp4", vid_cod, 20.0, (640, 480))

# open
i = 0
while True:
    i = i + 1
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - i: {i}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (640, 480))

        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)

        cv2.imshow('@elbruno - DJI Tello Camera', img)
        vid_output.write(img)
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

# close the already opened camera, and the video file
cap.release()
vid_output.release()
cv2.destroyAllWindows()

Happy coding!

Greetings

El Bruno

#Coding4Fun – How to control your #drone with 20 lines of code! (17/N)


Hi !

Once we have a trained Custom Vision model instance, we can use it to recognize objects from the drone camera feed. Read my previous posts for descriptions of these steps.

Another interesting scenario is to save local files for every detected object. In the following code, I'll save 2 different files for every detected object:

  • A camera frame image, with a frame around the detected object
  • A plain text file with the JSON information

In the sample code below, the save process is in lines 122-129. And, not in a fancy way, the files share the same base name to correlate them.

drone recognized files
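The pairing itself takes only a few lines inside displayPredictions (a sketch extracted from the listing below): both file names derive from the frame image name, so the annotated image and the JSON end up side by side.

# same base name for both files: tmp\imageNNNNN.png -> det\imageNNNNN.png / .json
detImageFileName = frameImageFileName.replace('tmp', 'det')
cv2.imwrite(detImageFileName, img)                         # frame with bounding boxes
detJsonFileName = detImageFileName.replace('png', 'json')
save_text = open(detJsonFileName, 'w')
save_text.write(jsonStr)                                   # raw prediction JSON
save_text.close()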

So let’s go to the full code:

# Bruno Capuano
# open camera with openCV
# analyze camera frame with local docker custom vision project
# draw bounding boxes for each recognized object
import socket
import time
import threading
import cv2
import urllib
import json
import requests
import os
from flask import Flask, request, jsonify

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery, pitch
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
                pitch = int(list[1])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Local calls
# -----------------------------------------------
probabilityThreshold = 75

def displayPredictions(jsonPrediction, frame, frameImageFileName):
    global camera_Width, camera_Heigth
    jsonObj = json.loads(jsonPrediction)
    preds = jsonObj['predictions']
    sorted_preds = sorted(preds, key=lambda x: x['probability'], reverse=True)
    strSortedPreds = ""
    resultFound = False
    if (sorted_preds):
        # open img to save results
        img = cv2.imread(frameImageFileName)
        detected = False
        for pred in sorted_preds:
            # tag name and prob * 100
            tagName = str(pred['tagName'])
            probability = pred['probability'] * 100
            # apply threshold
            if (probability >= probabilityThreshold):
                detected = True
                bb = pred['boundingBox']
                resize_factor = 100
                height = int(bb['height'] * resize_factor)
                left = int(bb['left'] * resize_factor)
                top = int(bb['top'] * resize_factor)
                width = int(bb['width'] * resize_factor)
                print(f'height = {height} - left {left} - top {top} - width {width}')
                # adjust to size
                camera_Width,
                height = int(height * camera_Heigth / 100)
                left = int(left * camera_Width / 100)
                top = int(top * camera_Heigth / 100)
                width = int(width * camera_Width / 100)
                print(f'Adjusted height = {height} - left {left} - top {top} - width {width}')
                # draw bounding boxes
                start_point = (top, left)
                end_point = (top + height, left + width)
                print(f'MVP - {probability}')
                print(f'start point: {start_point} - end point: {end_point}')
                color = (255, 0, 0)
                thickness = 2
                cv2.rectangle(img, start_point, end_point, color, thickness)
                print(jsonPrediction)
                # save the detected image
                cv2.rectangle(img, start_point, end_point, color, thickness)
        if (detected == True):
            detImageFileName = frameImageFileName.replace('tmp', 'det')
            cv2.imwrite(detImageFileName, img)
            detJsonFileName = detImageFileName.replace('png', 'json')
            save_text = open(detJsonFileName, 'w')
            save_text.write(jsonStr)
            save_text.close()
    return strSortedPreds

# instantiate flask app and push a context
app = Flask(__name__)

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0
pitch = 0

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
camera_Width = 640
camera_Heigth = 480

# open
i = 0
while True:
    i = i + 1
    imgNumber = str(i).zfill(5)
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - pitch: {pitch} - i: {imgNumber}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (camera_Width, camera_Heigth))

        # save image to disk and open it
        frameImageFileName = str(f'tmp\image{imgNumber}.png')
        cv2.imwrite(frameImageFileName, img)
        with open(frameImageFileName, 'rb') as f:
            img_data = f.read()

        # analyze file in local container
        api_url = "http://127.0.0.1:8070/image"
        r = requests.post(api_url, data=img_data)
        with app.app_context():
            jsonResults = jsonify(r.json())
            jsonStr = jsonResults.get_data(as_text=True)
        displayPredictions(jsonStr, frame, frameImageFileName)

        fpsInfo = ""
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', img)
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

And if you want to see this up and running, it’s much better to see this in a video (start at ):

The complete source code can be found here https://github.com/elbruno/events/tree/master/2020%2004%2018%20Global%20AI%20On%20Tour%20MTY%20Drone%20AI%20Mex

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (16/N)


Hi !

In my previous post, I shared an example where I analyzed the camera feed using an Image Recognition model created with Custom Vision. Today I'll expand the sample and show, in real time, the detected MVP logos with a frame in the drone camera feed.

Let’s take a look at the demo working in the following image.

drone camera image analysis using custom vision and drawing frames for detected objects

At the top of the image, we can see the app console log, with the information received for each analyzed frame. When an object is detected, we can see the tag, the probability and the bounding box coordinates.

A sample JSON return string starts like this one:

{
  "created": "2020-04-08T17:22:02.179359",
  "id": "",
  "iteration": "",
  "predictions": [
    {
      "boundingBox": {
        "height": 0.1979116,
        "left": 0.3235259,
        "top": 0.05847502,
        "width": 0.20438321
      },
      "probability": 0.89171505,
      "tagId": 0,
      "tagName": "MVP"
    },
    {
      "boundingBox": {
        "height": 0.2091526,
        "left": 0.65271178,
        "top": 0.0433814,
        "width": 0.17669522
      },
      "probability": 0.70330358,
      "tagId": 0,
      "tagName": "MVP"
    },

In order to position the frames in the correct location, I need to do some math using the current camera and image size and the returned bounding box values for height, left, top and width (lines 87-110):

resize_factor = 100

height = int(bb['height'] * resize_factor)
left = int(bb['left'] * resize_factor)
top = int(bb['top'] * resize_factor)
width = int(bb['width'] * resize_factor)

# adjust to size
camera_Width, 
height = int(height * camera_Heigth / 100)
left = int(left * camera_Width / 100)
top = int(top * camera_Heigth / 100)
width = int(width * camera_Width / 100)

# draw bounding boxes
start_point = (top, left)                 
end_point = (top + height, left + width) 
color = (255, 0, 0) 
thickness = 2                
cv2.rectangle(img, start_point, end_point, color, thickness)            

So let’s go to the full code:

# Bruno Capuano
# open camera with openCV
# analyze camera frame with local docker custom vision project
# draw bounding boxes for each recognized object
import socket
import time
import threading
import cv2
import urllib
import json
import requests
import os
from flask import Flask, request, jsonify

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery, pitch
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
                pitch = int(list[1])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Local calls
# -----------------------------------------------
probabilityThreshold = 75

def displayPredictions(jsonPrediction, frame):
    global camera_Width, camera_Heigth
    jsonObj = json.loads(jsonPrediction)
    preds = jsonObj['predictions']
    sorted_preds = sorted(preds, key=lambda x: x['probability'], reverse=True)
    strSortedPreds = ""
    resultFound = False
    if (sorted_preds):
        for pred in sorted_preds:
            # tag name and prob * 100
            tagName = str(pred['tagName'])
            probability = pred['probability'] * 100
            # apply threshold
            if (probability >= probabilityThreshold):
                bb = pred['boundingBox']
                resize_factor = 100
                height = int(bb['height'] * resize_factor)
                left = int(bb['left'] * resize_factor)
                top = int(bb['top'] * resize_factor)
                width = int(bb['width'] * resize_factor)
                #print(f'height = {height} - left {left} - top {top} - width {width}')
                # adjust to size
                camera_Width,
                height = int(height * camera_Heigth / 100)
                left = int(left * camera_Width / 100)
                top = int(top * camera_Heigth / 100)
                width = int(width * camera_Width / 100)
                #print(f'Adjusted height = {height} - left {left} - top {top} - width {width}')
                # draw bounding boxes
                start_point = (top, left)
                end_point = (top + height, left + width)
                color = (255, 0, 0)
                thickness = 2
                cv2.rectangle(img, start_point, end_point, color, thickness)
                print(f'MVP - {probability}')
                print(f'start point: {start_point} - end point: {end_point}')
                print(jsonPrediction)
    return strSortedPreds

# instantiate flask app and push a context
app = Flask(__name__)

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0
pitch = 0

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)
camera_Width = 640
camera_Heigth = 480

# open
i = 0
while True:
    i = i + 1
    imgNumber = str(i).zfill(5)
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - pitch: {pitch} - i: {imgNumber}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (camera_Width, camera_Heigth))

        # save image to disk and open it
        frameImageFileName = str(f'tmp\image{imgNumber}.png')
        cv2.imwrite(frameImageFileName, img)
        with open(frameImageFileName, 'rb') as f:
            img_data = f.read()

        # analyze file in local container
        api_url = "http://127.0.0.1:8070/image"
        r = requests.post(api_url, data=img_data)
        with app.app_context():
            jsonResults = jsonify(r.json())
            jsonStr = jsonResults.get_data(as_text=True)
        displayPredictions(jsonStr, frame)

        fpsInfo = ""
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(img, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', img)
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

And if you want to see this up and running, it’s much better to see this in a video (start at ):

The complete source code can be found here https://github.com/elbruno/events/tree/master/2020%2004%2018%20Global%20AI%20On%20Tour%20MTY%20Drone%20AI%20Mex

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (15/N)


Hi !

Let’s use Custom Vision to analyze the images from our drone camera. In this scenario, I created a custom model to recognize MVP awards from my MVP wall. I know, that’s bragging, but I like it.

Disclaimer: there is plenty of documentation and there are many tutorials about Custom Vision, so I won't go deep into the steps of creating a model. See references.

For this scenario, I'll assume that:

  • You have created a model in Custom Vision
  • You have published the Custom Vision model, and have an HTTP endpoint
  • Or the model is exported as a Docker image and running in a Docker container, which also gives us an HTTP endpoint

The code is similar to the one we used before: OpenCV to hook up the camera, commands to take off and land. Let me highlight a couple of important lines in this code:

  • There are a couple of new references, mostly used to process the JSON response from the Custom Vision model.
  • Lines 146-155. Get the frame from the drone camera and save it to a local file. Apply a specific format to the file name, for demo purposes.
  • Lines 157-163. Make an HTTP POST call to analyze the saved file. Convert the result to a JSON object (room for improvement here), and analyze the JSON response.
  • Lines 70-85. Analyze the JSON response from the Custom Vision model. Sort the results by probability and filter them using a threshold (50 in the code below). Return a string with the detected objects.
  • Lines 165-178. Calculate and display FPS and detected objects.
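Isolated from the full listing below, the prediction call is a plain HTTP POST with the raw image bytes. This sketch assumes the exported Custom Vision container is listening on port 8070, as in my setup:

import requests

# send the saved frame to the local Custom Vision container
api_url = "http://127.0.0.1:8070/image"
with open(frameImageFileName, 'rb') as f:
    img_data = f.read()
r = requests.post(api_url, data=img_data)
predictions = r.json()['predictions']   # each one has tagName, probability, boundingBox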

A sample JSON return string starts like this one:

{
  "created": "2020-04-08T17:22:02.179359",
  "id": "",
  "iteration": "",
  "predictions": [
    {
      "boundingBox": {
        "height": 0.1979116,
        "left": 0.3235259,
        "top": 0.05847502,
        "width": 0.20438321
      },
      "probability": 0.89171505,
      "tagId": 0,
      "tagName": "MVP"
    },
    {
      "boundingBox": {
        "height": 0.2091526,
        "left": 0.65271178,
        "top": 0.0433814,
        "width": 0.17669522
      },
      "probability": 0.70330358,
      "tagId": 0,
      "tagName": "MVP"
    },

So let’s go to the full code:

# Bruno Capuano
# open camera with openCV
# analyze camera frame with local docker custom vision project
# display recognized objects in output log
import socket
import time
import threading
import cv2
import urllib
import json
import requests
import os
from flask import Flask, request, jsonify

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Local calls
# -----------------------------------------------
probabilityThreshold = 50

def getPredictionsSorted(jsonPrediction):
    jsonObj = json.loads(jsonPrediction)
    preds = jsonObj['predictions']
    sorted_preds = sorted(preds, key=lambda x: x['probability'], reverse=True)
    strSortedPreds = ""
    if (sorted_preds):
        for pred in sorted_preds:
            # tag name and prob * 100
            tagName = str(pred['tagName'])
            probability = pred['probability'] * 100
            # apply threshold
            if (probability >= probabilityThreshold):
                strSortedPreds = strSortedPreds + tagName + ": " + str(probability) + "\n"
    return strSortedPreds

# instantiate flask app and push a context
app = Flask(__name__)

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open
i = 0
while True:
    i = i + 1
    start_time = time.time()
    sendReadCommand('battery?')
    print(f'battery: {battery} % - i: {i}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (640, 480))

        # save image to disk and open it
        imgNumber = str(i).zfill(5)
        frameImageFileName = str(f'image{imgNumber}.png')
        if os.path.exists(frameImageFileName):
            os.remove(frameImageFileName)
        cv2.imwrite(frameImageFileName, img)
        with open(frameImageFileName, 'rb') as f:
            img_data = f.read()

        # analyze file in local container
        api_url = "http://127.0.0.1:8070/image"
        r = requests.post(api_url, data=img_data)
        with app.app_context():
            jsonResults = jsonify(r.json())
            jsonStr = jsonResults.get_data(as_text=True)
        predSorted = getPredictionsSorted(jsonStr)

        fpsInfo = ""
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time)) + "\n-------------------\n"  # FPS = 1 / time to process loop

        # display FPS and Predictions, split text into lines, thanks OpenCV putText()
        frameInfo = fpsInfo + predSorted
        print(frameInfo)
        j = 0
        for j, line in enumerate(frameInfo.split('\n')):
            print(f'{j} {line}')
            cv2.putText(img, line, (10, 10 * j), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', img)
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

And if you want to see this up and running, it’s much better to see this in a video (start at ):

The complete source code can be found here https://github.com/elbruno/events/tree/master/2020%2004%2018%20Global%20AI%20On%20Tour%20MTY%20Drone%20AI%20Mex

Happy coding!

Greetings

El Bruno

References

#Coding4Fun – How to control your #drone with 20 lines of code! (12/N)


Hi!

Today's code objective is very simple, based on a request I received from the internet:

The drone is flying very happy, but if the camera detects a face, the drone will flip out !

Let’s take a look at the program working:

This one is very similar to the previous one. I also realized that I may need a better camera to record the live action side by side with the drone footage, but I think you get the idea. The command to make the drone flip is "flip x", where "x" is the direction. For example:

"flip l" # flip left
"flip r" # flip right
"flip f" # flip forward
"flip b" # flip back

Here is the code:

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV and display FPS
# detect faces
# launch the drone with key T, and land with key L
# if the drone is flying, and a face is detected, the drone will flip left
import cv2
import socket
import time
import threading
import winsound

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# enable face detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open
drone_flying = False
i = 0
while True:
    i = i + 1
    start_time = time.time()
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (480, 360))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # detect faces
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            cv2.putText(frame, 'face', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1)
        if(len(faces) > 0 and drone_flying == True):
            msg = "flip l"
            sendCommand(msg)

        # display fps
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'flying: {drone_flying} - battery: {battery} % - i: {i} {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('t'):
        drone_flying = True
        detection_started = True
        msg = "takeoff"
        sendCommand(msg)
    if cv2.waitKey(1) & 0xFF == ord('l'):
        drone_flying = False
        msg = "land"
        sendCommand(msg)
        time.sleep(5)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

msg = "land"
sendCommand(msg)  # land
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

As I promised last time, in the next posts I'll analyze in more detail how this works, plus a couple of improvements that I can implement.

Happy coding!

Greetings

El Bruno

References

My Posts

#Coding4Fun – How to control your #drone with 20 lines of code! (11/N)


Hi!

Today's code objective is very simple:

The drone is flying very happy, but if the camera detects a banana, the drone must land !

Let’s take a look at the program working:

drone flying, landing when a banana is detected

And a couple of notes regarding the app:

  • I still use Haar Cascades for object detection. I found an article with an XML file to detect bananas, so I'm working with this one (see references).
  • Using Haar Cascades is not the best technique for object detection. During the testing process, I found a lot of false positives, mostly small portions of the frame that were detected as bananas. One solution was to limit the size of the detected objects using OpenCV (see the sketch after this list; I'll write more about this in the future).
  • As you can see in the animation, when the drone is a few meters away, the video feed becomes messy. And because the object detection is performed locally, it takes some time to detect the banana.
  • I also implemented some code to take off the drone when the user presses the 'T' key, and land the drone when the user presses the 'L' key.
  • The code is starting to become a mess, so a refactoring is needed.
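Here is a hedged sketch of that size filter; the size limits are illustrative assumptions, not tuned values. detectMultiScale accepts minSize (and optionally maxSize), and you can also discard detections by area afterwards:

# reject too-small matches, the main source of false positives
bananas = banana_cascade.detectMultiScale(gray,
                                          scaleFactor=1.3,
                                          minNeighbors=5,
                                          minSize=(150, 50),    # ignore tiny detections
                                          maxSize=(400, 200))   # optionally cap the size too

# optional extra guard: keep only detections above a minimum area
bananas = [(x, y, w, h) for (x, y, w, h) in bananas if w * h >= 150 * 50]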

Here is the code

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV
# display FPS
# detect faces and bananas
# launch the drone with key T, and land with key L
# if the drone is flying, and a banana is detected, land the drone
import cv2
import socket
import time
import threading

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# enable face and banana detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
banana_cascade = cv2.CascadeClassifier('banana_classifier.xml')

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open
banana_detected = False
drone_flying = False
i = 0
while True:
    i = i + 1
    start_time = time.time()
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (480, 360))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # detect faces
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 0, 255), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            cv2.putText(frame, 'face', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1)

        # detect banana
        bananas = banana_cascade.detectMultiScale(gray,
                                                  scaleFactor=1.3,
                                                  minNeighbors=5,
                                                  minSize=(150, 50))
        for (x, y, w, h) in bananas:
            cv2.rectangle(frame, (x, y), ((x + w), (y + h)), (0, 255, 0), 2)
            font = cv2.FONT_HERSHEY_COMPLEX_SMALL
            cv2.putText(frame, 'bananas', (h + 6, w - 6), font, 0.7, (255, 255, 255), 1)
        if(len(bananas) > 0):
            banana_detected = True
        else:
            banana_detected = False

        # fly logic
        if (drone_flying == True and banana_detected == True):
            drone_flying = False
            msg = "land"
            sendCommand(msg)
            time.sleep(5)
            break

        # display fps
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'banana: {banana_detected} - flying: {drone_flying} - battery: {battery} % - i: {i} {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
        pass
        #raise e
    if cv2.waitKey(1) & 0xFF == ord('t'):
        drone_flying = True
        detection_started = True
        msg = "takeoff"
        sendCommand(msg)
    if cv2.waitKey(1) & 0xFF == ord('l'):
        drone_flying = False
        msg = "land"
        sendCommand(msg)
        time.sleep(5)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

msg = "land"
sendCommand(msg)  # land
response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

In the next posts, I'll analyze in more detail how this works, and a couple of improvements that I can implement.

Happy coding!

Greetings

El Bruno

References

My Posts

#Coding4Fun – How to control your #drone with 20 lines of code! (10/N)


Hi!

Back to some drone posts! I was kind of busy during the last weeks, and now I can get back to writing about the drone.

OK, in the last posts I described how to connect and work with the drone camera feed using OpenCV. Now with 2 extra lines of code we can also detect faces. Let’s take a look at the final sample.

drone camera and camera view performing face detection

In the previous image we can see 2 camera feeds: my computer webcam, where you can see how I hold the drone with its camera pointing at my face, and the drone camera feed, presented using OpenCV with a frame drawn over each detected face.

Let’s share some code insights:

  • As usual, I resize the camera feed to 320 x 240
  • The average processing speed is between 40 and 70 FPS
  • I use a haar cascade classifier to detect the faces in each frame

Note: I need to write about Haar Cascades as part of my face detection post series.
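Those 2 lines, isolated from the full listing below, are just the classifier load and the per-frame detection call:

# load the pre-trained Haar cascade once...
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
# ...then run it against each grayscale frame; it returns (x, y, w, h) boxes
faces = face_cascade.detectMultiScale(gray, 1.3, 5)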

# Bruno Capuano
# detect faces using haar cascades from https://github.com/opencv/opencv/tree/master/data/haarcascades
# enable drone video camera
# display video camera using OpenCV
# display FPS
# detect faces
import cv2
import socket
import time
import threading

def receiveData():
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except:
            break

def readStates():
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            if response_state != 'ok':
                response_state = response_state.decode('ASCII')
                list = response_state.replace(';', ':').split(':')
                battery = int(list[21])
        except:
            break

def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response

def sendReadCommand(command):
    response = sendCommand(command)
    try:
        response = str(response)
    except:
        pass
    return response

def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        if response == 'OK' or response == 'ok':
            return True
    return False

# -----------------------------------------------
# Main program
# -----------------------------------------------
# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# enable face detection
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# open UDP
print(f'opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# open
i = 0
while True:
    i = i + 1
    start_time = time.time()
    try:
        _, frameOrig = cap.read()
        frame = cv2.resize(frameOrig, (320, 240))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (top, right, bottom, left) in faces:
            cv2.rectangle(frame, (top, right), (top + bottom, right + left), (0, 0, 255), 2)

        # display fps
        if (time.time() - start_time) > 0:
            fpsInfo = "FPS: " + str(1.0 / (time.time() - start_time))  # FPS = 1 / time to process loop
            font = cv2.FONT_HERSHEY_DUPLEX
            cv2.putText(frame, fpsInfo, (10, 20), font, 0.4, (255, 255, 255), 1)
        cv2.imshow('@elbruno - DJI Tello Camera', frame)
        sendReadCommand('battery?')
        print(f'battery: {battery} % - i: {i} {fpsInfo}')
    except Exception as e:
        print(f'exc: {e}')
        pass
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

response = sendControlCommand("streamoff")
print(f'streamon response: {response}')

In my next posts, I’ll add some drone specific behaviors for each face detected.

Happy coding!

Greetings

El Bruno

References

My Posts