use context managers in cache.py & add tests

pull/644/head
Mike Fährmann 5 years ago
parent 913b8333cc
commit ec85bf90de
No known key found for this signature in database
GPG Key ID: 5680CA389D365A88

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016-2019 Mike Fährmann
# Copyright 2016-2020 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@ -96,12 +96,12 @@ class DatabaseCacheDecorator():
# database lookup
fullkey = "%s-%s" % (self.key, key)
cursor = self.cursor()
try:
cursor.execute("BEGIN EXCLUSIVE")
except sqlite3.OperationalError:
pass # Silently swallow exception - workaround for Python 3.6
try:
with self.database() as db:
cursor = db.cursor()
try:
cursor.execute("BEGIN EXCLUSIVE")
except sqlite3.OperationalError:
pass # Silently swallow exception - workaround for Python 3.6
cursor.execute(
"SELECT value, expires FROM data WHERE key=? LIMIT 1",
(fullkey,),
@ -118,43 +118,38 @@ class DatabaseCacheDecorator():
"INSERT OR REPLACE INTO data VALUES (?,?,?)",
(fullkey, pickle.dumps(value), expires),
)
finally:
self.db.commit()
self.cache[key] = value, expires
return value
def update(self, key, value):
expires = int(time.time()) + self.maxage
self.cache[key] = value, expires
try:
self.cursor().execute(
with self.database() as db:
db.execute(
"INSERT OR REPLACE INTO data VALUES (?,?,?)",
("%s-%s" % (self.key, key), pickle.dumps(value), expires),
)
finally:
self.db.commit()
def invalidate(self, key):
try:
del self.cache[key]
except KeyError:
pass
try:
self.cursor().execute(
with self.database() as db:
db.execute(
"DELETE FROM data WHERE key=?",
("%s-%s" % (self.key, key),),
)
finally:
self.db.commit()
def cursor(self):
def database(self):
if self._init:
self.db.execute(
"CREATE TABLE IF NOT EXISTS data "
"(key TEXT PRIMARY KEY, value TEXT, expires INTEGER)"
)
DatabaseCacheDecorator._init = False
return self.db.cursor()
return self.db
def memcache(maxage=None, keyarg=None):

@ -2,7 +2,7 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
TESTS_CORE=(config cookies downloader extractor oauth postprocessor text util)
TESTS_CORE=(cache config cookies downloader extractor oauth postprocessor text util)
TESTS_RESULTS=(results)

@ -0,0 +1,202 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
import unittest
import tempfile
import time
from gallery_dl import config, util
dbpath = tempfile.mkstemp()[1]
config.set(("cache",), "file", dbpath)
from gallery_dl import cache # noqa
def tearDownModule():
util.remove_file(dbpath)
class TestCache(unittest.TestCase):
def test_decorator(self):
@cache.memcache()
def mc1():
pass
@cache.memcache(maxage=10)
def mc2():
pass
@cache.cache()
def dbc():
pass
self.assertIsInstance(mc1, cache.CacheDecorator)
self.assertIsInstance(mc2, cache.MemoryCacheDecorator)
self.assertIsInstance(dbc, cache.DatabaseCacheDecorator)
def test_keyarg_mem_simple(self):
@cache.memcache(keyarg=2)
def ka(a, b, c):
return a+b+c
self.assertEqual(ka(1, 1, 1), 3)
self.assertEqual(ka(2, 2, 2), 6)
self.assertEqual(ka(0, 0, 1), 3)
self.assertEqual(ka(9, 9, 1), 3)
self.assertEqual(ka(0, 0, 2), 6)
self.assertEqual(ka(9, 9, 2), 6)
def test_keyarg_mem(self):
@cache.memcache(keyarg=2, maxage=10)
def ka(a, b, c):
return a+b+c
self.assertEqual(ka(1, 1, 1), 3)
self.assertEqual(ka(2, 2, 2), 6)
self.assertEqual(ka(0, 0, 1), 3)
self.assertEqual(ka(9, 9, 1), 3)
self.assertEqual(ka(0, 0, 2), 6)
self.assertEqual(ka(9, 9, 2), 6)
def test_keyarg_db(self):
@cache.cache(keyarg=2, maxage=10)
def ka(a, b, c):
return a+b+c
self.assertEqual(ka(1, 1, 1), 3)
self.assertEqual(ka(2, 2, 2), 6)
self.assertEqual(ka(0, 0, 1), 3)
self.assertEqual(ka(9, 9, 1), 3)
self.assertEqual(ka(0, 0, 2), 6)
self.assertEqual(ka(9, 9, 2), 6)
def test_expires_mem(self):
@cache.memcache(maxage=1)
def ex(a, b, c):
return a+b+c
self.assertEqual(ex(1, 1, 1), 3)
self.assertEqual(ex(2, 2, 2), 3)
self.assertEqual(ex(3, 3, 3), 3)
time.sleep(2)
self.assertEqual(ex(3, 3, 3), 9)
self.assertEqual(ex(2, 2, 2), 9)
self.assertEqual(ex(1, 1, 1), 9)
def test_expires_db(self):
@cache.cache(maxage=1)
def ex(a, b, c):
return a+b+c
self.assertEqual(ex(1, 1, 1), 3)
self.assertEqual(ex(2, 2, 2), 3)
self.assertEqual(ex(3, 3, 3), 3)
time.sleep(2)
self.assertEqual(ex(3, 3, 3), 9)
self.assertEqual(ex(2, 2, 2), 9)
self.assertEqual(ex(1, 1, 1), 9)
def test_update_mem_simple(self):
@cache.memcache(keyarg=0)
def up(a, b, c):
return a+b+c
self.assertEqual(up(1, 1, 1), 3)
up.update(1, 0)
up.update(2, 9)
self.assertEqual(up(1, 0, 0), 0)
self.assertEqual(up(2, 0, 0), 9)
def test_update_mem(self):
@cache.memcache(keyarg=0, maxage=10)
def up(a, b, c):
return a+b+c
self.assertEqual(up(1, 1, 1), 3)
up.update(1, 0)
up.update(2, 9)
self.assertEqual(up(1, 0, 0), 0)
self.assertEqual(up(2, 0, 0), 9)
def test_update_db(self):
@cache.cache(keyarg=0, maxage=10)
def up(a, b, c):
return a+b+c
self.assertEqual(up(1, 1, 1), 3)
up.update(1, 0)
up.update(2, 9)
self.assertEqual(up(1, 0, 0), 0)
self.assertEqual(up(2, 0, 0), 9)
def test_invalidate_mem_simple(self):
@cache.memcache(keyarg=0)
def inv(a, b, c):
return a+b+c
self.assertEqual(inv(1, 1, 1), 3)
inv.invalidate(1)
inv.invalidate(2)
self.assertEqual(inv(1, 0, 0), 1)
self.assertEqual(inv(2, 0, 0), 2)
def test_invalidate_mem(self):
@cache.memcache(keyarg=0, maxage=10)
def inv(a, b, c):
return a+b+c
self.assertEqual(inv(1, 1, 1), 3)
inv.invalidate(1)
inv.invalidate(2)
self.assertEqual(inv(1, 0, 0), 1)
self.assertEqual(inv(2, 0, 0), 2)
def test_invalidate_db(self):
@cache.cache(keyarg=0, maxage=10)
def inv(a, b, c):
return a+b+c
self.assertEqual(inv(1, 1, 1), 3)
inv.invalidate(1)
inv.invalidate(2)
self.assertEqual(inv(1, 0, 0), 1)
self.assertEqual(inv(2, 0, 0), 2)
def test_database_read(self):
@cache.cache(keyarg=0, maxage=10)
def db(a, b, c):
return a+b+c
# initialize cache
self.assertEqual(db(1, 1, 1), 3)
db.update(2, 6)
# check and clear the in-memory portion of said cache
self.assertEqual(db.cache[1][0], 3)
self.assertEqual(db.cache[2][0], 6)
db.cache.clear()
self.assertEqual(db.cache, {})
# fetch results from database
self.assertEqual(db(1, 0, 0), 3)
self.assertEqual(db(2, 0, 0), 6)
# check in-memory cache updates
self.assertEqual(db.cache[1][0], 3)
self.assertEqual(db.cache[2][0], 6)
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save