Skip to content
Snippets Groups Projects
Commit 94b7e45c authored by Olivier Bertrand's avatar Olivier Bertrand
Browse files

Add source

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 1231 additions and 0 deletions
"""
The Place comparator list different methods to
compare a current place to a memorised place or
memorised places.
"""
import numpy as np
from Scene_processing import is_ibpc, is_obpc
def imagediff(current, memory):
    """Compute the root mean square difference between
    the current and memorised place code

    :param current: current place code
    :param memory: memorised place code
    :returns: the image difference
    :rtype: float

    ..ref: Zeil, J., 2012. Visual homing: an insect perspective.
           Current opinion in neurobiology
    """
    assert isinstance(current, np.ndarray),\
        'current place code should be a numpy array'
    assert isinstance(memory, np.ndarray),\
        'memory place code should be a numpy array'
    assert np.all(current.shape == memory.shape),\
        'memory and current place code should have the same shape'
    # Element-wise squared difference; the RMS is the square root of its mean.
    diff = np.power(current - memory, 2)
    if is_ibpc(current):
        # NOTE(review): axes shift after the first mean, so
        # .mean(axis=0).mean(axis=1) does NOT reduce a 4-d place code to a
        # scalar, although the docstring advertises a float and
        # rot_imagediff assigns this result to a single array cell.
        # diff.mean() over all axes may have been intended — confirm.
        return np.sqrt(diff.mean(axis=0).mean(axis=1))
    elif is_obpc(current):
        return np.sqrt(diff.mean(axis=0).mean(axis=0))
    else:
        raise TypeError('place code is neither an ibpc nor obpc')
def rot_imagediff(current, memory):
    """Compute the rotational image difference between
    the current and memorised place code.

    The current place code is rolled pixel by pixel along the azimuth
    axis and the image difference to the memory is computed for each
    rotation.

    :param current: current place code
    :param memory: memorised place code
    :returns: the rotational image difference (one entry per azimuthal shift)
    :rtype: (np.ndarray)

    ..ref: Zeil, J., 2012. Visual homing: an insect perspective.
           Current opinion in neurobiology
    ..note: assume that the image is periodic along the x axis
            (the left-right axis)
    """
    assert is_ibpc(current),\
        'The current and memory place code should be image based'
    ridf = np.zeros(current.shape[1])
    for azimuth_i in range(0, current.shape[1]):
        # np.roll wraps around, matching the azimuthal periodicity.
        rot_im = np.roll(current, azimuth_i, axis=1)
        # Bug fix: the result was previously stored into rot_im[azimuth_i]
        # instead of ridf[azimuth_i], so the function always returned zeros.
        ridf[azimuth_i] = imagediff(rot_im, memory)
    return ridf
from database.database import DataBaseLoad
from database.database import DataBaseSave
File added
File added
"""
Database are generated by the rendering module, and contains all \
images and there corresponding position-orientations.
* position_orientation: containing all position and orientation of where \
images were rendered. The position-orientation is described by \
['x','y','z','alpha_0','alpha_1','alpha_2']
* image: containing all images ever rendered. Each channel of each image \
are normalised, so to use the full coding range.
* normalisation: the normalisation constants
How to load a database
----------------------
.. code-block:: python
from database import DataBaseLoad
mydb_filename = 'database.db'
mydb = DataBaseLoad(mydb_filename)
How to load all position-orientation
------------------------------------
The database contains all position-orientations \
at which an image has been rendered. In certain \
situations, it may be useful to know all \
position-orientations in the database. More technically \
speaking, loading the full table of position-orientations.
.. code-block:: python
posorients = mydb.get_posorients()
posorients.head()
How to load an image
--------------------
The database contains images which can be processed differently \
depending on the navigation strategy being used.
Images are at given position-orientations. To load an image \
the position-orientation can be given. The DataBaseLoader will \
look if this position-orientation has been rendered. If it is \
the case, the image will be returned.
.. code-block:: python
posorient = pd.Series(index=['x', 'y', 'z',
'alpha_0', 'alpha_1', 'alpha_2'])
posorient.x = -0.6
posorient.y = -7.2
posorient.z = 2.35
posorient.alpha_0 = np.pi / 2
posorient.alpha_1 = 0
posorient.alpha_2 = 0
image = mydb.read_image(posorient=posorient)
.. plot:: example/database/load_image_posorient.py
However, looking in the database if an image has already been \
rendered at a given position-orientation can cost time. To speed up \
certain calculation, image can instead be access by row number. \
Indeed each position-orientation can be identified by a unique row \
number. This number is consistant through the entire database. Thus, \
an image can be loaded by providing the row number.
.. code-block:: python
rowid = 1000
image = mydb.read_image(rowid=rowid)
.. plot:: example/database/load_image_rowid.py
.. todo: channels as part of database
"""
import os
import numpy as np
import pandas as pd
import sqlite3
import io
import warnings
def adapt_array(arr):
    """Serialise a numpy array into an sqlite3 BLOB.

    http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
    """
    buffer = io.BytesIO()
    np.save(buffer, arr)
    return sqlite3.Binary(buffer.getvalue())
def convert_array(text):
    """Deserialise an sqlite3 BLOB (written by adapt_array) into a
    numpy array."""
    with io.BytesIO(text) as buffer:
        return np.load(buffer)
