3Dプリンタ スマートロック「3」編(2) APIサーバ

アプリ

ハードがそれなりにできそうなので、制御するアプリの作成を行う。APIサーバとwebクライアントを作成したいと思うが、近頃はコーディングを行っていないので、いろいろとプログラミングから置いていかれている。

APIサーバ

ステッピングモーターはpythonで動かしたので、pythonでAPIサーバを立ててみたいと思う。今までやったことはないので初挑戦だ。

ネットをあさってみると、FastAPIがよさそうということで、これを使って立ててみようと思う。
ということで、「pip install fastapi」としてみると、エラーが。
error: externally-managed-environment
調べてみると、「仮想環境を使わずにモジュールは入れるな」ということらしく、python開発時には仮想環境で行うのが習わしっぽい。いろいろ難しい。

ということで、以下で環境を作成。

# 作業フォルダ作成
mkdir faseAPI
cd fastAPI
# 仮想環境構築
python3  -m venv .venv --system-site-packages
# 仮想環境に入る
source .venv/bin/activate
# プロンプトが少し変化する
# モジュールインストール
pip install fastapi uvicorn gpiozero
# インストール出来た

gpiozeroを使う場合は、仮想環境構築時に「–system-site-packages」を入れておかなきゃらしい。いろいろと作法が多く大変。

とりあえず、ソースを作成する。まずはfastAPIの使い方から。なので簡単なものを各種ブログを参考に準備した。webサーバを立ててルートアクセスすると文字を返すというもの。

import uvicorn
from fastapi import FastAPI
import subprocess

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

上記を保存したら「python app.py」で実行。そしてブラウザで「http://{ラズパイホスト名}.local:8000」でアクセスすると、応答が返ってきた。なるほど。

詳しくは不明だがfastAPIとuvicornというのがwebサーバを立ててくれており、アクセスフォルダに合わせて処理を記述すればよさそうだ。

施錠動作

APIにアクセスすることで、モーターを動かしたいと思う。pythonのapiサーバなのでこのままここに施錠動作処理を記述すればよいかと思ったのだが、うまくいかない。
「@app.get(“/lock”)」を追加して、モーター動作関数を記述したのだが、アクセスするとステッピングモーターの動きが変でほぼ回転しない。何だろ?

ラズパイZERO2で動かすのでスペック不足かとも思い、ラズパイ4で動かしてみたが、同様に動きは変なまま。fastAPIはいろいろと裏で処理が動いていて短い時間の細かい制御処理は動作できないのかも。

ということで、fastAPI内に施錠の動作処理を記述するのはあきらめて、subprocess.run関数を使って外部プログラムとして動かしてみることとした。試してみると、これなら問題なく施錠動作が可能。runで処理を起動するので同期処理となり、施錠処理が終わると処理が戻ってくる。これならOKだ。ということで、下記処理でうまく動作できた。

import uvicorn
from fastapi import FastAPI
import subprocess

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/lock")
async def lock():
    result = subprocess.run(('python', 'lock.py'))
    return {"message": "motor locked"}

