#!/usr/bin/env python # # Storage Performance Profiler # ---------------------------- # This framework facilitates running workloads with FIO. # It also organises the results in easy-to-parse '.dat' files # for later plotting with gnuplot. # # Author: Felipe Franciosi # from ast import literal_eval from ConfigParser import ConfigParser from optparse import OptionParser from os import unlink, path from subprocess import Popen, PIPE from sys import exit, stdout from tempfile import NamedTemporaryFile import datetime # dabe #full_filename = '' #drivelist = [ '\\\.\PhysicalDrive1', '\\\.\PhysicalDrive2', '\\\.\PhysicalDrive3:\\\.\PhysicalDrive4', '\\\.\PhysicalDrive5:\\\.\PhysicalDrive6', '\\\.\PhysicalDrive7:\\\.\PhysicalDrive8' ] #for filename in drivelist: #if len(full_filename) > 1: #full_filename = full_filename + ':' + filename #else: #full_filename = filename #print full_filename # defaults = { # @filename: Name of device or file to profile # Multiple files/devices can be specified with a ':' separator "filename": "\\\.\PhysicalDrive1", # @size: Amount of data to read/write from the start of @filename(s) "size": "20G", # @minjob: Starting number of I/O threads "minjob": "1", # @maxjob: Maximum number of I/O threads "maxjob": "128", # @muljob: Increment the number of I/O threads in multiples of @muljob. # The number of threads always start with @minjob and is never # higher than @maxjob. Example: # minjob=1, maxjob=10, muljob=4 generates {1, 4, 8} # minjob=3, maxjob=12, muljob=4 generates {3, 4, 8, 12} "muljob": "2", # Increment JOBS in multiples of (eg. 1, 4, 8) # @mineqd: Starting effective queue depth (qd x numjob) "mineqd": "1", # @maxeqd: Maximum effective queue depth "maxeqd": "128", # @muleqd: Increment the effective queue depth in multiples of @muleqd. # This takes into consideration @muljob. If a muleqd is not a # multiple of numjob x qd, the profiler will round up and use # the next closest option, respecting numjob first. Examples: # minjob=1, maxjob=4, muljob=2, mineqd=1, maxeqd=64, muleqd=32 # {efd=1 (qd=1,nj=1), efd=32 (qd=32,nj=1), efd=64 (qd=64,nj=1) # {efd=2 (qd=1,nj=2), efd=32 (qd=16,nj=2), efd=64 (qd=32,nj=2) # {efd=4 (qd=1,nj=4), efd=32 (qd=8,nj=4), efd=64 (qd=16,nj=4) # Note: "qd" is per thread. "muleqd": "1", # Increment QD in multiples of (eg. 1, 64, 128) # @minbsz: Minimum block size (values are always in bytes) "minbsz": "4096", # @maxbsz: Maxmium block size (values are always in bytes) # Note: block size is always incremented in powers of two "maxbsz": "1048576", # @runtime: Runtime for each spec, always in seconds "runtime": "20", # @dirs: Comma-separated list of directions. Each direction must be # specified in quotes. Valid directions: # "read" Sequential read # "write" Sequential write # "randread" Random reads # "randwrite" Random writes # "rw" Mixed sequential reads and writes (50/50) # "randrw" Mixed random reads and writes (50/50) "dirs": '"randread"', # @outdat: Filename to write plottable text data with job results "outdat": "test.dat", } dirs = [ "read", "write", "randread", "randwrite", "rw", # or readwrite "randrw" ] class ProfilerSpec(object): def __init__(self, conf): assert(conf.has_key('filename')) assert(conf.has_key('size')) assert(conf.has_key('runtime')) assert(conf.has_key('numjobs')) assert(conf.has_key('iodepth')) assert(conf.has_key('bs')) assert(conf.has_key('dir')) self.conf = conf self.spec = None self.data = {} def createSpec(self): fio_spec = """[global] ioengine=windowsaio direct=1 time_based group_reporting size={size} runtime={runtime} numjobs={numjobs} iodepth={iodepth} bs={bs} rw={dir} [job] filename={filename}""".format(**self.conf) try: self.spec = NamedTemporaryFile(delete=False) self.spec.write(fio_spec) self.spec.flush() except: if self.spec: unlink(self.spec.name) raise def run(self): assert(self.spec) cmd = ["fio", self.spec.name, "--minimal", "--terse-version=3"] proc = Popen(cmd, stdout=PIPE, stderr=PIPE) out, err = proc.communicate() if err: raise Exception(err) res = out.split(';') self.data['read_bw'] = int(res[6]) self.data['read_iops'] = int(res[7]) self.data['write_bw'] = int(res[47]) self.data['write_iops'] = int(res[48]) def cleanup(self): assert(self.spec) try: unlink(self.spec.name) except: pass finally: self.spec = None class ProfilerJob(object): def __init__(self, name, conf): assert(name) assert(conf) self.name = name self.conf = conf self.specs = [] self.outdatfp = None def append(self, spec): self.specs.append(spec) def run(self): assert(len(self.specs) > 0) print "* Running job: '%s' (%d secs / spec)" % (self.name, int(self.conf['runtime'])) i=1 for spec in self.specs: if i > 1: stdout.write("\033[F") stdout.flush() now = datetime.datetime.now() print "** Executing spec %d/%d at %s" % (i, len(self.specs), now) spec.createSpec() try: spec.run() except: raise finally: spec.cleanup() i = i + 1 def writeDataFile(self): assert(self.conf['outdat']) self.outdatfp = open(self.conf['outdat'], 'w') self.__writeDataFile() def __writeDataFile(self): data = """# FIO Results for "{filename}" (size={size}) # QD : {mineqd} -> {maxeqd} in multiples of: {muleqd} # JOBS : {minjob} -> {maxjob} in multiples of: {muljob} # BS : {minbsz} -> {maxbsz} in powers of two """.format(**self.conf) for dir in dirs: data = data + """ # %s: # Eff.QD Jobs QD blocksz IOPS_rd IOPS_wr KB/s_rd KB/s_wr """ % dir specs_dir = [ x for x in self.specs if x.conf['dir'] == dir ] atleastone = False for spec in specs_dir: if spec.data: atleastone = True break if not atleastone: data = data + "0\n\n" continue for spec in specs_dir: if not spec.data: continue effqd = spec.conf['numjobs'] * spec.conf['iodepth'] jobs = spec.conf['numjobs'] qd = spec.conf['iodepth'] bs = spec.conf['bs'] iopsr = spec.data['read_iops'] iopsw = spec.data['write_iops'] kbsr = spec.data['read_bw'] kbsw = spec.data['write_bw'] data = data + "%8d %5d %4d %8d %8d %8d %9d %9d\n" % ( effqd, jobs, qd, bs, iopsr, iopsw, kbsr, kbsw) self.outdatfp.write(data) self.outdatfp.flush() class ProfilerConfig(object): def __init__(self, configfile=None): self.configfile = configfile self.config = self.__parseConfig() self.jobs = self.__createJobs() def dumpConfig(self): assert(self.config) for section in self.config.sections(): print print "["+section+"]" for option in self.config.options(section): if option == "dirs": print "%s: %s" % (option, self.__getDirs(self.config, section)) else: print "%s: %s" % (option, self.config.get(section, option)) def dumpSpecs(self): assert(self.jobs) for job in self.jobs: for spec in job.specs: print "%s: %s" % (job.name, spec.conf) def __parseConfig(self): config = ConfigParser(defaults) if self.configfile: config.read(self.configfile) else: # Create a single 'config' section using just defaults config.add_section("config") self.__validate(config) return config def __validate(self, config): valid_opts = set(defaults) valid_dirs = set(dirs + ["readwrite"]) for section in config.sections(): sect_opts = set(config.options(section)) if sect_opts != valid_opts: raise Exception("Invalid options %s for section '%s'" % (list(sect_opts - valid_opts), section)) sect_dirs_list = self.__getDirs(config, section) sect_dirs = set(sect_dirs_list) if not sect_dirs.issubset(valid_dirs): raise Exception("Invalid dirs %s for section '%s'" % (list(sect_dirs - valid_dirs), section)) # 'rw' and 'readwrite' are equivalent in 'fio' if set(['rw', 'readwrite']).issubset(sect_dirs): sect_dirs_list.remove('readwrite') sect_dirs_str = str(sect_dirs_list).translate(None, "[]") config.set(section, "dirs", sect_dirs_str) if config.get(section, "outdat") is None: raise Exception("Need 'outdat' for section '%s'" % section) # TODO: Sanity check everything else (eg. bs % 512, min < max) def __createJobs(self): assert(self.config) jobs = [] for section in self.config.sections(): job = ProfilerJob(section, dict(self.config.items(section))) self.__createSpecs(job) jobs.append(job) return jobs def __createSpecs(self, job): section = job.name minjob = int(self.config.get(section, "minjob")) maxjob = int(self.config.get(section, "maxjob")) muljob = int(self.config.get(section, "muljob")) mineqd = int(self.config.get(section, "mineqd")) maxeqd = int(self.config.get(section, "maxeqd")) muleqd = int(self.config.get(section, "muleqd")) minbsz = int(self.config.get(section, "minbsz")) maxbsz = int(self.config.get(section, "maxbsz")) # Hack 'er up to do what I want. Yeah, this is not how I should do this. # dabe bszcur = minbsz while bszcur <= maxbsz: filename = '' drivelist = [ '\\\.\PhysicalDrive1', '\\\.\PhysicalDrive2', '\\\.\PhysicalDrive3:\\\.\PhysicalDrive4', '\\\.\PhysicalDrive5:\\\.\PhysicalDrive6', '\\\.\PhysicalDrive7:\\\.\PhysicalDrive8' ] for tmp_filename in drivelist: if len(filename) > 1: filename = filename + ':' + tmp_filename else: filename = tmp_filename diskCount = 1 #print filename #print len(filename) #filename = self.config.get(section, 'filename') size = self.config.get(section, 'size') runtime = self.config.get(section, 'runtime') for dir in self.__getDirs(self.config, section): #curjob = minjob #while curjob <= maxjob: #cureqd = mineqd if mineqd == curjob else curjob*((mineqd/curjob)+1) iodepth = 1 while iodepth <= 32: curjob = iodepth * diskCount #cureqd = mineqd if mineqd == curjob else curjob*((mineqd/curjob)+1) #while cureqd <= maxeqd or cureqd == curjob: #qdperjob = cureqd/curjob qdperjob = 1 conf = {'filename': filename, 'size': size, 'runtime': runtime, 'dir': dir, 'numjobs': curjob, 'iodepth': qdperjob, 'bs': bszcur} spec = ProfilerSpec(conf) job.append(spec) if iodepth == 1: iodepth = iodepth + 1 else: iodepth = iodepth + 4 #cureqd = muleqd*(1+(cureqd/muleqd)) #if cureqd % curjob: #cureqd = curjob*((cureqd/curjob)+1) #curjob = muljob*(1+(curjob/muljob)) if diskCount < 2: diskCount = diskCount + 1 else: diskCount = diskCount + 2 bszcur = bszcur*2 @staticmethod def __getDirs(config, section): assert(section) # ConfigParser values don't cope with lists, so we store 'dirs' as a string return literal_eval("["+config.get(section, "dirs")+"]") class Profiler(object): def __init__(self, configfile): self.config = ProfilerConfig(opts.configfile) # TODO: Ensure 'fio' is installed def dumpConfig(self): self.config.dumpConfig() def dumpSpecs(self): self.config.dumpSpecs() def checkOutDat(self, overwrite): for job in self.config.jobs: if path.isdir(job.conf['outdat']): raise Exception("Unable to write results to '%s': it's a directory") if path.exists(job.conf['outdat']): if overwrite: print "Warning: overwriting file '%s'" % (job.conf['outdat'],) else: raise Exception("Refusing to overwrite file '%s': use -f" % (job.conf['outdat'],)) def runJobs(self): for job in self.config.jobs: try: job.run() except KeyboardInterrupt: print "\nInterrupted by keyboard, writing partial results only" job.writeDataFile() def main(opts): profiler = Profiler(opts.configfile) if opts.verbose: profiler.dumpConfig() profiler.dumpSpecs() if opts.dryrun: return profiler.checkOutDat(opts.overwrite) profiler.runJobs() if __name__ == "__main__": parser = OptionParser(usage="usage: %prog [options]") parser.add_option("-c", "--conf", dest="configfile", help="Profiler configuration file") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Dump config and specs") parser.add_option("-n", "--dryrun", dest="dryrun", action="store_true", default=False, help="Just parse config file, don't run profiler") parser.add_option("-f", "--force", dest="overwrite", action="store_true", default=False, help="Overwrite existing outdat files") (opts, args) = parser.parse_args() exit(main(opts))