Kapacitor写TICKscript
# Define the task from house.tick (-type may also be "batch").
kapacitor define house -tick house.tick -type stream -dbrp house.autogen
# Show the task's information.
kapacitor show house
kapacitor enable house
# house.tick
// Computes Global_active_power * 1000 / 60 minus the three Sub_metering_*
// fields and stores the result as the field "consumer".
stream
    // Read points from the "house" measurement.
    |from()
        .measurement('house')
    // All four inputs are cast to float before the arithmetic.
    |eval(lambda: float("Global_active_power") * 1000.0 / 60.0 - float("Sub_metering_1") - float("Sub_metering_2") - float("Sub_metering_3"))
        .as('consumer')
    // Write the evaluated points to house.autogen, measurement "house".
    |influxDBOut()
        .database('house')
        .retentionPolicy('autogen')
        .measurement('house')
注意
- influxdb 与 kapacitor 之间是订阅(subscription)关系,所以如果它们跨机房部署,需要在 kapacitor 的配置文件中指定 hostname;可在 influxdb 中用 `show subscriptions` 查看。
- kapacitor 有测试脚本功能,然后通过重放来检查脚本是否正确执行。
模仿Chronograf操作Kapacitor
- 脚本 list.tick
```javascript
// Threshold alert on the "deviation" field of the "house" measurement,
// restricted to points whose "metric" equals 'ljz'.
// Every alert is written to InfluxDB and POSTed to an HTTP endpoint.
// String literals use single quotes; field/tag references use double quotes.
var data = stream
    |from()
        .database('house_data')
        .retentionPolicy('autogen')
        .measurement('house')
        .where(lambda: ("metric" == 'ljz'))
    // Expose "deviation" under the name "value" for the alert condition.
    |eval(lambda: "deviation")
        .as('value')

var trigger = data
    |alert()
        .crit(lambda: "value" > 4)
        .message('Alert: power consumption')
        .id('Power_Consumption_Rule' + ':')
        .idTag('alertID')
        .levelTag('level')
        .messageField('message')
        .durationField('duration')
        // NOTE(review): the text below mentions 6Wh while .crit() uses 4 - confirm which is intended.
        .details('The error of power consumption is greater than the threshold(6Wh)')
        .email('lee_jiazh@163.com', 'qingquan@xiaogu-tech.com')

// Record each alert in httpTest.autogen, measurement "httptest".
trigger
    |influxDBOut()
        .create()
        .database('httpTest')
        .retentionPolicy('autogen')
        .measurement('httptest')
        .tag('alertName', 'test_http')
        .tag('triggerType', 'threshold')

// Send each alert to the HTTP endpoint.
trigger
    |httpPost('https://xg-grafana.herokuapp.com/api/test')
```
功能:向目标地址 POST 一个 HTTP 请求,并在 trigger 触发后向 InfluxDB 写入数据。
1. InfluxDB 数据
time alertID alertName duration level message metric triggerType value
---- ------- --------- -------- ----- ------- ------ ----------- -----
1511323486959072129 Power_Consumption_Rule:nil test_http 0 CRITICAL Alert: power consumption ljz threshold 18.4
1511323498954461470 Power_Consumption_Rule:nil test_http 11995389341 CRITICAL Alert: power consumption ljz threshold 27.96666666666667
2. Http Post的数据:
```json
{ series:
[ { name: 'house',
tags:
{ alertID: 'Power_Consumption_Rule:nil',
level: 'CRITICAL',
metric: 'ljz' },
columns: [ 'time', 'duration', 'message', 'value' ],
values:
[ [ '2017-11-22T05:14:26.541257857Z',
1344412905118,
'Alert: power consumption',
20.733333333333334 ] ] } ],
_id: '3oTa37s9Uva9swx5' },
```
Tips
- 如果只想要状态改变时候触发,在alert()下加入
.stateChangesOnly()
Udf 实践
- 写一个 python2 脚本,名为 `ttest.py`,放在 `/tmp/kapacitor_udf` 中。
- 从github上克隆下依赖的python-agent包。
git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor
- 修改配置文件(`/etc/kapacitor/kapacitor.conf`),将 PYTHONPATH 指向依赖包所在的目录(其中包含两个 py 文件)。
[udf]
[udf.functions]
    [udf.functions.tTest]
        # Execute the UDF with the Python 2 interpreter.
        prog = "/usr/bin/python2"
        # Interpreter arguments: -u keeps STDIN and STDOUT unbuffered,
        # followed by the path of the UDF script.
        args = ["-u", "/tmp/kapacitor_udf/ttest.py"]
        # Kill the Python process if it stays unresponsive for 10s.
        timeout = "10s"
        # Environment variables for the process; only PYTHONPATH is set here.
        [udf.functions.tTest.env]
            PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"
- 安装agent依赖的环境和包
```
# Install pip and the Python 2 packages used by the agent.
apt install python-pip
python2 -m pip install six scipy

# Install Protocol Buffers (protoc) from the 3.5.0 release archive.
wget https://github.com/google/protobuf/releases/download/v3.5.0/protobuf-all-3.5.0.zip
# Extract the archive that was downloaded above (the names must match).
unzip -o protobuf-all-3.5.0.zip
cd protobuf-3.5.0/
./configure
make
make install
export LD_LIBRARY_PATH="/usr/local/lib"
# Check that the protoc command exists, i.e. that the install succeeded.
protoc --version

# Install the Python protobuf bindings.
# git clone creates a "protobuf" directory; there is no zip archive to extract.
git clone https://github.com/google/protobuf.git
cd protobuf/python/
python2 setup.py build
python2 setup.py install
4. 重启kapacitor
5. 写一个TICKscript脚本
```js
// Watches the three temperatures of a 3D printing job and raises an alert
// whenever one of them starts to behave abnormally.