@app.get("/unlock")
async def unlock():
    result = subprocess.run(('python', 'lock.py', 'rev'))
    return {"message": "motor unlocked"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

当然app.pyと同じフォルダに施錠動作のための「lock.py」を置いておく必要がある。

web配信

APIとしてアクセスがあれば処理を行う動作はできたが、それらを要求するweb画面が必要。fastAPIでwebが立っているからここからhtml配信ができないか調べてみると可能な様子。

「jinja2」というhtmlテンプレートエンジンを使うのが定番らしいので、使用してみる。

# Jinja2を入れる
pip install Jinja2
# templatesフォルダを作成
mkdir templates
cd templates
# index.htmlを作成
nano index.html

index.htmlはひとまず簡単なものを用意。参考にしたブログの通りに作ってみる。

"<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8" />
    <title></title>
  </head>
  <body>
    Hello {{ name }}!
  </body>
</html>"

「templates」フォルダに「index.html」ファイルを用意したら、app.pyを以下のようにした。
・Jinja2Templatesをimport
・「@app.get(“/”)」の処理をhtmlを返却するように修正
・合わせて「name」のパラメータも返却(参考としたサンプル通り)

import uvicorn
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
import subprocess

app = FastAPI()
templates = Jinja2Templates(directory="templates")

@app.get("/")
async def index(name: str, request: Request):
    return templates.TemplateResponse("index.html", {
        "name": name,
        "request": request
    })

@app.get("/lock")
async def lock():
    result = subprocess.run(('python', 'lock.py'))
    return {"message": "motor locked"}

@app.get("/unlock")
async def unlock():
    result = subprocess.run(('python', 'lock.py', 'rev'))
    return {"message": "motor unlocked"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

これで、「http://ホスト名:8000/?name=HTML」でアクセスしてやると、画面には「Hello HTML!」が表示される。
「?name=HTML」のパラメータを渡さないとエラーになるので注意が必要だ。

たまたま見た参考ページであったが、API呼び出し時にパラメータを設定すると、それが返却されるhtmlに反映されるサンプルだったのでこれは助かる。API呼び出し時のパラメータの渡し方と、APIサーバからhtmlへの反映の仕方が、これだけで判明した。すばらしい。

現在の鍵の状態検知

さて、もう一つ重要な処理として、現在の鍵の状態を検知する必要がある。物理的にはリードスイッチというものを使い、鍵側には磁石を取り付けておいて、近づくか離れるかでリードスイッチのオンオフが切り替わるのでこれで検知する。

リードスイッチはラズパイのGPIOに接続するので、ボタンとして処理を行うこととなる。
ということで、fastAPIにgpiozeroをimportしてリードスイッチの検知を実施する。
いろいろと試した結果出来上がったのがこちら。

import uvicorn
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
import subprocess
from gpiozero import LED, Button

# fastAPI準備
app = FastAPI()
# テンプレートの準備
templates = Jinja2Templates(directory="templates")
        
# LEDの設定
ledBlue = LED(22)
ledRed = LED(27)
# 入力ボタンの設定
button = Button(5, bounce_time=0.05)
# リードスイッチ
reedSW = Button(11)
# 初期状態(開錠)
ledBlue.on()
        
@app.get("/")
# rootアクセス時
async def index(request: Request):
    name = "world"
    return templates.TemplateResponse("index.html", {
        "name": name,
        "request": request
    })
        
#API
# 鍵施錠
@app.get("/lock")
def lock():
    result = subprocess.run(('python', 'lock.py'))
    return {"message": "motor locked"}
        
# 鍵開錠
@app.get("/unlock")
def unlock():
    result = subprocess.run(('python', 'lock.py', 'rev'))
    return {"message": "motor unlocked"}
        
# 現在の施錠状態(リードスイッチ)
def keyStatus():
    print("reedSW:%d" % reedSW.value)
    if reedSW.value == 1:
        ledBlue.off()
        ledRed.on()
    else:
        ledBlue.on()
        ledRed.off()
        
# ボタン押下時処理
def btnPush():
    if ledBlue.value == 1:
        # 開錠状態なら施錠する
        ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
        ledRed.blink(on_time=0.1, off_time=0.1,background=True)
        lock()
        keyStatus()
        print("施錠しました")
    else:
        # 施錠状態なら開錠する
        ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
        ledRed.blink(on_time=0.1, off_time=0.1,background=True)
        unlock()
        keyStatus()
        print("開錠しました")
# ボタンが押されたら、btnPushを呼ぶ
button.when_pressed = btnPush
# リードスイッチに変化があったらkeyStatusを呼ぶ
reedSW.when_pressed = keyStatus
reedSW.when_released = keyStatus
        
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

接続したのは青いLED付きのスイッチと赤いLED付きのスイッチ。それぞれLEDは青がgpio22に、赤がgpio27に、そしてスイッチはgpio5に接続している。

そして鍵の状態検知のリードスイッチはgpio11に接続している。

gpiozeroのbuttonクラスを使用してボタン処理を行うのだが、ボタンが押されたとき発火する「when_pressed」と、ボタンが離されたときに発火する「when_released」というメソッドがあるので、これを使用して処理を作成する。

LED付きボタンは押下のみで発火し、リードスイッチはオンとオフのそれぞれの状態変化で発火するようにした。

リードスイッチの状態のより、LEDの青か赤を点灯させるようにしている(keyStatus())
LEDボタンを押すと、現在のリードスイッチの状態に合わせて施錠か開錠動作を実行する。
ボタン押下での施錠開錠動作中はLEDのblinkを使用して点滅を行い、動いている感を出してみた。

リアルタイムな感じで動いてくれるのでとても良い感じとなった。これは素晴らしい。

ファビコン

htmlを返却できるようになったが、ブラウザ側ではファビコンが無いのでちょっと寂しい。ファビコンを設定してやろうと思ったが、htmlレスポンス以外のファビコン要求を受け付けてファイルを返却しなきゃなので、どうやるか不明。

ひとまずアイコンは用意したがクライアントへの返却方法が不明なので、index.htmlに埋め込むこととした。
まずはアイコンファイルをbase64に変換。ラズベリーパイで「base64 アイコンファイル名」とすれば、base64の文字列がだらだら出てくる。なのでこれをコピーしておく。

ファビコンの設定はhtmlのヘッダ部に「<link rel=”icon” href=”{アイコンファイル名}”>」を記述すればブラウザ側がうまいことやってくれる。ただこのファイル形式は今回使わず、アイコンファイル名部分に「data:image/x-icon;base64,~base64文字列~」としてアイコンファイル情報を直接貼り付けておく。

これで、ファビコンが表示できるようになった。

静的マウント

fastAPIを調べてみると、静的マウントが可能で、htmlやjs、cssなど静的ファイルを牙を設定できるようだ。なんだこれを知っていれば上記ファビコン埋め込み作業は不要だった。

やり方としては、「from fastapi.staticfiles import StaticFiles」とStaticFilesをimportして、「app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)」としてやることで、staticフォルダをマウントできる。これでstaticフォルダは「/static」でクライアントからアクセス可能になるようだ。

import uvicorn
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles # これを追加
import subprocess
from gpiozero import LED, Button

# fastAPI準備
app = FastAPI()
# テンプレートの準備
templates = Jinja2Templates(directory="templates")
app.mount(path="/static", app=StaticFiles(directory="static"), name="static") # これを追加

~

PWM点灯

現在LEDをgpiozeroのLEDで制御しているが、「PWMLED」というのがあるのでこちらを使用してみる。LEDだとOnかOffしかないが、PWMLEDだとLEDをpwm制御できるため、明るさを可変にできる。

夜中に玄関扉の鍵部分がやたら明るく光るのはよくないので、夜は暗く、昼間は明るくなどの制御を行いたい。

使用方法は簡単で、「from gpiozero import PWMLED」でPWMLEDをimportしておき、いままで「ledBlue = LED(22)」で定義していたところを「ledBlue = PWMLED(22)」に変えてやればよい。そして明るさ(Duty比)の制御は「ledBlue.value=0.7」などのようにvalue指定で設定できる。1.0がMAX(100%)で、小数値でDuty比率を設定する。デフォルト100HzでPWM点灯しているようだ。

スケジューラの導入

「apscheduler」というのを使うと、スケジューラ機能が使えるようで、使用してみる。
これを使うと、指定時刻に関数を呼び出せるので、上記時刻による明るさ変更を行おうと思う。

使用方法は、「pip install apscheduler」でスケジューラのモジュールをインストールする。そしてソース上では、以下のように記述する。

from apscheduler.schedulers.background import BackgroundScheduler

    # スケジューラ準備
    scheduler = BackgroundScheduler()
    # スケジュール設定
    scheduler.add_job({呼び出す関数名}, 'cron', hour=7, minute=0)
    # スケジューラ起動
    scheduler.start()

これで、関数が指定時刻に呼び出される。上記はcronタイプで7:00に呼び出すよう設定。これ以外も指定時間後に起動などいろいろと使い方があるようだ。

処理の重複

なんとなくできたと思っていたが、困ったことにAPIアクセスやボタン押下などを連続して何回も行うと、それらはキューにたまるようで、何度も施錠動作を繰り返してしまう。

普段の使用方法ではそんなことは起きにくいと思うが、ちょっとカッコ悪いので何とかしたい。
ただ、いろいろ調べても解消法は見つからないので、困っている人はいなさそう。う~ん困った。

施錠処理などは同期動作なので、処理中にほかの処理が来たらキャンセルするように、動作中フラグを作ってみたが、キューにたまっているようで処理がすべて完了してからさらに動き始めるみたいで、キャンセルできなかった。以下のような流れだが、④が終わってからまた①が呼ばれている。
 施錠関数()
  ①動作中フラグONなら何もせずリターン
  ②動作中フラグオン
  ③施錠処理起動
  ④動作中フラグオフ

ちょうど上記でスケジューラを導入していたので、これを活用してみる。上記流れで④のあとまたすぐに①が呼ばれているので、④の動作フラグオフを少し遅延してフラグオフさせるようにしてみる。

# 動作中フラグ
lockbusy = False

# フラグをオフにする
def clearBusy():
    global lockbusy
    lockbusy = False
    print("block clear")

# 2秒後にフラグをオフにする
def releaseBlock():
    run_datetime = datetime.datetime.now() + datetime.timedelta(seconds = 2)
    scheduler.add_job(clearBusy, 'date', run_date=run_datetime, id='clearBusy')

# 施錠処理
def lock():
    global lockbusy
    if lockbusy == True:                           # ①動作中フラグONなら何もせずリターン
        # ブロック中の場合は何もせず終了
        print("lock Cancel")
        return {"message": "lock cancel"}
    lockbusy = True                                # ②動作中フラグオン
    # 外部呼出しで施錠処理実行
    result = subprocess.run(('python', 'lock.py')) # ③施錠処理起動
    # 処理ブロック解除呼び出し
    releaseBlock()                                 # ④動作中フラグオフ
    return {"message": "motor locked"}

これで、上記26行目のフラグオフは、実際にはすこし後にフラグがオフになる。なので直後にすぐ呼ばれた処理は①に引っ掛かり何もせず終了となって、うまくキャンセルされる。
いろいろと試してみた結果、2秒後にフラグオフするのが違和感が少なかった。

このとき気づいたがpythonがグローバル関数操作するときは、関数内でglobal宣言しないといけないようだ。じゃないと関数内の別のローカル変数扱いとなり、グローバル変数は何も変化しない。pythonの人には常識なのだろうが、自分は全く分からず、なぜか値がセットできずに悩んだ。global宣言しなくても参照は可能なのが、pythonのひどいワナだと思う。

画面の準備

web画面を作る必要がある。機能としては
 ・施錠ボタン
 ・開錠ボタン
 ・現在のリードスイッチの状態表示
などができればよい。
なので、APIサーバからは現在のリードスイッチの状態を「lock」として受領(1:施錠中、0:開錠中)して表示。ボタンはそれぞれ施錠(/lock)、開錠(/unlock)を呼びだすようにした。

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8" />
    <link rel="icon" href="data:image/x-icon;base64,~長いので省略~">
    <title>SmartLock</title>
    <link rel="stylesheet" href="/static/styles.css">
  </head>
  <body>
    <script src="/static/script.js"></script>
    <div class="smartlock">
        <!-- ヘッダ -->
        <div class="header" ><h1>SmartLock</h1></div>
        {% if lock==1 %}
        <!-- 施錠中 -->
          <div class="status lock">
            <h2>施錠中</h2>
          </div>
        {% else %}
        <!-- 開錠中 -->
          <div class="status unlock">
            <h2>開錠中</h2>
          </div>
        {% endif %}
        <div class="contents">
          <button id="keyopen" class="btn unlock" onclick="onOpen()">開錠</button>
          <button id="keyclose" class="btn lock" onclick="onClose()">施錠</button>
        </div>
        <div class="footer">
          <h5>RaspberryPi Zero2 ,
            ULN2003AN ,
            SteppingMotor[28BYJ-48]
          </h5>
        </div>
    </div>
  </body>
</html>

staticが使えるようになったので、jsとCSSは分けてみた。

async function onOpen() {
  // 開錠操作
  const res = await fetch("/unlock");
  console.log(res)
  // 状態再読み込み
  location.reload()
}
async function onClose() {
  // 施錠操作
  const res = await fetch("/lock");
  console.log(res)
  // 状態再読み込み
  location.reload()
}
/* ヘッダ部 */
div.header, div.footer {
  height: 77px;
  background-color: #ccecff;
  text-align: center;
}
h1 {
  padding: 20px 10px 0px 10px;
  font-family: Arial, Helvetica, sans-serif;
}
h1.red, h5.red {
  color: #ff0000;
}
h5 {
  margin: 0px;
  padding: 30px 10px 0px 10px;
}
/* ステータス部 */
div.status {
  height: 77px;
  text-align: center;
}
div.unlock { /* 開錠中 */
  background-color: green;
}
div.lock { /* 施錠中 */
  background-color: red;
}
h2 {
  color: #ffffff;
  padding: 20px 10px 0px 10px;
  margin: 0px;
}
div.disabled {
  background-color: #c0c0c0;
}
/* コンテンツ部 */
div.contents {
  display: flex;
  flex-flow: row wrap;
  justify-content: space-around;
  margin:20px 0px 20px 0px;
}

.btn { /* ボタン表示 */
  display: inline-block;
  min-width: 170px;
  width: 40%;
  height: 200px;
  text-align: center;
  font-size: 30px;
  color: #FFF;
  text-decoration: none;
  font-weight: bold;
  padding: 10px 24px;
  border-radius: 4px;
}

button.btn:active { /* ボタン有効 */
  transform: translateY(4px);
  border-bottom: none;
}
button.btn:disabled { /* ボタン無効 */
  background-color: #c0c0c0;
  border-bottom: 0px solid #c0c0c0;
  color: #e0e0e0;
}
button.unlock { /* ボタン開錠用 */
  background-color: #8cd460;
  border-bottom: 4px solid #6cb440;
}
button.lock { /* ボタン施錠用 */
  background-color: #ff8181;
  border-bottom: 4px solid #df6161;
}
* {
  user-select: none;
  -moz-user-select: none;
  -webkit-user-select: none;
  -ms-user-select: none;
}

ということで、そこそこなアプリが完成した。画面系htmlは上記のまま。アプリ本体は下記。

# api-server
import uvicorn
# api
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
# static mount
from fastapi.staticfiles import StaticFiles
# jinjya2 template
from fastapi.templating import Jinja2Templates
# 外部処理実行
import subprocess
# timer関連
from time import sleep
import datetime
# スケジューラ
from apscheduler.schedulers.background import BackgroundScheduler
# GPIO
from gpiozero import LED, PWMLED, Button
        
# (FastAPI) https://note.com/npaka/n/ndaeb20d37deb
# (gpiozero) https://www.denshi.club/parts/2020/11/gpiozero1import-led.html
        
# 明るさ(小数値 max=1)
duty=0.1
# LEDの設定(PWM点灯)
ledBlue = PWMLED(22, initial_value = duty)
ledRed = PWMLED(27, initial_value = 0)
# 入力ボタンの設定(50ms以上押下)
button = Button(5, bounce_time=0.05)
# リードスイッチ
reedSW = Button(11)
# 動作中ブロックフラグ
lockbusy = False
# 動作中他処理ブロック時間(秒)
blockTime = 2
        
# fastAPI準備
app = FastAPI()
# テンプレートの準備
templates = Jinja2Templates(directory="templates")
# staticフォルダをstaticにマウント
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")
        
@app.get("/", response_class=HTMLResponse)
# rootアクセス時
async def index(request: Request):
    lockData = reedSW.value
    print("response ReedSW: %d" % lockData)
    return templates.TemplateResponse("index.html", {
        "lock": lockData,
        "request": request
    })
        
@app.get("/duty")
# 「http://url/?val=100」で明るさ指定(0~100)
async def dutyset(val: int = duty * 100):
    global duty
    print("dutysetパラメータ:%d" % val)
    if val == duty * 100:
        return {"message": "Duty not changed(%d%)" % int(duty * 100)}
    else:
        changeDuty(val)
        return {"message": "Duty Changed(%d%)" % int(duty * 100)}
        
# LED明るさ変更処理(PWM Duty比変更)
def changeDuty(val: int = -1):
    global duty
    dt_now = datetime.datetime.now()
    # 単純呼び出し時は時刻で明るさ決定
    if val == -1:
        if 7 <= dt_now.hour and dt_now.hour < 21:
            duty=1.0 # 7時~21時は100%
        else:
            duty=0.2 # 夜中は20%
    else:
        # 明るさ指定時は指定の明るさに変更(%指定)
        if 0 <= val and val <= 100:
            duty = val / 100
    print("現在時刻:%d時 明るさは%d%にします" % (dt_now.hour, duty * 100))
    # LED設定更新
    keyStatus()
        
# 処理ブロック状態の解除
def clearBusy():
    global lockbusy
    lockbusy = False
    print("block clear")
        
# 2秒後に処理ブロックを起動
def releaseBlock():
    run_datetime = datetime.datetime.now() + datetime.timedelta(seconds = blockTime)
    # 処理ブロック解除呼び出し
    scheduler.add_job(clearBusy, 'date', run_date=run_datetime, id='clearBusy')
        
#API
@app.get("/lock")
# 施錠処理
def lock():
    global lockbusy
    if lockbusy == True:
        # ブロック中の場合は何もせず終了
        print("lock Cancel")
        return {"message": "lock cancel"}
    lockbusy = True
    # 外部呼出しで施錠処理実行
    result = subprocess.run(('python', 'lock.py'))
    # 処理ブロック解除呼び出し
    releaseBlock()
    return {"message": "motor locked"}
        
@app.get("/unlock")
# 開錠処理
def unlock():
    global lockbusy
    if lockbusy == True:
        # ブロック中の場合は何もせず終了
        print("unlock Cancel")
        return {"message": "lock cancel"}
    lockbusy = True
    # 外部呼出しで開錠処理実行
    result = subprocess.run(('python', 'lock.py', 'rev'))
    # 処理ブロック解除呼び出し
    releaseBlock()
    return {"message": "motor unlocked"}
        
# じんわり点灯
def fadeinLED(swled: PWMLED):
    # 0%からDuty%まで輝度を上げる
    for brightness in range(0, int(duty * 100), 1):
        swled.value = brightness / 100.0
        # 10ms待機(輝度の変化をなめらかに)
        sleep(0.01)
        
# 現在の施錠状態(リードスイッチ)
def keyStatus():
    print("リードスイッチ状態:%d 、明るさ:%d%" % (reedSW.value, duty * 100))
    if reedSW.value == 1:  # 施錠中の場合
        ledBlue.off()      #  青は消す
        fadeinLED(ledRed)  #  赤を点灯
    else:                  # 開錠中の場合
        ledRed.off()       #  赤は消す
        fadeinLED(ledBlue) #  青を点灯
        
# ボタン押下時処理
def btnPush():
    global lockbusy
    if lockbusy != True:
        if ledBlue.value > 0:
            # 開錠状態なら施錠する
            ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
            ledRed.blink(on_time=0.1, off_time=0.1,background=True)
            lock()
            keyStatus()
            print("施錠しました")
        else:
            # 施錠状態なら開錠する
            ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
            ledRed.blink(on_time=0.1, off_time=0.1,background=True)
            unlock()
            keyStatus()
            print("開錠しました")
        lockbusy = True
    else:
        print("btnPush Cancel")
        
# ボタンが押されたら、btnPushを呼ぶ
button.when_pressed = btnPush
# リードスイッチに変化があったらkeyStatusを呼ぶ
reedSW.when_pressed = keyStatus
reedSW.when_released = keyStatus
        
if __name__ == "__main__":
    print("アプリ開始")
    # スケジューラのインスタンスを作成する
    scheduler = BackgroundScheduler()
    # スケジューラーにジョブを追加する
    scheduler.add_job(changeDuty, 'cron', hour=7, minute=0)
    scheduler.add_job(changeDuty, 'cron', hour=21, minute=0)
    # スケジューラを開始する
    scheduler.start()
    # 初期明るさ設定
    changeDuty()
    # web開始
    print("API開始")
    uvicorn.run(app, host="0.0.0.0", port=8000)
    print("アプリ終了")

ステータス更新対応

ひとまずそれなりのものができたが、もう一つ大きな問題が残っている。

画面からのボタン押下などで施錠や開錠を行うと、reloadを入れているので、画面がリフレッシュされ現在の鍵の状態が表示される。しかし手動で鍵を開け閉めしたり、物理ボタンで施錠や開錠をしたとき、画面はリフレッシュしていないので、現在の鍵の状態が画面と不一致となる。

これについては、APIサーバ側からアクションを行いクライアント画面の内容を書き換える必要があるのだが、通常のwebではそんな仕組みが無い。

Server-Sent Events(SSE)

そこでいろいろと探してみると、Server-Sent Events(以下SSE)という技術が見つかった。主に動画など大きなデータをサーバからだらだら流すときに使用している技術っぽいのだが、なかなか仕組みを理解するのは難しい。

いろいろとググッていると、googleのAIがいい感じのサンプルを提示してくれた。まあこれもどこかのブログのパクリなんだと思うが、fastAPIベースで簡単な記述のサンプルなので試してみた。

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
# SSE
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()
templates = Jinja2Templates(directory="templates")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust as needed for security
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def event_generator():
    count = 0
    while True:
        yield {"event": "message", "data": {count}}
        await asyncio.sleep(1)  # Send an update every second
        count += 1
        if count > 1440: count = 0

@app.get("/stream")
async def stream_events():
    return EventSourceResponse(event_generator())

@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {
        "lock": 1,
        "request": request
    })

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

