# Source code for openquake.commands.purge
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import getpass
from openquake.baselib.general import humansize
from openquake.commonlib import logs, datastore
# Directory in which the calc_<id>.hdf5 / calc_<id>_tmp.hdf5 files handled by
# this module are looked up; resolved once, when the module is imported.
datadir = datastore.get_datadir()
def purge_one(calc_id, user, force):
    """
    Delete a single calculation: issue the `del_calc` database command and
    then remove the calculation's HDF5 files from the data directory.

    :param calc_id: ID of the calculation to remove
    :param user: user name forwarded to the `del_calc` command
    :param force: flag forwarded to the `del_calc` command
    """
    # NOTE(review): the literal False is the fourth positional argument of
    # del_calc; presumably it tells the DB side not to delete the file
    # itself, since that is done right below -- confirm against the DB API.
    logs.dbcmd('del_calc', calc_id, user, False, force)
    candidates = (
        os.path.join(datadir, 'calc_%s.hdf5' % calc_id),
        os.path.join(datadir, 'calc_%s_tmp.hdf5' % calc_id),
    )
    for path in candidates:
        if not os.path.exists(path):
            continue  # nothing on disk (any more) for this name
        os.remove(path)
        print('Removed %s' % path)
# used in the reset command
def purge_all(user=None):
    """
    Remove all calculations found in the data directory on behalf of the
    given user, together with any `.pik` file stored there.

    :param user: user name passed to purge_one; when falsy, the name
                 returned by getpass.getuser() is used instead
    """
    user = user or getpass.getuser()
    if not os.path.exists(datadir):
        return
    # A calculation may own both calc_<id>.hdf5 and calc_<id>_tmp.hdf5;
    # purge_one removes the two files in one go, so each ID must be
    # processed only once (otherwise the DB command is issued twice).
    purged = set()
    for fname in os.listdir(datadir):
        if fname.endswith('.pik'):
            os.remove(os.path.join(datadir, fname))
        mo = re.match(r'calc_(\d+)(_tmp)?\.hdf5', fname)
        if mo is None:
            continue
        calc_id = int(mo.group(1))
        if calc_id in purged:
            continue
        purged.add(calc_id)
        purge_one(calc_id, user, force=True)
def purge(status, days, force):
    """
    Remove calculations of the given status older than days

    :param status: list of job statuses, passed to logs.dbcmd together with
                   the query (it fills the `?X` placeholder)
    :param days: age threshold written as an unsigned SQLite datetime
                 modifier, e.g. '30 days'
    :param force: if true the files are deleted, otherwise they are only
                  listed (dry run)
    """
    # NB: `days` is interpolated directly into the SQL text, so it must
    # come from trusted code (the callers in this module pass constants).
    rows = logs.dbcmd(
        'SELECT id, ds_calc_dir || ".hdf5" FROM job '
        'WHERE status IN (?X)'
        f"AND start_time < datetime('now', '-{days}')", status)
    todelete = []
    totsize = 0
    for _, fname in rows:
        if os.path.exists(fname) and os.access(fname, os.W_OK):
            todelete.append(fname)
            totsize += os.path.getsize(fname)
            # Derive the companion file name by touching only the final
            # extension: a blanket str.replace would also rewrite any
            # '.hdf5' occurring in a directory component of the path.
            root, ext = os.path.splitext(fname)
            tname = root + '_tmp' + ext
            if os.path.exists(tname) and os.access(tname, os.W_OK):
                todelete.append(tname)
                totsize += os.path.getsize(tname)
    size = humansize(totsize)
    for fname in todelete:
        print(fname)
        if force:
            os.remove(fname)
    # The summary is printed in dry-run mode too
    print('Processed %d HDF5 files, %s' % (len(todelete), size))
def main(what, force=False):
    """
    Remove calculations from the file system. `what` is either a
    calculation ID or one of the strings "failed" and "old".
    If you want to remove everything, use oq reset.
    """
    # keyword -> (statuses, minimum age) handed over to purge()
    presets = {
        'failed': (['failed'], '1 days'),
        'old': (['complete', 'failed', 'deleted'], '30 days'),
    }
    if what in presets:
        statuses, age = presets[what]
        purge(statuses, age, force)
        return

    requested = int(what)
    if requested >= 0:
        calc_id = requested
    else:
        # a negative value is used as an index into the calculation IDs
        # returned by logs.get_calc_ids for the data directory
        try:
            calc_id = logs.get_calc_ids(datadir)[requested]
        except IndexError:
            print('Calculation %d not found' % requested)
            return
    purge_one(calc_id, getpass.getuser(), force)
# Per-parameter descriptions attached to main as function attributes
# (presumably picked up as CLI help text by the command launcher -- confirm).
# main() accepts "old" as well as "failed", so both keywords are listed.
main.what = 'a calculation ID or one of the strings "failed" or "old"'
main.force = 'ignore dependent calculations'