数据库相关操作
Elasticsearch数据库
环境配置
安装环境
pip install elasticsearch==7.6.0
EsDao包装类
# -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd
class EsDao(object):
"""
ElasticSearch的数据操作类
"""
# 查询批次大小
DEFAULT_BATCH_SIZE = 1000
# 写入批次大小
BULK_BATCH_SIZE = 10000
def __init__(self, hosts, timeout=3600*24):
self.hosts = hosts
self.timeout = timeout
self.es = Elasticsearch(hosts, timeout=self.timeout)
def save_data_list(self, index_name, data_list):
"""
保存数据列表到es的指定索引中
:param index_name: 索引名称
:param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
:return:
"""
bulk_data_lst = [
data_list[i:i + self.BULK_BATCH_SIZE]
for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
]
if len(data_list) > 0 and '_id' in data_list[0]:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_id": data.pop("_id"),
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)
else:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)
def is_index_exists(self, index_name):
"""
判断指定索引是否存在
:param index_name: 索引名称
:return:
"""
return self.es.indices.exists(index=index_name)
def delete_by_query(self, index_name, query_body):
"""
按查询结果删除数据
:param index_name:
:param query_body:
:return:
"""
return self.es.delete_by_query(index_name, query_body)
def clear_index_data(self, index_name):
"""
清空指定索引的数据
:param index_name:
:return:
"""
return self.delete_by_query(
index_name=index_name,
query_body={
"query": {
"match_all": {}
}
}
)
def save_df_data(self, index_name, df):
"""
保存pandas的DataFrame到es的指定索引中
:param index_name: 索引名称
:param df: 要保存的dataframe
:return:
"""
col_lst = df.columns.tolist()
dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
self.save_data_list(index_name=index_name, data_list=dic_lst)
def create_index(self, index_name, mapping_properties):
"""
创建索引
:param index_name: 索引名称
:param mapping_properties: 索引mapping中的属性列表
:return:
"""
if not self.es.indices.exists(index=index_name):
mapping = {
"mappings": {
index_name: {
"properties": mapping_properties
}
}
}
res = self.es.indices.create(index=index_name, body=mapping)
if res is not None and 'acknowledged' in res:
return res.get('acknowledged')
return False
def _search_with_scroll(self, index_name, query_body):
if "size" not in query_body:
query_body["size"] = self.DEFAULT_BATCH_SIZE
response = self.es.search(
index=index_name,
body=query_body,
search_type="dfs_query_then_fetch",
scroll="120m",
timeout="60m"
)
scroll_id = response["_scroll_id"]
while True:
sources = [doc["_source"] for doc in response["hits"]["hits"]]
if len(sources) == 0:
break
yield sources
response = self.es.scroll(scroll_id=scroll_id, scroll="60m")
def query_for_df(self, index_name, query_body):
"""
执行查询并获取pandas.DataFrame格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:return:
"""
sources = []
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
sources.extend(sub_source)
return pd.DataFrame(sources)
def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
"""
按批次大小查询并返回pandas.DataFrame的generator格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:param batch_size: 批次大小
:return:
"""
if "size" not in query_body:
query_body["size"] = batch_size
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
yield pd.DataFrame(sub_source)
def get_first_row_with_df(self, index_name):
"""
获取指定索引的首行数据,格式为pandas.DataFrame
可用于获取索引的元信息
:param index_name: 索引名称
:return:
"""
query_body = {
"size": 1,
"query": {
"match_all": {}
}
}
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
return pd.DataFrame(sub_source)
使用案例
Oracle数据库
环境配置
第一步:安装库
pip install cx-Oracle
第二步:文件拷贝
需要将
oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下
sql基础
建表
查询
获取连接
读写数据库
相关操作
关于数据库的连接,查询和写入
Postgresql数据库
离线安装数据库
先从centos7-pg_14.2下载下载rpm包(微云下载centos7.6_PostgreSQL14.2),或者直接官方下载安装教程安装,如果离线安装就下载rpm包
出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法
创建数据库data和log文件夹
授权给安装数据时自动创建的postgres用户
切换到安装数据时自动创建的postgres用户
初始化数据库到新建数据目录
启动服务器(初始化数据库日志文件)
切换到管理员开启端口并重启防火墙
修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf
修改可访问的用户IP段
postgres用户重启数据库服务
数据库安装结束,管理员postgres,默认密码123456

