お題
トイドローン Tello のプログラミング記事です。
今回はGUIを用いたドローンの操作と状態表示をやります。
GUIフレームワーク
GUIフレームワークとしては簡単に導入できる PySimpleGUI を使用します。
pip install PySimpleGUI
これでサクッとインストールできるのですが、macOSの最近のバージョンを使ってるとハマる可能性があります。 その際は このあたり を参考に。
実装の進め方
この先、画面がまともに動くようになるまでは試行錯誤が必要そうなので、以下の戦略で進むことにします。
- ドローンなしで一通り動作するところまで試行錯誤しつつ作る
- これまで作ったプログラムと結合してドローンを操作する
何でこんな回りくどいことをするかというと、これやらないと1.の試行錯誤の途中でバッテリー切れしちゃうからです🔋
(1) 画面レイアウト
まずは画面レイアウトを作ります。
import PySimpleGUI as sg
# レイアウト定義
layout = [
[sg.Text("Command:"), sg.Input(), sg.Button("OK")],
[
sg.Text(key="sent", font=("monospace", 20)),
sg.Text(text="-->"),
sg.Text(key="recv", font=("monospace", 20)),
],
[sg.Text(key="state", font=("monospace", 20))],
[
sg.Button("Quit", font=("Arial", 32)),
sg.Button("Takeoff", font=("Arial", 32)),
sg.Button("Land", font=("Arial", 32)),
],
]
# ウインドウ表示
window = sg.Window("My Drone", layout)
while True:
# 入力内容の読み取り
event, values = window.read(timeout=1)
# x (閉じる) ボタンがクリックされたらループを抜ける
if event == sg.WINDOW_CLOSED:
break
# ウインドウクローズ
window.close()
これを実行すると以下のような画面が表示されます。
操作イメージ:
- Command : コマンド入力用テキストフィールド。 OK ボタンをクリックすると入力したコマンドを実行する。
- —> :
left 20 --> OK
というような形で送信したコマンドと実行結果を表示する。 - Quit : プログラムを終了する。
- Takeoff : 離陸する。
- Land : 着陸する。
(2) ボタン処理の追加
各ボタンがクリックされた時の処理を追加していきます (差分)。
while True:
msg = ""
event, values = window.read(timeout=1)
# Quitボタンでもループを抜ける
if event == sg.WINDOW_CLOSED or event == "Quit":
break
# OKボタンの場合はテキストフィールド(values[0])をmsgに格納
if event == "OK":
msg = values[0]
# Takeoff, Landも同様に msg に格納
if event == "Takeoff":
msg = "takeoff"
if event == "Land":
msg = "land"
if msg == "":
continue
# とりあえず msg --> ok を表示するようにしておく
window["sent"].update(msg)
window["recv"].update("ok")
テキストフィールドに up 20
と入力して OK ボタンをクリックすると以下のように表示されます。
(3) バッテリー残量表示
バッテリー残量表示機能も追加します (差分)。
# ドローンの状態等を格納するためのクラス
class Info:
def __init__(self):
self.__state = {}
# ドローンの全状態を設定する
def set_states(self, states):
self.__state = states
# ドローンの全状態を取得する
def get_states(self):
return self.__state
# name で指定された特定の状態だけを取得する
def get_state(self, name):
return self.__state.get(name, 0.0)
info = Info()
# ドローンの状態取得処理 (仮実装)
def receive_state():
while True:
# 1秒毎にランダムな値をバッテリー値として設定する
time.sleep(1)
try:
info.set_states({"bat": random.random() * 100})
except Exception:
print("\nExit . . .\n")
break
state_receive_thread = threading.Thread(target=receive_state)
state_receive_thread.start()
while True:
msg = ""
event, values = window.read(timeout=1)
# バッテリー残量表示する
window["state"].update(f'battery: {info.get_state("bat"):.1f}%')
(...)
最終的には receive_state
は前回作成したものに置き換えますが、今はドローンと接続しなくても動くように適当に実装しておきます。実際にこれを動かすと以下のように1秒毎にバッテリー情報の更新が行われます。
(4) 終了可能にする
現在の実装のままでは receive_state
の処理が終わらないため、プログラムが終了できなくなっています。これをきれいに終了できる形に修正します (差分)。
まず Info クラスに停止機能 (stop
メソッド) と停止判定機能 (is_active
メソッド) を追加します。
class Info:
(...)
# 稼働中かどうかを返す (stopが呼ばれたら false を返す)
def is_active(self):
return self.__is_active
# プログラムを停止する
def stop(self):
self.__is_active = False
状態受信関数 receive_state
の無限ループ指定箇所 ( while True:
) を以下のように while info.is_active():
に置き換えます。
def receive_state():
while info.is_active():
(...)
主処理のループを抜けたあとに info.stop()
の呼び出しを追加します。
while True:
msg = ""
(...)
info.stop()
window.close()
これでひとまず GUI 機能は完成です。
前回までのプログラムと結合
最後に前回までに作成したプログラムと結合 (差分)して実機ドローンを使って動作確認します。
プログラム全体としては以下のようになりました:
import socket
import threading
import time
import PySimpleGUI as sg
layout = [
[sg.Text("Command:"), sg.Input(), sg.Button("OK")],
[
sg.Text(key="sent", font=("monospace", 20)),
sg.Text(text="-->"),
sg.Text(key="recv", font=("monospace", 20)),
],
[sg.Text(key="state", font=("monospace", 20))],
[
sg.Button("Quit", font=("Arial", 32)),
sg.Button("Takeoff", font=("Arial", 32)),
sg.Button("Land", font=("Arial", 32)),
],
]
window = sg.Window("My Drone", layout)
class Info:
def __init__(self):
self.__state = {}
self.__is_active = True
def set_states(self, states):
self.__state = states
def get_states(self):
return self.__state
def get_state(self, name):
return self.__state.get(name, 0.0)
def is_active(self):
return self.__is_active
def stop(self):
self.__is_active = False
info = Info()
def __get_drone_state(data):
s = data.decode(errors="replace")
values = s.split(";")
state = {}
for v in values:
kv = v.split(":")
if len(kv) > 1:
state[kv[0]] = float(kv[1])
return state
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("", 8889))
sock.sendto("command".encode(), ("192.168.10.1", 8889))
sock.recvfrom(1024)
state_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
state_sock.bind(("", 8890))
def receive_state():
while info.is_active():
try:
data, _ = state_sock.recvfrom(1024)
info.set_states(__get_drone_state(data))
except Exception:
print("\nExit . . .\n")
break
state_receive_thread = threading.Thread(target=receive_state)
state_receive_thread.start()
while True:
msg = ""
event, values = window.read(timeout=1)
window["state"].update(f'battery: {info.get_state("bat"):.1f}%')
if event == sg.WINDOW_CLOSED or event == "Quit":
break
if event == "OK":
msg = values[0]
if event == "Takeoff":
msg = "takeoff"
if event == "Land":
msg = "land"
if msg == "":
continue
sock.sendto(msg.encode(), ("192.168.10.1", 8889))
window["sent"].update(msg)
start = time.time()
data, _ = sock.recvfrom(1024)
window["recv"].update(f"{data.decode()} {time.time() - start:.1f}")
info.stop()
window.close()
次回
GUIもできて「もう完成したんじゃない?」という気分になっていたんですが、ひとつ大事な機能を実装するのを忘れてました。ドローンが撮影したビデオの受信です。
ということで次回はビデオの受信をやってみたいと思います。
Tello 関連記事
- Python によるドローン操作プログラミング
- Low-Level Protocolの活用
- 複合現実ヘッドセットによるドローン操作