수행기록퀘스트4

Quest 4) 클라우드 연동 환경 알림 전광판
2022. 10. 3 (월) 22:59 최종수정 2022. 10. 17 (월) 18:24 MacGyver 조회 900 좋아요 1 스크랩 1 댓글 0

 

아이디어 소개 및 개요

 외출하기 전 꼭 확인해야 하는 날씨, 기온, 습도, 미세먼지 등을 간편하고 직관적으로 표시해 주는 미니 전광판이 있다면 편리할 것이라는 아이디어로 시작하게 되었습니다. 미니 전광판은 클라우드와 연동되어 실시간으로 날씨, 미세먼지, 시간 데이터를 불러오고 표시해 주는 역할을 합니다. Raspberry Pi Pico W를 통해서 실시간으로 클라우드에 접속하고 LED 및 FND를 사용하여 정보를 표시하며, 별도의 가제트 표시를 위한 스텝 모터 제어를 통해서 보다 친근하고 직관적인 표시가 가능하도록 하였습니다.

 

전체 구조 다이어그램

지역별 환경 정보를 수집하는 Windows 앱은 수집한 데이터를 AWS 클라우드로 Publish합니다. 라즈베리 파이 Pico W를 사용하는 미니 전광판들은 AWS 클라우드를 Subscribe하여 각자의 지역에 해당하는 환경 정보 데이터를 가져와 사용자에게 표시해 주는 역할을 합니다.

 

하드웨어 구성 다이어그램

 

소프트웨어 구성 다이어그램

회로도

소스 코드

 

# -*- coding: utf-8 -*-

# Environment Notification Mini Billboard Source File                         
# Target System : Raspberry Pi Pico W
# CPU Frequency : 250 MHz (Overclocked, Default 125 MHz)                      
# UF2 File      : https://rpf.io/pico-w-firmware                              
#                 rp2-pico-w-20220822-unstable-v1.19.1-299-gaf54d2ce9         
# IDE           : Thonny 3.3.13 (Editor: VS Code)                             
# Developer     : H. S. Lee

import os
import sys
import _thread
import time
import math
import json
import machine
from machine import Pin, SPI, PWM
import micropython
from micropython import const

from rp2_common import RP2_Common
from rp2_w_wifi import RP2_W_WiFi
from MQTT_client import MQTT_Client
from HTTP_API import HTTP_API
from stepper_motor import StepperMotor
from fnd_MAX7219 import FND_MAX7219

# Default data-source selection. main() chooses the actual mode from the
# API / MQTT selector switch and keeps its choice in locals of the same name,
# so these module-level values are only defaults.
USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA = True                                  # Whether to fetch weather data via MQTT subscribe
USE_HTTP_API_TO_GET_WEATHER_DATA = not USE_MQTT_SUBSCRIBE_TO_GET_WEATHER_DATA  # Whether to fetch weather data via the HTTP API

# Gates the diagnostic print_WithLock() output throughout this file.
DEBUG = True

# When True, the corresponding scheduler loop calls _pico.run_GC() periodically.
ENABLE_GC_MAIN = False
ENABLE_GC_SUB = False

# Passed to the HTTP_API constructor below.
# NOTE(review): this API key is committed in plain text — consider rotating it
# and loading it from a file that is not published.
KEY_OPENWEATHERMAP_API = "f27f0db69081794eb704257ab53968e2"

# Wi-Fi access point credentials used by main().
# NOTE(review): credentials are hard-coded in the source — confirm this is intended.
SSID = "SM-G930L"                                                          # SSID (Service Set Identifier)
PASSWORD = "solar123"                                                      # Password

# AWS IoT MQTT broker endpoint and the key / certificate files handed to
# MQTT_Client.connect_To_AWS_MQTT_Broker() in main().
MQTT_BROKER_ENDPOINT = "a17cd1l862h9tu-ats.iot.us-west-1.amazonaws.com"
FILE_KEY = "./Keys_AWS_IoT/DER/private.der"
FILE_CRT = "./Keys_AWS_IoT/DER/certificate.der"

_pin_PROGRAM_EXIT = Pin(22, Pin.IN, Pin.PULL_UP)                               # Program Exit switch (not read in the visible code)
# NOTE(review): on a Pico W the on-board LED is not wired to GPIO25 — confirm
# that this pin drives an external LED.
_pin_LED = Pin(25, Pin.OUT, value = 0)                                         # LED on/off
_pin_SW_HOUR_SEL = Pin(26, Pin.IN, Pin.PULL_UP)                                # 12H / 24H selector switch
_pin_SW_API_MQTT_SEL = Pin(27, Pin.IN, Pin.PULL_UP)                            # API / MQTT selector switch
_pin_SW_LIMIT_0 = Pin(8, Pin.IN, Pin.PULL_UP)                                  # Limit switch 0 (weather stepper homing)
_pin_SW_LIMIT_1 = Pin(9, Pin.IN, Pin.PULL_UP)                                  # Limit switch 1 (air-quality stepper homing)

# Locks shared between objects: _lock_time is handed to both StepperMotor
# instances and to RP2_W_WiFi; _lock_print is handed to RP2_W_WiFi.
_lock_print = _thread.allocate_lock()
_lock_time = _thread.allocate_lock()

# Per-task "last run" timestamps written by the periodic schedulers
# (main loop: 4 slots, sub-core loop: 3 slots).
_time_PE_Main = [0, 0, 0, 0]
_time_PE_Sub = [0, 0, 0]

_pico = RP2_Common(overclock_250MHz = True)                                    # Shared Raspberry Pi Pico helper instance
_http_api = HTTP_API(KEY_OPENWEATHERMAP_API)                                   # HTTP (Hypertext Transfer Protocol) API (Application Programming Interface) helper instance

# Stepper motors for the weather and air-quality indicators, and the 16-digit
# MAX7219 seven-segment (FND) display.
_smWeather = StepperMotor(lock_time = _lock_time, gpio_A1 = 2, gpio_B1 = 3, gpio_A2 = 4, gpio_B2 = 5, acceleration = 280.0, maximum_speed = 340.0)
_smAir = StepperMotor(lock_time = _lock_time, gpio_A1 = 6, gpio_B1 = 7, gpio_A2 = 14, gpio_B2 = 15, acceleration = 280.0, maximum_speed = 340.0)
_fnd = FND_MAX7219(gpio_DIN = 11, gpio_SCK = 10, gpio_SS = 13, fnd_count = 16, brightness = 15)

# Counters; not referenced anywhere in the visible part of this file.
_count_GC_Main = int(0)
_count_GC_Sub = int(0)