何を行ったかというと、6~8行目にimportを追加。13行目から30行目までがサンプル処理を入れたところ。
内容は1秒ごとにメッセージを送るというもの。
これを受け取るクライアント側は以下。

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8" />
    <title></title>
  </head>
  <body>
    Hello {{ name }}!
  <script>
// Create a new EventSource connection to your FastAPI SSE endpoint
const eventSource = new EventSource('/stream');

// Listen for messages without a specific event type
eventSource.onmessage = function(event) {
    console.log("Received message:", event.data);
};

// Listen for messages with a specific event type (e.g., 'update')
eventSource.addEventListener('update', function(event) {
    // console.log("Received 'update' event:", event.data);
    // リードスイッチの状態変化があったらリロード
    location.reload()
});

// Handle errors
eventSource.onerror = function(error) {
    console.error("EventSource error:", error);
    // You might want to implement custom reconnection logic here,
    // though EventSource handles automatic reconnection by default.
};
  </script>
  </body>
</html>

こちらには、9行目から31行目までのjavascriptを追加。ここで「/stream」につないだ後そこからメッセージを受け取る。メッセージはconsole.logに出力しているのでデベロッパー画面で見ることができる。

1秒ごとにメッセージが増えている。かっこ内のカウンタ値は毎回変化しているので、サーバ側でセットした変動値がきちんと送られてきている。

