数据库相关操作


Elasticsearch数据库

环境配置

安装环境

pip install elasticsearch==7.6.0

EsDao包装类

# -- coding: utf-8 --

"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""

from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd


class EsDao(object):
    """
    ElasticSearch的数据操作类
    """
    # 查询批次大小
    DEFAULT_BATCH_SIZE = 1000

    # 写入批次大小
    BULK_BATCH_SIZE = 10000

    def __init__(self, hosts, timeout=3600*24):
        self.hosts = hosts
        self.timeout = timeout
        self.es = Elasticsearch(hosts, timeout=self.timeout)

    def save_data_list(self, index_name, data_list):
        """
        保存数据列表到es的指定索引中
        :param index_name: 索引名称
        :param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
        :return:
        """
        bulk_data_lst = [
            data_list[i:i + self.BULK_BATCH_SIZE]
            for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
        ]

        if len(data_list) > 0 and '_id' in data_list[0]:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_id": data.pop("_id"),
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)
        else:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)

    def is_index_exists(self, index_name):
        """
        判断指定索引是否存在
        :param index_name: 索引名称
        :return:
        """
        return self.es.indices.exists(index=index_name)

    def delete_by_query(self, index_name, query_body):
        """
        按查询结果删除数据
        :param index_name:
        :param query_body:
        :return:
        """
        return self.es.delete_by_query(index_name, query_body)

    def clear_index_data(self, index_name):
        """
        清空指定索引的数据
        :param index_name:
        :return:
        """
        return self.delete_by_query(
            index_name=index_name,
            query_body={
                "query": {
                    "match_all": {}
                }
            }
        )

    def save_df_data(self, index_name, df):
        """
        保存pandas的DataFrame到es的指定索引中
        :param index_name: 索引名称
        :param df: 要保存的dataframe
        :return:
        """
        col_lst = df.columns.tolist()
        dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
        self.save_data_list(index_name=index_name, data_list=dic_lst)

    def create_index(self, index_name, mapping_properties):
        """
        创建索引
        :param index_name: 索引名称
        :param mapping_properties: 索引mapping中的属性列表
        :return:
        """
        if not self.es.indices.exists(index=index_name):
            mapping = {
                "mappings": {
                    index_name: {
                        "properties": mapping_properties
                    }
                }
            }
            res = self.es.indices.create(index=index_name, body=mapping)
            if res is not None and 'acknowledged' in res:
                return res.get('acknowledged')
        return False

    def _search_with_scroll(self, index_name, query_body):
        if "size" not in query_body:
            query_body["size"] = self.DEFAULT_BATCH_SIZE
        response = self.es.search(
            index=index_name,
            body=query_body,
            search_type="dfs_query_then_fetch",
            scroll="120m",
            timeout="60m"
        )
        scroll_id = response["_scroll_id"]
        while True:
            sources = [doc["_source"] for doc in response["hits"]["hits"]]
            if len(sources) == 0:
                break
            yield sources
            response = self.es.scroll(scroll_id=scroll_id, scroll="60m")

    def query_for_df(self, index_name, query_body):
        """
        执行查询并获取pandas.DataFrame格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :return:
        """
        sources = []
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            sources.extend(sub_source)
        return pd.DataFrame(sources)

    def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
        """
        按批次大小查询并返回pandas.DataFrame的generator格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :param batch_size: 批次大小
        :return:
        """
        if "size" not in query_body:
            query_body["size"] = batch_size
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            yield pd.DataFrame(sub_source)

    def get_first_row_with_df(self, index_name):
        """
        获取指定索引的首行数据,格式为pandas.DataFrame
        可用于获取索引的元信息
        :param index_name: 索引名称
        :return:
        """
        query_body = {
            "size": 1,
            "query": {
                "match_all": {}
            }
        }
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            return pd.DataFrame(sub_source)

使用案例

Oracle数据库

Python操作Oracle数据库:cx_Oracle

环境配置

第一步:安装库

pip install cx-Oracle

第二步:文件拷贝

需要将oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下

sql基础

建表

查询

获取连接

读写数据库

相关操作

关于数据库的连接,查询和写入

Postgresql数据库

我终于学会了使用python操作postgresql

保姆级 CentOS 7离线安装PostgreSQL 14教程

易百_PostgreSQL教程

离线安装数据库

先从centos7-pg_14.2下载下载rpm包(微云下载centos7.6_PostgreSQL14.2),或者直接官方下载安装教程安装,如果离线安装就下载rpm包

出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法

创建数据库data和log文件夹

授权给安装数据时自动创建的postgres用户

切换到安装数据时自动创建的postgres用户

初始化数据库到新建数据目录

启动服务器(初始化数据库日志文件)

切换到管理员开启端口并重启防火墙

修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf

修改可访问的用户IP段

postgres用户重启数据库服务