# Store np.ndarray values as BLOBs when inserting
sqlite3.register_adapter(np.ndarray, adapt_array)
# Convert columns declared with type "array" back to np.ndarray when
# selecting (requires detect_types=sqlite3.PARSE_DECLTYPES on connect)
sqlite3.register_converter("array", convert_array)
class DataBase():
"""DataBase is the parent class of DataBaseLoad and DataBaseSave.
It creates three sql table on initialisation.
"""
__float_tolerance = 1e-14
def __init__(self, filename, channels=['R', 'G', 'B', 'D']):
"""Initialisation of the database """
assert isinstance(filename, str), 'filename should be a string'
assert isinstance(channels, list), 'nb_channel should be an integer'
self.filename = filename
self.channels = channels
self.normalisation_columns = list()
for chan_n in self.channels:
self.normalisation_columns.append(str(chan_n) + '_max')
self.normalisation_columns.append(str(chan_n) + '_min')
self.normalisation_columns.append(str(chan_n) + '_range')
self.tablecolumns = dict()
self.tablecolumns['position_orientation'] = dict()
self.tablecolumns['position_orientation']['x'] = 'real'
self.tablecolumns['position_orientation']['y'] = 'real'
self.tablecolumns['position_orientation']['z'] = 'real'
self.tablecolumns['position_orientation']['alpha_0'] = 'real'
self.tablecolumns['position_orientation']['alpha_1'] = 'real'
self.tablecolumns['position_orientation']['alpha_2'] = 'real'
self.tablecolumns['image'] = dict()
self.tablecolumns['image']['data'] = 'array'
self.tablecolumns['normalisation'] = dict()
for col in self.normalisation_columns:
self.tablecolumns['normalisation'][col] = 'real'
if os.path.exists(filename):
# Check database
self.db = sqlite3.connect(
filename, detect_types=sqlite3.PARSE_DECLTYPES)
self.db_cursor = self.db.cursor()
for tablename, _ in self.tablecolumns.items():
print(tablename)
assert self.table_exist(tablename),\
'{} does not contain a table named {}'.format(
filename, tablename)
elif self.create():
# Create database
self.db = sqlite3.connect(
filename, detect_types=sqlite3.PARSE_DECLTYPES)
self.db_cursor = self.db.cursor()
for key, val in self.tablecolumns.items():
columns = "(id integer primary key autoincrement"
for colname, coltype in val.items():
columns += ' , ' + colname + ' ' + coltype
columns += ')'
print(key, columns)
self.db_cursor.execute(
"create table {} {}".format(key, columns))
self.db.commit()
else:
raise NameError('Database {} does not exist'.format(filename))
azimuth = np.linspace(-180, 180, 360)
elevation = np.linspace(-90, 90, 180)
[ma, me] = np.meshgrid(azimuth, elevation)
self.viewing_directions = np.zeros((ma.shape[0], ma.shape[1], 2))
self.viewing_directions[..., 0] = me
self.viewing_directions[..., 1] = ma
def table_exist(self, tablename):
assert isinstance(tablename, str), 'tablename should be a string'
self.db_cursor.execute(
"""
SELECT count(*)
FROM sqlite_master
WHERE type='table' and name=?;
""", (tablename,))
return bool(self.db_cursor.fetchone())
def check_data_validity(self, rowid):
self.db_cursor.execute(
"""
SELECT count(*)
FROM position_orientation
WHERE rowid=?;""", (rowid,))
valid = bool(self.db_cursor.fetchone()[0])
self.db_cursor.execute(
"""
SELECT count(*)
FROM normalisation
WHERE rowid=?;""", (rowid,))
valid = valid and bool(self.db_cursor.fetchone()[0])
self.db_cursor.execute(
"""
SELECT count(*)
FROM image
WHERE rowid=?;""", (rowid,))
valid = valid and bool(self.db_cursor.fetchone()[0])
return valid
def get_posid(self, posorient):
assert isinstance(posorient, pd.Series),\
'posorient should be a pandas Series'
where = """x>=? and x<=?"""
where += """and y>=? and y<=?"""
where += """and z>=? and z<=?"""
where += """and alpha_0>=? and alpha_0<=?"""
where += """and alpha_1>=? and alpha_1<=?"""
where += """and alpha_2>=? and alpha_2<=?"""
params = (
posorient['x'] - self.__float_tolerance,
posorient['x'] + self.__float_tolerance,
posorient['y'] - self.__float_tolerance,
posorient['y'] + self.__float_tolerance,
posorient['z'] - self.__float_tolerance,
posorient['z'] + self.__float_tolerance,
posorient['alpha_0'] - self.__float_tolerance,
posorient['alpha_0'] + self.__float_tolerance,
posorient['alpha_1'] - self.__float_tolerance,
posorient['alpha_1'] + self.__float_tolerance,
posorient['alpha_2'] - self.__float_tolerance,
posorient['alpha_2'] + self.__float_tolerance)
self.db_cursor.execute(
"""
SELECT count(*)
FROM position_orientation
WHERE {};""".format(where), params)
exist = self.db_cursor.fetchone()[0] # [0] because of tupple
if bool(exist):
self.db_cursor.execute(
"""
SELECT rowid
FROM position_orientation
WHERE {};
""".format(where), params)
return self.db_cursor.fetchone()[0]
elif self.create():
self.db_cursor.execute(
"""
INSERT
INTO position_orientation(x,y,z,alpha_0,alpha_1,alpha_2)
VALUES (?,?,?,?,?,?)
""", (
posorient['x'],
posorient['y'],
posorient['z'],
posorient['alpha_0'],
posorient['alpha_1'],
posorient['alpha_2']))
rowid = self.db_cursor.lastrowid
self.db.commit()
return rowid
else:
print(posorient)
raise ValueError('posorient not found')
def create(self):
return False
class DataBaseLoad(DataBase):
    """A read-only interface to a database generated by the rendering
    module (sqlite3 based).
    """

    def __init__(self, filename, channels=['R', 'G', 'B', 'D']):
        """Initialise the DataBaseLoader

        :param filename: path to an existing database file
        :param channels: list of the channel names stored in each image
        """
        DataBase.__init__(self, filename, channels=channels)

    def create(self):
        """Used to decide whether to alter the database or not.

        Return False because we do not want to write on the
        database (Load class)."""
        return False

    def get_posorients(self):
        """Return the position orientations of all points in the \
database

        :returns: one row per rendered view, indexed by the table id
        :rtype: pd.DataFrame
        """
        posorient = pd.read_sql_query(
            "select * from position_orientation;", self.db)
        posorient.set_index('id', inplace=True)
        return posorient

    def read_image(self, posorient=None, rowid=None):
        """Read an image at a given position-orientation or given id of row in the \
database.

        :param posorient: a pandas Series with index \
['x','y','z','alpha_0','alpha_1','alpha_2']
        :param rowid: an integer
        :returns: an image
        :rtype: numpy.ndarray
        """
        # Bug fix: exactly one of posorient / rowid must be provided.
        # The original assert only forbade providing both, and its message
        # contradicted the condition.
        assert (posorient is not None) or (rowid is not None),\
            'posorient and rowid can not be both None'
        assert (posorient is None) or (rowid is None),\
            'posorient and rowid can not be both given'
        if posorient is not None:
            rowid = self.get_posid(posorient)
        # Read image
        tablename = 'image'
        self.db_cursor.execute(
            """
            SELECT data
            FROM {}
            WHERE (rowid=?)
            """.format(tablename), (rowid,))
        image = self.db_cursor.fetchone()[0]
        # Read the normalisation factors (max/min/range per channel).
        # rowid is passed as a bound parameter instead of being
        # interpolated into the SQL string.
        tablename = 'normalisation'
        cmaxminrange = pd.read_sql_query(
            """
            SELECT *
            FROM {}
            WHERE (rowid=?)
            """.format(tablename), self.db, params=(rowid,))
        assert cmaxminrange.shape[0] == 1,\
            'Error while reading normalisation factors'
        cmaxminrange = cmaxminrange.iloc[0, :]
        return self.denormalise_image(image, cmaxminrange)

    def denormalise_image(self, image, cmaxminrange):
        """Invert the per-channel normalisation applied when the image
        was written to the database.

        :param image: normalised integer image [elevation, azimuth, channel]
        :param cmaxminrange: pandas Series holding the <chan>_max,
            <chan>_min and <chan>_range values used at normalisation time
        :returns: the denormalised floating point image
        :rtype: np.ndarray
        """
        assert len(image.shape) == 3,\
            'image should be 3D array'
        assert image.shape[2] == len(self.channels),\
            'image does not have the required number of channels {}'.format(
                len(self.channels))
        assert isinstance(cmaxminrange, pd.Series),\
            'cmaxminrange should be a pandas Series'
        # Bug fix: np.float was removed from numpy (1.24); use builtin float.
        denormed_im = np.zeros(image.shape, dtype=float)
        maxval_nim = np.iinfo(image.dtype).max
        #
        for chan_i, chan_n in enumerate(self.channels):
            cimage = image[:, :, chan_i].astype(float)
            cmax = cmaxminrange.loc[str(chan_n) + '_max']
            cmin = cmaxminrange.loc[str(chan_n) + '_min']
            crange = cmaxminrange.loc[str(chan_n) + '_range']
            # Map [0, maxval_nim] back onto [cmin, cmin + crange].
            cimage /= maxval_nim
            cimage *= crange
            cimage += cmin
            denormed_im[:, :, chan_i] = cimage
            # Exact float equality is too strict after the round trip;
            # compare within floating point tolerance instead.
            assert np.isclose(np.max(cimage), cmax),\
                'denormalisation failed {}!={}'.format(np.max(cimage), cmax)
        return denormed_im
