#coding=utf-8
__author__ = 'nate'
from sqlalchemy import text, create_engine, MetaData, Table, Column, String, Integer
from sqlalchemy.orm import sessionmaker
import hashlib
if __name__ == '__main__':
'''
创建engine
echo 设成 False 将不显示SQL
'''
DB_CONNECT_STR = 'mysql+mysqldb://root:123123@localhost/test'
engine = create_engine(DB_CONNECT_STR, echo=False)
'''
#新建数据表
meta = MetaData()
users = Table('users', meta,
Column('u_id', Integer, primary_key=True),
Column('u_name', String(40)),
Column('u_password', String(64)))
users.create(engine)
'''
'''
创建session
'''
DB_Session = sessionmaker(bind = engine, autoflush=True, autocommit=True, expire_on_commit=True)
#DB_Session = sessionmaker(bind = engine)
session = DB_Session()
#生成密码
password = hashlib.sha256('this is password').hexdigest()
#新增数据
sql = text('insert into users (u_name, u_password) values (:name, :password)')
data = session.execute(sql, {'name': 'nate1', 'password': password})
row_num = data.rowcount
last_id = data.lastrowid
#删除数据
sql = text('delete from users where u_id = :id')
data = session.execute(sql, {'id': last_id})
print data.rowcount
print data.lastrowid
#获取所有数据
sql = text('select * from users')
data = session.execute(sql)
print data.rowcount
for row in data.fetchall():
print row['u_id'], ' => ', row
#获取一条数据
sql = text('select * from users limit 1')
data = session.execute(sql).fetchone()
print data['u_id'], ' # ', data['u_name'], ' # ', data['u_password']
name = data['u_name']
#获取不存在的数据
sql = text('select * from users where u_id = :id')
data = session.execute(sql, {'id': 10000})
print data.rowcount
print data.fetchone()
#更新数据
sql = text('update users set u_name = :new_name where u_name = :name')
data = session.execute(sql, {'new_name': 'nate_yhz', 'name': name})
print data.rowcount
用SQLAlchemy非ORM方式执行SQL语句
Published: at 00:00