Pulling historical monitoring data from open-falcon (Xiaomi's monitoring system)

Hangzhou is getting cold

The temperature in Hangzhou has dropped quite a bit lately; it feels like winter arrived early, and my nose won't stop running. Between cold and hot, I much prefer hot. The cold is really unbearable, and Hangzhou is supposed to be the south!

I haven't blogged in a while; I've been busy with work and have barely looked after my own life. I haven't been out cycling either, and as the weather turns cold my body keeps falling apart. Then I remember there's a 180 km cycling race on the 5th of next month, my legs twitch at the thought. Time to train.

Time to balance work and life again: ride the bike, write some posts. Enough rambling; today I'm sharing how to pull historical monitoring data from open-falcon (Xiaomi's monitoring system).

What is open-falcon?


As the official site puts it, open-falcon is a distributed, high-performance, enterprise-grade monitoring system.

In my own experience it is quite convenient: custom plugins, cluster monitoring, alerting, and more all help you get real value out of monitoring.

The dashboard shows curves for many counters, but some relationships stay hidden: often what we really care about is the combined effect of several counters. In that case we can pull the historical data out of the monitoring system and analyze and process it ourselves.

Pulling historical data differs slightly between v0.1 and v0.2: v0.2 adds authentication.

open-falcon v0.1

The official documentation explains this clearly and the pulling code is simple, so let's go straight to the code. I use a configuration file to specify the endpoints, counters, and so on.

The configuration file point-counter.json:

{
    "dimension": "http_s",
    "bandqps": "qps",
    "hour": 24,
    "compare": {
        "time": {
            "before": "2017-09-13 18:30:00",
            "after": "2017-09-13 18:30:00"
        },
        "filesize": {},
        "https": {}
    },
    "counters": [
        "cpu_util",
        "traffic_out_80",
        "traffic_out_443",
        "Nginx-Requests"
    ],
    "endpoints": {
        "default-point": [
            "TCP-CDN"
        ],
        "another-point": [
            "TAZ-03",
            "TAZ-04"
        ]
    }
}

The pull script:

#!/usr/bin/python
# -*- coding:utf8 -*-

import requests
import time
import json
import copy
import os
import sys

'''
Pull history data from open-falcon (Xiaomi monitoring) and write it to Historydatafile.
'''

# Result format:
'''
[{"endpoint":"hostname1","counter":"cpu_util","dstype":"GAUGE","step":60,"Values":[{"timestamp":1492417260,"value":103.670000},{"timestamp":1492417320,"value":106.060000}]}]
'''

upyun = 'smaug'  # prefix for the output filename
endcounter_file = './point-counter.json'

example = {
    "start": "",
    "end": "",
    "cf": "AVERAGE",
    "endpoint_counters": [],
}

queryload = []
CONFIG = {}


def config_load(filename):
    if os.path.isfile(filename):
        with open(filename, 'r') as f:
            return json.load(f)
    else:
        print("%s not exist" % filename)
        sys.exit(1)


def get_endpointlist_And_counters(conf_dict):
    return conf_dict.get("endpoints"), conf_dict.get("counters")


def append(d, endpoint, counter):
    # Append one endpoint/counter pair to a query payload.
    endpoint_counter = {
        "endpoint": endpoint,
        "counter": counter,
    }
    d['endpoint_counters'].append(endpoint_counter)


def mkd(start, end, ep, counterlist):
    # Build the query payload for endpoint `ep` over [start, end] and queue it.
    d = copy.deepcopy(example)
    d['start'] = start
    d['end'] = end
    for cv in counterlist[:]:
        append(d, ep, cv)
    queryload.append(d)


def start_end(endtime, hour):
    # endtime format: "2017-04-16 23:59:59"
    end = int(time.mktime(time.strptime(endtime, '%Y-%m-%d %H:%M:%S')))
    ts = 3600 * hour
    start = end - ts  # query the past `hour` hours of data ending at endtime
    return start, end


def time_is_dimension(CONF, hours, com):
    endpointlists, counterlist = get_endpointlist_And_counters(CONF)
    endpointlist = endpointlists.get("default-point")
    if not endpointlist or not counterlist:
        sys.exit(1)
    before = com.get("time").get("before")
    sb, eb = start_end(before, hours)
    for ep in endpointlist[:]:
        mkd(sb, eb, ep, counterlist)

    after = com.get("time").get("after")
    sa, ea = start_end(after, hours)
    for ep in endpointlist[:]:
        mkd(sa, ea, ep, counterlist)


def filesize_is_dimension(CONF, hours):
    endpointlists, counterlist = get_endpointlist_And_counters(CONF)
    default_point = endpointlists.get("default-point")
    another_point = endpointlists.get("another-point")
    if not default_point or not another_point:
        sys.exit(1)
    end = int(time.mktime(time.strptime("2017-05-25 08:01:30", '%Y-%m-%d %H:%M:%S')))
    #end = int(time.time())
    start = end - 3600 * hours
    for ep in default_point[:]:
        mkd(start, end, ep, counterlist)

    for ep in another_point[:]:
        mkd(start, end, ep, counterlist)


CONFIG = config_load(endcounter_file)
dimension = CONFIG.get("dimension")
hour = CONFIG.get("hour")
compare = CONFIG.get("compare")

if dimension == "time":
    time_is_dimension(CONFIG, hour, compare)
elif dimension == "filesize":
    filesize_is_dimension(CONFIG, hour)

endname = endcounter_file[2:-5]

Historydatafile = "./{0}_hist{1}h_{2}_{3}.data".format(upyun, hour, endname, dimension)

query_api = "http://queryip:9966/graph/history"
if os.path.isfile(Historydatafile):
    os.remove(Historydatafile)
for vd in queryload[:]:
    r = requests.post(query_api, data=json.dumps(vd))
    #print r.text
    STORE = r.json()
    with open(Historydatafile, 'a') as fout:  # 'a': append one JSON document per query ('aw' is not a valid mode)
        json.dump(STORE, fout)
        fout.write('\n')
print("pull %s hours historydata to %s" % (hour, Historydatafile))

The script above can pull multiple endpoint-counter pairs in a single run and save them all to one file.
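For downstream analysis you will want to read that file back. A minimal sketch, assuming the output of the script above (one JSON document per line, each an array of series as in the result-format comment; the filename is just an example and depends on your config):

import json

path = "./smaug_hist24h_point-counter_time.data"  # example name
with open(path) as f:
    for line in f:
        for series in json.loads(line):
            # each series carries endpoint, counter, step and a list of {timestamp, value}
            print(series["endpoint"], series["counter"], len(series["Values"]))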

open-falcon v0.2

Compared with v0.1, v0.2 adds an authentication layer and also changes the JSON format of the query.

{
    "start": start,
    "end": end,
    "cf": "AVERAGE",
    "endpoint_counters": [
        {
            "endpoint": "host1",
            "counter": "cpu.idle"
        },
        {
            "endpoint": "host1",
            "counter": "load.1min"
        }
    ]
}

That is the query parameter format for v0.1 historical data; compare it with v0.2:

{
    "step": 60,
    "start_time": 1481854596,
    "hostnames": [
        "docker-a",
        "docker-b",
        "docker-c"
    ],
    "end_time": 1481858193,
    "counters": [
        "cpu.idle",
        "cpu.iowait"
    ],
    "consol_fun": "AVERAGE"
}

Official API: POST /api/v1/graph/history

The key thing to obtain here is the "sig". It looks like:

{"name":"root","sig":"427d6803b78311e68afd0242ac130006"}
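Once obtained, the sig goes into an Apitoken header on every API call. A minimal sketch of querying the v0.2 history API with it, reusing the payload format above (queryip, the user name, and the sig value are placeholders):

import json
import requests

api = "http://queryip:8080/api/v1"
headers = {
    "Apitoken": json.dumps({"name": "root", "sig": "427d6803b78311e68afd0242ac130006"}),
    "Content-type": "application/json",
}
payload = {
    "step": 60,
    "start_time": 1481854596,
    "end_time": 1481858193,
    "hostnames": ["docker-a"],
    "counters": ["cpu.idle"],
    "consol_fun": "AVERAGE",
}
r = requests.post("%s/graph/history" % api, headers=headers, data=json.dumps(payload))
print(r.json())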

The sig itself is obtained by logging in with a username and password. The core code:

import json
import os
import urllib
import urllib2

import requests

# username and password are new in v0.2; register them through the dashboard frontend.
# queryip is the machine running open-falcon, specifically the address of the api component.
# `token` is a small holder class (NAME / PASSWORD / API_ADDR / SIG) defined in the omitted code.
mytoken = token("username", "password", "http://queryip:8080/api/v1")


def auth_requests(method, *args, **kwargs):
    if mytoken.SIG is None:
        # Build the login form data
        data = {}
        data['name'] = mytoken.NAME
        data['password'] = mytoken.PASSWORD

        # Login endpoint
        url = mytoken.API_ADDR + '/user/login'
        post_data = urllib.urlencode(data)

        # Submit the login request
        req = urllib2.urlopen(url, post_data)

        # Read the response
        content = req.read()

        Name = json.loads(content)['name']
        Sig = json.loads(content)['sig']
        Admin = json.loads(content)['admin']

        print '{"name: %s","Sig: %s","Admin: %s"}' % (Name, Sig, Admin)
        mytoken.SIG = Sig

        if not content:
            raise Exception("no api token")

    headers = {
        "Apitoken": json.dumps({"name": mytoken.NAME, "sig": mytoken.SIG})
    }

    if not kwargs:
        kwargs = {}

    if "headers" in kwargs:
        headers.update(kwargs["headers"])
        del kwargs["headers"]

    if method == "POST":
        return requests.post(*args, headers=headers, **kwargs)
    else:
        raise Exception("invalid http method")


def post_history(method, d):
    h = {"Content-type": "application/json", "X-Forwarded-For": "127.0.0.1"}
    r = auth_requests(method, "%s/graph/history" % (mytoken.API_ADDR), headers=h, data=json.dumps(d))
    return r.json()

# some code omitted

if os.path.isfile(Historydatafile):
    os.remove(Historydatafile)
for vd in queryload[:]:
    STORE = post_history("POST", vd)
    # print STORE
    with open(Historydatafile, 'a') as fout:  # 'a': append mode ('aw' is not a valid mode)
        json.dump(STORE, fout)
        fout.write('\n')
print("pull %s hours historydata to %s" % (hour, Historydatafile))

Official references:

http://open-falcon.org/falcon-plus/#/authentication

http://open-falcon.org/falcon-plus/doc/graph.html

What is the historical data for?

A single metric usually doesn't tell us what we need to know; many metrics interact, and those interactions are not directly visible on the dashboard. By pulling the historical data and processing it locally with other methods, we can derive the relationships between metrics much more precisely, as in the sketch below.
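A minimal sketch of such an analysis: compute the Pearson correlation between two counters from the pulled file. The endpoint and counter names are taken from the sample config above; the filename is hypothetical.

import json

def load_series(path):
    # Map (endpoint, counter) -> {timestamp: value}, skipping points with no value.
    series = {}
    with open(path) as f:
        for line in f:
            for item in json.loads(line):
                key = (item["endpoint"], item["counter"])
                series[key] = {v["timestamp"]: v["value"]
                               for v in item["Values"] if v["value"] is not None}
    return series

def pearson(xs, ys):
    n = float(len(xs))
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sum((x - mx) ** 2 for x in xs) ** 0.5
    sy = sum((y - my) ** 2 for y in ys) ** 0.5
    return cov / (sx * sy)

data = load_series("./smaug_hist24h_point-counter_time.data")
cpu = data[("TCP-CDN", "cpu_util")]
bw = data[("TCP-CDN", "traffic_out_80")]
ts = sorted(set(cpu) & set(bw))  # align the two series on shared timestamps
print(pearson([cpu[t] for t in ts], [bw[t] for t in ts]))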

The figure below was produced from pulled historical data and shows the relationship between CPU and bandwidth for one service on one machine:

Weekend ride to Longmen Ancient Town

120 km to 150 km of riding. So excited!
