Commit d93f06b5 authored by Dennis Quirin's avatar Dennis Quirin
Browse files

Add Docs

parent 3022ebc1
class ChannelAlreadyExists(Exception):
"""Raised when the channel already exists.
"""
pass
class NoSuchModule(Exception):
"""Raised when the module name is not found.
"""
pass
class NoSuchChannel(Exception):
"""Raised when the channel name is not found.
"""
pass
class UnknownModeError(Exception):
......@@ -19,4 +25,6 @@ class SettingsError(Exception):
pass
class ModuleIsAlreadySink(Exception):
"""Raised when the sink already exists.
"""
pass
......@@ -4,15 +4,27 @@ import sys
from ml4proflow import exceptions
class Dataflow:
"""Dataflow class:
This class creates and handles the dataflow for every module in the data processing graph.
The functions can throw several exceptions (see ml4proflow.exceptions).
"""
def __init__(self):
"""Initialization of the dataflow class
"""
self.channels = {}
def create_channel(self, channel: str):
"""Function:
Creating the channel for a module
"""
if channel in self.channels.keys():
raise exceptions.ChannelAlreadyExists()
self.channels[channel] = []
def register_sink(self, channel: str, receiver: 'SinkModule'):
"""Function:
Register a Sink module
"""
if channel not in self.channels.keys():
raise exceptions.NoSuchChannel()
if receiver in self.channels[channel]:
......@@ -20,6 +32,9 @@ class Dataflow:
self.channels[channel].append(receiver)
def unregister_sink(self, channel: str, receiver: 'SinkModule'):
"""Function:
Unregister a Sink module
"""
if channel not in self.channels.keys():
raise exceptions.NoSuchChannel()
if not self.channels[channel].contains(receiver):
......@@ -27,25 +42,22 @@ class Dataflow:
del self.channels[channel][receiver]
def new_data(self, channel: str, sender: 'SourceModule', data: pandas.DataFrame):
"""Function:
Register new data from a source module in the channel
"""
if channel not in self.channels.keys():
raise exceptions.NoSuchChannel()
for module in self.channels[channel]:
module.on_new_data(channel, sender, data)
class BasicModule:
"""Basic Module class
Events you may implement:
- ``on_new_data()`` -> There is new data on our input channel
(Helper-)Methods you may use:
- ``_push_data()`` -> push your results to a channel
"""Basic Module class:
Basic module of the framework. All other module classes inherit from this module.
"""
def __init__(self, dataflow: Dataflow, config: dict=None):
"""Subclasses need to register all there input/output channels!
""" Initialize basic module class
Subclasses need to register all there input/output channels!
"""
self.dataflow = dataflow
if config:
......@@ -70,17 +82,16 @@ class BasicModule:
return type(self).__module__
class SourceModule(BasicModule):
"""Source Module class
"""Source Module class:
Class for modules that produce data only.
Source modules are scheduled at the beginning of the data processing graph.
Events you need to implement:
- ``produce_data()`` -> Produce data to be passed to a channel
(Helper-)Methods you need to use:
- ``_push_data()`` -> Push your results to a channel
"""
def __init__(self, dataflow: Dataflow, config: dict=None, create_channels: bool = True):
"""Initialization of the Source module class
"""
BasicModule.__init__(self, dataflow, config)
self.channels_push = []
if config:
......@@ -90,11 +101,17 @@ class SourceModule(BasicModule):
dataflow.create_channel(c)
def produce_data(self):
"""(Helper-)Method:
Execution at initialization in the data processing graph to produce data
(Helper-)Function you need to use:
- ``_push_data()`` -> Push the data to a channel
"""
pass
def _push_data(self, channel: str, data: pandas.DataFrame) -> None:
"""Helper function:
Push "data" to a channel (inside our current dataflow graph)
Push "data" to a channel (inside the current dataflow graph)
:param channel: Name of the channel
:param data: Data to push
......@@ -112,6 +129,8 @@ class SinkModule(BasicModule):
- ``on_new_data()`` -> There is new data on the input channel
"""
def __init__(self, dataflow: Dataflow, config: dict=None):
"""Initialization of the Sink module class
"""
BasicModule.__init__(self, dataflow, config)
self.sink_channels = []
if config:
......@@ -120,7 +139,7 @@ class SinkModule(BasicModule):
dataflow.register_sink(c, self)
def on_new_data(self, name: str, sender: SourceModule, data: pandas.DataFrame) -> None:
"""Called when there is new data available ready to process.
"""Execute when there is new data available ready to process.
:param name: Name of the channel
:param sender: Name of the module that provides the data
:param data: Provided DataFrame
......@@ -141,6 +160,8 @@ class Module(SourceModule, SinkModule):
- ``_push_data()`` -> Push your results to a channel
"""
def __init__(self, dataflow: Dataflow, config: dict=None):
"""Initialization of the module class with inheritance of sink and source modules
"""
SourceModule.__init__(self,dataflow, config)
SinkModule.__init__(self,dataflow, config)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment