Source code for openquake.commands.purge

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2026 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import shutil
import getpass
from openquake.baselib import config
from openquake.baselib.general import humansize
from openquake.commonlib import logs

datadir = logs.get_datadir()


def purge_one(calc_id, user, force):
    """
    Delete a single calculation: issue the `del_calc` database command
    for `calc_id` and erase the datastore files associated with it.

    :param calc_id: ID of the calculation to remove
    :param user: user name forwarded to the `del_calc` command
    :param force: flag forwarded to the `del_calc` command
    """
    logs.dbcmd('del_calc', calc_id, user, False, force)
    # both the main datastore and its temporary companion are candidates
    for template in ('calc_%s.hdf5', 'calc_%s_tmp.hdf5'):
        path = os.path.join(datadir, template % calc_id)
        if not os.path.exists(path):  # already gone, nothing to do
            continue
        os.remove(path)
        print('Removed %s' % path)
# used in the reset command
def purge_all(user=None):
    """
    Wipe every calculation found in the data directory on behalf of
    `user` (the current system user when not given), then clean up the
    subdirectories of the custom temporary directory, if configured.
    """
    if not user:
        user = getpass.getuser()
    calc_regex = r'calc_(\d+)(_tmp)?\.hdf5'
    if os.path.exists(datadir):
        for name in os.listdir(datadir):
            if name.endswith('.pik'):
                os.remove(os.path.join(datadir, name))
            match = re.match(calc_regex, name)
            if match is None:
                continue
            purge_one(int(match.group(1)), user, force=True)
    tmpdir = config.directory.custom_tmp
    if not tmpdir or not os.path.exists(tmpdir):
        return
    for entry in os.listdir(tmpdir):
        subdir = os.path.join(tmpdir, entry)
        if not os.path.isdir(subdir):
            continue
        try:
            shutil.rmtree(subdir)
        except PermissionError:
            # best effort: directories we are not allowed to remove
            # are left in place without complaining
            continue
        print(f'Removed {subdir}')
def purge_orphan(force):
    """
    Look for .hdf5 files in the data directory which have no matching
    row in the job table; remove them if `force` is set, otherwise
    only report how many there are and their total size.
    """
    # non-relevant workflows are dropped first, so that their
    # calculations show up as orphans in the comparison below
    logs.dbcmd("DELETE FROM job WHERE calculation_mode='workflow'"
               " AND not relevant")
    logs.dbcmd("VACUUM")

    known = set()
    for row in logs.dbcmd('SELECT ds_calc_dir || ".hdf5" FROM job'):
        known.add(row[0])

    on_disk = set()
    for name in os.listdir(datadir):
        if name.endswith('.hdf5'):
            on_disk.add(os.path.join(datadir, name))

    count = 0
    nbytes = 0
    for path in on_disk.difference(known):
        if not os.access(path, os.W_OK):  # skip files we cannot touch
            continue
        nbytes += os.path.getsize(path)
        if force:
            os.remove(path)
        count += 1

    summary = f'{count:_d} orphan files [{humansize(nbytes)}]'
    if force:
        print('Removed ' + summary)
    else:
        print('Found ' + summary)
        print('Use --force to really delete the files')
# NB: by looking at the database start_time we are not removing calculations
# used in the cache, however orphan files may linger in the filesystem
def purge_db(status, days, force):
    """
    Remove calculations of the given `status` older than `days`,
    unless they belong to a workflow.

    :param status:
        a sequence of status strings used to filter the jobs (a single
        string is accepted too); if empty, no filter on the status
    :param days:
        a number, interpolated in the SQLite modifier '-<days> days'
    :param force:
        if true, delete the files and the job records; otherwise only
        print what would be deleted
    """
    query = ('SELECT id, ds_calc_dir || ".hdf5" FROM job '
             'WHERE workflow_id is NULL '
             f"AND start_time < datetime('now', '-{days} days')")
    if status:
        if isinstance(status, str):
            status = (status,)
        # build the IN list explicitly: interpolating the tuple repr
        # breaks for one-element tuples, which render as ('x',)
        quoted = ', '.join(
            "'%s'" % str(st).replace("'", "''") for st in status)
        query += f' AND status IN ({quoted})'

    # path -> size in bytes; keyed by path so that datastores shared by
    # several jobs (due to the cache) are counted and removed only once
    sizes = {}
    calc_ids = []
    for calc_id, fname in logs.dbcmd(query):
        if os.path.exists(fname) and os.access(fname, os.W_OK):
            calc_ids.append(calc_id)
            sizes[fname] = os.path.getsize(fname)
        # replace only the trailing extension, not any '.hdf5' that may
        # appear elsewhere in the path
        tname = os.path.splitext(fname)[0] + '_tmp.hdf5'
        if os.path.exists(tname) and os.access(tname, os.W_OK):
            sizes[tname] = os.path.getsize(tname)

    for fname in sizes:
        print(fname)
        if force:
            os.remove(fname)

    size = humansize(sum(sizes.values()))
    if force:
        print('Removed %d HDF5 files, %s' % (len(sizes), size))
        if calc_ids:  # with no ids there is no job record to delete
            logs.dbcmd('DELETE FROM job WHERE id in (?X)', calc_ids)
    elif sizes:
        print('Found %d HDF5 files, %s' % (len(sizes), size))
        print('Use --force to really delete the calculations and jobs')
def main(what: str, force: bool = False, *, days: int = 30):
    """
    Remove calculations from the database and the file system.
    If you want to remove everything, use oq reset.
    """
    # `what` is either one of the keywords below or a calculation ID;
    # a negative ID is used as an index into the list returned by
    # logs.get_calc_ids(datadir)
    if what == 'failed':
        # purge_db builds the SQLite modifier '-<days> days' itself, so
        # a bare number must be passed: the string '0 days' would give
        # the invalid modifier '-0 days days', for which datetime()
        # returns NULL and the query matches no job
        purge_db(('failed', 'aborted'), 0, force)
        return
    elif what == 'orphan':
        purge_orphan(force)
        return
    elif what == 'old':
        purge_db((), days, force)
        return
    calc_id = int(what)
    if calc_id < 0:
        try:
            calc_id = logs.get_calc_ids(datadir)[calc_id]
        except IndexError:
            print('Calculation %d not found' % calc_id)
            return
    purge_one(calc_id, getpass.getuser(), force)
# descriptions of the arguments of `main`, stored as function attributes
main.what = 'a calculation ID or the string "failed", "old" or "orphan"'
main.days = 'purge calculations older than days, if given'
main.force = 'really remove, otherwise just print a message'