On Thu, Jun 05, 2014 at 04:47:46PM +0800, Fam Zheng wrote: > + def setUp(self): > + qemu_img('create', '-f', iotests.imgfmt, test_img, "1G") > + #self.vm = iotests.VM().add_drive(test_img, "bps=1024,bps_max=1") Commented out lines should be dropped > + self.vm = iotests.VM().add_drive(test_img) > + self.vm.launch() > + > + def tearDown(self): > + self.vm.shutdown() > + os.remove(test_img) > + > + def do_test_throttle(self, seconds=10, **limits): > + def check_limit(limit, num): > + # IO throttling algorithm is discrete, allow 10% error so the test > + # is more deterministic > + return limit == 0 or num < seconds * limit * 1.1 > + > + nsec_per_sec = 1000000000 > + > + limits['bps_max'] = 1 > + limits['iops_max'] = 1 > + > + # Enqueue many requests to throttling queue This comment is wrong, it actually happens further down > + result = self.vm.qmp("block_set_io_throttle", conv_keys=False, **limits) > + self.assert_qmp(result, 'return', {}) > + > + # Set vm clock to a known value > + ns = nsec_per_sec > + self.vm.qtest_cmd("clock_step %d" % ns) > + > + # Append many requests into the throttle queue > + # They drain bps_max and iops_max > + # The rest requests won't get executed until qtest clock is driven > + for i in range(1000): > + self.vm.hmp_qemu_io("drive0", "aio_read -a -q 0 512") > + self.vm.hmp_qemu_io("drive0", "aio_write -a -q 0 512") > + > + start_rd_bytes, start_rd_iops, start_wr_bytes, start_wr_iops = self.blockstats('drive0') > + > + ns += seconds * nsec_per_sec > + self.vm.qtest_cmd("clock_step %d" % ns) > + # wait for a while to let requests take off > + time.sleep(1) This is not a reliable testing approach. If the system is under heavy load maybe only a few requests completed. We don't know whether that is due to I/O throttling or not. A reliable test would not perform real disk I/O so the test is independent of disk/system speed. And it would not use time.sleep(1) to "wait" since there is no guarantee that anything happened in the meantime. 
Do you think this can be improved? > + end_rd_bytes, end_rd_iops, end_wr_bytes, end_wr_iops = self.blockstats('drive0') > + > + rd_bytes = end_rd_bytes - start_rd_bytes > + rd_iops = end_rd_iops - start_rd_iops > + wr_bytes = end_wr_bytes - start_wr_bytes > + wr_iops = end_wr_iops - start_wr_iops > + > + assert check_limit(limits['bps'], rd_bytes) > + assert check_limit(limits['bps_rd'], rd_bytes) > + assert check_limit(limits['bps'], wr_bytes) > + assert check_limit(limits['bps_wr'], wr_bytes) > + assert check_limit(limits['iops'], rd_iops) > + assert check_limit(limits['iops_rd'], rd_iops) > + assert check_limit(limits['iops'], wr_iops) > + assert check_limit(limits['iops_wr'], wr_iops) Please use TestCase.assert*() methods instead of plain assert. They produce human-readable error messages including the failing values. > + > + def test_bps(self): > + self.do_test_throttle(**{ > + 'device': 'drive0', > + 'bps': 1000, > + 'bps_rd': 0, > + 'bps_wr': 0, > + 'iops': 0, > + 'iops_rd': 0, > + 'iops_wr': 0, > + }) Keyword argument syntax is more concise: self.do_test_throttle(device='drive0', bps=1000, bps_rd=0, ...)