Skip to content
Snippets Groups Projects
Commit 92cb73fb authored by Lukas Kettenbach's avatar Lukas Kettenbach
Browse files

Base structure for DBN, forward sampling

parent 1d9dd23a
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from primo.core import BayesNet
from primo.core import DynamicBayesNet
from primo.core import TwoTBN
from primo.reasoning import DiscreteNode
import primo.reasoning.particlebased.ForwardSampling as fs
from primo.reasoning.density import ProbabilityTable
import numpy
#Construct some simple DynmaicBayesianNetwork
B0 = BayesNet()
dbn = DynamicBayesNet()
twoTBN = TwoTBN()
weather0 = DiscreteNode("Weather", ["Rain", "Sun"])
weather = DiscreteNode("Weather", ["Rain", "Sun"])
umbrella0 = DiscreteNode("Umbrella", [True, False])
umbrella = DiscreteNode("Umbrella", [True, False])
B0.add_node(weather0)
B0.add_node(umbrella0)
twoTBN.add_node(weather)
twoTBN.add_node(umbrella)
B0.add_edge(weather0, umbrella0)
twoTBN.add_edge(weather, umbrella)
twoTBN.add_edge(weather, weather, True);
weather0_cpt = numpy.array([.4, .6])
weather0.set_probability_table(weather0_cpt, [weather0])
umbrella0_cpt = numpy.array([[.9, .1],
[.2, .8]])
umbrella0.set_probability_table(umbrella0_cpt, [weather0, umbrella0])
weather_cpt=numpy.array([[.7, .3],
[.5, .5]])
weather.set_probability_table(weather_cpt, [weather, weather])
umbrella_cpt = numpy.array([[.9, .1],
[.2, .8]])
umbrella.set_probability_table(umbrella_cpt, [weather, umbrella])
dbn.set_B0(B0)
dbn.set_TwoTBN(twoTBN)
chain = fs.sample_DBN(dbn, 100000)
pt = ProbabilityTable()
pt.add_variable(weather)
pt.add_variable(umbrella)
pt.to_jpt_by_states(chain)
print "----joint-probability----"
print pt
print "----umbrella----"
print pt.marginalization(umbrella)
#print "----alarm----"
#print pt.division(weather.get_cpd())
\ No newline at end of file
......@@ -54,7 +54,7 @@ class BayesNet(object):
def get_all_nodes(self):
return self.graph.nodes()
def get_nodes(self, node_names):
def get_nodes(self, node_names=[]):
nodes = []
if not node_names:
nodes = self.graph.nodes()
......@@ -63,6 +63,9 @@ class BayesNet(object):
nodes.append(self.get_node(node_name))
return nodes
def get_nodes_in_topological_sort(self):
return nx.topological_sort(self.graph)
def get_parents(self, node):
if node.name not in self.node_lookup.keys():
raise Exception("Node " + node.name + "does not exists")
......
# -*- coding: utf-8 -*-
from primo.core import BayesNet
from primo.core import TwoTBN
class DynamicBayesNet(BayesNet):
''' This is the implementation of a dynamic Bayesian network (also called
temporal Bayesian network).
Definition: DBN is a pair (B0, TwoTBN), where B0 is a BN over X(0),
representing the initial distribution over states, and TwoTBN is a
2-TBN for the process.
See Koller, Friedman - "Probabilistic Graphical Models" (p. 204)
Properties: Markov property, stationary, directed, discrete,
acyclic (within a slice)
'''
__B0 = BayesNet()
__twoTBN = TwoTBN()
def __init__(self):
super(DynamicBayesNet, self).__init__()
def add_edge(self, node_from, node_to, arc=False):
'''Add an directed edge to the graph.
Keyword arguments:
node_from -- from node
node_to -- to node
arc -- is this edge a temporal conditional dependency (default: False)
'''
super(DynamicBayesNet, self).add_edge(node_from, node_to)
# Adding an edge that already exists updates the edge data.
self.graph[node_from][node_to]['arc'] = arc
def is_valid(self):
'''Check if graph structure is valid.
Returns true if graph is directed and acyclic, false otherwiese'''
for node in self.graph.nodes():
if self.has_loop(node):
return False
return True
def has_loop(self, node, origin=None):
'''Check if any path from node leads back to node (except temporal
conditional dependencies).
Keyword arguments:
node -- the start node
origin -- for internal recursive loop (default: None)
Returns true on succes, false otherwise.'''
if not origin:
origin = node
print("has_loop DBN")
for successor in self.graph.successors(node):
if self.graph[node][successor]['arc']:
print("### Found arc.")
return False
if successor == origin:
self.graph.n
return True
else:
return self.has_loop(successor, origin)
def set_B0(self, B0):
''' Set the Bayesian network representing the initial distribution.'''
if isinstance(B0, BayesNet):
if not B0.is_valid():
raise Exception("BayesNet is not valid.")
self.__B0 = B0
else:
raise Exception("Can only set 'BayesNet' and its subclasses as " +
"B0 of a DBN.")
def set_TwoTBN(self, twoTBN):
''' Set the 2-time-slice Bayesian network.'''
if isinstance(twoTBN, TwoTBN):
if not twoTBN.is_valid():
raise Exception("BayesNet is not valid.")
self.__twoTBN = twoTBN
else:
raise Exception("Can only set 'TwoTBN' and its subclasses as " +
"twoTBN of a DBN.")
def get_B0(self):
''' Get the Bayesian network representing the initial distribution.'''
return self.__B0;
def get_TwoTBN(self):
''' Get the 2-time-slice Bayesian network.'''
return self.__twoTBN;
\ No newline at end of file
# -*- coding: utf-8 -*-
from primo.core import BayesNet
class TwoTBN(BayesNet):
''' This is the implementation of a 2-time-slice Bayesian network (2-TBN).
'''
def __init__(self):
BayesNet.__init__(self)
def add_edge(self, node_from, node_to, inter=False):
'''Add an directed edge to the graph.
Keyword arguments:
node_from -- from node
node_to -- to node
inter -- is this edge a temporal (inter-slice) conditional dependency
(default: False)
'''
super(TwoTBN, self).add_edge(node_from, node_to)
# Adding an edge that already exists updates the edge data.
self.graph[node_from][node_to]['inter'] = inter
def is_valid(self):
'''Check if graph structure is valid.
Returns true if graph is directed and acyclic, false otherwiese'''
for node in self.graph.nodes():
if self.has_loop(node):
return False
return True
def has_loop(self, node, origin=None):
'''Check if any path from node leads back to node (except temporal
conditional dependencies).
Keyword arguments:
node -- the start node
origin -- for internal recursive loop (default: None)
Returns true on succes, false otherwise.'''
if not origin:
origin = node
for successor in self.graph.successors(node):
if self.graph[node][successor]['inter']:
return False
if successor == origin:
return True
else:
return self.has_loop(successor, origin)
\ No newline at end of file
from Node import Node
from BayesNet import BayesNet
from DynamicBayesNet import DynamicBayesNet
from TwoTBN import TwoTBN
from DynamicBayesNet import DynamicBayesNet
\ No newline at end of file
# -*- coding: utf-8 -*-
def weighted_random(self, weights):
import random
counter = random.random() * sum(weights)
for i, w in enumerate(weights):
counter -= w
if counter <= 0:
return i
\ No newline at end of file
# -*- coding: utf-8 -*-
import primo.reasoning.GibbsTransitionModel as gtm
def particle_filtering(e, N, dbn):
S = dbn.sample_from_prior_distribution(N)
W = [(1. / N)] * N
print S
for i in xrange(0, N):
print S[i]
S[i] = gtm.transition(dbn, S[i])
print " --------------------------------------------------------------- "
print S
# for i in xrange(1, self.iterations):
# self.particles = gtm.transition(dbn, state)
\ No newline at end of file
......@@ -2,4 +2,4 @@ from RandomNode import RandomNode
from DiscreteNode import DiscreteNode
from GaussNode import GaussNode
from MCMC import MarkovChainSampler
from MCMC import GibbsTransitionModel
from MCMC import GibbsTransitionModel
\ No newline at end of file
# -*- coding: utf-8 -*-
from primo.core import BayesNet
from primo.core import DynamicBayesNet
from primo.core import TwoTBN
import random
def weighted_random(weights):
counter = random.random() * sum(weights)
for i, w in enumerate(weights):
counter -= w
if counter <= 0:
return i
def sample(network, state={}):
if not isinstance(network, BayesNet):
raise Exception("The given network is not an instance of BayesNet.")
if not isinstance(network, TwoTBN):
nodes = network.get_nodes_in_topological_sort()
else:
nodes = network.get_nodes()
for node in nodes:
# reduce this node's cpd
parents = network.get_parents(node)
if parents:
evidence = [(parent, state[parent]) for parent in parents]
reduced_cpd = node.get_cpd_reduced(evidence)
else:
reduced_cpd = node.get_cpd()
new_state = weighted_random(reduced_cpd.get_table())
state[node] = node.get_value_range()[new_state]
return state
def sample_DBN(network, N):
if not isinstance(network, DynamicBayesNet):
raise Exception("The given network is not an instance of DynamicBayesNet.")
# Sample from initial distribution
init_state = sample(network.get_B0(), {})
# Copy nodes from 2TBN in state but keep values
twoTBN = network.get_TwoTBN()
state = {}
print("### Initial state ###")
for node in init_state:
print(str(node.name) + ": " + str(init_state[node]))
state[twoTBN.get_node(node.name)] = init_state[node]
# Sample N Particles
for n in xrange(N):
yield state
state = sample(twoTBN, state)
\ No newline at end of file
# -*- coding: utf-8 -*-
import ForwardSampling
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment