数据库相关操作
Elasticsearch数据库
环境配置
EsDao包装类
# -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd
class EsDao(object):
    """
    Data-access helper wrapping an ``Elasticsearch`` client.

    Provides bulk writes from lists of dicts or pandas DataFrames, scroll-based
    reads into DataFrames, and basic index management.

    NOTE(review): documents are written with ``_type`` equal to the index name,
    and ``create_index`` builds a typed mapping (``{"mappings": {index_name:
    {...}}}``). That layout matches Elasticsearch 6.x-style clusters; confirm
    the target cluster version before using this against ES 7+.
    """

    # Default page size for scroll queries
    DEFAULT_BATCH_SIZE = 1000
    # Maximum number of documents handed to one bulk() call
    BULK_BATCH_SIZE = 10000

    def __init__(self, hosts, timeout=3600 * 24):
        """
        :param hosts: host(s) passed straight to ``Elasticsearch``
        :param timeout: client timeout passed to ``Elasticsearch``
        """
        self.hosts = hosts
        self.timeout = timeout
        self.es = Elasticsearch(hosts, timeout=self.timeout)

    def save_data_list(self, index_name, data_list):
        """
        Save a list of rows into ``index_name`` using the bulk helper.

        Each row is a dict. A row that carries an ``"_id"`` key is indexed under
        that document id, and the key is excluded from the stored source. Rows
        without it get an id generated by Elasticsearch. The input dicts are
        not modified.

        :param index_name: target index name (also used as the document type)
        :param data_list: list of dicts, one per document
        :return: None
        """
        for start in range(0, len(data_list), self.BULK_BATCH_SIZE):
            batch = data_list[start:start + self.BULK_BATCH_SIZE]
            actions = self._build_actions(index_name, batch)
            bulk(self.es, actions, index=index_name, raise_on_error=True)

    @staticmethod
    def _build_actions(index_name, rows):
        """Build bulk index actions for ``rows`` without mutating them."""
        actions = []
        for row in rows:
            action = {"_index": index_name, "_type": index_name}
            if "_id" in row:
                # Decide per row: mixed input must not raise KeyError or leak
                # "_id" into the document source.
                action["_id"] = row["_id"]
                action["_source"] = {k: v for k, v in row.items() if k != "_id"}
            else:
                action["_source"] = row
            actions.append(action)
        return actions

    def is_index_exists(self, index_name):
        """
        Check whether an index exists.

        :param index_name: index name
        :return: result of ``es.indices.exists``
        """
        return self.es.indices.exists(index=index_name)

    def delete_by_query(self, index_name, query_body):
        """
        Delete the documents in ``index_name`` matched by ``query_body``.

        :param index_name: index name
        :param query_body: query DSL body, e.g. ``{"query": {...}}``
        :return: the client's delete_by_query response
        """
        # Keyword arguments: newer clients no longer accept these positionally.
        return self.es.delete_by_query(index=index_name, body=query_body)

    def clear_index_data(self, index_name):
        """
        Delete every document in an index (the index itself is kept).

        :param index_name: index name
        :return: the client's delete_by_query response
        """
        return self.delete_by_query(
            index_name=index_name,
            query_body={
                "query": {
                    "match_all": {}
                }
            }
        )

    def save_df_data(self, index_name, df):
        """
        Save a pandas DataFrame into ``index_name``, one document per row.

        A column named ``_id`` is used as the document id (see save_data_list).

        :param index_name: index name
        :param df: DataFrame to save
        :return: None
        """
        columns = df.columns.tolist()
        rows = [dict(zip(columns, values)) for values in df.values.tolist()]
        self.save_data_list(index_name=index_name, data_list=rows)

    def create_index(self, index_name, mapping_properties):
        """
        Create an index with the given mapping properties if it does not exist.

        :param index_name: index name (also used as the mapping type name)
        :param mapping_properties: dict placed under the mapping's "properties"
        :return: the "acknowledged" flag of the create response, or False if
                 the index already existed or the response lacks that flag
        """
        if self.es.indices.exists(index=index_name):
            return False
        mapping = {
            "mappings": {
                index_name: {
                    "properties": mapping_properties
                }
            }
        }
        res = self.es.indices.create(index=index_name, body=mapping)
        if res is not None and "acknowledged" in res:
            return res.get("acknowledged")
        return False

    def _search_with_scroll(self, index_name, query_body):
        """
        Yield successive non-empty lists of ``_source`` dicts using the scroll API.

        The caller's ``query_body`` is copied, never modified. The scroll
        context is cleared once iteration ends, including when the generator
        is closed early.

        :param index_name: index name
        :param query_body: query DSL body; "size" defaults to DEFAULT_BATCH_SIZE
        """
        body = dict(query_body or {})
        body.setdefault("size", self.DEFAULT_BATCH_SIZE)
        response = self.es.search(
            index=index_name,
            body=body,
            search_type="dfs_query_then_fetch",
            scroll="120m",
            timeout="60m"
        )
        scroll_id = response.get("_scroll_id")
        try:
            while True:
                sources = [doc["_source"] for doc in response["hits"]["hits"]]
                if not sources:
                    break
                yield sources
                response = self.es.scroll(scroll_id=scroll_id, scroll="60m")
                # The server may hand back a new scroll id; always continue
                # with the most recent one.
                scroll_id = response.get("_scroll_id", scroll_id)
        finally:
            if scroll_id:
                try:
                    self.es.clear_scroll(scroll_id=scroll_id)
                except Exception:
                    # Best-effort cleanup: the context may already have
                    # expired, and the server will drop it after the
                    # keep-alive anyway.
                    pass

    def query_for_df(self, index_name, query_body):
        """
        Run a query and collect all hits into one pandas DataFrame.

        :param index_name: index name
        :param query_body: query DSL body (not modified)
        :return: DataFrame of the hits' ``_source`` (empty if no hits)
        """
        sources = [
            source
            for batch in self._search_with_scroll(index_name=index_name, query_body=query_body)
            for source in batch
        ]
        return pd.DataFrame(sources)

    def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
        """
        Run a query and yield one pandas DataFrame per scroll page.

        :param index_name: index name
        :param query_body: query DSL body (not modified); an explicit "size"
                           in it takes precedence over ``batch_size``
        :param batch_size: page size used when ``query_body`` has no "size"
        :return: generator of DataFrames
        """
        body = dict(query_body or {})
        body.setdefault("size", batch_size)
        for batch in self._search_with_scroll(index_name=index_name, query_body=body):
            yield pd.DataFrame(batch)

    def get_first_row_with_df(self, index_name):
        """
        Fetch the first document of an index as a one-row DataFrame.

        Useful for inspecting an index's fields.

        :param index_name: index name
        :return: DataFrame with the first hit, or None if the index has no documents
        """
        query_body = {
            "size": 1,
            "query": {
                "match_all": {}
            }
        }
        batches = self._search_with_scroll(index_name=index_name, query_body=query_body)
        try:
            first_batch = next(batches, None)
        finally:
            # Close explicitly so the scroll context is cleared right away
            # instead of whenever the generator is garbage-collected.
            batches.close()
        if first_batch is None:
            return None
        return pd.DataFrame(first_batch)
使用案例
Oracle数据库
环境配置
sql基础
建表
查询
相关操作
Postgresql数据库
离线安装数据库

环境配置
sql语法
数据库连接
数据库信息
数据备份与恢复
表空间
锁表处理
表结构修改
数据更新和查询
数据和结构复制
分页查询
删除重复记录
索引
其他语法
ORM框架
ORM框架比较
SQLAlchemy
session和scoped_session
几种操作方式
最后更新于