#!/usr/bin/python3 -BbbEIsSttW all

"""This module implements a simple index file analyzer to find
resources eligible for deduplication based on the file digest.
Therefore block based deduplication candidates cannot be found by
this tool. As duperemove does not use the FS_IOC_FIEMAP system
call for some reason, duperemove runs are VERY slow. Therefore
perform these checks here.

This software is provided by the copyright owner "as is" and
without any expressed or implied warranties, including, but not
limited to, the implied warranties of merchantability and fitness
for a particular purpose are disclaimed. In no event shall the
copyright owner be liable for any direct, indirect, incidental,
special, exemplary or consequential damages, including, but not
limited to, procurement of substitute goods or services, loss of
use, data or profits or business interruption, however caused and
on any theory of liability, whether in contract, strict liability,
or tort, including negligence or otherwise, arising in any way
out of the use of this software, even if advised of the
possibility of such damage.

Copyright (c) 2019 halfdog

You may not distribute copies, re-license, sell or make any kind
of profit with this software without proper prior written consent
from the author. You may not remove this copyright notice, license
or terms of use.

PS: Restrictive license as I did not have time to publish this
backup data quality verification, deduplication, data
synchronization tools. I would be willing to do so if someone
assists in getting software distributed as Debian package."""

import ctypes
import fcntl
import json
import os
import subprocess
import sys


class FiemapExtentStruct(ctypes.Structure):
    """Python mirror of the kernel "struct fiemap_extent" used as
    output element of the FS_IOC_FIEMAP ioctl."""
    _fields_ = [
        ('logicalOffset', ctypes.c_int64),
        ('physicalOffset', ctypes.c_int64),
        ('length', ctypes.c_int64),
        ('reserved64', ctypes.c_int64 * 2),
        ('flags', ctypes.c_int32),
        ('reserved32', ctypes.c_int32 * 3)]


class FiemapStruct(ctypes.Structure):
    """Python mirror of the kernel "struct fiemap" request/response
    header preceding the extent array in the ioctl buffer."""
    _fields_ = [
        ('startOffset', ctypes.c_int64),
        ('mapLength', ctypes.c_int64),
        ('flags', ctypes.c_int32),
        ('extentCopied', ctypes.c_int32),
        ('extentsAvailable', ctypes.c_int32),
        ('reserved', ctypes.c_int32)]


class FiemapExtentHelper():
    """This class wraps the helper for checking a list of files if
    their extents are already deduplicated."""

    def __init__(self):
        # Reusable request header, extent decode scratch structure and
        # ioctl transfer buffer, sized for maxExtentsCount extents.
        self.fiemap = FiemapStruct()
        self.maxExtentsCount = 4096
        self.fiemapExtent = FiemapExtentStruct()
        self.buffer = (ctypes.c_int8 * (
            ctypes.sizeof(self.fiemap) +
            ctypes.sizeof(FiemapExtentStruct) * self.maxExtentsCount))()

    def checkFiemapExtentsMatch(self, dataCheckLength, fileNameList):
        """Check if all extents already match.
        @param dataCheckLength number of bytes from file start to map
        @param fileNameList list of file names to compare
        @return True when all extents are the same."""
        extentDataDict = {}
        for fileName in fileNameList:
            # Prepare the fiemap request header and copy it to the
            # start of the ioctl buffer.
            self.fiemap.startOffset = 0
            self.fiemap.mapLength = dataCheckLength
            # FIEMAP_FLAG_SYNC 1
            self.fiemap.flags = 1
            self.fiemap.extentCopied = 0
            self.fiemap.extentsAvailable = self.maxExtentsCount
            ctypes.memmove(
                ctypes.addressof(self.buffer), ctypes.addressof(self.fiemap),
                ctypes.sizeof(self.fiemap))
            testFd = os.open(
                fileName, os.O_RDONLY | os.O_NOFOLLOW | os.O_NOCTTY)
            try:
                # 0xc020660b is FS_IOC_FIEMAP; mutate_flag True lets the
                # kernel write the extent data back into the buffer.
                result = fcntl.ioctl(testFd, 0xc020660b, self.buffer, True)
            finally:
                # Close even when the ioctl raises to avoid fd leaks.
                os.close(testFd)
            if result != 0:
                raise Exception(
                    'FS_IOC_FIEMAP ioctl on %s returned %d' % (
                        repr(fileName), result))
            # Copy back the updated header to read the extent counts.
            ctypes.memmove(
                ctypes.addressof(self.fiemap), self.buffer,
                ctypes.sizeof(FiemapStruct))
            if (self.fiemap.extentCopied >= self.maxExtentsCount) or \
                    (self.fiemap.extentsAvailable > self.maxExtentsCount):
                raise Exception(
                    'Extents list exhausted: copied %d, available %d' % (
                        self.fiemap.extentCopied,
                        self.fiemap.extentsAvailable))
            extentList = []
            for extentPos in range(self.fiemap.extentCopied):
                ctypes.memmove(
                    ctypes.addressof(self.fiemapExtent),
                    ctypes.addressof(self.buffer) +
                    ctypes.sizeof(FiemapStruct) +
                    extentPos * ctypes.sizeof(FiemapExtentStruct),
                    ctypes.sizeof(FiemapExtentStruct))
                extentList.append((
                    self.fiemapExtent.logicalOffset,
                    self.fiemapExtent.physicalOffset,
                    self.fiemapExtent.length))
            extentDataDict[fileName] = extentList
        print('Got dict: %s' % repr(extentDataDict), file=sys.stderr)
        # All files match when every extent list equals the first one.
        refExtentList = None
        for fileName, extentList in extentDataDict.items():
            if refExtentList is None:
                refExtentList = extentList
            elif extentList != refExtentList:
                return False
        return True


