[aligner] Add aligners pipeline, see #182035
authorVincent Michel <vincent.michel@logilab.fr>
Tue, 08 Oct 2013 13:22:53 +0000
changeset 308 723bfd8b0ff6
parent 307 31a13077a293
child 309 abb55b01503f
[aligner] Add aligners pipeline, see #182035
aligner.py
test/test_alignment.py
--- a/aligner.py	Tue Oct 15 08:42:27 2013 +0000
+++ b/aligner.py	Tue Oct 08 13:22:53 2013 +0000
@@ -24,6 +24,26 @@
 
 
 ###############################################################################
+### UTILITY FUNCTIONS #########################################################
+###############################################################################
+def iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique=True):
+    """ Return the aligned pairs
+    """
+    if unique:
+        for refid in global_matched:
+            bestid, _ = sorted(global_matched[refid], key=lambda x:x[1])[0]
+            ref_record = refset[refid]
+            target_record = targetset[bestid]
+            yield (ref_record[0], refid), (target_record[0], bestid)
+    else:
+        for refid in global_matched:
+            for targetid, _ in global_matched[refid]:
+                ref_record = refset[refid]
+                target_record = targetset[targetid]
+                yield (ref_record[0], refid), (target_record[0], targetid)
+
+
+###############################################################################
 ### BASE ALIGNER OBJECT #######################################################
 ###############################################################################
 class BaseAligner(object):
@@ -35,6 +55,7 @@
         self.ref_normalizer = None
         self.target_normalizer = None
         self.blocking = None
+        self.alignments_done = 0
         self.nb_comparisons = 0
         self.nb_blocks = 0
         self.refset_size = None
@@ -51,14 +72,14 @@
         before alignment """
         self.target_normalizer = normalizer
 
+    def register_blocking(self, blocking):
+        self.blocking = blocking
+
     def apply_normalization(self, dataset, normalizer):
         if normalizer:
             return normalizer.normalize_dataset(dataset)
         return dataset
 
-    def register_blocking(self, blocking):
-        self.blocking = blocking
-
     def compute_distance_matrix(self, refset, targetset,
                                 ref_indexes, target_indexes):
         """ Compute and return the global alignment matrix.
@@ -129,42 +150,21 @@
                 subdict = global_matched.setdefault(k, set())
                 for v, d in values:
                     subdict.add((v, d))
+                    self.alignments_done += 1
                     if get_matrix:
                         # XXX avoid issue in sparse matrix
                         global_mat[k, v] = d or 10**(-10)
         self.time = time.time() - start_time
         return global_mat, global_matched
 
-    def _iter_aligned_pairs(self, refset, targetset, global_mat, global_matched, unique=True):
-        """ Return the aligned pairs
-        """
-        if unique:
-            for refid in global_matched:
-                bestid, _ = sorted(global_matched[refid], key=lambda x:x[1])[0]
-                ref_record = refset[refid]
-                target_record = targetset[bestid]
-                yield (ref_record[0], refid), (target_record[0], bestid)
-        else:
-            for refid in global_matched:
-                for targetid, _ in global_matched[refid]:
-                    ref_record = refset[refid]
-                    target_record = targetset[targetid]
-                    yield (ref_record[0], refid), (target_record[0], targetid)
-        if self.verbose:
-            print 'Computation time : ', self.time
-            print 'Size reference set : ', self.refset_size
-            print 'Size target set : ', self.targetset_size
-            print 'Done comparisons : ', self.nb_comparisons
-            print 'Maximum comparisons : ', self.refset_size * self.targetset_size
-            print 'Number of blocks : ', self.nb_blocks
-            print 'Blocking reduction : ', self.nb_comparisons/(self.refset_size * self.targetset_size)
-
     def get_aligned_pairs(self, refset, targetset, unique=True):
         """ Get the pairs of aligned elements
         """
         global_mat, global_matched = self.align(refset, targetset, get_matrix=False)
-        for pair in self._iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique):
+        for pair in iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique):
             yield pair
+        if self.verbose:
+            self.display_infos()
 
     def align_from_files(self, reffile, targetfile,
                          ref_indexes=None, target_indexes=None,
@@ -208,28 +208,105 @@
         targetset = parsefile(targetfile, indexes=target_indexes,
                               encoding=target_encoding, delimiter=target_separator)
         global_mat, global_matched = self.align(refset, targetset, get_matrix=False)
-        for pair in self._iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique):
+        for pair in iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique):
             yield pair
 