class DataBaseSave(DataBase):
    """A write-enabled interface to the database, used by the rendering
    module to store rendered images and their normalisation constants.
    """

    def __init__(self, filename, channels=['R', 'G', 'B', 'D'],
                 arr_dtype=np.uint8):
        """Initialise the DataBaseSave

        :param filename: path of the database file (created when missing)
        :param channels: list of the channel names stored in each image
        :param arr_dtype: integer dtype used to store normalised images
        """
        DataBase.__init__(self, filename, channels=channels)
        self.arr_dtype = arr_dtype

    def create(self):
        """Used to decide whether to alter the database or not.

        Return True because we will need to write on the
        database (Save class)."""
        return True

    def write_image(self, posorient, image):
        """Normalise and store an image at a given position-orientation.

        :param posorient: a pandas Series with index
            ['x','y','z','alpha_0','alpha_1','alpha_2']
        :param image: the image to store [elevation, azimuth, channel]
        """
        normed_im, cmaxminrange = self.normalise_image(image, self.arr_dtype)
        rowid = self.get_posid(posorient)
        # Write image
        tablename = 'image'
        params = dict()
        params['rowid'] = rowid
        params['data'] = normed_im
        self.insert_replace(tablename, params)
        # Write the normalisation constants of every channel
        tablename = 'normalisation'
        params = dict()
        params['rowid'] = rowid
        for chan_n in self.normalisation_columns:
            params[chan_n] = cmaxminrange.loc[chan_n]
        self.insert_replace(tablename, params)

    def insert_replace(self, tablename, params):
        """Insert or replace one row given as a column->value mapping.

        :param tablename: name of the target table
        :param params: dictionary mapping column names to values
        """
        assert isinstance(tablename, str),\
            'table are named by string'
        assert isinstance(params, dict),\
            'params should be dictionary columns:val'
        params_list = list()
        columns_str = ''
        for key, val in params.items():
            columns_str += key + ','
            params_list.append(val)
        columns_str = columns_str[:-1]  # remove last comma
        if len(params_list) == 0:
            warnings.warn('nothing to be done in {}'.format(tablename))
            return
        # One placeholder per column; values are bound as parameters so
        # sqlite handles quoting and the registered array adapter.
        questionsmarks = '?'
        for _ in range(1, len(params_list)):
            questionsmarks += ',?'
        self.db_cursor.execute(
            """
            INSERT OR REPLACE
            INTO {} ({})
            VALUES ({})
            """.format(tablename,
                       columns_str,
                       questionsmarks),
            tuple(params_list)
        )
        self.db.commit()

    def normalise_image(self, image, dtype=np.uint8):
        """Normalise every channel of an image to the full coding range
        of dtype.

        :param image: the image to normalise [elevation, azimuth, channel]
        :param dtype: integer dtype of the normalised image
        :returns: (normalised image, per-channel max/min/range constants)
        :rtype: (np.ndarray, pd.Series)
        """
        normed_im = np.zeros(image.shape, dtype=dtype)
        maxval_nim = np.iinfo(normed_im.dtype).max
        # Build the index of the normalisation-constants series.
        columns = list()
        for chan_n in self.channels:
            columns.append(str(chan_n) + '_max')
            columns.append(str(chan_n) + '_min')
            columns.append(str(chan_n) + '_range')
        cmaxminrange = pd.Series(index=columns)
        for chan_i, chan_n in enumerate(self.channels):
            cimage = image[:, :, chan_i].astype(float)
            cmax = cimage.max()
            cmin = cimage.min()
            crange = cmax - cmin
            cimage -= cmin
            # Bug fix: a constant channel has zero range; dividing by zero
            # filled the channel with NaNs. Such a channel is stored as
            # all zeros instead.
            if crange > 0:
                cimage /= crange
            cimage *= maxval_nim
            cimage = cimage.astype(normed_im.dtype)
            normed_im[:, :, chan_i] = cimage
            cmaxminrange.loc[str(chan_n) + '_max'] = cmax
            cmaxminrange.loc[str(chan_n) + '_min'] = cmin
            cmaxminrange.loc[str(chan_n) + '_range'] = crange
        return normed_im, cmaxminrange
