著者:米田聡、麻生二郎
シェルスクリプトマガジンでは、小型コンピュータボード「Raspberry Pi」のオリジナル拡張ボード第2弾として「ラズパイセンサーボード」を製作しました。サンプルプログラムで、搭載されている湿温度・気圧センサー、
ガスセンサー、照度センサーの使い方を紹介します。
シェルスクリプトマガジン Vol.57は以下のリンク先でご購入できます。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | 1 #!/usr/bin/env python3 2 # 3 # apt install python3-pip 4 # sudo pip3 install RPi.BME280 5 # 6 7 import time 8 import smbus2 9 import bme280 10 11 from EbOled import EbOled 12 13 BME280_ADDR = 0x76 14 BUS_NO = 1 15 16 # BME280 17 i2c = smbus2.SMBus(BUS_NO) 18 bme280.load_calibration_params(i2c, BME280_ADDR) 19 20 # OLEDパネル 21 oled = EbOled() 22 oled.begin() 23 oled.clear() 24 oled.display() 25 26 try: 27   while True: 28     data = bme280.sample(i2c, BME280_ADDR) 29     oled.drawString('気温 :' + str(round(data.temperature,1)) + '℃', 0) 30     oled.drawString('湿度 :' + str(round(data.humidity,1)) + '%', 1) 31     oled.drawString('気圧 :' + str(round(data.pressure,1)) + 'hPa', 2) 32     oled.display() 33 34     time.sleep(1) 35 except KeyboardInterrupt: 36   pass | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | 1 import time 2 import Adafruit_GPIO.SPI as SPI 3 import Adafruit_SSD1306 4 5 from PIL import Image 6 from PIL import ImageDraw 7 from PIL import ImageFont 8 9 10 class EbOled(Adafruit_SSD1306.SSD1306_128_64): 11 12   WIDTH = 128 13   HEIGHT = 64 14 15   __RST = 24 16   __DC = 23 17   SPI_PORT = 0 18   SPI_DEVICE = 0 19 20   DEFAULT_FONT = '/usr/share/fonts/truetype/fonts-japanese-gothic.ttf' 21   FONT_SIZE = 14 22   _LINE_HEIGHT = 16 23 24   def __init__(self): 25     self.__spi = SPI.SpiDev(self.SPI_PORT, self.SPI_DEVICE, max_speed_hz=8000000) 26     super().__init__(rst=self.__RST, dc=self.__DC, spi=self.__spi) 27     self._image = Image.new('1', (self.WIDTH, self.HEIGHT) ,0) 28     self._draw = ImageDraw.Draw(self._image) 29     self._font = ImageFont.truetype(self.DEFAULT_FONT, self.FONT_SIZE, encoding='unic') 30 31   def image(self, image): 32     self._image = image 33     super().image(self._image) 34 35   def drawString(self, str, line=0): 36     self._draw.rectangle((0, line*self._LINE_HEIGHT, self.WIDTH,line*self._LINE_HEIGHT+self._LINE_HEIGHT), fill=(0)) 37     self._draw.text((0, line*self._LINE_HEIGHT), str, font=self._font, fill=1) 38     self.image(self._image) | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | 1 #!/usr/bin/env python3 2 import time 3 import RPi.GPIO as GPIO 4 5 from EbOled import EbOled 6 from TP401T import TP401T 7 8 BUZZER = 18 9 10 sensor = TP401T() 11 oled = EbOled() 12 oled.begin() 13 oled.clear() 14 oled.display() 15 16 # BUZZER 17 GPIO.setmode(GPIO.BCM) 18 GPIO.setup(BUZZER, GPIO.OUT) 19 bz = GPIO.PWM(BUZZER, 1000) 20 bz.stop() 21 22 try: 23   sensor.start() 24   oled.drawString('待機中です') 25   oled.display() 26   while sensor.state == TP401T.WAITING: # 測定開始待ち 27     time.sleep(3) 28 29   while True: 30     if sensor.state == TP401T.NORMAL: 31       oled.drawString('空気は正常です') 32     else: 33       oled.drawString('汚染されています!!') 34 35     if sensor.state == TP401T.ALERT: 36       bz.start(50) 37     else: 38       bz.stop() 39 40     oled.display() 41     time.sleep(3) 42 43 except KeyboardInterrupt: 44   sensor.stop() # センサー停止 45 46 GPIO.cleanup() | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |  1 import threading  2 import time  3 from MCP3424 import MCP3424  4  5 class TP401T(MCP3424):  6  7   WAITING = -1 # 測定開始待ち  8   NORMAL = 0 # 空気に問題はない  9   ALERT = 1 # 汚染されている 10   WARNING = 2 # 汚染されているが減少中 11 12   term = False 13   __current_state = -1 14 15   def __init__(self, ch = 0): 16     super().__init__() 17     self.tp401_ch = ch 18     self.term = False 19     self.prev_value = self.getVoltage(self.tp401_ch) 20     self.normal_value = self.prev_value 21 22   def start(self): 23     self.worker = threading.Thread(target=self.__measure) 24     self.term = False 25     self.worker.start() 26 27   def stop(self): 28     self.term = True 29     self.worker.join() 30 31   def __sleep(self, sec): 32     for i in range(sec * 100): 33       if self.term == True: 34         return False 35       time.sleep(1/100) 36 37     return True 38      39   def __measure(self): 40     self.__current_state = self.WAITING 41      42     sum = 0.0 43     for i in range(10): 44       sum += self.getVoltage(self.tp401_ch) 45       if self.__sleep(3) == False: 46         return 47     # 30秒間の平均値を平時の値として採用する 48     self.normal_value = sum / 10 49     self.prev_value = self.normal_value 50     self.__current_state = self.NORMAL 51      52     while self.__sleep(3): 53       value = self.getVoltage(self.tp401_ch) 54       if value < self.normal_value * 1.5: 55         self.__current_state = self.NORMAL 56       else: 57         if (self.prev_value - value) < 0:  # 汚染が増加中 58           self.__current_state = self.ALERT 59         else: 60           self.__current_state = self.WARNING 61        62       self.prev_value = value 63    64   @property 65   def state(self): 66     return self.__current_state 67    68   def __del__(self): 69     self.term = True | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |  1 import os  2   3 class MCP3424():  4     5   SYSFS_PATH = '/sys/bus/i2c/devices/i2c-1/1-0068'  6   SYSFS_IIO  = '/iio:device0/'  7   __enable = False  8     9   def __init__(self): 10     if not os.path.exists(self.SYSFS_PATH): 11       os.system('sudo /bin/bash -c "echo \'mcp3424 0x68\' > /sys/bus/i2c/devices/i2c-1/new_device"') 12      13     if not os.path.exists(self.SYSFS_PATH): 14       raise Exception('sysfs error') 15      16     self.__enable = True 17    18   def getVoltage(self, ch): 19     if not self.__enable: 20       return 0 21      22     raw = 0.0 23     scale = 0.0 24      25     with open(self.SYSFS_PATH+self.SYSFS_IIO+'in_voltage{}_raw'.format(ch), "r") as f: 26       raw = float(f.read()) 27     with open(self.SYSFS_PATH+self.SYSFS_IIO+'in_voltage{}_scale'.format(ch), "r") as f: 28       scale = float(f.read()) 29      30     return (raw * scale) 31    32   @property 33   def ch0(self): 34     return self.getVoltage(0) 35    36   @property 37   def ch1(self): 38     return self.getVoltage(1) 39    40   @property 41   def ch2(self): 42     return self.getVoltage(2) 43  44   @property 45   def ch3(self): 46     return self.getVoltage(3) | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |  1 #!/usr/bin/env python3  2 import time  3 import RPi.GPIO as GPIO  4 import threading  5   6 from EbOled import EbOled  7 from  VCNL4020 import  VCNL4020  8   9 BUZZER = 18 10  11 term = False 12 sensor = VCNL4020() 13 oled = EbOled() 14 oled.begin() 15 oled.clear() 16 oled.display() 17  18 def prox(): 19   global term 20    21   GPIO.setmode(GPIO.BCM) 22   GPIO.setup(BUZZER, GPIO.OUT) 23   bz = GPIO.PWM(BUZZER, 1000) 24    25   while term == False: 26     if sensor.proximity > 4000: 27       bz.start(50) 28     else: 29       bz.stop() 30     time.sleep(0.1) 31    32   GPIO.cleanup(BUZZER) 33  34  35 t = threading.Thread(target=prox) 36 t.start() 37  38 try: 39   while True: 40     oled.drawString('明るさ: ' + str(sensor.luminance) +' lux') 41     oled.display() 42     time.sleep(0.2) 43 except KeyboardInterrupt: 44   term = True | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |  1 # https://www.vishay.com/docs/83476/vcnl4020.pdf  2   3 import smbus  4 import time  5 from threading import BoundedSemaphore  6   7 class VCNL4020():  8   9   _ALS_OD       = 0b00010000  # オンデマンド明るさ計測スタート 10   _PROX_OD      = 0b00001000  # オンデマンド近接計測スタート 11   _ALS_EN       = 0b00000100  # 明るさ繰り返し計測有効 12   _PROX_EN      = 0b00000010  # 近接繰り返し計測有効 13   _SELFTIMED_EN = 0b00000001  # 内蔵タイマー有効 14    15   _CONT_CONV    = 0b10000000  # Continue Conversion有効 16   _AMBIENT_RATE = 0b00010000  # 明るさの計測レート(default:2sample/s) 17   _AUTO_OFFSET  = 0b00001000  # 自動オフセットモード有効 18   _AVERAGING    = 0b00000101  # 平均化(default:32conv) 19    20   _COMMAND_REG       = 0x80  # コマンドレジスタ 21   _PID_REG           = 0x81  # プロダクトIDレジスタ 22   _PROX_RATE_REG     = 0x82  # 近接測定レートジスタ 23   _IR_CURRENT_REG    = 0x83  # 近接測定用赤外線LED電流設定レジスタ(default=20mA) 24   _AMBIENT_PARAM_REG = 0x84  # 明るさセンサーパラメータレジスタ 25    26   _AMBIENT_MSB       = 0x85  # 明るさ上位バイト 27   _AMBIENT_LSB     = 0x86  # 明るさ下位バイト 28    29   _PROX_MSB          = 0x87  # 近接上位バイト 30   _PROX_LSB          = 0x88  # 近接下位バイト 31    32   def __init__(self, i2c_addr = 0x13, busno = 1): 33     self.addr = i2c_addr 34     self.i2c = smbus.SMBus(busno) 35      36     self._write_reg(self._COMMAND_REG, self._ALS_OD  |\ 37                        self._PROX_OD |\ 38                        self._ALS_EN  |\ 39                        self._PROX_EN |\ 40                        self._SELFTIMED_EN ) 41                         42     self._write_reg(self._IR_CURRENT_REG, 2 )  # 20mA 43                         44     self._write_reg(self._AMBIENT_PARAM_REG, self._CONT_CONV    |\ 45                          self._AMBIENT_RATE |\ 46                          self._AUTO_OFFSET  |\ 47                          self._AVERAGING ) 48     self.semaphore = BoundedSemaphore() 49     time.sleep(0.6)      # 初回測定まで待つ 50      51   def _write_reg(self, reg, value): 52     self.i2c.write_byte_data(self.addr, reg, value) 53    54   @property 55   def luminance(self): 56     self.semaphore.acquire() 57     d = self.i2c.read_i2c_block_data(self.addr, self._AMBIENT_MSB, 2) 58     self.semaphore.release() 59     return (d[0] * 256 + d[1]) 60    61   @property 62   def proximity(self): 63     self.semaphore.acquire() 64     d = self.i2c.read_i2c_block_data(self.addr, self._PROX_MSB, 2) 65     self.semaphore.release() 66     return (d[0] * 256 + d[1]) |