以前、Raspberry PiでGoogle Assitant SDKを使ってGoogle Homeを作りました。今回はGoogleアシスタントのコマンド(googlesamples-assitant-hotword)をPythonアプリから呼び出し、その出力を処理するプログラムを作ります。
PythonアプリからGoogleアシスタントのコマンドを呼び出す
最初はsubprocessのPopenを使うつもりでしたが、非同期にコマンドの出力を取得する必要があったので、asyncioを使いました。
以下のように呼び出すようにしました。AAAとBBBは自分でGoogleアシスタントを設定したときの値になります。
(2020/01/05追記)
OSをRaspbian Busterにして Python 3.7.3 になってからyield を使う方法がうまく動かなくなったため本記事末尾のコードに書き換えました。
p = yield from asyncio.create_subprocess_exec(\ '/home/pi/env/bin/googlesamples-assistant-hotword',\ '--project_id','AAA','--device_model_id',’BBB' ,\ stdout=asyncio.subprocess.PIPE)
PythonアプリでGoogleアシスタントコマンドの出力を取得する
ここでかなりはまりました。コマンドの出力を得ようと「readline()」を呼んだところでアプリが止まってしまうのです。
きっとコマンドの出力がバッファリングされていて、Pythonのreadline()ではすぐに出力が取れないんだろうなとは思いましたが、解決策が思いつきません。
仕方がないので、アプリをC言語で作ったりして色々やりましたが、やはりコマンド自身のバッファリングを無効にしないといけないようです。
そこで、googlesamples-assitant-hotwordというファイルを覗いてみたところ、googlesamples-assitant-hotword自体がPythonのアプリであることが分かりました。
それなら、Pythonでバッファリングを制御することができるかもと思い、ネットを検索したところ、pythonコマンドの「-u」というオプションを見つけました。
早速、googlesamples-assitant-hotwordの1行目(Shebang(シェバンというらしいです))に「-u」を追加したところ、バッファリングが無効化され、コマンドの出力が取れるようになりました。
作ったPythonアプリを実行!
以下のようにコマンドの出力を処理したところ、
while True: str = yield from p.stdout.readline() strutf = str.decode('utf-8') if "text" in strutf: strlist = strutf.split("\"") if strlist[4][0] == ',': print("answer:"+ strlist[3]) else: print("inquiry:" + strlist[3])
以下のような出力を得ることができました。
inquiry:アインシュタインの誕生日はいつ
answer:アルベルト・アインシュタインは、1879年3月14日生まれです。
(2020/01/05追記)
OSをRaspbian Busterにして Python 3.7.3 になってからyield を使う方法がうまく動かなくなったため本記事末尾のコードに書き換えました。
これでGoogleアシスタントの音声認識結果や応答結果を利用して独自の処理を行うことができます。
次は Actions on Googleと組み合わせて、「スマホ + Raspberry Pi」で出来ていたこと(リモコン操作、音楽再生、YouTube再生)を実現したいと思います。
(2020/01/05追記)
Python 3.7.3だと yield を使ったコードがうまく動作しないため、下記のコードに書き換えました。 また、async を使う必要はないようだったので subprocess.Popen を使いました。
googlesamples-assistant-hotwordを起動し、コマンドの出力を処理するコード
args = ['/home/pi/env/bin/googlesamples-assistant-hotword','--project_id','AAA','--device_model_id','BBB'] proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: out = proc.stdout.readline() strbuf = out.decode('utf-8') if "text" in strbuf: strlist = strbuf.split("\"") if strlist[4][0] == ',': print("answer:"+ strlist[3]) else: print("inquiry:" + strlist[3]) if not out and proc.poll() is not None: break return