欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

python多线程详解_Python多线程详解-Python-火龙果软件

发布时间:2023/12/29 42 豆豆
生活随笔 收集整理的这篇文章主要介绍了 python多线程详解_Python多线程详解-Python-火龙果软件 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

编辑推荐:

本文来自于博客,本文详细介绍了如何使用Python操作MySQL、使用Python操作Redis及项目实战。

操作MySQL

1)Windows中安装python和pycharm

2)ubuntu中安装python和pycharm

安装步骤不做赘述,pycharm运行脚本

#!/usr/bin/env python

import MySQLdb

#get connection

try:

con = MySQLdb.connect(

host='localhost',

user='root',

passwd='12346',

port=3308,

db='sakila',

charset='utf8'

)

except MySQLdb.Error as e:

print('error:%s'% e)

cursor = con.cursor()

cursor.execute('SELECT * FROM `store`')

rest = cursor.fetchone()

print(rest)

#close connection

con.close()

3)查询数据库

#!/usr/bin/env python

import MySQLdb

class MysqlQuery(object):

def __init__(self):

self.get_conn()

def get_conn(self):

# get connection

try:

self.conn = MySQLdb.connect(

host='localhost',

user='root',

passwd='123456',

port=3308,

db='sakila',

charset='utf8'

)

except MySQLdb.Error as e:

print('error:%s' % e)

cursor = self.conn.cursor()

cursor.execute('SELECT * FROM `store`')

rest = cursor.fetchone()

print(rest)

def close_conn(self):

try:

if self.conn:

# close connection

self.conn.close()

except MySQLdb.Error as e:

print('Error:%s'%e)

def get_one(self):

#prepare SQL

/*虽然定义类型为int,可使用string*/

sql='SELECT * FROM `store` where `store_id` =%s'

#get cursor

cursor=self.conn.cursor()

cursor.execute(sql,('1',))

print(cursor.rowcount)

rest=dict(zip([k[0] for k in cursor.description],cursor.fetchone()))

print(rest)

print(rest['store_id'])

self.close_conn()

return rest

def main():

obj = MysqlQuery()

rest = obj.get_one();

print(rest['store_id'])

if __name__ == '__main__':

main()

/*取走所有数据,形成数组*/

rest = [dict(zip([k[0] for k in cursor.description],row))for row in cursor.fetchall()]

zip([iterable, …])

Python的一个内建函数,它接受一系列可迭代的对象作为参数,将对象中对应的元素打包成一个个tuple(元组),然后返回由这些tuples组成的list(列表)。若传入参数的长度不等,则返回list的长度和参数中长度最短的对象相同。利用*号操作符,可以将list unzip(解压)。

dict()作用:dict() 函数用于创建一个字典。返回一个字典。

class dict(**kwarg)

class dict(mapping, **kwarg)

class dict(iterable, **kwarg)

/*

kwargs -- 关键字

mapping -- 元素的容器。

iterable -- 可迭代对象

*/

4)更新数据

def add_one(self):

row_count=0

try:

sql = ("insert into `film`(`title`,`description`,`language_id`) value" "(%s,%s,%s);")

cursor = self.conn.cursor()

cursor.execute(sql, ('chia', 'ashajhsjah','1'))

self.conn.commit()

except:

print('error')

self.conn.rollback()

row_count=cursor.rowcount

cursor.close()

self.close_conn()

return row_count

5)ORM:SQLAlChemy

pip install SQLAlchemy

import sqlalchemy

declarative_base() 创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联。

增删改查

#!/usr/bin/python

#coding=utf-8

from sqlalchemy import create_engine

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

from sqlalchemy import Column, Integer, String, DateTime, Boolean

import datetime

#engine=create_engine('数据库类型://用户名:密码@ip:端口/数据库名')

engine=create_engine('mysql://root:123456@localhost:3308/sakila')

Base=declarative_base()

Session = sessionmaker(bind=engine)

class Store(Base):

__tablename__='country'#数据库表名

country_id = Column(Integer, primary_key=True)

country = Column(String(200), nullable=False)

last_update = Column(String(2000), nullable=False)

class MysqlOrmTest(object):

def __init__(self):

self.session=Session()

def add_one(self):

new_obj=Store(

country_id='130',

country='hhhsahsa',

last_update=datetime.datetime.now()#此处需要import datetime

)

self.session.add(new_obj)

self.session.commit()

return new_obj

def get_one(self):

return self.session.query(Store).get(1)

def update_date(self):

obj=self.session.query(Store).get(1)

obj.manager_staff_id=1

self.session.add(obj)

self.session.commit()

return obj

def delete_data(self):

data=self.session.query(Store).get(3)

self.session.delete(data)

self.session.commit()

def main():

