질문과 답변

node-RED에 Sensor 와 Dashboard 연결하기 (예제코드)
2023. 12. 6 (수) 12:04 goingdol 조회 42 좋아요 0 스크랩 0 댓글 1

안녕하세요.

 

오늘 진행한 웨비나에 사용된 시험코드 입니다.  raspberry PI의 main.py 와 sensor.py 입니다.

dht22 온/습도 센서, 7-segment led, Relay control 코드 입니다.   먼저 main.py 입니다.

 

from sensor import TempHumiSensorClient
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("rasp/rcv/#")

# Callback function for disconnection
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Unexpected disconnection. Reconnecting...")
        while True:
            try:
                client.reconnect()
                break  # Exit the loop if reconnection is successful
            except:
                time.sleep(5)

def main():
    print("Rasp node-RED start..")
    mqtt_client = mqtt.Client()
    mqtt_client.connect("192.168.0.3", 1883)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect

    temphumi_client = TempHumiSensorClient("temphumi", mqtt_client)

    try:
        # Set the callback functions
        temphumi_client.run()
        mqtt_client.loop_start()
    except KeyboardInterrupt:
        pass
    finally:
        temphumi_client.mqtt_client.disconnect()

if __name__ == "__main__":
    main()

 

다음은 sensor.py 입니다.

import RPi.GPIO as GPIO
import Adafruit_DHT as dht
import tm1637
import threading
import time

# TM1637을 설정할때 Borad connector pin번호를 입력하는것이 아니고 BCM의 실제 GPIO 번호를 입력해야함.
# CLK -> GPIO23 (Pin 16)
# Di0 -> GPIO24 (Pin 18)
tm = tm1637.TM1637(22, 23)

class Sensor:
    def __init__(self, sensor_type, mqtt_client):
        GPIO.setmode(GPIO.BOARD)
        # LED On/Off
        GPIO.setup(12, GPIO.OUT)
        # Motor Test
        GPIO.setup(36, GPIO.OUT)
        # 초기값 = On
        GPIO.output(12, GPIO.HIGH)

        self.next_t = time.time()
        self.increment = 2
        self.sensor_type = sensor_type
        self.mqtt_client = mqtt_client
        self.mqtt_client.on_message = self.on_message # set the callback for incomming message.
        self.setup_sensor()
        print(f"Sensor : {sensor_type} {mqtt_client} constructed..")

    def publish_data(self, topic, payload):
        self.mqtt_client.publish(topic, payload)

    def on_message(self, client, userdata, message):
        received_message = message.payload.decode("utf-8")
        print(f"Rasp Node rcv message on topic {message.topic}: {received_message}")
        if message.topic == "rasp/rcv/led":
            if received_message == "off":
                GPIO.output(12, GPIO.LOW)
                print(f"received off.")

            elif received_message == "on":
                GPIO.output(12, GPIO.HIGH)
                print(f"received on.")

        if message.topic == "rasp/rcv/humimotor":
            if received_message == "off":
                GPIO.output(12, GPIO.LOW)
                print(f"received off.")

            elif received_message == "on":
                GPIO.output(12, GPIO.HIGH)
                print(f"received on.")

        if message.topic == "rasp/rcv/motor":
            if received_message == "off":
                GPIO.output(36, GPIO.LOW)
                print(f"received off.")

            elif received_message == "on":
                GPIO.output(36, GPIO.HIGH)
                print(f"received on.")

            elif received_message == "do_something":
                self.do_someting()

    def do_someting(self):
        # Handle command from broket.
        pass

#-------------------- Temp&Humi Sensor ---------------------#

class TempHumiSensorClient(Sensor):

    def setup_sensor(self):
        print(f"Temp & Humi sensor_setup.")

    def report_light_level(self):
        segment = "    "
        tm.show(segment, colon=True)
        humidity, temperature = dht.read_retry(dht.DHT22, 4)
        humi = round(humidity, 3)
        temp = round(temperature, 3)
        print(f"{time.ctime()} : Temp: {temp}    Humi: {humi}")
        self.publish_data(f"rasp/send/temp", temp)
        self.publish_data(f"rasp/send/humi", humi)
        segment = str(round(temp))+str(round(humi))
        tm.show(segment, colon=True)
        # tm.write([127, 255, 127, 127])

    def run(self):
        print(f"Temp & Humi SensorClient run..")
        def report_periodically():
            self.report_light_level()
            self.next_t += self.increment
            # self.timer = threading.Timer(2, report_periodically)  # Report every 2 seconds
            self.timer = threading.Timer(self.next_t - time.time(), report_periodically)
            self.timer.start()

        # self.timer = threading.Timer(2, report_periodically)  # Initial start
        self.timer = threading.Timer(self.next_t - time.time(), report_periodically)
        self.timer.start()

 

 

다음은 BeagleBoneBlack 의 main.py 와 sensor.py 입니다.  Ambient light sensor, Motion sensor, LED on/off처리를 합니다. Motion sensor에서 GPIO event 발생시 처리하는 routine이 raspberry PI와 다릅니다. 

먼저 main.py 입니다.

from sensor import MotionSensorClient
from sensor import AmbientLightSensorClient
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("bbb/rcv/#")

