著者:小河原 直道
Raspberry Pi 5(以下、ラズパイ5)が日本でも発売され、私も購入しました。センサーとの接続やサーバーの構築をしてみたいと思ったため、湿温度/気圧/CO₂濃度を測定できるセンサーを接続し、その値をWebブラウザから確認できるシステムを作成しました。このシステムの作成ポイントについて解説します。
シェルスクリプトマガジン Vol.90は以下のリンク先でご購入できます。![]()
![]()
図8 BME280のチップIDを読み出すPythonコード
import spidev
# 読み取りは8ビット目を「1」に, 書き込みは「0」にする
def get_reg_addr(rw: str, addr: int) -> int:
rw_flag = 0b1000_0000
if rw == "w":
rw_flag = 0b0000_0000
return rw_flag | (addr & 0b0111_1111)
spi = spidev.SpiDev()
spi.open(0, 0) # bus0, cs0
spi.mode = 0 # mode0, CPOL=CPHA=0
spi.max_speed_hz = 10 * 10**6 # 10MHz
bme_id = spi.xfer2([get_reg_addr("r", 0xD0), 0])
print(f"id: {hex(bme_id[1])}")
spi.close()
図9 BME280の気温データを読み出すPythonコード
import spidev
# 読み取りは8ビット目を「1」に, 書き込みは「0」にする
def get_reg_addr(rw: str, addr: int) -> int:
rw_flag = 0b1000_0000
if rw == "w":
rw_flag = 0b0000_0000
return rw_flag | (addr & 0b0111_1111)
spi = spidev.SpiDev()
spi.open(0, 0) # bus0, cs0
spi.mode = 0 # mode0, CPOL=CPHA=0
spi.max_speed_hz = 10 * 10**6 # 10MHz
# 湿度の測定を有効化。オーバーサンプリングx1
r = spi.xfer2([get_reg_addr("r", 0xF2), 0])[1]
ctrl_hum = r & 0b1111_1000 | 0b001
spi.xfer2([get_reg_addr("w", 0xF2), ctrl_hum])
# 気圧、気温の測定を有効化。オーバーサンプリングx1、ノーマルモードに
ctrl_meas = (0b001 << 5) + (0b001 << 3) + 0b11
spi.xfer2([get_reg_addr("w", 0xF4), ctrl_meas])
# 気温データの読み出し
temp_raw_bins = spi.xfer2([get_reg_addr("r", 0xFA), 0, 0, 0])
temp_raw = (temp_raw_bins[1] << 8+4) | (temp_raw_bins[2] << 4) |\
((temp_raw_bins[3]>>4) & 0b00001111)
print(f"気温: {temp_raw}")
spi.close()
図10 MH-Z19CのCO2濃度データを読み出すPythonコード
import serial
import sys
import time
serial = serial.Serial(port="/dev/ttyAMA0", timeout=5)
# 念のためバッファをリセットする
serial.reset_input_buffer()
serial.reset_output_buffer()
time.sleep(2) # 待ち時間は適宜増減して調整
s = bytearray.fromhex("FF 01 86 00 00 00 00 00 79")
serial.write(s)
r = serial.read(size=9)
serial.close()
if r[0] != 0xFF:
print("invalid data\n")
sys.exit(-1)
for_check=0
for i, b in enumerate(r):
if i==0 or i==8:
continue
# チェックサムは1バイトのみ、あふれた分は捨てる
for_check = for_check + b & 0b1111_1111
for_check = 0xFF - for_check + 1
if r[8] != for_check:
print("checksum error\n")
sys.exit(-1)
print(f"{r[2]*256 + r[3]} ppm")
図12 測定データをデータベースに保存するPythonコード
import mariadb
import sys
try:
conn = mariadb.connect(
user = ユーザー名, password = パスワード,
host = "127.0.0.1", port = 3306,
database = "sensor1", autocommit = True
)
except mariadb.Error as e:
print(f"db connect error: {e}")
sys.exit(-1)
cur = conn.cursor()
sql = 'INSERT INTO sensor (datetime, temp, humid, press, co2) VALUES(NOW(), ?, ?, ?, ?)'
データ(temp_data、humid_data、press_data、co2_data)取得用コードをここに記述
try:
cur.execute(sql, (temp_data, humid_data, press_data, co2_data))
except Exception as e:
print(f"insert error: {e}")
finally:
cur.close()
conn.close()
図13 「db/db_pool.js」ファイルに記述するコード
const mariadb = require("mariadb");
const pool = mariadb.createPool({
host: "localhost",
user: ユーザー名,
password: パスワード,
database: "sensor1",
connectionLimit: 5,
});
module.exports = pool;
図14 「route/api_v1.js」ファイルに記述するコード(抜粋)
const router = require("express").Router();
const pool = require("../db/db_pool");
function getColumnName(sensor){
switch(sensor){
case "all":return "*";
case "temperature": return "datetime, temp"
case "humidity": return "datetime, humid"
case "pressure": return "datetime, press"
case "co2": return "datetime, co2"
default: throw new Error("sensor name error");
}
}
router.get("/:sensor/latest", async(req, res) => {
let sqlCol;
try{
sqlCol = getColumnName(req.params.sensor);
} catch (e) { return res.status(404).send(bad sensor name); }
try{
let dbRes = await pool.query(
select ${sqlCol} from sensor order by datetime desc limit 1
);
// クエリー結果の最初のレコードをJSON形式で送る
return res.status(200).json(dbRes[0]);
} catch(err) {
return res.status(500).json({mes:err.toString()});
}
});
module.exports = router;
図15 「route/index.js」ファイルに記述するコード
const express = require('express');
const router = express.Router();
const path = require('path');
router.get("/", (req, res) => {
res.sendFile(path.join(__dirname, "../view/index.html"));
});
router.use("/api/v1", require("./api_v1"));
module.exports = router;
図16 「app.js」ファイルに記述するコード
const express = require("express");
const app = express();
const path = require('path');
app.use("/", require("./route/index"));
app.use(express.static(path.join(__dirname, "public")));
app.all("*", (req, res) => {
return res.sendStatus(404);
});
app.listen(3000, () => {
console.log("Listening at port 3000");
});