def get_HTTP_API_WeatherData(printData = False):
    """Collect geolocation, weather, air-quality and time data via the HTTP API.

    The four `_http_api` queries are issued in order: geolocation, weather,
    air quality, time. Values with the same name returned by a later query
    (country, city, public IP, timezone) override the earlier ones.

    Args:
        printData: When truthy, every retrieved field is printed through
            print_WithLock().

    Returns:
        A 12-tuple `(strDay, strTime, country, city, temp, temp_FeelsLike,
        humidity, weather, weather_ImageFile, AQI, PM10, PM2_5)`, or None when
        any query or unpacking step raises (the error is printed if DEBUG).
    """
    def describe_Timestamp(timestamp):
        # "<raw> (<converted>)" — the form used for every timestamp line below.
        return str(timestamp) + " (" + str(_http_api.convert_TimestampToDatetime(timestamp, True)) + ")"

    try:
        # --- Geolocation --------------------------------------------------
        # Queried exactly once; the original issued the same request twice
        # and discarded the first result.
        geolocationData = _http_api.get_GeolocationData()
        (timezone, public_IP, latitude, longitude, country, countryCode, city) = geolocationData

        if printData:
            print_WithLock("■ Timezone             : " + timezone)
            print_WithLock("■ Puglic IP            : " + public_IP)
            print_WithLock("■ Latitude             : " + str(latitude) + " ˚")
            print_WithLock("■ Longitude            : " + str(longitude) + " ˚")
            print_WithLock("■ Country              : " + country)
            print_WithLock("■ Country Code         : " + countryCode)
            print_WithLock("■ City                 : " + city)

        # --- Weather ------------------------------------------------------
        weatherData = _http_api.get_WeatherData()
        (timestamp_UTC_Offset, country, countryCode, city, public_IP, timestamp_DataUpdate, timestamp_SunRise, timestamp_SunSet, temp, temp_Min, temp_Max, temp_FeelsLike, humidity, pressure, clouds, wind_Speed, wind_Direction, weather, weather_Description, weather_ID, weather_ImageFile) = weatherData

        if printData:
            print_WithLock("■ Timestamp UTC Offset : " + describe_Timestamp(timestamp_UTC_Offset))
            print_WithLock("■ Country              : " + country)
            print_WithLock("■ Country Code         : " + countryCode)
            print_WithLock("■ City                 : " + city)
            print_WithLock("■ Puglic IP            : " + public_IP)
            print_WithLock("■ Timestamp DataUpdate : " + describe_Timestamp(timestamp_DataUpdate))
            print_WithLock("■ Timestamp Sunrise    : " + describe_Timestamp(timestamp_SunRise))
            print_WithLock("■ Timestamp Sunset     : " + describe_Timestamp(timestamp_SunSet))
            print_WithLock("■ Temperature          : " + str(temp) + " °C")
            print_WithLock("■ Temperature Min      : " + str(temp_Min) + " °C")
            print_WithLock("■ Temperature Max      : " + str(temp_Max) + " °C")
            print_WithLock("■ Temperature FeelsLike: " + str(temp_FeelsLike) + " °C")
            print_WithLock("■ Humidity             : " + str(humidity) + " %")
            print_WithLock("■ Pressure             : " + str(pressure) + " hPa")
            print_WithLock("■ Clouds               : " + str(clouds) + " %")
            print_WithLock("■ Wind Speed           : " + str(wind_Speed) + " m/s")
            print_WithLock("■ Wind Direction       : " + str(wind_Direction) + " °")
            print_WithLock("■ Weather              : " + weather)
            print_WithLock("■ Weather Description  : " + weather_Description)
            print_WithLock("■ Weather ID           : " + str(weather_ID))
            print_WithLock("■ Weather Image File   : " + weather_ImageFile)

        # --- Air quality --------------------------------------------------
        airQualityData = _http_api.get_AirQualityData()
        (timestamp_DataUpdate, AQI, PM10, PM2_5, CO, NO, NO2, O3, SO2, NH3) = airQualityData

        if printData:
            print_WithLock("■ Timestamp DataUpdate : " + describe_Timestamp(timestamp_DataUpdate))
            print_WithLock("■ AQI                  : " + str(AQI) + " (" + _http_api._strAQI[AQI] + ")")
            print_WithLock("■ PM10                 : " + str(PM10) + " μg/m³")
            print_WithLock("■ PM2.5                : " + str(PM2_5) + " μg/m³")
            print_WithLock("■ CO                   : " + str(CO) + " μg/m³")
            print_WithLock("■ NO                   : " + str(NO) + " μg/m³")
            print_WithLock("■ NO2                  : " + str(NO2) + " μg/m³")
            print_WithLock("■ O3                   : " + str(O3) + " μg/m³")
            print_WithLock("■ SO2                  : " + str(SO2) + " μg/m³")
            print_WithLock("■ NH3                  : " + str(NH3) + " μg/m³")

        # --- Date / time --------------------------------------------------
        timeData = _http_api.get_TimeData()
        (timezone, year, month, day, hour, minute, second, dayOfWeek) = timeData

        strDay = "{0:d}-{1:02d}-{2:02d} ({3})".format(year, month, day, dayOfWeek)
        strTime = "{0:02d}:{1:02d}:{2:02d}".format(hour, minute, second)

        if printData:
            print_WithLock("■ Timezone             : " + timezone)
            print_WithLock("■ Day                  : " + strDay)
            print_WithLock("■ Time                 : " + strTime)
    except Exception as err:
        # Best effort: any failure is reported to the caller as None.
        if DEBUG:
            print_WithLock("[ERROR] " + str(err))
        return None

    return (strDay, strTime, country, city, temp, temp_FeelsLike, humidity, weather, weather_ImageFile, AQI, PM10, PM2_5)