+    def display_infos(self):
+        """ Display some info on the aligner process
+        """
+        print 'Computation time : ', self.time
+        print 'Size reference set : ', self.refset_size
+        print 'Size target set : ', self.targetset_size
+        print 'Comparisons done : ', self.nb_comparisons
+        print 'Alignments done : ', self.alignments_done
+        print 'Ratio reference set/alignments done : ', self.alignments_done/float(self.refset_size)
+        print 'Ratio target set/alignments done : ', self.alignments_done/float(self.targetset_size)
+        print 'Maximum comparisons : ', self.refset_size * self.targetset_size
+        print 'Number of blocks : ', self.nb_blocks
+        if self.nb_blocks:
+            print 'Ratio comparisons/block : ', float(self.nb_comparisons)/self.nb_blocks
+        print 'Blocking reduction : ', self.nb_comparisons/float(self.refset_size * self.targetset_size)
+
 
 ###############################################################################
-### ITERATIVE ALIGNER OBJECT ##################################################
+### PIPELINE ALIGNER OBJECT ##################################################
 ###############################################################################
-class IterativePassAligner(object):
-    """ This aligner may be used to perform multi pass of alignements.
-
-        It takes your csv files as arguments and split them into smaller ones
-        (files of `size` lines), and runs the alignment on those files.
-
-        If the distance of an alignment is below `equality_threshold`, the
-        alignment is considered as perfect, and the corresponding item is
-        removed from the alignset (to speed up the computation).
+class PipelineAligner(object):
+    """ This pipeline will perform iterative alignments, removing each time
+    the aligned results from the previous aligner.
     """
 
-    def __init__(self, threshold, treatments, equality_threshold):
-        self.threshold = threshold
-        self.treatments = treatments
-        self.equality_threshold = equality_threshold
-        self.ref_normalizer = None
-        self.target_normalizer = None
-        self.blocking = None
+    def __init__(self, aligners, verbose=False):
+        self.aligners = aligners
+        self.verbose = verbose
+        self.pairs = {}
+        self.nb_comparisons = 0
+        self.nb_blocks = 0
+        self.alignments_done = 0
+        self.refset_size = None
+        self.targetset_size = None
+        self.time = None
+
+    def align(self, refset, targetset):
+        """ Perform the alignment on the referenceset
+        and the targetset
+        """
+        start_time = time.time()
+        ref_index = range(len(refset))
+        target_index = range(len(targetset))
+        self.refset_size = len(refset)
+        self.targetset_size = len(targetset)
+        global_matched = {}
+        global_mat = lil_matrix((len(refset), len(targetset)))
+        # Iteration over aligners
+        for ind_aligner, aligner in enumerate(self.aligners):
+            # Perform alignment
+            _refset = [refset[i] for i in ref_index]
+            _targetset = [targetset[i] for i in target_index]
+            _global_mat, _global_matched = aligner.align(_refset, _targetset, get_matrix=False)
+            # Store results
+            for k, values in _global_matched.iteritems():
+                subdict = global_matched.setdefault(ref_index[k], set())
+                for v, d in values:
+                    self.alignments_done += 1
+                    subdict.add((target_index [v], d))
+            # Store stats
+            self.nb_blocks += aligner.nb_blocks
+            self.nb_comparisons += aligner.nb_comparisons
+            # Update indexes if necessary
+            if ind_aligner < len(self.aligners) - 1:
+                # There are other aligners after this one
+                _ref_index, _target_index = set(), set()
+                for k, values in _global_matched.iteritems():
+                    _ref_index.add(k)
+                    for v, d in values:
+                        _target_index.add(v)
+                ref_index = [i for i in ref_index if i not in _ref_index]
+                target_index = [i for i in target_index if i not in _target_index]
+        self.time = time.time() - start_time
+        return global_mat, global_matched
+
+    def get_aligned_pairs(self, refset, targetset, unique=True):
+        """ Get the pairs of aligned elements
+        """
+        global_mat, global_matched = self.align(refset, targetset)
+        for pair in iter_aligned_pairs(refset, targetset, global_mat, global_matched, unique):
+            yield pair
+        if self.verbose:
+            self.display_infos()
+
+    def display_infos(self):
+        """ Display some info on the aligner process
+        """
+        print 'Computation time : ', self.time
+        print 'Size reference set : ', self.refset_size
+        print 'Size target set : ', self.targetset_size
+        print 'Comparisons done : ', self.nb_comparisons
+        print 'Alignments done : ', self.alignments_done
+        print 'Ratio reference set/alignments done : ', self.alignments_done/float(self.refset_size)
+        print 'Ratio target set/alignments done : ', self.alignments_done/float(self.targetset_size)
+        print 'Maximum comparisons : ', self.refset_size * self.targetset_size
+        print 'Number of blocks : ', self.nb_blocks
+        if self.nb_blocks:
+            print 'Ratio comparisons/block : ', float(self.nb_comparisons)/self.nb_blocks
+        print 'Blocking reduction : ', self.nb_comparisons/float(self.refset_size * self.targetset_size)
--- a/test/test_alignment.py	Tue Oct 15 08:42:27 2013 +0000
+++ b/test/test_alignment.py	Tue Oct 08 13:22:53 2013 +0000
@@ -24,7 +24,7 @@
 from nazca.normalize import simplify
 import nazca.aligner as alig
 import nazca.blocking as blo
-from nazca.distances import GeographicalProcessing
+from nazca.distances import LevenshteinProcessing, GeographicalProcessing
 
 
 TESTDIR = path.dirname(__file__)
@@ -145,6 +145,62 @@
         for m in uniq_matched:
             self.assertIn(m, matched)
 
+
+class PipelineAlignerTestCase(unittest2.TestCase):
+
+    def test_pipeline_align(self):
+        refset = [['V1', 'aaa', (6.14194444444, 48.67)],
+                  ['V2', 'bbb', (6.2, 49)],
+                  ['V3', 'ccc', (5.1, 48)],
+                  ['V4', 'ddd', (5.2, 48.1)],
+                  ]
+        targetset = [['T1', 'zzz', (6.17, 48.7)],
+                     ['T2', 'eec', (5.3, 48.2)],
+                     ['T3', 'fff', (6.25, 48.91)],
+                     ['T4', 'ccd', (0, 0)],
+                     ]
+        # Creation of the aligner object
+        processings = (GeographicalProcessing(2, 2, units='km'),)
+        aligner_1 = alig.BaseAligner(threshold=30, processings=processings)
+        processings = (LevenshteinProcessing(1, 1),)
+        aligner_2 = alig.BaseAligner(threshold=1, processings=processings)
+        pipeline = alig.PipelineAligner((aligner_1, aligner_2))
+        global_mat, global_matched = pipeline.align(refset, targetset)
+        true_global_matched =  {0: set([(2, 29.124842), (0, 4.5532517)]),
+                                1: set([(2, 11.396689)]), 2: set([(3, 1.0)]),
+                                3: set([(1, 15.69241)])}
+        self.assertEqual(len(global_matched), len(true_global_matched))
+        for k, v in true_global_matched.iteritems():
+            self.assertIn(k, global_matched)
+            self.assertEqual(len(v), len(global_matched[k]))
+
+    def test_pipeline_align_pairs(self):
+        refset = [['V1', 'aaa', (6.14194444444, 48.67)],
+                  ['V2', 'bbb', (6.2, 49)],
+                  ['V3', 'ccc', (5.1, 48)],
+                  ['V4', 'ddd', (5.2, 48.1)],
+                  ]
+        targetset = [['T1', 'zzz', (6.17, 48.7)],
+                     ['T2', 'eec', (5.3, 48.2)],
+                     ['T3', 'fff', (6.25, 48.91)],
+                     ['T4', 'ccd', (0, 0)],
+                     ]
+        # Creation of the aligner object
+        processings = (GeographicalProcessing(2, 2, units='km'),)
+        aligner_1 = alig.BaseAligner(threshold=30, processings=processings)
+        processings = (LevenshteinProcessing(1, 1),)
+        aligner_2 = alig.BaseAligner(threshold=1, processings=processings)
+        pipeline = alig.PipelineAligner((aligner_1, aligner_2))
+        uniq_matched = [(('V1', 0), ('T1', 0)), (('V2', 1), ('T3', 2)),
+                        (('V4', 3), ('T2', 1)), (('V3', 2), ('T4', 3))]
+        matched = list(pipeline.get_aligned_pairs(refset, targetset, unique=True))
+        self.assertEqual(len(matched), len(uniq_matched))
+        for m in uniq_matched:
+            self.assertIn(m, matched)
+
+
+
+
 if __name__ == '__main__':
     unittest2.main()