15.5. Management Commands¶
https://docs.djangoproject.com/en/stable/howto/custom-management-commands/
Custom management commands are stored in the shop/management/commands/COMMAND_NAME.py file.
The file must contain a class named Command that extends the BaseCommand class.
Custom management commands are especially useful for standalone scripts and for scripts that run periodically from the UNIX crontab or from the Windows Task Scheduler.
15.5.1. Location¶
Custom management commands are stored in the shop/management/commands/COMMAND_NAME.py file:
shop
├── __init__.py
├── admin.py
├── apps.py
├── management
│   └── commands
│       └── my_command.py
├── models.py
├── tests.py
└── views.py
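The command name comes from the module's file name, so my_command.py becomes the my_command command. The sketch below imitates that lookup with the standard library only, in a temporary directory. The helper find_command_names and the file _helpers.py are hypothetical names used for illustration, and the rule that modules starting with an underscore are skipped is a simplified reading of how Django discovers commands, not a copy of its code.

```python
import pkgutil
import tempfile
from pathlib import Path


def find_command_names(app_dir):
    """Return one command name per module in management/commands."""
    command_dir = Path(app_dir) / 'management' / 'commands'
    return sorted(
        name
        for _, name, is_pkg in pkgutil.iter_modules([str(command_dir)])
        # packages and underscore-prefixed modules are not treated as commands
        if not is_pkg and not name.startswith('_')
    )


with tempfile.TemporaryDirectory() as tmp:
    commands = Path(tmp) / 'shop' / 'management' / 'commands'
    commands.mkdir(parents=True)
    (commands / 'my_command.py').write_text('# command module\n')
    (commands / '_helpers.py').write_text('# private helper, not a command\n')
    found = find_command_names(Path(tmp) / 'shop')

print(found)
```

Only my_command is listed, which matches the name you would pass to manage.py.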
15.5.2. Example¶
The file must contain a class named Command that extends the BaseCommand class:
>>>
... from django.core.management.base import BaseCommand
...
...
... class Command(BaseCommand):
...     help = 'Help message for the command'
...
...     def handle(self, *args, **options):
...         # do something
...         pass
15.5.3. Arguments¶
Command-line arguments are declared in add_arguments() and arrive in handle() as keys of the options dictionary:
>>>
... from django.core.management.base import BaseCommand
...
...
... class Command(BaseCommand):
...     help = 'Help message for the command'
...
...     def add_arguments(self, parser):
...         parser.add_argument('--action', dest='action', help='Action to do')
...         parser.add_argument('--file', dest='file', help='Filename to parse')
...
...     def handle(self, *args, **options):
...         action = options['action']
...         file = options['file']
...         if action == 'parse':
...             # do something with file
...             pass
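The parser passed to add_arguments() is based on argparse.ArgumentParser, so the mapping from flags to options keys can be tried without Django. This is a minimal sketch with a plain argparse parser. The prog value and the data.log file name are made up for illustration, and a real command also receives Django's own default options (such as verbosity) in the same dictionary.

```python
import argparse

# Stand-in for the parser Django hands to add_arguments()
parser = argparse.ArgumentParser(prog='manage.py my_command')
parser.add_argument('--action', dest='action', help='Action to do')
parser.add_argument('--file', dest='file', help='Filename to parse')

# Both "--flag value" and "--flag=value" spellings are accepted
options = vars(parser.parse_args(['--action', 'parse', '--file=data.log']))
print(options)

# Options that are not given default to None
empty = vars(parser.parse_args([]))
print(empty)
```

Because omitted options are None, a handle() method should check for that before opening a file or comparing an action.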
15.5.4. Use Case - 0x01¶
File management/commands/importcsv.py:
>>>
... import csv
... from django.core.management.base import BaseCommand
... from django.utils.translation import gettext_lazy as _
... from contact.models import Contact
...
...
... class Command(BaseCommand):
...     help = _('Import data from CSV file')
...
...     def add_arguments(self, parser):
...         parser.add_argument('--file', dest='file', help=_('File to import data from'))
...
...     def handle(self, *args, **options):
...         file = options['file']
...         # newline='' is what the csv module recommends for files it reads
...         with open(file, encoding='utf-8', newline='') as f:
...             f.readline()  # skip the header row
...             reader = csv.DictReader(f, fieldnames=['firstname', 'lastname'])
...             for line in reader:
...                 # assumes Contact has firstname and lastname fields
...                 Contact.objects.create(**line)
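The reading part of this command can be checked on its own. The sketch below feeds csv.DictReader from an in-memory file and skips the header with readline(), as the command does. The sample rows are invented, and io.StringIO stands in for the real file.

```python
import csv
import io

# In-memory stand-in for myfile.csv; the first row is a header
data = io.StringIO('First name,Last name\nAlice,Smith\nBob,Jones\n')

data.readline()  # skip the header row, the reader starts on the next line
reader = csv.DictReader(data, fieldnames=['firstname', 'lastname'])
rows = list(reader)

for row in rows:
    print(row['firstname'], row['lastname'])
```

Each row is a dictionary keyed by the given fieldnames, which is why it can be unpacked with ** into keyword arguments.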
15.5.5. Run¶
$ python manage.py importcsv --file=myfile.csv
15.5.6. Call¶
>>> from django.core import management
>>>
>>> management.call_command('importcsv', file='myfile.csv')
15.5.7. Use Case - 0x01¶
Cleaning data in the database:
from django.core.management.base import BaseCommand
from shop.models import Customer


class Command(BaseCommand):
    help = 'Clean all customer names in the database'

    def handle(self, *args, **options):
        for c in Customer.objects.all():
            c.firstname = c.firstname.title()
            c.lastname = c.lastname.title()
            c.save()
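Before running such a cleanup on real data, it may help to see what str.title() does to less regular names. The sample names below are invented. The method capitalizes the first letter after every non-letter and lowercases the rest, which suits most names but flattens mixed-case ones.

```python
names = ['alice', "o'brien", 'mary-ann', 'McDONALD', 'de la cruz']

# title() restarts capitalization after apostrophes, hyphens and spaces
cleaned = [name.title() for name in names]

for before, after in zip(names, cleaned):
    print(f'{before!r} -> {after!r}')
```

Note that 'McDONALD' becomes 'Mcdonald', so names with internal capitals may need separate handling.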
15.5.8. Use Case - 0x02¶
Parse a file line by line:
>>>
... from django.core.management.base import BaseCommand
...
...
... class Command(BaseCommand):
...     help = 'My help text'
...
...     def add_arguments(self, parser):
...         parser.add_argument('--file', dest='file', nargs='?', help='Log File')
...         parser.add_argument('--format', nargs='?', dest='format', help='Log File Format')
...
...     def parse_line(self, line, format):
...         if format == 'text':
...             return line.upper()
...         return line  # other formats are passed through unchanged
...
...     def handle(self, *args, **options):
...         filename = options['file']
...         format = options['format']
...         content = []
...         with open(filename, encoding='utf-8') as file:
...             for line in file:
...                 # drop the trailing newline so join() does not double it
...                 line = self.parse_line(line.rstrip('\n'), format)
...                 content.append(line)
...         self.stdout.write('\n'.join(content))
15.5.9. Use Case - 0x03¶
>>>
... from django.core.management.base import BaseCommand, CommandError
... from polls.models import Question as Poll
...
...
... class Command(BaseCommand):
...     help = 'Closes the specified poll for voting'
...
...     def add_arguments(self, parser):
...         parser.add_argument('poll_id', nargs='+', type=int)
...         parser.add_argument(
...             '--delete',
...             action='store_true',
...             dest='delete',
...             help='Delete poll instead of closing it')
...
...     def handle(self, *args, **options):
...         for poll_id in options['poll_id']:
...             try:
...                 poll = Poll.objects.get(pk=poll_id)
...             except Poll.DoesNotExist:
...                 raise CommandError('Poll "%s" does not exist' % poll_id)
...             # no return here, so every poll_id given is processed
...             if options['delete']:
...                 self.delete_poll(poll)
...             else:
...                 self.close_poll(poll)
...
...     def close_poll(self, poll):
...         poll.opened = False
...         poll.save()
...         self.stdout.write(self.style.SUCCESS(f'Successfully closed poll "{poll.pk}"'))
...
...     def delete_poll(self, poll):
...         poll.delete()
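The positional poll_id argument with nargs='+' and type=int always yields a list of integers, and --delete is a boolean flag. A plain argparse parser is enough to see this. The prog name closepoll is an assumption, since the file name of this command is not given above.

```python
import argparse

# Stand-in for the parser Django hands to add_arguments()
parser = argparse.ArgumentParser(prog='manage.py closepoll')
parser.add_argument('poll_id', nargs='+', type=int)
parser.add_argument('--delete', action='store_true', dest='delete',
                    help='Delete poll instead of closing it')

# Several ids at once; the flag defaults to False
several = vars(parser.parse_args(['1', '2', '3']))
print(several)

# A single id is still a list; the flag switches to True
single = vars(parser.parse_args(['7', '--delete']))
print(single)
```

This is why handle() loops over options['poll_id'] even when only one poll is given.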