# rest = obj.add_one()

# print(dir(rest))

# print(obj.get_one().title)

# print(obj.get_more().count())

# for row in obj.get_more():

# print(row.title)

# print(obj.update_data())

if __name__ == '__main__':

main()

6)项目实战

使用pycharm专业版,选择flask框架,代码如下:

from flask import Flask

app = Flask(__name__)

@app.route('/hello')

def hello_world():

return 'Hello World!hello'

if __name__ == '__main__':

app.run(debug=True)

##flask支持热部署

简单搭建flask架构网站

本人使用pycharm开发flask项目,可以利用工具导入工具包:

##引入相关包

from flask_sqlalchemy import SQLAlchemy

from flask import Flask, render_template, flash, redirect, url_for, abort, request

app = Flask(__name__)

##配置数据库

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost:3308/flaskdb'

db=SQLAlchemy(app)

##完成ORM映射

class News(db.Model):

__tablename__='news'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False)

img_url = db.Column(db.String(2000), nullable=False)

content = db.Column(db.String(2000), nullable=True)

is_valid = db.Column(db.String(2000), nullable=True)

created_at = db.Column(db.String(2000), nullable=True)

updated_at = db.Column(db.String(2000), nullable=True)

news_type = db.Column(db.String(2000), nullable=True)

def __repr__(self):

return '' % self.title

##python生成HTML效果不好

@app.route('/hello')

def hello_world():

return 'Hello World!hello'

##使用渲染引擎Jinja2

@app.route('/')

def index():

news_list=News.query.all()

return render_template("index.html",news_list=news_list)

@app.route('/cat//')

def cat(name):

news_list=News.query.filter(News.news_type==name)

return render_template('cat.html',new_list=news_list)

@app.route('/detail//')

def detail(pk):

new_obj=News.query.get(pk)

return render_template('detail.html',new_obj=new_obj)

@app.route('/admin/')

@app.route('/admin//')

def admin(page=None):

return render_template("admin/index.html")

@app.route('/admin/add/', methods=['GET', 'POST'])

def add():

return render_template("admin/add.html")

@app.route('/admin/update//', methods=['GET', 'POST'])

def update(pk):

return render_template("admin/update.html")

@app.route('/admin/delete//', methods=['POST'])

def delete(pk):

return 'no'

if __name__ == '__main__':

app.run(debug=True)

操作Redis

1) Redis安装

sudo apt-get update

sudo apt-get install redis-server

##启动Redis服务器

redis-server

##查看 redis 是否启动?

redis-cli

2)Redis命令

Set animal 'cat'

get animal

##添加value

append animal 'dog'

mset user1 'chu' user2 'yao'

mget user1 user2

set num 9

incr/decr num /*增加减少1*/

set user:chuyao;age:45 'asasasasa'

列表(list)相关操作

lpush/rpush q1 'chu' 'yao' 'Amy'/*从左、右插入数据*/

lrange/*获取指定长度的数据*/

ltrim/*截取一定长度的数据*/

lpop/rpop/*移除最左、右的元素并返回*/

lpushx/rpushx --key/* key存在时候才插入数据,不存在时不做任何处理*/

集合(Set)相关操作

sadd/srem /*添加、删除元素*/

sismember /*判断是否为set的一个元素*/

smembers /*返回该集合的所有成员*/

sdiff /*返回一个集合与其他集合的差异*/

sinter/*返回几个集合的交集*/

sunion/*返回几个集合的并集*/

散列(hash)相关操作

3)redis-py连接

import redis

r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)

user1=r.get('user1')

print(user1)

注意,如果是远程连接数据库,需要修改Redis配置文件。

1)注释掉bind 127.0.0.1可以使所有的ip访问redis。

2)修改办法:protected-mode no

4)Python 操作String类型

import redis

class TestString(object):

def __init__(self):

self.r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)

def test_set(self):

rest=self.r.set('user2','Amy');

print(rest)

def test_get(self):

rest=self.r.get('user1')

print rest

return rest

def test_mset(self):

d={

'user3':'Bob',

'user4':'BobX'

}

rest=self.r.mset(d)

print(rest)

return rest

def test_mget(self):

l=['user1','user2']

rest=self.r.mget(l)

print(rest)

return rest

def test_del(self):

rest=self.r.delete('user1')

print (rest)

def main():

str_obj=TestString();

# str_obj.test_set();

str_obj.test_get();

# str_obj.test_mset();

# str_obj.test_mget();

# str_obj.test_del();

if __name__=='__main__':

main()

5)项目实战

新闻数据,Hash

新闻ID,String

分页数据,List

排序,Sorted Set

总结

以上是生活随笔为你收集整理的python多线程详解_Python多线程详解-Python-火龙果软件的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。