"""
The scene processing part of the toolbox defines methods
to transform an image into a place code in the sense
of Basten and Mallot (2010).
The scene is either
* a 4d numpy array, used when the image is a equirectangular \
projection, i.e. a panoramic image.
* a 3d numpy array, used when the viewing direction can not \
be mapped/projected on a regular image (e.g. insect eye).
We thus define the following for a scene:
image based scene (IBS)
A classical image. Each pixel is viewed in a direction
(elevation,azimuth) in a regular manner.
In that case the scene is a 4d numpy array
[elevation-index,azimuth-index,channel-index,1].
Omatidium based scene (OBS)
In an ommatidia based scene, the viewing directions
do not need to be regularly spaced.
In that case the scene is a 3d numpy array
[ommatidia-index, channel-index,1].
By extension a place-code is either image based or ommatidium based.
The number of dimension of an ib-place-code is always 4, and of an
ob-place-code always 3.
image based place-code (IBPC)
A place code derived from IBS. Each pixel is viewed in a direction
(elevation,azimuth) in a regular manner.
In that case the scene is a 4d numpy array
[elevation-index,azimuth-index,channel-index,component-index].
Omatidium based place-code (OBPC)
A place code derived from OBS, the viewing directions
do not need to be regularly spaced.
In that case the scene is a 3d numpy array
[ommatidia-index, channel-index,component-index].
Abusing the terminology of a place-code, a scene can be a place-code.
Therefore ibs and obs have 4 and 3 dimension, respectively.
.. todo:
* implement optic flow vector
"""
import numpy as np
import pandas as pd
from scipy.ndimage import maximum_filter, minimum_filter
import processing.constants as prc
import processing.tools as prt
def scene(database, posorient=None, rowid=None):
    """ Return a scene at a position orientation or given rowid
    in a given database.

    :param database: a DataBaseLoad class \
    :param posorient: a pandas Series with index: \
['x','y','z','alpha_0,'alpha_1,'alpha_2'] (default None, i.e. not used)
    :param rowid: a row identification integer for directly reading \
in the database (default None, i.e. not used).
    :returns: a scene [elevation, azimuth, channel, 1] or \
[ommatidia,channel,1].
    :rtype: np.ndarray

    .. literalinclude:: example/processing/scene.py
       :lines: 13-14

    .. plot:: example/processing/scene.py
    """
    if posorient is not None:
        assert isinstance(posorient, pd.Series),\
            'posorient should be a pandas Series'
    # Fetch the raw image and append a trailing singleton axis so the
    # result qualifies as a place code (component axis).
    image = database.read_image(posorient=posorient, rowid=rowid)
    return image[..., np.newaxis]
def skyline(scene):
    """Return the average along the elevation of a scene

    :param scene: the scenery at a given location (a 4d numpy array)
    :returns: the skyline [1,azimuth,channel,1]
    :rtype: np.ndarray

    .. literalinclude:: example/processing/skyline.py
       :lines: 12-14

    .. plot:: example/processing/skyline.py
    """
    assert prt.is_ibpc(scene),\
        'scene should be image based to compute a skyline'
    # Collapse the elevation axis, then restore a leading singleton axis
    # so the output keeps the 4-d place-code layout.
    averaged = scene.mean(axis=prc.__ibpc_indeces__['elevation'])
    return averaged[np.newaxis, :]
def michelson_contrast(scene, size=3):
    """Return the michelson contrast

    .. math::

       \\frac{I_\\text{max}-I_\\text{min}}{I_\\text{max}+I_\\text{min}}

    with :math:`I_\\text{max}` and :math:`I_\\text{min}` representing the \
highest and lowest luminance in an image region around each pixel.

    :param scene: an image based scene
    :param size: the size of the region to calculate the maximum \
and minimum of the local image intensity
    :returns: the michelson-contrast
    :rtype: np.ndarray

    .. literalinclude:: example/processing/michelson_contrast.py
       :lines: 12-14

    .. plot:: example/processing/michelson_contrast.py
    """
    assert prt.is_ibpc(scene), \
        'scene should be image based to compute the michelson constrast'
    contrast = np.zeros_like(scene)
    nb_channels = scene.shape[prc.__ibpc_indeces__['channel']]
    for chan_i in range(nb_channels):
        channel_im = scene[..., chan_i, 0]
        # Local max / min intensity around every pixel; 'wrap' accounts
        # for the azimuthal periodicity of the panoramic image.
        i_max = maximum_filter(channel_im, size=size, mode='wrap')
        i_min = minimum_filter(channel_im, size=size, mode='wrap')
        contrast[..., chan_i, 0] = (i_max - i_min) / (i_max + i_min)
    return contrast
def contrast_weighted_nearness(scene, contrast_size=3, distance_channel=3):
    """Return the michelson contrast weighted nearness

    :param scene: an image based scene
    :param contrast_size: the size of the region to calculate the maximum \
and minimum of the local image intensity in the michelson-contrast.
    :param distance_channel: the index of the distance-channel.

    .. literalinclude:: example/processing/contrast_weighted_nearness.py
       :lines: 12-14

    .. plot:: example/processing/contrast_weighted_nearness.py
    """
    assert prt.is_ibpc(scene), \
        'scene should be image based to compute the contrast weighted nearness'
    contrast = michelson_contrast(scene, size=contrast_size)
    # Broadcast the distance channel so it divides every channel of the
    # contrast (same shape as the place code).
    distance = scene[..., distance_channel, 0][..., np.newaxis, np.newaxis]
    tiled_distance = np.tile(distance, (1, 1, scene.shape[-2], 1))
    return contrast / tiled_distance
