[blocking] Add pipeline blocking, see #182032
author Vincent Michel <vincent.michel@logilab.fr>
Tue, 08 Oct 2013 13:19:27 +0000
changeset 305 afe25ae5facf
parent 304 3ad94b0b5322
child 306 cf1e78baf803
[blocking] Add pipeline blocking, see #182032
blocking.py
test/test_blocking.py
--- a/blocking.py	Tue Oct 08 13:18:13 2013 +0000
+++ b/blocking.py	Tue Oct 08 13:19:27 2013 +0000
@@ -63,6 +63,19 @@
         self.targetids = None
         self.is_fitted = False
 
+    def _fit(self, refset, targetset):
+        raise NotImplementedError  # abstract hook: subclasses must override it
+
+    def _iter_blocks(self):
+        """ Internal iteration function over blocks: subclasses must override
+        it to yield (reference block, target block) couples """
+        raise NotImplementedError
+
+    def _cleanup(self):
+        """ Internal cleanup hook called by cleanup(): subclasses must override
+        it to reset their own state (e.g. for further use in pipeline) """
+        raise NotImplementedError
+
     def fit(self, refset, targetset):
         """ Fit the blocking technique on the reference and target datasets
 
@@ -78,9 +91,6 @@
         self.targetids = [(i, r[0]) for i, r in enumerate(targetset)]
         self.is_fitted = True
 
-    def _fit(self, refset, targetset):
-        raise NotImplementedError
-
     def iter_blocks(self):
         """ Iterator over the different possible blocks.
 
@@ -122,12 +132,6 @@
         for block1, block2 in self._iter_blocks():
             yield [r[1] for r in block1], [r[1] for r in block2]
 
