[blocking] Now blocking techniques return indice or id or both, see #183415
authorVincent Michel <vincent.michel@logilab.fr>
Tue, 15 Oct 2013 08:10:03 +0000
changeset 303 52ba47aa143f
parent 302 8170c858e303
child 304 3ad94b0b5322
[blocking] Now blocking techniques return indice or id or both, see #183415
aligner.py
blocking.py
normalize.py
test/test_blocking.py
test/test_normalize.py
--- a/aligner.py	Tue Oct 08 13:10:21 2013 +0000
+++ b/aligner.py	Tue Oct 15 08:10:03 2013 +0000
@@ -22,25 +22,30 @@
 
 
 ###############################################################################
-### ALIGNER OBJECTS ###########################################################
+### BASE ALIGNER OBJECT #######################################################
 ###############################################################################
 class BaseAligner(object):
 
     def __init__(self, threshold, processings):
         self.threshold = threshold
         self.processings = processings
-        self.normalizers = None
+        self.ref_normalizer = None
+        self.target_normalizer = None
         self.blocking = None
 
-    def register_normalizers(self, normalizers):
+    def register_ref_normalizer(self, normalizer):
         """ Register normalizers to be applied
         before alignment """
-        self.normalizers = normalizers
+        self.ref_normalizer = normalizer
 
-    def apply_normalization(self, dataset):
-        if self.normalizers:
-            norm_pipeline = NormalizerPipeline(self.normalizers)
-            return norm_pipeline.normalize_dataset(dataset)
+    def register_target_normalizer(self, normalizer):
+        """ Register the normalizer to be applied
+        to the target set before alignment """
+        self.target_normalizer = normalizer
+
+    def apply_normalization(self, dataset, normalizer):
+        if normalizer:
+            return normalizer.normalize_dataset(dataset)
         return dataset
 
     def register_blocking(self, blocking):
@@ -94,8 +99,8 @@
         """ Perform the alignment on the referenceset
         and the targetset
         """
-        refset = self.apply_normalization(refset)
-        targetset = self.apply_normalization(targetset)
+        refset = self.apply_normalization(refset, self.ref_normalizer)
+        targetset = self.apply_normalization(targetset, self.target_normalizer)
         # If no blocking
         if not self.blocking:
             return self._get_match(refset, targetset)
@@ -104,7 +109,10 @@
         global_mat = lil_matrix((len(refset), len(targetset)))
         self.blocking.fit(refset, targetset)
         for refblock, targetblock in self.blocking.iter_blocks():
-            _, matched = self._get_match(refset, targetset, refblock, targetblock)
+            ref_index = [r[0] for r in refblock]
+            target_index = [r[0] for r in targetblock]
+            print ref_index, target_index
+            _, matched = self._get_match(refset, targetset, ref_index, target_index)
             for k, values in matched.iteritems():
                 subdict = global_matched.setdefault(k, set())
                 for v, d in values:
@@ -126,3 +134,15 @@
             for refid in global_matched:
                 for targetid, _ in global_matched[refid]:
                     yield refset[refid][0], targetset[targetid][0]
+
+
+## ###############################################################################
+## ### ITERATIVE ALIGNER OBJECT ##################################################
+## ###############################################################################
+## class MultiPassAligner(object):
+##     """ This aligner may be used to perform multiple passes of alignment.
+##     Records linked in a previous pass will not be considered in the next pass.
+##     """
+
+##     def __init__(self, threshold, treatments):
+ 
--- a/blocking.py	Tue Oct 08 13:10:21 2013 +0000
+++ b/blocking.py	Tue Oct 15 08:10:03 2013 +0000
@@ -59,6 +59,8 @@
         """
         self.ref_attr_index = ref_attr_index
         self.target_attr_index = target_attr_index
+        self.refids = None
+        self.targetids = None
         self.is_fitted = False
 
     def fit(self, refset, targetset):
@@ -71,6 +73,9 @@
         targetset: a dataset (list of records)
         """
         self._fit(refset, targetset)
+        # Keep ids for blocks building
+        self.refids = [(i, r[0]) for i, r in enumerate(refset)]
+        self.targetids = [(i, r[0]) for i, r in enumerate(targetset)]
         self.is_fitted = True
 
     def _fit(self, refset, targetset):
@@ -83,12 +88,41 @@
         -------
 
         (block1, block2): The blocks are always (reference_block, target_block)
-                          and containts the indexes of the record in the
+                          and contains the pair (index, id) of the record in the
                           corresponding dataset.
         """
         assert self.is_fitted
         return self._iter_blocks()
 
+    def iter_indice_blocks(self):
+        """ Iterator over the different possible blocks.
+
+        Returns
+        -------
+
+        (block1, block2): The blocks are always (reference_block, target_block)
+                          and contains the indexes of the record in the
+                          corresponding dataset.
+        """
+        assert self.is_fitted
+        for block1, block2 in self._iter_blocks():
+            yield [r[0] for r in block1], [r[0] for r in block2]
+
+    def iter_id_blocks(self):
+        """ Iterator over the different possible blocks.
+
+        Returns
+        -------
+
+        (block1, block2): The blocks are always (reference_block, target_block)
+                          and contains the ids of the record in the
+                          corresponding dataset.
+        """
+        assert self.is_fitted
+        for block1, block2 in self._iter_blocks():
+            yield [r[1] for r in block1], [r[1] for r in block2]
+
+
     def _iter_blocks(self):
         """ Internal iteration function over blocks
         """
@@ -100,14 +134,45 @@
         Returns
         -------
 
+        (pair1, pair2): The pairs are always ((ind_reference, id_reference),
+                                              (ind_target, id_target))
+                        i.e. the (index, id) of each record in its dataset.
+        """
+        assert self.is_fitted
+        for block1, block2 in self.iter_blocks():
+            for val1 in block1:
+                for val2 in block2:
+                    yield val1, val2
+
+    def iter_indice_pairs(self):
+        """ Iterator over the different possible pairs.
+
+        Returns
+        -------
+
+        (pair1, pair2): The pairs are always (ind_reference, ind_target)
+                        and are the indexes of the record in the corresponding dataset.
+        """
+        assert self.is_fitted
+        for block1, block2 in self.iter_indice_blocks():
+            for val1 in block1:
+                for val2 in block2:
+                    yield val1, val2
+
+    def iter_id_pairs(self):
+        """ Iterator over the different possible pairs.
+
+        Returns
+        -------
+
         (pair1, pari2): The pairs are always (id_reference, id_target)
                         and are the ids of the record in the corresponding dataset.
         """
         assert self.is_fitted
-        for block1, block2 in self.iter_blocks():
-            for refid in block1:
-                for targetid in block2:
-                    yield refid, targetid
+        for block1, block2 in self.iter_id_blocks():
+            for val1 in block1:
+                for val2 in block2:
+                    yield val1, val2
 
 
 ###############################################################################
@@ -136,12 +201,12 @@
     def _fit(self, refset, targetset):
         """ Fit a dataset in an index using the callback
         """
-        for rec in refset:
+        for ind, rec in enumerate(refset):
             key = self.callback(rec[self.ref_attr_index])
-            self.reference_index.setdefault(key, []).append(rec[0])
-        for rec in targetset:
+            self.reference_index.setdefault(key, []).append((ind, rec[0]))
+        for ind, rec in enumerate(targetset):
             key = self.callback(rec[self.target_attr_index])
-            self.target_index.setdefault(key, []).append(rec[0])
+            self.target_index.setdefault(key, []).append((ind, rec[0]))
 
     def _iter_blocks(self):
         """ Iterator over the different possible blocks.
@@ -183,14 +248,14 @@
     def _fit_dataset(self, dataset, cur_index, attr_index):
         """ Fit a dataset
         """
-        for r in dataset:
+        for ind, r in enumerate(dataset):
             cur_dict = cur_index
             text = r[attr_index]
             for i in range(self.depth):
                 ngram = text[i*self.ngram_size:(i+1)*self.ngram_size]
                 if i < self.depth - 1:
                     cur_dict = cur_dict.setdefault(ngram, {})
-            cur_dict.setdefault(ngram, []).append(r[0])
+            cur_dict.setdefault(ngram, []).append((ind, r[0]))
 
     def _fit(self, refset, targetset):
         """ Fit the two sets (reference set and target set)
@@ -243,8 +308,10 @@
     def _fit(self, refset, targetset):
         """ Fit a dataset in an index using the callback
         """
-        self.sorted_dataset = [(r[0], r[self.ref_attr_index], 0) for r in refset]
-        self.sorted_dataset.extend([(r[0], r[self.target_attr_index], 1) for r in targetset])
+        self.sorted_dataset = [((ind, r[0]), r[self.ref_attr_index], 0)
+                               for ind, r in enumerate(refset)]
+        self.sorted_dataset.extend([((ind, r[0]), r[self.target_attr_index], 1)
+                                    for ind, r in enumerate(targetset)])
         self.sorted_dataset.sort(key=lambda x: self.key_func(x[1]))
 
     def _iter_blocks(self):
