[profile gen] Introduce an ABC adapter
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 06 Jul 2016 18:12:11 +0200
changeset 1338 162b0e22ccbd
parent 1337 d63c144f8cbc
child 1339 70bf9bb74735
[profile gen] Introduce an ABC adapter
entities/profile_generation.py
--- a/entities/profile_generation.py	Wed Jul 06 17:54:04 2016 +0200
+++ b/entities/profile_generation.py	Wed Jul 06 18:12:11 2016 +0200
@@ -51,14 +51,114 @@
     return '{{{0}}}'.format(namespaces[prefix]) + name
 
 
-class SEDA2XSDExport(EntityAdapter):
-    """Adapter to build an XSD representation of a SEDA profile, using SEDA 2.0 specification"""
-    __regid__ = 'SEDA-2.0.xsd'
+class SEDA2ExportAdapter(EntityAdapter):
+    """Abstract base class for export of SEDA profile"""
+    __abstract__ = True
     __select__ = is_instance('SEDAArchiveTransfer')
     encoding = 'utf-8'
     content_type = 'application/xml'
+    # to be defined in concret implementations
+    extension = None
+    namespaces = {}
+    root_attributes = {}
+
+    @property
+    def file_name(self):
+        """Return a file name for the dump"""
+        return '%s.%s' % (self.entity.dc_title(), self.extension)
+
+    def dump(self):
+        """Return an schema string for the adapted SEDA profile."""
+        root = self.dump_etree()
+        return etree.tostring(root, encoding=self.encoding, pretty_print=True, standalone=False)
+
+    def dump_etree(self):
+        """Return an XSD etree for the adapted SEDA profile."""
+        raise NotImplementedError()
+
+    def qname(self, tag):
+        return substitute_xml_prefix(tag, self.namespaces)
+
+    def element(self, tag, parent=None, attributes=None, text=None):
+        """Generic method to build a XSD element tag.
+
+        Params:
+
+        * `name`, value for the 'name' attribute of the xsd:element
+
+        * `parent`, the parent etree node
+
+        * `attributes`, dictionary of attributes
+        """
+        attributes = attributes or {}
+        tag = self.qname(tag)
+        documentation = attributes.pop('documentation', None)
+        for attr, value in attributes.items():
+            newattr = substitute_xml_prefix(attr, self.namespaces)
+            attributes[newattr] = value
+            if newattr != attr:
+                attributes.pop(attr)
+        if parent is None:
+            elt = etree.Element(tag, attributes, nsmap=self.namespaces)
+        else:
+            elt = etree.SubElement(parent, tag, attributes)
+        if text is not None:
+            elt.text = text
+        if documentation:
+            annot = self.element('xsd:annotation', elt)
+            self.element('xsd:documentation', annot).text = documentation
+        return elt
+
+    def dispatch_occ(self, profile_element, occ, target_value, to_process, card_entity):
+        callback = getattr(self, 'element_' + occ.target.__class__.__name__.lower())
+        callback(occ, profile_element, target_value, to_process, card_entity)
+
+    def _process(self, entity, profile_element, child_defs, to_process):
+        for occ, path in child_defs:
+            # print '  child', getattr(occ.target, 'local_name', occ.target.__class__.__name__), \
+            #    [x[:-1] for x in path]
+            if not path:
+                assert not isinstance(occ.target, graph_nodes.XMLAttribute)
+                if occ.target.local_name in JUMPED_OPTIONAL_ELEMENTS:
+                    # elements in JUMPED_OPTIONAL_ELEMENTS are jumped but have optional cardinality,
+                    # so search in all it's child element, and mark it as mandatory if one of them
+                    # is mandatory, else keep it optional
+                    cardinality = '0..1'
+                    for rtype, role, _path in XSDM_MAPPING.iter_rtype_role(occ.target.local_name):
+                        if _path[0][2] in BASE_TYPES:
+                            # special case of a mandatory attribute: parent element will be
+                            # mandatory if some value is specified, else that's fine
+                            if getattr(entity, rtype) is not None:
+                                cardinality = '1'
+                                break
+                            else:
+                                continue
+                        targets = entity.related(rtype, role, entities=True)
+                        if any(target.user_cardinality == '1' for target in targets):
+                            cardinality = '1'
+                            break
+                else:
+                    cardinality = None
+                # jumped element: give None as target_value but register the generated element for
+                # later processing
+                self.dispatch_occ(profile_element, occ, None, to_process,
+                                  card_entity=attrdict({'user_cardinality': cardinality}))
+                to_process[occ.target].append((entity, self._jumped_element(profile_element)))
+            else:
+                # print '  values', _path_target_values(entity, path)
+                for card_entity, target_value in _path_target_values(entity, path):
+                    self.dispatch_occ(profile_element, occ, target_value, to_process,
+                                      card_entity=card_entity)
+
+    def _jumped_element(self, profile_element):
+        """Return the last generated element, for insertion of its content;"""
+        raise NotImplementedError()
+
+
+class SEDA2XSDExport(SEDA2ExportAdapter):
+    """Adapter to build an XSD representation of a SEDA profile, using SEDA 2.0 specification"""
+    __regid__ = 'SEDA-2.0.xsd'
     extension = 'xsd'