def pcv(place_code, viewing_directions):
    """Place code vectors

    :param place_code: the place code at a given location (e.g. an ibs scene)
    :param viewing_directions: viewing direction of each pixel
    :returns: the place code vectors in cartesian coordinates
    :rtype: (np.ndarray)

    .. literalinclude:: example/processing/pcv.py
       :lines: 12-14

    .. plot:: example/processing/pcv.py
    """
    # Resolve which axes hold channels and components depending on the
    # place-code flavour (image based vs ommatidia based).
    if prt.is_ibpc(place_code):
        component_dim = prc.__ibpc_indeces__['component']
        channel_dim = prc.__ibpc_indeces__['channel']
    elif prt.is_obpc(place_code):
        component_dim = prc.__obpc_indeces__['component']
        channel_dim = prc.__obpc_indeces__['channel']
    else:
        raise TypeError('place code should be either an ibpc or obpc')
    assert isinstance(viewing_directions, np.ndarray), \
        'viewing_directions should be a numpy array'
    assert place_code.shape[component_dim] == 1, \
        'the last dimension ({}) of the place-code should be 1'.format(
            place_code.shape[component_dim])
    # Unit vector of every viewing direction.
    # NOTE(review): spherical_to_cartesian applies cos/sin directly, i.e.
    # expects radians, while the database stores viewing directions in
    # degrees — confirm the units of viewing_directions before use.
    elevation = viewing_directions[..., prc.__spherical_indeces__['elevation']]
    azimuth = viewing_directions[..., prc.__spherical_indeces__['azimuth']]
    unscaled_lv = prt.spherical_to_cartesian(elevation, azimuth, radius=1)
    scaled_lv = np.zeros_like(place_code)
    # (3,) -> (1,1,3) or (1,1,1,3) see numpy.tile
    # The component axis is expanded from 1 to 3 (x, y, z).
    scaled_lv = np.tile(scaled_lv, (unscaled_lv.shape[-1],))
    # Scale each unit viewing vector by the corresponding place-code
    # intensity, channel by channel.
    for channel_index in range(0, scaled_lv.shape[channel_dim]):
        radius = np.tile(place_code[..., channel_index, 0]
                         [..., np.newaxis], (scaled_lv.shape[-1],))
        scaled_lv[..., channel_index, :] = unscaled_lv * radius
    return scaled_lv
def apcv(place_code, viewing_directions):
    """Calculate the average scene vector

    :param place_code: the place code at a given location (e.g. an ibs scene)
    :param viewing_directions: viewing direction of each pixel
    :returns: the average place-code vector
    :rtype: (np.ndarray)

    .. literalinclude:: example/processing/apcv.py
       :lines: 12-14

    .. plot:: example/processing/apcv.py
    """
    scaled_lv = pcv(place_code, viewing_directions)
    if prt.is_ibpc(place_code):
        # Sum over elevation and azimuth, then restore singleton axes so
        # the result keeps the 4-d place-code layout.
        summed = scaled_lv.sum(axis=0).sum(axis=0)
        return summed[np.newaxis, np.newaxis, ...]
    if prt.is_obpc(place_code):
        # Sum over the ommatidia axis and restore a singleton axis.
        summed = scaled_lv.sum(axis=0)
        return summed[np.newaxis, ...]
    raise TypeError('place code is neither an ibpc nor obpc')
def optic_flow(place_code, viewing_directions, velocity):
    """Compute the optic flow experienced at a place (NOT IMPLEMENTED).

    :param place_code: the place code at a given location
    :param viewing_directions: viewing direction of each pixel
    :param velocity: velocity of the observer
    :raises NotImplementedError: always; this is a placeholder.
    """
    # Bug fix: NameError was misleading here — the name exists, the
    # feature is simply missing. NotImplementedError is the conventional
    # exception for a declared-but-unimplemented function.
    raise NotImplementedError('optic_flow is not implemented')
File added
File added
File added
File added
File added
File added
"""
Define some constant
"""
__spherical_indeces__ = {'elevation': 0,
'azimuth': 1,
'radius': 2}
__cartesian_indeces__ = {'x': 0,
'y': 1,
'z': 2}
__ibpc_indeces__ = {'elevation': 0,
'azimuth': 1,
'channel': 2,
'component': 3}
__obpc_indeces__ = {'ommatidia': 0,
'channel': 1,
'component': 2}
__eye_indeces__ = {'elevation': 0,
'azimuth': 1,
'component': 2}
__ommadia_indeces__ = {'ommatidia': 0,
'component': 1}
import processing.constants as prc
import numpy as np
def is_ibpc(place_code):
    """Test if a place code is image based

    :param place_code: a place-code
    :returns: True if image based place-code
    :rtype: bool
    """
    # An image based place-code is a numpy array with one axis per entry
    # of __ibpc_indeces__ (elevation, azimuth, channel, component).
    if not isinstance(place_code, np.ndarray):
        return False
    return len(place_code.shape) == len(prc.__ibpc_indeces__)
def is_obpc(place_code):
    """Test if a place code is ommatidia based

    :param place_code: a place-code
    :returns: True if ommatidia based place-code
    :rtype: bool
    """
    # An ommatidia based place-code is a numpy array with one axis per
    # entry of __obpc_indeces__ (ommatidia, channel, component).
    if not isinstance(place_code, np.ndarray):
        return False
    return len(place_code.shape) == len(prc.__obpc_indeces__)
def ibs_to_obs(scene, eye_map):
    """Convert an image based scene to an ommatidium based scene.

    :param scene: The scene to be converted
    :param eye_map: The eye_map to use
    :returns: (obs_scene,ommatidia_map)
    :rtype: (np.ndarray,np.ndarray)
    """
    assert is_ibpc(scene),\
        'scene should be an ibs scene'
    assert isinstance(eye_map, np.ndarray), 'eye_map should be a numpy array'
    assert len(eye_map.shape) == len(prc.__eye_indeces__),\
        'eye_map should have {} dimensions to be an ibs scene'.format(
            len(prc.__eye_indeces__))
    for index_name in ['elevation', 'azimuth']:
        index = prc.__ibpc_indeces__[index_name]
        assert eye_map.shape[index] == scene.shape[index],\
            'eye_map and scene should have the same number of {}'.format(
                index_name)
    # Collapse the (elevation, azimuth) grid into a single ommatidia axis.
    obs_size = (scene.shape[prc.__ibpc_indeces__['elevation']] *
                scene.shape[prc.__ibpc_indeces__['azimuth']],
                scene.shape[prc.__ibpc_indeces__['channel']],
                scene.shape[prc.__ibpc_indeces__['component']])
    obs_scene = scene.reshape(obs_size)
    # Bug fix: the eye_map is 3-dimensional (__eye_indeces__ layout), so
    # its component axis is prc.__eye_indeces__['component']; the former
    # use of the 4-d ibpc component index (3) was out of bounds for
    # eye_map.shape and always raised IndexError.
    omm_size = (eye_map.shape[prc.__eye_indeces__['elevation']] *
                eye_map.shape[prc.__eye_indeces__['azimuth']],
                eye_map.shape[prc.__eye_indeces__['component']])
    ommatidia_map = eye_map.reshape(omm_size)
    return (obs_scene, ommatidia_map)
