著者:米田 聡
本連載では、人気のマイコンボード「Raspberry Pi Pico W/WH」を活用していきます。同ボードは、無
線LANやBluetoothの通信機能を搭載し、入手しやすく価格も手頃なので、IoT機器を自作するのに最適なハードウエアです。第6回は、DHT-11で測った湿温度をグラフ化します。
シェルスクリプトマガジン Vol.90は以下のリンク先でご購入できます。
図2 10 分おきに湿温度を測定して配列に保存する「DhtDataLogger」クラスのコード(ddl.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
import dht import time import gc import micropython import _thread from machine import Pin from machine import RTC from machine import Timer DHT_PIN = 15 TICK = 10 SIZE = 144 class DhtDataLogger(): def __init__(self, dht_gpio = DHT_PIN, tick=TICK, size=SIZE): self.rtc = RTC() self.dht = dht.DHT11(Pin(dht_gpio)) self.data = [] self.tick = tick self.size = size # 1分おきにタイマー割り込みを呼ぶ self.timer = Timer(period=60*1000, mode=Timer.PERIODIC, callback=self._timer_handler) self.lock = _thread.allocate_lock() def _timer_handler(self, t): now = self.rtc.datetime() if now[5] % self.tick == 0: micropython.schedule(self._logger, now) def _logger(self, now): # ロックをかける self.lock.acquire() try: self.dht.measure() except OSError as e: pass # 測定日時 nowstr = "{0:02d}:{1:02d}:{2:02d}".format(now[4],now[5],now[6]) self.data.append([nowstr, self.dht.temperature(), self.dht.humidity()]) if len(self.data) > self.size: del self.data[0] # debug # print(self.data[-1]) # ロック解除 self.lock.release() # ガベージコレクト gc.collect() def get_data(self): if len(self.data) == 0: return None # 配列のコピーを返す rval = [] self.lock.acquire() for v in self.data: rval.append(v) self.lock.release() return rval |
図3 蓄積した湿温度をグラフとして表示する簡易Webサーバーのプログラム(main.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import usocket as socket import network import time import ujson as json import gc from machine import Timer from ddl import DhtDataLogger http_header = ''' HTTP/1.1 200 OK\r Content-Type: text/html;charset=UTF-8\r Cache-Control: no-store, no-cache, max-age=0, must-revalidate, proxy-revalidate\r Connection: close\r \r ''' html_base=''' <!DOCTYPE html> <html lang="ja"> <head> <meta charset="utf-8"> <title>Pico W</title> </head> %s </html> ''' graph_body = ''' <body> <canvas id="DHTChart"></canvas> <script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.bundle.js"></script> <script> var dhtc = document.getElementById("DHTChart"); var DHTChart = new Chart(dhtc, { type: 'line', data: { labels: %s, datasets: [ { label: '気温(度)', data: %s, borderColor: "rgba(255,0,0,1)", backgroundColor: "rgba(0,0,0,0)" }, { label: '湿度(%)', data: %s, borderColor: "rgba(0,0,255,1)", backgroundColor: "rgba(0,0,0,0)" } ], }, options: { title: { display: true, text: '気温と湿度' }, scales: { yAxes: [{ ticks: { suggestedMax: 100, suggestedMin: -10, stepSize: 10, } }] }, } }); </script> </body> ''' logger = DhtDataLogger() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('', 80)) sock.listen(4) try: while True: conn, addr = sock.accept() connect_from = str(addr[0]) req = conn.recv(1024).decode() if req.startswith('GET / HTTP/1.1'): tlist = logger.get_data() if tlist is None: body = html_base % ("<body><h1>Data not ready</h1></body>") else: labels = [] temps = [] hums = [] for v in tlist: labels.append(v[0]) temps.append(v[1]) hums.append(v[2]) body = graph_body % (json.dumps(labels), json.dumps(temps), json.dumps(hums)) html = html_base % (body) # 送信 conn.send(http_header) conn.sendall(html) conn.close() else: conn.sendall('HTTP/1.1 404 Not Found\r\n') conn.close() gc.collect() except KeyboardInterrupt: pass sock.close() |