シェルスクリプトマガジン

Raspberry Pi Pico W/WHで始める電子工作(Vol.90掲載)

著者:米田 聡

本連載では、人気のマイコンボード「Raspberry Pi Pico W/WH」を活用していきます。同ボードは、無
線LANやBluetoothの通信機能を搭載し、入手しやすく価格も手頃なので、IoT機器を自作するのに最適なハードウエアです。第6回は、DHT-11で測った湿温度をグラフ化します。

シェルスクリプトマガジン Vol.90は以下のリンク先でご購入できます。

図2 10 分おきに湿温度を測定して配列に保存する「DhtDataLogger」クラスのコード(ddl.py)

import dht
import time
import gc
import micropython
import _thread
from machine import Pin
from machine import RTC
from machine import Timer

DHT_PIN = 15
TICK = 10
SIZE = 144

class DhtDataLogger():
    
    def __init__(self, dht_gpio = DHT_PIN, tick=TICK, size=SIZE):
        self.rtc = RTC()
        self.dht = dht.DHT11(Pin(dht_gpio))
        self.data = []
        self.tick = tick
        self.size = size
        # 1分おきにタイマー割り込みを呼ぶ
        self.timer = Timer(period=60*1000, mode=Timer.PERIODIC, callback=self._timer_handler)
        self.lock = _thread.allocate_lock()
    
    def _timer_handler(self, t):
        now = self.rtc.datetime()
        if now[5] % self.tick == 0:        
            micropython.schedule(self._logger, now)
    
    def _logger(self, now):
        # ロックをかける
        self.lock.acquire()
        try:
            self.dht.measure()
        except OSError as e:
            pass
        
        # 測定日時
        nowstr = "{0:02d}:{1:02d}:{2:02d}".format(now[4],now[5],now[6])
        self.data.append([nowstr, self.dht.temperature(), self.dht.humidity()])
        if len(self.data) > self.size:
            del self.data[0]
        # debug
        # print(self.data[-1])
        # ロック解除
        self.lock.release()
        # ガベージコレクト
        gc.collect()

    def get_data(self):
        if len(self.data) == 0:
            return None
        # 配列のコピーを返す        
        rval = []
        self.lock.acquire()
        for v in self.data:
            rval.append(v)
        self.lock.release()
        return rval

図3 蓄積した湿温度をグラフとして表示する簡易Webサーバーのプログラム(main.py)

import usocket as socket
import network
import time
import ujson as json
import gc
from machine import Timer
from ddl import DhtDataLogger

http_header = '''
HTTP/1.1 200 OK\r
Content-Type: text/html;charset=UTF-8\r
Cache-Control: no-store, no-cache, max-age=0, must-revalidate, proxy-revalidate\r
Connection: close\r
\r
'''

html_base='''
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="utf-8">
    <title>Pico W</title>
</head>
%s
</html>
'''

graph_body = '''
<body>
  <canvas id="DHTChart"></canvas>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.bundle.js"></script>
    
  <script>
  var dhtc = document.getElementById("DHTChart");
  var DHTChart = new Chart(dhtc, {
    type: 'line',
    data: {
      labels: %s,
      datasets: [
        {
          label: '気温(度)',
          data: %s,
          borderColor: "rgba(255,0,0,1)",
          backgroundColor: "rgba(0,0,0,0)"
        },
        {
          label: '湿度(%)',
          data: %s,
          borderColor: "rgba(0,0,255,1)",
          backgroundColor: "rgba(0,0,0,0)"
        }
      ],
    },
    options: {
      title: {
        display: true,
        text: '気温と湿度'
      },
      scales: {
        yAxes: [{
          ticks: {
            suggestedMax: 100,
            suggestedMin: -10,
            stepSize: 10,
          }
        }]
      },
    }
  });
  </script>
</body>
'''

logger = DhtDataLogger()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 80))
sock.listen(4)

try:
    while True:
        conn, addr = sock.accept()
        connect_from = str(addr[0])
        req = conn.recv(1024).decode()
        if req.startswith('GET / HTTP/1.1'):
            tlist = logger.get_data()
            if tlist is None:
                body = html_base % ("<body><h1>Data not ready</h1></body>")
            else:
                labels = []
                temps = []
                hums = []
                for v in tlist:
                    labels.append(v[0])
                    temps.append(v[1])
                    hums.append(v[2])
                body = graph_body % (json.dumps(labels), json.dumps(temps), json.dumps(hums))
            
            html = html_base % (body)
            # 送信            
            conn.send(http_header)
            conn.sendall(html)
            conn.close()
                        
        else:
            conn.sendall('HTTP/1.1 404 Not Found\r\n')
            conn.close()
    
        gc.collect()

except KeyboardInterrupt:
    pass

sock.close()