def cartesian_to_spherical(x, y, z):
    """Convert cartesian coordinates to spherical ones.

    :param x,y,z: cartesian coordinates (scalars or arrays of equal shape)
    :returns: array whose trailing axis holds (elevation, azimuth, radius)
        as laid out by __spherical_indeces__; angles are in radians
    :rtype: np.ndarray
    """
    radius = np.sqrt(x**2 + y**2 + z**2)
    elevation = np.arctan2(z, np.sqrt(x**2 + y**2))
    azimuth = np.arctan2(y, x)
    # Bug fix: zeros_like(x) inherits x's dtype, so integer input would
    # silently truncate the angles; force a floating point output array.
    spherical = np.zeros_like(x, dtype=float)
    spherical = np.tile(spherical[..., np.newaxis], (3,))
    spherical[..., prc.__spherical_indeces__['elevation']] = elevation
    spherical[..., prc.__spherical_indeces__['azimuth']] = azimuth
    spherical[..., prc.__spherical_indeces__['radius']] = radius
    return spherical
def spherical_to_cartesian(elevation, azimuth, radius=1):
    """Convert spherical coordinates to cartesian ones.

    :param elevation: elevation angle(s) in radians
    :param azimuth: azimuth angle(s) in radians
    :param radius: radius (default 1, i.e. unit vectors)
    :returns: array whose trailing axis holds (x, y, z) as laid out by
        __cartesian_indeces__
    :rtype: np.ndarray
    """
    # Bug fix: zeros_like(elevation) inherits the input dtype, so integer
    # input would silently truncate the result; force a float array.
    cartesian = np.zeros_like(elevation, dtype=float)
    cartesian = np.tile(cartesian[..., np.newaxis], (3,))
    cartesian[..., prc.__cartesian_indeces__['x']] = np.cos(
        elevation) * np.cos(azimuth)
    cartesian[..., prc.__cartesian_indeces__['y']] = np.cos(
        elevation) * np.sin(azimuth)
    cartesian[..., prc.__cartesian_indeces__['z']] = np.sin(elevation)
    cartesian = radius * cartesian
    return cartesian
"""
How to test the script:
-----------------------
>>> blender test.blend --background --python BeeSampling.py
:Author: Olivier Bertrand (olivier.bertrand@uni-bielefeld.de)
:Parent module: Scene_rendering
"""
import bpy
import os
import numpy as np
import pandas as pd
import warnings
from Cyberbee import Cyberbee
from DataBase import DataBaseSave
class BeeSampling(Cyberbee):
    """
    BeeSampling is a class deriving from Cyberbee.
    The BeeSampling can be used to generate a database of
    images taken on a rectangular regular grid. For the database,
    the BeeSampling rely on DataBase
    It worth noting that the generated database can take a large
    harddrive space, as each image is composed of 4 channels of 180x360 pixels.
    """

    def __init__(self):
        """Initialise the BeeSampling"""
        Cyberbee.__init__(self)
        # Directory and name of the currently opened blender file.
        self.blenddirname = os.path.dirname(bpy.data.filepath)
        self.blendfilename = os.path.basename(bpy.data.filepath)
        # Grid of position-orientations to render (filled by
        # create_sampling_grid).
        self.__grid_posorients = None
        # NOTE(review): __grid_size is never written again;
        # create_sampling_grid stores the shape in the public grid_size
        # attribute instead — confirm which attribute is intended.
        self.__grid_size = None
        # Distances larger than world_dim are clipped in render().
        self.world_dim = np.inf

    def create_sampling_grid(self, x, y, z, alpha1, alpha2, alpha3):
        """Create a cubic grid from all the sampling points

        :param x: the positions along the x-axis
        :param y: the positions along the y-axis
        :param z: the positions along the z-axis
        :param alpha1: the first euler angles
        :param alpha2: the 2nd euler angles
        :param alpha3: the 3rd euler angles
        """
        # Cartesian product of every coordinate and angle.
        [mx, my, mz, ma1, ma2, ma3] = np.meshgrid(x,
                                                  y,
                                                  z,
                                                  alpha1,
                                                  alpha2,
                                                  alpha3)
        # NOTE(review): writes the public grid_size although __init__
        # initialises the private __grid_size — confirm.
        self.grid_size = mx.shape
        mx = mx.flatten()
        my = my.flatten()
        mz = mz.flatten()
        ma1 = ma1.flatten()
        ma2 = ma2.flatten()
        ma3 = ma3.flatten()
        # One row per grid point, columns matching the database schema.
        self.__grid_posorients = pd.DataFrame(index=range(mx.shape[0]),
                                              columns=['x', 'y', 'z',
                                                       'alpha_0',
                                                       'alpha_1',
                                                       'alpha_2'])
        self.__grid_posorients.loc[:, 'x'] = mx
        self.__grid_posorients.loc[:, 'y'] = my
        self.__grid_posorients.loc[:, 'z'] = mz
        self.__grid_posorients.loc[:, 'alpha_0'] = ma1
        self.__grid_posorients.loc[:, 'alpha_1'] = ma2
        self.__grid_posorients.loc[:, 'alpha_2'] = ma3
        self.__grid_posorients.index.name = 'grid_index'
        self.__grid_posorients.name = 'grid_position_orientation'

    def get_grid_posorients(self):
        """return a copy of the posorientation matrix

        :returns: position-orientations of the grid
        :rtype: pandas array
        """
        return self.__grid_posorients.copy()

    def set_gridindeces2nan(self, indeces):
        """Set certain grid points to nan, so they will be ignored in the rendering

        :param indeces: a list of indeces to be set to nan
        """
        self.__grid_posorients.loc[indeces, :] = np.nan

    def render(self, database_filename):
        """Render and store an image at every valid grid point.

        :param database_filename: path of the database file to write into
            (its folder is created when missing)
        """
        database_folder = os.path.dirname(database_filename)
        if not os.path.exists(database_folder):
            os.makedirs(database_folder)
        dataloger = DataBaseSave(database_filename,
                                 channels=['R', 'G', 'B', 'D'],
                                 arr_dtype=np.uint8)
        for frame_i, posorient in self.__grid_posorients.iterrows():
            # Grid points previously flagged with nan are skipped.
            if np.any(np.isnan(posorient)):
                # warnings.warn('frame_i: {} posorient nans'.format(frame_i))
                continue
            rowid = dataloger.get_posid(posorient)
            # Skip positions already fully present in the database.
            if dataloger.check_data_validity(rowid):
                warnings.warn(
                    'frame_i: {} data is valid rowid {}'.format(frame_i,
                                                                rowid))
                continue
            # The position-orientatios is valid (none nan)
            # and the cmaxminrange has not already been assigned
            # so the image need to be rendered
            self.update(posorient)
            distance = self.get_distance()
            # Clip the distance channel to the size of the world.
            distance[distance > self.world_dim] = self.world_dim
            image = self.get_image()
            image[:, :, 3] = distance
            dataloger.write_image(posorient, image)
        print('rendering completed')
if __name__ == "__main__":
import tempfile
bee_samp = BeeSampling()
# Create mesh
world_dim = 15.0
x = np.linspace(-7.5, 7.5, 5)
y = np.linspace(-7.5, 7.5, 5)
z = np.arange(1, 8, 2)
alpha_1 = np.array([0]) + np.pi / 2
alpha_2 = np.array([0])
alpha_3 = np.array([0])
bee_samp.create_sampling_grid(
x, y, z, alpha1=alpha_1, alpha2=alpha_2, alpha3=alpha_3)
bee_samp.world_dim = world_dim
grid_pos = bee_samp.get_grid_posorients()
condition = (grid_pos.x**2 + grid_pos.y**2) < ((bee_samp.world_dim / 2)**2)
bee_samp.set_gridindeces2nan(condition[condition == 0].index)
bee_samp.set_cycle_samples(samples=5)
with tempfile.TemporaryDirectory() as folder:
bee_samp.render(folder + '/database.db')
"""
How to test the script:
-----------------------
>>> blender test.blend --background --python Cyberbee.py
:Author: Olivier Bertrand (olivier.bertrand@uni-bielefeld.de)
:Parent module: Scene_rendering
"""
import bpy
import numpy as np
import tempfile
import os
class Cyberbee():
"""
Cyberbee is a small class binding python with blender.
With Cyberbee one can move the bee to a position, and render what
the bee see at this position.
The Bee eye is a panoramic camera with equirectangular projection
The light rays attaining the eyes are filtered with a gaussian.
"""
def __init__(self):
    """Initialise the Cyberbee

    Configures the blender scene for panoramic equirectangular
    rendering with a depth pass, and wires the render output (image
    and Z buffer) to OpenEXR files in a temporary directory.
    """
    # Rendering engine needs to be Cycles to support panoramic
    # equirectangular camera
    bpy.context.scene.render.engine = 'CYCLES'
    # The Z pass is needed to recover per-pixel distances.
    bpy.context.scene.render.layers["RenderLayer"].use_pass_z = True
    # Look for object camera
    camera_found = False
    for obj in bpy.context.scene.objects:
        if obj.type == 'CAMERA':
            self.camera = obj
            camera_found = True
            break
    assert camera_found, 'The blender file does not contain a camera'
    # The bee eye is panoramic, and with equirectangular projection
    self.camera.data.type = 'PANO'
    self.camera.data.cycles.panorama_type = 'EQUIRECTANGULAR'
    # Filtering props
    bpy.context.scene.cycles.filter_type = 'GAUSSIAN'
    # Call all set functions with default values
    self.set_camera_rotation_mode()
    self.set_camera_fov()
    self.set_camera_gaussian_width()
    self.set_camera_resolution()
    self.set_cycle_samples()
    # switch on nodes
    # Create render link to OutputFile with Image and Z buffer
    bpy.context.scene.use_nodes = True
    scene = bpy.context.scene
    nodes = scene.node_tree.nodes
    render_layers = nodes['Render Layers']
    output_file = nodes.new("CompositorNodeOutputFile")
    output_file.format.file_format = "OPEN_EXR"
    output_file.file_slots.remove(output_file.inputs[0])
    tmp_fileoutput = dict()
    tmp_fileoutput['Image'] = 'Image'
    tmp_fileoutput['Depth'] = 'Depth'
    # Bug fix: tempfile.TemporaryDirectory() kept no reference, so the
    # object was garbage collected and the directory removed before
    # blender could write into it. mkdtemp() creates a directory that
    # persists until explicitly removed.
    tmp_fileoutput['Folder'] = tempfile.mkdtemp()
    tmp_fileoutput['ext'] = '.exr'
    output_file.file_slots.new(tmp_fileoutput['Image'])
    output_file.file_slots.new(tmp_fileoutput['Depth'])
    output_file.base_path = tmp_fileoutput['Folder']
    scene.node_tree.links.new(
        render_layers.outputs['Image'],
        output_file.inputs['Image']
    )
    scene.node_tree.links.new(
        render_layers.outputs['Z'],
        output_file.inputs['Depth']
    )
    self.tmp_fileoutput = tmp_fileoutput
def set_camera_rotation_mode(self, mode='XYZ'):
    """change the camera rotation mode

    :param mode: the mode of rotation for the camera see blender doc
                 (default: 'XYZ').
    :type mode: a string

    .. seealso: blender bpy.data.scenes["Scene"].camera.rotation_mode
    """
    blender_scene = bpy.data.scenes["Scene"]
    blender_scene.camera.rotation_mode = mode
def get_camera_rotation_mode(self):
    """get the current camera rotation mode

    :returns: the mode of rotation used by the camera
    :rtype: string
    """
    blender_scene = bpy.data.scenes["Scene"]
    return blender_scene.camera.rotation_mode
def set_cycle_samples(self, samples=30):
    """Change the number of samples for rendering with the Cycles engine.

    :param samples: the number of samples to use when rendering images
    :type samples: int
    """
    bpy.context.scene.cycles.samples = samples
def get_cycle_samples(self):
    """Get the number of samples used for rendering with the Cycles engine.

    :returns: the number of samples used for the rendering
    :rtype: int
    """
    return bpy.context.scene.cycles.samples
def set_camera_fov(self, latmin=-90, latmax=+90,
                   longmin=-180, longmax=+180):
    """Change the field of view of the panoramic camera.

    :param latmin: minimum latitude (in deg)
    :type latmin: float
    :param latmax: maximum latitude (in deg)
    :type latmax: float
    :param longmin: minimum longitude (in deg)
    :type longmin: float
    :param longmax: maximum longitude (in deg)
    :type longmax: float
    """
    assert self.camera.data.type == 'PANO', 'Camera is not panoramic'
    assert self.camera.data.cycles.panorama_type == 'EQUIRECTANGULAR',\
        'Camera is not equirectangular'
    # Blender stores these angles in radians, the interface uses degrees
    self.camera.data.cycles.latitude_min = np.deg2rad(latmin)
    self.camera.data.cycles.latitude_max = np.deg2rad(latmax)
    self.camera.data.cycles.longitude_min = np.deg2rad(longmin)
    self.camera.data.cycles.longitude_max = np.deg2rad(longmax)
