【Python】rq队列的使用
1. 什么是Job?
Job直译过来就是工作,可以是任意的Python函数,你可以把你想要异步执行的任务都写成Job函数。简而言之,Job就是你想执行的操作。例如,我想统计任意网页的字符数量,可以写一个这样的Job函数:
创新互联专注于企业营销型网站、网站重做改版、竹山网站定制设计、自适应品牌网站建设、HTML5、商城建设、集团公司官网建设、外贸网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为竹山等各大城市提供网站开发制作服务。
import requests
def count_words(url):
return len(requests.get(url).text.split())
这样一个函数就可以称之为Job。
2. 什么是Queue?
当我有很多Job时,假如我现在有3个Job,分别是j1、j2、j3,那么当计算机要执行这些任务的时候,会按照j1、j2、j3加入的顺序来执行这些Job,这样的一个可以忘里面添加Job,并且能够顺序执行队列称之为Queue。
例如,我们可以这样来构建一个Queue:
import redis from rq import Queue redis_conn = redis.Redis() q = Queue('default', connection=redis_conn) # 第一个参数是Queue的名称,可以不传,默认为default
3. 怎么把Job放到队列里面去?
j = q.enqueue(count_words, args=('https://www.baidu.com',))
enqueue第一参数是Job函数,args是Job函数的参数,关键字参数可以通过kwargs传入。
4. 什么是Worker?
Worker是Job的消费者,简单来说,你把很多Job加入到了Queue,谁来运行这些Job呢?当然就是Worker啦,你也可以看出Worker必须是独立的进程,这个进程从Redis里面获取Job的信息(包括函数、参数等等),然后运行这个Job。
启动Worker进程也很简单:
$ rq worker low high default 16:56:02 RQ worker 'rq:worker:s2.6443' started, version 0.8.1 16:56:02 Cleaning registries for queue: low 16:56:02 Cleaning registries for queue: high 16:56:02 Cleaning registries for queue: default 16:56:02 16:56:02 *** Listening on low, high, default...
后面的三个参数low、high、default,就是这个Worker将要运行哪些Queue里面的Job,这个顺序很重要,排在前面的Queue里面的Job将优先被运行。
5. 一个完整的例子
jobs.py
[root@iZ2ze66bhrbxkc31nljgjnZ ~]# more jobs.py
import requests
import redis
from rq import Queue
import pyMySQL
def count_words(url):
return len(requests.get(url).text.split())
def recover_to_db(sql, dbinfo):
dbinfo['charset'] = 'utf8mb4'
dbinfo['autocommit'] = True
dbconn = pymysql.Connect(**dbinfo)
dbconn.autocommit(1)
cur = dbconn.cursor()
cur.execute(sql)
dbconn.close()
app.py
from jobs import count_words,recover_to_db import requests import redis from rq import Queue import time def run(): redis_conn = redis.Redis() q = Queue(connection=redis_conn) for i in range(92,99): j = q.enqueue(recover_to_db, 'insert into `tt`(`id`) VALUES (%d);' % i,{'host': '47.93.243.162', 'password': 'ESBecs00', 'port': 3306, 'user': 'root','db':'te st'}) #j = q.enqueue(, 'https://www.baidu.com') #print(j.result) #time.sleep(3) #print(j.result) if __name__ == '__main__': run()
启动Worker:
$ rq worker
运行:
$ python app.py
在查看rq work端的日志
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (92);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (de54dfcb-c2c0-4d7e-a082-273ae40e5316)
-
17:08:53 default: Job OK (de54dfcb-c2c0-4d7e-a082-273ae40e5316)
-
17:08:53 Result is kept for 500 seconds
-
17:08:53
-
17:08:53 *** Listening on default...
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (93);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (46fb8ef2-7ad6-4369-8377-80e728fa3129)
-
17:08:53 default: Job OK (46fb8ef2-7ad6-4369-8377-80e728fa3129)
-
17:08:53 Result is kept for 500 seconds
-
17:08:53
-
17:08:53 *** Listening on default...
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (94);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (334ef4da-ea8f-4ec1-8c82-952fa9300f6f)
-
17:08:53 default: Job OK (334ef4da-ea8f-4ec1-8c82-952fa9300f6f)
-
17:08:53 Result is kept for 500 seconds
-
17:08:53
-
17:08:53 *** Listening on default...
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (95);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (e7fcbed6-0d75-40ff-ae89-f135d1f6fb45)
-
17:08:53 default: Job OK (e7fcbed6-0d75-40ff-ae89-f135d1f6fb45)
-
17:08:53 Result is kept for 500 seconds
-
17:08:53
-
17:08:53 *** Listening on default...
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (96);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (3642c31a-c2af-4cdc-99b8-773c00d1dbd5)
-
17:08:53 default: Job OK (3642c31a-c2af-4cdc-99b8-773c00d1dbd5)
-
17:08:53 Result is kept for 500 seconds
-
17:08:53
-
17:08:53 *** Listening on default...
-
17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (97);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (99c4fdf3-1c41-494b-bb70-b287f59cf452)
-
17:08:54 default: Job OK (99c4fdf3-1c41-494b-bb70-b287f59cf452)
-
17:08:54 Result is kept for 500 seconds
-
17:08:54
-
17:08:54 *** Listening on default...
-
17:08:54 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (98);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (842ea53a-feba-4f54-af77-ef191a28014b)
-
17:08:54 default: Job OK (842ea53a-feba-4f54-af77-ef191a28014b)
-
17:08:54 Result is kept for 500 seconds
-
17:08:54
- 17:08:54 *** Listening on default...
参考:
https://segmentfault.com/a/1190000010654775
标题名称:【Python】rq队列的使用
链接分享:http://scyanting.com/article/jshpjp.html