python操作ES(Elasticsearch)

最近好忙

最近这段时间工作上很忙,几乎是没有时间照顾到自己的生活,有时候甚至废寝忘食,虽然挺享受这个过程的,但是感觉身体慢慢垮了下去,好久也没骑车了。

今年元旦定下的骑车计划目前看来是完不成了,计划完成2000公里,目前只骑了200多公里,而现在已经是快9月份了,离明年元旦也没几个月了。

哎…生活和工作真的平衡不了么?

好了废话不多说,今天分享的是使用python来操作es,包括数据的插入,查询,拉取,删除等。

Elasticsearch?

Elasticsearch是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

通常使用 Elasticsearch + Logstash + Kibana 搭建实时日志集中分析平台。这里推荐可以阅读Elasticsearch 权威指南(中文版)

让ES跑一会儿

首先当然是安装ES,安装完成之后,直接执行:

1
./elasticsearch

会看到一大堆输出,最后如果看到:

1
[2017-10-16T22:34:53,271][INFO ][o.e.n.Node               ] [7YPd2Iu] started

证明没啥问题了。es监听9200端口: http://127.0.0.1:9200/

接下来键入命令:

1
curl 'http://127.0.0.1:9200/?pretty'

你会看到如下输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"name" : "7YPd2Iu",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "1BDZA7JKRV2jfrjtvQCpGw",
"version" : {
"number" : "5.5.0",
"build_hash" : "260387d",
"build_date" : "2017-06-30T23:16:05.735Z",
"build_snapshot" : false,
"lucene_version" : "6.6.0"
},
"tagline" : "You Know, for Search"
}

几个重要的概念

索引(index)

索引(index)就是一个有相似特征的文档的集合,es把数据存储到一个或多个索引中。通常index是一个比较大的分类,比如按照不同的客户,或者不同的产品,或者不同的时间戳。

通常构造index的时候需要考虑好分类的粒度,因为如果后面想要批量删除的话基本单位是索引。如果构造的index分类粒度太大,那么批量删除的时候将会变得很麻烦。

如果是采用ES来搭建实时日志搜索平台,那么通常建议index按照时间戳来进行命名,因为这样的话可以设置超过几天的日志全部删除,清理磁盘空间。

文档(document)

文档(document)是es中存储数据的实体,它是可以被索引的基本单元。每一个文档都以json格式表示,每一个索引可以任意多个文档。

简单来说就是文档其实就是真实的每一条的存储的数据,比如每条以json格式表示的日志。每一个文档物理上存在一个索引中,但文档必须有一个文档类型(type)。

类型(type)

类型(type)是索引之下的逻辑分区,相对index更细的分类粒度。比如如果index以日期命名,那么type可以以具体的客户命名。然后每一个客户下面,又有很多的文档实体。

id

每一个文档实体,ES会自动为其创建一个唯一的id,比如“AV8tQS119lFHJS7Mv7v8”

这里不太重要不需要关注,但有一点需要注意,日志的字段不能有id这个字段,否则会跟自动创建的id冲突,导致可能插入数据失败。

python操作ES实战

理解了上面的几个概念,然后就可以开始真正使用ES了。其实可以简单的把ES类比为一个数据库,可以执行插入,查询,更新,删除等操作。

预热

ES支持很多语言,其中就有python。安装ES的python库:

1
pip install elasticsearch

亮剑

前面使用了类似下面的命令,那么使用python怎么操作呢?

1
curl 'http://127.0.0.1:9200/'

很简单,使用requests库:

1
2
3
4
# make sure ES is up and running
import requests
res = requests.get('http://127.0.0.1:9200')
print(res.content)

先来执行一个插入操作尝尝鲜。执行任何操作前,需要首先创建ES的实例:

1
2
3
#connect to our cluster
import elasticsearch
es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

完成后,执行插入操作,比如:

1
es.index(index='test', doc_type='people',id=1, body=source)

其中source是一个python dict对象。

完成插入操作后,如何判断有没有插入成功呢?

1
r = es.get(index='test',doc_type= 'people',id = 2)

如果插入成功的话使用get操作是可以看到你插入的数据的。完整代码见下文。

威力更大的武器

上面简单执行了插入操作,但可以看到的是插入是一条一条的插入,效率低下,我们可以使用批量的方式进行插入:

1
2
3
4
5
6
7
#connect to our cluster
import elasticsearch
from elasticsearch import helpers
es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

#action为文档的list
helpers.bulk(es,action)

批量的方式能更加高效的执行插入操作。完整代码以及输出见后文。

冲锋

要说ES里插入,删除,更新,查询这几个操作,最核心的操作在于查询,或者说搜索,这也正如ES介绍的那样“Elasticsearch是一个分布式的 RESTful 风格的搜索和数据分析引擎”。搜索操作体现了ES的高性能。

