#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''A simple HTTP accessible database for IoT projects.'''
__author__ = 'luphord'
__email__ = 'luphord@protonmail.com'
__version__ = '0.4.0'
import json
import sqlite3
import urllib.parse
from datetime import datetime, date, timedelta
from typing import Dict, Union, Any, Callable, Iterable, List, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from wsgiref.types import StartResponse # noqa: F401
# Type alias for values that specify a point in time:
# a string, a `datetime` or a `date`.
DateLike = Union[str, datetime, date]
def _parse_form(form_s: str) -> Dict[str, Any]:
'''Parse url encoded form and convert numerical types.'''
d = dict() # type: Dict[str, Any]
for k, v in urllib.parse.parse_qsl(form_s):
try:
d[k] = int(v)
except ValueError:
try:
d[k] = float(v)
except ValueError:
if v.lower() == 'true':
d[k] = True
elif v.lower() == 'false':
d[k] = False
else:
d[k] = v
return d
def _parse_dt(s: Optional[DateLike]) -> DateLike:
'''Parse string using custom differential date formats like "2 days".'''
if not s:
return ''
if isinstance(s, datetime):
return s.isoformat(sep=' ') # datetime format suitable for sqlite
s = str(s)
if s.lower() == 'today':
return date.today()
elif s.lower() == 'yesterday':
return date.today() - timedelta(days=1)
elif s.lower().endswith('days') \
or s.lower().endswith('day') \
or s.lower().endswith('d'):
try:
days = int(s.split('d')[0])
except ValueError:
return ''
return datetime.now() - timedelta(days=days)
elif s.lower().endswith('hours') \
or s.lower().endswith('hour') \
or s.lower().endswith('h'):
try:
hours = int(s.split('h')[0])
except ValueError:
return ''
return datetime.now() - timedelta(hours=hours)
elif s.lower().endswith('min') or s.lower().endswith('m'):
try:
minutes = int(s.split('m')[0])
except ValueError:
return ''
return datetime.now() - timedelta(minutes=minutes)
elif s.lower().endswith('sec') or s.lower().endswith('s'):
try:
seconds = int(s.split('s')[0])
except ValueError:
return ''
return datetime.now() - timedelta(seconds=seconds)
return s
def _json_default(obj: Union[datetime, date]) -> str:
if isinstance(obj, datetime) or isinstance(obj, date):
return obj.isoformat()
raise TypeError(type(obj))
def _serialize_json(data: Any, indent: Optional[int] = 2) -> str:
    '''Dump `data` as a json string, supporting dates.

    Objects the json module cannot encode itself are passed to
    `_json_default`. `indent` is forwarded to `json.dumps`.
    '''
    dump_options = dict(indent=indent, default=_json_default)
    return json.dumps(data, **dump_options)
def _j(obj_to_serialize: Any = None, **kwargs: Any) -> bytes:
    '''Build a utf8 encoded json document.

    The document is made from `obj_to_serialize`; if that is `None`,
    the keyword arguments are serialized as a dict instead.
    '''
    payload = kwargs if obj_to_serialize is None else obj_to_serialize
    return _serialize_json(payload).encode('utf8')
