--- a/h5fs.py Tue Apr 02 18:27:47 2013 +0200
+++ b/h5fs.py Fri Mar 29 03:15:18 2013 +0100
@@ -16,6 +16,7 @@
import errno
from itertools import imap, chain
from functools import partial
+from StringIO import StringIO
import h5py
@@ -134,11 +135,66 @@
raise H5IOError(errno.EISDIR, 'Is a Group', path)
return self.get_data(h5entry, offset, offset + size)
+class Npy10FsMixin(FsMixin):
+ '''Implement the HDF5 filesystem interface with datasets exported as
+ .npy files (format version 1.0)
+ '''
+ # XXX: There is a problematic case when a subgroup named 'a.npy' and a
+ # dataset named 'a' have the same parent: both are exposed as 'a.npy'
+ file_extension = 'npy'
-def main():
+ @staticmethod
+ def _get_npy_header(h5entry):
+ from numpy.lib.format import write_array_header_1_0 as hwrite, \
+ header_data_from_array_1_0 as hdata, \
+ magic
+ header = StringIO()
+ header.write(magic(1, 0)) # XXX: support only 1.0 version of npy file
+ hwrite(header, hdata(h5entry.value))
+ return header.getvalue()
+
+ # ===
+
+ def get_entry(self, path):
+ '''Return the entry recorded at ``path`` or raise H5IOError'''
+ trailling = osp.extsep + self.file_extension
+ if path.endswith(trailling):
+ path = path[:-len(trailling)]
+ return super(Npy10FsMixin, self).get_entry(path)
+
+ def get_name(self, h5entry):
+ '''Return the entry file name as string'''
+ # XXX unicode does not seem to be allowed
+ name = super(Npy10FsMixin, self).get_name(h5entry)
+ if isinstance(h5entry, h5py.highlevel.Dataset):
+ name = osp.extsep.join((name, self.file_extension))
+ return name
+
+ def get_size(self, h5entry):
+ '''Return the byte size of the file'''
+ header_size = len(self._get_npy_header(h5entry))
+ data_size = super(Npy10FsMixin, self).get_size(h5entry)
+ return header_size + data_size
+
+ def get_data(self, h5entry, start, end):
+ '''Return a slice of the file data (npy header, then raw data) from start to end'''
+ header = self._get_npy_header(h5entry)
+ header_size = len(header)
+ data = []
+ if start < header_size:
+ data.append(header[start:end])
+ start = 0
+ else:
+ start -= header_size
+ end -= header_size
+ if end > 0:
+ data.append(super(Npy10FsMixin, self).get_data(h5entry, start, end))
+ return ''.join(data)
+
+def main(mixincls):
'''Main entry point that start the HDF5 filesystem server'''
usage = "\nUserspace HDF5 file explorer.\n\n%prog sourcefile mountpoint"
- class H5FS(FsMixin, fuse.Fuse):
+ class H5FS(mixincls, fuse.Fuse):
'''merge Fuse + HDF5 interface'''
pass
server = H5FS(version="%proog " + fuse.__version__,
@@ -156,4 +212,4 @@
h5file.close()
if __name__ == '__main__':
- main()
+ main(Npy10FsMixin)
--- a/test/test_H5FS.py Tue Apr 02 18:27:47 2013 +0200
+++ b/test/test_H5FS.py Fri Mar 29 03:15:18 2013 +0100
@@ -4,8 +4,9 @@
import errno
import h5py
import numpy as N
+from StringIO import StringIO
-from unittest import TestCase as TC, main
+from unittest2 import TestCase as TC, main
from logilab.common.testlib import within_tempdir
import h5fs
@@ -25,110 +26,163 @@
return out
return wrapped
-def with_fsmixin(func):
- @with_h5file
- def wrapped(self, h5file):
- h5file.create_group(u'sam')
- data = N.array([(-1, 4.0, 'Hello'), (2, 6.0, 'World')],
- dtype=[('f0', '>u8'), ('f1', '>f4'), ('f2', '|S7')])
- h5file.create_dataset(u'melu', data=data)
- # data=xrange(12), shape=(4, 3), dtype='u4')
- fsmx = h5fs.FsMixin()
- fsmx.h5file = h5file
- return func(self, fsmx)
- return wrapped
+
+def with_mixin(clsmixin):
+ def wrapper(func):
+ @with_h5file
+ def wrapped(self, h5file):
+ h5file.create_group(u'sam')
+ data = N.array([(-1, 4.0, 'Hello'), (2, 6.0, 'World')],
+ dtype=[('f0', '>u8'), ('f1', '>f4'), ('f2', '|S7')])
+ h5file.create_dataset(u'melu', data=data)
+ # data=xrange(12), shape=(4, 3), dtype='u4')
+ fsmx = clsmixin()
+ fsmx.h5file = h5file
+ return func(self, fsmx)
+ return wrapped
+ return wrapper
class FsMixin_TC(TC):
-
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_get_dataset_size(self, fsmx):
self.assertEqual(fsmx.get_size(fsmx.h5file['melu']), 38)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_get_stat_with_group(self, fsmx):
group = fsmx.h5file['sam']
self.assertTrue(isinstance(fsmx.get_stat(group), h5fs.GroupStat))
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_get_stat_with_dataset(self, fsmx):
dataset = fsmx.h5file['melu']
stat = fsmx.get_stat(dataset)
self.assertTrue(isinstance(stat, h5fs.DatasetStat))
self.assertEqual(stat.st_size, 38)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_get_name(self, fsmx):
dataset = fsmx.h5file['melu']
self.assertEqual(fsmx.get_name(dataset), 'melu')
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_get_data(self, fsmx):
self.assertSequenceEqual(fsmx.get_data(fsmx.h5file['melu'], 7, 19),
'\xff@\x80\x00\x00Hello\x00\x00')
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_list_group(self, fsmx):
result = set(fsmx.list_group(fsmx.h5file['/'], ('alain',)))
self.assertSetEqual(result, set(['sam', 'melu', 'alain']))
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_openable_wrong(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.openable, fsmx.h5file['sam'])
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_openable_ok(self, fsmx):
fsmx.openable(fsmx.h5file['melu']) # no H5IOError raised
# =======
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_readdir(self, fsmx):
result = list(fsmx.readdir(u'/', 0))
expected = list(fsmx.h5file[u'/'].keys()) + ['.', '..']
self.assertEqual(len(expected), len(result))
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_readdir_wrong(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.readdir, u'/wrong data path', 0)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_getattr(self, fsmx):
self.assertTrue(isinstance(fsmx.getattr('/'), h5fs.GroupStat))
self.assertTrue(isinstance(fsmx.getattr('/sam'), h5fs.GroupStat))
self.assertTrue(isinstance(fsmx.getattr('/melu'), h5fs.DatasetStat))
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_getattr_wrong(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.getattr, u'/wrong data path')
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_open_wrong_path(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.open, u'/wrong data path',
os.O_RDONLY)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_open_wrong_write(self, fsmx): # XXX read-only for now
self.assertRaises(h5fs.H5IOError, fsmx.open, u'/wrong data path',
os.O_WRONLY)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_open_ok(self, fsmx):
fsmx.open(u'melu', os.O_RDONLY) # no H5IOError raised
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_read_wrong_noent(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.read, u'/wrong data path', 0, 10)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_read_wrong_isdir(self, fsmx):
self.assertRaises(h5fs.H5IOError, fsmx.read, u'/sam', 0, 10)
- @with_fsmixin
+ @with_mixin(h5fs.FsMixin)
def test_read(self, fsmx):
self.assertSequenceEqual(fsmx.read(u'melu', 12, 7),
'\xff@\x80\x00\x00Hello\x00\x00')
+class Npy10FsMixin_TC(TC):
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_dataset_npy_header(self, mx):
+ dataset = mx.h5file['melu']
+ reference = StringIO()
+ N.lib.format.write_array(reference, dataset.value, version=(1,0))
+ reference.seek(0)
+ header_ref = ''.join(iter(lambda: reference.read(1), '\n')) + '\n'
+ value = mx._get_npy_header(dataset)
+ self.assertMultiLineEqual(header_ref, value)
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_dataset_size(self, mx):
+ dataset = mx.h5file['melu']
+ reference = StringIO()
+ N.lib.format.write_array(reference, dataset.value, version=(1,0))
+ reference.seek(0)
+ len_ref = len(reference.getvalue())
+ self.assertEqual(mx.get_size(dataset), len_ref)
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_data(self, mx):
+ self.assertMultiLineEqual(mx.get_data(mx.h5file['melu'], 10, 20),
+ "{'descr': ")
+ self.assertMultiLineEqual(mx.get_data(mx.h5file['melu'], 119, 131),
+ '\xff@\x80\x00\x00Hello\x00\x00')
+ self.assertMultiLineEqual(
+ mx.get_data(mx.h5file['melu'], 100, 128),
+ '(2,), } \n\xff\xff\xff\xff\xff\xff\xff\xff@\x80\x00\x00Hell')
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_dataset_name(self, mx):
+ dataset = mx.h5file['melu']
+ self.assertEqual(mx.get_name(dataset), 'melu.npy')
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_group_name(self, mx):
+ dataset = mx.h5file['sam']
+ self.assertEqual(mx.get_name(dataset), 'sam')
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_dataset_entry(self, mx):
+ dataset = mx.h5file['melu']
+ self.assertEqual(mx.get_entry('melu.npy'), dataset)
+
+ @with_mixin(h5fs.Npy10FsMixin)
+ def test_get_group_entry(self, mx):
+ dataset = mx.h5file['sam']
+ self.assertEqual(mx.get_entry('sam'), dataset)
+
+
if __name__ == '__main__':
main()