使用navicat连接pg库后新建数据库
环境配置
pip install psycopg2
sql语法
数据库连接
数据库信息
数据备份与恢复
pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:
备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
备份文件可以跨平台传输,方便进行远程备份
备份文件可以进行压缩,减小文件大小,方便传输和存储
可以新建数据库,建几张表做测试
备份
使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可
备份整个数据库
备份指定表或数据
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
-U:连接数据库的用户名
-F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
-b:在备份文件中包含备份的数据库的模式信息
-v:备份过程中输出详细的信息
-f:备份文件的保存路径和文件名
-t:只备份指定的表和数据
还原
使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可
恢复整个库
恢复指定数据
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
-U:连接数据库的用户名
-d:恢复数据的目标数据库名称
-t:只恢复指定的表和数据
命令详解
对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下
表空间
新建表空间
pg库新建表空间
表空间有关的一些语法
锁表处理
pg锁表解锁
查看被锁的表
杀死被锁的pid
表结构修改
数据更新和查询
设置某字段的值
删除表中重复数据
不存在插入,存在更新
conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件
column_name必须是主键或者其他具有唯一性的字段(如唯一键或排他键)
数据和结构复制
分页查询
删除重复记录
第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本
关键
pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid
其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关
ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。
索引
什么情况下要避免使用索引?
虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引
使用索引时,需要考虑下列准则:
索引不应该使用在较小的表上
索引不应该使用在有频繁的大批量的更新或插入操作的表上
索引不应该使用在含有大量的 NULL 值的列上
索引不应该使用在频繁操作的列上
其他语法
筛选某列,逗号拼接
日期转换
转时间戳
postgresql 获取分组第一条数据 窗口函数
给数据分组并排名,使用
row_number() over (partition by 分组的字段名 order by 排序规则) as 排名从上述第一步中取出,排名为第一的数据,即为第一条数据
select * from 上述第一步 where 排名=1获取前N名的数据,将一中第二步的条件换成
where 排名 < N+1
distributed key
ORM框架
ORM框架比较
SQLObject
优点:
采用了易懂的ActiveRecord 模式
一个相对较小的代码库
缺点:
方法和类的命名遵循了Java 的小驼峰风格
不支持数据库session隔离工作单元
Storm
优点:
清爽轻量的API,短学习曲线和长期可维护性
不需要特殊的类构造函数,也没有必要的基类
缺点:
迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生
Storm的贡献者必须把他们的贡献的版权给Canonical公司
Django's ORM
优点:
易用,学习曲线短
和Django紧密集合,用Django时使用约定俗成的方法去操作数据库
缺点:
不好处理复杂的查询,强制开发者回到原生SQL
紧密和Django集成,使得在Django环境外很难使用
peewee
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
缺点:
不支持自动化 schema 迁移
多对多查询写起来不直观
SQLAlchemy
优点:
企业级API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点:
工作单元概念不常见
重量级API,导致长学习曲线
相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。
SQLAlchemy
SQLAlchemy 中的 Session、sessionmaker、scoped_session
Contextual/Thread-local Sessions
session和scoped_session
session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值
session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()
使用 create_engine创建我们需要的DB starting point
create_engine 函数常用参数:
pool_size=10 # 连接池的大小,0表示连接数无限制
pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout
创建Engine之后,接下来的问题,就是如何使用Engine
在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源
在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine
几种操作方式
Engine方式
Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;
Connection方式
Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;
Session方式
Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;
总结: 从应用角度来看,可以把这三类分为两种:
直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句
使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式
最后更新于
这有帮助吗?