Python量化數據倉庫搭建系列2:Python操作數據庫
Python量化數據倉庫搭建系列2:Python操作數據庫
本系列教程為量化開發者,提供本地量化金融數據倉庫的搭建教程與全套源代碼。我們以恆有數(UDATA)金融數據社區為數據源,將金融基礎數據落到本地數據庫。教程提供全套源代碼,包括歷史數據下載與增量數據更新,數據更新任務部署與日常監控等操作。
在上一節講述中,我們選擇了MySQL作為本系列教程的數據庫,故本文着重講解Python操作MySQL的步驟,並封裝方法。在文末簡單介紹Python操作MongoDB、SQLite、PostgreSQL數據庫;
一、pymysql用法
1、安裝pymysql模塊
pip install pymysql
2、連接數據庫
from pymysql import *
# 打開數據庫連接,數據庫參數可以在MySQL界面或數據庫配置文件中查看
conn = pymysql.connect(host = '數據庫IP',
port = '端口',
user = '用户名',
password = '密碼',
database='數據庫名稱')
# 使用 cursor() 方法創建一個遊標對象 cursor
cursor = conn.cursor()
# 在數據庫操作執行完畢後,關閉數據庫連接
# conn.close()
3、常見SQL代碼執行
from pymysql import *
# 執行SQL代碼:建表、刪表、插入數據
def Execute_Code(sql_str):
# 打開數據庫連接
conn = pymysql.connect(host = '127.0.0.1',port = 3306,user = 'root',
password = '密碼',database='udata')
# 使用 cursor() 方法創建一個遊標對象 cursor
cursor = conn.cursor()
try:
# 使用execute()方法執行SQL
cursor.execute(sql)
# 提交到數據庫執行
conn.commit()
except:
# 發生錯誤時回滾
conn.rollback()
# 關閉數據庫連接
conn.close()
A、建表
sql_str = '''CREATE TABLE TB_Stock_List_Test (
secu_code CHAR(20),
hs_code CHAR(20),
secu_abbr CHAR(20),
chi_name CHAR(40),
secu_market CHAR(20),
listed_state CHAR(20),
listed_sector CHAR(20),
updatetime CHAR(20));'''
Execute_Code(sql_str)
B、插入數據
sql_str = '''
INSERT INTO TB_Stock_List_Test
(`secu_code`,`hs_code`,`secu_abbr`,`chi_name`,`secu_market`,`listed_state`
,`listed_sector`,`updatetime`)
VALUES
('000001','000001.SZ','平安銀行','平安銀行股份有限公司','深圳證券交易所','上市',
'主板','2021-10-25 20:10:55');
'''
Execute_Code(sql_str)
C、更新數據
sql_str = "UPDATE tb_stock_list SET updatetime = '2021-10-30 20:10:55' "
Execute_Code(sql_str)
D、刪除數據
sql_str = 'DELETE FROM tb_stock_list'
Execute_Code(sql_str)
E、刪除表格
sql_str = 'DROP TABLE IF EXISTS tb_stock_list'
Execute_Code(sql_str)
4、查詢操作
def Select_Code(sql_str):
# 打開數據庫連接
conn = pymysql.connect(host = '127.0.0.1',port = 3306,user = 'root',
password = '密碼',database='udata')
# 使用 cursor() 方法創建一個遊標對象 cursor
cursor = conn.cursor()
# 使用execute()方法執行SQL
cursor.execute(sql_str)
# 獲取所有記錄列表
results = cursor.fetchall()
# 關閉數據庫連接
conn.close()
return results
sql_str = 'select * from tb_stock_list'
results = Select_Code(sql_str)
results
5、方法封裝
將上述用法,封裝為自定義類,存為MySQLOperation.py文件,代碼如下:
from pymysql import *
# MySQL操作函數
class MySQLOperation:
def __init__(self, host, port, db, user, passwd, charset='utf8'):
# 參數初始化
self.host = host
self.port = port
self.db = db
self.user = user
self.passwd = passwd
self.charset = charset
def open(self):
# 打開數據庫連接
self.conn = connect(host=self.host,port=self.port
,user=self.user,passwd=self.passwd
,db=self.db,charset=self.charset)
# 使用 cursor() 方法創建一個遊標對象 cursor
self.cursor = self.conn.cursor()
def close(self):
# 斷開數據庫連接
self.cursor.close()
self.conn.close()
def Execute_Code(self, sql):
# 執行SQL代碼:建表、刪表、插入數據
try:
self.open() # 打開數據庫連接
self.cursor.execute(sql) # 使用execute()方法執行SQL
self.conn.commit() # 提交到數據庫執行
self.close() # 斷開數據庫連接
except Exception as e:
self.conn.rollback() # 發生錯誤時回滾
self.close() # 斷開數據庫連接
print(e)
def Select_Code(self, sql):
# 執行SQL代碼,查詢數據
try:
self.open() # 打開數據庫連接
self.cursor.execute(sql) # 使用execute()方法執行SQL
result = self.cursor.fetchall() # 獲取所有記錄列表
self.close() # 斷開數據庫連接
return result # 返回查詢數據
except Exception as e:
self.conn.rollback() # 發生錯誤時回滾
self.close() # 斷開數據庫連接
print(e)
插入與查詢用法如下,其餘用法類似,此處不再贅述;
import pandas as pd
host='127.0.0.1'
port=3306
user='root'
passwd="密碼"
db='udata'
# 方法實例化
MySQL = MySQLOperation(host, port, db, user, passwd)
# 插入操作代碼
sql_str = '''
INSERT INTO tb_stock_list
(`secu_code`,`hs_code`,`secu_abbr`,`chi_name`,`secu_market`,`listed_state`,`listed_sector`,`updatetime`)
VALUES
('000001','000001.SZ','平安銀行','平安銀行股份有限公司','深圳證券交易所','上市',
'主板','2021-10-25 20:15:55');
'''
MySQL.Execute_Code(sql_str)
# 查詢數據
sql_str = 'select * from tb_stock_list'
results = MySQL.Select_Code(sql_str)
results
二、sqlalchemy用法
由於上述pymysql用法已經可以滿足大部分使用需求,sqlalchemy實現功能與之類似。這裏着重介紹一下基於sqlalchemy鏈接數據庫的pandas.to_sql和pandas.read_sql操作。
1、安裝pymysql模塊
pip install sqlalchemy
2、連接數據庫
from sqlalchemy import create_engine
host='127.0.0.1'
port = 3306
user='root'
password='密碼'
database='udata'
engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user
,password
,host
,port
,database))
3、pandas.to_sql
將DataFrame中的數據,寫入MySQL數據庫,代碼示例如下:
import pandas as pd
# 定義需要寫入的數據,DataFrame格式
data = pd.DataFrame([['000001','000001.SZ','平安銀行','平安銀行股份有限公司'
,'深圳證券交易所','上市','主板','2021-10-25 20:12:55'],
['000002','000002.SZ','萬 科A','萬科企業股份有限公司'
,'深圳證券交易所','上市','主板','2021-10-25 20:12:55']])
# 列名賦值
data.columns = ['secu_code','hs_code', 'secu_abbr', 'chi_name'
, 'secu_market', 'listed_state','listed_sector','updatetime']
# 寫入數據庫
data.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')
if_exists
參數用於當目標表已經存在時的處理方式,默認是 fail
,即目標表存在就失敗。另外兩個選項是 replace
表示替代原表,即刪除再創建,append
選項僅添加數據。
4、pandas.read_sql
從數據庫中,將數據讀取為DataFrame,代碼示例如下:
# 將sql查詢結果,賦值為result
result = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)
result
三、Python操作其他常見數據庫
1、MongoDB
(1)安裝pymongo:pip install pymongo
(2)操作簡介
import pymongo
# 連接MongoDB
conn = pymongo.MongoClient(host='localhost',port=27017
,username='username', password='password')
# 指定數據庫
db = conn['udata'] # db = client.udata
# 指定集合
collection = db['tb_stock_list'] # collection = db.tb_stock_list
# 插入數據 insert_one()、insert_many()
data1 = {} # 集合,鍵值對,1條數據
data2 = {} # 集合,鍵值對,1條數據
result = collection.insert_many([data1, data2])
# result = collection.insert_one(data1)
# 查詢數據 find_one()、find()
result = collection.find_one({'secu_code': '000001'})
# 更新數據 update_one()、update()
result = collection.update_one({'secu_code': '000001'}, {'$set': {'hs_code': '000001'}})
# 刪除數據 remove()、delete_one()和delete_many()
result = collection.remove({'secu_code': '000001'})
2、SQLite
(1)安裝sqlite3:pip install sqlite3
(2)操作簡介
import sqlite3
# 連接數據庫
conn = sqlite3.connect('udata.db')
# 創建遊標
cursor = conn.cursor()
# 執行SQL
sql = "增減刪等SQL代碼"
cursor.execute(sql)
# 查詢數據
sql = "查詢sql代碼"
values = cursor.execute(sql)
# 提交事物
conn.commit()
# 關閉遊標
cursor.close()
# 關閉連接
conn.close()
3、PostgreSQL
(1)安裝psycopg2:pip install psycopg2
(2)操作簡介
import psycopg2
# 連接數據庫
conn = psycopg2.connect(database="udata", user="postgres"
, password="密碼", host="127.0.0.1", port="5432")
# 創建遊標
cursor = conn.cursor()
# 執行SQL
sql = "增減刪等SQL代碼"
cursor.execute(sql)
# 查詢全部數據
sql = "查詢sql代碼"
cursor.execute(sql)
rows = cursor.fetchall()
# 事物提交
conn.commit()
# 關閉數據庫連接
conn.close()
綜上,Python操作數據庫的簡要介紹就結束了;還有很多類型的數據庫,Python操作它們的過程大同小異,後續我也將會繼續梳理相關資料。
下一節《Python量化投資數據倉庫搭建3:數據落庫代碼封裝》
「其他文章」
- 淺析大數據框架 Hadoop
- 一款好用的Java插件 - Lombok
- 深入代碼理解MVC模式、MVP模式和MVVM模式
- 深入理解大數據數倉工具 Apache Hive 底層原理
- ES2022 有什麼新功能
- 前端緩存那些事
- 大數據分佈式計算系統 Spark 入門核心之 RDD
- Nginx基本介紹 跨域解決方案
- 「MySQL」數據庫備份和還原
- 雲原生PaaS平台LIGHT-CORE實踐之道:基於K8s,聚焦服務與價值
- 聊聊 ab 和 jmeter 的併發模型
- 【Pandas學習筆記01】強大的分析結構化數據的工具集
- 白話 Linux 容器資源的隔離限制原理
- Hadoop 入門筆記—核心組件 YARN
- Hadoop 入門筆記—核心組件 YARN
- Hadoop 入門筆記—核心組件 MapRuduce
- Hadoop 入門筆記—核心組件 HDFS
- Python量化數據倉庫搭建3:數據落庫代碼封裝
- Python量化數據倉庫搭建系列2:Python操作數據庫
- Python量化數據倉庫搭建系列2:Python操作數據庫