数据库安装结束,管理员postgres,默认密码123456

img

使用navicat连接pg库后新建数据库

环境配置

pip install psycopg2

sql语法

数据库连接

数据库信息

数据备份与恢复

使用pgdump备份数据库

pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:

  1. 备份数据可以保持原有的结构和特性,还原时可以保证数据准确性

  2. 备份文件可以跨平台传输,方便进行远程备份

  3. 备份文件可以进行压缩,减小文件大小,方便传输和存储

可以新建数据库,建几张表做测试

备份

使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可

  • 备份整个数据库

  • 备份指定表或数据

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址

  • -p:数据库服务的监听端口,一般为默认端口5432

  • -U:连接数据库的用户名

  • -F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t

  • -b:在备份文件中包含备份的数据库的模式信息

  • -v:备份过程中输出详细的信息

  • -f:备份文件的保存路径和文件名

  • -t:只备份指定的表和数据

还原

使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可

  • 恢复整个库

  • 恢复指定数据

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址

  • -p:数据库服务的监听端口,一般为默认端口5432

  • -U:连接数据库的用户名

  • -d:恢复数据的目标数据库名称

  • -t:只恢复指定的表和数据

命令详解

对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下

表空间

新建表空间

pg库新建表空间

表空间有关的一些语法

锁表处理

pg锁表解锁

  1. 查看被锁的表

  2. 杀死被锁的pid

表结构修改

数据更新和查询

设置某字段的值

删除表中重复数据

不存在插入,存在更新

conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件

column_name必须是主键或者其他具有唯一性的字段(如唯一键或排他键)

数据和结构复制

分页查询

删除重复记录

postgresql 常用的删除重复数据方法

第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本

关键

pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid

其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关

ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。

索引

什么情况下要避免使用索引?

虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引

使用索引时,需要考虑下列准则

  • 索引不应该使用在较小的表上

  • 索引不应该使用在有频繁的大批量的更新或插入操作的表上

  • 索引不应该使用在含有大量的 NULL 值的列上

  • 索引不应该使用在频繁操作的列上

其他语法

筛选某列,逗号拼接

日期转换

转时间戳

postgresql 获取分组第一条数据 窗口函数

  1. 给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名

  2. 从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1

  3. 获取前N名的数据,将一中第二步的条件换成where 排名 < N+1

distributed key

ORM框架

ORM框架比较

一文了解 Python 的三种数据源架构模式

SQLAlchemy 和其他的 ORM 框架

SQLObject

  • 优点:

    采用了易懂的ActiveRecord 模式

    一个相对较小的代码库

  • 缺点:

    方法和类的命名遵循了Java 的小驼峰风格

    不支持数据库session隔离工作单元

Storm

  • 优点:

    清爽轻量的API,短学习曲线和长期可维护性

    不需要特殊的类构造函数,也没有必要的基类

  • 缺点:

    迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生

    Storm的贡献者必须把他们的贡献的版权给Canonical公司

Django's ORM

  • 优点:

    易用,学习曲线短

    和Django紧密集合,用Django时使用约定俗成的方法去操作数据库

  • 缺点:

    不好处理复杂的查询,强制开发者回到原生SQL

    紧密和Django集成,使得在Django环境外很难使用

peewee

  • 优点:

    Django式的API,使其易用

    轻量实现,很容易和任意web框架集成

  • 缺点:

    不支持自动化 schema 迁移

    多对多查询写起来不直观

SQLAlchemy

  • 优点:

    企业级API,使得代码有健壮性和适应性

    灵活的设计,使得能轻松写复杂查询

  • 缺点:

    工作单元概念不常见

    重量级API,导致长学习曲线

相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。

SQLAlchemy

SQLAlchemy 1.4 Documentation

sqlalchemy操作数据库

sqlalchemy外键和relationship查询

SQLALlchemy数据查询小集合

SQLAlchemy 的连接池机制

SQLAlchemy 中的 Session、sessionmaker、scoped_session

Contextual/Thread-local Sessions

SQLAlchemy(常用的SQLAlchemy列选项)

查询官网例子Object Relational Tutorial (1.x API)

sqlalchemy外键和relationship查询

session和scoped_session

session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值

session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()

Engine Configuration

使用 create_engine创建我们需要的DB starting point

create_engine 函数常用参数:

  • pool_size=10 # 连接池的大小,0表示连接数无限制

  • pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away

  • max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接

  • pool_timeout=30 # 在连接池获取一个空闲连接等待的时间

  • echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout

创建Engine之后,接下来的问题,就是如何使用Engine

在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源

在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine

几种操作方式

Working with Engines and Connections

SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?

Engine方式

Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;

Connection方式

Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;

Session方式

Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;

总结: 从应用角度来看,可以把这三类分为两种:

  1. 直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句

  2. 使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式

最后更新于

这有帮助吗?