13426109659
webmaster@21cto.com

​​如何构建完整的智能家居监控系统

物联网 0 48 21小时前

导读:本篇文章是使用 InfluxDB 3 和 Grafana 将智能家居设备的数据集中到统一平台的分步干货技术指南哦。

图片

各位伙伴们,你知道家里的智能家居设备会产生大量分散的数据。

那么,本教程将向大家展示如何使用 InfluxDB 3 和 Grafana 将这些数据集中到一个统一的平台。我们不仅可以追踪家中的生命体征,还可以学习专业的软件开发概念,例如时间序列数据库设计和构建适用于各种监控和分析系统的弹性数据管道。

图片

在开始之前,请确保您已经熟悉如下技术或知识储备:

  • 熟悉 Python 和 API 概念
  • 对路由器的管理访问权限(用于带宽监控)
  • 至少一个智能设备(Nest 恒温器、智能电表等)
  • 一台计算机或 Raspberry Pi,用于运行InfluxDB 3Grafana和 Python 程序。


了解您正在使用的内容


时间序列数据与传统的关系数据有着根本的不同。我们关注的不是实体之间的关系,而是值随时间的变化。每个数据点包含:


  • 时间戳:测量的时间。
  • 测量:我们正在测量的内容(温度、功率、带宽)。
  • 标签:帮助我们对数据进行分类的元数据(device_id、位置、类型)。
  • 字段:实际测量的值。


这种结构使得 InfluxDB 非常适合处理物联网数据,因为它针对基于时间的查询的写入密集型工作负载进行了优化。

我们用一种称为“行协议”的语法来定义它,请看如下所示:

1

2

weather,location=london,season=summer

temperature=301465839830100400200


来理解此语法:

  •  weather 是数据库表的名称,亦称为测量值。
  • “location=london,season=summer” 是用逗号分隔的键值对或“标签集”,提供元数据。
  •  temperature=30 是字段集,即实际的数据集。
  •  1465839830100400200 是可选的;它实际上是 RFC3339 格式的时间戳 2016-06-13T17:43:50.1004002Z。如果您不提供时间戳,InfluxDB 将使用您服务器的本地纳秒级 UTC 时间戳。


设置 InfluxDB 3 


我们将使用 InfluxDB 3 Enterprise 的免费家庭许可证,此许可证仅需要你提供电子邮件地址。它包含 2 个 CPU,仅供个人使用。您需要检查收件箱并验证链接以激活家庭许可证。


图片

InfluxData 是领先的时间序列平台 InfluxDB 的创建者。它收集、存储和分析各种规模的时间序列数据。开发者可以查询和分析带有时间戳的数据,从而实时进行预测、响应和调整。

来,我们进行如下的代码编写步骤:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

# Pull image from Docker for InfluxDB 3 Enterprise

docker pull influxdb:3-enterprise

 

# Run InfluxDB 3 Enterprise with proper configuration

docker run-d\

  --name influxdb3-enterprise\

  -p8181:8181\

  -v$PWD/data:/var/lib/influxdb3/data\

  -v$PWD/plugins:/var/lib/influxdb3/plugins\

  -eINFLUXDB3_ENTERPRISE_LICENSE_EMAIL=you@example.com\

  influxdb:3-enterprise\

    influxdb3 serve\

      --node-id=node0\

      --cluster-id=cluster0\

      --object-store=file\

      --data-dir=/var/lib/influxdb3/data\

      --host=0.0.0.0\

      --port=8181


构建强大的数据收集器


接下来,我们将专注于创建一个全面的数据收集器,以演示任何物联网集成所需的所有模式。

本示例使用 Nest 恒温器,但其原理适用于任何智能设备或 API。我们将构建一个单一的收集器,用于轮询Google Nest API并使用 v3 Python 客户端写入 InfluxDB 3 Enterprise。

  1. 创建数据库并选择性地设置保留期。

1

influxdb3 create database home-data


2. 开始Nest API 设置。要从 Nest 恒温器收集数据,您需要 API 访问权限,本例中我们使用Google云平台,其它的云平台类似。

  • 转到Google Cloud Console
  • 创建新项目。
  • 请保存您的项目 ID。
  • 访问设备访问控制台并按照控制台说明进行操作。
  • 创建一个项目并将其链接到您的 Google Cloud 项目并下载 OAuth 凭据。
  • 创建一个Python程序“get_nest_token.py”,代码内容如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

# get_nest_token.py

importrequests

importwebbrowser

fromurllib.parse importurlencode

 

CLIENT_ID="your-client-id-here"

CLIENT_SECRET="your-client-secret-here"

 

# Generate authorization URL

auth_url=f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode({

    'client_id': CLIENT_ID,

    'redirect_uri': 'http://localhost',

    'response_type': 'code',

    'scope': 'https://www.googleapis.com/auth/sdm.service',

    'access_type': 'offline'

})}"

 

