python
  • Introduction
  • python多进程
  • python多线程
  • python协程
  • python异步编程
  • python装饰器
  • python魔法函数
  • python元编程
  • collections模块
  • logging模块
  • 数据库相关操作
  • RegularExpressionOP
  • 文件和目录访问
  • 枚举类Enum
  • python基础知识
  • pythonic的几个办法
  • pandas小记
  • numpy小记
  • python_pipe包管道包学习
  • Python增强提案PEP
  • python进阶问题
  • python性能优化模块
  • python网络编程
  • Anaconda开发环境
  • elementary_os记录
  • Jupyter使用教程
  • Kafka基本安装和使用
  • ML一元函数微积分
由 GitBook 提供支持
在本页
  • Elasticsearch数据库
  • 环境配置
  • EsDao包装类
  • 使用案例
  • Oracle数据库
  • 环境配置
  • sql基础
  • 相关操作
  • Postgresql数据库
  • 离线安装数据库
  • 环境配置
  • sql语法
  • ORM框架
  • ORM框架比较
  • SQLAlchemy

这有帮助吗?

数据库相关操作


Elasticsearch数据库

环境配置

安装环境

pip install elasticsearch==7.6.0

EsDao包装类

# -- coding: utf-8 --

"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""

from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd


class EsDao(object):
    """
    ElasticSearch的数据操作类
    """
    # 查询批次大小
    DEFAULT_BATCH_SIZE = 1000

    # 写入批次大小
    BULK_BATCH_SIZE = 10000

    def __init__(self, hosts, timeout=3600*24):
        self.hosts = hosts
        self.timeout = timeout
        self.es = Elasticsearch(hosts, timeout=self.timeout)

    def save_data_list(self, index_name, data_list):
        """
        保存数据列表到es的指定索引中
        :param index_name: 索引名称
        :param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
        :return:
        """
        bulk_data_lst = [
            data_list[i:i + self.BULK_BATCH_SIZE]
            for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
        ]

        if len(data_list) > 0 and '_id' in data_list[0]:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_id": data.pop("_id"),
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)
        else:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)

    def is_index_exists(self, index_name):
        """
        判断指定索引是否存在
        :param index_name: 索引名称
        :return:
        """
        return self.es.indices.exists(index=index_name)

    def delete_by_query(self, index_name, query_body):
        """
        按查询结果删除数据
        :param index_name:
        :param query_body:
        :return:
        """
        return self.es.delete_by_query(index_name, query_body)

    def clear_index_data(self, index_name):
        """
        清空指定索引的数据
        :param index_name:
        :return:
        """
        return self.delete_by_query(
            index_name=index_name,
            query_body={
                "query": {
                    "match_all": {}
                }
            }
        )

    def save_df_data(self, index_name, df):
        """
        保存pandas的DataFrame到es的指定索引中
        :param index_name: 索引名称
        :param df: 要保存的dataframe
        :return:
        """
        col_lst = df.columns.tolist()
        dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
        self.save_data_list(index_name=index_name, data_list=dic_lst)

    def create_index(self, index_name, mapping_properties):
        """
        创建索引
        :param index_name: 索引名称
        :param mapping_properties: 索引mapping中的属性列表
        :return:
        """
        if not self.es.indices.exists(index=index_name):
            mapping = {
                "mappings": {
                    index_name: {
                        "properties": mapping_properties
                    }
                }
            }
            res = self.es.indices.create(index=index_name, body=mapping)
            if res is not None and 'acknowledged' in res:
                return res.get('acknowledged')
        return False

    def _search_with_scroll(self, index_name, query_body):
        if "size" not in query_body:
            query_body["size"] = self.DEFAULT_BATCH_SIZE
        response = self.es.search(
            index=index_name,
            body=query_body,
            search_type="dfs_query_then_fetch",
            scroll="120m",
            timeout="60m"
        )
        scroll_id = response["_scroll_id"]
        while True:
            sources = [doc["_source"] for doc in response["hits"]["hits"]]
            if len(sources) == 0:
                break
            yield sources
            response = self.es.scroll(scroll_id=scroll_id, scroll="60m")

    def query_for_df(self, index_name, query_body):
        """
        执行查询并获取pandas.DataFrame格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :return:
        """
        sources = []
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            sources.extend(sub_source)
        return pd.DataFrame(sources)

    def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
        """
        按批次大小查询并返回pandas.DataFrame的generator格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :param batch_size: 批次大小
        :return:
        """
        if "size" not in query_body:
            query_body["size"] = batch_size
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            yield pd.DataFrame(sub_source)

    def get_first_row_with_df(self, index_name):
        """
        获取指定索引的首行数据,格式为pandas.DataFrame
        可用于获取索引的元信息
        :param index_name: 索引名称
        :return:
        """
        query_body = {
            "size": 1,
            "query": {
                "match_all": {}
            }
        }
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            return pd.DataFrame(sub_source)

