Skip to content
Snippets Groups Projects
Commit f3ad7cf3 authored by Luise Odenthal's avatar Luise Odenthal
Browse files

added database _init, its now the old database.py file

parent b02419dc
No related branches found
No related tags found
No related merge requests found
from .database import DataBaseLoad, DataBaseSave
"""
Database are generated by the rendering module, and contains all \
images and there corresponding position-orientations.
* position_orientation: containing all position and orientation of where \
images were rendered. The position-orientation is described by \
['x','y','z','alpha_0','alpha_1','alpha_2']
* image: containing all images ever rendered. Each channel of each image \
are normalised, so to use the full coding range.
* normalisation: the normalisation constantes
How to load a database
----------------------
.. code-block:: python
from database import DataBaseLoad
mydb_filename = 'database.db'
mydb = DataBaseLoad(mydb_filename)
How to load all position-orientation
------------------------------------
The database contains all position-orientation \
at which an image as been rendered. In certain \
situation, it may be usefull to know all \
position-orientation in the database. More technically \
speaking, loading the full table of position-orientaiton.
.. code-block:: python
posorients = mydb.get_posorients()
posorients.head()
How to load an image
--------------------
The database contains images which can be processed differently \
depending on the navigation strategy beeing used.
Images are at given position-orientations. To load an image \
the position-orientation can be given. The DataBaseLoader will \
look if this position-orientation has been rendered. If it is \
the case, the image will be returned.
.. code-block:: python
posorient = pd.Series(index=['x', 'y', 'z',
'alpha_0', 'alpha_1', 'alpha_2'])
posorient.x = -0.6
posorient.y = -7.2
posorient.z = 2.35
posorient.alpha_0 = np.pi / 2
posorient.alpha_1 = 0
posorient.alpha_2 = 0
image = mydb.read_image(posorient=posorient)
.. plot:: example/database/load_image_posorient.py
However, looking in the database if an image has already been \
rendered at a given position-orientation can cost time. To speed up \
certain calculation, image can instead be access by row number. \
Indeed each position-orientation can be identified by a unique row \
number. This number is consistant through the entire database. Thus, \
an image can be loaded by providing the row number.
.. code-block:: python
rowid = 1000
image = mydb.read_image(rowid=rowid)
.. plot:: example/database/load_image_rowid.py
.. todo: channels as part of database
"""
import os
import numpy as np
import pandas as pd
import sqlite3
import io
import warnings
def adapt_array(arr):
"""
http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
"""
out = io.BytesIO()
np.save(out, arr)
out.seek(0)
return sqlite3.Binary(out.read())
def convert_array(text):
out = io.BytesIO(text)
out.seek(0)
return np.load(out)
# Converts np.array to TEXT when inserting
sqlite3.register_adapter(np.ndarray, adapt_array)
# Converts TEXT to np.array when selecting
sqlite3.register_converter("array", convert_array)
class DataBase():
"""DataBase is the parent class of DataBaseLoad and DataBaseSave.
It creates three sql table on initialisation.
"""
__float_tolerance = 1e-14
def __init__(self, filename, channels=['R', 'G', 'B', 'D']):
"""Initialisation of the database """
assert isinstance(filename, str), 'filename should be a string'
assert isinstance(channels, list), 'nb_channel should be an integer'
self.filename = filename
self.channels = channels
self.normalisation_columns = list()
for chan_n in self.channels:
self.normalisation_columns.append(str(chan_n) + '_max')
self.normalisation_columns.append(str(chan_n) + '_min')
self.normalisation_columns.append(str(chan_n) + '_range')
self.tablecolumns = dict()
self.tablecolumns['position_orientation'] = dict()
self.tablecolumns['position_orientation']['x'] = 'real'
self.tablecolumns['position_orientation']['y'] = 'real'
self.tablecolumns['position_orientation']['z'] = 'real'
self.tablecolumns['position_orientation']['alpha_0'] = 'real'
self.tablecolumns['position_orientation']['alpha_1'] = 'real'
self.tablecolumns['position_orientation']['alpha_2'] = 'real'
self.tablecolumns['image'] = dict()
self.tablecolumns['image']['data'] = 'array'
self.tablecolumns['normalisation'] = dict()
for col in self.normalisation_columns:
self.tablecolumns['normalisation'][col] = 'real'
if os.path.exists(filename):
# Check database
self.db = sqlite3.connect(
filename, detect_types=sqlite3.PARSE_DECLTYPES)
self.db_cursor = self.db.cursor()
for tablename, _ in self.tablecolumns.items():
print(tablename)
assert self.table_exist(tablename),\
'{} does not contain a table named {}'.format(
filename, tablename)
elif self.create():
# Create database
self.db = sqlite3.connect(
filename, detect_types=sqlite3.PARSE_DECLTYPES)
self.db_cursor = self.db.cursor()
for key, val in self.tablecolumns.items():
columns = "(id integer primary key autoincrement"
for colname, coltype in val.items():
columns += ' , ' + colname + ' ' + coltype
columns += ')'
print(key, columns)
self.db_cursor.execute(
"create table {} {}".format(key, columns))
self.db.commit()
else:
raise NameError('Database {} does not exist'.format(filename))
azimuth = np.linspace(-180, 180, 360)
elevation = np.linspace(-90, 90, 180)
[ma, me] = np.meshgrid(azimuth, elevation)
self.viewing_directions = np.zeros((ma.shape[0], ma.shape[1], 2))
self.viewing_directions[..., 0] = me
self.viewing_directions[..., 1] = ma
def table_exist(self, tablename):
assert isinstance(tablename, str), 'tablename should be a string'
self.db_cursor.execute(
"""
SELECT count(*)
FROM sqlite_master
WHERE type='table' and name=?;
""", (tablename,))
return bool(self.db_cursor.fetchone())
def check_data_validity(self, rowid):
self.db_cursor.execute(
"""
SELECT count(*)
FROM position_orientation
WHERE rowid=?;""", (rowid,))
valid = bool(self.db_cursor.fetchone()[0])
self.db_cursor.execute(
"""
SELECT count(*)
FROM normalisation
WHERE rowid=?;""", (rowid,))
valid = valid and bool(self.db_cursor.fetchone()[0])
self.db_cursor.execute(
"""
SELECT count(*)
FROM image
WHERE rowid=?;""", (rowid,))
valid = valid and bool(self.db_cursor.fetchone()[0])
return valid
def get_posid(self, posorient):
assert isinstance(posorient, pd.Series),\
'posorient should be a pandas Series'
where = """x>=? and x<=?"""
where += """and y>=? and y<=?"""
where += """and z>=? and z<=?"""
where += """and alpha_0>=? and alpha_0<=?"""
where += """and alpha_1>=? and alpha_1<=?"""
where += """and alpha_2>=? and alpha_2<=?"""
params = (
posorient['x'] - self.__float_tolerance,
posorient['x'] + self.__float_tolerance,
posorient['y'] - self.__float_tolerance,
posorient['y'] + self.__float_tolerance,
posorient['z'] - self.__float_tolerance,
posorient['z'] + self.__float_tolerance,
posorient['alpha_0'] - self.__float_tolerance,
posorient['alpha_0'] + self.__float_tolerance,
posorient['alpha_1'] - self.__float_tolerance,
posorient['alpha_1'] + self.__float_tolerance,
posorient['alpha_2'] - self.__float_tolerance,
posorient['alpha_2'] + self.__float_tolerance)
self.db_cursor.execute(
"""
SELECT count(*)
FROM position_orientation
WHERE {};""".format(where), params)
exist = self.db_cursor.fetchone()[0] # [0] because of tupple
if bool(exist):
self.db_cursor.execute(
"""
SELECT rowid
FROM position_orientation
WHERE {};
""".format(where), params)
return self.db_cursor.fetchone()[0]
elif self.create():
self.db_cursor.execute(
"""
INSERT
INTO position_orientation(x,y,z,alpha_0,alpha_1,alpha_2)
VALUES (?,?,?,?,?,?)
""", (
posorient['x'],
posorient['y'],
posorient['z'],
posorient['alpha_0'],
posorient['alpha_1'],
posorient['alpha_2']))
rowid = self.db_cursor.lastrowid
self.db.commit()
return rowid
else:
print(posorient)
raise ValueError('posorient not found')
def create(self):
return False
class DataBaseLoad(DataBase):
"""A database generated by the rendering module is based on sqlite3.
"""
def __init__(self, filename, channels=['R', 'G', 'B', 'D']):
"""Initialise the DataBaseLoader"""
DataBase.__init__(self, filename, channels=channels)
def create(self):
"""use to decide weather to alter the database or not
return False because we do not want
to write on database (Load class)"""
return False
def get_posorients(self):
"""Return the position orientations of all points in the \
database
"""
posorient = pd.read_sql_query(
"select * from position_orientation;", self.db)
posorient.set_index('id', inplace=True)
return posorient
def read_image(self, posorient=None, rowid=None):
"""Read an image at a given position-orientation or given id of row in the \
database.
:param posorient: a pandas Series with index \
['x','y','z','alpha_0','alpha_1','alpha_2']
:param rowid: an integer
:returns: an image
:rtype: numpy.ndarray
"""
assert (posorient is None) or (rowid is None),\
'posorient and rowid can not be both None'
if posorient is not None:
rowid = self.get_posid(posorient)
# Read images
tablename = 'image'
self.db_cursor.execute(
"""
SELECT data
FROM {}
WHERE (rowid=?)
""".format(tablename), (rowid,))
image = self.db_cursor.fetchone()[0]
# Read cmaxminrange
tablename = 'normalisation'
cmaxminrange = pd.read_sql_query(
"""
SELECT *
FROM {}
WHERE (rowid={})
""".format(tablename, rowid), self.db)
assert cmaxminrange.shape[0] == 1,\
'Error while reading normalisation factors'
cmaxminrange = cmaxminrange.iloc[0, :]
return self.denormalise_image(image, cmaxminrange)
def denormalise_image(self, image, cmaxminrange):
assert len(image.shape) == 3,\
'image should be 3D array'
assert image.shape[2] == len(self.channels),\
'image does not have the required number of channels {}'.format(
len(self.channels))
assert isinstance(cmaxminrange, pd.Series),\
'cmaxminrange should be a pandas Series'
denormed_im = np.zeros(image.shape, dtype=np.float)
maxval_nim = np.iinfo(image.dtype).max
#
for chan_i, chan_n in enumerate(self.channels):
cimage = image[:, :, chan_i].astype(float)
cmax = cmaxminrange.loc[str(chan_n) + '_max']
cmin = cmaxminrange.loc[str(chan_n) + '_min']
crange = cmaxminrange.loc[str(chan_n) + '_range']
cimage /= maxval_nim
cimage *= crange
cimage += cmin
denormed_im[:, :, chan_i] = cimage
assert np.max(cimage) == cmax,\
'denormalisation failed {}!={}'.format(np.max(cimage), cmax)
return denormed_im
class DataBaseSave(DataBase):
def __init__(self, filename, channels=['R', 'G', 'B', 'D'],
arr_dtype=np.uint8):
"""
"""
DataBase.__init__(self, filename, channels=channels)
self.arr_dtype = arr_dtype
def create(self):
"""use to decide weather to alter the database or not
return True because we will need
to write on database (Save class)"""
return True
def write_image(self, posorient, image):
normed_im, cmaxminrange = self.normalise_image(image, self.arr_dtype)
rowid = self.get_posid(posorient)
# Write image
tablename = 'image'
params = dict()
params['rowid'] = rowid
params['data'] = normed_im
self.insert_replace(tablename, params)
#
tablename = 'normalisation'
params = dict()
params['rowid'] = rowid
for chan_n in self.normalisation_columns:
params[chan_n] = cmaxminrange.loc[chan_n]
self.insert_replace(tablename, params)
def insert_replace(self, tablename, params):
assert isinstance(tablename, str),\
'table are named by string'
assert isinstance(params, dict),\
'params should be dictionary columns:val'
params_list = list()
columns_str = ''
for key, val in params.items():
columns_str += key + ','
params_list.append(val)
columns_str = columns_str[:-1] # remove last comma
if len(params_list) == 0:
warnings.warn('nothing to be done in {}'.format(tablename))
return
questionsmarks = '?'
for _ in range(1, len(params_list)):
questionsmarks += ',?'
self.db_cursor.execute(
"""
INSERT OR REPLACE
INTO {} ({})
VALUES ({})
""".format(tablename,
columns_str,
questionsmarks),
tuple(params_list)
)
self.db.commit()
def normalise_image(self, image, dtype=np.uint8):
normed_im = np.zeros(image.shape, dtype=dtype)
maxval_nim = np.iinfo(normed_im.dtype).max
#
columns = list()
for chan_n in self.channels:
columns.append(str(chan_n) + '_max')
columns.append(str(chan_n) + '_min')
columns.append(str(chan_n) + '_range')
cmaxminrange = pd.Series(index=columns)
for chan_i, chan_n in enumerate(self.channels):
cimage = image[:, :, chan_i].astype(float)
cmax = cimage.max()
cmin = cimage.min()
crange = cmax - cmin
cimage -= cmin
cimage /= crange
cimage *= maxval_nim
cimage = cimage.astype(normed_im.dtype)
normed_im[:, :, chan_i] = cimage
cmaxminrange.loc[str(chan_n) + '_max'] = cmax
cmaxminrange.loc[str(chan_n) + '_min'] = cmin
cmaxminrange.loc[str(chan_n) + '_range'] = crange
return normed_im, cmaxminrange
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment