Elasticsearch数据库
环境配置
安装环境
pip install elasticsearch==7.6.0
EsDao包装类
复制 # -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""
from elasticsearch . helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd
class EsDao ( object ):
"""
ElasticSearch的数据操作类
"""
# 查询批次大小
DEFAULT_BATCH_SIZE = 1000
# 写入批次大小
BULK_BATCH_SIZE = 10000
def __init__ ( self , hosts , timeout = 3600 * 24 ):
self . hosts = hosts
self . timeout = timeout
self . es = Elasticsearch (hosts, timeout = self.timeout)
def save_data_list ( self , index_name , data_list ):
"""
保存数据列表到es的指定索引中
:param index_name: 索引名称
:param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
:return:
"""
bulk_data_lst = [
data_list [ i : i + self . BULK_BATCH_SIZE ]
for i in range ( 0 , len (data_list), self.BULK_BATCH_SIZE)
]
if len (data_list) > 0 and '_id' in data_list [ 0 ]:
for bulk_data in bulk_data_lst :
actions = [ {
"_index" : index_name ,
"_type" : index_name ,
"_id" : data . pop ( "_id" ),
"_source" : data
}
for data in bulk_data
]
bulk (self.es, actions, index = index_name, raise_on_error = True )
else :
for bulk_data in bulk_data_lst :
actions = [ {
"_index" : index_name ,
"_type" : index_name ,
"_source" : data
}
for data in bulk_data
]
bulk (self.es, actions, index = index_name, raise_on_error = True )
def is_index_exists ( self , index_name ):
"""
判断指定索引是否存在
:param index_name: 索引名称
:return:
"""
return self . es . indices . exists (index = index_name)
def delete_by_query ( self , index_name , query_body ):
"""
按查询结果删除数据
:param index_name:
:param query_body:
:return:
"""
return self . es . delete_by_query (index_name, query_body)
def clear_index_data ( self , index_name ):
"""
清空指定索引的数据
:param index_name:
:return:
"""
return self . delete_by_query (
index_name = index_name,
query_body = {
"query" : {
"match_all" : {}
}
}
)
def save_df_data ( self , index_name , df ):
"""
保存pandas的DataFrame到es的指定索引中
:param index_name: 索引名称
:param df: 要保存的dataframe
:return:
"""
col_lst = df . columns . tolist ()
dic_lst = [ dict ([(c, v) for c, v in zip (col_lst, r)]) for r in df . values . tolist () ]
self . save_data_list (index_name = index_name, data_list = dic_lst)
def create_index ( self , index_name , mapping_properties ):
"""
创建索引
:param index_name: 索引名称
:param mapping_properties: 索引mapping中的属性列表
:return:
"""
if not self . es . indices . exists (index = index_name):
mapping = {
"mappings" : {
index_name : {
"properties" : mapping_properties
}
}
}
res = self . es . indices . create (index = index_name, body = mapping)
if res is not None and 'acknowledged' in res :
return res . get ( 'acknowledged' )
return False
def _search_with_scroll ( self , index_name , query_body ):
if "size" not in query_body :
query_body [ "size" ] = self . DEFAULT_BATCH_SIZE
response = self . es . search (
index = index_name,
body = query_body,
search_type = "dfs_query_then_fetch" ,
scroll = "120m" ,
timeout = "60m"
)
scroll_id = response [ "_scroll_id" ]
while True :
sources = [doc [ "_source" ] for doc in response [ "hits" ] [ "hits" ]]
if len (sources) == 0 :
break
yield sources
response = self . es . scroll (scroll_id = scroll_id, scroll = "60m" )
def query_for_df ( self , index_name , query_body ):
"""
执行查询并获取pandas.DataFrame格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:return:
"""
sources = []
for sub_source in self . _search_with_scroll (index_name = index_name, query_body = query_body):
sources . extend (sub_source)
return pd . DataFrame (sources)
def query_for_df_with_batch ( self , index_name , query_body , batch_size = DEFAULT_BATCH_SIZE):
"""
按批次大小查询并返回pandas.DataFrame的generator格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:param batch_size: 批次大小
:return:
"""
if "size" not in query_body :
query_body [ "size" ] = batch_size
for sub_source in self . _search_with_scroll (index_name = index_name, query_body = query_body):
yield pd . DataFrame (sub_source)
def get_first_row_with_df ( self , index_name ):
"""
获取指定索引的首行数据,格式为pandas.DataFrame
可用于获取索引的元信息
:param index_name: 索引名称
:return:
"""
query_body = {
"size" : 1 ,
"query" : {
"match_all" : {}
}
}
for sub_source in self . _search_with_scroll (index_name = index_name, query_body = query_body):
return pd . DataFrame (sub_source)
使用案例
复制 class TaskMeta :
'''
数据元类
'''
def __init__ ( self , text , doc_id , sentence_id , reg_lst , flag , has_reg , text_source = "primitive" ):
self . text = text
self . doc_id = doc_id
self . sentence_id = sentence_id
self . reg_lst = reg_lst
self . flag = flag
self . has_reg = has_reg
self . text_source = text_source
def __repr__ ( self ):
return f ' { self . text } { self . doc_id } { self . sentence_id } { self . reg_lst } { self . flag } { self . has_reg } { self . text_source } '
def to_dict ( self ):
return { "text" : self . text ,
"doc_id" : self . doc_id ,
"sentence_id" : self . sentence_id ,
"reg_lst" : self . reg_lst ,
"flag" : self . flag ,
"has_reg" : self . has_reg ,
"text_source" : self . text_source }
复制 def create_index ( target_es_dao , index_name , mapping ):
'''
创建es索引
:return: 是否创建成功
'''
if not target_es_dao . is_index_exists (index_name):
target_es_dao . create_index (index_name, mapping)
else :
target_es_dao . clear_index_data (index_name)
print ( f "索引 { index_name } 已存在, 已清除数据" )
def writer_fun ( target_es_dao , target_index , sample_lst ):
'''
写数据到es库
'''
df_sample_lst = []
[df_sample_lst . append (sample. to_dict ()) for sample in sample_lst]
df_sample_lst = pd . DataFrame (df_sample_lst)
target_es_dao . save_df_data (target_index, df_sample_lst)
print ( f '写入数据 { len (sample_lst) } 条' )
def es_cal_test ():
# 获取连接
source_es_dao = EsDao ( f "http:// { aug_config.SOURCE_IP } : { aug_config.SOURCE_PORT } /" )
query_condition = {
"query_string" : {
"default_field" : "has_reg" ,
"query" : "true"
}
}
query_body = {
"query" : query_condition
}
# 查询数据
datas = source_es_dao . query_for_df (index_name = aug_config.SOURCE_INDEX, query_body = query_body)
records = datas . to_dict (orient = 'record' )
sample_lst = []
for record in records :
sample_lst . append (
TaskMeta (
text = record[ "text" ],
doc_id = record[ "doc_id" ],
sentence_id = record[ "sentence_id" ],
reg_lst = record[ "reg_lst" ],
flag = record[ "flag" ],
has_reg = record[ "has_reg" ]
)
)
# 创建索引
create_index (target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)
# 写入数据
writer_fun (target_es_dao, aug_config.TARGET_INDEX, sample_lst = sample_lst)
if __name__ == '__main__' :
es_cal_test ()
Oracle数据库
Python操作Oracle数据库:cx_Oracle
环境配置
第一步:安装库
pip install cx-Oracle
第二步:文件拷贝
需要将oci.dll、oraocci11.dll、oraociei11.dll
复制到sitepackages路径下
sql基础
建表
复制 --blob字段插入实例
create table blob_table_tmp (
id number primary key ,
blob_cl blob not null ,
clob_cl clob not null
);
insert into blob_table_tmp values ( 1 ,rawtohex( '11111000011111' ), '增加一条记录时,碰到插入blob类型数据出错' );
insert into blob_table_tmp values ( 3 ,rawtohex( '4561888' ), '增加一条记录时,碰到插入blob类型数据出错' );
insert into blob_table_tmp values ( 4 ,rawtohex( '增加一条记录时333' ), '增加一条记录时,碰到插入blob类型数据出错' );
查询
获取连接
复制 FINANCE_DB_HOST = "192.168.x.x"
FINANCE_DB_PORT = 1521
FINANCE_DB_USER = "hyc"
FINANCE_DB_PASSWORD = "123456"
FINANCE_DB_DB = "ORCL"
class OracleConn ():
config_path = ''
@ staticmethod
def get_conn ( conn_name , encoding = "UTF-8" ):
conn_str = str ( eval ( " %s _DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str ( eval ( " %s . %s _DB_PASSWORD" % (OracleConn.config_path, conn_name)))
conn_str += "@" + str ( eval ( " %s _DB_HOST" % (OracleConn.config_path, conn_name)))
conn_str += ":" + str ( eval ( " %s _DB_PORT" % (OracleConn.config_path, conn_name)))
conn_str += "/" + str ( eval ( " %s _DB_DB" % (OracleConn.config_path, conn_name)))
return ora . connect (conn_str, encoding = encoding, nencoding = encoding)
读写数据库
复制 def oracle_test ():
# 获取数据库连接
conn = OracleConn . get_conn ( "FINANCE" )
cur = conn . cursor ()
# 查询数据
sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"
datas = []
r = cur . execute (sql)
# 假设name是clob字段类型
[datas . append ((gg[ 0 ], gg[ 1 ]. read (). decode ( 'utf-8' ), gg[ 2 ]. read ())) for gg in r]
# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res . append ((data[ 0 ], data[ 1 ])) for data in datas]
cur . executemany (insert_sql, res)
cur . execute ( 'commit' )
cur . close ()
conn . close ()
print ( "写入结束" )
if __name__ == '__main__' :
oracle_test ()
相关操作
关于数据库的连接,查询和写入
复制 import cx_Oracle
class Setting :
DB_USER = 'narutohyc'
DB_PASSWORD = 'hyc'
DB_IP = '192.168.0.1'
DB_PORT = ''
DB_SERVICE = 'dataBaseName'
setting = Setting ()
def oracle_test ():
# 获取数据库连接
conn = cx_Oracle . connect ( ' %s / %s @ %s / %s ' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding = 'utf-8' )
cur = conn . cursor ()
# 查询数据
sql = "select ID, name from hyc_database"
datas = []
r = cur . execute (sql)
# 假设name是clob字段类型
[datas . append ((gg[ 0 ], gg[ 1 ]. read ())) for gg in r]
# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res . append ((data[ 0 ], data[ 1 ])) for data in datas]
cur . executemany (insert_sql, res)
cur . execute ( 'commit' )
cur . close ()
conn . close ()
print ( "写入结束" )
if __name__ == '__main__' :
oracle_test ()
Postgresql数据库
我终于学会了使用python操作postgresql
保姆级 CentOS 7离线安装PostgreSQL 14教程
易百_PostgreSQL教程
离线安装数据库
先从centos7-pg_14.2下载 下载rpm包(微云下载centos7.6_PostgreSQL14.2 ),或者直接官方下载安装教程 安装,如果离线安装就下载rpm包
复制 # 离线安装执行以下命令安装
rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm
出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法
复制 yum install python3-devel
创建数据库data和log文件夹
复制 # 创建数据库data和log文件夹
mkdir -p /home/postgres/pgsql_data
mkdir -p /home/postgres/pgsql_log
# 创建日志文件
touch /home/postgres/pgsql_log/pgsql.log
授权给安装数据时自动创建的postgres用户
复制 chown -R postgres:postgres /home/postgres/pgsql_data
chown -R postgres:postgres /home/postgres/pgsql_log
切换到安装数据时自动创建的postgres用户
初始化数据库到新建数据目录
复制 /usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data
启动服务器(初始化数据库日志文件)
复制 /usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start
# 查看状态
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status
切换到管理员开启端口并重启防火墙
复制 su root
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload
修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf
复制 # 修改监听地址
listen_addresses = '*'
# 修改最大连接数(按需)
max_connections = 1000
# 修改密码认证
password_encryption = md5
修改可访问的用户IP段
复制 vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存)
IPV4下修改为或新增
host all all 0.0.0.0/0 trust
postgres用户重启数据库服务
复制 su - postgres
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart
数据库安装结束,管理员postgres,默认密码123456
使用navicat连接pg库后新建数据库
环境配置
pip install psycopg2
sql语法
数据库连接
复制 -- 获取数据库实例连接数
select count ( * ) from pg_stat_activity;
-- 获取数据库最大连接数
show max_connections;
-- 查询当前连接数详细信息
select * from pg_stat_activity;
-- 查询数据库中各个用户名对应的数据库连接数
select usename, count ( * ) from pg_stat_activity group by usename;
数据库信息
复制 -- 查询数据库大小
select pg_size_pretty (pg_database_size( 'pg_fac_stk' ));
-- 查询各表磁盘占用
SELECT
table_schema || '.' || table_name AS table_full_name,
pg_size_pretty(pg_total_relation_size( '"' || table_schema || '"."' || table_name || '"' )) AS size
FROM information_schema.tables where table_name like 'finance_%'
ORDER BY
pg_total_relation_size( '"' || table_schema || '"."' || table_name || '"' ) DESC ;
-- 获取各个表中的数据记录数
select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc ;
-- 查看数据库表对应的数据文件
select pg_relation_filepath( 'product' );
-- 查看数据库实例的版本
select version ();
-- 分析评估SQL执行情况
EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo;
-- 获取数据库当前的回滚事务数以及死锁数
select datname,xact_rollback,deadlocks from pg_stat_database;
数据备份与恢复
使用pgdump备份数据库
pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:
备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
备份文件可以进行压缩,减小文件大小,方便传输和存储
可以新建数据库,建几张表做测试
复制 # 学生表
CREATE TABLE students (
id SERIAL PRIMARY KEY ,
name VARCHAR ( 100 ) NOT NULL ,
gender VARCHAR ( 10 ) NOT NULL ,
age INTEGER NOT NULL ,
class VARCHAR ( 20 ) NOT NULL
);
# 学科表
CREATE TABLE subjects (
id SERIAL PRIMARY KEY ,
name VARCHAR ( 100 ) NOT NULL
);
# 成绩表
CREATE TABLE scores (
id SERIAL PRIMARY KEY ,
student_id INTEGER NOT NULL ,
subject_id INTEGER NOT NULL ,
score INTEGER NOT NULL ,
FOREIGN KEY (student_id) REFERENCES students (id),
FOREIGN KEY (subject_id) REFERENCES subjects (id)
);
# 插入一些测试数据
INSERT INTO students ( name , gender, age, class)
VALUES
( 'Alice' , 'Female' , 18 , 'Class A' ),
( 'Bob' , 'Male' , 17 , 'Class B' ),
( 'Charlie' , 'Male' , 19 , 'Class A' ),
( 'Diana' , 'Female' , 18 , 'Class B' );
# 插入学科表数据
INSERT INTO subjects ( name )
VALUES
( 'Mathematics' ),
( 'English' ),
( 'Science' );
-- Alice 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
( 1 , 1 , 90 ),
( 1 , 2 , 85 ),
( 1 , 3 , 92 );
-- Bob 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
( 2 , 1 , 78 ),
( 2 , 2 , 80 ),
( 2 , 3 , 75 );
-- Charlie 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
( 3 , 1 , 88 ),
( 3 , 2 , 92 ),
( 3 , 3 , 90 );
-- Diana 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
( 4 , 1 , 95 ),
( 4 , 2 , 88 ),
( 4 , 3 , 92 );
备份
使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可
备份整个数据库
复制 pg_dump - h < 数据库地址 > - p < 数据库端口 > - U < 数据库用户名 > - F c - b - v - f < 备份文件路径 > < 数据库名称 >
# 示例
/usr/pgsql -14 /bin/pg_dump - h 127.0.0.1 - U postgres - p 5432 - F t - b - v - f build_hyc_test.sql.tar hyc_test
备份指定表或数据
复制 pg_dump - h < 数据库地址 > - p < 数据库端口 > - U < 数据库用户名 > - F c - b - v - t < 表名1 > - t < 表名2 > - f < 备份文件路径 > < 数据库名称 >
# 示例
-- 备份指定表到sql文件
-- '-c --if-exists' 会生成 'drop table if exist' 命令
-- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息
pg_dump -- verbose -- host = 192.168 .xx.xx -- port = 5432 -- username = postgres -- file /home/huangyc/pg_bak_test/bak_hyc.sql -- encoding = UTF -8 - t "public.tushare_wz_index" - t "public.tushare_us_basic" - t "public.dim_fund" - t "public.dim_index" - c --if- exists -- no - owner pg_fac_stk
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
-F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
复制 -- 备份postgres库并tar打包
pg_dump - h 127 . 0 . 0 . 1 - p 5432 - U postgres - f postgres.sql.tar - Ft;
-- 备份postgres库,转储数据为带列名的INSERT命令
pg_dumpall - d postgres - U postgres - f postgres.sql --column-inserts;
还原
使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可
恢复整个库
复制 pg_restore - h < 数据库地址 > - p < 数据库端口 > - U < 数据库用户名 > - d < 数据库名称 > < 备份文件路径 >
# 示例
/usr/pgsql -14 /bin/pg_restore - h 127.0.0.1 - U postgres - p 5432 - d hyc_test_bak build_hyc_test.sql.tar
恢复指定数据
复制 pg_restore - h < 数据库地址 > - p < 数据库端口 > - U < 数据库用户名 > - t < 表名1 > - t < 表名2 > - d < 数据库名称 > < 备份文件路径 >
# 示例
-- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复
-- 还原指定sql文件到bak_test库(需要自己建库)
psql -- host = 192.168 .xx.xx -- port = 5432 -- username = postgres - d bak_test -- file /home/huangyc/pg_bak_test/bak_hyc.sql.tar
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
命令详解
复制 [postgres@pg01 ~ ]$ pg_dump --help
用法:
pg_dump [选项]... [数据库名字]
** 一般选项 ** :
-f, --file=FILENAME 输出文件或目录名
-F, --format=c | d | t | p 输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))
-j, --jobs=NUM 执行多个并行任务进行备份转储工作
-v, --verbose 详细模式
-V, --version 输出版本信息,然后退出
-Z, --compress=0-9 被压缩格式的压缩级别,0表示不压缩
--lock-wait-timeout = TIMEOUT 在等待表锁超时后操作失败
--no-sync 不用等待变化安全写入磁盘
-?, --help 显示此帮助, 然后退出
** 控制输出内容选项( 常用 ) ** :
-a, --data-only 只转储数据,不包括模式,只对纯文本输出有意义
-s, --schema-only 只转储模式, 不包括数据
-c, --clean 在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义
-C, --create 指定输出文件中是否生成create database语句,只对纯文本输出有意义
-n, --schema=PATTERN 指定要导出的schema,不指定则导出所有的非系统schema
-N, --exclude-schema=PATTERN 排除导出哪些schema
-O, --no-owner 在明文格式中, 忽略恢复对象所属者
-t, --table=PATTERN 指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了
-T, --exclude-table=PATTERN 排除表
-x, --no-privileges 不要转储权限 (grant/revoke)
--disable-triggers 在只恢复数据的过程中禁用触发器
--exclude-table-data = PATTERN do NOT dump data for the specified table ( s )
--if-exists 当删除对象时使用IF EXISTS
--inserts 以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢
该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据
--column-inserts 以带有列名的INSERT命令形式转储数据,例如insert into table_name ( column,... ) values ( value1,... )
--load-via-partition-root 通过根表加载分区
--no-comments 不转储注释
--no-tablespaces 不转储表空间分配信息
--no-unlogged-table-data 不转储没有日志的表数据
--on-conflict-do-nothing 将ON CONFLICT DO NOTHING添加到INSERT命令
** 控制输出内容选项( 不常用 ) ** :
-S, --superuser=NAME 指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。
-b, --blobs 在转储中包括大对象
-B, --no-blobs 排除转储中的大型对象
-E, --encoding=ENCODING 转储以ENCODING形式编码的数据
--binary-upgrade 只能由升级工具使用
--enable-row-security 启用行安全性(只转储用户能够访问的内容)
--extra-float-digits = NUM 覆盖extra_float_digits的默认设置
--disable-dollar-quoting 取消美元 (符号) 引号, 使用 SQL 标准引号
--no-publications 不转储发布
--no-security-labels 不转储安全标签的分配
--no-subscriptions 不转储订阅
--no-synchronized-snapshots 在并行工作集中不使用同步快照
--quote-all-identifiers 所有标识符加引号,即使不是关键字
--rows-per-insert = NROWS 每个插入的行数;意味着--inserts
--section = SECTION 备份命名的节 (数据前, 数据, 及 数据后 )
--serializable-deferrable 等到备份可以无异常运行
--snapshot = SNAPSHOT 为转储使用给定的快照
--strict-names 要求每个表和 ( 或 ) schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
** 联接选项 ** :
-d, --dbname=DBNAME 对数据库 DBNAME备份
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role = ROLENAME 在转储前运行SET ROLE
对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下
复制 [postgres@pg01 pg_backup]$ pg_restore --help
pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库.
用法:
pg_restore [选项]... [文件名]
一般选项:
-d, --dbname=名字 连接数据库名字
-f, --file=文件名 输出文件名 ( - 对于stdout )
-F, --format=c | d | t 备份文件格式 ( 应该自动进行 )
-l, --list 打印归档文件的 TOC 概述
-v, --verbose 详细模式
-V, --version 输出版本信息, 然后退出
-?, --help 显示此帮助, 然后退出
恢复控制选项:
-a, --data-only 只恢复数据, 不包括模式
-c, --clean 在重新创建之前,先清除(删除)数据库对象
-C, --create 创建目标数据库
-e, --exit-on-error 发生错误退出, 默认为继续
-I, --index=NAME 恢复指定名称的索引
-j, --jobs=NUM 执行多个并行任务进行恢复工作
-L, --use-list=FILENAME 从这个文件中使用指定的内容表排序
输出
-n, --schema=NAME 在这个模式中只恢复对象
-N, --exclude-schema=NAME 不恢复此模式中的对象
-O, --no-owner 不恢复对象所属者
-P, --function=NAME(args ) 恢复指定名字的函数
-s, --schema-only 只恢复模式, 不包括数据
-S, --superuser=NAME 使用指定的超级用户来禁用触发器
-t, --table=NAME 恢复命名关系(表、视图等)
-T, --trigger=NAME 恢复指定名字的触发器
-x, --no-privileges 跳过处理权限的恢复 (grant/revoke)
-1, --single-transaction 作为单个事务恢复
--disable-triggers 在只恢复数据的过程中禁用触发器
--enable-row-security 启用行安全性
--if-exists 当删除对象时使用IF EXISTS
--no-comments 不恢复注释
--no-data-for-failed-tables 对那些无法创建的表不进行
数据恢复
--no-publications 不恢复发行
--no-security-labels 不恢复安全标签信息
--no-subscriptions 不恢复订阅
--no-tablespaces 不恢复表空间的分配信息
--section = SECTION 恢复命名节 (数据前、数据及数据后)
--strict-names 要求每个表和 ( 或 ) schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
联接选项:
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role = ROLENAME 在恢复前执行SET ROLE操作
选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定
多次用于选择多个对象.
如果没有提供输入文件名, 则使用标准输入.
表空间
新建表空间
复制 # 新建表空间目录 t_fac_ts
mkdir /home/huangyc/t_fac_ts
# 修改表空间的用户权限
chown postgres /home/huangyc/t_fac_ts
pg库新建表空间
复制 create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts' ;
表空间有关的一些语法
复制 # 删除表空间 (需要先drop表空间所有的表, 或者将该空间下所有的表移除才能drop表空间)
DROP TABLE SPACE t_fac_ts;
# 修改具体的表到指定表空间下
ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;
# 修改指定库到指定表空间下
ALTER DATABASE name SET TABLESPACE new_tablespace;
锁表处理
pg锁表解锁
查看被锁的表
复制 select a.locktype,a.database,a.pid,a.mode,a.relation,b.relname
from pg_locks a
join pg_class b on a.relation = b.oid where relname = 't_opt_strhdk_blsj' ;
杀死被锁的pid
复制 select pg_terminate_backend(pid);
表结构修改
复制 -- 修改表名
alter table "user" rename to "ts_user" ;
-- 添加新字段
alter table table_name add column col_name varchar ( 50 );
-- 丢弃某列
alter table table_name drop column col_name;
-- 添加主键
alter table table_name add primary key ( "col_name" );
-- 修改字段名
alter table table_name rename column old_col_name to new_col_name;
数据更新和查询
设置某字段的值
复制 -- 设置某字段的值
update table_name set col_name = new_value;
删除表中重复数据
复制 -- 查询[旧表]数据的重复情况
select col1,col2, count ( * ) from old_table group by col1,col2;
-- 所有字段都一样的情况
create table bak_table as select distinct * from table_name;
-- 查询[新表]数据的重复情况
select col1,col2, count ( * ) from bak_table group by col1,col2;
truncate table old_table;
insert into old_table (col1,col2) select col1,col2 from bak_table;
不存在插入,存在更新
复制 insert into ... on conflict(column_name) do update set ...
conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件
column_name必须是主键 或者其他具有唯一性的字段(如唯一键或排他键)
复制 insert into user(id,username, address ,create_date,create_by)
values ( '1' , '刘德华' , '香港' , now (), 'system' )
on conflict(id)
do update set address= '中国' ,update_date =now (),update_by = 'system' ;
复制 # 批量的方式
insert into testunnest(id, age, name ) values (unnest( array [1,3]), unnest( array [18,10]), unnest( array ['valupdated', 'val3'])) on conflict (id) do update set age = excluded.age, name = excluded.name;
数据和结构复制
复制 -- [复制表和数据] 复制表结构和数据 自动建表,不会复制主键什么的
create table new_table as select * from old_table;
-- [复制数据] 复制数据到 新表 表需要提前建,并且表字段要一致,不会复制主键什么的
insert into new_table (col_0, col_1) select col_0, col_1 from old_table;
分页查询
复制 select * from table_name limit 10000 offset 20000 ;
删除重复记录
postgresql 常用的删除重复数据方法
复制 -- 初始化数据
create table hyc_tmp_del_test (id int , name varchar ( 255 ));
create table hyc_tmp_del_test_bk ( like hyc_tmp_del_test);
insert into hyc_tmp_del_test select generate_series ( 1 , 10000 ), 'huangyc' ;
insert into hyc_tmp_del_test select generate_series ( 1 , 10000 ), 'huangyc' ;
insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;
-- 最容易想到的方法就是判断数据是否重复,对于重复的数据只保留ctid最小(或最大)的数据,删除其他的
-- id相同的数据,保留ctid最小的,其他的删除
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> ( select min (t.ctid) from hyc_tmp_del_test_bk t where a.id = t.id); -- 17.112s
-- group by方法通过分组找到ctid最小的数据,然后删除其他数据
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in ( select min (ctid) from hyc_tmp_del_test_bk group by id); -- 0.052s
-- 高效删除方法
explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any( array ( select ctid from ( select row_number () over ( partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1 )); -- 0.055s
第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本
关键
pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid
其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关
ctid表示行版本在表中的物理位置 : 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。
索引
复制 -- 获取数据库表中的索引
select * from pg_indexes where tablename = 't_cfg_opinfo' ;
-- 创建索引
create index index_name on table_name (col_0, col_1);
-- 查询索引
select * from pg_indexes where tablename = 'table_name' ;
-- 删除索引
drop index index_name;
什么情况下要避免使用索引?
虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引
使用索引时,需要考虑下列准则 :
索引不应该使用在有频繁的大批量的更新或插入操作的表上
其他语法
筛选某列,逗号拼接
复制 select string_agg (bs_org_id, ',' ) as bs_org_ids
from bs_org
where par_org_id = '100'
日期转换
复制 select to_char(col_name, 'yyyyMMDD' ) - interval '2 day' from table_name
-- -interval '2 day' 表示往前2天
转时间戳
复制 select '2011-01-06 09:57:59' :: timestamp ;
TO_TIMESTAMP( '2011-01-06 09:57:59' , 'YYYY-MM-DD HH24:MI:S' )
postgresql 获取分组第一条数据 窗口函数
给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名
从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1
获取前N名的数据,将一中第二步的条件换成where 排名 < N+1
distributed key
复制 alter table table_name set distributed by (id);
alter table table_name add primary key (id);
ORM框架
ORM框架比较
一文了解 Python 的三种数据源架构模式
SQLAlchemy 和其他的 ORM 框架
SQLObject
优点:
采用了易懂的ActiveRecord 模式
一个相对较小的代码库
缺点:
方法和类的命名遵循了Java 的小驼峰风格
不支持数据库session隔离工作单元
Storm
优点:
清爽轻量的API,短学习曲线和长期可维护性
不需要特殊的类构造函数,也没有必要的基类
缺点:
迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生
Storm的贡献者必须把他们的贡献的版权给Canonical公司
Django's ORM
优点:
易用,学习曲线短
和Django紧密集合,用Django时使用约定俗成的方法去操作数据库
缺点:
不好处理复杂的查询,强制开发者回到原生SQL
紧密和Django集成,使得在Django环境外很难使用
peewee
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
缺点:
不支持自动化 schema 迁移
多对多查询写起来不直观
SQLAlchemy
优点:
企业级API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点:
工作单元概念不常见
重量级API,导致长学习曲线
相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。
SQLAlchemy
SQLAlchemy 1.4 Documentation
sqlalchemy操作数据库
sqlalchemy外键和relationship查询
SQLALlchemy数据查询小集合
SQLAlchemy 的连接池机制
SQLAlchemy 中的 Session、sessionmaker、scoped_session
Contextual/Thread-local Sessions
SQLAlchemy(常用的SQLAlchemy列选项)
查询官网例子Object Relational Tutorial (1.x API)
sqlalchemy外键和relationship查询
session和scoped_session
session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值
复制 s1 = Session ()
s2 = Session ()
s1 . add (person)
s1 . commit ()
# 必须先close,s2才能继续操作person
s1 . close ()
s2 . add (person)
session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()
Engine Configuration
使用 create_engine
创建我们需要的DB starting point
复制 from sqlalchemy import create_engine
scheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'
engine = create_engine (scheme, pool_size = 10 , max_overflow =- 1 , pool_recycle = 1200 )
create_engine
函数常用参数:
pool_size=10 # 连接池的大小,0表示连接数无限制
pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout
创建Engine
之后,接下来的问题,就是如何使用Engine
在单进程中,建议在在初始化的模块的时候创建Engine
, 使Engine
成为全局变量, 而不是为每个调用Engine
的对象或者函数中创建, Engine
不同于connect
, connect
函数会创建数据库连接的资源,Engine
是管理connect
创建的连接资源
在多进程中,为每个子进程都创建各自的Engine
, 因为进程之间是不能共享Engine
几种操作方式
Working with Engines and Connections
SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?
Engine方式
Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;
复制 result = engine . execute ( 'SELECT * FROM tablename;' )
conn = engine . connect (close_with_result = True )
result = conn . execute ( 'SELECT * FROM tablename;' )
for row in result :
print (result[ 'columnname' ]
result. close ()
Connection方式
Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;
复制 connection = engine . connect ()
trans = connection . begin ()
try :
connection . execute ( "INSERT INTO films VALUES ('Comedy', '82 minutes');" )
connection . execute ( "INSERT INTO datalog VALUES ('added a comedy');" )
trans . commit ()
except :
trans . rollback ()
raise
Session方式
Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;
总结: 从应用角度来看,可以把这三类分为两种:
直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句
使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式