class IndexedLocation():
    """This class stores information about each indexed location."""

    def __init__(self, dataPathName, indexFileName):
        self.dataPathName = dataPathName
        self.indexFileName = indexFileName

    def getDataPath(self):
        """Get the data directory path of this location."""
        return self.dataPathName

    def getIndexData(self):
        """Load and parse the JSON index file of this location.
        @return the deserialized index record list."""
        with open(self.indexFileName, 'rb') as indexFile:
            indexData = indexFile.read()
        return json.loads(str(indexData, 'ascii'))


class DedupAnalyzer():
    """This class searches index data for same-digest files and emits
    those not yet sharing identical extents in duperemove format."""

    def __init__(self):
        # 0: normal, 1: info, 2: debug
        self.logLevel = 0
        self.fiemapExtentHelper = FiemapExtentHelper()
        # Current group of same-digest file names awaiting flushing.
        self.dedupFileList = []

    def addFile(self, fileNameBytes):
        """Add the given file name to the current duplicates group,
        decoding any %XX escape sequences from the resource URL form.
        @param fileNameBytes URL-encoded file name as bytes
        @raises Exception when the name contains a newline, which the
        line-based output format cannot represent."""
        if fileNameBytes.find(b'\n') >= 0:
            raise Exception('Output format does not support newlines')
        checkPos = 0
        while True:
            checkPos = fileNameBytes.find(b'%', checkPos)
            if checkPos < 0:
                break
            replaceByte = int.to_bytes(
                int(fileNameBytes[checkPos + 1:checkPos + 3], 16), 1, 'big')
            fileNameBytes = fileNameBytes[:checkPos] + replaceByte + \
                fileNameBytes[checkPos + 3:]
            # Advance past the decoded byte so a decoded '%' is not
            # treated as the start of another escape sequence.
            checkPos += 1
        self.dedupFileList.append(fileNameBytes)

    def flushDedupList(self, dataCheckLength):
        """Flush the current duplicates group: emit it for duperemove
        unless the files' extents already match, then reset the group.
        @param dataCheckLength file data length used for extent mapping"""
        if len(self.dedupFileList) > 1:
            if self.fiemapExtentHelper.checkFiemapExtentsMatch(
                    dataCheckLength, self.dedupFileList):
                if self.logLevel >= 1:
                    print(
                        'Files %s already deduplicated' % repr(
                            self.dedupFileList), file=sys.stderr)
            else:
                sys.stdout.buffer.write(
                    b'\n'.join(self.dedupFileList) + b'\n\n')
        self.dedupFileList = []

    def createDeduplicationData(self, indexLocationList):
        """Create the deduplication data to be fed into duperemove.
        The function extracts all hash/length/path results from all
        indices, passes them to sort for memory efficient sorting and
        uses the output to create the duperemove output format.
        @param indexLocationList list of IndexedLocation objects"""
        sortProcess = subprocess.Popen(
            ['/usr/bin/sort'], stdin=subprocess.PIPE,
            stdout=subprocess.PIPE)
        for indexLocation in indexLocationList:
            dataPath = bytes(indexLocation.getDataPath(), 'ascii')
            indexData = indexLocation.getIndexData()
            for indexRecord in indexData:
                # Only regular-file records carry a digest to compare.
                if indexRecord['type'] in ['dir', 'link', 'pipe', 'socket']:
                    continue
                if 'digest-md5' not in indexRecord:
                    raise Exception('Strange %s' % repr(indexRecord))
                sortProcess.stdin.write(b'%s %d %s%s\n' % (
                    bytes(indexRecord['digest-md5'], 'ascii'),
                    indexRecord['length'], dataPath,
                    bytes(indexRecord['url'], 'ascii')))
        sortProcess.stdin.close()
        # Now read all entries, grouping adjacent same-digest lines.
        lastDigest = None
        lastLength = None
        dataBuffer = b''
        while True:
            lineEnd = dataBuffer.find(b'\n')
            if lineEnd < 0:
                inputData = sortProcess.stdout.read(1 << 20)
                if not inputData:
                    if len(dataBuffer):
                        raise Exception(
                            'Unhandled input data %s' % repr(dataBuffer))
                    # Flush the final group before terminating output.
                    self.flushDedupList(lastLength)
                    sys.stdout.buffer.write(b'\n')
                    break
                dataBuffer += inputData
                continue
            lineData = dataBuffer[:lineEnd].split(b' ')
            dataBuffer = dataBuffer[lineEnd + 1:]
            if len(lineData) != 3:
                raise Exception('Malformed line %s' % repr(lineData))
            if lineData[0] != lastDigest:
                # Digest changed: flush the previous group and start a
                # new one. flushDedupList already resets the group list.
                self.flushDedupList(lastLength)
                lastDigest = lineData[0]
                lastLength = int(lineData[1])
                self.addFile(lineData[2])
                continue
            if int(lineData[1]) != lastLength:
                raise Exception('Collision')
            # Ignore empty files.
            if lastLength == 0:
                continue
            # This is a duplicate.
            self.addFile(lineData[2])
        sortProcess.stdout.close()
        processResult = sortProcess.wait()
        if processResult != 0:
            print(
                'Sort terminated with error %d' % processResult,
                file=sys.stderr)
            sys.exit(1)
        print('Dedupe search complete', file=sys.stderr)