def get_camera_fov(self):
    """Get the field of view of the panoramic camera.

    :returns: the field of view of the camera as min/max
        longitude/latitude, in degrees
    :rtype: dict
    """
    assert self.camera.data.type == 'PANO', 'Camera is not panoramic'
    # Fixed: the original read self.camera.cycles (missing .data),
    # which would raise AttributeError
    assert self.camera.data.cycles.panorama_type == 'EQUIRECTANGULAR',\
        'Camera is not equirectangular'
    # Blender stores these angles in radians, the interface uses degrees.
    # Fixed: the original called the non-existent np.rad2ged.
    fov = dict()
    fov['latitude_min'] = np.rad2deg(self.camera.data.cycles.latitude_min)
    fov['latitude_max'] = np.rad2deg(self.camera.data.cycles.latitude_max)
    fov['longitude_min'] = np.rad2deg(
        self.camera.data.cycles.longitude_min)
    fov['longitude_max'] = np.rad2deg(
        self.camera.data.cycles.longitude_max)
    return fov
def set_camera_gaussian_width(self, gauss_w=1.5):
    """Change the width of the gaussian spatial filter used for rendering.

    :param gauss_w: width of the gaussian filter
    :type gauss_w: float
    """
    bpy.context.scene.cycles.filter_width = gauss_w
def get_camera_gaussian_width(self, gauss_w=1.5):
    """Get the width of the gaussian spatial filter used for rendering.

    :returns: the width of the gaussian filter
    :rtype: float
    .. note: NOTE(review): the gauss_w parameter is never used here
        (it looks copy-pasted from the setter); it is kept only for
        backward compatibility and callers should not pass it.
    """
    return bpy.context.scene.cycles.filter_width
def set_camera_resolution(self, resolution_x=360, resolution_y=180):
    """Change the camera resolution (number of pixels).

    :param resolution_x: number of pixels along the x-axis of the camera
    :type resolution_x: int
    :param resolution_y: number of pixels along the y-axis of the camera
    :type resolution_y: int
    """
    bpy.context.scene.render.resolution_x = resolution_x
    bpy.context.scene.render.resolution_y = resolution_y
    # Force full-size rendering, so the image matches the pixel counts
    bpy.context.scene.render.resolution_percentage = 100
def get_camera_resolution(self):
    """Return the camera resolution as an (x, y) pair of pixel counts.

    :returns: the resolution of the camera along (x-axis, y-axis)
    :rtype: (int, int)
    """
    render_settings = bpy.context.scene.render
    return render_settings.resolution_x, render_settings.resolution_y
def update(self, posorient):
    """Assign the position and the orientation of the camera, then render.

    :param posorient: a 1x6 vector containing:
        x, y, z, angle_1, angle_2, angle_3,
        where the angles are euler rotations around the axes
        specified by scene.camera.rotation_mode
    :type posorient: 1x6 double array
    :raises ValueError: if posorient does not have exactly 6 elements
    """
    # Validate with an explicit exception rather than assert,
    # because asserts are stripped when Python runs with -O
    if len(posorient) != 6:
        raise ValueError('posorient should be a 1x6 double array')
    self.camera.location = posorient[:3]
    self.camera.rotation_euler = posorient[3:]
    # Render at the new pose; the compositor set up in the constructor
    # writes the Image/Depth EXR files read by get_image/get_distance
    bpy.ops.render.render()
def get_image(self):
    """Return the last rendered image as a numpy array.

    :returns: the image (height, width, 4)
    :rtype: a double numpy array
    .. note: A temporary file is written on the harddrive,
        due to a Blender API limitation (rendered pixels can not
        be accessed directly).
    """
    # The OutputFile node writes <slot><frame><ext>; include the
    # extension recorded in tmp_fileoutput (the node appends .exr
    # for the OPEN_EXR format configured in the constructor)
    filename = os.path.join(
        self.tmp_fileoutput['Folder'],
        self.tmp_fileoutput['Image'] + '0001' + self.tmp_fileoutput['ext'])
    im_width, im_height = self.get_camera_resolution()
    im = bpy.data.images.load(filename)
    pixels = np.array(im.pixels)
    # Free the image datablock: repeated loads would otherwise
    # accumulate in bpy.data.images and leak memory across renders
    bpy.data.images.remove(im)
    pixels = pixels.reshape([im_height, im_width, 4])
    return pixels
def get_distance(self):
    """Return the last rendered distance map as a numpy array.

    :returns: the distance map (height, width)
    :rtype: a double numpy array
    .. note: A temporary file is written on the harddrive,
        due to a Blender API limitation (rendered pixels can not
        be accessed directly).
    """
    # The OutputFile node writes <slot><frame><ext>; include the
    # extension recorded in tmp_fileoutput (the node appends .exr
    # for the OPEN_EXR format configured in the constructor)
    filename = os.path.join(
        self.tmp_fileoutput['Folder'],
        self.tmp_fileoutput['Depth'] + '0001' + self.tmp_fileoutput['ext'])
    im_width, im_height = self.get_camera_resolution()
    im = bpy.data.images.load(filename)
    distance = np.array(im.pixels)
    # Free the image datablock: repeated loads would otherwise
    # accumulate in bpy.data.images and leak memory across renders
    bpy.data.images.remove(im)
    distance = distance.reshape([im_height, im_width, 4])
    # The depth is replicated over the RGBA channels; keep one channel
    distance = distance[:, :, 0]
    return distance
if __name__ == "__main__":
    # Smoke test: move the camera around a circle of radius 5
    # and render an image and a distance map at every step.
    mybee = Cyberbee()
    # Must be an int: np.zeros requires an integer shape
    # (the original passed 5.0, a TypeError on modern numpy)
    frames_per_revolution = 5
    step_size = 2 * np.pi / frames_per_revolution
    # posorient rows: x, y, z, angle_1, angle_2, angle_3
    posorients = np.zeros((frames_per_revolution, 6))
    angles = np.arange(frames_per_revolution) * step_size
    posorients[:, 0] = np.sin(angles) * 5
    posorients[:, 1] = np.cos(angles) * 5
    for frame_i, posorient in enumerate(posorients):
        mybee.update(posorient)
        # Test image
        image = mybee.get_image()
        # Test distance
        distance = mybee.get_distance()
    print('Cyberbee OK')
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment