From 4b04db8abb7b81430ce5456c4247c07a3f16c232 Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de> Date: Wed, 19 Sep 2012 15:51:47 +0200 Subject: [PATCH] Now the ipaaca-messageiu wire schema is correctly used. Now for Python, too. --- .../cpp/examples/src/example-component.cc | 12 ++- ipaacalib/cpp/src/ipaaca.cc | 25 ++++- ipaacalib/python/src/ipaaca.py | 101 +++++++++++++++++- 3 files changed, 132 insertions(+), 6 deletions(-) diff --git a/ipaacalib/cpp/examples/src/example-component.cc b/ipaacalib/cpp/examples/src/example-component.cc index 23bb04e..908ada0 100644 --- a/ipaacalib/cpp/examples/src/example-component.cc +++ b/ipaacalib/cpp/examples/src/example-component.cc @@ -39,6 +39,7 @@ class LegacyComponent { void handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local); /// example publishing function to produce a 'grounded' IU void publish_reply_iu(const std::string& text, const std::string& received_iu_uid); + void publish_reply_message(const std::string& text, const std::string& received_iu_uid); void publish_hello_world(); }; @@ -78,6 +79,9 @@ void LegacyComponent::handle_iu_event(IUInterface::ptr iu, IUEventType event_typ std::string description = iu->payload()["description"]; std::cout << "[ Current description: " << description << "]" << std::endl; + + /// let's also react by emitting an IU ourselves (function below) + publish_reply_iu("important-result", iu->uid()); } else if (event_type == IU_ADDED) { std::cout << "[Received new IU!]" << std::endl; @@ -90,7 +94,7 @@ void LegacyComponent::handle_iu_event(IUInterface::ptr iu, IUEventType event_typ std::cout << "[ Current description: " << description << "]" << std::endl; /// let's also react by emitting an IU ourselves (function below) - publish_reply_iu("important-result", iu->uid()); + publish_reply_message("important-result", iu->uid()); } else if (event_type == IU_UPDATED) { std::cout << "[Received IU payload update for IU " << iu->uid() << "]" << std::endl; @@ -128,6 +132,12 @@ void LegacyComponent::publish_reply_iu(const std::string& text, const std::strin /// add to output buffer ( = "publish") _out_buf->add(iu); } +void LegacyComponent::publish_reply_message(const std::string& text, const std::string& received_iu_uid) { + IU::ptr iu = Message::create( "myResultCategory" ); + iu->payload()["description"] = "SomeResult"; + iu->add_link("GRIN", received_iu_uid); + _out_buf->add(iu); +} void LegacyComponent::publish_hello_world() { IU::ptr iu = Message::create( "myCategoryInterest"); //helloWorld" ); diff --git a/ipaacalib/cpp/src/ipaaca.cc b/ipaacalib/cpp/src/ipaaca.cc index 8667be1..9da3bf1 100644 --- a/ipaacalib/cpp/src/ipaaca.cc +++ b/ipaacalib/cpp/src/ipaaca.cc @@ -1202,6 +1202,7 @@ IUConverter::IUConverter() std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) { + //std::cout << "serialize" << std::endl; // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "ipaaca::IU" // NOTE: a dynamic_pointer_cast cannot be used from void* @@ -1242,11 +1243,22 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) } } pbo->SerializeToString(&wire); - return getWireSchema(); + switch(obj->access_mode()) { + case IU_ACCESS_PUSH: + //std::cout << "Requesting to send as ipaaca-iu" << std::endl; + return "ipaaca-iu"; + case IU_ACCESS_MESSAGE: + //std::cout << "Requesting to send as ipaaca-messageiu" << std::endl; + return "ipaaca-messageiu"; + default: + //std::cout << "Requesting to send as default" << std::endl; + return getWireSchema(); + } } AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) { + //std::cout << "deserialize" << std::endl; assert(wireSchema == getWireSchema()); // "ipaaca-iu" boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); pbo->ParseFromString(wire); @@ -1315,6 +1327,7 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std: } //}}} + // MessageConverter//{{{ MessageConverter::MessageConverter() @@ -1364,7 +1377,15 @@ std::string MessageConverter::serialize(const AnnotatedData& data, std::string& } } pbo->SerializeToString(&wire); - return getWireSchema(); + switch(obj->access_mode()) { + case IU_ACCESS_PUSH: + return "ipaaca-iu"; + case IU_ACCESS_MESSAGE: + return "ipaaca-messageiu"; + default: + //std::cout << "Requesting to send as default" << std::endl; + return getWireSchema(); + } } diff --git a/ipaacalib/python/src/ipaaca.py b/ipaacalib/python/src/ipaaca.py index e097cb7..f6f80ad 100755 --- a/ipaacalib/python/src/ipaaca.py +++ b/ipaacalib/python/src/ipaaca.py @@ -195,6 +195,7 @@ class IUInterface(object): #{{{ def __str__(self): s = unicode(self.__class__)+"{ " + s += "category="+("<None>" if self._category is None else self._category)+" " s += "uid="+self._uid+" " s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " @@ -720,7 +721,11 @@ class IUConverter(rsb.converter.Converter):#{{{ pbo.payload_type = iu._payload_type pbo.owner_name = iu._owner_name pbo.committed = iu._committed - pbo.access_mode = iu._access_mode #ipaaca_pb2.IU.PUSH # TODO + am=ipaaca_pb2.IU.PUSH #default + if iu._access_mode == IUAccessMode.MESSAGE: + am=ipaaca_pb2.IU.MESSAGE + # TODO add other types later + pbo.access_mode = am pbo.read_only = iu._read_only for k,v in iu._payload.items(): entry = pbo.payload.add() @@ -729,11 +734,99 @@ class IUConverter(rsb.converter.Converter):#{{{ linkset = pbo.links.add() linkset.type = type_ linkset.targets.extend(iu._links[type_]) - return bytearray(pbo.SerializeToString()), self.wireSchema + ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema + return bytearray(pbo.SerializeToString()), ws def deserialize(self, byte_stream, ws): type = self.getDataType() - if type == IU: + #print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws) + if type == IU or type == Message: + pbo = ipaaca_pb2.IU() + pbo.ParseFromString( str(byte_stream) ) + if pbo.access_mode == ipaaca_pb2.IU.PUSH: + _payload = {} + for entry in pbo.payload: + k, v = unpack_typed_payload_item(entry) + _payload[k] = v + _links = collections.defaultdict(set) + for linkset in pbo.links: + for target_uid in linkset.targets: + _links[linkset.type].add(target_uid) + remote_push_iu = RemotePushIU( + uid=pbo.uid, + revision=pbo.revision, + read_only = pbo.read_only, + owner_name = pbo.owner_name, + category = pbo.category, + payload_type = pbo.payload_type, + committed = pbo.committed, + payload=_payload, + links=_links + ) + return remote_push_iu + elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE: + _payload = {} + for entry in pbo.payload: + k, v = unpack_typed_payload_item(entry) + _payload[k] = v + _links = collections.defaultdict(set) + for linkset in pbo.links: + for target_uid in linkset.targets: + _links[linkset.type].add(target_uid) + remote_message = RemoteMessage( + uid=pbo.uid, + revision=pbo.revision, + read_only = pbo.read_only, + owner_name = pbo.owner_name, + category = pbo.category, + payload_type = pbo.payload_type, + committed = pbo.committed, + payload=_payload, + links=_links + ) + return remote_message + else: + raise Exception("We can only handle IUs with access mode 'PUSH' or 'MESSAGE' for now!") + else: + raise ValueError("Inacceptable dataType %s" % type) +#}}} + +class MessageConverter(rsb.converter.Converter):#{{{ + ''' + Converter class for Full IU representations + wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU + ''' + def __init__(self, wireSchema="ipaaca-messageiu", dataType=Message): + super(IUConverter, self).__init__(bytearray, dataType, wireSchema) + + def serialize(self, iu): + pbo = ipaaca_pb2.IU() + pbo.uid = iu._uid + pbo.revision = iu._revision + pbo.category = iu._category + pbo.payload_type = iu._payload_type + pbo.owner_name = iu._owner_name + pbo.committed = iu._committed + am=ipaaca_pb2.IU.PUSH #default + if iu._access_mode == IUAccessMode.MESSAGE: + am=ipaaca_pb2.IU.MESSAGE + # TODO add other types later + pbo.access_mode = am + pbo.read_only = iu._read_only + for k,v in iu._payload.items(): + entry = pbo.payload.add() + pack_typed_payload_item(entry, k, v) + for type_ in iu._links.keys(): + linkset = pbo.links.add() + linkset.type = type_ + linkset.targets.extend(iu._links[type_]) + ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema + return bytearray(pbo.SerializeToString()), ws + + def deserialize(self, byte_stream, ws): + type = self.getDataType() + #print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws) + if type == IU or type == Message: pbo = ipaaca_pb2.IU() pbo.ParseFromString( str(byte_stream) ) if pbo.access_mode == ipaaca_pb2.IU.PUSH: @@ -1360,6 +1453,8 @@ def initialize_ipaaca_rsb():#{{{ IntConverter(wireSchema="int32", dataType=int)) rsb.converter.registerGlobalConverter( IUConverter(wireSchema="ipaaca-iu", dataType=IU)) + rsb.converter.registerGlobalConverter( + IUConverter(wireSchema="ipaaca-messageiu", dataType=Message)) rsb.converter.registerGlobalConverter( IULinkUpdateConverter( wireSchema="ipaaca-iu-link-update", -- GitLab