これで、サーバからクライアントにアクションを行う仕組みを構築することができた。

uvicorn落ちない問題

良い仕組みがそれなりの少ないコード量で構築出来て喜んでいたが、別の問題が発覚。APIサーバをCtrl+Cで処理を落とそうとしても、落ちない。

画面には、さらにCtrl+C押せば強制終了とあるが、実際には押してみても応答せず、アプリは生きたまま。アプリはまだ生きているため、SSEが動作しており、クライアント側ではログが出続けている。

調べてみるとwebサーバとなるuvicornはクライアントセッションがすべて切れてから落ちるとのことで、切断待ちに入っているようなのだが、SSE処理で切れることはないので、ずっとつながりっぱなしとなりwebサーバもずっと落ちない。困った。

セッションが切れると落ちるようなので、クライアントでリロードを行うと、セッションが切れてくれて、webサーバ(uvicorn)が落ちてくれた。う~ん、アプリを留めるときは毎回クライアントのリロードが必要っぽい。面倒だな。

まあ、クライアントがリロードをすればよいので、script部分に「setTimeout(() => { location.reload()}, 60000);」を突っ込んでみた。1分ごとに強制リロードを行う。これでサーバアプリ停止時には、最長1分以内には落ちてくれる。そもそも状態表示のためにはリロードはしょっちゅう必要なので、1分ごとにリロードがかかっても問題ないだろう。

