最近好忙
最近这段时间工作上很忙,几乎是没有时间照顾到自己的生活,有时候甚至废寝忘食,虽然挺享受这个过程的,但是感觉身体慢慢垮了下去,好久也没骑车了。
今年元旦定下的骑车计划目前看来是完不成了,计划完成2000公里,目前只骑了200多公里,而现在已经是快9月份了,离明年元旦也没几个月了。
哎…生活和工作真的平衡不了么?
好了废话不多说,今天分享的是使用python来操作es,包括数据的插入,查询,拉取,删除等。
Elasticsearch?
Elasticsearch是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。
通常使用 Elasticsearch + Logstash + Kibana 搭建实时日志集中分析平台。这里推荐可以阅读Elasticsearch 权威指南(中文版)。
让ES跑一会儿
首先当然是安装ES,安装完成之后,直接执行:
会看到一大堆输出,最后如果看到:
1
| [2017-10-16T22:34:53,271][INFO ][o.e.n.Node ] [7YPd2Iu] started
|
证明没啥问题了。es监听9200端口: http://127.0.0.1:9200/
接下来键入命令:
你会看到如下输出:
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "name" : "7YPd2Iu", "cluster_name" : "elasticsearch", "cluster_uuid" : "1BDZA7JKRV2jfrjtvQCpGw", "version" : { "number" : "5.5.0", "build_hash" : "260387d", "build_date" : "2017-06-30T23:16:05.735Z", "build_snapshot" : false, "lucene_version" : "6.6.0" }, "tagline" : "You Know, for Search" }
|
几个重要的概念
索引(index)
索引(index)就是一个有相似特征的文档的集合,es把数据存储到一个或多个索引中。通常index是一个比较大的分类,比如按照不同的客户,或者不同的产品,或者不同的时间戳。
通常构造index的时候需要考虑好分类的粒度,因为如果后面想要批量删除的话基本单位是索引。如果构造的index分类粒度太大,那么批量删除的时候将会变得很麻烦。
如果是采用ES来搭建实时日志搜索平台,那么通常建议index按照时间戳来进行命名,因为这样的话可以设置超过几天的日志全部删除,清理磁盘空间。
文档(document)
文档(document)是es中存储数据的实体,它是可以被索引的基本单元。每一个文档都以json格式表示,每一个索引可以任意多个文档。
简单来说就是文档其实就是真实的每一条的存储的数据,比如每条以json格式表示的日志。每一个文档物理上存在一个索引中,但文档必须有一个文档类型(type)。
类型(type)
类型(type)是索引之下的逻辑分区,相对index更细的分类粒度。比如如果index以日期命名,那么type可以以具体的客户命名。然后每一个客户下面,又有很多的文档实体。
id
每一个文档实体,ES会自动为其创建一个唯一的id,比如“AV8tQS119lFHJS7Mv7v8”。
这里不太重要不需要关注,但有一点需要注意,日志的字段不能有id这个字段,否则会跟自动创建的id冲突,导致可能插入数据失败。
python操作ES实战
理解了上面的几个概念,然后就可以开始真正使用ES了。其实可以简单的把ES类比为一个数据库,可以执行插入,查询,更新,删除等操作。
预热
ES支持很多语言,其中就有python。安装ES的python库:
1
| pip install elasticsearch
|
亮剑
前面使用了类似下面的命令,那么使用python怎么操作呢?
1
| curl 'http://127.0.0.1:9200/'
|
很简单,使用requests库:
1 2 3 4
| # make sure ES is up and running import requests res = requests.get('http://127.0.0.1:9200') print(res.content)
|
先来执行一个插入操作尝尝鲜。执行任何操作前,需要首先创建ES的实例:
1 2 3
| #connect to our cluster import elasticsearch es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
|
完成后,执行插入操作,比如:
1
| es.index(index='test', doc_type='people',id=1, body=source)
|
其中source是一个python dict对象。
完成插入操作后,如何判断有没有插入成功呢?
1
| r = es.get(index='test',doc_type= 'people',id = 2)
|
如果插入成功的话使用get操作是可以看到你插入的数据的。完整代码见下文。
威力更大的武器
上面简单执行了插入操作,但可以看到的是插入是一条一条的插入,效率低下,我们可以使用批量的方式进行插入:
1 2 3 4 5 6 7
| #connect to our cluster import elasticsearch from elasticsearch import helpers es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
#action为文档的list helpers.bulk(es,action)
|
批量的方式能更加高效的执行插入操作。完整代码以及输出见后文。
冲锋
要说ES里插入,删除,更新,查询这几个操作,最核心的操作在于查询,或者说搜索,这也正如ES介绍的那样“Elasticsearch是一个分布式的 RESTful 风格的搜索和数据分析引擎”。搜索操作体现了ES的高性能。
先来看一个简单的搜索:
1 2 3 4 5 6 7 8 9 10 11
| qbody = { "query":{ "match_all":{} } } try: res = es.search(index = 'test2',body = json.dumps(qbody)) RES = json.dumps(res,indent = 2) print RES except elasticsearch.exceptions.NotFoundError: print "not found %s" % query
|
其中很重要的部分就是查询条件Qbody,或者说过滤条件。而且这里涉及到的过滤条件相对来说比较复杂。“match_all”表示该index下的所有document。
搜索这节是比较重要的,此处只是简单的执行了一下搜索命令,后面有单独的章节进行介绍。
前面涉及到的源代码以及输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
import copy
import elasticsearch from elasticsearch import helpers es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
import requests res = requests.get('http://127.0.0.1:9200') print(res.content)
import json
example = { "name":"", "age":"", "sex":"" }
def append(name,age,sex): source = copy.deepcopy(example) source["name"] = name source["age"] = age source["sex"] = sex return source
document = { "_index":"", "_type":"", "_id":"", "_source":"" }
action = [] def create_doc(_index,_type,_id,_source): oneindex = copy.deepcopy(example) oneindex['_index'] = _index oneindex['_type'] = _type oneindex['_id'] = _id oneindex['_source'] = _source action.append(oneindex)
if __name__ == "__main__": print "index!!!...." source = append("jsper",17,"male") es.index(index='test', doc_type='people',id=1, body=source) source = append("david",18,"female") es.index(index='test', doc_type='people', id=2,body=source) source = append("enhen",19,"male") es.index(index='test', doc_type='people', id=3,body=source) print 'index success! \n'
print "get!!!..." try: r = es.get(index='test',doc_type= 'people',id = 2) print json.dumps(r,indent = 4) except elasticsearch.exceptions.NotFoundError: print 'error'
print "bulk\n" source = append("smaug",25,"male") create_doc("test2","people",1,source) source = append("ablex",24,"female") create_doc("test2","people",2,source) source = append("lucas",26,"female") create_doc("test2","people",3,source)
helpers.bulk(es,action) print "bulk finished! " r = es.get(index='test2',doc_type= 'people',id = 2) print json.dumps(r,indent = 4)
print "search\n" qbody = { "query":{ "match_all":{} } } try: res = es.search(index = 'test2',body = json.dumps(qbody)) RES = json.dumps(res,indent = 2) print RES except elasticsearch.exceptions.NotFoundError: print "not found %s" % query
|
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| [localhost@vm es]$ python index_markdown.py { "name" : "7YPd2Iu", "cluster_name" : "elasticsearch", "cluster_uuid" : "1BDZA7JKRV2jfrjtvQCpGw", "version" : { "number" : "5.5.0", "build_hash" : "260387d", "build_date" : "2017-06-30T23:16:05.735Z", "build_snapshot" : false, "lucene_version" : "6.6.0" }, "tagline" : "You Know, for Search" }
index!!!.... index success!
get!!!... { "_type": "people", "_source": { "age": 18, "name": "david", "sex": "female" }, "_index": "test", "_version": 7, "found": true, "_id": "2" } bulk
bulk finished! { "_type": "people", "_source": { "age": 24, "name": "ablex",
▽ "sex": "female" }, "_index": "test2", "_version": 4, "found": true, "_id": "2" } search
{ "hits": { "hits": [ { "_score": 1.0, "_type": "people", "_id": "2", "_source": { "age": 24, "name": "ablex", "sex": "female" }, "_index": "test2" }, { "_score": 1.0, "_type": "people", "_id": "1", "_source": { "age": 25, "name": "smaug", "sex": "male" }, "_index": "test2" }, { "_score": 1.0, "_type": "people", "_id": "3", "_source": { "age": 26, "name": "lucas", "sex": "female" }, "_index": "test2" } ], "total": 3, "max_score": 1.0 }, "_shards": { "successful": 5, "failed": 0, "total": 5 }, "took": 185, "timed_out": false }
|
过滤/搜索
这部分我觉得是比较核心的部分。因为搜索比较复杂,涉及到过滤条件,以及使用scroll的方式进行更加高性能的搜索等。所以我打算另写一篇博客详细介绍我的使用感受,配合我日常工作中碰到的具体事情来进行说明。
收工
好了,暂时写到这。后续关于ES更多的内容,见我之后的博客。