-
-    def _iter_blocks(self):
-        """ Internal iteration function over blocks
-        """
-        raise NotImplementedError
-
     def iter_pairs(self):
         """ Iterator over the different possible pairs.
 
@@ -174,6 +178,12 @@
                 for val2 in block2:
                     yield val1, val2
 
+    def cleanup(self):
+        """ Reset the blocking to the unfitted state for further use (e.g. in
+        pipeline); the subclass-specific state is dropped by _cleanup() """
+        self.is_fitted = False
+        self._cleanup()
+
 
 ###############################################################################
 ### KEY BLOCKING ##############################################################
@@ -223,6 +233,12 @@
             if block1 and block2:
                 yield (block1, block2)
 
+    def _cleanup(self):
+        """ Reset reference_index and target_index to empty dicts, so that
+        the blocking can be used again (e.g. in pipeline) """
+        self.reference_index = {}
+        self.target_index = {}
+
 
 class SoundexBlocking(KeyBlocking):
 
@@ -290,6 +306,12 @@
             if block1 and block2:
                 yield block1, block2
 
+    def _cleanup(self):
+        """ Reset reference_index and target_index to empty dicts, so that
+        the blocking can be used again (e.g. in pipeline) """
+        self.reference_index = {}
+        self.target_index = {}
+
 
 ###############################################################################
 ### SORTKEY BLOCKING ##########################################################
@@ -330,6 +352,11 @@
             if block1 and block2:
                 yield (block1, block2)
 
+    def _cleanup(self):
+        """ Reset sorted_dataset to None, so that the blocking can be used
+        again (e.g. in pipeline) """
+        self.sorted_dataset = None
+
 
 ###############################################################################
 ### CLUSTERING-BASED BLOCKINGS ################################################
@@ -380,6 +407,12 @@
             if len(block1) and len(block2):
                 yield block1, block2
 
+    def _cleanup(self):
+        """ Reset kmeans and predicted to None, so that the blocking can be
+        used again (e.g. in pipeline) """
+        self.kmeans = None
+        self.predicted = None
+
 
 ###############################################################################
 ### KDTREE BLOCKINGS ##########################################################
@@ -431,6 +464,13 @@
             if len(block1) and len(block2):
                 yield block1, block2
 
+    def _cleanup(self):
+        """ Reset reftree, targettree and nb_elements to None, so that the
+        blocking can be used again (e.g. in pipeline) """
+        self.reftree = None
+        self.targettree = None
+        self.nb_elements = None
+
 
 ###############################################################################
 ### MINHASHING BLOCKINGS ######################################################
@@ -481,3 +521,64 @@
         for block1, block2 in neighbours:
             if len(block1) and len(block2):
                 yield block1, block2
+
+    def _cleanup(self):
+        """ Replace minhasher by a new Minlsh() and reset nb_elements to None,
+        so that the blocking can be used again (e.g. in pipeline) """
+        self.minhasher = Minlsh()
+        self.nb_elements = None
+
+
+###############################################################################
+### BLOCKING PIPELINE #########################################################
+###############################################################################
+class PipelineBlocking(BaseBlocking):
+    """ Pipeline multiple blocking techniques: each blocking of the list is
+    fitted on the blocks yielded by the previous one. """
+
+    def __init__(self, blockings):
+        """ Build the blocking object
+
+        Parameters
+        ----------
+
+        blockings: ordered list of blocking objects
+        """
+        self.blockings = blockings
+        self.stored_blocks = []
+
+    def _fit(self, refset, targetset):
+        """ Internal fit of the pipeline (previously stored blocks are dropped) """
+        self.stored_blocks = []
+        self._recursive_fit(refset, targetset, range(len(refset)), range(len(targetset)), 0)
+
+    def _recursive_fit(self, refset, targetset, ref_index, target_index, ind):
+        """ Fit blocking number ind on the records of refset/targetset whose
+        positions are in ref_index/target_index, then recurse on its blocks.
+        Final blocks are appended to the stored_blocks attribute. """
+        blocking = self.blockings[ind]
+        blocking.cleanup()
+        blocking.fit([refset[i] for i in ref_index],
+                     [targetset[i] for i in target_index])
+        if ind < len(self.blockings) - 1:
+            # Not the last blocking: refine each block with the next blocking
+            for block1, block2 in blocking.iter_indice_blocks():
+                self._recursive_fit(refset, targetset,
+                                    [ref_index[i] for i in block1],
+                                    [target_index[i] for i in block2], ind + 1)
+        else:
+            # Last blocking: store its blocks, mapped back to global positions
+            for block1, block2 in blocking.iter_blocks():
+                self.stored_blocks.append(
+                    ([(ref_index[i], _id) for i, _id in block1],
+                     [(target_index[i], _id) for i, _id in block2]))
+
+    def _iter_blocks(self):
+        """ Internal iteration function over blocks (empty blocks are skipped) """
+        for block1, block2 in self.stored_blocks:
+            if block1 and block2:
+                yield block1, block2
+
+    def _cleanup(self):
+        """ Cleanup blocking for further use (e.g. in pipeline) """
+        self.stored_blocks = []
--- a/test/test_blocking.py	Tue Oct 08 13:18:13 2013 +0000
+++ b/test/test_blocking.py	Tue Oct 08 13:19:27 2013 +0000
@@ -24,7 +24,7 @@
 from nazca.distances import (levenshtein, soundex, soundexcode,   \
                              jaccard, euclidean, geographical)
 from nazca.blocking import (KeyBlocking, SortedNeighborhoodBlocking,
-                            NGramBlocking,
+                            NGramBlocking, PipelineBlocking,
                             SoundexBlocking, KmeansBlocking,
                             MinHashingBlocking, KdTreeBlocking)
 from nazca.normalize import SimplifyNormalizer, loadlemmas
@@ -166,7 +166,7 @@
 
 class NGramBlockingTest(unittest2.TestCase):
 
-    def test_keyblocking_blocks(self):
+    def test_ngram_blocks(self):
         blocking = NGramBlocking(ref_attr_index=1, target_attr_index=1)
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
         blocks = list(blocking.iter_id_blocks())
@@ -175,7 +175,7 @@
         self.assertIn((['a5'], ['b4']), blocks)
         self.assertIn((['a1', 'a4'], ['b3']), blocks)
 
-    def test_keyblocking_blocks_depth(self):
+    def test_ngram_blocks_depth(self):
         blocking = NGramBlocking(ref_attr_index=1, target_attr_index=1, depth=1)
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
         blocks = list(blocking.iter_id_blocks())
@@ -186,6 +186,19 @@
         self.assertIn((['a7'], ['b6']), blocks)
         self.assertIn((['a1', 'a4'], ['b3']), blocks)
 
+    def test_ngram_blocks_2(self):
+        """ Blocking on the third attribute must pair 3/c and 4/d only """
+        refset = [['3', 'ccdd', 'aabb'], ['4', 'ccdd', 'bbaa']]
+        targetset = [['c', 'ccdd', 'aabb'],
+                     ['d', 'ccdd', 'bbaa']]
+        true_pairs = [('3', 'c'), ('4', 'd')]
+        blocking = NGramBlocking(ref_attr_index=2, target_attr_index=2,
+                                 ngram_size=2, depth=1)
+        blocking.fit(refset, targetset)
+        pairs = list(blocking.iter_id_pairs())
+        self.assertEqual(sorted(pairs), sorted(true_pairs))
+
+
 class SortedNeighborhoodBlockingTest(unittest2.TestCase):
 
     def test_sorted_neighborhood_blocks(self):
@@ -291,6 +304,54 @@
                           (['V4'], ['T2'])], blocks)
 
 
+class PipelineBlockingTest(unittest2.TestCase):
+    """ Two NGram blockings chained in a pipeline, on four records each """
+
+    def setUp(self):
+        # A fresh pipeline is fitted before each test
+        refset = [['1', 'aabb', 'ccdd'],
+                  ['2', 'aabb', 'ddcc'],
+                  ['3', 'ccdd', 'aabb'],
+                  ['4', 'ccdd', 'bbaa']]
+        targetset = [['a', 'aabb', 'ccdd'],
+                     ['b', 'aabb', 'ddcc'],
+                     ['c', 'ccdd', 'aabb'],
+                     ['d', 'ccdd', 'bbaa']]
+        first_stage = NGramBlocking(ref_attr_index=1, target_attr_index=1,
+                                    ngram_size=2, depth=1)
+        second_stage = NGramBlocking(ref_attr_index=2, target_attr_index=2,
+                                     ngram_size=2, depth=1)
+        self.blocking = PipelineBlocking((first_stage, second_stage))
+        self.blocking.fit(refset, targetset)
+
+    def _check_pairs(self, found, expected):
+        # Same number of pairs, and every expected pair has been found
+        self.assertEqual(len(found), len(expected))
+        for pair in expected:
+            self.assertIn(pair, found)
+
+    def test_pipeline_blocking(self):
+        # iter_pairs yields ((index, id), (index, id)) couples
+        expected = [((0, '1'), (0, 'a')),
+                    ((1, '2'), (1, 'b')),
+                    ((2, '3'), (2, 'c')),
+                    ((3, '4'), (3, 'd'))]
+        found = list(self.blocking.iter_pairs())
+        self._check_pairs(found, expected)
+
+    def test_pipeline_id_blocking(self):
+        # iter_id_pairs yields (reference id, target id) couples
+        expected = [('1', 'a'),
+                    ('2', 'b'),
+                    ('3', 'c'),
+                    ('4', 'd')]
+        found = list(self.blocking.iter_id_pairs())
+        self._check_pairs(found, expected)
+
+
+
+
+
 if __name__ == '__main__':
     unittest2.main()