diff --git a/python/vmaf/__init__.py b/python/vmaf/__init__.py index 54ccbac39..3fc7a0d20 100644 --- a/python/vmaf/__init__.py +++ b/python/vmaf/__init__.py @@ -4,7 +4,7 @@ __copyright__ = "Copyright 2016-2020, Netflix, Inc." __license__ = "BSD+Patent" -__version__ = "3.0.0" +__version__ = "4.0.0" logging.basicConfig() logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) diff --git a/python/vmaf/core/executor.py b/python/vmaf/core/executor.py index d2da1375d..24e354c12 100644 --- a/python/vmaf/core/executor.py +++ b/python/vmaf/core/executor.py @@ -276,24 +276,6 @@ def _get_workfile_yuv_type(asset): assert asset.ref_yuv_type == asset.dis_yuv_type, "YUV types for ref and dis do not match." return asset.ref_yuv_type - def _wait_for_workfiles(self, asset): - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.ref_workfile_path) and os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("ref or dis video workfile path {ref} or {dis} is missing.".format(ref=asset.ref_workfile_path, dis=asset.dis_workfile_path)) - - def _wait_for_procfiles(self, asset): - # wait til procfile paths being generated - for i in range(10): - if os.path.exists(asset.ref_procfile_path) and os.path.exists(asset.dis_procfile_path): - break - sleep(0.1) - else: - raise RuntimeError("ref or dis video procfile path {ref} or {dis} is missing.".format(ref=asset.ref_procfile_path, dis=asset.dis_procfile_path)) - def _prepare_log_file(self, asset): log_file_path = self._get_log_file_path(asset) @@ -438,26 +420,42 @@ def _open_workfiles(self, asset): self._open_dis_workfile(asset, fifo_mode=False) def _open_workfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_workfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) ref_p.start() dis_p.start() - self._wait_for_workfiles(asset) + + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted workfiles {asset.ref_workfile_path} and {asset.dis_workfile_path} to be created; now blocking until created") + sem.acquire() + sem.acquire() def _open_procfiles(self, asset): self._open_ref_procfile(asset, fifo_mode=False) self._open_dis_procfile(asset, fifo_mode=False) def _open_procfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_procfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) ref_p.start() dis_p.start() - self._wait_for_procfiles(asset) + + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted procfiles {asset.ref_procfile_path} and {asset.dis_procfile_path} to be created; now blocking until created") + sem.acquire() + sem.acquire() def _close_workfiles(self, asset): self._close_ref_workfile(asset) @@ -507,7 +505,7 @@ def _get_log_file_path(self, asset): # ===== workfile ===== - def _open_ref_workfile(self, asset, fifo_mode): + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): use_path_as_workpath = asset.use_path_as_workpath path = asset.ref_path @@ -537,9 +535,9 @@ def _open_ref_workfile(self, asset, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger) - def _open_dis_workfile(self, asset, fifo_mode): + def _open_dis_workfile(self, asset, fifo_mode, open_sem=None): use_path_as_workpath = asset.use_path_as_workpath path = asset.dis_path @@ -569,12 +567,12 @@ def _open_dis_workfile(self, asset, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger) @staticmethod - def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], - preresampling_filterchain: Optional[List[str]], resampling_type: str, postresampling_filterchain: Optional[List[str]], - width_height: Optional[Tuple[int, int]], quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, logger): + def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], preresampling_filterchain: Optional[List[str]], + resampling_type: str, postresampling_filterchain: Optional[List[str]], width_height: Optional[Tuple[int, int]], + quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger): # decoder type must be None here assert decoder_type is None, f'decoder_type must be None but is: {decoder_type}' @@ -587,9 +585,16 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, # only need to open workfile if the path is different from path assert use_path_as_workpath is False and path != workfile_path + # if fifo mode, mkfifo if fifo_mode: os.mkfifo(workfile_path) + else: + with open(workfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() if ref_or_dis == 'ref': start_end_frame = asset.ref_start_end_frame @@ -638,7 +643,7 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, # ===== procfile ===== - def _open_ref_procfile(self, asset, fifo_mode): + def _open_ref_procfile(self, asset, fifo_mode, open_sem=None): # only need to open ref procfile if the path is different from ref path assert asset.use_workpath_as_procpath is False and asset.ref_workfile_path != asset.ref_procfile_path @@ -647,6 +652,12 @@ def _open_ref_procfile(self, asset, fifo_mode): if fifo_mode: os.mkfifo(asset.ref_procfile_path) + else: + with open(asset.ref_procfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() quality_width, quality_height = self._get_quality_width_height(asset) yuv_type = asset.workfile_yuv_type @@ -662,7 +673,7 @@ def _open_ref_procfile(self, asset, fifo_mode): except StopIteration: break - def _open_dis_procfile(self, asset, fifo_mode): + def _open_dis_procfile(self, asset, fifo_mode, open_sem=None): # only need to open dis procfile if the path is different from dis path assert asset.use_workpath_as_procpath is False and asset.dis_workfile_path != asset.dis_procfile_path @@ -671,6 +682,12 @@ def _open_dis_procfile(self, asset, fifo_mode): if fifo_mode: os.mkfifo(asset.dis_procfile_path) + else: + with open(asset.dis_procfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() quality_width, quality_height = self._get_quality_width_height(asset) yuv_type = asset.workfile_yuv_type @@ -914,28 +931,6 @@ def _get_workfile_yuv_type(asset): else: return asset.dis_yuv_type - @override(Executor) - def _wait_for_workfiles(self, asset): - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video workfile path {} is missing.".format( - asset.dis_workfile_path)) - - @override(Executor) - def _wait_for_procfiles(self, asset): - # wait til procfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_procfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video procfile path {} is missing.".format( - asset.dis_procfile_path)) - @override(Executor) def _assert_paths(self, asset): assert os.path.exists(asset.dis_path) or match_any_files(asset.dis_path), \ @@ -947,10 +942,16 @@ def _open_workfiles(self, asset): @override(Executor) def _open_workfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p.start() - self._wait_for_workfiles(asset) + + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for distorted workfile {asset.dis_workfile_path} to be created to be created; now blocking until created") + sem.acquire() @override(Executor) def _open_procfiles(self, asset): @@ -958,10 +959,16 @@ def _open_procfiles(self, asset): @override(Executor) def _open_procfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p.start() - self._wait_for_procfiles(asset) + + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for distorted procfile {asset.dis_procfile_path} to be created to be created; now blocking until created") + sem.acquire() @override(Executor) def _close_workfiles(self, asset): diff --git a/python/vmaf/core/raw_extractor.py b/python/vmaf/core/raw_extractor.py index 5b29a68fa..347b814a6 100644 --- a/python/vmaf/core/raw_extractor.py +++ b/python/vmaf/core/raw_extractor.py @@ -41,18 +41,14 @@ def _assert_an_asset(cls, asset): pass @override(Executor) - def _open_ref_workfile(self, asset, fifo_mode): - # do nothing - pass + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): + if open_sem is not None: + open_sem.release() @override(Executor) - def _open_dis_workfile(self, asset, fifo_mode): - # do nothing - pass - - @override(Executor) - def _wait_for_workfiles(self, asset): - pass + def _open_dis_workfile(self, asset, fifo_mode, open_sem=None): + if open_sem is not None: + open_sem.release() def _generate_result(self, asset): # do nothing @@ -102,21 +98,10 @@ def _assert_args(self): self.assert_h5py_file() @override(Executor) - def _open_ref_workfile(self, asset, fifo_mode): + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): # do nothing - pass - - @override(Executor) - def _wait_for_workfiles(self, asset): - # Override Executor._wait_for_workfiles to skip ref_workfile_path - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video workfile path {} is missing.".format( - asset.dis_workfile_path)) + if open_sem is not None: + open_sem.release() def _generate_result(self, asset): quality_w, quality_h = asset.quality_width_height