_HTTP_STATUS_CODES = {
200: '200 OK',
201: '201 Created',
400: '400 Bad Request',
404: '404 Not found',
405: '405 Method Not Allowed',
415: '415 Unsupported Media Type'
}
class BlanketDB:
    '''A simple HTTP accessible database for IoT projects.

    Entries are json documents stored in a single SQLite table together
    with a bucket name and a timestamp. An instance can be used directly
    from Python (`store`, `query`, `delete`, item access by entry ID) and
    is also a WSGI application (see `__call__`).
    '''

    # WHERE clause shared by `query` and `delete`. Optional filters are
    # written as `(? OR <condition>)`: binding a true value to the first
    # placeholder switches the filter off.
    _SOURCE = 'FROM blanketdb WHERE (? OR bucket=?) AND rowid>=? ' + \
        'AND timestamp>=? AND (? OR rowid<?) AND (? OR timestamp<?)'
    # `? * rowid` is bound to 1 or -1 to select the sort direction.
    _QUERY = 'SELECT rowid, * ' + _SOURCE + \
        ' ORDER BY (? * rowid) DESC LIMIT ?;'
    _DELETE = 'DELETE ' + _SOURCE + ';'

    def __init__(self,
                 connection_string: str,
                 now: Callable[[], datetime] = datetime.now) -> None:
        '''Initialize `BlanketDB` instance using a `connection_string`
        that can be understood by SQLite. `now` should be a function
        returning the current datetime (or a suitable test replacement).

        The table `blanketdb` is created if it does not exist yet.
        '''
        self.connection = sqlite3.connect(connection_string,
                                          detect_types=sqlite3.PARSE_DECLTYPES)
        with self.connection as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS blanketdb ' +
                         '(bucket text, timestamp timestamp, data text);')
        self.now = now

    @staticmethod
    def _entry_from_row(row: Any) -> Dict[str, Any]:
        '''Convert a `(rowid, bucket, timestamp, data)` row to an entry.'''
        entry_id, bucket, timestamp, data = row
        return dict(id=entry_id, bucket=bucket,
                    timestamp=timestamp, data=json.loads(data))

    @staticmethod
    def _filter_parameters(bucket: Optional[str],
                           since_id: Optional[int],
                           since: Optional[DateLike],
                           before_id: Optional[int],
                           before: Optional[DateLike]) -> tuple:
        '''Build the values bound to the placeholders of `_SOURCE`.'''
        is_bucket_requested = bool(bucket)
        if bucket:
            bucket = bucket.lower()
        if since_id is None:
            since_id = 0
        since = _parse_dt(since)
        before = _parse_dt(before)
        # Only a missing `before_id` disables the upper ID bound. Testing
        # truthiness instead would treat `before_id=0` as "no bound" and
        # make `delete(before_id=0)` remove every entry.
        return (not is_bucket_requested, bucket,
                since_id, since,
                before_id is None, before_id, not before, before)

    def store(self, data: Any, bucket: str = 'default') -> Dict[str, Any]:
        '''Serialize `data` to json and store it under `bucket`.

        The bucket name is lowercased and the entry is stamped with
        `self.now()`. Returns the new entry as a dict with the keys
        `id`, `bucket`, `timestamp` (ISO formatted string) and `data`.
        '''
        bucket = bucket.lower()
        timestamp = self.now()
        with self.connection as conn:
            c = conn.cursor()
            c.execute('INSERT INTO blanketdb VALUES (?, ?, ?);',
                      (bucket, timestamp, _serialize_json(data, indent=None)))
            entry_id = c.lastrowid
        return dict(id=entry_id, bucket=bucket,
                    timestamp=timestamp.isoformat(), data=data)

    def store_dict(self,
                   bucket: str = 'default',
                   **kwargs: Any) -> Dict[str, Any]:
        '''Serialize key word args to json and store under `bucket`.

        Returns the new entry exactly like `store`.
        '''
        return self.store(kwargs, bucket)

    def __getitem__(self, entry_id: int) -> Optional[Dict[str, Any]]:
        '''Get a stored entry by its `entry_id`.
        Return None if no entry exists for that ID.
        '''
        with self.connection as conn:
            row = conn.execute(
                'SELECT rowid, * FROM blanketdb WHERE rowid=?;',
                (entry_id,)).fetchone()
        return self._entry_from_row(row) if row else None

    def query(self, bucket: Optional[str] = None,
              since_id: Optional[int] = None,
              since: Optional[DateLike] = None,
              before_id: Optional[int] = None,
              before: Optional[DateLike] = None,
              limit: int = -1, newest_first: bool = True) \
            -> Iterable[Dict[str, Any]]:
        '''Query this `BlanketDB` instance using various optional filters.
        `since` and `since_id` are inclusive, `before` and `before_id` are
        exclusive regarding the specified value.

        `since` and `before` are interpreted by `_parse_dt`. `limit` is
        bound to the LIMIT clause (the default -1 does not restrict the
        result) and `newest_first` selects descending order of IDs.
        Yields entries as dicts with the keys `id`, `bucket`,
        `timestamp` and `data`.
        '''
        parameters = self._filter_parameters(
            bucket, since_id, since, before_id, before) \
            + (1 if newest_first else -1, limit)
        # Fetch all rows before yielding so that the connection context
        # is already closed when control is handed back to the caller.
        with self.connection as conn:
            rows = conn.execute(BlanketDB._QUERY, parameters).fetchall()
        for row in rows:
            yield self._entry_from_row(row)

    def __iter__(self) -> Iterable[Dict[str, Any]]:
        '''Iterate over all entries stored in this `BlanketDB` instance.'''
        with self.connection as conn:
            rows = conn.execute('SELECT rowid, * FROM blanketdb;').fetchall()
        for row in rows:
            yield self._entry_from_row(row)

    def __delitem__(self, entry_id: int) -> None:
        '''Delete an entry by its `entry_id`.'''
        with self.connection as conn:
            conn.execute('DELETE FROM blanketdb WHERE rowid=?;', (entry_id,))

    def delete(self, bucket: Optional[str] = None,
               since_id: Optional[int] = None,
               since: Optional[DateLike] = None,
               before_id: Optional[int] = None,
               before: Optional[DateLike] = None) \
            -> Any:
        '''Delete entries from this `BlanketDB` instance
        using various filters. `since` and `since_id` are inclusive,
        `before` and `before_id` are exclusive regarding the specified value.

        Returns the number of deleted entries.
        '''
        parameters = self._filter_parameters(
            bucket, since_id, since, before_id, before)
        with self.connection as conn:
            conn.execute(BlanketDB._DELETE, parameters)
            return conn.execute('select changes();').fetchone()[0]

    def __call__(self,
                 env: Dict[str, Any],
                 start_response: 'StartResponse') \
            -> Iterable[bytes]:
        '''WSGI conform callable method.

        All responses are json documents. Supported requests:

        * `GET /_entry/<id>`: a single entry (400 for an invalid ID,
          404 if it does not exist).
        * `GET /<bucket>` or `GET /`: entries of one bucket or of all
          buckets.
        * `POST /<bucket>` or `POST /` (bucket "default"): store the
          json or url encoded form body (201). Invalid json gives 400,
          other media types 415 and the `/_entry` paths 405.
        * `DELETE /_entry/<id>`: delete a single entry.
        * `DELETE /<bucket>` or `DELETE /`: delete matching entries.
        * Any other method: 405.

        Query parameters: `meta`, `since_id`, `before_id`, `since`,
        `before`, `limit` and `newest_first`. Parameters that cannot be
        parsed give 400.
        '''
        def start_json_response(status: int,
                                headers: Optional[List[Any]] = None) \
                -> None:
            if headers is None:
                headers = [('Content-Type', 'application/json')]
            start_response(_HTTP_STATUS_CODES[status], headers)

        # PEP 3333 allows servers to omit variables whose value is empty.
        path = str(env.get('PATH_INFO') or '/').lower()
        method = str(env['REQUEST_METHOD']).upper()
        qs = _parse_form(str(env.get('QUERY_STRING') or ''))

        def parse_date_parameter(name: str) -> DateLike:
            # `_parse_form` turns numeric and boolean looking values into
            # int/float/bool; those are not valid dates. Raising instead
            # of asserting keeps the check active under `python -O`.
            value = qs.get(name, None)
            if value is not None \
                    and not isinstance(value, (str, datetime, date)):
                raise TypeError('Parameter "{}" is not a valid date: {!r}'
                                .format(name, value))
            return _parse_dt(value)

        try:
            show_meta = qs.get('meta', True)
            since_id = qs.get('since_id', 0)
            since_id = int(since_id) \
                if isinstance(since_id, (int, str)) \
                else None
            before_id = qs.get('before_id', None)
            before_id = int(before_id) \
                if isinstance(before_id, (int, str)) \
                else None
            since = parse_date_parameter('since')
            before = parse_date_parameter('before')
            limit = int(str(qs.get('limit', -1)))
            newest_first = bool(qs.get('newest_first', True))
        except Exception as e:
            # Whatever went wrong here was caused by the request
            # parameters, so it is reported as a client error.
            start_json_response(400)
            yield _j(message='An error occured while' +
                     ' parsing the query parameters: ' + str(e),
                     parameters=qs)
            return

        if method == 'GET':
            if path.startswith('/_entry/'):
                try:
                    entry_id = int(path[8:])  # text after '/_entry/'
                except ValueError:
                    start_json_response(400)
                    yield _j(message='Path does not contain' +
                             ' a valid integer ID',
                             path=path)
                    return
                entry = self[entry_id]
                if entry:
                    start_json_response(200)
                    if show_meta:
                        yield _j(entry)
                    else:
                        # Do not use `_j` here: it replaces a `None`
                        # argument by its (empty) keyword arguments, so
                        # stored `null` data would be sent as `{}`.
                        yield _serialize_json(entry['data']).encode('utf8')
                else:
                    start_json_response(404)
                    yield _j(message='Entry does not exist', id=entry_id)
            else:
                # An empty bucket name means "all buckets".
                bucket = path[1:] or None  # type: Optional[str]
                entries = list(self.query(bucket, since_id, since,
                                          before_id, before,
                                          limit, newest_first))
                last_id = max((entry['id'] for entry in entries),
                              default=None)
                if not show_meta:
                    entries = [entry['data'] for entry in entries]
                # Start the response only after the query succeeded.
                start_json_response(200)
                yield _j(bucket_requested=bucket,
                         since_id=since_id,
                         since=since if since else None,
                         before_id=before_id,
                         before=before if before else None,
                         number_of_entries=len(entries),
                         last_id=last_id,
                         limit=limit if limit > -1 else None,
                         newest_first=newest_first,
                         entries=entries)
        elif method == 'POST':
            if path == '/_entry' or path.startswith('/_entry/'):
                start_json_response(405)
                yield _j(message='The HTTP method is not allowed' +
                         ' for this path',
                         path=path, method=method)
                return
            try:
                request_body_size = int(env.get('CONTENT_LENGTH') or 0)
            except ValueError:
                request_body_size = 0
            # A negative size would make `read` consume the whole stream.
            request_body_size = max(request_body_size, 0)
            request_body = env['wsgi.input'].read(request_body_size)
            bucket = 'default' if path == '/' else path[1:]
            content_type = env.get('CONTENT_TYPE', 'application/json').lower()
            if content_type.startswith('application/json'):
                if request_body == b'':
                    data = None
                else:
                    try:
                        data = json.loads(request_body.decode('utf8'))
                    except ValueError as e:
                        # Covers invalid json as well as invalid utf8.
                        start_json_response(400)
                        yield _j(message='Request body is not' +
                                 ' valid JSON: ' + str(e))
                        return
            elif content_type.startswith('application/x-www-form-urlencoded'):
                data = _parse_form(request_body.decode('utf8'))
            else:
                start_json_response(415)
                yield _j(message='Supported media types are application/json' +
                         ' and application/x-www-form-urlencoded',
                         media_type=content_type)
                return
            entry = self.store(data, bucket=bucket)
            start_json_response(201)
            yield _j(entry)
        elif method == 'DELETE':
            if path.startswith('/_entry/'):
                try:
                    entry_id = int(path[8:])  # text after '/_entry/'
                except ValueError:
                    start_json_response(400)
                    yield _j(message='Path does not contain' +
                             ' a valid integer ID',
                             path=path)
                    return
                entry = self[entry_id]
                if entry:
                    start_json_response(200)
                    del self[entry_id]
                    yield _j(entry)
                else:
                    start_json_response(404)
                    yield _j(message='Entry does not exist', id=entry_id)
            else:
                # An empty bucket name means "all buckets".
                bucket = path[1:] or None
                n = self.delete(bucket, since_id, since, before_id, before)
                start_json_response(200)
                yield _j(bucket_requested=bucket,
                         since_id=since_id,
                         since=since if since else None,
                         before_id=before_id,
                         before=before if before else None,
                         number_of_entries_deleted=n)
        else:
            start_json_response(405)
            yield _j(message='The HTTP method is not allowed for this path',
                     path=path, method=method)
def cli() -> None:
    '''Command line entry point.

    Reads interface, port and database file from the command line
    arguments and serves a `BlanketDB` instance with
    `wsgiref.simple_server` until the process is stopped.
    '''
    from argparse import ArgumentParser
    from wsgiref.simple_server import make_server

    parser = ArgumentParser(
        description='Start a BlanketDB instance using wsgiref.simple_server.')
    # (short flag, long flag, help text, default value, value type)
    options = (
        ('-i', '--interface', 'Interface to listen on', 'localhost', str),
        ('-p', '--port', 'Port to listen on', 8080, int),
        ('-f', '--file', 'Database file to use', 'db.sqlite', str),
    )
    for short_flag, long_flag, help_text, default, value_type in options:
        parser.add_argument(short_flag, long_flag, help=help_text,
                            default=default, type=value_type)
    args = parser.parse_args()

    print('Starting BlanketDB at http://{interface}:{port} using '
          'database file "{file}"'.format(interface=args.interface,
                                          port=args.port,
                                          file=args.file))
    server = make_server(args.interface, args.port, BlanketDB(args.file))
    server.serve_forever()


# Start the server when this file is executed as a script.
if __name__ == '__main__':
    cli()