-
     namespaces = {
         None: 'fr:gouv:culture:archivesdefrance:seda:v2.0',
         'seda': 'fr:gouv:culture:archivesdefrance:seda:v2.0',
@@ -72,22 +172,6 @@
         'version': '1.0',
     }
 
-    def __init__(self, *args, **kwargs):
-        super(SEDA2XSDExport, self).__init__(*args, **kwargs)
-        self.processed_types = set()
-        self.processed_entities = {}
-        self.type_index = 0
-
-    @property
-    def file_name(self):
-        """Return a file name for the dump"""
-        return '%s.%s' % (self.entity.dc_title(), self.extension)
-
-    def dump(self):
-        """Return an XSD string for the adapted SEDA profile."""
-        root = self.dump_etree()
-        return etree.tostring(root, encoding=self.encoding, pretty_print=True, standalone=False)
-
     def dump_etree(self):
         """Return an XSD etree for the adapted SEDA profile."""
         self.defined_content_types = set()
@@ -139,47 +223,6 @@
         self.element('xsd:any', open_type_seq, {'namespace': '##other', 'processContents': 'lax',
                                                 'minOccurs': '0'})
 
-    def _process(self, entity, profile_element, child_defs, to_process):
-        for occ, path in child_defs:
-            # print '  child', getattr(occ.target, 'local_name', occ.target.__class__.__name__), \
-            #    [x[:-1] for x in path]
-            if not path:
-                assert not isinstance(occ.target, graph_nodes.XMLAttribute)
-                if occ.target.local_name in JUMPED_OPTIONAL_ELEMENTS:
-                    # elements in JUMPED_OPTIONAL_ELEMENTS are jumped but have optional cardinality,
-                    # so search in all it's child element, and mark it as mandatory if one of them
-                    # is mandatory, else keep it optional
-                    cardinality = '0..1'
-                    for rtype, role, _path in XSDM_MAPPING.iter_rtype_role(occ.target.local_name):
-                        if _path[0][2] in BASE_TYPES:
-                            # special case of a mandatory attribute: parent element will be
-                            # mandatory if some value is specified, else that's fine
-                            if getattr(entity, rtype) is not None:
-                                cardinality = '1'
-                                break
-                            else:
-                                continue
-                        targets = entity.related(rtype, role, entities=True)
-                        if any(target.user_cardinality == '1' for target in targets):
-                            cardinality = '1'
-                            break
-                else:
-                    cardinality = None
-                # jumped element: give None as target_value but register the generated element for
-                # later processing
-                self.dispatch_occ(profile_element, occ, None, to_process,
-                                  card_entity=attrdict({'user_cardinality': cardinality}))
-                to_process[occ.target].append((entity, self._jumped_element(profile_element)))
-            else:
-                # print '  values', _path_target_values(entity, path)
-                for card_entity, target_value in _path_target_values(entity, path):
-                    self.dispatch_occ(profile_element, occ, target_value, to_process,
-                                      card_entity=card_entity)
-
-    def dispatch_occ(self, profile_element, occ, target_value, to_process, card_entity):
-        callback = getattr(self, 'element_' + occ.target.__class__.__name__.lower())
-        callback(occ, profile_element, target_value, to_process, card_entity)
-
     def element_alternative(self, occ, profile_element, target_value, to_process, card_entity):
         attrs = xsd_element_cardinality(occ, card_entity)
         parent_element = self._parent_element(profile_element)
@@ -331,41 +374,8 @@
             content_type = 'xsd:' + content_type
         return content_type
 
-    def qname(self, tag):
-        return substitute_xml_prefix(tag, self.namespaces)
 
-    def element(self, tag, parent=None, attributes=None, text=None):
-        """Generic method to build a XSD element tag.
-
-        Params:
-
-        * `name`, value for the 'name' attribute of the xsd:element
-
-        * `parent`, the parent etree node
-
-        * `attributes`, dictionary of attributes
-        """
-        attributes = attributes or {}
-        tag = self.qname(tag)
-        documentation = attributes.pop('documentation', None)
-        for attr, value in attributes.items():
-            newattr = substitute_xml_prefix(attr, self.namespaces)
-            attributes[newattr] = value
-            if newattr != attr:
-                attributes.pop(attr)
-        if parent is None:
-            elt = etree.Element(tag, attributes, nsmap=self.namespaces)
-        else:
-            elt = etree.SubElement(parent, tag, attributes)
-        if text is not None:
-            elt.text = text
-        if documentation:
-            annot = self.element('xsd:annotation', elt)
-            self.element('xsd:documentation', annot).text = documentation
-        return elt
-
-
-class SEDA2RelaxNGExport(SEDA2XSDExport):
+class SEDA2RelaxNGExport(SEDA2ExportAdapter):
     """Adapter to build a Relax NG representation of a SEDA profile, using SEDA 2.0 specification"""
     __regid__ = 'SEDA-2.0.rng'
     extension = 'rng'
@@ -381,7 +391,6 @@
 
     def dump_etree(self):
         """Return an XSD etree for the adapted SEDA profile."""
-        self.defined_content_types = set()
         root = self.element('rng:grammar', attributes=self.root_attributes)
         start = self.element('rng:start', root)
         # XXX http://lists.xml.org/archives/xml-dev/200206/msg01074.html ?