ステータス更新対応

ということで、この技術を使用して、リードスイッチのステータスが変化したら、sseでアクションを送るようにする。クライアント側ではステータス変化の情報を受け取ったら、画面をリロードする。本来は画面更新せずにステータスを内部変化させるのが良いのだろうが、たいしたアプリではないので、リロードでやっつけてしまう。

ということで、状態変化フラグ「changeSWflag」を用意して、
 リードスイッチが変化したらフラグをオンにする。
 フラグがオンの場合、updateメッセージをSSEで送る
 updateメッセージを送ったらフラグはオフにする
として、状態変化時にはサーバーからクライアントにメッセージを送るようにした。

クライアント側は、updateメッセージが飛んできたら、画面をリロード。
とこれだけ。リロードすると最新状態の内容がもらえるので、最新表示が可能。

リアルタイム状態更新版アプリ完成

ということで、SSE処理を組み込んだひとまずの完成版が出来上がった。

# api-server(ASGI)
import uvicorn
# api
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
# static mount
from fastapi.staticfiles import StaticFiles
# jinjya2 template
from fastapi.templating import Jinja2Templates
# lifespan
from contextlib import asynccontextmanager
# 外部処理実行
import subprocess
# timer関連
from time import sleep
import datetime
# スケジューラ
from apscheduler.schedulers.background import BackgroundScheduler
# GPIO
from gpiozero import LED, PWMLED, Button
# SSE
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import asyncio