def request_MQTT_Publish_WeatherData(mqttClient, public_IP):
    """Publish a request asking for weather data to be published.

    A JSON object carrying a fixed message text and `public_IP` is published
    on the topic "weather data publish request".

    Args:
        mqttClient: Object exposing `publish(topic, payload)`.
        public_IP: Value placed under the "public_IP" key of the payload.
    """
    request = {
        "message": "[E4DS MAKE 콘테스트] 날씨 데이터 Publish 요청",
        "public_IP": public_IP,
    }
    mqttClient.publish("weather data publish request", json.dumps(request))
    
def callback_MQTT_MessageReceived(topic, payload_UTF8, payload_Bytes):
    """Store the weather fields of a received MQTT message in `_weatherData`.

    `payload_UTF8` is parsed as JSON and the twelve expected fields are copied,
    in a fixed order, into the module-level `_weatherData` tuple. If parsing
    fails or a field is missing, `_weatherData` is left untouched and the error
    is printed when DEBUG is set.

    Args:
        topic: Not used by this function.
        payload_UTF8: JSON text of the message.
        payload_Bytes: Not used by this function.
    """
    global _weatherData

    # Order matters: update_WeatherDisplay() unpacks the tuple positionally.
    fieldOrder = ("day", "time", "country", "city", "temp", "temp_FeelsLike", "humidity", "weather", "weather_ImageFile", "AQI", "PM10", "PM2_5")

    try:
        parsed = json.loads(payload_UTF8)
        _weatherData = tuple(parsed[field] for field in fieldOrder)
    except Exception as err:
        if DEBUG:
            print_WithLock("[ERROR] " + str(err))


