If you want to run a Python script periodically on a Linux server, the best-known choice is a crontab entry. However, crontab has the following disadvantages:
1. Tasks with second-level granularity are inconvenient to set up.
2. When there are hundreds of scheduled tasks, crontab becomes particularly inconvenient to manage.
Another option is Celery, but Celery is troublesome to configure. If you only need a lightweight scheduling tool, Celery is not a good choice.
If you want a lightweight task scheduler that is as simple and easy to use as possible, needs no external dependencies, and ideally covers all the basic functionality of crontab, then the schedule module is your best choice.
Scheduling a task with it may take only a few lines of code:
# Python Practical treasure
import schedule
import time
def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
The code above runs the job function once every 10 minutes, which is simple and convenient. You only need to import the schedule module and register a periodic task by calling schedule.every(interval).<time unit>.do(job).
A registered task does not run by itself: the run_pending function checks whether any task is due and executes it, so you need a while loop that keeps polling this function.
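The polling idea described above can be illustrated with a small standalone sketch. It is not how the schedule library is implemented; the class TinyScheduler and its methods are hypothetical names made up for this example, and a simulated clock replaces time.sleep so the result is deterministic.

```python
import time


class TinyScheduler:
    """Illustrative polling scheduler (hypothetical, not the schedule library)."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock  # injectable clock so the sketch can run without waiting
        self.jobs = []      # each entry: [interval_seconds, next_run, func]

    def every(self, seconds, func):
        # Register func to run once every `seconds` seconds
        self.jobs.append([seconds, self.clock() + seconds, func])

    def run_pending(self):
        # Run every job whose next run time has been reached, then reschedule it
        now = self.clock()
        for job in self.jobs:
            interval, next_run, func = job
            if now >= next_run:
                func()
                job[1] = now + interval


# Simulate 30 seconds of polling once per "second" with a fake clock
fake_now = [0.0]
calls = []
scheduler = TinyScheduler(clock=lambda: fake_now[0])
scheduler.every(10, lambda: calls.append(fake_now[0]))

for second in range(1, 31):
    fake_now[0] = float(second)
    scheduler.run_pending()

print(calls)  # [10.0, 20.0, 30.0]
```

Nothing happens between polls: a job only fires when run_pending is called at or after its due time, which is why the real loop calls schedule.run_pending() about once per second.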
The rest of this article covers installing the schedule module and its more advanced usage.
Run the following command to install the dependency:
pip install schedule
The most basic usage was shown at the beginning of this article. Here are more examples of scheduling tasks:
import schedule
import time

def job():
    print("I'm working...")

# Run the job every ten minutes
schedule.every(10).minutes.do(job)
# Run the job every hour
schedule.every().hour.do(job)
# Run the job every day at 10:30
schedule.every().day.at("10:30").do(job)
# Run the job every Monday
schedule.every().monday.do(job)
# Run the job every Wednesday at 13:15
schedule.every().wednesday.at("13:15").do(job)
# Run the job at the 17th second of every minute
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
As you can see, the examples above cover schedules ranging from specific weekdays down to seconds. If you want a job to run only once, you can set it up like this:
import schedule
import time

def job_that_executes_once():
    # The work written here will be executed only once ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
If the job needs arguments, pass them to do() like this:
import schedule

def greet(name):
    print('Hello', name)

# do() passes the extra arguments on to the job function
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
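Passing arguments through do() behaves much like binding them ahead of time with functools.partial from the standard library (to my knowledge schedule uses it internally, but treat that as an assumption). The sketch below shows the binding on its own, without the scheduler; the greeting parameter is added purely for illustration.

```python
import functools


def greet(name, greeting="Hello"):
    return f"{greeting} {name}"


# Bind the arguments now; the resulting callables take no further arguments
greet_alice = functools.partial(greet, name="Alice")
greet_bob = functools.partial(greet, "Bob", greeting="Hi")

print(greet_alice())  # Hello Alice
print(greet_bob())    # Hi Bob
```

The arguments are evaluated once, at registration time. If a job needs a fresh value on every run (the current time, for example), compute it inside the job function rather than passing it in.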
To get all currently scheduled jobs:
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
If some condition is triggered and you need to clear all jobs in the current program immediately:
import schedule

def greet(name):
    print('Hello {}'.format(name))

# greet requires a name, so pass one when registering the job
schedule.every().second.do(greet, name='Alice')

schedule.clear()
When setting up jobs, you can tag them to make later management easier. You can then retrieve or cancel jobs by filtering on a tag.
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag() attaches one or more tags to a job
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(tag) returns all jobs that carry this tag
friends = schedule.get_jobs('friend')

# Cancel all jobs tagged 'daily-tasks'
schedule.clear('daily-tasks')
If a job should stop running after a certain deadline, you can do this:
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Note: each deadline must still be in the future when until() is called;
# a deadline in the past raises schedule.ScheduleValueError.

# Run the job every hour until 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# Run the job every hour until 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Run the job every hour for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run the job every hour until 11:33:42 today
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# Run the job every hour until 2030-05-17 11:36:20
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)
After the deadline, the job will no longer run.
Run all jobs now, regardless of their schedule
If some condition is triggered and you need to run all jobs immediately, call schedule.run_all():
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

# Run all jobs now, regardless of their schedule
schedule.run_all()

# Run all jobs now, with a 10-second delay between jobs
schedule.run_all(delay_seconds=10)
Scheduling jobs with a decorator
If you find this way of setting up jobs too verbose, you can use a decorator instead:
from schedule import every, repeat, run_pending
import time

# This decorator is equivalent to schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)
By default, schedule executes all jobs sequentially. The reasoning behind this is that it is hard to find a parallel execution model that makes everyone happy.
However, you can work around this limitation by running each job in its own thread:
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())

def job2():
    print("I'm running on thread %s" % threading.current_thread())

def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    # Start a new thread for each run so a slow job does not block the others
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)
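The approach above starts a new thread for every run. If you would rather keep the number of threads fixed, one possible alternative is a shared queue drained by a single worker thread. The sketch below shows only the queue and worker part using the standard library; job_queue, worker, and the None stop sentinel are names chosen for this illustration, not part of schedule.

```python
import queue
import threading

job_queue = queue.Queue()
results = []


def worker():
    # Pull job functions off the queue and run them one at a time
    while True:
        job_func = job_queue.get()
        if job_func is None:  # sentinel value: stop the worker
            break
        results.append(job_func())
        job_queue.task_done()


def job():
    return threading.current_thread().name


worker_thread = threading.Thread(target=worker, name="job-worker", daemon=True)
worker_thread.start()

# Enqueue three runs of the job; all of them execute on the same worker thread
for _ in range(3):
    job_queue.put(job)

job_queue.put(None)
worker_thread.join()
print(results)  # ['job-worker', 'job-worker', 'job-worker']
```

With schedule, the enqueue step would presumably be registered as schedule.every(10).seconds.do(job_queue.put, job), so that the scheduler thread only enqueues work and the worker thread executes it.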
The schedule module also supports logging through Python's logging module. Use it like this:
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# Set the log level to DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)
schedule.run_all()
schedule.clear()
The output looks like this:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs
schedule does not catch exceptions automatically. When a job raises, the exception propagates directly, which leads to a serious problem: all subsequent jobs are interrupted. So these exceptions need to be caught.
You can catch them manually, but for situations you did not anticipate, the program should catch them automatically. You can do this with a decorator:
import functools
import traceback
import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # Print the traceback instead of letting the error stop the loop
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)
This way, any error that bad_task encounters during execution is caught by catch_exceptions, which is very important for keeping the scheduled tasks running normally.
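In a long-running service you may prefer to send the traceback to the logging module rather than print it. The following is a minimal variant of the same decorator idea under that assumption; log_exceptions, the on_failure parameter, and the "jobs" logger name are hypothetical and chosen for this sketch only.

```python
import functools
import logging

logger = logging.getLogger("jobs")  # hypothetical logger name


def log_exceptions(on_failure=None):
    """Log any exception raised by the job and return on_failure instead."""
    def decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # logger.exception records the message together with the traceback
                logger.exception("Job %s failed", job_func.__name__)
                return on_failure
        return wrapper
    return decorator


@log_exceptions(on_failure="cancelled")
def bad_task():
    return 1 / 0


@log_exceptions()
def good_task():
    return "ok"


print(bad_task())   # cancelled
print(good_task())  # ok
```

When used with schedule, passing on_failure=schedule.CancelJob would mirror the cancel_on_failure=True behavior of the decorator above.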