[gpodder-devel] [PATCH] Thread safe SQLite.

  • From: justin.forest at gmail.com (Justin Forest)
  • Date: Wed, 6 Aug 2008 12:18:41 +0400

Makes the SQLite interaction thread safe, required for threaded
feed updates and other threaded things that are coming.  The changes
are completely internal and do not affect the usage.  Also, some
commit() calls were removed (after selects, where they did nothing).

PS: the first version of this patch had one issue with db.__get__(),
this one works well.
---
 src/gpodder/dbsqlite.py |   62 ++++++++++++++++++++++++++++++----------------
 1 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/src/gpodder/dbsqlite.py b/src/gpodder/dbsqlite.py
index 91e8533..d7fab2a 100644
--- a/src/gpodder/dbsqlite.py
+++ b/src/gpodder/dbsqlite.py
@@ -36,39 +36,62 @@ if not have_sqlite:
     sys.exit()
 
 from gpodder.liblogger import log
-
 from email.Utils import mktime_tz
 from email.Utils import parsedate_tz
 from email.Utils import formatdate
-
-import thread
+from threading import RLock
 import string
 
-class Storage(object):
-    channel_map = {}
-    settings = {}
-    _db = {}
+class LockingCursor(sqlite.Cursor):
+    """
+    This custom cursor implementation provides thread safety.
+    Only one thread at a time can work with cursors, as many
+    as needed.  Using a custom cursor makes this transparent.
+
+    One possible alternative is to use a queue and a single
+    SQLite worker thread, which sounds right but can introduce
+    serious problems, e.g. when a thread puts something in
+    queue and dies, refusing to fetch the results.  Example
+    implementation:
 
+    http://code.activestate.com/recipes/526618/
+    """
+    lock = None
+
+    def __init__(self, *args, **kwargs):
+        self.lock.acquire()
+        sqlite.Cursor.__init__(self, *args, **kwargs)
+
+    def __del__(self):
+        self.lock.release()
+
+class Storage(object):
     (STATE_NORMAL, STATE_DOWNLOADED, STATE_DELETED) = range(3)
 
+    def __init__(self):
+        self.settings = {}
+        self.channel_map = {}
+        self._db = None
+        LockingCursor.lock = RLock()
+
     def setup(self, settings):
         self.settings = settings
         self.__check_schema()
 
     @property
     def db(self):
-        t = thread.get_ident()
-        if t not in self._db:
-            conn = sqlite.connect(self.settings['database'])
-            conn.create_collation("unicode", lambda a, b: cmp(a.lower(), 
b.lower()))
-            self._db[t] = conn
-            log('SQLite connection for thread %d opened.', t)
-        return self._db[t]
+        if self._db is None:
+            self._db = sqlite.connect(self.settings['database'], 
check_same_thread=False)
+            self._db.create_collation("unicode", lambda a, b: cmp(a.lower(), 
b.lower()))
+            log('SQLite connected', sender=self)
+        return self._db
 
     def cursor(self):
-        return self.db.cursor()
+        return self.db.cursor(factory=LockingCursor)
 
     def commit(self):
+        # grab a cursor to lock threads
+        cur = self.cursor()
         self.db.commit()
 
     def __check_schema(self):
@@ -126,7 +149,6 @@ class Storage(object):
         cur.execute("""CREATE INDEX IF NOT EXISTS idx_locked ON episodes 
(locked)""")
 
         cur.close()
-        self.commit()
 
     def get_channel_stat(self, url_or_id, state=None, is_played=None, 
is_locked=None):
         where, params = ((),())
@@ -290,9 +312,6 @@ class Storage(object):
             else:
                 result.append(factory(episode))
 
-        if commit:
-            self.commit()
-
         return result
 
     def load_episodes(self, channel, factory=None, limit=1000, state=None):
@@ -333,12 +352,11 @@ class Storage(object):
             else:
                 log('Episode updated: %s', e.title)
                 cur.execute("UPDATE episodes SET title = ?, length = ?, 
mimetype = ?, description = ?, link = ?, pubDate = ? WHERE id = ?", (e.title, 
e.length, e.mimetype, e.description, e.link, self.__mktime__(e.pubDate), e.id, 
))
-
-            if not bulk:
-                self.commit()
         except Exception, e:
             log('save_episode() failed: %s', e, sender=self)
 
+        self.commit()
+
     def mark_episode(self, url, state=None, is_played=None, is_locked=None, 
toggle=False):
         cur = self.cursor()
         cur.execute("SELECT state, played, locked FROM episodes WHERE url = 
?", (url, ))
-- 
1.5.4.3



Other related posts: