Merge pull request #142 from zodb/cache-file-temp-better
Extra safety writing temporary cache files.
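In essence, the commit adopts the write-to-a-temporary-file-then-rename idiom: the temporary file is created with a unique name plus a '.T' suffix in the destination directory itself, and committing it is just stripping the suffix with os.rename(). A minimal standalone sketch of that idiom (the helper name and arguments here are illustrative, not RelStorage's actual code):

    import os
    import tempfile

    def write_file_safely(parent_dir, name_prefix, data):
        # Create the temp file in the destination directory so the final
        # os.rename() never crosses filesystems (and is atomic on POSIX).
        fd, tmp_path = tempfile.mkstemp(prefix=name_prefix, suffix='.T', dir=parent_dir)
        try:
            with os.fdopen(fd, 'wb') as f:  # closing f also closes fd
                f.write(data)
        except:
            os.unlink(tmp_path)  # never leave a stale temporary behind
            raise
        # The permanent name is the unique temp name minus '.T', so the
        # rename target cannot already exist -- which matters on Windows,
        # where os.rename() refuses to replace an existing file.
        final_path = tmp_path[:-2]
        os.rename(tmp_path, final_path)
        return final_path

Because mkstemp already guarantees a unique name, stripping the suffix yields a rename target that cannot collide with another writer's file, which is why this also behaves better on Windows.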
commit 37a1ddc658
CHANGES.rst
@@ -6,7 +6,10 @@
 2.0.0b10 (unreleased)
 =====================
 
-- Nothing changed yet.
+- Writing persistent cache files has been changed to reduce the risk
+  of stale temporary files remaining. Also, files are kept open for a
+  shorter period of time and removed in a way that should work better
+  on Windows.
 
 
 2.0.0b9 (2016-11-29)
relstorage/cache/persistence.py
@@ -97,12 +97,13 @@ def _gzip_file(options, filename, fileobj, **kwargs):
 
 
 def _list_cache_files(options, prefix):
+    "Returns a list of absolute paths"
     path = _normalize_path(options)
    possible_caches = glob.glob(os.path.join(path, 'relstorage-cache-'
                                             + prefix
                                             + '.*'
                                             + _gzip_ext(options)))
-    return possible_caches
+    return [os.path.abspath(x) for x in possible_caches]
 
 
 def trace_file(options, prefix):
@@ -141,23 +142,21 @@ def trace_file(options, prefix):
 
 
 def _stat_cache_files(options, prefix):
-    fds = []
+    """
+    Return a list of cache file names,
+    sorted so that the newest and largest files are first.
+    """
     stats = []
-    try:
-        for possible_cache_path in _list_cache_files(options, prefix):
-            cache_file = _open(options, possible_cache_path, 'rb')
-            fds.append(cache_file)
-            buf_cache_file = _gzip_file(options, possible_cache_path, fileobj=cache_file, mode='rb')
-            stats.append((os.fstat(cache_file.fileno()), buf_cache_file, possible_cache_path, cache_file))
-    except: # pragma: no cover
-        for _f in fds:
-            _f.close()
-        raise
+    for possible_cache_path in _list_cache_files(options, prefix):
+        try:
+            stats.append((os.stat(possible_cache_path), possible_cache_path))
+        except os.error: # pragma: no cover
+            # file must be gone, probably we're cleaning things out
+            pass
 
-    # Newest and biggest first
-    stats.sort(key=lambda s: (s[0].st_mtime, s[0].st_size), reverse=True)
-
-    return stats
+    # Newest and biggest first; tie breaker of the filename
+    stats.sort(key=lambda s: (s[0].st_mtime, s[0].st_size, s[1]), reverse=True)
+    return [s[1] for s in stats]
 
 
 def count_cache_files(options, prefix):
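The rewritten _stat_cache_files above no longer keeps files open while sorting; it sorts plain os.stat results and adds the filename as a tie-breaker, making the order deterministic when modification times collide. A quick illustration of the sort key, using hypothetical data:

    import collections

    Stat = collections.namedtuple('Stat', 'st_mtime st_size')
    stats = [
        (Stat(100, 10), 'relstorage-cache-a.gz'),
        (Stat(100, 10), 'relstorage-cache-b.gz'),  # same mtime and size as 'a'
        (Stat(200, 5), 'relstorage-cache-c.gz'),
    ]
    # Newest and biggest first; tie breaker of the filename
    stats.sort(key=lambda s: (s[0].st_mtime, s[0].st_size, s[1]), reverse=True)
    assert [s[1] for s in stats] == [
        'relstorage-cache-c.gz',  # newest first
        'relstorage-cache-b.gz',  # mtime/size tie broken by name, descending
        'relstorage-cache-a.gz',
    ]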
@@ -170,80 +169,45 @@ def load_local_cache(options, prefix, local_client_bucket):
     stats = _stat_cache_files(options, prefix)
     if not stats:
         log.debug("No cache files found")
 
     max_load = options.cache_local_dir_read_count or len(stats)
     loaded_count = 0
-    try:
-        for _, fd, cache_path, _ in stats:
-            if loaded_count >= max_load:
-                break
-
-            try:
-                _, stored = local_client_bucket.read_from_stream(fd)
-                loaded_count += 1
-                if not stored or local_client_bucket.size >= local_client_bucket.limit:
-                    break # pragma: no cover
-            except (NameError, AttributeError): # pragma: no cover
-                # Programming errors, need to be caught in testing
-                raise
-            except: # pylint:disable=bare-except
-                log.exception("Invalid cache file %s", cache_path)
-                fd.close()
-                os.remove(cache_path)
-    finally:
-        for e in stats:
-            e[1].close()
-            e[3].close()
+    for cache_path in stats:
+        if loaded_count >= max_load:
+            break
+
+        try:
+            with _open(options, cache_path, 'rb') as raw_cache_file:
+                with _gzip_file(options, cache_path, fileobj=raw_cache_file, mode='rb') as gzf:
+                    _, stored = local_client_bucket.read_from_stream(gzf)
+            loaded_count += 1
+            if not stored or local_client_bucket.size >= local_client_bucket.limit:
+                break # pragma: no cover
+        except (NameError, AttributeError): # pragma: no cover
+            # Programming errors, need to be caught in testing
+            raise
+        except Exception: # pylint:disable=broad-except
+            log.exception("Invalid cache file %r", cache_path)
+            __quiet_remove(cache_path)
     return loaded_count
 
 
-def save_local_cache(options, prefix, persistent_cache, _pid=None):
-    # Dump the file.
-    tempdir = _normalize_path(options)
+def __write_temp_cache_file(options, prefix, parent_dir, persistent_cache):
+    prefix = 'relstorage-cache-' + prefix + '.'
+    suffix = _gzip_ext(options) + '.T'
+    fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=parent_dir)
     try:
-        # make it if needed. try to avoid a time-of-use/check
-        # race (not that it matters here)
-        os.makedirs(tempdir)
-    except os.error:
-        pass
-
-    fd, path = tempfile.mkstemp('._rscache_', dir=tempdir)
-    with _open(options, fd, 'wb') as f:
-        with _gzip_file(options, filename=path, fileobj=f, mode='wb', compresslevel=5) as fz:
-            try:
+        with _open(options, fd, 'wb') as f:
+            with _gzip_file(options, filename=path, fileobj=f, mode='wb', compresslevel=5) as fz:
                 persistent_cache.write_to_stream(fz)
-            except (NameError, AttributeError): # pragma: no cover
-                # Programming errors, need to be caught in testing
-                raise
-            except:
-                log.exception("Failed to save cache file %s", path)
-                fz.close()
-                f.close()
-                os.remove(path)
-                return
-
-    # Ok, now pick a place to put it, dropping the oldest file,
-    # if necessary.
-
-    files = _list_cache_files(options, prefix)
-    if len(files) < options.cache_local_dir_count:
-        pid = _pid or os.getpid() # allowing passing for testing
-        # Odds of same pid existing already are too low to worry about
-        new_name = 'relstorage-cache-' + prefix + '.' + str(pid) + _gzip_ext(options)
-        new_path = os.path.join(tempdir, new_name)
-        os.rename(path, new_path)
+        # fd is now closed (by the fileobj)
+    except:
+        __quiet_remove(path)
+        raise
-    else:
-        stats = _stat_cache_files(options, prefix)
-        # oldest and smallest first
-        stats.reverse()
-        try:
-            stats[0][1].close()
-            new_path = stats[0][2]
-            os.rename(path, new_path)
-        finally:
-            for e in stats:
-                e[1].close()
-                e[3].close()
+
+    return path
+
+def __set_mod_time(new_path, persistent_cache):
     try:
         f = persistent_cache.get_cache_modification_time_for_stream
     except AttributeError:
@@ -252,10 +216,76 @@ def save_local_cache(options, prefix, persistent_cache, _pid=None):
         mod_time = f()
 
     if mod_time and mod_time > 0:
-        # PyPy on Linux raises an OSError/Errno22 if the mod_time is less than 0
-        # and is a float
+        # Older PyPy on Linux raises an OSError/Errno22 if the mod_time is less than 0
+        # and is a float (https://bitbucket.org/pypy/pypy/issues/2408/cpython-difference-osutime-path-11-11)
         logger.debug("Setting date of %r to cache time %s (current time %s)",
                      new_path, mod_time, time.time())
         os.utime(new_path, (mod_time, mod_time))
 
+def __quiet_remove(path):
+    try:
+        os.unlink(path)
+    except os.error: # pragma: no cover
+        log.debug("Failed to remove %r", path)
+        return False
+    else:
+        return True
+
+def save_local_cache(options, prefix, persistent_cache):
+    # Dump the file.
+    parent_dir = _normalize_path(options)
+    try:
+        # make it if needed. try to avoid a time-of-use/check
+        # race (not that it matters here)
+        os.makedirs(parent_dir)
+    except os.error:
+        pass
+
+    try:
+        path = __write_temp_cache_file(options, prefix, parent_dir, persistent_cache)
+    except (NameError, AttributeError): # pragma: no cover
+        # programming errors that should be caught in testing
+        raise
+    except Exception: # pylint:disable=broad-except
+        log.exception("Failed to save cache file %s", persistent_cache)
+        return
+
+    # Ok, now pick a place to put it, dropping the oldest file,
+    # if necessary.
+
+    # Now assign our permanent name by stripping the tmp suffix and renaming
+    assert path.endswith(".T")
+    new_path = path[:-2]
+
+    try:
+        os.rename(path, new_path)
+    except os.error: # pragma: no cover
+        log.exception("Failed to rename %r to %r", path, new_path)
+        __quiet_remove(path)
+        raise
+
+    del path
+
+    __set_mod_time(new_path, persistent_cache)
+
+    # Now remove any extra (old, small) files if we have too many
+    # If there are multiple storages shutting down, they will race
+    # each other to do this.
+    stats = _stat_cache_files(options, prefix)
+    while len(stats) > options.cache_local_dir_count and len(stats) > 1:
+        oldest_file = stats[-1]
+        # It's possible but unlikely for two processes to write to disk within the limit
+        # of filesystem modification time tracking. If one of those processes
+        # was us, then we still have to pick a loser.
+
+        if not __quiet_remove(oldest_file):
+            # One process will succeed, all the others will fail
+            log.info("Failed to prune file %r; stopping", oldest_file)
+            break
+
+        stats = _stat_cache_files(options, prefix)
+
+    return new_path
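Note how the new save path treats pruning as advisory: multiple storages shutting down at once race to delete the same oldest file, __quiet_remove reports whether this process's unlink won, and the losers stop rather than error. A standalone sketch of that pattern (function names illustrative, not RelStorage's actual code):

    import os

    def quiet_remove(path):
        # True only for the one process whose unlink actually succeeded.
        try:
            os.unlink(path)
        except OSError:
            return False
        return True

    def prune_oldest(paths, keep):
        # `paths` is sorted newest-first; drop from the end until `keep` remain.
        while len(paths) > keep and len(paths) > 1:
            if not quiet_remove(paths[-1]):
                break  # another process won the race; let it finish the job
            paths = paths[:-1]
        return paths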
relstorage/cache/tests/test_cache.py
@@ -54,12 +54,16 @@ def _check_load_and_store_multiple_files_hit_limit(self, mapping, wrapping_stora
             mapping[str(i)] = b'abc'
             mapping[str(i)] # Increment so it gets saved
 
-            persistence.save_local_cache(options, 'test', dump_object, _pid=i)
+            persistence.save_local_cache(options, 'test', dump_object)
             self.assertEqual(persistence.count_cache_files(options, 'test'),
                              i + 1)
 
+        # make sure it's not in the dict so that even if we find the most recent
+        # cache file first, we still have something to load. If we don't we can sometimes
+        # find that file and fail to store anything and prematurely break out of the loop
+        del mapping[str(i)]
         files_loaded = persistence.load_local_cache(options, 'test', dump_object)
+        # XXX: This sometimes fails on Travis, returning 1 Why?
 
         self.assertEqual(files_loaded, 2)
 
         import shutil
@@ -1152,6 +1156,7 @@ class LocalClientTests(unittest.TestCase):
         import tempfile
         import shutil
         import os
+        import time
         temp_dir = tempfile.mkdtemp(".rstest_cache")
         root_temp_dir = temp_dir
         if not _make_dir:
@@ -1177,13 +1182,27 @@ class LocalClientTests(unittest.TestCase):
         c2 = self._makeOne(cache_local_dir=temp_dir)
         self.assertEqual(c2.get('k0'), b'abc')
 
-        # Change and save and we overwrite the
+        # Change and save and we replace the
         # existing file.
         c2.set('k1', b'def')
         c2.get('k1') # increment
 
         c2.save()
         new_cache_files = os.listdir(temp_dir)
-        self.assertEqual(cache_files, new_cache_files)
+        self.assertEqual(len(cache_files), len(new_cache_files))
+        # No files in common
+        self.assertTrue(set(new_cache_files).isdisjoint(set(cache_files)))
+
+        # And again
+        cache_files = new_cache_files
+        c2.get_cache_modification_time_for_stream = lambda: time.time() + 2000
+        c2.save()
+        new_cache_files = os.listdir(temp_dir)
+        self.assertEqual(len(cache_files), len(new_cache_files))
+        # No files in common
+        self.assertTrue(set(new_cache_files).isdisjoint(set(cache_files)),
+                        (cache_files, new_cache_files))
 
 
         c3 = self._makeOne(cache_local_dir=temp_dir)
         self.assertEqual(c3.get('k0'), b'abc')
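The get_cache_modification_time_for_stream override in the last test works because save_local_cache stamps the freshly renamed file with that time via os.utime, and _stat_cache_files sorts newest-first by mtime, so the future-dated file wins. A tiny standalone demonstration of the mechanism (not RelStorage code):

    import os
    import tempfile
    import time

    d = tempfile.mkdtemp()
    old_path = os.path.join(d, 'old.cache')
    new_path = os.path.join(d, 'new.cache')
    for p in (old_path, new_path):
        with open(p, 'wb') as f:
            f.write(b'x')

    # Stamp 'old.cache' with a future time, as the test's lambda does.
    future = time.time() + 2000
    os.utime(old_path, (future, future))

    paths = [old_path, new_path]
    paths.sort(key=lambda p: os.stat(p).st_mtime, reverse=True)
    assert paths[0] == old_path  # the future-dated file now sorts "newest"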