# Callback function for disconnection
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Unexpected disconnection. Reconnecting...")
        while True:
            try:
                client.reconnect()
                break  # Exit the loop if reconnection is successful
            except:
                time.sleep(5)

def main():
    print("BBB node-RED start..")
    mqtt_client = mqtt.Client()
    mqtt_client.connect("192.168.0.3", 1883)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect

    motion_client = MotionSensorClient("motion", mqtt_client)
    ambient_light_client = AmbientLightSensorClient("light", mqtt_client)

    try:
        # Set the callback functions
        motion_client.run()
        ambient_light_client.run()
        mqtt_client.loop_start()
    except KeyboardInterrupt:
        pass
    finally:
        motion_client.mqtt_client.disconnect()
        ambient_light_client.mqtt_client.disconnect()

if __name__ == "__main__":
    main()

 

다음은 BeagleBoneBlack의 sensor.py 입니다.

import paho.mqtt.client as mqtt
import Adafruit_BBIO.GPIO as GPIO
import Adafruit_BBIO.ADC as ADC
import threading
import time

class Sensor:
    def __init__(self, sensor_type, mqtt_client):
        self.sensor_type = sensor_type
        self.mqtt_client = mqtt_client
        self.mqtt_client.on_message = self.on_message # set the callback for incomming message.
        self.next_t = time.time()
        self.increment = 2
        self.setup_sensor()
        print(f"Sensor : {sensor_type} {mqtt_client} constructed..")

    def publish_data(self, topic, payload):
        self.mqtt_client.publish(topic, payload)

    def on_message(self, client, userdata, message):
        received_message = message.payload.decode("utf-8")
        print(f"BBB Node rcv message on topic {message.topic}: {received_message}")
        if message.topic == "bbb/rcv/led":
            if received_message == "off":
                GPIO.output("P8_7", GPIO.LOW)
                print(f"received off.")
            elif received_message == "on":
                GPIO.output("P8_7", GPIO.HIGH)
                print(f"received on.")
            elif received_message == "do_something":
                self.do_someting()

        if message.topic == "bbb/rcv/motion":
            if received_message == "off":
                GPIO.output("P8_7", GPIO.LOW)
                print(f"received off.")
            elif received_message == "on":
                GPIO.output("P8_7", GPIO.HIGH)
                print(f"received on.")

        if message.topic == "bbb/rcv/als":
            if received_message == "off":
                GPIO.output("P8_7", GPIO.LOW)
                print(f"received off.")
            elif received_message == "on":
                GPIO.output("P8_7", GPIO.HIGH)
                print(f"received on.")
            elif received_message == "do_something":
                self.do_someting()

    def do_someting(self):
        # Handle command from broket.
        pass

#-------------------- Motion Sensor ---------------------#

class MotionSensorClient(Sensor):
#    def __init__(self, sensor_type, mqtt_client):
#        Sensor.__init__(self, sensor_type, mqtt_client)
#        print(f"Motion sensor_init {mqtt_client} {mqtt_client.on_message}..")

    def setup_sensor(self):
        GPIO.setup("P9_12", GPIO.IN)
        GPIO.setup("P8_7", GPIO.OUT)
        GPIO.add_event_detect("P9_12", GPIO.RISING, callback=self.motion_detected)
        print(f"Motion sensor_setup..")

    def motion_detected(self, channel):
        self.publish_data(f"bbb/rcv/motion", "on")
        print(f"Motion on report...")

    def run(self):
        print(f"MotionSensorClient run..")

#-------------------- Ambient Light Sensor ---------------------#

class AmbientLightSensorClient(Sensor):
#    def __init__(self, sensor_type, mqtt_client):
#        Sensor.__init__(self, sensor_type, mqtt_client)
#        print(f"ALS sensor_init {mqtt_client} {mqtt_client.on_message}..")

    def setup_sensor(self):
        ADC.setup()
        print(f"ALS sensor_setup..")

    def report_light_level(self):
        adc_value = ADC.read("P9_40")
        light_level = round((adc_value * 1.8), 2)
        print(f"{time.ctime()} : ALS report_light_level : {light_level} ")
        self.publish_data(f"bbb/send/als", light_level)

    def run(self):
        print(f"AmbientLightSensorClient run..")
        def report_periodically():
            self.report_light_level()
            self.next_t += self.increment
            self.timer = threading.Timer(self.next_t - time.time(), report_periodically).start()  # Report every 2 seconds
            # self.timer = threading.Timer(2, report_periodically)  # Report every 2 seconds
            # self.timer.start()

        self.timer = threading.Timer(self.next_t - time.time(), report_periodically).start()  # Report every 2 seconds
        #self.timer = threading.Timer(2, report_periodically)  # Initial start
        #self.timer.start()

 

감사합니다.

 

초코별 2023.12.06 16:43 답글  | 
웨비나 잘 들었습니다. 혹시 오늘 웨비나에서 사용하셨던 Node-RED 플로우 json 파일과(dashboard layout 포함) 설치하신 widget 이름을 올려 주실수 있나요?

로그인 후
참가 상태를 확인할 수 있습니다.