日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python使用elasticsearch维护数据_使用Python对ElasticSearch获取数据及操作

發(fā)布時間:2024/9/27 python 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python使用elasticsearch维护数据_使用Python对ElasticSearch获取数据及操作 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

#!/usr/bin/env python#-*- coding: utf-8 -*-

"""@Time : 2018/7/4

@Author : LiuXueWen

@Site :

@File : ElasticSearchOperation.py

@Software: PyCharm

@Description: 對elasticsearch數(shù)據(jù)的操作,包括獲取數(shù)據(jù),發(fā)送數(shù)據(jù)"""

importelasticsearchimportjsonimportUtil_Ini_Operationclasselasticsearch_data():def __init__(self,hosts,username,password,maxsize,is_ssl):#初始化ini操作腳本,獲取配置文件

try:#判斷請求方式是否ssl加密

if is_ssl == "true":#獲取證書地址

cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")

es_ssl=elasticsearch.Elasticsearch(#地址

hosts=hosts,#用戶名密碼

http_auth=(username,password),#開啟ssl

use_ssl=True,#確認(rèn)有加密證書

verify_certs=True,#對應(yīng)的加密證書地址

client_cert=cert_pem

)

self.es=es_sslelif is_ssl == "false":#創(chuàng)建普通類型的ES客戶端

es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))

self.es=es_ordinaryexceptException as e:print(e)defquery_data(self,keywords_list,date):

gte= "now-"+str(date)

query_data={#查詢語句

"query": {"bool": {"must": [

{"query_string": {"query": keywords_list,"analyze_wildcard": True

}

},

{"range": {"@timestamp": {"gte": gte,"lte": "now","format": "epoch_millis"}

}

}

],"must_not": []

}

}

}returnquery_data#從es獲取數(shù)據(jù)

defget_datas_by_query(self,index_name,keywords,param,date):''':param index_name: 索引名稱

:param keywords: 關(guān)鍵字詞,數(shù)組

:param param: 需要數(shù)據(jù)條件,例如_source

:param date: 過去時間范圍,字符串格式,例如過去30分鐘內(nèi)數(shù)據(jù),"30m"

:return: all_datas 返回查詢到的所有數(shù)據(jù)(已經(jīng)過param過濾)'''all_datas=[]#遍歷所有的查詢條件

for keywords_list inkeywords:#DSL語句

query_data =self.query_data(keywords_list,date)

res=self.es.search(

index=index_name,

body=query_data

)for hit in res['hits']['hits']:#獲取指定的內(nèi)容

response =hit[param]#添加所有數(shù)據(jù)到數(shù)據(jù)集中

all_datas.append(response)#返回所有數(shù)據(jù)內(nèi)容

returnall_datas#當(dāng)索引不存在創(chuàng)建索引

defcreate_index(self,index_name):''':param index_name: 索引名稱

:return:如果創(chuàng)建成功返回創(chuàng)建結(jié)果信息,試過已經(jīng)存在創(chuàng)建新的index失敗返回index的名稱'''

#獲取索引的映射

#index_mapping = IndexMapping.index_mapping

## 判斷索引是否存在

#if self.es.indices.exists(index=index_name) is not True:

## 創(chuàng)建索引

#res = self.es.indices.create(index=index_name,body=index_mapping)

## 返回結(jié)果

#return res

#else:

## 返回索引名稱

#return index_name

pass

#插入指定的單條數(shù)據(jù)內(nèi)容

definsert_single_data(self,index_name,doc_type,data):''':param index_name: 索引名稱

:param doc_type: 文檔類型

:param data: 需要插入的數(shù)據(jù)內(nèi)容

:return: 執(zhí)行結(jié)果'''res= self.es.index(index=index_name,doc_type=doc_type,body=data)returnres#向ES中新增數(shù)據(jù),批量插入

definsert_datas(self,index_name):''':desc 通過讀取指定的文件內(nèi)容獲取需要插入的數(shù)據(jù)集

:param index_name: 索引名稱

:return: 插入成功的數(shù)據(jù)條數(shù)'''insert_datas=[]#判斷插入數(shù)據(jù)的索引是否存在

self.createIndex(index_name=index_name)#獲取插入數(shù)據(jù)的文件地址

data_file_path = self.ini.get_key_value("datafile","datafilepath")#獲取需要插入的數(shù)據(jù)集

with open(data_file_path,"r+") as data_file:#獲取文件所有數(shù)據(jù)

data_lines =data_file.readlines()for data_line indata_lines:#string to json

data_line =json.loads(data_line)

insert_datas.append(data_line)#批量處理

res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)returnres#從ES中在指定的索引中刪除指定數(shù)據(jù)(根據(jù)id判斷)

defdelete_data_by_id(self,index_name,doc_type,id):''':param index_name: 索引名稱

:param index_type: 文檔類型

:param id: 唯一標(biāo)識id

:return: 刪除結(jié)果信息'''res= self.es.delete(index=index_name,doc_type=doc_type,id=id)returnres#根據(jù)條件刪除數(shù)據(jù)

defdelete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):''':param index_name:索引名稱,為空查詢所有索引

:param doc_type:文檔類型,為空查詢所有文檔類型

:param param:過濾條件值

:param gt_time:時間范圍,大于該時間

:param lt_time:時間范圍,小于該時間

:return:執(zhí)行條件刪除后的結(jié)果信息'''

#DSL語句

query_data ={#查詢語句

"query": {"bool": {"must": [

{"query_string": {"query": param,"analyze_wildcard": True

}

},

{"range": {"@timestamp": {"gte": gt_time,"lte": lt_time,"format": "epoch_millis"}

}

}

],"must_not": []

}

}

}

res= self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)returnres#指定index中刪除指定時間段內(nèi)的全部數(shù)據(jù)

defdelete_all_datas(self,index_name,doc_type,gt_time,lt_time):''':param index_name:索引名稱,為空查詢所有索引

:param doc_type:文檔類型,為空查詢所有文檔類型

:param gt_time:時間范圍,大于該時間

:param lt_time:時間范圍,小于該時間

:return:執(zhí)行條件刪除后的結(jié)果信息'''

#DSL語句

query_data ={#查詢語句

"query": {"bool": {"must": [

{"match_all": {}

},

{"range": {"@timestamp": {"gte": gt_time,"lte": lt_time,"format": "epoch_millis"}

}

}

],"must_not": []

}

}

}

res= self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)returnres#修改ES中指定的數(shù)據(jù)

defupdate_data_by_id(self,index_name,doc_type,id,data):''':param index_name: 索引名稱

:param doc_type: 文檔類型,為空表示所有類型

:param id: 文檔唯一標(biāo)識編號

:param data: 更新的數(shù)據(jù)

:return: 更新結(jié)果信息'''res= self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)return res

總結(jié)

以上是生活随笔為你收集整理的python使用elasticsearch维护数据_使用Python对ElasticSearch获取数据及操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。