def mainFunction():
    """This is the main function to analyze the program call arguments
    and invoke indexing."""
    indexLocationList = []
    argPos = 1
    # Each supported option needs at least one following argument,
    # hence the len(sys.argv)-1 bound.
    while argPos < len(sys.argv) - 1:
        argName = sys.argv[argPos]
        argPos += 1
        if argName == '--':
            break
        if argName == '--IndexedDir':
            dataPathName = os.path.realpath(sys.argv[argPos])
            argPos += 1
            if dataPathName == '/':
                print(
                    'Cannot use %s as indexed dir as no parent '
                    'directory exists' % dataPathName, file=sys.stderr)
                sys.exit(1)
            # The index file is expected as sibling of the data dir.
            indexFileName = os.path.normpath('%s-Index.json' % dataPathName)
            if (not os.path.isdir(dataPathName)) or \
                    (not os.path.exists(indexFileName)):
                print(
                    'Data path %s or index file %s does not exist' % (
                        repr(dataPathName), repr(indexFileName)),
                    file=sys.stderr)
                sys.exit(1)
            indexLocationList.append(
                IndexedLocation(dataPathName, indexFileName))
            continue
        if argName == '--IndexFile':
            if argPos + 2 > len(sys.argv):
                print(
                    '--IndexFile requires index file and data path '
                    'argument', file=sys.stderr)
                sys.exit(1)
            indexFileName = os.path.realpath(sys.argv[argPos])
            argPos += 1
            dataPathName = os.path.realpath(sys.argv[argPos])
            argPos += 1
            if (not os.path.isdir(dataPathName)) or \
                    (not os.path.exists(indexFileName)):
                print(
                    'Data path %s or index file %s does not exist' % (
                        repr(dataPathName), repr(indexFileName)),
                    file=sys.stderr)
                sys.exit(1)
            indexLocationList.append(
                IndexedLocation(dataPathName, indexFileName))
            continue
        print('Unsupported argument %s' % argName, file=sys.stderr)
        sys.exit(1)
    analyzer = DedupAnalyzer()
    analyzer.createDeduplicationData(indexLocationList)


if __name__ == '__main__':
    mainFunction()