Menu Search

db_common.py

import sqlite3
import queue
import threading


class Db(object):
    """Serialise access to a SQLite ``records`` table through a worker thread.

    Public methods never touch the database themselves: each one enqueues a
    task that the worker thread (see ``_process``) later runs against its own
    connection.  Results are reported back by passing caller-supplied event
    objects to ``injector.trigger``.

    The code expects a table named ``records`` with at least the columns
    ``id`` and ``description``.
    """

    def __init__(self, db, injector):
        """Start the worker thread.

        Args:
            db: Database location handed to ``sqlite3.connect``.
            injector: Object providing ``trigger(event)`` and ``close()``.
        """
        self.db = db
        self.injector = injector
        self.tasks = queue.Queue()
        # id of the last row handed out by _load; None means "start over".
        self.position = None
        # Events held back until the current batch has been committed.
        self.pending_events = []
        self.running = True
        self.thread = threading.Thread(target=self._process)
        self.thread.daemon = True
        self.thread.start()

    def close(self):
        """Ask the worker to stop once the current batch has been handled."""
        self.tasks.put(self._close)

    def reset(self):
        """Make the next ``load`` start again from the first record."""
        self.tasks.put(self._reset)

    def load(self, records, event=None):
        """Queue a read of the next rows into ``records``.

        Args:
            records: Queue-like object providing ``full()`` and ``put()``.
            event: Optional event triggered once the read has finished.
        """
        self.tasks.put(lambda conn: self._load(conn, records, event))

    def get_id(self, event):
        """Queue a lookup of the highest id; the result is set on ``event.id``."""
        self.tasks.put(lambda conn: self._get_id(conn, event))

    def insert(self, id, data, event=None):
        """Queue an insert of ``data``; a falsy ``id`` lets SQLite assign one."""
        self.tasks.put(lambda conn: self._insert(conn, id, data, event))

    def delete(self, id, event=None):
        """Queue deletion of the record whose id equals ``id``."""
        self.tasks.put(lambda conn: self._delete(conn, id, event))

    def _reset(self, ignored=None):
        """Forget the read position (``ignored`` absorbs the connection)."""
        self.position = None

    def _close(self, ignored=None):
        """Clear the running flag (``ignored`` absorbs the connection)."""
        self.running = False

    def _get_id(self, conn, event):
        """Store the highest record id (0 if the table is empty) on ``event``.

        Nothing is queried when ``event`` is falsy, because there would be
        nowhere to deliver the result.
        """
        if not event:
            return
        # Only the top row is needed, so let SQLite stop after one.
        row = conn.execute(
            "SELECT id FROM records ORDER BY id DESC LIMIT 1"
        ).fetchone()
        event.id = row['id'] if row else 0
        self.injector.trigger(event)

    def _load(self, conn, records, event):
        """Copy rows after the current position into ``records``.

        Stops when ``records`` reports full or the rows run out, remembering
        the id of the last row delivered so that the next call continues
        from there.  ``event`` is triggered afterwards when given.
        """
        # Compare against None explicitly: an id of 0 is a valid position and
        # must not be mistaken for "no position yet".
        if self.position is not None:
            cursor = conn.execute(
                "SELECT * FROM records WHERE id > ? ORDER BY id",
                (self.position,),
            )
        else:
            cursor = conn.execute("SELECT * FROM records ORDER BY id")
        while not records.full():
            row = cursor.fetchone()
            if row is None:
                break
            self.position = row['id']
            records.put(dict(row))
        if event:
            self.injector.trigger(event)

    def _insert(self, conn, id, data, event):
        """Insert one record; ``event`` is deferred until after the commit."""
        # A falsy id (None or 0) means "let SQLite pick the id".
        if id:
            conn.execute(
                "INSERT INTO records(id, description) VALUES (?, ?)",
                (id, data),
            )
        else:
            conn.execute(
                "INSERT INTO records(description) VALUES (?)", (data,)
            )
        if event:
            self.pending_events.append(event)

    def _delete(self, conn, id, event):
        """Delete one record; ``event`` is deferred until after the commit."""
        conn.execute("DELETE FROM records WHERE id=?", (id,))
        if event:
            self.pending_events.append(event)

    def _run_batch(self, conn):
        """Block for one task, then run every task already waiting."""
        task = self.tasks.get(True)
        while task is not None:
            task(conn)
            # Only the non-blocking get is guarded, so a queue.Empty raised
            # inside a task is not mistaken for "queue drained".
            try:
                task = self.tasks.get(False)
            except queue.Empty:
                task = None

    def _process(self):
        """Worker loop: run batches of tasks, commit, then fire events.

        Events collected by ``_insert``/``_delete`` are triggered only after
        the batch that produced them has been committed.  When the loop ends
        normally the injector is closed.  The connection is closed on every
        exit path, including when a task raises.
        """
        conn = sqlite3.connect(self.db)
        try:
            conn.row_factory = sqlite3.Row
            # ``with conn`` commits or rolls back on exit but does not close
            # the connection, hence the surrounding try/finally.
            with conn:
                while self.running:
                    self._run_batch(conn)
                    conn.commit()
                    for event in self.pending_events:
                        self.injector.trigger(event)
                    self.pending_events = []
        finally:
            conn.close()
        self.injector.close()

Download this file