수행기록퀘스트4
Quest 4) 클라우드 연동 환경 알림 전광판
2022. 10. 3 (월) 22:59
최종수정 2022. 10. 17 (월) 18:24
MacGyver
조회 900
좋아요 1
스크랩 1
댓글 0
아이디어 소개 및 개요
외출하기 전 꼭 확인해야하는 날씨, 기온, 습도, 미세 먼지 등을 간편하고 직관적으로 표시해주는 미니 전광판이 있다면 편리할 것이라는 아이디어로 시작하게 되었습니다. 미니 전광판은 클라우드와 연동되어 실시간으로 날씨, 미세먼지, 시간 데이터를 불러오고 표시해주는 역할을 합니다. Raspberry Pi Pico W를 통해서 실시간으로 클라우드에 접속하고 LED 및 FND를 사용하여 정보를 표시하고, 별도의 가제트 표시를 위한 스텝 모터 제어를 통해서 보다 친근하고 직관적인 표시가 가능하였습니다.
전체 구조 다이어그램
지역별 환경 정보를 수집하는 Windows 앱은 AWS 클라우드로 수집한 데이터를 Publish합니다. 라즈베리 파이 Pico W를 사용하는 미니 전광판들은 AWS 클라우드를 Subscribe하여 지역에 해당하는 환경정보 데이터를 가져와 사용자에게 표시해주는 역할을 합니다.
하드웨어 구성 다이어그램
소프트웨어 구성 다이어그램
회로도
소스 코드
# -*- coding: utf-8 -*-
# Environment Notification Mini Billboard Source File
# Target System : Raspberry Pi Pico
# CPU Frequency : 250 MHz (Overclocked, Default 125 MHz)
# UF2 File : https://rpf.io/pico-w-firmware
# rp2-pico-w-20220822-unstable-v1.19.1-299-gaf54d2ce9
# IDE : Thonny 3.3.13 (Editor: VS Code)
# Deleloper : H. S. Lee
import os
import sys
import _thread
import time
import math
import json
import machine
from machine import Pin, SPI, PWM
import micropython
from micropython import const
from rp2_common import RP2_Common
from rp2_w_wifi import RP2_W_WiFi
from MQTT_client import MQTT_Client
from HTTP_API import HTTP_API
from stepper_motor import StepperMotor
from fnd_MAX7219 import FND_MAX7219
USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA = True # MQTT Subscribe 날씨 데이터 가져오기 여부
USE_HTTP_API_TO_GET_WEATHER_DATA = not USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA # HTTP API 날씨 데이터 가져오기 여부
DEBUG = True
ENABLE_GC_MAIN = False
ENABLE_GC_SUB = False
KEY_OPENWEATHERMAP_API = "f27f0db69081794eb704257ab53968e2"
SSID = "SM-G930L" # SSID (Service Set Identifier)
PASSWORD = "solar123" # Password
MQTT_BROKER_ENDPOINT = "a17cd1l862h9tu-ats.iot.us-west-1.amazonaws.com"
FILE_KEY = "./Keys_AWS_IoT/DER/private.der"
FILE_CRT = "./Keys_AWS_IoT/DER/certificate.der"
_pin_PROGRAM_EXIT = Pin(22, Pin.IN, Pin.PULL_UP) # Program Exit 스위치
_pin_LED = Pin(25, Pin.OUT, value = 0) # LED on/off
_pin_SW_HOUR_SEL = Pin(26, Pin.IN, Pin.PULL_UP) # 12H / 24H 선택 스위치
_pin_SW_API_MQTT_SEL = Pin(27, Pin.IN, Pin.PULL_UP) # API / MQTT 선택 스위치
_pin_SW_LIMIT_0 = Pin(8, Pin.IN, Pin.PULL_UP) # 리미트 스위치 0
_pin_SW_LIMIT_1 = Pin(9, Pin.IN, Pin.PULL_UP) # 리미트 스위치 1
_lock_print = _thread.allocate_lock()
_lock_time = _thread.allocate_lock()
_time_PE_Main = [0, 0, 0, 0]
_time_PE_Sub = [0, 0, 0]
_pico = RP2_Common(overclock_250MHz = True) # Raspberry Pi Pico 공용 기능 클래스 인스턴스
_http_api = HTTP_API(KEY_OPENWEATHERMAP_API) # HTTP (Hypertext Transfer Protocol) API (Application Programming Interface) 클래스 인스턴스
_smWeather = StepperMotor(lock_time = _lock_time, gpio_A1 = 2, gpio_B1 = 3, gpio_A2 = 4, gpio_B2 = 5, acceleration = 280.0, maximum_speed = 340.0)
_smAir = StepperMotor(lock_time = _lock_time, gpio_A1 = 6, gpio_B1 = 7, gpio_A2 = 14, gpio_B2 = 15, acceleration = 280.0, maximum_speed = 340.0)
_fnd = FND_MAX7219(gpio_DIN = 11, gpio_SCK = 10, gpio_SS = 13, fnd_count = 16, brightness = 15)
_count_GC_Main = int(0)
_count_GC_Sub = int(0)
def get_HTTP_API_WeatherData(printData = False):
try:
if True:
geolocationData = _http_api.get_GeolocationData()
(timezone, public_IP, latitude, longitude, country, countryCode, city) = _http_api.get_GeolocationData()
if bool(printData):
print_WithLock("■ Timezone : " + timezone)
print_WithLock("■ Puglic IP : " + public_IP)
print_WithLock("■ Latitude : " + str(latitude) + " ˚")
print_WithLock("■ Longitude : " + str(longitude) + " ˚")
print_WithLock("■ Country : " + country)
print_WithLock("■ Country Code : " + countryCode)
print_WithLock("■ City : " + city)
if True:
weatherData = _http_api.get_WeatherData()
(timestamp_UTC_Offset, country, countryCode, city, public_IP, timestamp_DataUpdate, timestamp_SunRise, timestamp_SunSet, temp, temp_Min, temp_Max, temp_FeelsLike, humidity, pressure, clouds, wind_Speed, wind_Direction, weather, weather_Description, weather_ID, weather_ImageFile) = weatherData
if bool(printData):
print_WithLock("■ Timestamp UTC Offset : " + str(timestamp_UTC_Offset) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp_UTC_Offset, True)) + ")")
print_WithLock("■ Country : " + country)
print_WithLock("■ Country Code : " + countryCode)
print_WithLock("■ City : " + city)
print_WithLock("■ Puglic IP : " + public_IP)
print_WithLock("■ Timestamp DataUpdate : " + str(timestamp_DataUpdate) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp_DataUpdate, True)) + ")")
print_WithLock("■ Timestamp Sunrise : " + str(timestamp_SunRise) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp_SunRise, True)) + ")")
print_WithLock("■ Timestamp Sunset : " + str(timestamp_SunSet) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp_SunSet, True)) + ")")
print_WithLock("■ Temperature : " + str(temp) + " °C")
print_WithLock("■ Temperature Min : " + str(temp_Min) + " °C")
print_WithLock("■ Temperature Max : " + str(temp_Max) + " °C")
print_WithLock("■ Temperature FeelsLike: " + str(temp_FeelsLike) + " °C")
print_WithLock("■ Humidity : " + str(humidity) + " %")
print_WithLock("■ Pressure : " + str(pressure) + " hPa")
print_WithLock("■ Clouds : " + str(clouds) + " %")
print_WithLock("■ Wind Speed : " + str(wind_Speed) + " m/s")
print_WithLock("■ Wind Direction : " + str(wind_Direction) + " °")
print_WithLock("■ Weather : " + weather)
print_WithLock("■ Weather Description : " + weather_Description)
print_WithLock("■ Weather ID : " + str(weather_ID))
print_WithLock("■ Weather Image File : " + weather_ImageFile)
if True:
airQualityData = _http_api.get_AirQualityData()
(timestamp_DataUpdate, AQI, PM10, PM2_5, CO, NO, NO2, O3, SO2, NH3) = airQualityData
if bool(printData):
print_WithLock("■ Timestamp DataUpdate : " + str(timestamp_DataUpdate) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp_DataUpdate, True)) + ")")
print_WithLock("■ AQI : " + str(AQI) + " (" + _http_api._strAQI[AQI] + ")")
print_WithLock("■ PM10 : " + str(PM10) + " μg/m³")
print_WithLock("■ PM2.5 : " + str(PM2_5) + " μg/m³")
print_WithLock("■ CO : " + str(CO) + " μg/m³")
print_WithLock("■ NO : " + str(NO) + " μg/m³")
print_WithLock("■ NO2 : " + str(NO2) + " μg/m³")
print_WithLock("■ O3 : " + str(O3) + " μg/m³")
print_WithLock("■ SO2 : " + str(SO2) + " μg/m³")
print_WithLock("■ NH3 : " + str(NH3) + " μg/m³")
if True:
timeData = _http_api.get_TimeData()
(timezone, year, month, day, hour, minute, second, dayOfWeek) = timeData
if bool(printData):
print_WithLock("■ Timezone : " + timezone)
print_WithLock("■ Day : {0:d}-{1:02d}-{2:02d} ({3})".format(year, month, day, dayOfWeek))
print_WithLock("■ Time : {0:02d}:{1:02d}:{2:02d}".format(hour, minute, second))
strDay = "{0:d}-{1:02d}-{2:02d} ({3})".format(year, month, day, dayOfWeek)
strTime = "{0:02d}:{1:02d}:{2:02d}".format(hour, minute, second)
except Exception as err:
if DEBUG:
print_WithLock("[ERROR] " + str(err))
return None
return (strDay, strTime, country, city, temp, temp_FeelsLike, humidity, weather, weather_ImageFile, AQI, PM10, PM2_5)
def request_MQTT_Publish_WeatherData(mqttClient, public_IP):
message = "[E4DS MAKE 콘테스트] 날씨 데이터 Publish 요청"
payload = json.dumps({"message": message, "public_IP": public_IP})
topic = "weather data publish request"
mqttClient.publish(topic, payload)
def callback_MQTT_MessageReceived(topic, payload_UTF8, payload_Bytes):
global _weatherData
try:
data = json.loads(payload_UTF8) # Parse a JSON string
_weatherData = (data["day"], data["time"], data["country"], data["city"], data["temp"], data["temp_FeelsLike"], data["humidity"], data["weather"], data["weather_ImageFile"], data["AQI"], data["PM10"], data["PM2_5"])
except Exception as err:
if DEBUG:
print_WithLock("[ERROR] " + str(err))
def update_WeatherDisplay():
global _weatherData # 날씨 데이터
global _timeDot
global _timeSeconds
if not "_timeDot" in globals(): _timeDot = False
if not "_timeSeconds" in globals(): _timeSeconds = 0
_timeDot = not _timeDot
if _timeDot: _timeSeconds += 1 # 실행 주기: 1 sec
if _weatherData is not None:
try:
(strDay, strTime, country, city, temp, temp_FeelsLike, humidity, weather, weather_ImageFile, AQI, PM10, PM2_5) = _weatherData
_weatherData = None
t = strTime.split(":")
timeSeconds = (int(t[0]) * 3600) + (int(t[1]) * 60) + int(t[2])
if round(_timeSeconds / 60) != round(timeSeconds / 60):
_timeSeconds = timeSeconds # 현재 시간 업데이트 (단위: sec)
num = round(PM2_5)
_fnd.draw_Digit(0, num % 10)
_fnd.draw_Digit(1, (num % 100) // 10 if num > 9 else " ")
_fnd.draw_Digit(2, (num % 1000) // 100 if num > 99 else " ")
num = round(PM10)
_fnd.draw_Digit(3, num % 10)
_fnd.draw_Digit(4, (num % 100) // 10 if num > 9 else " ")
_fnd.draw_Digit(5, (num % 1000) // 100 if num > 99 else " ")
num = round(temp_FeelsLike)
if num < 0:
_fnd.draw_Digit(6, -num % 10)
_fnd.draw_Digit(7, (-num % 100) // 10 if num < -9 else "-", True if num < -9 else False)
else:
_fnd.draw_Digit(6, num % 10)
_fnd.draw_Digit(7, (num % 100) // 10 if num > 9 else " ")
num = round(humidity)
_fnd.draw_Digit(8, num % 10)
_fnd.draw_Digit(9, (num % 100) // 10 if num > 9 else " ")
num = round(temp)
if num < 0:
_fnd.draw_Digit(10, -num % 10)
_fnd.draw_Digit(11, (-num % 100) // 10 if num < -9 else "-", True if num < -9 else False)
else:
_fnd.draw_Digit(10, num % 10)
_fnd.draw_Digit(11, (num % 100) // 10 if num > 9 else " ")
iconName = weather_ImageFile.replace(".png", "")[-3:]
move_SM_Weather(iconName)
aqi = _pico.constrain(AQI, 1, 5) # AQI (1: Good, 2: Fair, 3: Moderate, 4: Poor, 5: Very poor)
move_SM_Air(str(aqi))
except Exception as err:
if DEBUG:
print_WithLock("[ERROR] " + str(err))
num = (_timeSeconds // 60) % 60
_fnd.draw_Digit(12, num % 10)
_fnd.draw_Digit(13, (num % 100) // 10)
if _pin_SW_HOUR_SEL.value() == 1: # 12H / 24H 선택 스위치 체크
num = (_timeSeconds // 3600) % 12
if num == 0: num = 12
_fnd.draw_Digit(14, num % 10, _timeDot)
_fnd.draw_Digit(15, (num % 100) // 10 if num > 9 else " ", _timeDot)
else:
num = (_timeSeconds // 3600) % 24
_fnd.draw_Digit(14, num % 10, _timeDot)
_fnd.draw_Digit(15, (num % 100) // 10, _timeDot)
_fnd.update()
def find_SM_ZeroPosition():
DRIVE_SPEED = 95.0
RETURN_DISTANCE = const(50)
sequence = [2, 2]
if DEBUG: print_WithLock("[INFO] Stepper Motor 기준 위치 찾기 시작")
_smWeather.drive(DRIVE_SPEED)
_smAir.drive(DRIVE_SPEED)
while sequence[0] > 0 or sequence[1] > 0:
if sequence[0] == 2:
if _pin_SW_LIMIT_0.value() == 0:
_smWeather.reset(position = RETURN_DISTANCE)
_smWeather.move(0)
sequence[0] = 1
elif sequence[0] == 1:
if _smWeather.get_Position() == 0:
sequence[0] = 0
if sequence[1] == 2:
if _pin_SW_LIMIT_1.value() == 0:
_smAir.reset(position = RETURN_DISTANCE)
_smAir.move(0)
sequence[1] = 1
elif sequence[1] == 1:
if _smAir.get_Position() == 0:
sequence[1] = 0
if DEBUG: print_WithLock("[INFO] Stepper Motor 기준 위치 찾기 완료")
def move_SM_Weather(weatherIcon):
SM_OFFSET_WEATHER = -105
SM_BLOCK_DIST_WEATHER = -151
posTable = {
"01": const(SM_BLOCK_DIST_WEATHER * 8 + SM_OFFSET_WEATHER), # Clear sky (맑음)
"02": const(SM_BLOCK_DIST_WEATHER * 7 + SM_OFFSET_WEATHER), # Few clouds (구름 조금)
"03": const(SM_BLOCK_DIST_WEATHER * 6 + SM_OFFSET_WEATHER), # Scattered clouds (구름 많음)
"04": const(SM_BLOCK_DIST_WEATHER * 5 + SM_OFFSET_WEATHER), # Broken clouds (흐림)
"10": const(SM_BLOCK_DIST_WEATHER * 4 + SM_OFFSET_WEATHER), # Rain (비)
"09": const(SM_BLOCK_DIST_WEATHER * 3 + SM_OFFSET_WEATHER), # Shower rain (소나기)
"11": const(SM_BLOCK_DIST_WEATHER * 2 + SM_OFFSET_WEATHER), # Thunderstorm (천둥번개)
"13": const(SM_BLOCK_DIST_WEATHER + SM_OFFSET_WEATHER), # Snow (눈)
"50": const(SM_OFFSET_WEATHER) # Mist (안개)
}
pos = posTable.get(str(weatherIcon[:2]), SM_OFFSET_WEATHER)
_smWeather.move(pos)
def move_SM_Air(aqi):
SM_OFFSET_AIR = -100
SM_BLOCK_DIST_AIR = -189
posTable = {
"1": const(SM_OFFSET_AIR), # 1: Good (좋음)
"2": const(SM_BLOCK_DIST_AIR + SM_OFFSET_AIR), # 2: Fair (보통)
"3": const(SM_BLOCK_DIST_AIR * 2 + SM_OFFSET_AIR), # 3: Moderate (약간 나쁨)
"4": const(SM_BLOCK_DIST_AIR * 3 + SM_OFFSET_AIR), # 4: Poor (나쁨)
"5": const(SM_BLOCK_DIST_AIR * 4 + SM_OFFSET_AIR) # 5: Very poor (매우 나쁨)
}
pos = posTable.get(str(aqi), SM_OFFSET_AIR)
_smAir.move(pos)
def main():
global _time_PE_Main
global _weatherData
wifi = RP2_W_WiFi(_lock_time, _lock_print)
mqttClient = MQTT_Client("", callback_MQTT_MessageReceived)
if _pin_SW_API_MQTT_SEL.value() == 1: # API / MQTT 선택 스위치 체크
if DEBUG and True: print_WithLock("[INFO] 날씨 데이터 가져오기 방법 → MQTT Subscribe")
USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA = True
else:
if DEBUG and True: print_WithLock("[INFO] 날씨 데이터 가져오기 방법 → HTTP API")
USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA = False # MQTT Subscribe 날씨 데이터 가져오기 여부
USE_HTTP_API_TO_GET_WEATHER_DATA = not USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA
_fnd.clear()
for i in range(0, 16): _fnd.draw_Digit(i, "O")
_fnd.update()
find_SM_ZeroPosition()
if DEBUG: print_WithLock("Connnecting Network (AP SSID: " + SSID + ")", end = "")
if wifi.connect(SSID, PASSWORD, timeout = 10): # 네트워크 연결
if DEBUG: print_WithLock(" → Success")
else:
if DEBUG: print_WithLock(" → Failure [System Reset]")
machine.reset() # 시스템 리셋
if DEBUG:
wifi.print_ConnectionStatus() # 네트워크 연결 상태 출력
if USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA:
if DEBUG: print_WithLock("Connnecting AWS MQTT Broker (Endpoint: " + MQTT_BROKER_ENDPOINT + ")", end = "")
if mqttClient.connect_To_AWS_MQTT_Broker(MQTT_BROKER_ENDPOINT, FILE_KEY, FILE_CRT): # MQTT Broker 연결 (AWS MQTT Broker)
if DEBUG: print_WithLock(" → Success")
else:
if DEBUG: print_WithLock(" → Failure [System Reset]")
machine.reset() # 시스템 리셋
public_IP = _http_api.get_GeolocationData()[1] # 공용 IP
topic = "weather data " + public_IP # Topic (예: weather data 59.6.230.229)
mqttClient.subscribe(topic) # MQTT Subscribe
if DEBUG: print_WithLock("[INFO] Subscribed to AWS MQTT Broker (Topic: {0})".format(topic))
request_MQTT_Publish_WeatherData(mqttClient, public_IP) # MQTT 날씨 데이터 Publish 요청
if USE_HTTP_API_TO_GET_WEATHER_DATA:
_weatherData = get_HTTP_API_WeatherData(printData = True) # HTTP API 날씨 데이터 가져오기
while True:
time_Elapsed = []
if time_Elapsed[0] > const(20 - 1):
_time_PE_Main[0] = time_Now
if ENABLE_GC_MAIN:
_pico.run_GC()
if time_Elapsed[1] > const(2500 - 1):
_time_PE_Main[1] = time_Now
if time_Elapsed[2] > const(500 - 1):
_time_PE_Main[2] = time_Now
if USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA:
mqttClient.process_CheckMessage()
if time_Elapsed[3] > const(5 * 60 * 1000 - 1):
_time_PE_Main[3] = time_Now
if USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA:
request_MQTT_Publish_WeatherData(mqttClient, public_IP)
if USE_HTTP_API_TO_GET_WEATHER_DATA:
_weatherData = get_HTTP_API_WeatherData()
if _weatherData is not None:
(strDay, strTime, country, city, temp, temp_FeelsLike, humidity, weather, weather_ImageFile, AQI, PM10, PM2_5) = _weatherData
def thread_SubCore(name = str(""), id = int(0)):
global _time_PE_Sub
while True:
time_Elapsed = []
if time_Elapsed[0] > const(25 - 1):
_time_PE_Sub[0] = time_Now
if ENABLE_GC_SUB:
_pico.run_GC()
if time_Elapsed[1] > const(2500 - 1):
_time_PE_Sub[1] = time_Now
_pin_LED.toggle() # LED on/off
if time_Elapsed[2] > const(500 - 1):
_time_PE_Sub[2] = time_Now
update_WeatherDisplay()
if __name__ == "__main__":
main()
작동 동영상
- 첨부파일
- 소스_회로도.zip 다운로드
로그인 후
참가 상태를 확인할 수 있습니다.