使用案例

class TaskMeta:
    '''
    数据元类
    '''
    def __init__(self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive"):
        self.text = text
        self.doc_id = doc_id
        self.sentence_id = sentence_id
        self.reg_lst = reg_lst
        self.flag = flag
        self.has_reg = has_reg
        self.text_source = text_source

    def __repr__(self):
        return f'{self.text} {self.doc_id} {self.sentence_id} {self.reg_lst} {self.flag} {self.has_reg} {self.text_source}'

    def to_dict(self):
        return {"text": self.text,
                "doc_id": self.doc_id,
                "sentence_id": self.sentence_id,
                "reg_lst": self.reg_lst,
                "flag": self.flag,
                "has_reg": self.has_reg,
                "text_source": self.text_source}
def create_index(target_es_dao, index_name, mapping):
    '''
    创建es索引
    :return: 是否创建成功
    '''
    if not target_es_dao.is_index_exists(index_name):
        target_es_dao.create_index(index_name, mapping)
    else:
        target_es_dao.clear_index_data(index_name)
        print(f"索引{index_name}已存在, 已清除数据")

def writer_fun(target_es_dao, target_index, sample_lst):
    '''
    写数据到es库
    '''
    df_sample_lst = []
    [df_sample_lst.append(sample.to_dict()) for sample in sample_lst]
    df_sample_lst = pd.DataFrame(df_sample_lst)
    target_es_dao.save_df_data(target_index, df_sample_lst)
    print(f'写入数据{len(sample_lst)}条')

def es_cal_test():
    # 获取连接
    source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP}:{aug_config.SOURCE_PORT}/")
    query_condition = {
        "query_string": {
            "default_field": "has_reg",
            "query": "true"
        }
    }
    query_body = {
        "query": query_condition
    }
    # 查询数据
    datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body)
    records = datas.to_dict(orient='record')
    sample_lst = []
    for record in records:
        sample_lst.append(
            TaskMeta(
                text=record["text"],
                doc_id=record["doc_id"],
                sentence_id=record["sentence_id"],
                reg_lst=record["reg_lst"],
                flag=record["flag"],
                has_reg=record["has_reg"]
            )
        )

    # 创建索引
    create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)
    # 写入数据
    writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst)

if __name__ == '__main__':
    es_cal_test()

Oracle数据库

环境配置

第一步:安装库

pip install cx-Oracle

第二步:文件拷贝

需要将oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下

sql基础

建表

