TP : Station Météo IoT — Simulation d'un système embarqué
Contexte
Vous travaillez pour une startup spécialisée dans les objets connectés. Votre mission est de développer le logiciel embarqué d'une station météo connectée basée sur un SoC (System on Chip).
Cette station doit : - Collecter des données de capteurs (température, humidité, pression) - Traiter ces données localement (calculs de moyennes, détection d'anomalies) - Simuler l'envoi vers le cloud - Optimiser la consommation énergétique
L'objectif est de comprendre comment fonctionne un système embarqué sur SoC et les contraintes associées.
Objectifs pédagogiques
- Comprendre le fonctionnement d'un système embarqué
- Manipuler des données de capteurs simulés
- Implémenter des algorithmes de traitement de données
- Appréhender les contraintes de ressources limitées
- Découvrir les concepts de l'IoT
Partie 1 : Simulation des capteurs
1.1. Classe Capteur
Un capteur embarqué a des caractéristiques spécifiques : - Un nom (température, humidité, etc.) - Une unité de mesure - Une plage de valeurs valides - Une précision (nombre de décimales)
Créer la classe Capteur :
import random
import time
class Capteur:
"""Simule un capteur embarqué sur SoC."""
def __init__(self, nom, unite, val_min, val_max, precision=1):
"""
Initialise un capteur.
:param nom: (str) Nom du capteur
:param unite: (str) Unité de mesure
:param val_min: (float) Valeur minimale possible
:param val_max: (float) Valeur maximale possible
:param precision: (int) Nombre de décimales
"""
self.nom = nom
self.unite = unite
self.val_min = val_min
self.val_max = val_max
self.precision = precision
self._derniere_valeur = None
def lire(self):
"""
Simule une lecture du capteur.
Retourne une valeur réaliste (proche de la précédente).
:return: (float) Valeur mesurée
"""
# À compléter
pass
def __repr__(self):
return f"Capteur({self.nom}, {self.unite})"
Consignes pour la méthode lire() :
- Si c'est la première lecture, générer une valeur aléatoire dans la plage
- Sinon, générer une valeur proche de la précédente (±10% de la plage)
- Arrondir selon la précision
- S'assurer que la valeur reste dans les limites
1.2. Création des capteurs de la station
Créer trois capteurs pour notre station :
# Capteur de température : -20°C à 50°C, précision 0.1°C
capteur_temp = Capteur("Température", "°C", -20, 50, precision=1)
# Capteur d'humidité : 0% à 100%, précision 1%
capteur_hum = Capteur("Humidité", "%", 0, 100, precision=0)
# Capteur de pression : 950 hPa à 1050 hPa, précision 0.1 hPa
capteur_pres = Capteur("Pression", "hPa", 950, 1050, precision=1)
Test :
>>> for _ in range(5):
... print(f"T={capteur_temp.lire()}°C, H={capteur_hum.lire()}%, P={capteur_pres.lire()}hPa")
T=22.3°C, H=65%, P=1013.2hPa
T=23.1°C, H=67%, P=1012.8hPa
T=22.8°C, H=66%, P=1013.5hPa
...
Partie 2 : Gestionnaire de données
2.1. Buffer circulaire
Les systèmes embarqués ont une mémoire limitée. On utilise un buffer circulaire pour stocker les N dernières mesures sans allouer de mémoire dynamiquement.
class BufferCirculaire:
"""
Buffer circulaire pour stocker les dernières mesures.
Mémoire fixe, adapté aux systèmes embarqués.
"""
def __init__(self, taille):
"""
Initialise le buffer.
:param taille: (int) Nombre maximum d'éléments
"""
self.taille = taille
self.donnees = [None] * taille # Pré-allocation
self.index = 0
self.compte = 0
def ajouter(self, valeur):
"""
Ajoute une valeur au buffer.
Écrase la plus ancienne si plein.
:param valeur: La valeur à ajouter
"""
# À compléter
pass
def moyenne(self):
"""
Calcule la moyenne des valeurs stockées.
:return: (float) Moyenne ou None si vide
"""
# À compléter
pass
def min_max(self):
"""
Retourne le minimum et le maximum.
:return: (tuple) (min, max) ou (None, None) si vide
"""
# À compléter
pass
def est_plein(self):
"""Vérifie si le buffer est plein."""
return self.compte >= self.taille
def __len__(self):
return self.compte
Principe du buffer circulaire :
Taille = 5, après ajout de 1, 2, 3, 4, 5, 6, 7 :
Étape 1-5 : [1, 2, 3, 4, 5] (remplissage normal)
Étape 6 : [6, 2, 3, 4, 5] (6 écrase 1)
Étape 7 : [6, 7, 3, 4, 5] (7 écrase 2)
L'index "tourne" : index = (index + 1) % taille
Partie 3 : Station météo complète
3.1. Classe StationMeteo
Assembler les composants pour créer la station :
class StationMeteo:
"""
Station météo IoT simulant un système sur SoC.
"""
def __init__(self, nom, taille_buffer=10):
"""
Initialise la station.
:param nom: (str) Identifiant de la station
:param taille_buffer: (int) Taille des buffers de données
"""
self.nom = nom
# Capteurs
self.capteurs = {
'temperature': Capteur("Température", "°C", -20, 50, 1),
'humidite': Capteur("Humidité", "%", 0, 100, 0),
'pression': Capteur("Pression", "hPa", 950, 1050, 1)
}
# Buffers pour chaque capteur
self.buffers = {
nom: BufferCirculaire(taille_buffer)
for nom in self.capteurs
}
# Compteur de mesures
self.nb_mesures = 0
# Mode économie d'énergie
self.mode_eco = False
def mesurer(self):
"""
Effectue une mesure sur tous les capteurs.
:return: (dict) Dictionnaire des mesures
"""
# À compléter
pass
def statistiques(self):
"""
Calcule les statistiques sur les données collectées.
:return: (dict) Statistiques par capteur
"""
# À compléter
pass
def detecter_anomalies(self):
"""
Détecte les valeurs anormales.
Une anomalie est une valeur qui s'écarte de plus de 2 écarts-types
de la moyenne.
:return: (list) Liste des anomalies détectées
"""
# À compléter (bonus)
pass
def rapport(self):
"""
Génère un rapport formaté.
:return: (str) Rapport textuel
"""
# À compléter
pass
3.2. Simulation d'envoi cloud
Dans un vrai système IoT, les données sont envoyées vers le cloud. Simuler cette fonctionnalité :
def envoyer_cloud(self, donnees):
"""
Simule l'envoi de données vers le cloud.
En mode éco, regroupe les envois.
:param donnees: (dict) Données à envoyer
:return: (bool) Succès de l'envoi
"""
# Simuler une latence réseau
import time
time.sleep(0.1) # 100ms de latence simulée
# Simuler un échec occasionnel (5% de chance)
if random.random() < 0.05:
print(f"[{self.nom}] Échec d'envoi - Nouvelle tentative...")
return False
# Afficher les données "envoyées"
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{self.nom}] Envoi cloud @ {timestamp}")
for cle, valeur in donnees.items():
print(f" {cle}: {valeur}")
return True
Partie 4 : Gestion de l'énergie
4.1. Mode économie d'énergie
Les systèmes embarqués doivent économiser l'énergie. Implémenter un mode économie :
def activer_mode_eco(self):
"""
Active le mode économie d'énergie.
- Réduit la fréquence des mesures
- Regroupe les envois cloud
"""
self.mode_eco = True
print(f"[{self.nom}] Mode économie activé")
def desactiver_mode_eco(self):
"""Désactive le mode économie d'énergie."""
self.mode_eco = False
print(f"[{self.nom}] Mode économie désactivé")
4.2. Simulation de consommation
Ajouter un suivi de la consommation énergétique simulée :
class StationMeteo:
# Constantes de consommation (en mW)
CONSO_VEILLE = 0.5
CONSO_MESURE = 10
CONSO_ENVOI = 50
CONSO_CALCUL = 5
def __init__(self, ...):
...
self.energie_totale = 0 # mWh consommés
def _consommer(self, activite, duree_ms):
"""
Enregistre la consommation d'énergie.
:param activite: (str) Type d'activité
:param duree_ms: (int) Durée en millisecondes
"""
conso = {
'veille': self.CONSO_VEILLE,
'mesure': self.CONSO_MESURE,
'envoi': self.CONSO_ENVOI,
'calcul': self.CONSO_CALCUL
}
# Convertir en mWh : mW * (ms / 3600000)
self.energie_totale += conso[activite] * (duree_ms / 3600000)
Partie 5 : Programme principal
5.1. Boucle de fonctionnement
Créer le programme principal qui simule le fonctionnement de la station :
def main():
"""Programme principal de la station météo."""
# Créer la station
station = StationMeteo("STATION_01", taille_buffer=20)
print("=" * 50)
print(" STATION MÉTÉO IoT - Simulation SoC")
print("=" * 50)
# Simuler 1 heure de fonctionnement (1 mesure par minute = 60 mesures)
nb_mesures = 60
intervalle = 1 # Secondes entre les mesures (accéléré pour la simulation)
try:
for i in range(nb_mesures):
# Effectuer une mesure
mesure = station.mesurer()
print(f"\nMesure {i+1}/{nb_mesures}: T={mesure['temperature']}°C, "
f"H={mesure['humidite']}%, P={mesure['pression']}hPa")
# Envoyer au cloud toutes les 10 mesures
if (i + 1) % 10 == 0:
stats = station.statistiques()
station.envoyer_cloud(stats)
# Attendre avant la prochaine mesure
time.sleep(intervalle)
except KeyboardInterrupt:
print("\n\nArrêt demandé par l'utilisateur.")
# Rapport final
print("\n" + "=" * 50)
print("RAPPORT FINAL")
print("=" * 50)
print(station.rapport())
print(f"Énergie consommée : {station.energie_totale:.4f} mWh")
if __name__ == "__main__":
main()
Partie 6 : Extensions (Bonus)
6.1. Détection d'anomalies
Implémenter la méthode detecter_anomalies() qui identifie les valeurs aberrantes.
6.2. Persistance locale
Ajouter une fonctionnalité de sauvegarde locale (fichier JSON) pour conserver les données en cas de perte de connexion :
def sauvegarder_local(self, fichier="backup.json"):
"""Sauvegarde les données en local (simulation de mémoire flash)."""
pass
def charger_local(self, fichier="backup.json"):
"""Charge les données depuis la sauvegarde locale."""
pass
6.3. Protocole de communication
Simuler un protocole de communication simple (type MQTT) :
def publier(self, topic, message):
"""Publie un message sur un topic (simulation MQTT)."""
print(f"[MQTT] {topic} -> {message}")
6.4. Multi-stations
Créer un réseau de plusieurs stations qui communiquent entre elles et agrègent leurs données.
Questions de synthèse
-
Pourquoi utilise-t-on un buffer circulaire plutôt qu'une liste dynamique sur un système embarqué ?
-
Quels sont les compromis à faire entre fréquence de mesure et consommation énergétique ?
-
Comment un vrai système IoT gère-t-il la perte de connexion réseau ?
-
Quelles sont les différences entre le code Python de simulation et le code réel sur un microcontrôleur ?
-
Comment l'architecture SoC permet-elle d'optimiser la consommation dans ce type d'application ?
Barème indicatif
| Partie | Points |
|---|---|
| Partie 1 : Capteurs | 3 |
| Partie 2 : Buffer circulaire | 4 |
| Partie 3 : Station complète | 5 |
| Partie 4 : Gestion énergie | 3 |
| Partie 5 : Programme principal | 3 |
| Partie 6 : Bonus | 2 |
| Total | 20 |
Auteur : Florian Mathieu
Licence CC BY NC
Ce cours est mis à disposition selon les termes de la Licence Creative Commons Attribution - Pas d'Utilisation Commerciale - Partage dans les Mêmes Conditions 4.0 International.