著者:米田 聡
本連載では、人気のマイコンボード「Raspberry Pi Pico W/WH」を活用していきます。同ボードは、無線LANやBluetoothの通信機能を搭載し、入手しやすく価格も手頃なので、IoT機器を自作するのに最適なハードウエアです。第5回は、DHT-11で測った湿温度をスマートフォンに通知します。
シェルスクリプトマガジン Vol.89は以下のリンク先でご購入できます。
図4 LINE Notifyを使って通知を送る「notify()」関数を定義したPythonプログラム
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import usocket as socket from urequests import post from micropython import const TOKEN = const("your_access_token") def notify(msg): url = 'https://notify-api.line.me/api/notify' rheaders = { 'Content-type':'application/x-www-form-urlencoded', 'Authorization':'Bearer ' + TOKEN } message = "message=" + msg req = post(url,headers=rheaders,data=message) return req.status_code |
図6 1時間おきに湿温度をスマートフォンに通知するサンプルプログラム(sample.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
from machine import Pin from machine import Timer from micropython import const from micropython import schedule import _thread import dht import line import time DHT_PIN = const(15) TICK = const(60) class Climate(): def __init__(self, dht_gpio=DHT_PIN, tick=TICK, callback=None, expire=TICK): self.dht = dht.DHT11(Pin(dht_gpio)) self.tick = tick self.lock = _thread.allocate_lock() self.timer = Timer(period=tick*1000, mode=Timer.PERIODIC, callback=self._timer_handler) self._humidity = 0 self._temperature = 0 self._measurement_dt = 0 self.measure() self._callback_func = callback self._expire = expire self._counter = 0 def set_callback(self, callback, expire): self._callback_func = callback self._expire = expire def measure(self, arg=0): self.dht.measure() self.lock.acquire() self._humidity = self.dht.humidity() self._temperature = self.dht.temperature() self._measurement_dt = time.time() self.lock.release() def _timer_handler(self,t): schedule(self.measure, 0) self._counter -= 1 if self._counter <= 0: self._counter = self._expire if self._callback_func is not None: schedule(self._callback_func, self) @property def temp(self): self.lock.acquire() rval = self._temperature self.lock.release() return rval @property def hum(self): self.lock.acquire() rval = self._humidity self.lock.release() return rval @property def measurement_date_time(self): self.lock.acquire() rval = self._measurement_dt self.lock.release() return rval def mycalback(c): tm = time.localtime(c.measurement_date_time) line.notify("temp=%d,hum=%d@%4d/%02d/%02d_%02d:%02d:%02d" % (c.temp, c.hum, tm[0],tm[1],tm[2],tm[3],tm[4],tm[5])) if __name__ == '__main__': cl = Climate(callback = mycalback) while(True): machine.idle() |