@@ -305,10 +372,10 @@
                           corresponding dataset.
         """
         neighbours = [[[], []] for _ in xrange(self.kmeans.n_clusters)]
-        for ind, i in enumerate(self.predicted):
-            neighbours[i][1].append(ind)
-        for ind, i in enumerate(self.kmeans.labels_):
-            neighbours[i][0].append(ind)
+        for ind, li in enumerate(self.predicted):
+            neighbours[li][1].append(self.targetids[ind])
+        for ind, li in enumerate(self.kmeans.labels_):
+            neighbours[li][0].append(self.refids[ind])
         for block1, block2 in neighbours:
             if len(block1) and len(block2):
                 yield block1, block2
@@ -357,7 +424,9 @@
         for ind in xrange(self.nb_elements):
             if not extraneighbours[ind]:
                 continue
-            neighbours.append([[ind], extraneighbours[ind]])
+            _ref = [self.refids[ind],]
+            _target = [self.targetids[v] for v in extraneighbours[ind]]
+            neighbours.append((_ref, _target))
         for block1, block2 in neighbours:
             if len(block1) and len(block2):
                 yield block1, block2
@@ -404,9 +473,9 @@
             neighbours.append([[], []])
             for i in data:
                 if i >= self.nb_elements:
-                    neighbours[-1][1].append(i - self.nb_elements)
+                    neighbours[-1][1].append(self.targetids[i - self.nb_elements])
                 else:
-                    neighbours[-1][0].append(i)
+                    neighbours[-1][0].append(self.refids[i])
             if len(neighbours[-1][0]) == 0 or len(neighbours[-1][1]) == 0:
                 neighbours.pop()
         for block1, block2 in neighbours:
--- a/normalize.py	Tue Oct 08 13:10:21 2013 +0000
+++ b/normalize.py	Tue Oct 15 08:10:03 2013 +0000
@@ -358,6 +358,34 @@
 
 
 ###############################################################################
+### JOIN NORMALIZER ###########################################################
+###############################################################################
+class JoinNormalizer(BaseNormalizer):
+    """Normalizer that joins multiple fields into a single one.
+    This new field will be put at the end of the new record.
+    """
+    def __init__(self, attr_indexes, join_car=', '):
+        self.attr_indexes = attr_indexes
+        self.join_car = join_car
+
+    def normalize(self, record):
+        """ Normalize a record
+
+        Parameters
+        ----------
+        record: a record (tuple/list of values).
+
+        Returns
+        -------
+
+        record: the normalized record.
+        """
+        _record = [r for ind, r in enumerate(record) if ind not in self.attr_indexes]
+        _record.append(self.join_car.join([r for ind, r in enumerate(record) if ind in self.attr_indexes]))
+        return _record
+
+
+###############################################################################
 ### NORMALIZER PIPELINE #######################################################
 ###############################################################################
 class NormalizerPipeline(BaseNormalizer):
--- a/test/test_blocking.py	Tue Oct 08 13:10:21 2013 +0000
+++ b/test/test_blocking.py	Tue Oct 15 08:10:03 2013 +0000
@@ -56,13 +56,80 @@
                  ('a7', 'b6'),)
 
 
+class BaseBlockingTest(unittest2.TestCase):
+
+    def test_baseblocking_blocks(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        blocks = list(blocking.iter_blocks())
+        self.assertEqual(len(blocks), 3)
+        self.assertIn(([(0, 'a1'), (6, 'a7')], [(2, 'b3'), (5, 'b6')]), blocks)
+        self.assertIn(([(1, 'a2'), (4, 'a5')], [(3, 'b4')]), blocks)
+        self.assertIn(([(2, 'a3')], [(0, 'b1'), (1, 'b2')]), blocks)
+
+    def test_baseblocking_id_blocks(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        blocks = list(blocking.iter_id_blocks())
+        self.assertEqual(len(blocks), 3)
+        self.assertIn((['a1', 'a7'], ['b3', 'b6']), blocks)
+        self.assertIn((['a2', 'a5'], ['b4']), blocks)
+        self.assertIn((['a3'], ['b1', 'b2']), blocks)
+
+    def test_baseblocking_indice_blocks(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        blocks = list(blocking.iter_indice_blocks())
+        self.assertEqual(len(blocks), 3)
+        self.assertIn(([0, 6], [2, 5]), blocks)
+        self.assertIn(([1, 4], [3]), blocks)
+        self.assertIn(([2], [0, 1]), blocks)
+
+    def test_baseblocking_pairs(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        pairs = list(blocking.iter_pairs())
+        ref_ind = dict((r[0], ind) for ind, r in enumerate(SOUNDEX_REFSET))
+        target_ind = dict((r[0], ind) for ind, r in enumerate(SOUNDEX_TARGETSET))
+        true_pairs = [((ref_ind[r[0]], r[0]), (target_ind[r[1]], r[1])) for r in SOUNDEX_PAIRS]
+        self.assertEqual(len(pairs), len(true_pairs))
+        for pair in true_pairs:
+            self.assertIn(pair, pairs)
+
+    def test_baseblocking_id_pairs(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        pairs = list(blocking.iter_id_pairs())
+        true_pairs = SOUNDEX_PAIRS
+        self.assertEqual(len(pairs), len(true_pairs))
+        for pair in true_pairs:
+            self.assertIn(pair, pairs)
+
+    def test_baseblocking_indice_pairs(self):
+        blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
+                               callback=partial(soundexcode, language='english'))
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        pairs = list(blocking.iter_indice_pairs())
+        ref_ind = dict((r[0], ind) for ind, r in enumerate(SOUNDEX_REFSET))
+        target_ind = dict((r[0], ind) for ind, r in enumerate(SOUNDEX_TARGETSET))
+        true_pairs = [(ref_ind[r[0]], target_ind[r[1]]) for r in SOUNDEX_PAIRS]
+        self.assertEqual(len(pairs), len(true_pairs))
+        for pair in true_pairs:
+            self.assertIn(pair, pairs)
+
+
 class KeyBlockingTest(unittest2.TestCase):
 
     def test_keyblocking_blocks(self):
         blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
                                callback=partial(soundexcode, language='english'))
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_id_blocks())
         self.assertEqual(len(blocks), 3)
         self.assertIn((['a1', 'a7'], ['b3', 'b6']), blocks)
         self.assertIn((['a2', 'a5'], ['b4']), blocks)
@@ -72,7 +139,7 @@
         blocking = KeyBlocking(ref_attr_index=1, target_attr_index=1,
                                callback=partial(soundexcode, language='english'))
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        pairs = list(blocking.iter_pairs())
+        pairs = list(blocking.iter_id_pairs())
         self.assertEqual(len(pairs), 8)
         for pair in SOUNDEX_PAIRS:
             self.assertIn(pair, pairs)
@@ -81,7 +148,7 @@
         blocking = SoundexBlocking(ref_attr_index=1, target_attr_index=1,
                                    language='english')
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_id_blocks())
         self.assertEqual(len(blocks), 3)
         self.assertIn((['a1', 'a7'], ['b3', 'b6']), blocks)
         self.assertIn((['a2', 'a5'], ['b4']), blocks)
@@ -91,7 +158,7 @@
         blocking = SoundexBlocking(ref_attr_index=1, target_attr_index=1,
                                    language='english')
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        pairs = list(blocking.iter_pairs())
+        pairs = list(blocking.iter_id_pairs())
         self.assertEqual(len(pairs), 8)
         for pair in SOUNDEX_PAIRS:
             self.assertIn(pair, pairs)
@@ -102,11 +169,22 @@
     def test_keyblocking_blocks(self):
         blocking = NGramBlocking(ref_attr_index=1, target_attr_index=1)
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_id_blocks())
+        self.assertEqual(len(blocks), 3)
         self.assertIn((['a3'], ['b1', 'b2']), blocks)
         self.assertIn((['a5'], ['b4']), blocks)
         self.assertIn((['a1', 'a4'], ['b3']), blocks)
 
+    def test_keyblocking_blocks_depth(self):
+        blocking = NGramBlocking(ref_attr_index=1, target_attr_index=1, depth=1)
+        blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
+        blocks = list(blocking.iter_id_blocks())
+        self.assertEqual(len(blocks), 5)
+        self.assertIn((['a3'], ['b1', 'b2']), blocks)
+        self.assertIn((['a5'], ['b4']), blocks)
+        self.assertIn((['a6'], ['b5']), blocks)
+        self.assertIn((['a7'], ['b6']), blocks)
+        self.assertIn((['a1', 'a4'], ['b3']), blocks)
 
 class SortedNeighborhoodBlockingTest(unittest2.TestCase):
 
@@ -114,7 +192,7 @@
         blocking = SortedNeighborhoodBlocking(ref_attr_index=1, target_attr_index=1,
                                               window_width=1)
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_id_blocks())
         true_blocks = [(['a6'], ['b7', 'b5']), (['a3'], ['b5', 'b1']),
                        (['a2'], ['b2']), (['a5'], ['b4']), (['a7'], ['b4', 'b6']),
                        (['a1'], ['b6', 'b3']), (['a4'], ['b3'])]
@@ -128,7 +206,7 @@
         blocking = SortedNeighborhoodBlocking(ref_attr_index=1, target_attr_index=1,
                                               key_func=lambda x:x[::-1], window_width=1)
         blocking.fit(SOUNDEX_REFSET, SOUNDEX_TARGETSET)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_id_blocks())
         true_blocks = [(['a1'], ['b3']), (['a2'], ['b6']), (['a5'], ['b4']), (['a3'], ['b7', 'b1']),
                        (['a6'], ['b2', 'b5']), (['a4'], ['b5'])]
         self.assertEqual(len(blocks), len(true_blocks))
@@ -157,12 +235,12 @@
         blocking = KmeansBlocking(ref_attr_index=2, target_attr_index=2)
         blocking.fit(refset, targetset)
         # Blocks
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_indice_blocks())
         self.assertEqual(len(blocks), 2)
         self.assertIn(([0, 1], [0, 2]), blocks)
         self.assertIn(([2, 3], [1]), blocks)
         # Pairs
-        pairs = list(blocking.iter_pairs())
+        pairs = list(blocking.iter_indice_pairs())
         self.assertEqual(len(pairs), 6)
         for pair in ((0, 0), (0, 2), (1, 0), (1, 2), (2, 1), (3, 1)):
             self.assertIn(pair, pairs)
@@ -187,14 +265,14 @@
         targetset = normalizer.normalize_dataset(targetset)
         blocking = MinHashingBlocking(threshold=0.4, ref_attr_index=2, target_attr_index=2)
         blocking.fit(refset, targetset)
-        blocks = list(blocking.iter_blocks())
+        blocks = list(blocking.iter_indice_blocks())
         for align in (([2, 4], [1]), ([0], [0]), ([3], [2])):
             self.assertIn(align, blocks)
 
 
 class KdTreeBlockingTest(unittest2.TestCase):
 
-    def test_minhashing(self):
+    def test_kdtree(self):
         refset = [['V1', 'label1', (6.14194444444, 48.67)],
                   ['V2', 'label2', (6.2, 49)],
                   ['V3', 'label3', (5.1, 48)],
@@ -206,8 +284,11 @@
                      ]
         blocking = KdTreeBlocking(threshold=0.3, ref_attr_index=2, target_attr_index=2)
         blocking.fit(refset, targetset)
-        blocks = list(blocking.iter_blocks())
-        self.assertEqual([([0], [0, 2]), ([1], [0, 2]), ([2], [1]), ([3], [1])], blocks)
+        blocks = list(blocking.iter_id_blocks())
+        self.assertEqual([(['V1'], ['T1', 'T3']),
+                          (['V2'], ['T1', 'T3']),
+                          (['V3'], ['T2']),
+                          (['V4'], ['T2'])], blocks)
 
 
 if __name__ == '__main__':
--- a/test/test_normalize.py	Tue Oct 08 13:10:21 2013 +0000
+++ b/test/test_normalize.py	Tue Oct 15 08:10:03 2013 +0000
@@ -19,7 +19,7 @@
 import unittest2
 from os import path
 
-from nazca.normalize import (BaseNormalizer, UnicodeNormalizer,
+from nazca.normalize import (BaseNormalizer, UnicodeNormalizer, JoinNormalizer,
                              SimplifyNormalizer, TokenizerNormalizer,
                              LemmatizerNormalizer, RoundNormalizer,
                              RegexpNormalizer, NormalizerPipeline,
@@ -170,6 +170,11 @@
         self.assertEqual(['a1', u'42'],
                          normalizer.normalize(['a1', u'http://perdu.com/42/supertop/cool']))
 
+    def test_join(self):
+        normalizer = JoinNormalizer((1,2))
+        self.assertEqual(normalizer.normalize((1, 'ab', 'cd', 'e', 5)), [1, 'e', 5, 'ab, cd'])
+
+
 
 class NormalizerPipelineTestCase(unittest2.TestCase):