
A handy tool for periodic tasks in Python: the schedule module

Editor: Python

Preface

If you want to run a Python script periodically on a Linux server, the best-known choice is probably a crontab entry. However, crontab has the following drawbacks:

1. Sub-minute tasks are inconvenient, because cron's finest granularity is one minute.

2. When there are hundreds of scheduled tasks, crontab becomes particularly hard to manage.

Another option is Celery, but Celery is troublesome to configure. If all you need is a lightweight scheduling tool, Celery is not a good choice.

If you want a lightweight task scheduler that is as simple and easy to use as possible, needs no external dependencies, and ideally covers all of crontab's basic functionality, then the schedule module is your best choice.

Scheduling a task with it may take only a few lines of code. Here is a taste:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

The code above runs the job function once every 10 minutes, which is simple and convenient. You only need to import the schedule module and register a periodic task by calling schedule.every(interval).<time unit>.do(job).

Once registered, a periodic task is only executed when run_pending() checks whether it is due, so you need a while loop that keeps polling this function.
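
To make the polling idea concrete, here is a minimal sketch of a polling scheduler written with the standard library only. It is a toy model, not the schedule module's actual implementation: the TinyScheduler class, its every() signature and the injected clock are all assumptions made for illustration.

```python
import time


class TinyScheduler:
    """Toy model of a polling scheduler (not the schedule library's code)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._jobs = []  # each entry: [interval_seconds, func, next_run]

    def every(self, interval_seconds, func):
        """Register func to run once every interval_seconds."""
        first_run = self._clock() + interval_seconds
        self._jobs.append([interval_seconds, func, first_run])

    def run_pending(self):
        """Run every job whose next_run time has been reached."""
        now = self._clock()
        for job in self._jobs:
            interval, func, next_run = job
            if now >= next_run:
                func()
                job[2] = now + interval  # plan the following run


# Demonstration with a fake clock, so no real waiting is needed.
fake_now = [0.0]
calls = []

scheduler = TinyScheduler(clock=lambda: fake_now[0])
scheduler.every(10, lambda: calls.append(fake_now[0]))

for second in range(1, 31):  # simulate polling once per second for 30 s
    fake_now[0] = float(second)
    scheduler.run_pending()

print(calls)  # the simulated times at which the job ran
```

Nothing runs unless run_pending() is called, which is why the loop is required. Because the loop polls once per second, a job can start up to about one second later than its due time.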

Below we cover the installation of the schedule module and its basic and advanced usage in detail.

1. Preparation

Open a terminal in one of the following ways and enter the command below to install the dependency:

  1. Windows: open Cmd (Start - Run - CMD).
  2. macOS: open Terminal (Command + Space, then type Terminal).
  3. If you are using the VSCode editor or PyCharm, you can use the built-in Terminal directly.
pip install schedule

2. Basic use

The most basic usage was shown at the beginning of the article. Here are more examples of scheduling tasks:

import schedule
import time

def job():
    print("I'm working...")

# Run the job every ten minutes
schedule.every(10).minutes.do(job)
# Run the job every hour
schedule.every().hour.do(job)
# Run the job every day at 10:30
schedule.every().day.at("10:30").do(job)
# Run the job every Monday
schedule.every().monday.do(job)
# Run the job every Wednesday at 13:15
schedule.every().wednesday.at("13:15").do(job)
# Run the job at the 17th second of every minute
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

As you can see, the examples above cover intervals ranging from weeks down to seconds. If you want a task to run only once, you can set it up like this:

import schedule
import time

def job_that_executes_once():
    # The work written here is executed only once...
    # Returning CancelJob removes the job from the scheduler
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

Parameter passing

If the job needs arguments, pass them to do() after the function:

import schedule

def greet(name):
    print('Hello', name)

# do() passes the extra arguments on to the job function
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
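
One consequence worth spelling out: the arguments are fixed at the moment the job is registered, not re-evaluated on every run. The effect is comparable to functools.partial from the standard library, as this small sketch shows. The greeting parameter and the loop that stands in for the scheduler are assumptions added for illustration.

```python
import functools

printed = []


def greet(name, greeting='Hello'):
    # Collect the message instead of printing, so the result can be inspected.
    printed.append(f'{greeting} {name}')


# Bind the arguments once, much as do(greet, name='Alice') would at registration.
greet_alice = functools.partial(greet, name='Alice')
greet_bob = functools.partial(greet, 'Bob', greeting='Hi')

# A scheduler would later call the bound functions without any arguments.
for bound_job in (greet_alice, greet_bob):
    bound_job()

print(printed)
```

If a job needs a fresh value on every run (the current time, for example), compute it inside the job function rather than passing it as an argument.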

Get all current jobs

To get all currently scheduled jobs:

import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

# get_jobs() returns a list of all scheduled jobs
all_jobs = schedule.get_jobs()

Cancel all jobs

If some condition is triggered and you need to remove all jobs from the current program immediately:

import schedule

def greet(name):
    print('Hello {}'.format(name))

# greet() requires a name, so pass one when registering the job
schedule.every().second.do(greet, 'Alice')

# Remove every scheduled job
schedule.clear()

Tags

To make later management easier, you can tag a job when you set it up. You can then fetch or cancel jobs by filtering on a tag.

import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag() attaches one or more tags to a job
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(tag) returns all jobs that carry this tag
friends = schedule.get_jobs('friend')

# Cancel all jobs tagged 'daily-tasks'
schedule.clear('daily-tasks')

Set job deadline

If a job should stop running after a certain time, you can do this:

import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Note: until() raises an error if the deadline is already in the past
# Run the job every hour, stop after 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# Run the job every hour, stop after 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Run the job every hour, stop 8 hours from now
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run the job every hour, stop after 11:33:42 today
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# Run the job every hour, stop after 2030-05-17 11:36:20
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)

Once the deadline has passed, the job no longer runs.
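
The deadline can be given in several forms, and each one ends up meaning an absolute point in time. The sketch below illustrates that mapping with the standard library only. resolve_deadline is a hypothetical helper written for this article, not a function of the schedule module, and it handles only the two string formats used in the examples above.

```python
from datetime import datetime, time, timedelta


def resolve_deadline(until, now):
    """Turn a deadline given in several forms into an absolute datetime.

    Hypothetical helper for illustration; not part of the schedule library.
    """
    if isinstance(until, datetime):
        return until                      # already absolute
    if isinstance(until, timedelta):
        return now + until                # relative to the current time
    if isinstance(until, time):
        return datetime.combine(now.date(), until)  # that time, today
    if isinstance(until, str):
        if '-' in until:                  # e.g. "2030-01-01 18:33"
            return datetime.strptime(until, '%Y-%m-%d %H:%M')
        clock = datetime.strptime(until, '%H:%M').time()  # e.g. "18:30"
        return datetime.combine(now.date(), clock)
    raise TypeError(f'unsupported deadline: {until!r}')


# A fixed "now" keeps the demonstration reproducible.
now = datetime(2030, 1, 1, 9, 0)

for deadline in ('18:30', '2030-01-01 18:33', timedelta(hours=8), time(11, 33, 42)):
    print(repr(deadline), '->', resolve_deadline(deadline, now))
```

A time-only deadline such as "18:30" is interpreted relative to the current day, so the same line of code can be valid in the morning and already expired in the evening.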

Run all jobs immediately, regardless of their schedule

If some condition is triggered and you need to run all jobs right away, you can call schedule.run_all():

import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

# Run all jobs now
schedule.run_all()
# Run all jobs now, waiting 10 seconds between jobs
schedule.run_all(delay_seconds=10)

3. Advanced use

Scheduling jobs with a decorator

If you find registering jobs this way too verbose, you can use a decorator instead:

from schedule import every, repeat, run_pending
import time

# This decorator is equivalent to schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

Parallel execution

By default, schedule runs all jobs sequentially. The reason is that it is hard to find a parallel execution model that makes everyone happy.

However, you can work around this limitation by running each job in its own thread:

import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())

def job2():
    print("I'm running on thread %s" % threading.current_thread())

def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    # Start a new thread for each run so a slow job cannot block the others
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)
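
Starting a new thread for every run means the number of threads is unbounded if jobs run longer than their interval. One possible alternative, sketched here with the standard library only, is a shared queue drained by a single worker thread. The names job_queue, worker and results are assumptions for illustration, and the jobs are enqueued by hand so the sketch runs without a scheduler loop.

```python
import queue
import threading

results = []
job_queue = queue.Queue()


def worker():
    """Pull jobs off the queue and run them one at a time."""
    while True:
        job_func = job_queue.get()
        if job_func is None:  # sentinel value: stop the worker
            job_queue.task_done()
            break
        try:
            job_func()
        finally:
            job_queue.task_done()


def job1():
    results.append('job1')


def job2():
    results.append('job2')


worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

# With schedule, the registration would look like:
#     schedule.every(10).seconds.do(job_queue.put, job1)
# Here the jobs are enqueued directly to keep the sketch self-contained.
job_queue.put(job1)
job_queue.put(job2)
job_queue.put(None)

job_queue.join()      # wait until every queued item has been processed
worker_thread.join()
print(results)
```

With one worker the jobs still run one after another, but they no longer block the polling loop. Starting several worker threads on the same queue would allow a fixed amount of parallelism.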

Logging

The schedule module also supports logging through the standard logging module. Use it like this:

import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# Set the log level to DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)
schedule.run_all()
schedule.clear()

The output looks like this:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

Exception handling

schedule does not catch exceptions automatically. When a job raises an exception, it propagates out of run_pending(), which can cause a serious problem: all subsequent jobs are interrupted. So we need to catch these exceptions.

You can catch them manually inside each job, but for failures you did not anticipate it is better to have them caught automatically. A decorator can do that:

import functools
import traceback
import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # Print the traceback instead of letting the error propagate
                print(traceback.format_exc())
                if cancel_on_failure:
                    # Returning CancelJob removes the failed job
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

This way, any error bad_task raises during execution is caught by catch_exceptions, which is very important for keeping the remaining scheduled tasks running normally.
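
In a long-running service you may prefer to record failures with the logging module rather than print them. The following is a minimal sketch of that variation, not the article's original decorator. It runs without the schedule module installed, so CANCEL_JOB is a stand-in for schedule.CancelJob, and the logger name 'jobs' and the function names are assumptions.

```python
import functools
import logging

logger = logging.getLogger('jobs')  # hypothetical logger name

# Stand-in for schedule.CancelJob so the sketch has no third-party dependency.
CANCEL_JOB = object()


def log_exceptions(cancel_on_failure=False):
    """Decorator factory: log any exception raised by the job instead of raising."""
    def decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # logger.exception records the message together with the traceback
                logger.exception('Job %s failed', job_func.__name__)
                if cancel_on_failure:
                    return CANCEL_JOB
                return None
        return wrapper
    return decorator


@log_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0


@log_exceptions()
def good_task():
    return 'ok'


outcome_bad = bad_task()    # the ZeroDivisionError is logged, not raised
outcome_good = good_task()
print(outcome_bad is CANCEL_JOB, outcome_good)
```

Calling the decorated function directly, as done here, is also a convenient way to check the error handling before handing the job to the scheduler.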