--blob字段插入实例
create table blob_table_tmp(
  id number primary key,
  blob_cl blob not null,
	clob_cl clob not null
);
insert into blob_table_tmp values(1,rawtohex('11111000011111'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(3,rawtohex('4561888'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(4,rawtohex('增加一条记录时333'),'增加一条记录时,碰到插入blob类型数据出错');

查询

获取连接

FINANCE_DB_HOST = "192.168.x.x"
FINANCE_DB_PORT = 1521
FINANCE_DB_USER = "hyc"
FINANCE_DB_PASSWORD = "123456"
FINANCE_DB_DB = "ORCL"

class OracleConn():
    config_path = ''
    @staticmethod
    def get_conn(conn_name, encoding="UTF-8"):
        conn_str = str(eval("%s_DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str(eval("%s.%s_DB_PASSWORD" % (OracleConn.config_path, conn_name)))
        conn_str += "@" + str(eval("%s_DB_HOST" % (OracleConn.config_path, conn_name)))
        conn_str += ":" + str(eval("%s_DB_PORT" % (OracleConn.config_path, conn_name)))
        conn_str += "/" + str(eval("%s_DB_DB" % (OracleConn.config_path, conn_name)))
        return ora.connect(conn_str, encoding=encoding, nencoding=encoding)

读写数据库

def oracle_test():
    # 获取数据库连接
    conn = OracleConn.get_conn("FINANCE")
    cur = conn.cursor()

    # 查询数据
    sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"
    datas = []
    r = cur.execute(sql)
    # 假设name是clob字段类型
    [datas.append((gg[0], gg[1].read().decode('utf-8'), gg[2].read())) for gg in r]

    # 写入数据
    insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
    res = []
    [res.append((data[0], data[1])) for data in datas]
    cur.executemany(insert_sql, res)
    cur.execute('commit')

    cur.close()
    conn.close()
    print("写入结束")


if __name__ == '__main__':
    oracle_test()

相关操作

关于数据库的连接,查询和写入

import cx_Oracle

class Setting:
    DB_USER = 'narutohyc'
    DB_PASSWORD = 'hyc'
    DB_IP = '192.168.0.1'
    DB_PORT = ''
    DB_SERVICE = 'dataBaseName'
setting = Setting()

def oracle_test():
    # 获取数据库连接
    conn = cx_Oracle.connect('%s/%s@%s/%s' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8')
    cur = conn.cursor()

    # 查询数据
    sql = "select ID, name from hyc_database"
    datas = []
    r = cur.execute(sql)
    # 假设name是clob字段类型
    [datas.append((gg[0], gg[1].read())) for gg in r]

    # 写入数据
    insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
    res = []
    [res.append((data[0], data[1])) for data in datas]
    cur.executemany(insert_sql, res)
    cur.execute('commit')

    cur.close()
    conn.close()
    print("写入结束")

if __name__ == '__main__':
    oracle_test()

Postgresql数据库

离线安装数据库

# 离线安装执行以下命令安装
rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm

出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法

yum install python3-devel

创建数据库data和log文件夹

# 创建数据库data和log文件夹
mkdir -p /home/postgres/pgsql_data
mkdir -p /home/postgres/pgsql_log

# 创建日志文件
touch /home/postgres/pgsql_log/pgsql.log

授权给安装数据时自动创建的postgres用户

chown -R postgres:postgres /home/postgres/pgsql_data
chown -R postgres:postgres /home/postgres/pgsql_log

切换到安装数据时自动创建的postgres用户

su - postgres

初始化数据库到新建数据目录

/usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data

启动服务器(初始化数据库日志文件)

/usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start
# 查看状态
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status

切换到管理员开启端口并重启防火墙

su root
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload

修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf

# 修改监听地址
listen_addresses = '*'
# 修改最大连接数(按需)
max_connections = 1000
# 修改密码认证
password_encryption = md5

修改可访问的用户IP段

vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存)
IPV4下修改为或新增
host    all             all             0.0.0.0/0               trust

postgres用户重启数据库服务

su - postgres
/usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart

数据库安装结束,管理员postgres,默认密码123456

使用navicat连接pg库后新建数据库

环境配置

pip install psycopg2

sql语法

数据库连接

-- 获取数据库实例连接数
select count(*) from pg_stat_activity;
-- 获取数据库最大连接数
show max_connections;
-- 查询当前连接数详细信息
select * from pg_stat_activity;
-- 查询数据库中各个用户名对应的数据库连接数
select usename, count(*) from pg_stat_activity group by usename;

数据库信息

-- 查询数据库大小
select pg_size_pretty (pg_database_size('pg_fac_stk'));

-- 查询各表磁盘占用
SELECT
    table_schema || '.' || table_name AS table_full_name,
    pg_size_pretty(pg_total_relation_size('"' || table_schema || '"."' || table_name || '"')) AS size
FROM information_schema.tables where table_name like 'finance_%'
ORDER BY
    pg_total_relation_size('"' || table_schema || '"."' || table_name || '"') DESC;
    
-- 获取各个表中的数据记录数
select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc;

-- 查看数据库表对应的数据文件
select pg_relation_filepath('product');

-- 查看数据库实例的版本
select version();

-- 分析评估SQL执行情况
EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo;

-- 获取数据库当前的回滚事务数以及死锁数
select datname,xact_rollback,deadlocks from pg_stat_database;

数据备份与恢复

pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:

  1. 备份数据可以保持原有的结构和特性,还原时可以保证数据准确性

  2. 备份文件可以跨平台传输,方便进行远程备份

  3. 备份文件可以进行压缩,减小文件大小,方便传输和存储

可以新建数据库,建几张表做测试

# 学生表
CREATE TABLE students (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    gender VARCHAR(10) NOT NULL,
    age INTEGER NOT NULL,
    class VARCHAR(20) NOT NULL
);
# 学科表
CREATE TABLE subjects (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);
# 成绩表
CREATE TABLE scores (
    id SERIAL PRIMARY KEY,
    student_id INTEGER NOT NULL,
    subject_id INTEGER NOT NULL,
    score INTEGER NOT NULL,
    FOREIGN KEY (student_id) REFERENCES students (id),
    FOREIGN KEY (subject_id) REFERENCES subjects (id)
);

# 插入一些测试数据
INSERT INTO students (name, gender, age, class)
VALUES
    ('Alice', 'Female', 18, 'Class A'),
    ('Bob', 'Male', 17, 'Class B'),
    ('Charlie', 'Male', 19, 'Class A'),
    ('Diana', 'Female', 18, 'Class B');

# 插入学科表数据
INSERT INTO subjects (name)
VALUES
    ('Mathematics'),
    ('English'),
    ('Science');

-- Alice 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (1, 1, 90),
    (1, 2, 85),
    (1, 3, 92);
-- Bob 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (2, 1, 78),
    (2, 2, 80),
    (2, 3, 75);
-- Charlie 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (3, 1, 88),
    (3, 2, 92),
    (3, 3, 90);
-- Diana 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (4, 1, 95),
    (4, 2, 88),
    (4, 3, 92);

备份

使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可

  • 备份整个数据库

    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称>
    
    # 示例
    /usr/pgsql-14/bin/pg_dump -h 127.0.0.1 -U postgres -p 5432 -F t -b -v -f build_hyc_test.sql.tar hyc_test
  • 备份指定表或数据

    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1> -t <表名2> -f <备份文件路径> <数据库名称>
    
    # 示例
    -- 备份指定表到sql文件
    -- '-c --if-exists' 会生成 'drop table if exist' 命令
    -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息
    pg_dump --verbose --host=192.168.xx.xx --port=5432 --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8 -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if-exists --no-owner pg_fac_stk

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址

  • -p:数据库服务的监听端口,一般为默认端口5432

  • -U:连接数据库的用户名

  • -F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t

  • -b:在备份文件中包含备份的数据库的模式信息

  • -v:备份过程中输出详细的信息

  • -f:备份文件的保存路径和文件名

  • -t:只备份指定的表和数据

-- 备份postgres库并tar打包
pg_dump -h 127.0.0.1 -p 5432 -U postgres -f postgres.sql.tar -Ft;

-- 备份postgres库,转储数据为带列名的INSERT命令
pg_dumpall -d postgres -U postgres -f postgres.sql --column-inserts;

还原

使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可

  • 恢复整个库

    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径>
    
    # 示例
    /usr/pgsql-14/bin/pg_restore -h 127.0.0.1 -U postgres -p 5432 -d hyc_test_bak build_hyc_test.sql.tar
  • 恢复指定数据

    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1> -t <表名2> -d <数据库名称> <备份文件路径>
    
    # 示例
    -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复
    -- 还原指定sql文件到bak_test库(需要自己建库)
    psql --host=192.168.xx.xx --port=5432 --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址

  • -p:数据库服务的监听端口,一般为默认端口5432

  • -U:连接数据库的用户名

  • -d:恢复数据的目标数据库名称

  • -t:只恢复指定的表和数据

命令详解

[postgres@pg01 ~]$ pg_dump --help
用法:
  pg_dump [选项]... [数据库名字]
**一般选项**:
  -f, --file=FILENAME          输出文件或目录名
  -F, --format=c|d|t|p         输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))
  -j, --jobs=NUM               执行多个并行任务进行备份转储工作
  -v, --verbose                详细模式
  -V, --version                输出版本信息,然后退出
  -Z, --compress=0-9           被压缩格式的压缩级别,0表示不压缩
  --lock-wait-timeout=TIMEOUT  在等待表锁超时后操作失败
  --no-sync                    不用等待变化安全写入磁盘
  -?, --help                   显示此帮助, 然后退出
**控制输出内容选项(常用)**:
  -a, --data-only              只转储数据,不包括模式,只对纯文本输出有意义
  -s, --schema-only            只转储模式, 不包括数据
  -c, --clean                  在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义
  -C, --create                 指定输出文件中是否生成create database语句,只对纯文本输出有意义
  -n, --schema=PATTERN         指定要导出的schema,不指定则导出所有的非系统schema
  -N, --exclude-schema=PATTERN 排除导出哪些schema
  -O, --no-owner               在明文格式中, 忽略恢复对象所属者
  -t, --table=PATTERN          指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了
  -T, --exclude-table=PATTERN  排除表
  -x, --no-privileges          不要转储权限 (grant/revoke)
  --disable-triggers           在只恢复数据的过程中禁用触发器
  --exclude-table-data=PATTERN do NOT dump data for the specified table(s)
  --if-exists                  当删除对象时使用IF EXISTS
  --inserts                    以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢
                               该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据
  --column-inserts             以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...)
  --load-via-partition-root    通过根表加载分区
  --no-comments                不转储注释
  --no-tablespaces             不转储表空间分配信息
  --no-unlogged-table-data     不转储没有日志的表数据
  --on-conflict-do-nothing     将ON CONFLICT DO NOTHING添加到INSERT命令
**控制输出内容选项(不常用)**:
  -S, --superuser=NAME         指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。
  -b, --blobs                  在转储中包括大对象
  -B, --no-blobs               排除转储中的大型对象
  -E, --encoding=ENCODING      转储以ENCODING形式编码的数据
  --binary-upgrade             只能由升级工具使用
  --enable-row-security        启用行安全性(只转储用户能够访问的内容)
  --extra-float-digits=NUM     覆盖extra_float_digits的默认设置
  --disable-dollar-quoting     取消美元 (符号) 引号, 使用 SQL 标准引号
  --no-publications            不转储发布
  --no-security-labels         不转储安全标签的分配
  --no-subscriptions           不转储订阅
  --no-synchronized-snapshots  在并行工作集中不使用同步快照
  --quote-all-identifiers      所有标识符加引号,即使不是关键字
  --rows-per-insert=NROWS      每个插入的行数;意味着--inserts
  --section=SECTION            备份命名的节 (数据前, 数据, 及 数据后)
  --serializable-deferrable    等到备份可以无异常运行
  --snapshot=SNAPSHOT          为转储使用给定的快照
  --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体
  --use-set-session-authorization
                               使用 SESSION AUTHORIZATION 命令代替
                               ALTER OWNER 命令来设置所有权
**联接选项**:
  -d, --dbname=DBNAME      对数据库 DBNAME备份
  -h, --host=主机名        数据库服务器的主机名或套接字目录
  -p, --port=端口号        数据库服务器的端口号
  -U, --username=名字      以指定的数据库用户联接
  -w, --no-password        永远不提示输入口令
  -W, --password           强制口令提示 (自动)
  --role=ROLENAME          在转储前运行SET ROLE

对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下

[postgres@pg01 pg_backup]$ pg_restore --help
pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库.
用法:
  pg_restore [选项]... [文件名]
一般选项:
  -d, --dbname=名字        连接数据库名字
  -f, --file=文件名       输出文件名(- 对于stdout)
  -F, --format=c|d|t       备份文件格式(应该自动进行)
  -l, --list               打印归档文件的 TOC 概述
  -v, --verbose            详细模式
  -V, --version            输出版本信息, 然后退出
  -?, --help               显示此帮助, 然后退出
恢复控制选项:
  -a, --data-only              只恢复数据, 不包括模式
  -c, --clean                  在重新创建之前,先清除(删除)数据库对象
  -C, --create                 创建目标数据库
  -e, --exit-on-error          发生错误退出, 默认为继续
  -I, --index=NAME             恢复指定名称的索引
  -j, --jobs=NUM               执行多个并行任务进行恢复工作
  -L, --use-list=FILENAME      从这个文件中使用指定的内容表排序
                               输出
  -n, --schema=NAME            在这个模式中只恢复对象
  -N, --exclude-schema=NAME    不恢复此模式中的对象
  -O, --no-owner               不恢复对象所属者
  -P, --function=NAME(args)    恢复指定名字的函数
  -s, --schema-only            只恢复模式, 不包括数据
  -S, --superuser=NAME         使用指定的超级用户来禁用触发器
  -t, --table=NAME             恢复命名关系(表、视图等)
  -T, --trigger=NAME           恢复指定名字的触发器
  -x, --no-privileges          跳过处理权限的恢复 (grant/revoke)
  -1, --single-transaction     作为单个事务恢复
  --disable-triggers           在只恢复数据的过程中禁用触发器
  --enable-row-security        启用行安全性
  --if-exists                  当删除对象时使用IF EXISTS
  --no-comments                不恢复注释
  --no-data-for-failed-tables  对那些无法创建的表不进行
                               数据恢复
  --no-publications            不恢复发行
  --no-security-labels         不恢复安全标签信息
  --no-subscriptions           不恢复订阅
  --no-tablespaces             不恢复表空间的分配信息
  --section=SECTION            恢复命名节 (数据前、数据及数据后)
  --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体
  --use-set-session-authorization
                               使用 SESSION AUTHORIZATION 命令代替
                               ALTER OWNER 命令来设置所有权
联接选项:
  -h, --host=主机名        数据库服务器的主机名或套接字目录
  -p, --port=端口号        数据库服务器的端口号
  -U, --username=名字      以指定的数据库用户联接
  -w, --no-password        永远不提示输入口令
  -W, --password           强制口令提示 (自动)
  --role=ROLENAME          在恢复前执行SET ROLE操作
选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定
多次用于选择多个对象.
如果没有提供输入文件名, 则使用标准输入.

表空间

新建表空间

# 新建表空间目录 t_fac_ts
mkdir /home/huangyc/t_fac_ts
# 修改表空间的用户权限
chown postgres /home/huangyc/t_fac_ts

pg库新建表空间

create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts';

表空间有关的一些语法

# 删除表空间 (需要先drop表空间所有的表, 或者将该空间下所有的表移除才能drop表空间)
DROP TABLESPACE t_fac_ts;
# 修改具体的表到指定表空间下
ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;
# 修改指定库到指定表空间下
ALTER DATABASE name SET TABLESPACE new_tablespace;

锁表处理

pg锁表解锁

  1. 查看被锁的表

    select a.locktype,a.database,a.pid,a.mode,a.relation,b.relname
    from pg_locks a
    join pg_class b on a.relation = b.oid where relname='t_opt_strhdk_blsj';
  2. 杀死被锁的pid

    select pg_terminate_backend(pid);

表结构修改

-- 修改表名
alter table "user" rename to "ts_user";
-- 添加新字段
alter table table_name add column col_name varchar(50);
-- 丢弃某列
alter table table_name drop column col_name;
-- 添加主键
alter table table_name add primary key("col_name");
-- 修改字段名
alter table table_name rename column old_col_name to new_col_name;

数据更新和查询

设置某字段的值

-- 设置某字段的值
update table_name set col_name=new_value;

删除表中重复数据

-- 查询[旧表]数据的重复情况
select col1,col2,count(*) from old_table group by col1,col2;

-- 所有字段都一样的情况
create table bak_table as select distinct * from table_name;

-- 查询[新表]数据的重复情况
select col1,col2,count(*) from bak_table group by col1,col2;
truncate table old_table;
insert into old_table (col1,col2) select col1,col2 from bak_table;

不存在插入,存在更新

insert into ... on conflict(column_name) do update set ...

conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件

insert into user(id,username,address,create_date,create_by) 
values('1','刘德华','香港',now(),'system') 
on conflict(id) 
do update set address='中国',update_date=now(),update_by='system';
# 批量的方式
insert into testunnest(id, age, name) values (unnest(array[1,3]), unnest(array[18,10]), unnest(array['valupdated', 'val3'])) on conflict (id) do update set age = excluded.age, name = excluded.name;

数据和结构复制

-- [复制表和数据] 复制表结构和数据 自动建表,不会复制主键什么的
create table new_table as select * from old_table;
-- [复制数据] 复制数据到 新表 表需要提前建,并且表字段要一致,不会复制主键什么的
insert into new_table (col_0, col_1) select col_0, col_1 from old_table;

分页查询

select * from table_name limit 10000 offset 20000;

删除重复记录

-- 初始化数据
create table hyc_tmp_del_test(id int, name varchar(255));
create table hyc_tmp_del_test_bk (like hyc_tmp_del_test);
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;

-- 最容易想到的方法就是判断数据是否重复,对于重复的数据只保留ctid最小(或最大)的数据,删除其他的
-- id相同的数据,保留ctid最小的,其他的删除
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> (select min(t.ctid) from hyc_tmp_del_test_bk t where a.id=t.id); -- 17.112s

-- group by方法通过分组找到ctid最小的数据,然后删除其他数据
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in (select min(ctid) from hyc_tmp_del_test_bk group by id); -- 0.052s

-- 高效删除方法
explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any(array (select ctid from (select row_number() over (partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1)); -- 0.055s

第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本

关键

pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid

其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关

ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。

索引

-- 获取数据库表中的索引
select * from pg_indexes where tablename = 't_cfg_opinfo'; 
-- 创建索引
create index index_name on table_name (col_0, col_1);
-- 查询索引
select * from pg_indexes where tablename='table_name';
-- 删除索引
drop index index_name;

什么情况下要避免使用索引?

虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引

使用索引时,需要考虑下列准则:

  • 索引不应该使用在较小的表上

  • 索引不应该使用在有频繁的大批量的更新或插入操作的表上

  • 索引不应该使用在含有大量的 NULL 值的列上

  • 索引不应该使用在频繁操作的列上

其他语法

筛选某列,逗号拼接

select string_agg(bs_org_id,',') as bs_org_ids 
  from bs_org 
  where par_org_id ='100'

日期转换

select to_char(col_name,'yyyyMMDD')-interval '2 day' from table_name
-- -interval '2 day' 表示往前2天

转时间戳

select '2011-01-06 09:57:59'::timestamp;
TO_TIMESTAMP('2011-01-06 09:57:59', 'YYYY-MM-DD HH24:MI:S')

postgresql 获取分组第一条数据 窗口函数

  1. 给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名

  2. 从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1

  3. 获取前N名的数据,将一中第二步的条件换成where 排名 < N+1

distributed key

alter table table_name set distributed by (id);
alter table table_name add primary key (id);

ORM框架

ORM框架比较

SQLObject

  • 优点:

    采用了易懂的ActiveRecord 模式

    一个相对较小的代码库

  • 缺点:

    方法和类的命名遵循了Java 的小驼峰风格

    不支持数据库session隔离工作单元

Storm

  • 优点:

    清爽轻量的API,短学习曲线和长期可维护性

    不需要特殊的类构造函数,也没有必要的基类

  • 缺点:

    迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生

    Storm的贡献者必须把他们的贡献的版权给Canonical公司

Django's ORM

  • 优点:

    易用,学习曲线短

    和Django紧密集合,用Django时使用约定俗成的方法去操作数据库

  • 缺点:

    不好处理复杂的查询,强制开发者回到原生SQL

    紧密和Django集成,使得在Django环境外很难使用

peewee

  • 优点:

    Django式的API,使其易用

    轻量实现,很容易和任意web框架集成

  • 缺点:

    不支持自动化 schema 迁移

    多对多查询写起来不直观

SQLAlchemy

  • 优点:

    企业级API,使得代码有健壮性和适应性

    灵活的设计,使得能轻松写复杂查询

  • 缺点:

    工作单元概念不常见

    重量级API,导致长学习曲线

相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。

SQLAlchemy

session和scoped_session

session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值

s1 = Session()
s2 = Session()
s1.add(person)
s1.commit()
# 必须先close,s2才能继续操作person
s1.close()
s2.add(person)

session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()

使用 create_engine创建我们需要的DB starting point

from sqlalchemy import create_engine

scheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'
engine = create_engine(scheme, pool_size=10 , max_overflow=-1, pool_recycle=1200)

create_engine 函数常用参数:

  • pool_size=10 # 连接池的大小,0表示连接数无限制

  • pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away

  • max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接

  • pool_timeout=30 # 在连接池获取一个空闲连接等待的时间

  • echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout

创建Engine之后,接下来的问题,就是如何使用Engine

在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源

在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine

几种操作方式

Engine方式

Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;

result = engine.execute('SELECT * FROM tablename;') 
conn = engine.connect(close_with_result=True)
result = conn.execute('SELECT * FROM tablename;')
for row in result:
    print(result['columnname']
result.close()

Connection方式

Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;

connection = engine.connect()
trans = connection.begin()
try:
    connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');")
    connection.execute("INSERT INTO datalog VALUES ('added a comedy');")
    trans.commit()
except:
    trans.rollback()
    raise

Session方式

Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;

总结: 从应用角度来看,可以把这三类分为两种:

  1. 直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句

  2. 使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式

上一页logging模块下一页RegularExpressionOP

最后更新于1年前

这有帮助吗?

先从下载rpm包(),或者直接安装,如果离线安装就下载rpm包

img

column_name必须是或者其他具有唯一性的字段(如唯一键或排他键)

Python操作Oracle数据库:cx_Oracle
我终于学会了使用python操作postgresql
保姆级 CentOS 7离线安装PostgreSQL 14教程
易百_PostgreSQL教程
centos7-pg_14.2下载
微云下载centos7.6_PostgreSQL14.2
官方下载安装教程
使用pgdump备份数据库
主键
postgresql 常用的删除重复数据方法
一文了解 Python 的三种数据源架构模式
SQLAlchemy 和其他的 ORM 框架
SQLAlchemy 1.4 Documentation
sqlalchemy操作数据库
sqlalchemy外键和relationship查询
SQLALlchemy数据查询小集合
SQLAlchemy 的连接池机制
SQLAlchemy 中的 Session、sessionmaker、scoped_session
Contextual/Thread-local Sessions
SQLAlchemy(常用的SQLAlchemy列选项)
查询官网例子Object Relational Tutorial (1.x API)
sqlalchemy外键和relationship查询
Engine Configuration
Working with Engines and Connections
SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?
Page cover image