print(f"Visit: {auth_url}")

webbrowser.open(auth_url)

 

# Get authorization code from redirect URL

auth_code=input("Enter the code from the redirect URL: ")

 

# Exchange for tokens

token_response=requests.post('https://oauth2.googleapis.com/token',data={

    'client_id':CLIENT_ID,

    'client_secret':CLIENT_SECRET,

    'code':auth_code,

    'grant_type':'authorization_code',

    'redirect_uri':'http://localhost'

})

 

tokens=token_response.json()

print(f"Access Token: {tokens['access_token']}")

print(f"Refresh Token: {tokens['refresh_token']}")


构建数据收集器


1. 创建.env文件,然后请保存到本地。

1

2

3

4

5

NEST_ACCESS_TOKEN=your_access_token_here

GOOGLE_CLOUD_PROJECT_ID=your_project_id

INFLUXDB_HOST=http://localhost:8181

INFLUXDB_TOKEN=your_influxdb_token

INFLUXDB_DATABASE=home-data

  1. 安装 Python 依赖项。

1

pip install influxdb3-python requests python-dotenv

  1. 创建一个新的 Python 程序“nest_collector.py”,充当数据收集器。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

# nest_collector.py

importos

importtime

importlogging

fromdatetimeimportdatetime,timezone

fromfunctoolsimportwraps

importrequests

frominfluxdb_client_3 importInfluxDBClient3

fromdotenv importload_dotenv

load_dotenv()

logging.basicConfig(

    level=logging.INFO,

    format="%(asctime)s - %(levelname)s - %(message)s"

)

defretry_on_failure(max_retries=3,delay=5):

    defdecorator(func):

        @wraps(func)

        defwrapper(*args,**kwargs):

            forattempt inrange(max_retries):

                try:

                    returnfunc(*args,**kwargs)

                exceptExceptionase:

                    ifattempt==max_retries-1:

                        logging.error(f"{func.__name__} failed: {e}")

                        raise

                    logging.warning(f"Retry {attempt + 1}: {e}")

                    time.sleep(delay)

        returnwrapper

    returndecorator

classNestCollector:

    def__init__(self):

        self.access_token=os.getenv("NEST_ACCESS_TOKEN")

        self.project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID")

        ifnotself.access_token ornotself.project_id:

            raiseValueError("Missing NEST_ACCESS_TOKEN or GOOGLE_CLOUD_PROJECT_ID in .env file")

        # Initialize InfluxDB 3 client

        self.client=InfluxDBClient3(

            host=os.getenv("INFLUXDB_HOST","http://localhost:8181"),

            token=os.getenv("INFLUXDB_TOKEN"),

            database=os.getenv("INFLUXDB_DATABASE","home-data"),

        )

        # Test connection

        try:

            list(self.client.query("SELECT 1",language="sql"))

            logging.info("InfluxDB connection successful")

        exceptExceptionase:

            logging.error(f"InfluxDB connection failed: {e}")

            raise

    @retry_on_failure(max_retries=3,delay=5)

    defget_thermostat_data(self):

        """Fetch data from Nest API"""

        url=f"https://smartdevicemanagement.googleapis.com/v1/enterprises/{self.project_id}/devices"

        headers={

            "Authorization":f"Bearer {self.access_token}",

            "Content-Type":"application/json"

        }

        response=requests.get(url,headers=headers,timeout=30)

        response.raise_for_status()

        devices=response.json().get("devices",[])

        data_points=[]

        fordeviceindevices:

            if"THERMOSTAT"notindevice.get("type",""):

                continue

            traits=device.get("traits",{})

            device_id=device.get("name","").split("/")[-1]

            # Extract measurements

            temp_trait=traits.get("sdm.devices.traits.Temperature",{})

            humidity_trait=traits.get("sdm.devices.traits.Humidity",{})

            hvac_trait=traits.get("sdm.devices.traits.ThermostatHvac",{})

            setpoint_trait=traits.get("sdm.devices.traits.ThermostatTemperatureSetpoint",{})

            info_trait=traits.get("sdm.devices.traits.Info",{})

            try:

                temp_celsius=float(temp_trait.get("ambientTemperatureCelsius",0))

                humidity=float(humidity_trait.get("ambientHumidityPercent",0))

            except(TypeError,ValueError):

                continue

            # Build data point for InfluxDB

            point={

                "measurement":"nest_thermostat",

                "tags":{

                    "device_id":device_id,

                    "room":info_trait.get("customName","main"),

                    "device_type":"thermostat"

                },

                "fields":{

                    "temperature_celsius":temp_celsius,

                    "temperature_fahrenheit":temp_celsius*9/5+32,

                    "humidity_percent":humidity,

                    "hvac_status":hvac_trait.get("status","OFF"),

                    "hvac_mode":hvac_trait.get("mode","UNKNOWN")

                },

                "time":int(datetime.now(timezone.utc).timestamp())

            }

            # Add setpoint temperatures if available

            if"heatCelsius"insetpoint_trait:

                heat_c=float(setpoint_trait["heatCelsius"])

                point["fields"]["heat_setpoint_celsius"]=heat_c

                point["fields"]["heat_setpoint_fahrenheit"]=heat_c*9/5+32

            if"coolCelsius"insetpoint_trait:

                cool_c=float(setpoint_trait["coolCelsius"])

                point["fields"]["cool_setpoint_celsius"]=cool_c

                point["fields"]["cool_setpoint_fahrenheit"]=cool_c*9/5+32

            data_points.append(point)

            logging.info(f"Collected {device_id}: {temp_celsius:.1f}°C, {humidity:.0f}%")

        returndata_points

    defwrite_to_influx(self,points):

        """Write data to InfluxDB"""

        ifnotpoints:

            logging.warning("No data to write")

            return

        success_count=0

        forpoint inpoints:

            try:

                self.client.write(record=point,write_precision="s")

                success_count+=1

            exceptExceptionase:

                logging.error(f"Write failed: {e}")

        logging.info(f"Wrote {success_count}/{len(points)} points")

    defrun_cycle(self):

        """Run one collection cycle"""

        try:

            data=self.get_thermostat_data()

            self.write_to_influx(data)

        exceptExceptionase:

            logging.error(f"Cycle failed: {e}")

if__name__=="__main__":

    collector=NestCollector()

    try:

        whileTrue:

            collector.run_cycle()

            time.sleep(300)  # Run every 5 minutes

    exceptKeyboardInterrupt:

        logging.info("Stopped by user")


安装和配置Grafana


1

2

3

4

5

6

7

# Install Grafana using Docker

docker run-d\

  --name grafana\

  -p3000:3000\

  -vgrafana-storage:/var/lib/grafana\

  -e"GF_SECURITY_ADMIN_PASSWORD=your-secure-password"\

  grafana/grafana:latest


基本仪表板配置


  1. 确保 Grafana 在端口 3000 上本地启动并运行。
  • 使用您的用户名/密码在 localhost:3000 登录 Grafana
  • 导航到连接 —> 类型为“InfluxDB” —> “添加新数据源”
  • 类型:InfluxDB3 企业主页
  • 语言:SQL
  • 数据库:home-data
  • URL:http://influxdb3-enterprise:8181,用于连接到 InfluxDB 3 Enterprise
  • INFLUXDB_TOKEN令牌:从 .env 文件中粘贴环境变量的字符串值,并将“不安全连接”切换为“开”

2. 可以尝试使用以下 SQL 查询创建具有两个面板的仪表板来监控数据:

当前温度面板


1

2

3

4

5

6

7

SELECT

  temperature_fahrenheit,

  device_id

FROMnest_thermostat 

WHERE time>=now()-interval'5 minutes'

ORDER BY timeDESC 

LIMIT1


24小时趋势面板


1

2

3

4

5

6

7

SELECT

  date_trunc('minute',time)astime,

  AVG(temperature_fahrenheit)asavg_temp

FROMnest_thermostat 

WHERE time>=now()-interval'24 hours'

GROUP BY date_trunc('minute',time)

ORDER BY time


(本级为可选项)健康监测脚本:通过创建脚本“health_check.py”进行简单检查,以便保持系统健康,如下代码所示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

# health_check.py

importrequests

fromdatetimeimportdatetime

 

defcheck_health():

    services={

        'InfluxDB':'http://localhost:8181/health',

        'Grafana':'http://localhost:3000/api/health'

    }

    

    print(f"\n=== Health Check - {datetime.now().strftime('%H:%M:%S')} ===")

    

    all_healthy=True

    forservice,url inservices.items():

        try:

            response=requests.get(url,timeout=5)

            healthy=response.status_code==200

            status="✅"ifhealthy else"❌"

            print(f"{service}: {status}")

            all_healthy=all_healthy andhealthy

        exceptException:

            print(f"{service}: ❌ Connection failed")

            all_healthy=False

    

    print(f"Overall: {'✅ HEALTHY' if all_healthy else '❌ ISSUES'}\n")

 

if__name__=="__main__":

    check_health()


结语


其实,我们在这里构建的远远不止监控恒温器,已经实现了支持现代大规模可观测系统的基础模式。

这里编写的用于处理不稳定的物联网 API 的重试逻辑和断路器,与在服务故障时维持 Netflix 正常运行的弹性模式相同,而创建的时间序列数据建模和可视化管道则反映了大型科技公司用于每秒跟踪数百万个指标的监控基础设施。

最重要的是,我们现在了解了如何将数据视为随时间推移的事件流,而不是表中的静态记录,这是一种思维方式的转变,无论你是构建应用程序监控仪表板、分析业务指标还是使用任何生成连续数据流的系统,它都会提供非常良好的服务。

作者:洛逸

评论