# 明るさ(小数値 max=1)
duty=0.1
# 動作中他処理ブロック時間(秒)
blockTime = 2
# LEDの設定(PWM点灯)
ledBlue = PWMLED(22, initial_value = duty)
ledRed = PWMLED(27, initial_value = 0)
# 入力ボタンの設定(50ms以上押下)
button = Button(5, bounce_time=0.05)
# リードスイッチ
reedSW = Button(11)
# 動作中ブロックフラグ
lockbusy = False
# リードスイッチの変化有無
changeSWflag = False

# lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    global changeSWflag
    # Load the ML model
    print("uvicornの開始")
    yield
    # Clean up the ML models and release the resources
    print("uvicornの終了")

# fastAPI準備
app = FastAPI(lifespan=lifespan)
# テンプレートの準備
templates = Jinja2Templates(directory="templates")
# staticフォルダをstaticにマウント
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")


app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust as needed for security
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def event_generator():
    global changeSWflag
    count = 0
    while True:
        if count%10 == 0:
            yield {"event": "message", "data": {int(count / 10)}}
        if changeSWflag:
            # Simulate real-time data updates
            yield {"event": "update", "data": f"Current count: {count}, changeSWflag: {changeSWflag}"}
            changeSWflag = False
        await asyncio.sleep(0.1)  # Send an update every second
        count += 1
        if count > 1440: count = 0

@app.get("/stream")
async def stream_events():
    return EventSourceResponse(event_generator())

@app.get("/", response_class=HTMLResponse)
# rootアクセス時
async def index(request: Request):
    lockData = reedSW.value
    print("response ReedSW: %d" % lockData)
    return templates.TemplateResponse("index.html", {
        "lock": lockData,
        "request": request
    })

@app.get("/duty")
# 「http://url/?val=100」で明るさ指定(0~100)
async def dutyset(val: int = duty * 100):
    global duty
    print("dutysetパラメータ:%d" % val)
    if val == duty * 100:
        return {"message": "Duty not changed(%d%)" % int(duty * 100)}
    else:
        changeDuty(val)
        return {"message": "Duty Changed(%d%)" % int(duty * 100)}

# LED明るさ変更処理(PWM Duty比変更)
def changeDuty(val: int = -1):
    global duty
    dt_now = datetime.datetime.now()
    # 単純呼び出し時は時刻で明るさ決定
    if val == -1:
        if 7 <= dt_now.hour and dt_now.hour < 21:
            duty=1.0 # 7時~21時は100%
        else:
            duty=0.2 # 夜中は20%
    else:
        # 明るさ指定時は指定の明るさに変更(%指定)
        if 0 <= val and val <= 100:
            duty = val / 100
    print("現在時刻:%d時 明るさは%d%にします" % (dt_now.hour, duty * 100))
    # LED設定更新
    keyStatus()

# 処理ブロック状態の解除
def clearBusy():
    global lockbusy
    lockbusy = False
    print("block clear")

# 2秒後に処理ブロックを起動
def releaseBlock():
    run_datetime = datetime.datetime.now() + datetime.timedelta(seconds = blockTime)
    # 処理ブロック解除呼び出し
    scheduler.add_job(clearBusy, 'date', run_date=run_datetime, id='clearBusy')

#API
@app.get("/lock")
# 施錠処理
def lock():
    global lockbusy
    if lockbusy == True:
        # ブロック中の場合は何もせず終了
        print("lock Cancel")
        return {"message": "lock cancel"}
    lockbusy = True
    # 外部呼出しで施錠処理実行
    result = subprocess.run(('python', 'lock.py'))
    # 処理ブロック解除呼び出し
    releaseBlock()
    return {"message": "motor locked"}

@app.get("/unlock")
# 開錠処理
def unlock():
    global lockbusy
    if lockbusy == True:
        # ブロック中の場合は何もせず終了
        print("unlock Cancel")
        return {"message": "lock cancel"}
    lockbusy = True
    # 外部呼出しで開錠処理実行
    result = subprocess.run(('python', 'lock.py', 'rev'))
    # 処理ブロック解除呼び出し
    releaseBlock()
    return {"message": "motor unlocked"}

# じんわり点灯
def fadeinLED(swled: PWMLED):
    # 0%からDuty%まで輝度を上げる
    for brightness in range(0, int(duty * 100), 1):
        swled.value = brightness / 100.0
        # 10ms待機(輝度の変化をなめらかに)
        sleep(0.01)

# 現在の施錠状態(リードスイッチ)
def keyStatus():
    global changeSWflag
    print("リードスイッチ状態:%d 、明るさ:%d%" % (reedSW.value, duty * 100))
    changeSWflag = True
    if reedSW.value == 1:  # 施錠中の場合
        ledBlue.off()      #  青は消す
        fadeinLED(ledRed)  #  赤を点灯
    else:                  # 開錠中の場合
        ledRed.off()       #  赤は消す
        fadeinLED(ledBlue) #  青を点灯

# ボタン押下時処理
def btnPush():
    global lockbusy
    if lockbusy != True:
        if ledBlue.value > 0:
            # 開錠状態なら施錠する
            ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
            ledRed.blink(on_time=0.1, off_time=0.1,background=True)
            lock()
            keyStatus()
            print("施錠しました")
        else:
            # 施錠状態なら開錠する
            ledBlue.blink(on_time=0.1, off_time=0.1,background=True)
            ledRed.blink(on_time=0.1, off_time=0.1,background=True)
            unlock()
            keyStatus()
            print("開錠しました")
        lockbusy = True
    else:
        print("btnPush Cancel")

# ボタンが押されたら、btnPushを呼ぶ
button.when_pressed = btnPush
# リードスイッチに変化があったらkeyStatusを呼ぶ
reedSW.when_pressed = keyStatus
reedSW.when_released = keyStatus

if __name__ == "__main__":
    print("アプリ開始")
    # スケジューラのインスタンスを作成する
    scheduler = BackgroundScheduler()
    # スケジューラーにジョブを追加する
    scheduler.add_job(changeDuty, 'cron', hour=7, minute=0)
    scheduler.add_job(changeDuty, 'cron', hour=21, minute=0)
    # スケジューラを開始する
    scheduler.start()
    # 初期明るさ設定
    changeDuty()
    # web開始
    print("API開始")
    uvicorn.run(app, host="0.0.0.0", port=8000)
    print("アプリ終了")
        
<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8" />
    <link rel="icon" href="data:image/x-icon;base64,~省略~">
    <title>SmartLock</title>
    <link rel="stylesheet" href="/static/styles.css">
  </head>
  <body>
    <script src="/static/script.js"></script>
    <div class="smartlock">
        <!-- ヘッダ -->
        <div class="header" ><h1>SmartLock</h1></div>
        {% if lock==1 %}
        <!-- 施錠中 -->
          <div class="status lock">
            <h2>施錠中</h2>
          </div>
        {% else %}
          <div class="status unlock">
            <h2>開錠中</h2>
          </div>
        {% endif %}
        <div class="contents">
          <button id="keyopen" class="btn unlock" onclick="onOpen()">開錠</button>
          <button id="keyclose" class="btn lock" onclick="onClose()">施錠</button>
        </div>
        <div class="footer">
          <h5>RaspberryPi Zero2 ,
            ULN2003AN ,
            SteppingMotor[28BYJ-48]
          </h5>
        </div>
    </div>
  </body>
</html>
async function onOpen() {
  // 開錠操作
  const res = await fetch("/unlock");
  console.log(res)
  // 状態再読み込み
  location.reload()
}
async function onClose() {
  // 施錠操作
  const res = await fetch("/lock");
  console.log(res)
  // 状態再読み込み
  location.reload()
}

// Create a new EventSource connection to your FastAPI SSE endpoint
const eventSource = new EventSource('/stream'); 
// 1分ごとに勝手にリロード
let timerId = setTimeout(() => { location.reload()}, 60000);

