#Coding4Fun – How to control your #drone with 20 lines of code! (8/N)


Hi!

Now that I have started to understand how UDP works, I also did some research to find the best options for accessing a UDP video feed. Lucky for me, there are plenty of resources on doing this with my old friend OpenCV.

Most of the OpenCV documentation is written for C++. However, in the end it all comes down to these basic lines of code:

import cv2

# open the UDP video feed
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)

# read a frame from the feed
ret, frame = cap.read()
img = cv2.resize(frame, (320, 240))

# display the frame in an OpenCV video window
cv2.imshow('Video', img)
# give OpenCV a moment to draw the window
cv2.waitKey(1)


Note: In the references section below I shared some of my posts about my experience installing OpenCV on Windows 10.
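
One detail worth keeping in mind with the lines above: `cap.read()` returns a `(ret, frame)` pair, and `ret` is `False` when no frame could be read, which can happen while the stream is still starting. Below is a minimal sketch of a guard for that case. The helper name `read_valid_frame`, the `max_attempts` parameter and the `FakeCapture` stand-in are hypothetical, not part of OpenCV; the helper accepts any object with a `read()` method, so it can be tried without a drone.

```python
def read_valid_frame(cap, max_attempts=10):
    """Return the first successfully read frame, or None if every attempt fails.

    `cap` is anything with a read() method that returns (ret, frame),
    such as cv2.VideoCapture. Hypothetical helper, not an OpenCV API.
    """
    for _ in range(max_attempts):
        ret, frame = cap.read()
        if ret and frame is not None:
            return frame
    return None


class FakeCapture:
    """Stand-in for cv2.VideoCapture that fails a few times before succeeding."""

    def __init__(self, failures):
        self.failures = failures

    def read(self):
        if self.failures > 0:
            self.failures -= 1
            return False, None
        return True, "frame"


if __name__ == "__main__":
    print(read_valid_frame(FakeCapture(failures=3)))   # frame
    print(read_valid_frame(FakeCapture(failures=50)))  # None
```

With a real feed you would pass the `cv2.VideoCapture` object instead, and only call `cv2.resize` when the result is not `None`.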

Let’s go back to our sample Python app. Starting from the previous sample that displays the battery level, I changed the code so it keeps running all the time and displays the video feed in a 320×240 window.

The following video shows how fast this works:

And of course, the complete code, with these notes:

  • Lines 96-100. Open the video feed and wait 2 seconds
  • Lines 104-118. Main app
    • Get and display the battery level
    • Get a UDP video frame
    • Resize the frame to 320×240
    • Display the frame
    • When the Q key is pressed, exit the app
  • Line 121. Close the video stream
# Bruno Capuano
# enable drone video camera
# display video camera using OpenCV

import socket
import time
import threading
import cv2


def receiveData():
    # background thread: store the latest reply from the command port
    global response
    while True:
        try:
            response, _ = clientSocket.recvfrom(1024)
        except Exception:
            break


def readStates():
    # background thread: parse state packets and keep the battery level
    global battery
    while True:
        try:
            response_state, _ = stateSocket.recvfrom(256)
            # decode first: recvfrom() returns bytes, not str
            response_state = response_state.decode('ASCII')
            if response_state != 'ok':
                fields = response_state.replace(';', ':').split(':')
                battery = int(fields[21])
        except Exception:
            break


def sendCommand(command):
    global response
    timestamp = int(time.time() * 1000)
    clientSocket.sendto(command.encode('utf-8'), address)
    # wait up to 5 seconds for receiveData() to store a reply
    while response is None:
        if (time.time() * 1000) - timestamp > 5 * 1000:
            return False
    return response


def sendReadCommand(command):
    response = sendCommand(command)
    try:
        # replies arrive as bytes; sendCommand() returns False on timeout
        response = response.decode('utf-8').strip()
    except (AttributeError, UnicodeDecodeError):
        pass
    return response


def sendControlCommand(command):
    response = None
    for i in range(0, 5):
        response = sendCommand(command)
        # the reply is bytes, so compare against bytes literals
        if response in (b'OK', b'ok'):
            return True
    return False


# -----------------------------------------------
# Main program
# -----------------------------------------------

# connection info
UDP_IP = '192.168.10.1'
UDP_PORT = 8889
last_received_command = time.time()
STATE_UDP_PORT = 8890
address = (UDP_IP, UDP_PORT)
response = None
response_state = None

clientSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
clientSocket.bind(('', UDP_PORT))
stateSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stateSocket.bind(('', STATE_UDP_PORT))

# start threads
recThread = threading.Thread(target=receiveData)
recThread.daemon = True
recThread.start()
stateThread = threading.Thread(target=readStates)
stateThread.daemon = True
stateThread.start()

# connect to drone
response = sendControlCommand("command")
print(f'command response: {response}')
response = sendControlCommand("streamon")
print(f'streamon response: {response}')

# drone information
battery = 0

# open UDP
print('opening UDP video feed, wait 2 seconds ')
videoUDP = 'udp://192.168.10.1:11111'
cap = cv2.VideoCapture(videoUDP)
time.sleep(2)

# main loop
i = 0
while True:
    i = i + 1
    sendReadCommand('battery?')
    print(f'battery: {battery} % - i: {i}')
    try:
        ret, frame = cap.read()
        img = cv2.resize(frame, (320, 240))
        cv2.imshow('Video', img)
    except Exception as e:
        print(f'exc: {e}')
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# close the video stream and the OpenCV window
response = sendControlCommand("streamoff")
print(f'streamoff response: {response}')
cap.release()
cv2.destroyAllWindows()
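
The `readStates` function above picks the battery value by position (index 21 after splitting). As an alternative sketch, the state string can be parsed into a dictionary and the battery looked up by name. This assumes the state packet is a `key:value;` list that contains a `bat` field, which is what the index-based code implies; the helper name `parse_state` and the sample packet are hypothetical and only for illustration.

```python
def parse_state(raw):
    """Parse a 'key:value;key:value;...' state packet into a dict of strings."""
    text = raw.decode("ascii").strip()
    state = {}
    for pair in text.split(";"):
        if ":" not in pair:
            continue  # skip empty or malformed pieces
        key, value = pair.split(":", 1)
        state[key] = value
    return state


# Sample packet made up for illustration (field names are an assumption).
sample = b"pitch:0;roll:0;yaw:0;h:0;bat:87;baro:120.50;\r\n"
state = parse_state(sample)
battery = int(state.get("bat", 0))
print(battery)  # 87
```

Looking the value up by name keeps working if fields are added or reordered, at the cost of a few more lines than the index lookup.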

Happy coding!

Greetings

El Bruno

More posts on my blog ElBruno.com.

More info at https://beacons.ai/elbruno


References

My Posts
