シェルスクリプトマガジン

Raspberry Piを100%活用しよう(Vol.89掲載)

著者:米田 聡

本連載では、人気のマイコンボード「Raspberry Pi Pico W/WH」を活用していきます。同ボードは、無線LANやBluetoothの通信機能を搭載し、入手しやすく価格も手頃なので、IoT機器を自作するのに最適なハードウエアです。第5回は、DHT-11で測った湿温度をスマートフォンに通知します。

シェルスクリプトマガジン Vol.89は以下のリンク先でご購入できます。

図4 LINE Notifyを使って通知を送る「notify()」関数を定義したPythonプログラム

import usocket as socket
from urequests import post
from micropython import const

TOKEN = const("your_access_token")

def notify(msg):
    url = 'https://notify-api.line.me/api/notify'
    rheaders = {
            'Content-type':'application/x-www-form-urlencoded',
            'Authorization':'Bearer ' + TOKEN
    }

    message = "message=" + msg
    req = post(url,headers=rheaders,data=message)
    return req.status_code

図6 1時間おきに湿温度をスマートフォンに通知するサンプルプログラム(sample.py)

from machine import Pin
from machine import Timer
from micropython import const
from micropython import schedule
import _thread
import dht
import line
import time

DHT_PIN = const(15)
TICK = const(60)

class Climate():
    def __init__(self, dht_gpio=DHT_PIN, tick=TICK, callback=None, expire=TICK):
        self.dht = dht.DHT11(Pin(dht_gpio))
        self.tick = tick
        self.lock = _thread.allocate_lock()
        self.timer = Timer(period=tick*1000, mode=Timer.PERIODIC, callback=self._timer_handler)
        self._humidity = 0
        self._temperature = 0
        self._measurement_dt = 0
        self.measure()
        self._callback_func = callback
        self._expire = expire
        self._counter = 0
    
    def set_callback(self, callback, expire):
        self._callback_func = callback
        self._expire = expire
    
    def measure(self, arg=0):
        self.dht.measure()
        self.lock.acquire()
        self._humidity = self.dht.humidity()
        self._temperature = self.dht.temperature()
        self._measurement_dt = time.time()
        self.lock.release()
    
    def _timer_handler(self,t):
        schedule(self.measure, 0)
        self._counter -= 1
        if self._counter <= 0:
            self._counter = self._expire
            if self._callback_func is not None:
                schedule(self._callback_func, self)
    
    @property
    def temp(self):
        self.lock.acquire()
        rval = self._temperature
        self.lock.release()
        return rval
    
    @property
    def hum(self):
        self.lock.acquire()
        rval = self._humidity
        self.lock.release()
        return rval
    
    @property
    def measurement_date_time(self):
        self.lock.acquire()
        rval = self._measurement_dt
        self.lock.release()
        return rval
        

def mycalback(c):
    tm = time.localtime(c.measurement_date_time)
    line.notify("temp=%d,hum=%d@%4d/%02d/%02d_%02d:%02d:%02d" % (c.temp, c.hum, tm[0],tm[1],tm[2],tm[3],tm[4],tm[5]))
    
if __name__ == '__main__':
    cl = Climate(callback = mycalback)
    
    while(True):
        machine.idle()