Example algorithm for xml serialization of Python data
Allikas: Lambda
First an overview of the conversion principles and then a) actual Python code for conversion b) X-road oriented SOAP call tool for Python
Principles of data conversion to/from XML
This memo presents a general algorithm for converting native datatypes to XML and, vice versa, XML to native datatypes. The main goals of the algorithm: - language-agnostic: support python, php, javascript, java, c, lisp, ... - convert native python and php data to/from XML without adding any kinds of extra infomation to native data - allow using user's own xml tags and attributes if necessary - fit well into the soap and arbitrary-xml-usage frameworks For example, a suitable use of the algorithm for treating soap messages is to convert the whole soap envelope with all the content directly into the native datastructure, and then let user program handle the native data as it pleases. Short overview ============== XML uses three special tags: Array, Struct and item in their own namespace. Array and Struct have the same semantics as SOAP-ENC Array and Struct. When simple native data (integers, strings, etc) are converted to xml, the type will be indicated in xml as a type attribute with xml schema values. Native arrays,lists and tuples are converted to xml Array. Native structures, dictionaries and objects are converted to xml Struct. xml Array is always converted to native array (not tuple, dict, etc). xml Struct is always converted to native struct or dict or object, whichever is available natively. In case the user wants to use his/her own XML tags and property lists, he has to create a special four-element array which contains the tag name, attribute list and children list. XML tags except Array, Struct, item are converted to such native four-element arrays. Primitive data conversion ========================= 3 <-> <av:item av:type="xsd:integer">3</av:item> "abc" <-> <av:item av:type="xsd:string">abc</av:item> 1.3 <-> <av:item av:type="xsd:float">1.3</av:item> True <-> <av:item av:type="xsd:boolean">true</av:item> False <-> <av:item av:type="xsd:boolean">false</av:item> datetime.date(2006, 2, 13) <-> <av:item type="xsd:date">2006-02-13</av:item> time.struct_time(2006, 2, 13, 11, 22, 11, 2, 347, 0) <-> <av:item type="xsd:dateTime">2006-02-13T11:22:2.347</av:item> Array/tuple conversion ====================== Nested array conversion example: [2,"a",[],["b",3]] <-> <av:Array> <av:item av:type="xsd:integer">2</av:item> <av:item av:type="xsd:string">a</av:item> <av:Array/> <av:Array> <av:item av:type="xsd:string">b</av:item> <av:item av:type="xsd:integer">3</av:item> <av:Array> <av:Array> Tuples, like (2,"a",(),("b",3)), will be converted in exactly same way to XML. Xml will then be converted back to array, not tuple. Structure/dict/object conversion ================================= Analogous nested Python dict (similarly struct/object in other languages) conversion example: {'k1': 2, 'k2' : "a", 'k3' : {}, 'k4' : {'m1' : "b", 'm2' : 3}} <-> <av:Struct> <k1 av:type="xsd:integer">2</k1> <k2 av:type="xsd:string">a</k2> <k3> <av:Struct/> </k3> <k4> <av:Struct> <m1 av:type="xsd:string">b</m1> <m2 av:type="xsd:integer">3</m2> </av:Struct> </k4> </av:Struct> User tag usage =============== To avoid Array, Struct or item default names and/or for adding attributes to tags, the user will have to use a special four-element array or tuple which contains, in that order: 0) 'xmltag' special string 1) tag name string 2) attribute dict with simple data values or a list of two-element pairs 3) value inside the tag (None or null if missing) The value (last element above) will then be surrounded by the tag name given, not the default Array/Struct/item. When converting back from xml, always an array will be used, and the last element will be always present (may be None or null). Missing content example with attributes: ---------------------------------------- ['xmltag','somename',{'someattr' : 3,'otherattr' : 'mm'},None] or equivalently ['xmltag','somename',[['someattr',3],['otherattr','mm']],None] <-> <somename someattr='3' otherattr='mm'/> When converting back from xml, always a dict/struct/object will be used for attribute list. Example with a simple value: ---------------------------- Note that the <item> tag is not put around the value! ['xmltag','somename',None, 'somevalue'] or ('xmltag','somename',None, 'somevalue') <-> <somename>somevalue</somename> Example with a nested array: ---------------------------- The nested array is exactly the same as the first Array example above: in the example below the tag 'somename' will be used instead of the default 'Array': ['xmltag','somename',None,2, ["a",[],['xmltag','othername',None,["b",3]]]] <-> <somename> <av:item av:type="xsd:integer">2</av:item> <av:item av:type="xsd:string">a</av:item> <av:Array/> <othername> <av:item av:type="xsd:string">b</av:item> <av:item av:type="xsd:integer">3</av:item> <othername> </somename> Example with a nested dict/struct/object: ----------------------------------------- The nested dict is exactly same as the first dict example above: compare these! ['xmltag','somename',None,2, {'k1': 2, 'k2' : "a", 'k3' : {}, 'k4' : {'m1' : "b", 'm2' : 3}}] <-> <somename> <k1 av:type="xsd:integer">2</k1> <k2 av:type="xsd:string">a</k2> <k3> <av:Struct/> </k3> <k4> <av:Struct> <m1 av:type="xsd:string">b</m1> <m2 av:type="xsd:integer">3</m2> </av:Struct> </k4> </somename>
Python code for conversion
#!/usr/local/bin/python import sys import libxml2 import datetime import time from types import * # --- these two are correct values and should rather not be changed --- xsdNameSpace='http://www.w3.org/2001/XMLSchema' xsiNameSpace='http://www.w3.org/2001/XMLSchema-instance' soapencNameSpace='http://schemas.xmlsoap.org/soap/encoding/' # -- default values: each can be overriden by a named parameter ---- # - sometimes you may want to pass (override) these arrayTagsDefault=('Array',) structTagsDefault=('Struct',) itemTagsDefault=('item',) #nameSpacesDefault=(None,xsiNameSpace,xsdNameSpace,soapencNameSpace,) #nameSpacesDefault=(None,xsiNameSpace,xsdNameSpace,) nameSpacesDefault=(None,xsiNameSpace,None,) structTypeDefault='Struct' arrayTypeDefault='Array' # - it is less likely that you will want to pass (override) the following xmlWildcardDefault='Xmlall' nsPrefixDefault='nx' xmlTagIndicatorDefault='xmltag' xmlNameIndicatorDefault='xmlname' xmlNsIndicatorDefault='xmlns' xmlAttrIndicatorDefault='xmlattr' useAttrNsDefault=True useTagNsDefault=True useTypesDefault=True useStringTypeDefault=False typeAttrDefault='type' xsiPrefixDefault="xsi" xsdPrefixDefault="xsd" soapencPrefixDefault="SOAP-ENC" #indentStrDefault=" " indentStrDefault="" styleDefault='complex' simpleCharDefault="##" complexCharDefault="$$" whitestripDefault=True globalVarsDefault=None objectMappingDefault=None # ---- do not change this: builtin values ----- builtInTypes=('int','string','date','time','datetime','boolean','array','struct',) # ----- the Tag class is used for 'tag' style of named tags, not 'simple' or 'complex' ------ class Tag(list): def __init__(self, name, data, namespace="", attributes = {}): list.__init__(self,data) self.name = name self.namespace = namespace self.attributes = attributes def __str__(self): res="Tag('"+self.name+"',"+repr(list(self)) if self.namespace: if self.attributes: res=res+",'"+self.namespace+"',"+repr(self.attributes)+")" else: res=res+",'"+self.namespace+"')" else: if self.attributes: res=res+",None,"+repr(self.attributes)+")" else: res=res+")" return res def __repr__(self): return self.__str__() def __add__(self, other): return Tag(self.name, list.__add__(self, other), self.namespace, self.attributes) #def __getitem__(self, key): # if isinstance(key,(int,slice)): # return self.nodes[key] # if isinstance(key,(basestring,tuple,list)): # try: # return list(islice(self.getElements(key),1))[0][1] # except IndexError: # raise KeyError(key) # raise TypeError('Cannot subscript Value by %s' % type(key).__name__) # ===================================================== # # deserialization: xml string to native data # # ===================================================== def xmlDeserialize(instr, arrayTags=arrayTagsDefault,structTags=structTagsDefault,itemTags=itemTagsDefault, nameSpaces=nameSpacesDefault,structType=structTypeDefault,arrayType=arrayTypeDefault, xmlWildcard=xmlWildcardDefault,nsPrefix=nsPrefixDefault, xmlTagIndicator=xmlTagIndicatorDefault,xmlNameIndicator=xmlNameIndicatorDefault, xmlNsIndicator=xmlNsIndicatorDefault,xmlAttrIndicator=xmlAttrIndicatorDefault, useAttrNs=useAttrNsDefault,useTagNs=useTagNsDefault,useTypes=useTypesDefault, useStringType=useStringTypeDefault,typeAttr=typeAttrDefault,xsiPrefix=xsiPrefixDefault, xsdPrefix=xsdPrefixDefault,indentStr=indentStrDefault,style=styleDefault, simpleChar=simpleCharDefault,complexChar=complexCharDefault,whitestrip=whitestripDefault, globalVars=globalVarsDefault,soapencPrefix=soapencPrefixDefault,objectMapping=objectMappingDefault): useUnicode=False if instr and type(instr)==UnicodeType: instr=instr.encode('UTF-8') useUnicode=True dom=libxml2.parseDoc(instr) root=dom.children if not root: return None if style=='simple': namespaces=(None,None,None,) useStringType=False confDict={} confDict['arrayTags']=arrayTags confDict['structTags']=structTags confDict['itemTags']=itemTags confDict['nameSpaces']=nameSpaces confDict['structType']=structType confDict['arrayType']=arrayType confDict['xmlWildcard']=xmlWildcard confDict['nsPrefix']=nsPrefix confDict['xmlTagIndicator']=xmlTagIndicator confDict['xmlNameIndicator']=xmlNameIndicator confDict['xmlNsIndicator']=xmlNsIndicator confDict['xmlAttrIndicator']=xmlAttrIndicator confDict['useAttrNs']=useAttrNs confDict['useTagNs']=useTagNs confDict['useTypes']=useTypes confDict['useStringType']=useStringType confDict['typeAttr']=typeAttr confDict['xsiPrefix']=xsiPrefix confDict['xsdPrefix']=xsdPrefix confDict['indentStr']=indentStr confDict['style']=style confDict['simpleChar']=simpleChar confDict['complexChar']=complexChar confDict['whitestrip']=whitestrip confDict['globalVars']=globalVars confDict['soapencPrefix']=soapencPrefix confDict['useUnicode']=useUnicode confDict['objectMapping']=objectMapping res=xmlDomNative(root,None,confDict) return res def xmlDomNative(node,xsiType,confDict): #print 'xmlDomNative: ',str(node),'type',xsiType if not node: return "" elif type(node)=='str': #print 'node was str',str(node),confDict['useUnicode'] return strDec(node,xsiType,confDict) elif node.type=='text': #print 'node was text',str(node.content),confDict['useUnicode'] return strDec(node.content,xsiType,confDict) elif node.type=='element': #print 'node was element' itemTags=confDict['itemTags'] structType=confDict['structType'] arrayType=confDict['arrayType'] structel=False if xsiType=='STRUCTEL': # special handling of untyped simple struct elements #print 'STRUCTEL',str(node.children.next),str(node.children.content) xsiType=None structel=True if not node.children: return "" elif not node.children.next and node.children.type=='text': return strDec(node.children.content,xsiType,confDict) elif not node.children.next and node.children.type=='element': return xmlDomNative(node.children,None,confDict) if isSpecialTag(node.name,node.ns(),itemTags,confDict): # item if node.properties and getXsiType(node,confDict)=="nil": return None elif not node.children: return '' else: return xmlDomNative(node.children,getXsiType(node,confDict),confDict) elif xsiType and xsiType!=structType and xsiType!=arrayType: # simple xsitype given return xmlDomNative(node.children,xsiType,confDict) elif (xsiType==structType or (not xsiType and isComplexTypeTag(node,node.ns,structType,confDict)) or isUserDefinedClass(node.name,node,confDict)): # Struct or user-defined class return xmlDomNativeStruct(node,xsiType,confDict) elif (xsiType==arrayType or structel or (not xsiType and isComplexTypeTag(node,node.ns,arrayType,confDict))): # Array return xmlDomNativeArray(node,xsiType,confDict) else: #named tag return xmlDomNativeNamed(node,xsiType,confDict) else: raise Exception, 'XML encoding: type error at node: '+str(node)+' type being '+str(node.type) return None def xmlDomNativeStruct(node,xsiType,confDict): #print 'struct',str(node) structTags=confDict['structTags'] style=confDict['style'] res={} el=node.children while el is not None: #print 'child el',str(el) if el.type=='element' and el.name: name=strDec(el.name,None,confDict) elxsi=getXsiType(el,confDict) #print 'el',name,str(el.content),elxsi if not elxsi: elxsi='STRUCTEL' content=xmlDomNative(el,elxsi,confDict) res[name]=content el=el.next if structTags and node.name!=structTags[0] and not style=='simple': res[confDict['xmlNameIndicator']]=node.name if confDict['useTagNs'] and node.ns() and not style=='simple': res[confDict['xmlNsIndicator']]=node.ns().content attrs={} el=node.properties while el is not None: #print 'prop el',str(el) name=strDec(el.name,None,confDict) content=strDec(el.content,getXsiType(el,confDict),confDict) ns=el.ns() if ns and ns.content and ns.content==xsiNameSpace and name==confDict['typeAttr']: pass elif ns and confDict['useAttrNs']: attrs[(ns.content,name)]=content else: attrs[name]=content el=el.next if attrs and not style=='simple': res[confDict['xmlAttrIndicator']]=attrs if isUserDefinedClass(node.name,node,confDict): cl=eval(node.name,confDict['globalVars']) obj=object.__new__(cl) if hasattr(cl,'__setstate__'): obj.__setstate__(res) else: for k in res.keys(): if not k.startswith('xml'): setattr(obj,k,res[k]) return obj else: return res def xmlDomNativeArray(node,xsiType,confDict): structType=confDict['structType'] arrayType=confDict['arrayType'] res=[] el=node.children while el is not None: #print 'el: ',el,getXsiType(el,confDict) if el.type=='element' and el.name: elxsi=getXsiType(el,confDict) if (elxsi!=structType and elxsi!=arrayType and isStandardTag(el.name,el.ns(),confDict)): tmp=xmlDomNative(el,None,confDict) res=res+[tmp] #if not(type(tmp)==StringType and not tmp.strip()): # res=res+[tmp] else: tmp=xmlDomNative(el,elxsi,confDict) res=res+[tmp] #if not(type(tmp)==StringType and not tmp.strip()): # res=res+[tmp] #elif el.type=='text': # tmp=strDec(el.content,getXsiType(el,confDict),confDict) # if tmp or tmp==False: # if not(type(tmp)==StringType and not tmp.strip()): # res=res+[tmp] el=el.next return res def xmlDomNativeNamed(node,xsiType,confDict): structType=confDict['structType'] arrayType=confDict['arrayType'] style=confDict['style'] simpleChar=confDict['simpleChar'] complexChar=confDict['complexChar'] structel=False attrs={} el=node.properties while el is not None: name=strDec(el.name,None,confDict) content=strDec(el.content,None,confDict) ns=el.ns() if ns and confDict['useAttrNs']: attrs[(ns.content,name)]=content else: attrs[name]=content el=el.next res=[] el=node.children while el is not None: #print 'el: ',el if el.type=='element' and el.name: elxsi=getXsiType(el,confDict) if (elxsi!=structType and elxsi!=arrayType and isStandardTag(el.name,el.ns(),confDict)): tmp=xmlDomNative(el,None,confDict) if not(confDict['whitestrip'] and type(tmp)==StringType and not tmp.strip()): res=res+[tmp] else: tmp=xmlDomNative(el,elxsi,confDict) if not(confDict['whitestrip'] and type(tmp)==StringType and not tmp.strip()): res=res+[tmp] elif el.type=='text': tmp=strDec(el.content,getXsiType(node,confDict),confDict) if tmp or tmp==False: if not(confDict['whitestrip'] and type(tmp)==StringType and not tmp.strip()): res=res+[tmp] el=el.next if confDict['useTagNs'] and node.ns(): namespace=node.ns().content else: namespace=None if structel: wrap=res elif isUserDefinedClass(node.name,node,confDict): #cl=eval(node.name,confDict['globalVars']) tmp=isUserDefinedClass(node.name,node,confDict) cl=tmp obj=object.__new__(cl) if hasattr(cl,'__setstate__'): obj.__setstate__(res) else: for k in res.keys(): if not k.startswith('xml'): setattr(obj,k,res[k]) wrap=obj elif style=='tag': wrap=Tag(node.name,res,namespace,attrs) elif style=='complex': if not namespace and isPureTypeAttrs(attrs,confDict): wrap=[simpleChar+node.name]+res else: wrap=[complexChar+node.name,namespace,attrs]+res else: wrap=[simpleChar+node.name]+res return wrap def isUserDefinedClass(name,node,confDict): if not confDict['objectMapping']: return False try: #if not confDict['globalVars']: # return False #cl=eval(name,confDict['globalVars']) specType=None typens=None type=None el=node.properties while el is not None: name=strDec(el.name,None,confDict) content=strDec(el.content,None,confDict) ens=el.ns() if name==confDict['typeAttr']: # and ens==confDict['nameSpaces'][1]: specType=content break el=el.next if not specType: return False tmp=specType.split(':') typens=tmp[0] type=tmp[1] tmp=confDict['objectMapping'][(typens,type,)] if not tmp: return False #if not (type(cl)==TypeType and # str(cl).startswith("<class '__main__.")): # return False return tmp except: return False def isPureTypeAttrs(attrs,confDict): #print 'attrs',attrs if not attrs: return True for key in attrs.keys(): value=attrs[key] #print 'key',key,(confDict['nameSpaces'])[1],confDict['typeAttr'],value,value[len(confDict['xsdPrefix']):] if type(key)==StringType: if (key.startswith(confDict['xsiPrefix']) and value[len(confDict['xsdPrefix'])+1:] in builtInTypes): pass elif (key.startswith(confDict['xsiPrefix']) and value[len(confDict['soapencPrefix'])+1:] in builtInTypes): pass else: return False elif (key[0]==((confDict['nameSpaces'])[1]) and key[1]==confDict['typeAttr'] and (value[len(confDict['xsdPrefix'])+1:] in builtInTypes or value[len(confDict['soapencPrefix'])+1:] in builtInTypes)): pass else: return False return True def isComplexTypeTag(node,ns,typename,confDict): #print 'isComplexTypeTag',tag,str(ns),getXsiType(node,confDict) if node.type!='element': return False xsitype=getXsiType(node,confDict) if xsitype: if xsitype==typename: return True else: return False if not node.name: return False if typename==confDict['arrayType']: nametuple=confDict['arrayTags'] elif typename==confDict['structType']: nametuple=confDict['structTags'] else: return False if isSpecialTag(node.name,ns,nametuple,confDict): return True else: return False def isSpecialTag(tag,ns,specialTags,confDict): #print tag,str(ns),specialTags,nameSpaces nameSpaces=confDict['nameSpaces'] xmlWildcard=confDict['xmlWildcard'] useTagNs=confDict['useTagNs'] if specialTags and (tag in specialTags or specialTags[0]==xmlWildcard): if useTagNs and ns and nameSpaces and nameSpaces[0] and not specialTags[0]==xmlWildcard: if ns.content==nameSpaces[0]: return True else: return False else: return True else: return False def isStandardTag(tag,ns,confDict): nameSpaces=confDict['nameSpaces'] useTagNs=confDict['useTagNs'] if (tag in confDict['arrayTags'] or tag in confDict['structTags'] or tag in confDict['itemTags']): if useTagNs and ns and nameSpaces and nameSpaces[0]: if ns.content==nameSpaces[0]: return False else: return True else: return False else: return True def getXsiType(node,confDict): #print 'getXsiType: ',str(node) if type(node)=='str': res=None elif node.type=='element': res=None attrs={} el=node.properties while not res and el is not None: if isTypeAttr(el,confDict): res=el.content if res.startswith(confDict['xsdPrefix']+":"): res=res[len(confDict['xsdPrefix'])+1:] elif res.startswith(confDict['soapencPrefix']+":"): res=res[len(confDict['soapencPrefix'])+1:] el=el.next else: res=None #print 'type res ',res return res def isTypeAttr(node,confDict): #print 'isTypeAttr ', str(node), str((confDict['nameSpaces'])[1]),str(node.ns().content) if node.name==confDict['typeAttr']: if (confDict['nameSpaces'])[1] and node.ns(): if node.ns().content==(confDict['nameSpaces'])[1]: return True else: return False else: return True else: return False def strDec(data,xsiType,confDict): if xsiType in ('decimal','integer','long','nonPositiveInteger','nonNegativeInteger', 'int','short','byte'): try: res=int(data) except: res=pureStrDec(data) elif xsiType=='float' or xsiType=='double': try: res=float(data) except: res=pureStrDec(data) elif xsiType=='boolean': if data=='false' or data=='0': res=False else: res=True elif xsiType=='date': try: res=datetime.date(*time.strptime(data, "%Y-%m-%d")[0:3]) except: res=pureStrDec(data) elif xsiType=='time': try: res=time.strptime(data, "%H:%M:%S") except: res=pureStrDec(data) elif xsiType=='dateTime': try: res=datetime.datetime(*time.strptime(data, "%Y-%m-%dT%H:%M:%S")[0:6]) except: res=pureStrDec(data) else: tmp=pureStrDec(data) if tmp and confDict['useUnicode']: res=tmp.decode('UTF-8') else: res=tmp return res def pureStrDec(data): if data.find('<')>=0: data=data.replace('<','<') if data.find('>')>=0: data=data.replace('>','>') if data.find('&')>=0: data=data.replace('&','&') if data.find(''')>=0: data=data.replace(''',"'") if data.find('"')>=0: data=data.replace('"','"') return data # ===================================================== # # serialization: native data to xml string # # ===================================================== def xmlSerialize(data, arrayTags=arrayTagsDefault,structTags=structTagsDefault,itemTags=itemTagsDefault, nameSpaces=nameSpacesDefault,structType=structTypeDefault,arrayType=arrayTypeDefault, xmlWildcard=xmlWildcardDefault,nsPrefix=nsPrefixDefault, xmlTagIndicator=xmlTagIndicatorDefault,xmlNameIndicator=xmlNameIndicatorDefault, xmlNsIndicator=xmlNsIndicatorDefault,xmlAttrIndicator=xmlAttrIndicatorDefault, useAttrNs=useAttrNsDefault,useTagNs=useTagNsDefault,useTypes=useTypesDefault, useStringType=useStringTypeDefault,typeAttr=typeAttrDefault,xsiPrefix=xsiPrefixDefault, xsdPrefix=xsdPrefixDefault,indentStr=indentStrDefault,style=styleDefault, simpleChar=simpleCharDefault,complexChar=complexCharDefault,whitestrip=whitestripDefault, globalVars=globalVarsDefault,soapencPrefix=soapencPrefixDefault,objectMapping=objectMappingDefault): confDict={} if style=='simple': #namespaces=(None,None,None,) namespaces=(None,xsiNameSpace,xsdNameSpace,) useStringType=False confDict['arrayTags']=arrayTags confDict['structTags']=structTags confDict['itemTags']=itemTags confDict['nameSpaces']=nameSpaces confDict['structType']=structType confDict['arrayType']=arrayType confDict['xmlWildcard']=xmlWildcard confDict['nsPrefix']=nsPrefix confDict['xmlTagIndicator']=xmlTagIndicator confDict['xmlNameIndicator']=xmlNameIndicator confDict['xmlNsIndicator']=xmlNsIndicator confDict['xmlAttrIndicator']=xmlAttrIndicator confDict['useAttrNs']=useAttrNs confDict['useTagNs']=useTagNs confDict['useTypes']=useTypes confDict['useStringType']=useStringType confDict['typeAttr']=typeAttr confDict['xsiPrefix']=xsiPrefix confDict['xsdPrefix']=xsdPrefix confDict['indentStr']=indentStr confDict['style']=style confDict['simpleChar']=simpleChar confDict['complexChar']=complexChar confDict['whitestrip']=whitestrip confDict['globalVars']=globalVars confDict['soapencPrefix']=soapencPrefix confDict['objectMapping']=objectMappingDefault context=[0,[],False,False] # counter, nspairs, xsiflag,xsdflag #print 'nameSpaces ',nameSpaces if xsdPrefix and nameSpaces[2]: context[1]=context[1]+[[xsdNameSpace,xsdPrefix]] if xsiPrefix and nameSpaces[1]: context[1]=context[1]+[[xsiNameSpace,xsiPrefix]] res=xmlNativeStr(data,True,0,context,confDict) return res def xmlNativeStr(data,maketag,depth,context,confDict): arrayTags=confDict['arrayTags'] structTags=confDict['structTags'] itemTags=confDict['itemTags'] nameSpaces=confDict['nameSpaces'] structType=confDict['structType'] arrayType=confDict['arrayType'] style=confDict['style'] simpleChar=confDict['simpleChar'] complexChar=confDict['complexChar'] dt=type(data) origNsContext=context[1] #if data==None: # return "" if data==None: st=makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('nil',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict) st=st+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict) return st if data==[] or data==(): st=makeSpecTag('Array',True,arrayTags,nameSpaces,context, makeTopXsdXsiNs(confDict,depth)+makeTypeAttr(getDataType(data,confDict),context,confDict), confDict) ed=makeSpecTag('Array',False,arrayTags,nameSpaces,context,None,confDict) return st+ed elif data=={}: st=makeSpecTag('Struct',True,structTags,nameSpaces,context, makeTopXsdXsiNs(confDict,depth)+makeTypeAttr(getDataType(data,confDict),context,confDict), confDict) ed=makeSpecTag('Struct',False,structTags,nameSpaces,context,None,confDict) return st+ed elif dt==StringType: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('string',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ strEnc(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=strEnc(data) elif dt==UnicodeType: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('string',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ strEnc(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=strEnc(data) elif dt==IntType: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('int',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ str(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=str(data) elif dt==FloatType: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('float',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ str(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=str(data) elif dt==BooleanType: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('boolean',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ makeBoolean(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=makeBoolean(data) elif isinstance(data,datetime.datetime): if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('dateTime',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ makeIsoTime(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=makeIsoTime(data) elif isinstance(data,datetime.date): if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('date',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ makeIsoTime(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=makeIsoTime(data) elif isinstance(data,time.struct_time) and data[0]==1900 and data[1]==1 and data[2]==1: if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('time',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ makeIsoTime(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=makeIsoTime(data) elif isinstance(data,time.struct_time): if maketag and maketag!='STRUCTEL': res=(makeSpecTag('item',True,itemTags,nameSpaces,context, makeTypeAttr('dateTime',context,confDict)+makeTopXsdXsiNs(confDict,depth),confDict)+ makeIsoTime(data)+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) else: res=str(data) elif dt==ListType or dt==TupleType or (style=='tag' and isinstance(data,ListType)): res=xmlNativeStrList(data,maketag,depth,context,confDict) elif dt==DictType: res=xmlNativeStrDict(data,maketag,depth,context,confDict) elif (hasattr(data,'__dict__') and hasattr(data,'__class__') and str(data.__class__).startswith("<class '__main__.")): res=xmlNativeStrClass(data,maketag,depth,context,confDict) else: tmp=strEnc(str(data)) raise Exception, 'XML encoding: unknown data to be encoded:'+str(data) #res=(makeSpecTag('item',True,itemTags,nameSpaces,context,None,confDict)+ # tmp+makeSpecTag('item',False,itemTags,nameSpaces,context,None,confDict)) context[1]=origNsContext return res def xmlNativeStrList(data,maketag,depth,context,confDict): #print 'ListType',str(data),str(dt) arrayTags=confDict['arrayTags'] structTags=confDict['structTags'] itemTags=confDict['itemTags'] nameSpaces=confDict['nameSpaces'] structType=confDict['structType'] arrayType=confDict['arrayType'] xmlTagIndicator=confDict['xmlTagIndicator'] xmlNsIndicator=confDict['xmlNsIndicator'] xmlNameIndicator=confDict['xmlNameIndicator'] xmlAttrIndicator=confDict['xmlAttrIndicator'] indentStr=confDict['indentStr'] style=confDict['style'] simpleChar=confDict['simpleChar'] complexChar=confDict['complexChar'] dt=type(data) origNsContext=context[1] namedtag=False if style=='tag': if isinstance(data,Tag): namedtag=True ns=data.namespace name=data.name attr=data.attributes rest=data else: if type(data[0])==StringType and data[0].startswith(simpleChar): namedtag=True ns=[] name=(data[0])[len(simpleChar):] attr=[] rest=data[1:] elif type(data[0])==StringType and data[0].startswith(complexChar): namedtag=True ns=data[1] name=(data[0])[len(complexChar):] attr=data[2] rest=data[3:] if namedtag: attrlst,attrnspc=attrEnc(attr,context,confDict) xsns=makeTopXsdXsiNs(confDict,depth) #typeattr=makeTypeAttr(getDataType(data,confDict),context,confDict) typeattr="" nameenc=tagEnc(name) if ns: prefix=False for cpair in context[1]: if not prefix and cpair[0]==ns: prefix=cpair[1] nameenc=prefix+':'+nameenc #res="<"+nameenc+attrnspc+attrlst+xsns+typeattr+">" if not prefix: context[0]=context[0]+1 prefix=confDict['nsPrefix']+str(context[0]) nameenc=prefix+':'+nameenc tmp="xmlns:"+prefix+"='"+ns+"'" if not(attrlst and tmp and attrlst.find(tmp)>=0): attrlst=attrlst+" "+tmp #res="<"+nameenc+" xmlns:"+prefix+"='"+ns+"' "+attrnspc+attrlst+xsns+typeattr+">" context[1]=context[1]+[[ns,prefix]] if True: if len(rest)==1 and isBasicType(rest[0],confDict): typeattr=makeTypeAttr(getDataType(rest[0],confDict),context,confDict) #print 'typeattr',str(typeattr),str(attrlst) if attrlst and typeattr and attrlst.find(typeAttrName(typeattr,confDict))>=0: typeattr="" res="<"+nameenc+attrnspc+attrlst+xsns+typeattr+">" res=res+xmlNativeStr(rest[0],False,1,context,confDict) else: res="<"+nameenc+attrnspc+attrlst+xsns+typeattr+">" for el in rest: res=res+"\n"+makeIndent(depth+1,indentStr) res=res+xmlNativeStr(el,True,depth+1,context,confDict) res=res+"</"+nameenc+">" else: #print 'maketag',maketag if maketag and maketag!='STRUCTEL': xsns=makeTopXsdXsiNs(confDict,depth) typeattr=makeTypeAttr(getDataType(data,confDict),context,confDict) res=makeSpecTag('Array',True,arrayTags,nameSpaces,context,xsns+typeattr,confDict) else: res="" for el in data: #print 'el',str(el) res=res+"\n"+makeIndent(depth+1,indentStr) res=res+xmlNativeStr(el,True,depth+1,context,confDict) if maketag and maketag!='STRUCTEL': res=res+"\n"+makeSpecTag('Array',False,arrayTags,nameSpaces,context,None,confDict) return res def xmlNativeStrDict(data,maketag,depth,context,confDict): arrayTags=confDict['arrayTags'] structTags=confDict['structTags'] itemTags=confDict['itemTags'] nameSpaces=confDict['nameSpaces'] structType=confDict['structType'] arrayType=confDict['arrayType'] xmlTagIndicator=confDict['xmlTagIndicator'] xmlNsIndicator=confDict['xmlNsIndicator'] xmlNameIndicator=confDict['xmlNameIndicator'] xmlAttrIndicator=confDict['xmlAttrIndicator'] indentStr=confDict['indentStr'] style=confDict['style'] simpleChar=confDict['simpleChar'] complexChar=confDict['complexChar'] dt=type(data) origNsContext=context[1] typeattr=makeTypeAttr(getDataType(data,confDict),context,confDict) #print 'data',str(data),maketag,typeattr if maketag=='STRUCTEL': res="" elif maketag and data.has_key(xmlNameIndicator): #res="<"+data[xmlNameIndicator]+">" name=data[xmlNameIndicator] if data.has_key(xmlAttrIndicator): attr=data[xmlAttrIndicator] else: attr=None if data.has_key(xmlNsIndicator): ns=data[xmlNsIndicator] else: ns=None attrlst,attrnspc=attrEnc(attr,context,confDict) xsns=makeTopXsdXsiNs(confDict,depth) nameenc=tagEnc(name) if ns: prefix=False for cpair in context[1]: if not prefix and cpair[0]==ns: prefix=cpair[1] nameenc=prefix+':'+nameenc res="<"+nameenc+attrnspc+attrlst+xsns+typeattr+">" if not prefix: context[0]=context[0]+1 prefix=confDict['nsPrefix']+str(context[0]) nameenc=prefix+':'+nameenc if attrlst and typeattr and attrlst.find(typeattr)>=0: typeattr="" res="<"+nameenc+" xmlns:"+prefix+"='"+ns+"' "+attrnspc+attrlst+xsns+typeattr+">" context[1]=context[1]+[[ns,prefix]] else: tmp="" if not(attrlst and typeattr and attrlst.find(typeattr)>=0): tmp=typeattr res="<"+nameenc+attrnspc+attrlst+xsns+tmp+">" elif maketag: xsns=makeTopXsdXsiNs(confDict,depth) xsns=xsns+typeattr res=makeSpecTag('Struct',True,structTags,nameSpaces,context,xsns,confDict) else: res="" for el in data.keys(): if el!=xmlNameIndicator and el!=xmlNsIndicator and el!=xmlAttrIndicator: tmp=data[el] dt=getDataType(tmp,confDict) if tmp==[] or tmp=={} or isNamedTag(tmp,confDict): typeattr="" else: typeattr=makeTypeAttr(dt,context,confDict) elenc=tagEnc(el) #print 'el',el,tmp,dt,typeattr,elenc res=res+"\n"+makeIndent(depth+1,indentStr)+"<"+elenc+typeattr+">" res=res+xmlNativeStr(tmp,'STRUCTEL',depth+1,context,confDict) res=res+"</"+elenc+">" if maketag=='STRUCTEL': pass elif maketag and data.has_key(xmlNameIndicator): res=res+"\n"+"</"+nameenc+">" elif maketag: res=res+"\n"+makeSpecTag('Struct',False,structTags,nameSpaces,context,None,confDict) else: res="" return res def xmlNativeStrClass(data,maketag,depth,context,confDict): arrayTags=confDict['arrayTags'] structTags=confDict['structTags'] itemTags=confDict['itemTags'] nameSpaces=confDict['nameSpaces'] structType=confDict['structType'] arrayType=confDict['arrayType'] xmlTagIndicator=confDict['xmlTagIndicator'] xmlNsIndicator=confDict['xmlNsIndicator'] xmlNameIndicator=confDict['xmlNameIndicator'] xmlAttrIndicator=confDict['xmlAttrIndicator'] indentStr=confDict['indentStr'] style=confDict['style'] simpleChar=confDict['simpleChar'] complexChar=confDict['complexChar'] dt=type(data) origNsContext=context[1] xsns=makeTopXsdXsiNs(confDict,depth) if hasattr(data,'__getstate__'): # special getstate method is assumed to give back a dict ddict=data.__getstate__() if ddict.has_key(confDict['xmlNameIndicator']): name=ddict[confDict['xmlNameIndicator']] else: name=str(data.__class__) name=name[len("<class '__main__."):-2] #if ddict.has_key(confDict['xmlNsIndicator']): # ns=ddict[confDict['xmlNameIndicator']] #else: # ns="" if ddict.has_key(confDict['xmlAttrIndicator']): attr=ddict[confDict['xmlAttrIndicator']] attrlst,attrnspc=attrEnc(attr,context,confDict) else: attr="" attrlst="" attrnspc="" if ddict.has_key('XSDType'): if len(ddict['XSDType'])>1: attrtype=ddict['XSDType'][0]+":"+ddict['XSDType'][1] else: attrtype=ddict['XSDType'][0] attrtype=" "+confDict['xsiPrefix']+":"+confDict['typeAttr']+"='"+attrtype+"'" else: attrtype="" res="<"+tagEnc(name)+xsns+attrnspc+attrlst+attrtype+">" for el in ddict.keys(): if el!=confDict['xmlNameIndicator'] and el!=confDict['xmlAttrIndicator']: tmp=ddict[el] elenc=tagEnc(el) dt=getDataType(tmp,confDict) if tmp==[] or tmp=={} or isNamedTag(tmp,confDict): typeattr="" else: typeattr=makeTypeAttr(dt,context,confDict) elenc=tagEnc(el) #print 'el',el,tmp,dt,typeattr,elenc res=res+"\n"+makeIndent(depth+1,indentStr)+"<"+elenc+typeattr+">" res=res+xmlNativeStr(tmp,'STRUCTEL',depth+1,context,confDict) res=res+"</"+elenc+">" res=res+"</"+tagEnc(name)+">" else: # no special getstate method available, use class name and __dict__ name=str(data.__class__) name=name[len("<class '__main__."):-2] ddict=data.__dict__ if ddict.has_key('XSDType'): if len(ddict['XSDType'])>1: attrtype=ddict['XSDType'][0]+":"+ddict['XSDType'][1] else: attrtype=ddict['XSDType'][0] attrtype=" "+confDict['xsiPrefix']+":"+confDict['typeAttr']+"='"+attrtype+"'" else: attrtype="" res="<"+tagEnc(name)+" "+xsns+attrtype+">" for el in ddict.keys(): if el!='XSDType': tmp=getattr(data,el) elenc=tagEnc(el) dt=getDataType(tmp,confDict) if tmp==[] or tmp=={} or isNamedTag(tmp,confDict): typeattr="" else: typeattr=makeTypeAttr(dt,context,confDict) elenc=tagEnc(el) #print 'el',el,tmp,dt,typeattr,elenc res=res+"\n"+makeIndent(depth+1,indentStr)+"<"+elenc+typeattr+">" res=res+xmlNativeStr(tmp,'STRUCTEL',depth+1,context,confDict) res=res+"</"+elenc+">" res=res+"</"+tagEnc(name)+">" return res def typeAttrName(typeattr,confDict): #print 'typeattr',typeattr,typeattr[:4+len(confDict['xsiPrefix'])+len(confDict['typeAttr'])] return typeattr[:4+len(confDict['xsiPrefix'])+len(confDict['typeAttr'])] def isNamedTag(data,confDict): #print 'isNamedTag',str(data),type(data) dt=type(data) if dt==ListType or (confDict['style']=='tag' and isinstance(data,ListType)): if confDict['style']=='simple': if (data and data[0] and type(data[0])==StringType and (data[0].startswith(confDict['simpleChar']) or data[0].startswith(confDict['complexChar']))): return True else: return False else: if isinstance(data,ListType): return True else: return False else: return False def makeSpecTag(type,startflag,tags,nameSpaces,context,attrstr,confDict): if attrstr: attrstr=" "+attrstr else: attrstr="" basename=tags[0] if nameSpaces[0]: prefix=False for cpair in context[1]: if not prefix and cpair[0]==nameSpaces[0]: prefix=cpair[1] name=prefix+':'+basename attr="" if not prefix: prefix=confDict['nsPrefix'] name=prefix+':'+basename attr=" xmlns:"+prefix+"='"+nameSpaces[0]+"'" context[1]=context[1]+[[nameSpaces[0],prefix]] else: name=basename attr="" if startflag: return "<"+name+attr+attrstr+">" else: return "</"+name+">" def makeTopXsdXsiNs(confDict,depth): res="" #if confDict['simple']: # return res if depth==0: if confDict['xsiPrefix'] and (confDict['nameSpaces'])[1]: res=res+" xmlns:"+confDict['xsiPrefix']+"='"+xsiNameSpace+"'" if confDict['xsdPrefix'] and (confDict['nameSpaces'])[2]: res=res+" xmlns:"+confDict['xsdPrefix']+"='"+xsdNameSpace+"'" return res def attrEnc(data,context,confDict): nsPrefix=confDict['nsPrefix'] #print 'data',data res="" typeres="" if type(data)==DictType: for el in data.keys(): tmp=data[el] if type(el)==TupleType: elenc=attNameEnc(el[1]) prefix=False for cpair in context[1]: if not prefix and cpair[0]==el[0]: prefix=cpair[1] res=res+" "+prefix+":"+elenc+"='"+attStrEnc(tmp)+"'" if not prefix: context[0]=context[0]+1 prefix=nsPrefix+str(context[0]) res=res+" "+prefix+":"+elenc+"='"+attStrEnc(tmp)+"'" typeres=typeres+" xmlns:"+prefix+"='"+el[0]+"'" context[1]=context[1]+[[el[0],prefix]] else: elenc=attNameEnc(el) res=res+" "+elenc+"='"+attStrEnc(tmp)+"'" return (res,typeres,) def isBasicType(data,confDict): d=getDataType(data,confDict) if d in ('int','boolean','string','float','dateTime','date','time'): return True else: return False def getDataType(data,confDict): if data==0: return 'int' elif data==[]: return confDict['arrayType'] elif data=={}: return confDict['structType'] elif data==False: return 'boolean' elif not data: return None dt=type(data) if dt==StringType or dt==UnicodeType: return 'string' elif dt==IntType: return 'int' elif dt==FloatType: return 'float' elif dt==BooleanType: return 'boolean' elif isinstance(data,datetime.datetime): return 'dateTime' elif isinstance(data,datetime.date): return 'date' elif isinstance(data,time.struct_time) and data[0]==1900 and data[1]==1 and data[2]==1: return 'time' elif isinstance(data,time.struct_time): return 'dateTime' elif dt==DictType: return confDict['structType'] elif dt==ListType: return confDict['arrayType'] else: return None def makeTypeAttr(type,context,confDict): if not type: return "" if not confDict['useTypes']: return "" if not confDict['useStringType'] and type=='string': return "" if confDict['xsiPrefix']: xsi=confDict['xsiPrefix']+":" else: xsi="" if type==confDict['arrayType'] or type==confDict['structType']: xsd=confDict['soapencPrefix']+":" elif confDict['xsdPrefix']: xsd=confDict['xsdPrefix']+":" else: xsd="" return " "+xsi+confDict['typeAttr']+"='"+xsd+type+"'" def strEnc(data): if data.find('&')>=0: data=data.replace('&','&') if data.find('<')>=0: data=data.replace('<','<') if data.find('>')>=0: data=data.replace('>','>') return data def attNameEnc(data): if data.isalnum(): return data tmp=data.replace(':','') if tmp.isalnum(): return data raise Exception, 'XML encoding: name contains non-alphanumerics:'+str(data) return 'WRONGSYNTAX' def makeBoolean(data): if data: return 'true' else: return 'false' def makeIsoTime(t): if isinstance(t,datetime.datetime): res=t.isoformat() elif isinstance(t,datetime.date): res=t.isoformat() elif t[0]==1900 and t[1]==1 and t[2]==1: res=time.strftime("%H:%M:%S",t) else: res=time.strftime("%Y-%m-%dT%H:%M:%S",t) return res def attStrEnc(data): if type(data)!=StringType: data=str(data) if data.find('&')>=0: data=data.replace('&','&') if data.find('<')>=0: data=data.replace('<','<') if data.find('>')>=0: data=data.replace('>','>') if data.find("'")>=0: data=data.replace("'",''') if data.find('"')>=0: data=data.replace('"','"') return data def tagEnc(data): #data=data.strip() if (not data or #not data.isalnum() or data.startswith('xml') or data.startswith('XML') or data.startswith('Xml')): raise Exception, 'XML encoding: name starts with xml or contains non-alphanumerics:'+str(data) data="WRONGSYNTAX" return data def makeIndent(depth,indentStr): if not indentStr: return "" else: res="" while depth>0: res=res+indentStr depth=depth-1 return res # ===================================================== # # optional, not used above: simplifications # # ===================================================== def xmlSupersimplify(data): if not data: return data elif type(data)==ListType: if data[0]=='xmltag' and len(data)==5: return xmlSupersimplify(data[4]) elif type(data[0])==StringType and data[0].startswith('#') and len(data)==2: return xmlSupersimplify(data[1]) elif data[0]=='xmltag': i=4 l=len(data) res=[] while i<l: if type(data[i])!=StringType or data[i].strip(): res=res+[xmlSupersimplify(data[i])] i=i+1 return res elif type(data[0])==StringType and data[0].startswith('#'): i=1 l=len(data) res=[] while i<l: if type(data[i])!=StringType or data[i].strip(): res=res+[xmlSupersimplify(data[i])] i=i+1 return res else: i=0 l=len(data) res=[] while i<l: if type(data[i])!=StringType or data[i].strip(): res=res+[xmlSupersimplify(data[i])] i=i+1 return res elif type(data)==DictType: for el in data.keys(): if el.startswith('xml'): del data[el] else: data[el]=xmlSupersimplify(data[el]) return data else: return data def xmlSimplify(data): if not data: return data elif type(data)==ListType: #if data[0]=='xmltag' and len(data)==5: # return xmlSimplify(data[4]) if data[0]=='xmltag': i=4 l=len(data) res=['#'+data[2]] while i<l: if type(data[i])!=StringType or data[i].strip(): res=res+[xmlSimplify(data[i])] i=i+1 return res else: i=0 l=len(data) res=[] while i<l: if type(data[i])!=StringType or data[i].strip(): res=res+[xmlSimplify(data[i])] i=i+1 return res elif type(data)==DictType: for el in data.keys(): if el.startswith('xml'): del data[el] else: data[el]=xmlSimplify(data[el]) return data else: return data
Python code for X-road oriented SOAP call tool
Example at the end:
soapCall("foo",{'a':(12,11),'b':'proov'},log=False,debug=True,callsyst='csyst', initiatororgid='initid',initiatorpersonid='initpers',initiatorsystemid='initsystid', callersystemid='callesyst',aboutpersonid='aboutperson',transactionid='transid') <Pre> <pre> #!/usr/local/bin/python import sys import socket import urllib2 import cgi from types import * from xmlserialize import xmlSerialize,xmlDeserialize,strEnc,Tag #from logserver.logserver import storeLogServer requestSuffix="request" timeOut=5 # timeout in seconds soapCallEnvTemplate="""<SOAP-ENV:Envelope SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"> <SOAP-ENV:Header> %s </SOAP-ENV:Header> <SOAP-ENV:Body> %s </SOAP-ENV:Body> </SOAP-ENV:Envelope>""" soapAnswerEnvTemplate="""<SOAP-ENV:Envelope SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"> <SOAP-ENV:Header>%s</SOAP-ENV:Header> <SOAP-ENV:Body> %s </SOAP-ENV:Body> </SOAP-ENV:Envelope>""" soapErrorTemplate=""" <SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> <SOAP-ENV:Body> <SOAP-ENV:Fault> <faultcode>%s</faultcode> <faultstring>%s</faultstring> </SOAP-ENV:Fault> </SOAP-ENV:Body> </SOAP-ENV:Envelope> """ simpleChar="##" complexChar="$$" def soapCall(function,params, log=True,requestheader=None,url=None,style='complex', logtype=None,action=None,callsyst=None, initiatororgid=None,initiatorpersonid=None,initiatorsystemid=None, callersystemid=None,aboutpersonid=None,infotypes=None,transactionid=None, reason=None,extrainfo=None,resulttype=None, arrayTags=('Array',),structTags=('Struct',),itemTags=('item',), arrayType=('array'),structType=('struct'), debug=False): # check encoding style if not style in ('simple','complex','tag',): return callError('Unknown conversion style passed to soap call library') # check function name if debug: print 'function: ', function print 'params: ',params if (not function) or type(function)!=StringType: return callError("Function name not given or unusable.") # set obligatory log params and header elems if not logtype: logtype='T' # T: tundlikud isikuandmed, V: viga, M: muu if not action: action=function # action name, f.ex. object/operation if not requestheader: requestheader={} if (not callsyst or not initiatororgid or not initiatorpersonid or not callersystemid or not transactionid): return callError("Header not present and relevant args (callsyst...transactionid) missing") requestheader['ixTransaction']=transactionid requestheader['ixInitiatorOrganization']=initiatororgid requestheader['ixInitiatorPerson']=initiatorpersonid requestheader['ixInitiatorSystem']=callsyst requestheader['ixRequestingSystem']=callersystemid elif type(requestheader)!=DictType: return callError("Header present but not dictionary") elif (not requestheader['ixTransaction'] or not requestheader['ixInitiatorOrganization'] or not requestheader['ixInitiatorPerson'] or not requestheader['ixInitiatorSystem'] or not requestheader['ixRequestingSystem']): return callError("Header present but does not contain all the necessary values") if not callsyst: callsyst=requestheader['ixRequestingSystem'] #name of the calling system receiving data if not initiatororgid: initiatororgid=requestheader['ixInitiatorOrganization'] # organisation id of the call initiator if not initiatorpersonid: initiatorpersonid=requestheader['ixInitiatorPerson'] #person id of the call initiator if not callersystemid: callersystemid=requestheader['ixRequestingSystem'] # system (program) id of the actual caller if not transactionid: transactionid=requestheader['ixTransaction'] # encoded transaction: edsd or edsd;asdasd or # make soap header if not requestheader: strHeader="" elif type(requestheader)==StringType: strHeader=requestheader elif type(requestheader)!=DictType: return callError("requestheader is not a dictionary") elif (not requestheader['ixTransaction'] or not requestheader['ixInitiatorOrganization'] or not requestheader['ixInitiatorPerson'] or not requestheader['ixInitiatorSystem'] or not requestheader['ixRequestingSystem']): return callError("Header does not contain all the necessary values") else: requestheader['xmlname']='RequestHeader' requestheader['xmlattr']={'version':'1.0'} try: strHeader=xmlSerialize(requestheader,useTypes=False,nameSpaces=(None,None,None)) except: return callError("xml string creation error in xmlSerialize") # make soap body if not params: strBody=xmlSerialize({'xmlname':function+requestSuffix}) elif type(params)==DictType: res=[simpleChar+function+requestSuffix] for key in params.keys(): res=res+[[simpleChar+key,params[key]]] strBody=xmlSerialize(res) elif type(params)==ListType: params=[simpleChar+function+requestSuffix]+params strBody=xmlSerialize(params,style=style,simpleChar=simpleCha,complexChar=complexChar, arrayTags=arrayTags,structTags=structTags,itemTags=itemTags, arrayType=arrayType,structType=structType) else: return callError("params is neither a dictionary nor a list of name-value pairs") # make full soap call envelope envelope=(soapCallEnvTemplate % (strHeader,strBody)) if debug: print "\n== envelope: ==" print envelope print "==========" # first call the log if necessary and possible if log: #logres=storeLogServer(type=logtype,action=action,callsyst=callsyst, initiatororgid=initiatororgid,initiatorpersonid=initiatorpersonid, initiatorsystemid=initiatorsystemid, callersystemid=callersystemid,aboutpersonid=aboutpersonid,infotypes=infotypes, transactionid=transactionid, reason=reason,extrainfo=extrainfo,resulttype=resulttype if logres: return callError("Log error: "+logres) # make an url, set timeout and create a request if not url: url=getServiceUrl(function) if not url: return callError("Url unknown") socket.setdefaulttimeout(timeOut) try: #req=urllib2.Request(url,envelope,{'SOAPAction' : 'none'}) #httpheaders={'Content-type':'text/xml'} httpheaders={'Content-type':'application/soap+xml'} req=urllib2.Request(url,envelope,httpheaders) except: return callError('Error creating request, server not called yet.') # actually open the url and read returning data try: handler=urllib2.urlopen(req) data=handler.read() except socket.timeout: return callError('Timeout after '+str(timeOut)+' seconds.') except socket.error: errno,errstr=sys.exc_info()[:2] if errno==socket.timeout: return callError('Timeout after '+str(timeOut)+' seconds.') else: return callError('Socket error calling server.') except IOError, e: if hasattr(e,'reason'): return callError('Failed to reach a server. Reason: '+str(e.reason)) elif hasattr(e,'code'): return callError('The server could not fulfill the request. Error code: '+str(e.code)) else: return callError('Error: '+str(e)) except: return callError('Error: '+str(sys.exc_info()[0])) #data was obtained, see and check it if debug: print "\n== data received: ==" print data print "==========" # convert data to native format and return the result in body: try: resnative=xmlDeserialize(data, arrayTags=arrayTags, structTags=('Struct','Envelope','Header',), itemTags=itemTags, arrayType=arrayType,structType=structType, simpleChar=simpleChar,complexChar=complexChar, style=style) except: return callError('Answer xml to native data conversion error') if debug: print "\n== full result native: ==" print resnative print resnative['Body'][0] #print resnative['Body'][0][2] print "==========" if style=='simple' or style=='complex': if type(resnative)==DictType and resnative and resnative['Body'][0][0]==simpleChar+'Fault': try: errstr=resnative['Body'][0][2][1] except: return callError('SOAP answer error message structure cannot be understood') return callError('Server gave a SOAP error: '+errstr) elif type(resnative)==DictType and resnative and resnative['Body'][0][0]==complexChar+'Fault': try: errstr=resnative['Body'][0][4][1] except: return callError('SOAP answer error message structure cannot be understood') return callError('Server gave a SOAP error: '+errstr) resans=resnative['Body'][0] elif style=='tag': if (type(resnative)==DictType and resnative and isinstance(resnative['Body'][0],Tag) and (resnative['Body'][0]).name=='Fault'): try: errstr=resnative['Body'][0][1][0] except: return callError('SOAP answer error message structure cannot be understood') return callError('Server gave a SOAP error: '+errstr) resans=resnative['Body'][0] else: return callError('Unknown conversion style passed to soap call library') if debug: print "\n===== result data: =====\n" print resans return resans def callError(errstr): raise "ERROR: "+str(errstr) #return "ERROR: "+str(errstr) def getServiceUrl(function): return "http://localhost/cgi-bin/receiver_dummy" # ========== # # server functions # # ========== def fetchCallData(style='complex'): # check encoding style if not style in ('simple','complex','tag',): raise Exception, 'Unknown conversion style passed to soap call library fun fetchCallData' try: form = cgi.FieldStorage() fullxml=str(form.value) except Exception, errmsg: raise Exception, 'Error reading posted text: cgi system error? '+str(errmsg) try: native=xmlDeserialize(fullxml, structTags=('Struct','Envelope','Header','RequestHeader','Body',), style=style) except Exception, errmsg: raise Exception, "Error parsing xml query: "+str(errmsg) try: if native.has_key('Header'): header=native['Header'] else: header=None if header and header.has_key('RequestHeader'): header=header['RequestHeader'] else: header=None body=native['Body'] if not body or type(body)!=DictType: raise Exception, "Error fetching SOAP parts: Body not found, wrong SOAP format? " rpcfunction=None for key in body.keys(): if not key.startswith('xml'): rpcfunction=key rawparams=body[key] if not rpcfunction: raise Exception, "Error fetching SOAP parts: rpc function not found in Body, wrong SOAP format? " rpcparams={} for el in rawparams: if el and isinstance(el,Tag): rpcparams[el.name]=el[0] elif el and el[0] and len(el)>1: if el[0].startswith(simpleChar): rpcparams[el[0][len(simpleChar):]]=el[1] elif el[0].startswith(complexChar) and len(el)>3: rpcparams[el[0][len(complexChar):]]=el[3] else: raise Exception, "Error fetching SOAP parts: cannot understand rpc parameter, wrong SOAP format? " except Exception, errmsg: raise Exception, "Error fetching SOAP parts: wrong SOAP format? " return (rpcfunction,rpcparams,header,) def makeSoapError(code,msg): if code: return (soapErrorTemplate % (strEnc(str(code)),strEnc(str(msg)),)) else: return (soapErrorTemplate % ("",strEnc(str(msg)),)) def makeSoapAnswer(function,params,style='complex'): try: function+function+'answer' if type(params)==ListType: full=[simpleChar+function]+params elif type(params)==DictType: full=[simpleChar+function]+[params] else: full=[simpleChar+function]+[params] xmldata=xmlSerialize(full) header="" envelope=(soapAnswerEnvTemplate % (header,xmldata,)) except Exception, errmsg: raise Exception, "Error making SOAP answer envelope: wrong SOAP format? "+str(errmsg) return envelope print "naide algab" soapCall("foo",{'a':(12,11),'b':'proov'},log=False,debug=True,callsyst='csyst', initiatororgid='initid',initiatorpersonid='initpers',initiatorsystemid='initsystid', callersystemid='callesyst',aboutpersonid='aboutperson',transactionid='transid') print "naide lopeb"