// Significance level shared by every t-test below.
var alpha = 0.001

// Temperature measurements, grouped into 5-minute windows emitted every 5 minutes.
var data = stream
    |from()
        .measurement('temperatures')
    |window()
        .period(5m)
        .every(5m)

// Hotend: apply the tTest UDF, then alert when the p-value drops below alpha.
data
    @tTest()
        // field to test
        .field('hotend')
        // keep a 1h rolling window
        .size(3600)
        // significance level
        .alpha(alpha)
    |alert()
        .id('hotend')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/hotend_failure.log')

// Bed: same pipeline, applied to the bed temperature.
data
    @tTest()
        .field('bed')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('bed')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/bed_failure.log')

// Air: same pipeline, applied to the air temperature.
data
    @tTest()
        .field('air')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('air')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/air_failure.log')
```
- 写产生测试数据的脚本
```python
#!/usr/bin/python2
"""Generate 24 hours of synthetic printer temperatures and write them to Kapacitor.

One point per second is produced for the hotend, bed and air temperatures,
with a few scripted anomalies, and the whole batch is POSTed in a single
request to the write endpoint configured below.
"""
from numpy import random
from datetime import timedelta, datetime
import sys
import time
import requests

# Target temperatures in C
hotend_t = 220
bed_t = 90
air_t = 70

# Connection info
write_url = 'http://localhost:9092/write?db=printer&rp=autogen&precision=s'
measurement = 'temperatures'


def temp(target, sigma):
    """
    Pick a random temperature from a normal distribution
    centered on target temperature.

    target -- mean of the distribution
    sigma  -- standard deviation of the distribution
    """
    return random.normal(target, sigma)


def main():
    """Build the data set and post it to Kapacitor.

    Returns 0 on success, or 1 when the write endpoint answers with a
    status code other than 204.
    """
    hotend_sigma = 0
    bed_sigma = 0
    air_sigma = 0
    hotend_offset = 0
    bed_offset = 0
    air_offset = 0

    # Define some anomalies by changing sigma at certain times.
    # Each entry is (iteration at which it starts, sigma, offset added to the target).
    hotend_anomalies = [
        (0, 0.5, 0),        # normal sigma
        (3600, 3.0, -1.5),  # at one hour the hotend goes bad
        (3900, 0.5, 0),     # 5 minutes later recovers
    ]
    bed_anomalies = [
        (0, 1.0, 0),        # normal sigma
        (28800, 5.0, 2.0),  # at 8 hours the bed goes bad
        (29700, 1.0, 0),    # 15 minutes later recovers
    ]
    air_anomalies = [
        (0, 3.0, 0),          # normal sigma
        (10800, 5.0, 0),      # at 3 hours air starts to fluctuate more
        (43200, 15.0, -5.0),  # at 12 hours air goes really bad
        (45000, 5.0, 0),      # 30 minutes later recovers
        (72000, 3.0, 0),      # at 20 hours goes back to normal
    ]

    # Start from 2016-01-01 00:00:00 UTC
    # This makes it easy to reason about the data later
    now = datetime(2016, 1, 1)
    second = timedelta(seconds=1)
    epoch = datetime(1970, 1, 1)

    # 24 hours of temperatures once per second
    points = []
    for i in range(60*60*24+2):
        # Update sigma/offset when the next scheduled anomaly starts,
        # then drop it from the front of its list.
        if len(hotend_anomalies) > 0 and i == hotend_anomalies[0][0]:
            hotend_sigma = hotend_anomalies[0][1]
            hotend_offset = hotend_anomalies[0][2]
            hotend_anomalies = hotend_anomalies[1:]

        if len(bed_anomalies) > 0 and i == bed_anomalies[0][0]:
            bed_sigma = bed_anomalies[0][1]
            bed_offset = bed_anomalies[0][2]
            bed_anomalies = bed_anomalies[1:]

        if len(air_anomalies) > 0 and i == air_anomalies[0][0]:
            air_sigma = air_anomalies[0][1]
            air_offset = air_anomalies[0][2]
            air_anomalies = air_anomalies[1:]

        # generate temps
        hotend = temp(hotend_t+hotend_offset, hotend_sigma)
        bed = temp(bed_t+bed_offset, bed_sigma)
        air = temp(air_t+air_offset, air_sigma)
        # One line per point: measurement, the three fields, timestamp in seconds.
        points.append("%s hotend=%f,bed=%f,air=%f %d" % (
            measurement,
            hotend,
            bed,
            air,
            (now - epoch).total_seconds(),
        ))
        now += second

    # Write data to Kapacitor
    r = requests.post(write_url, data='\n'.join(points))
    if r.status_code != 204:
        # Python 2 syntax for printing to stderr.
        print >> sys.stderr, r.text
        return 1
    return 0


if __name__ == '__main__':
    exit(main())
# Register the stream task for printer.autogen from print_temps.tick, then enable it.
kapacitor define print_temps -type stream -dbrp printer.autogen -tick print_temps.tick
kapacitor enable print_temps
# Print the three alert logs (the same files the {hotend,bed,air} brace expansion names).
cat /tmp/kapacitor_udf/hotend_failure.log /tmp/kapacitor_udf/bed_failure.log /tmp/kapacitor_udf/air_failure.log
```