Source code for openquake.commands.purge
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import getpass
from openquake.baselib.general import humansize
from openquake.commonlib import logs, datastore
datadir = datastore.get_datadir()
[docs]def purge_one(calc_id, user, force):
"""
Remove one calculation ID from the database and remove its datastore
"""
logs.dbcmd('del_calc', calc_id, user, force)
f1 = os.path.join(datadir, 'calc_%s.hdf5' % calc_id)
f2 = os.path.join(datadir, 'calc_%s_tmp.hdf5' % calc_id)
for f in [f1, f2]:
if os.path.exists(f): # not removed yet
os.remove(f)
print('Removed %s' % f)
# used in the reset command
[docs]def purge_all(user=None):
"""
Remove all calculations of the given user
"""
user = user or getpass.getuser()
if os.path.exists(datadir):
for fname in os.listdir(datadir):
if fname.endswith('.pik'):
os.remove(os.path.join(datadir, fname))
mo = re.match(r'calc_(\d+)(_tmp)?\.hdf5', fname)
if mo is not None:
calc_id = int(mo.group(1))
purge_one(calc_id, user, force=True)
[docs]def purge(status, days, force):
"""
Remove calculations of the given status older than days
"""
rows = logs.dbcmd(
f'SELECT id, ds_calc_dir || ".hdf5" FROM job '
f'WHERE status IN (?X)'
f"AND start_time < datetime('now', '-{days}')", status)
todelete = []
totsize = 0
for calc_id, fname in rows:
if os.path.exists(fname) and os.access(fname, os.W_OK):
todelete.append(fname)
totsize += os.path.getsize(fname)
tname = fname.replace('.hdf5', '_tmp.hdf5')
if os.path.exists(tname) and os.access(tname, os.W_OK):
todelete.append(tname)
totsize += os.path.getsize(tname)
size = humansize(totsize)
for fname in todelete:
print(fname)
if force:
os.remove(fname)
print('Processed %d HDF5 files, %s' % (len(todelete), size))
[docs]def main(what, force=False):
"""
Remove calculations from the file system.
If you want to remove everything, use oq reset.
"""
if what == 'failed':
purge(['failed'], '1 days', force)
return
elif what == 'old':
purge('complete failed'.split(), '30 days', force)
return
calc_id = int(what)
if calc_id < 0:
try:
calc_id = datastore.get_calc_ids(datadir)[calc_id]
except IndexError:
print('Calculation %d not found' % calc_id)
return
purge_one(calc_id, getpass.getuser(), force)
main.what = 'a calculation ID or the string "failed"'
main.force = 'ignore dependent calculations'