// Listen for messages without a specific event type
eventSource.onmessage = function(event) {
    console.log("Received message:", event.data);
};

// Listen for messages with a specific event type (e.g., 'update')
eventSource.addEventListener('update', function(event) {
    // console.log("Received 'update' event:", event.data);
    // リードスイッチの状態変化があったらリロード
    location.reload()
});

// Handle errors
eventSource.onerror = function(error) {
    console.error("EventSource error:", error);
    // You might want to implement custom reconnection logic here, 
    // though EventSource handles automatic reconnection by default.
};

// Close the connection when no longer needed
// eventSource.close(); 
/* ヘッダ部 */
div.header, div.footer {
  height: 77px;
  background-color: #ccecff;
  text-align: center;
}
h1 {
  padding: 20px 10px 0px 10px;
  font-family: Arial, Helvetica, sans-serif;
}
h1.red, h5.red {
  color: #ff0000;
}
h5 {
  margin: 0px;
  padding: 30px 10px 0px 10px;
}
/* ステータス部 */
div.status {
  height: 77px;
  text-align: center;
}
div.unlock { /* 開錠中 */
  background-color: green;
}
div.lock { /* 施錠中 */
  background-color: red;
}
h2 {
  color: #ffffff;
  padding: 20px 10px 0px 10px;
  margin: 0px;
}
div.disabled {
  background-color: #c0c0c0;
}
/* コンテンツ部 */
div.contents {
  display: flex;
  flex-flow: row wrap;
  justify-content: space-around;
  margin:20px 0px 20px 0px;
}

.btn { /* ボタン表示 */
  display: inline-block;
  min-width: 170px;
  width: 40%;
  height: 200px;
  text-align: center;
  font-size: 30px;
  color: #FFF;
  text-decoration: none;
  font-weight: bold;
  padding: 10px 24px;
  border-radius: 4px;
}

button.btn:active { /* ボタン有効 */
  transform: translateY(4px);
  border-bottom: none;
}
button.btn:disabled { /* ボタン無効 */
  background-color: #c0c0c0;
  border-bottom: 0px solid #c0c0c0;
  color: #e0e0e0;
}
button.unlock { /* ボタン開錠用 */
  background-color: #8cd460;
  border-bottom: 4px solid #6cb440;
}
button.lock { /* ボタン施錠用 */
  background-color: #ff8181;
  border-bottom: 4px solid #df6161;
}
* {
  user-select: none;
  -moz-user-select: none;
  -webkit-user-select: none;
  -ms-user-select: none;
}
from gpiozero import LED
from gpiozero import LEDBoard
import time
import sys

leds = LEDBoard(6,13,19,26) # Blue, Orange, Yellow, Pink
s02 = ((1,0,0,1),(1,1,0,0),(0,1,1,0),(0,0,1,1))
startms = 0

ms = 2    # wait(ms)
agl = 128 # 512 = 360 degree
direc = 1 # 1 or -1

def motor_main(direction):
    for i in range(agl):
        motor_step(s02, direction, ms)
    time.sleep(0.2)
    for i in range(agl):
        motor_step(s02, -direction, ms)

def motor_step(pattern, direction, wtime):
    for j in range(len(pattern)):
        for k in range(len(pattern[j])):
            if pattern[j][k] == 1:
                leds[k * direction].on()
            else:
                leds[k * direction].off()
        time.sleep(wtime * 0.001)

def destroy():
    for i in range(4):
        leds[i].off()
    tms = int(time.time() * 1000) - startms
    print("rotation end")
    print("rotate-time  %d.%d s [wait %dms]" % (tms // 1000, tms % 1000, ms))

if __name__ == '__main__':
    try:
        args = sys.argv
        startms = int(time.time() * 1000)
        print("starting rotation(%d)" % len(args))
        if len(args) > 1:
            if args[1] == "rev":
                direc = -1
        motor_main(direc)
    except KeyboardInterrupt:
        pass
    destroy()

デーモン化

ひとまずそれなりに動きそうなものができたので、サービス化して常駐させたいと思う。
ラズベリーパイなのでsystemdに登録すればよいだろう。

# systemctlをリロードして認識させる
sudo systemctl daemon-reload
# 見えているか確認
systemctl status fastapi
# サービス開始
sudo systemctl start fastapi
# 起動できているか確認
systemctl status fastapi
# 自動起動するように設定
sudo systemctl enable fastapi

3Dプリンタ スマートロック編一覧

3Dプリンタ スマートロック編(1) ハード作成
3Dプリンタでスマートロックを作ってみる
3Dプリンタ スマートロック編(2) ソフト作成
3Dプリンタでスマートロックを作ってみる
3Dプリンタ スマートロック編(3) 状態検知
3Dプリンタでスマートロックを作ってみる
3Dプリンタ スマートロック編(4) obniz
3Dプリンタでスマートロックを作ってみる
3Dプリンタ スマートロック編(5) 取付中
3Dプリンタでスマートロックを作ってみる
3Dプリンタ スマートロック「2」編(1) 改善検討
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「2」編(2) 交換取付
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「2」編(3) 2台目
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「2」編(4) ガタつき
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「2」編(5) シンプルに
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「2」編(6) 配線整理
3Dプリンタでスマートロック「2」を作ってみる
3Dプリンタ スマートロック「3」編(1) 再作成
3Dプリンタでスマートロック「3」を作ってみる
3Dプリンタ スマートロック「3」編(2) APIサーバ
3Dプリンタでスマートロック「3」を作ってみる