先来看一个简单的搜索:

1
2
3
4
5
6
7
8
9
10
11
qbody = {
"query":{
"match_all":{}
}
}
try:
res = es.search(index = 'test2',body = json.dumps(qbody))
RES = json.dumps(res,indent = 2)
print RES
except elasticsearch.exceptions.NotFoundError:
print "not found %s" % query

其中很重要的部分就是查询条件Qbody,或者说过滤条件。而且这里涉及到的过滤条件相对来说比较复杂。“match_all”表示该index下的所有document。

搜索这节是比较重要的,此处只是简单的执行了一下搜索命令,后面有单独的章节进行介绍。

前面涉及到的源代码以及输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python
#-*- coding:utf8 -*-

import copy

#connect to our cluster
import elasticsearch
from elasticsearch import helpers
es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])


# make sure ES is up and running
import requests
res = requests.get('http://127.0.0.1:9200')
print(res.content)


#let's iterate over swapi people documents and index them
import json


example = {
"name":"",
"age":"",
"sex":""
}

def append(name,age,sex):
source = copy.deepcopy(example)
source["name"] = name
source["age"] = age
source["sex"] = sex
return source

document = {
"_index":"",
"_type":"",
"_id":"",
"_source":""
}


action = []
def create_doc(_index,_type,_id,_source):
oneindex = copy.deepcopy(example)
oneindex['_index'] = _index
oneindex['_type'] = _type
oneindex['_id'] = _id
oneindex['_source'] = _source
action.append(oneindex)


if __name__ == "__main__":
print "index!!!...."
source = append("jsper",17,"male")
es.index(index='test', doc_type='people',id=1, body=source)
source = append("david",18,"female")
es.index(index='test', doc_type='people', id=2,body=source)
source = append("enhen",19,"male")
es.index(index='test', doc_type='people', id=3,body=source)
print 'index success! \n'

print "get!!!..."
try:
r = es.get(index='test',doc_type= 'people',id = 2)
print json.dumps(r,indent = 4)
except elasticsearch.exceptions.NotFoundError:
print 'error'

print "bulk\n"
source = append("smaug",25,"male")
create_doc("test2","people",1,source)
source = append("ablex",24,"female")
create_doc("test2","people",2,source)
source = append("lucas",26,"female")
create_doc("test2","people",3,source)



helpers.bulk(es,action)
print "bulk finished! "
r = es.get(index='test2',doc_type= 'people',id = 2)
print json.dumps(r,indent = 4)


print "search\n"
qbody = {
"query":{
"match_all":{}
}
}
try:
res = es.search(index = 'test2',body = json.dumps(qbody))
RES = json.dumps(res,indent = 2)
print RES
except elasticsearch.exceptions.NotFoundError:
print "not found %s" % query

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
[localhost@vm es]$ python index_markdown.py
{
"name" : "7YPd2Iu",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "1BDZA7JKRV2jfrjtvQCpGw",
"version" : {
"number" : "5.5.0",
"build_hash" : "260387d",
"build_date" : "2017-06-30T23:16:05.735Z",
"build_snapshot" : false,
"lucene_version" : "6.6.0"
},
"tagline" : "You Know, for Search"
}

index!!!....
index success!

get!!!...
{
"_type": "people",
"_source": {
"age": 18,
"name": "david",
"sex": "female"
},
"_index": "test",
"_version": 7,
"found": true,
"_id": "2"
}
bulk

bulk finished!
{
"_type": "people",
"_source": {
"age": 24,
"name": "ablex",


"sex": "female"
},
"_index": "test2",
"_version": 4,
"found": true,
"_id": "2"
}
search

{
"hits": {
"hits": [
{
"_score": 1.0,
"_type": "people",
"_id": "2",
"_source": {
"age": 24,
"name": "ablex",
"sex": "female"
},
"_index": "test2"
},
{
"_score": 1.0,
"_type": "people",
"_id": "1",
"_source": {
"age": 25,
"name": "smaug",
"sex": "male"
},
"_index": "test2"
},
{
"_score": 1.0,
"_type": "people",
"_id": "3",
"_source": {
"age": 26,
"name": "lucas",
"sex": "female"
},
"_index": "test2"
}
],
"total": 3,
"max_score": 1.0
},
"_shards": {
"successful": 5,
"failed": 0,
"total": 5
},
"took": 185,
"timed_out": false
}

过滤/搜索

这部分我觉得是比较核心的部分。因为搜索比较复杂,涉及到过滤条件,以及使用scroll的方式进行更加高性能的搜索等。所以我打算另写一篇博客详细介绍我的使用感受,配合我日常工作中碰到的具体事情来进行说明。

收工

好了,暂时写到这。后续关于ES更多的内容,见我之后的博客。

buy me a cola!

欢迎关注我的其它发布渠道