Linux / python · January 16, 2023

Django: How to run Django/Python on a recurring basis with cron

The two most common ways people go about this are writing custom Python scripts or writing one management command per cron job. django-cron instead lets you define Django/Python code that runs on a recurring basis and trigger all of it through a single management command.

Install django-cron (ideally in your virtualenv!), for example with pip install django-cron.

Add django_cron to INSTALLED_APPS in your Django settings:

INSTALLED_APPS = [
    # ...
    "django_cron",
]

Run python manage.py migrate django_cron to create the database tables for the app.

Write a cron class somewhere in your code that extends the CronJobBase class:

from django_cron import CronJobBase, Schedule

class MyCronJob(CronJobBase):
    RUN_EVERY_MINS = 120 # every 2 hours

    schedule = Schedule(run_every_mins=RUN_EVERY_MINS)
    code = 'my_app.my_cron_job'    # a unique code

    def do(self):
        pass    # do your thing here
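
Because do() is an ordinary method, a job's logic can be exercised without cron or the scheduler. The following is only a minimal sketch: CronJobBase is replaced by an empty stand-in class so the snippet runs without Django installed, and HeartbeatJob, its code value, log_path and run_heartbeat_once are hypothetical names used for illustration.

```python
import datetime
import tempfile
from pathlib import Path


class CronJobBase:
    """Empty stand-in for django_cron.CronJobBase (assumption: lets this run without Django)."""


class HeartbeatJob(CronJobBase):
    """Hypothetical job that appends one timestamped line per run."""

    code = "my_app.heartbeat_job"  # hypothetical unique code
    log_path = Path("heartbeat.log")  # hypothetical output file

    def do(self):
        line = f"{self.code} ran at {datetime.datetime.now().isoformat()}\n"
        with open(self.log_path, "a") as handle:
            handle.write(line)


def run_heartbeat_once(directory):
    """Point the job at a file inside `directory`, run it once, return the lines written."""
    job = HeartbeatJob()
    job.log_path = Path(directory) / "heartbeat.log"
    job.do()
    return job.log_path.read_text().splitlines()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_heartbeat_once(tmp))
```

In a real project the class would inherit from django_cron.CronJobBase and define a schedule, as in the example above; only the body of do() is being illustrated here.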

Add a variable called CRON_CLASSES to your settings (similar to MIDDLEWARE_CLASSES etc.): a list of strings, each being the dotted path to a cron class.

CRON_CLASSES = [
    "my_app.cron.MyCronJob",
    # ...
]
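
Each entry is a dotted import path, so the module it names (here my_app/cron.py) has to be importable from the project. As a rough sketch of how such a string can be turned into a class: this shows the general Python mechanism, not django-cron's actual code, and load_class is a hypothetical helper.

```python
import importlib


def load_class(dotted_path):
    """Split 'package.module.ClassName' and import the named attribute."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not a dotted path")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


if __name__ == "__main__":
    # A standard-library class is used so the example runs anywhere.
    print(load_class("collections.OrderedDict"))
```

A typo in the path surfaces as an ImportError or AttributeError when the string is resolved, so it is worth checking new entries by running the management command once by hand.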

Now every time you run the management command python manage.py runcrons, all the crons will run if required. Hence, run this command every 5 minutes via crontab:

$ crontab -e
# cron runs jobs with /bin/sh by default, where `source` may be unavailable
SHELL=/bin/bash
*/5 * * * * source /home/ubuntu/.bashrc && source /home/ubuntu/work/your-project/bin/activate && python /home/ubuntu/work/your-project/src/manage.py runcrons >> /home/ubuntu/cronjob.log 2>&1
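
The crontab entry only decides how often runcrons is invoked; each job's own schedule decides whether it actually runs on a given invocation. The sketch below is a simplified model of that interplay, not django-cron's real scheduling code (which tracks runs in the database). The function names is_due and simulate are hypothetical.

```python
import datetime


def is_due(last_run, now, run_every_mins):
    """Return True if the job never ran or the interval has elapsed since last_run."""
    if last_run is None:
        return True
    return now - last_run >= datetime.timedelta(minutes=run_every_mins)


def simulate(start, hours, tick_mins, run_every_mins):
    """Replay cron ticks and collect the times at which the job would actually run."""
    runs = []
    last_run = None
    now = start
    end = start + datetime.timedelta(hours=hours)
    while now < end:
        if is_due(last_run, now, run_every_mins):
            runs.append(now)
            last_run = now
        now += datetime.timedelta(minutes=tick_mins)
    return runs


if __name__ == "__main__":
    start = datetime.datetime(2023, 1, 16, 0, 0)
    for moment in simulate(start, hours=24, tick_mins=5, run_every_mins=120):
        print(moment)
```

In this model, a 5-minute tick with RUN_EVERY_MINS = 120 yields 12 runs over 24 hours. An interval that is not a multiple of the tick (say 7 minutes) is effectively rounded up to the next tick, so the crontab frequency sets the finest granularity a job can have.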

Run specific crons with python manage.py runcrons cron_class …, for example:

# only run "my_app.cron.MyCronJob"
$ python manage.py runcrons "my_app.cron.MyCronJob"

# run "my_app.cron.MyCronJob" and "my_app.cron.AnotherCronJob"
$ python manage.py runcrons "my_app.cron.MyCronJob" "my_app.cron.AnotherCronJob"

Force-run your crons with python manage.py runcrons --force, for example:

# run all crons, immediately, regardless of run time
$ python manage.py runcrons --force

example_crons.py

import datetime

from django_cron import CronJobBase, Schedule

class WriteDateToFileMixin:
    """
    Write current date to file.
    """

    file_path = "cron-demo.txt"

    def do(self):
        message = f"Code: {self.code}    Current date: {datetime.datetime.now()}\n"
        with open(self.file_path, "a") as myfile:
            myfile.write(message)

class RunAtTimeCronJob(CronJobBase, WriteDateToFileMixin):
    """
    Run job every day at 11:00 and 12:30
    """

    RUN_AT_TIMES = ["11:00", "12:30"]
    schedule = Schedule(run_at_times=RUN_AT_TIMES, retry_after_failure_mins=1)
    code = "cron.RunAtTimeCronJob"

class RunEveryTenMinutesCronJob(CronJobBase, WriteDateToFileMixin):
    """
    Run the job every 10 minutes
    """

    RUN_EVERY_MINS = 10
    schedule = Schedule(run_every_mins=RUN_EVERY_MINS)
    code = "cron.RunEveryTenMinutesCronJob"

class RunMonthlyCronJob(CronJobBase, WriteDateToFileMixin):
    """
    Run the job every 15 minutes on the 1st and 10th day of the month.
    """

    RUN_MONTHLY_ON_DAYS = [1, 10]
    RUN_EVERY_MINS = 15
    schedule = Schedule(
        run_monthly_on_days=RUN_MONTHLY_ON_DAYS, run_every_mins=RUN_EVERY_MINS
    )
    code = "cron.RunMonthlyCronJob"

class RunWeeklyCronJob(CronJobBase, WriteDateToFileMixin):
    """
    Run the job at 12:00 and 12:30 on Mondays.
    """

    RUN_WEEKLY_ON_DAYS = [0]
    RUN_AT_TIMES = ["12:00", "12:30"]
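    # Assumption: the keyword is run_weekly_on_days (the counterpart of
    # run_monthly_on_days); check it against your installed django-cron version.
    schedule = Schedule(run_weekly_on_days=RUN_WEEKLY_ON_DAYS, run_at_times=RUN_AT_TIMES)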
    code = "cron.RunWeeklyCronJob"
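
The weekly example uses [0] for Mondays, which matches Python's date.weekday() numbering (Monday = 0, Sunday = 6). Assuming django-cron follows that convention, the small sketch below shows how a list of day indexes maps to dates. DAY_NAMES, weekday_names and runs_on are hypothetical helpers, not part of the library.

```python
import datetime

# Index order matches datetime.date.weekday(): Monday is 0, Sunday is 6.
DAY_NAMES = (
    "Monday", "Tuesday", "Wednesday", "Thursday",
    "Friday", "Saturday", "Sunday",
)


def weekday_names(day_indexes):
    """Translate weekday indexes such as [0, 6] into readable names."""
    return [DAY_NAMES[index] for index in day_indexes]


def runs_on(date, weekly_days):
    """Return True if `date` falls on one of the configured weekday indexes."""
    return date.weekday() in weekly_days


if __name__ == "__main__":
    run_weekly_on_days = [0]
    print(weekday_names(run_weekly_on_days))
    print(runs_on(datetime.date(2023, 1, 16), run_weekly_on_days))
```

Printing the names for a configured list is a quick way to catch an off-by-one, for example when coming from crontab syntax, where 0 means Sunday.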

https://github.com/tivix/django-cron
https://django-cron.readthedocs.io/en/latest/installation.html
