질문과 답변
node-RED에 Sensor 와 Dashboard 연결하기 (예제코드)
2023. 12. 6 (수) 12:04
goingdol
조회 42
좋아요 0
스크랩 0
댓글 1
안녕하세요.
오늘 진행한 웨비나에 사용된 시험코드 입니다. raspberry PI의 main.py 와 sensor.py 입니다.
dht22 온/습도 센서, 7-segment led, Relay control 코드 입니다. 먼저 main.py 입니다.
from sensor import TempHumiSensorClient
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe("rasp/rcv/#")
# Callback function for disconnection
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection. Reconnecting...")
while True:
try:
client.reconnect()
break # Exit the loop if reconnection is successful
except:
time.sleep(5)
def main():
print("Rasp node-RED start..")
mqtt_client = mqtt.Client()
mqtt_client.connect("192.168.0.3", 1883)
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
temphumi_client = TempHumiSensorClient("temphumi", mqtt_client)
try:
# Set the callback functions
temphumi_client.run()
mqtt_client.loop_start()
except KeyboardInterrupt:
pass
finally:
temphumi_client.mqtt_client.disconnect()
if __name__ == "__main__":
main()
다음은 sensor.py 입니다.
import RPi.GPIO as GPIO
import Adafruit_DHT as dht
import tm1637
import threading
import time
# TM1637을 설정할때 Borad connector pin번호를 입력하는것이 아니고 BCM의 실제 GPIO 번호를 입력해야함.
# CLK -> GPIO23 (Pin 16)
# Di0 -> GPIO24 (Pin 18)
tm = tm1637.TM1637(22, 23)
class Sensor:
def __init__(self, sensor_type, mqtt_client):
GPIO.setmode(GPIO.BOARD)
# LED On/Off
GPIO.setup(12, GPIO.OUT)
# Motor Test
GPIO.setup(36, GPIO.OUT)
# 초기값 = On
GPIO.output(12, GPIO.HIGH)
self.next_t = time.time()
self.increment = 2
self.sensor_type = sensor_type
self.mqtt_client = mqtt_client
self.mqtt_client.on_message = self.on_message # set the callback for incomming message.
self.setup_sensor()
print(f"Sensor : {sensor_type} {mqtt_client} constructed..")
def publish_data(self, topic, payload):
self.mqtt_client.publish(topic, payload)
def on_message(self, client, userdata, message):
received_message = message.payload.decode("utf-8")
print(f"Rasp Node rcv message on topic {message.topic}: {received_message}")
if message.topic == "rasp/rcv/led":
if received_message == "off":
GPIO.output(12, GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output(12, GPIO.HIGH)
print(f"received on.")
if message.topic == "rasp/rcv/humimotor":
if received_message == "off":
GPIO.output(12, GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output(12, GPIO.HIGH)
print(f"received on.")
if message.topic == "rasp/rcv/motor":
if received_message == "off":
GPIO.output(36, GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output(36, GPIO.HIGH)
print(f"received on.")
elif received_message == "do_something":
self.do_someting()
def do_someting(self):
# Handle command from broket.
pass
#-------------------- Temp&Humi Sensor ---------------------#
class TempHumiSensorClient(Sensor):
def setup_sensor(self):
print(f"Temp & Humi sensor_setup.")
def report_light_level(self):
segment = " "
tm.show(segment, colon=True)
humidity, temperature = dht.read_retry(dht.DHT22, 4)
humi = round(humidity, 3)
temp = round(temperature, 3)
print(f"{time.ctime()} : Temp: {temp} Humi: {humi}")
self.publish_data(f"rasp/send/temp", temp)
self.publish_data(f"rasp/send/humi", humi)
segment = str(round(temp))+str(round(humi))
tm.show(segment, colon=True)
# tm.write([127, 255, 127, 127])
def run(self):
print(f"Temp & Humi SensorClient run..")
def report_periodically():
self.report_light_level()
self.next_t += self.increment
# self.timer = threading.Timer(2, report_periodically) # Report every 2 seconds
self.timer = threading.Timer(self.next_t - time.time(), report_periodically)
self.timer.start()
# self.timer = threading.Timer(2, report_periodically) # Initial start
self.timer = threading.Timer(self.next_t - time.time(), report_periodically)
self.timer.start()
다음은 BeagleBoneBlack 의 main.py 와 sensor.py 입니다. Ambient light sensor, Motion sensor, LED on/off처리를 합니다. Motion sensor에서 GPIO event 발생시 처리하는 routine이 raspberry PI와 다릅니다.
먼저 main.py 입니다.
from sensor import MotionSensorClient
from sensor import AmbientLightSensorClient
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe("bbb/rcv/#")
# Callback function for disconnection
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Unexpected disconnection. Reconnecting...")
while True:
try:
client.reconnect()
break # Exit the loop if reconnection is successful
except:
time.sleep(5)
def main():
print("BBB node-RED start..")
mqtt_client = mqtt.Client()
mqtt_client.connect("192.168.0.3", 1883)
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
motion_client = MotionSensorClient("motion", mqtt_client)
ambient_light_client = AmbientLightSensorClient("light", mqtt_client)
try:
# Set the callback functions
motion_client.run()
ambient_light_client.run()
mqtt_client.loop_start()
except KeyboardInterrupt:
pass
finally:
motion_client.mqtt_client.disconnect()
ambient_light_client.mqtt_client.disconnect()
if __name__ == "__main__":
main()
다음은 BeagleBoneBlack의 sensor.py 입니다.
import paho.mqtt.client as mqtt
import Adafruit_BBIO.GPIO as GPIO
import Adafruit_BBIO.ADC as ADC
import threading
import time
class Sensor:
def __init__(self, sensor_type, mqtt_client):
self.sensor_type = sensor_type
self.mqtt_client = mqtt_client
self.mqtt_client.on_message = self.on_message # set the callback for incomming message.
self.next_t = time.time()
self.increment = 2
self.setup_sensor()
print(f"Sensor : {sensor_type} {mqtt_client} constructed..")
def publish_data(self, topic, payload):
self.mqtt_client.publish(topic, payload)
def on_message(self, client, userdata, message):
received_message = message.payload.decode("utf-8")
print(f"BBB Node rcv message on topic {message.topic}: {received_message}")
if message.topic == "bbb/rcv/led":
if received_message == "off":
GPIO.output("P8_7", GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output("P8_7", GPIO.HIGH)
print(f"received on.")
elif received_message == "do_something":
self.do_someting()
if message.topic == "bbb/rcv/motion":
if received_message == "off":
GPIO.output("P8_7", GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output("P8_7", GPIO.HIGH)
print(f"received on.")
if message.topic == "bbb/rcv/als":
if received_message == "off":
GPIO.output("P8_7", GPIO.LOW)
print(f"received off.")
elif received_message == "on":
GPIO.output("P8_7", GPIO.HIGH)
print(f"received on.")
elif received_message == "do_something":
self.do_someting()
def do_someting(self):
# Handle command from broket.
pass
#-------------------- Motion Sensor ---------------------#
class MotionSensorClient(Sensor):
# def __init__(self, sensor_type, mqtt_client):
# Sensor.__init__(self, sensor_type, mqtt_client)
# print(f"Motion sensor_init {mqtt_client} {mqtt_client.on_message}..")
def setup_sensor(self):
GPIO.setup("P9_12", GPIO.IN)
GPIO.setup("P8_7", GPIO.OUT)
GPIO.add_event_detect("P9_12", GPIO.RISING, callback=self.motion_detected)
print(f"Motion sensor_setup..")
def motion_detected(self, channel):
self.publish_data(f"bbb/rcv/motion", "on")
print(f"Motion on report...")
def run(self):
print(f"MotionSensorClient run..")
#-------------------- Ambient Light Sensor ---------------------#
class AmbientLightSensorClient(Sensor):
# def __init__(self, sensor_type, mqtt_client):
# Sensor.__init__(self, sensor_type, mqtt_client)
# print(f"ALS sensor_init {mqtt_client} {mqtt_client.on_message}..")
def setup_sensor(self):
ADC.setup()
print(f"ALS sensor_setup..")
def report_light_level(self):
adc_value = ADC.read("P9_40")
light_level = round((adc_value * 1.8), 2)
print(f"{time.ctime()} : ALS report_light_level : {light_level} ")
self.publish_data(f"bbb/send/als", light_level)
def run(self):
print(f"AmbientLightSensorClient run..")
def report_periodically():
self.report_light_level()
self.next_t += self.increment
self.timer = threading.Timer(self.next_t - time.time(), report_periodically).start() # Report every 2 seconds
# self.timer = threading.Timer(2, report_periodically) # Report every 2 seconds
# self.timer.start()
self.timer = threading.Timer(self.next_t - time.time(), report_periodically).start() # Report every 2 seconds
#self.timer = threading.Timer(2, report_periodically) # Initial start
#self.timer.start()
감사합니다.
초코별
2023.12.06 16:43
답글
|
웨비나 잘 들었습니다. 혹시 오늘 웨비나에서 사용하셨던 Node-RED 플로우 json 파일과(dashboard layout 포함) 설치하신 widget 이름을 올려 주실수 있나요?
로그인 후
참가 상태를 확인할 수 있습니다.