def update_WeatherDisplay():
    """Refresh the 16-digit FND and, when new data is pending, both steppers.

    Each call toggles the blink flag `_timeDot`; the software clock
    `_timeSeconds` advances by one on every call that turns the flag on, so
    the clock is correct when the function is called every 500 ms.

    If `_weatherData` holds a tuple it is consumed (reset to None) and used to:
      * resynchronise `_timeSeconds` when it differs from the received time
        by about a minute or more,
      * draw PM2.5 (digits 0-2), PM10 (3-5), feels-like temperature (6-7),
        humidity (8-9) and temperature (10-11),
      * move the weather and air-quality steppers.
    Errors in that part are caught and printed when DEBUG is set.

    Digits 12-13 always show minutes and digits 14-15 show hours, in 12-hour
    form when the 12H / 24H switch reads 1 and 24-hour form otherwise.
    """
    global _weatherData                                                        # Pending weather tuple (None once consumed)

    global _timeDot
    global _timeSeconds

    # Lazily create the clock state on the first call.
    if "_timeDot" not in globals(): _timeDot = False
    if "_timeSeconds" not in globals(): _timeSeconds = 0

    _timeDot = not _timeDot
    if _timeDot: _timeSeconds += 1                                             # Runs once per second at a 500 ms call period

    # Read through globals() so that a call made before any weather data has
    # ever been stored sees None instead of raising NameError.
    weatherData = globals().get("_weatherData")

    if weatherData is not None:
        try:
            (strDay, strTime, country, city, temp, temp_FeelsLike, humidity, weather, weather_ImageFile, AQI, PM10, PM2_5) = weatherData
            _weatherData = None                                                # Mark the data as consumed

            t = strTime.split(":")
            timeSeconds = (int(t[0]) * 3600) + (int(t[1]) * 60) + int(t[2])

            if round(_timeSeconds / 60) != round(timeSeconds / 60):
                _timeSeconds = timeSeconds                                     # Update current time (unit: sec)

            # PM2.5 — three digits, leading positions blanked.
            num = round(PM2_5)
            _fnd.draw_Digit(0, num % 10)
            _fnd.draw_Digit(1, (num % 100) // 10 if num > 9 else " ")
            _fnd.draw_Digit(2, (num % 1000) // 100 if num > 99 else " ")

            # PM10 — three digits, leading positions blanked.
            num = round(PM10)
            _fnd.draw_Digit(3, num % 10)
            _fnd.draw_Digit(4, (num % 100) // 10 if num > 9 else " ")
            _fnd.draw_Digit(5, (num % 1000) // 100 if num > 99 else " ")

            # Feels-like temperature — two digits. -1..-9 shows "-" in the
            # tens position; below -9 the tens digit is drawn with the third
            # draw_Digit argument set (presumably the decimal point, used as
            # the minus marker — confirm against FND_MAX7219).
            num = round(temp_FeelsLike)
            if num < 0:
                _fnd.draw_Digit(6, -num % 10)
                _fnd.draw_Digit(7, (-num % 100) // 10 if num < -9 else "-", num < -9)
            else:
                _fnd.draw_Digit(6, num % 10)
                _fnd.draw_Digit(7, (num % 100) // 10 if num > 9 else " ")

            # Humidity — two digits.
            num = round(humidity)
            _fnd.draw_Digit(8, num % 10)
            _fnd.draw_Digit(9, (num % 100) // 10 if num > 9 else " ")

            # Temperature — same layout as the feels-like temperature.
            num = round(temp)
            if num < 0:
                _fnd.draw_Digit(10, -num % 10)
                _fnd.draw_Digit(11, (-num % 100) // 10 if num < -9 else "-", num < -9)
            else:
                _fnd.draw_Digit(10, num % 10)
                _fnd.draw_Digit(11, (num % 100) // 10 if num > 9 else " ")

            # Last three characters of the file name without ".png".
            iconName = weather_ImageFile.replace(".png", "")[-3:]

            move_SM_Weather(iconName)

            aqi = _pico.constrain(AQI, 1, 5)                                   # AQI (1: Good, 2: Fair, 3: Moderate, 4: Poor, 5: Very poor)

            move_SM_Air(str(aqi))
        except Exception as err:
            if DEBUG:
                print_WithLock("[ERROR] " + str(err))

    # Minutes.
    num = (_timeSeconds // 60) % 60
    _fnd.draw_Digit(12, num % 10)
    _fnd.draw_Digit(13, (num % 100) // 10)

    # Hours; _timeDot is passed as the third argument so it alternates per call.
    if _pin_SW_HOUR_SEL.value() == 1:                                          # Check the 12H / 24H selector switch
        num = (_timeSeconds // 3600) % 12
        if num == 0: num = 12
        _fnd.draw_Digit(14, num % 10, _timeDot)
        _fnd.draw_Digit(15, (num % 100) // 10 if num > 9 else " ", _timeDot)
    else:
        num = (_timeSeconds // 3600) % 24
        _fnd.draw_Digit(14, num % 10, _timeDot)
        _fnd.draw_Digit(15, (num % 100) // 10, _timeDot)

    _fnd.update()

def find_SM_ZeroPosition():
    """Home both stepper motors against their limit switches.

    Both motors are started with drive() at a fixed speed. For each motor,
    once its limit switch reads 0 the motor position is reset to a fixed
    value and a move to position 0 is commanded; the motor is finished when
    get_Position() reports 0. The function busy-waits until both are finished
    (there is no timeout).
    """
    homingSpeed = 95.0
    backOffDistance = const(50)

    # Per-axis state: 2 = waiting for the limit switch, 1 = returning to 0, 0 = done.
    axes = (
        (_smWeather, _pin_SW_LIMIT_0),
        (_smAir, _pin_SW_LIMIT_1),
    )
    state = [2, 2]

    if DEBUG: print_WithLock("[INFO] Stepper Motor 기준 위치 찾기 시작")

    for motor, _ in axes:
        motor.drive(homingSpeed)

    while any(state):
        for index, (motor, limitSwitch) in enumerate(axes):
            if state[index] == 2:
                if limitSwitch.value() == 0:
                    motor.reset(position = backOffDistance)
                    motor.move(0)
                    state[index] = 1
            elif state[index] == 1:
                if motor.get_Position() == 0:
                    state[index] = 0

    if DEBUG: print_WithLock("[INFO] Stepper Motor 기준 위치 찾기 완료")

def move_SM_Weather(weatherIcon):
    """Move the weather stepper to the position assigned to an icon code.

    Only the first two characters of `weatherIcon` are used. Each known code
    maps to `offset + n * blockDistance`, where n is its index in the list
    below; unknown codes map to the bare offset (same as "50").

    Args:
        weatherIcon: Icon name whose first two characters are the code,
            e.g. "01d".
    """
    offset = -105
    blockDistance = -151

    # Index in this tuple == number of blocks away from the offset position.
    iconOrder = (
        "50",                                                                  # Mist
        "13",                                                                  # Snow
        "11",                                                                  # Thunderstorm
        "09",                                                                  # Shower rain
        "10",                                                                  # Rain
        "04",                                                                  # Broken clouds
        "03",                                                                  # Scattered clouds
        "02",                                                                  # Few clouds
        "01",                                                                  # Clear sky
    )

    code = str(weatherIcon[:2])
    blocks = iconOrder.index(code) if code in iconOrder else 0
    _smWeather.move(blockDistance * blocks + offset)


def move_SM_Air(aqi):
    """Move the air-quality stepper to the position assigned to an AQI level.

    Levels "1".."5" map to `offset + (level - 1) * blockDistance`; any other
    value maps to the bare offset (same as level "1").

    Args:
        aqi: AQI level; it is converted with str() before the lookup.
    """
    offset = -100
    blockDistance = -189

    # Index in this tuple == number of blocks away from the offset position.
    levelOrder = (
        "1",                                                                   # 1: Good
        "2",                                                                   # 2: Fair
        "3",                                                                   # 3: Moderate
        "4",                                                                   # 4: Poor
        "5",                                                                   # 5: Very poor
    )

    level = str(aqi)
    blocks = levelOrder.index(level) if level in levelOrder else 0
    _smAir.move(blockDistance * blocks + offset)


def main():
    """Program entry point: initialise, connect, then run the main-core scheduler.

    Sequence:
      1. Read the API / MQTT selector switch to choose the data source.
      2. Show "O" on all 16 digits and home both stepper motors.
      3. Connect to Wi-Fi (the board is reset on failure).
      4. MQTT mode: connect to the AWS broker (reset on failure), subscribe to
         "weather data <public IP>" and request a first publish.
         HTTP mode: fetch the first data set directly.
      5. Start thread_SubCore() and loop forever running the periodic tasks.

    Periodic tasks (milliseconds): 20 optional GC, 2500 unused slot,
    500 MQTT message check, 300000 data refresh.
    """
    global _time_PE_Main
    global _weatherData

    PERIOD_GC_MS = 20
    PERIOD_SPARE_MS = 2500
    PERIOD_MQTT_CHECK_MS = 500
    PERIOD_REFRESH_MS = 5 * 60 * 1000

    # Make sure the shared slot exists before anything can read it.
    if "_weatherData" not in globals(): _weatherData = None

    wifi = RP2_W_WiFi(_lock_time, _lock_print)
    mqttClient = MQTT_Client("", callback_MQTT_MessageReceived)

    # Data source is chosen by the switch; kept in locals, the module-level
    # USE_* defaults are not modified (as in the original code).
    if _pin_SW_API_MQTT_SEL.value() == 1:                                      # Check the API / MQTT selector switch
        if DEBUG: print_WithLock("[INFO] 날씨 데이터 가져오기 방법 → MQTT Subscribe")
        use_MQTT = True
    else:
        if DEBUG: print_WithLock("[INFO] 날씨 데이터 가져오기 방법 → HTTP API")
        use_MQTT = False
    use_HTTP_API = not use_MQTT

    _fnd.clear()
    for i in range(0, 16): _fnd.draw_Digit(i, "O")
    _fnd.update()

    find_SM_ZeroPosition()

    if DEBUG: print_WithLock("Connnecting Network (AP SSID: " + SSID + ")", end = "")

    if wifi.connect(SSID, PASSWORD, timeout = 10):                             # Connect to the network
        if DEBUG: print_WithLock(" → Success")
    else:
        if DEBUG: print_WithLock(" → Failure [System Reset]")
        machine.reset()                                                        # System reset

    if DEBUG:
        wifi.print_ConnectionStatus()                                          # Print the network connection status

    if use_MQTT:
        if DEBUG: print_WithLock("Connnecting AWS MQTT Broker (Endpoint: " + MQTT_BROKER_ENDPOINT + ")", end = "")

        if mqttClient.connect_To_AWS_MQTT_Broker(MQTT_BROKER_ENDPOINT, FILE_KEY, FILE_CRT):        # Connect to the AWS MQTT broker
            if DEBUG: print_WithLock(" → Success")
        else:
            if DEBUG: print_WithLock(" → Failure [System Reset]")
            machine.reset()                                                    # System reset

        public_IP = _http_api.get_GeolocationData()[1]                         # Public IP (second element of the geolocation tuple)
        topic = "weather data " + public_IP                                    # Topic (e.g. weather data 59.6.230.229)
        mqttClient.subscribe(topic)                                            # MQTT subscribe

        if DEBUG: print_WithLock("[INFO] Subscribed to AWS MQTT Broker (Topic: {0})".format(topic))

        request_MQTT_Publish_WeatherData(mqttClient, public_IP)                # Ask for a weather data publish

    if use_HTTP_API:
        _weatherData = get_HTTP_API_WeatherData(printData = True)              # Fetch weather data via the HTTP API

    # The display is refreshed only by thread_SubCore(); nothing else in this
    # file starts it, so launch it here.
    # NOTE(review): confirm no other start-up code already starts this thread.
    _thread.start_new_thread(thread_SubCore, ("SubCore", 1))

    while True:
        # Elapsed milliseconds per task slot; ticks_diff() handles wrap-around.
        # NOTE(review): _lock_time is shared with the steppers / Wi-Fi helper;
        # confirm whether reading the tick counter must be guarded by it.
        time_Now = time.ticks_ms()
        time_Elapsed = [time.ticks_diff(time_Now, t) for t in _time_PE_Main]

        if time_Elapsed[0] >= PERIOD_GC_MS:
            _time_PE_Main[0] = time_Now
            if ENABLE_GC_MAIN:
                _pico.run_GC()

        if time_Elapsed[1] >= PERIOD_SPARE_MS:                                 # Slot kept for parity with the original; no task attached
            _time_PE_Main[1] = time_Now

        if time_Elapsed[2] >= PERIOD_MQTT_CHECK_MS:
            _time_PE_Main[2] = time_Now
            if use_MQTT:
                mqttClient.process_CheckMessage()

        if time_Elapsed[3] >= PERIOD_REFRESH_MS:
            _time_PE_Main[3] = time_Now
            if use_MQTT:
                request_MQTT_Publish_WeatherData(mqttClient, public_IP)

            if use_HTTP_API:
                _weatherData = get_HTTP_API_WeatherData()



def thread_SubCore(name = str(""), id = int(0)):
    """Scheduler loop intended to run as the second thread; never returns.

    Periodic tasks (milliseconds): 25 optional GC, 2500 LED toggle,
    500 display refresh via update_WeatherDisplay().

    Args:
        name: Not used by this function.
        id: Not used by this function.
    """
    global _time_PE_Sub

    PERIOD_GC_MS = 25
    PERIOD_LED_MS = 2500
    PERIOD_DISPLAY_MS = 500

    while True:
        # Elapsed milliseconds per task slot; ticks_diff() handles wrap-around.
        # The original left this list empty and never set time_Now, so the
        # first comparison below raised IndexError.
        time_Now = time.ticks_ms()
        time_Elapsed = [time.ticks_diff(time_Now, t) for t in _time_PE_Sub]

        if time_Elapsed[0] >= PERIOD_GC_MS:
            _time_PE_Sub[0] = time_Now
            if ENABLE_GC_SUB:
                _pico.run_GC()

        if time_Elapsed[1] >= PERIOD_LED_MS:
            _time_PE_Sub[1] = time_Now
            _pin_LED.toggle()                                                  # LED on/off

        if time_Elapsed[2] >= PERIOD_DISPLAY_MS:
            _time_PE_Sub[2] = time_Now
            update_WeatherDisplay()
        
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

 

 

작동 동영상

 

 

첨부파일
소스_회로도.zip 다운로드

로그인 후
참가 상태를 확인할 수 있습니다.