Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fifo hangs #1376

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/vmaf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

__copyright__ = "Copyright 2016-2020, Netflix, Inc."
__license__ = "BSD+Patent"
__version__ = "3.0.0"
__version__ = "4.0.0"

logging.basicConfig()
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
Expand Down
125 changes: 66 additions & 59 deletions python/vmaf/core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,24 +276,6 @@ def _get_workfile_yuv_type(asset):
assert asset.ref_yuv_type == asset.dis_yuv_type, "YUV types for ref and dis do not match."
return asset.ref_yuv_type

def _wait_for_workfiles(self, asset):
# wait til workfile paths being generated
for i in range(10):
if os.path.exists(asset.ref_workfile_path) and os.path.exists(asset.dis_workfile_path):
break
sleep(0.1)
else:
raise RuntimeError("ref or dis video workfile path {ref} or {dis} is missing.".format(ref=asset.ref_workfile_path, dis=asset.dis_workfile_path))

def _wait_for_procfiles(self, asset):
# wait til procfile paths being generated
for i in range(10):
if os.path.exists(asset.ref_procfile_path) and os.path.exists(asset.dis_procfile_path):
break
sleep(0.1)
else:
raise RuntimeError("ref or dis video procfile path {ref} or {dis} is missing.".format(ref=asset.ref_procfile_path, dis=asset.dis_procfile_path))

def _prepare_log_file(self, asset):

log_file_path = self._get_log_file_path(asset)
Expand Down Expand Up @@ -438,26 +420,42 @@ def _open_workfiles(self, asset):
self._open_dis_workfile(asset, fifo_mode=False)

def _open_workfiles_in_fifo_mode(self, asset):
sem = multiprocessing.Semaphore(0)
ref_p = multiprocessing.Process(target=self._open_ref_workfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
dis_p = multiprocessing.Process(target=self._open_dis_workfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
ref_p.start()
dis_p.start()
self._wait_for_workfiles(asset)

if not sem.acquire(timeout=5):
if self.logger:
self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted workfiles {asset.ref_workfile_path} and {asset.dis_workfile_path} to be created; now blocking until created")
sem.acquire()
sem.acquire()

def _open_procfiles(self, asset):
self._open_ref_procfile(asset, fifo_mode=False)
self._open_dis_procfile(asset, fifo_mode=False)

def _open_procfiles_in_fifo_mode(self, asset):
sem = multiprocessing.Semaphore(0)
ref_p = multiprocessing.Process(target=self._open_ref_procfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
dis_p = multiprocessing.Process(target=self._open_dis_procfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
ref_p.start()
dis_p.start()
self._wait_for_procfiles(asset)

if not sem.acquire(timeout=5):
if self.logger:
self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted procfiles {asset.ref_procfile_path} and {asset.dis_procfile_path} to be created; now blocking until created")
sem.acquire()
sem.acquire()

def _close_workfiles(self, asset):
self._close_ref_workfile(asset)
Expand Down Expand Up @@ -507,7 +505,7 @@ def _get_log_file_path(self, asset):

# ===== workfile =====

def _open_ref_workfile(self, asset, fifo_mode):
def _open_ref_workfile(self, asset, fifo_mode, open_sem=None):

use_path_as_workpath = asset.use_path_as_workpath
path = asset.ref_path
Expand Down Expand Up @@ -537,9 +535,9 @@ def _open_ref_workfile(self, asset, fifo_mode):
else self._open_workfile
_open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type,
preresampling_filterchain, resampling_type, postresampling_filterchain,
width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger)
width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger)

def _open_dis_workfile(self, asset, fifo_mode):
def _open_dis_workfile(self, asset, fifo_mode, open_sem=None):

use_path_as_workpath = asset.use_path_as_workpath
path = asset.dis_path
Expand Down Expand Up @@ -569,12 +567,12 @@ def _open_dis_workfile(self, asset, fifo_mode):
else self._open_workfile
_open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type,
preresampling_filterchain, resampling_type, postresampling_filterchain,
width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger)
width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger)

@staticmethod
def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str],
preresampling_filterchain: Optional[List[str]], resampling_type: str, postresampling_filterchain: Optional[List[str]],
width_height: Optional[Tuple[int, int]], quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, logger):
def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], preresampling_filterchain: Optional[List[str]],
resampling_type: str, postresampling_filterchain: Optional[List[str]], width_height: Optional[Tuple[int, int]],
quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger):

# decoder type must be None here
assert decoder_type is None, f'decoder_type must be None but is: {decoder_type}'
Expand All @@ -587,9 +585,16 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type,

# only need to open workfile if the path is different from path
assert use_path_as_workpath is False and path != workfile_path

# if fifo mode, mkfifo
if fifo_mode:
os.mkfifo(workfile_path)
else:
with open(workfile_path, 'wb'):
pass

if open_sem is not None:
open_sem.release()

if ref_or_dis == 'ref':
start_end_frame = asset.ref_start_end_frame
Expand Down Expand Up @@ -638,7 +643,7 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type,

# ===== procfile =====

def _open_ref_procfile(self, asset, fifo_mode):
def _open_ref_procfile(self, asset, fifo_mode, open_sem=None):

# only need to open ref procfile if the path is different from ref path
assert asset.use_workpath_as_procpath is False and asset.ref_workfile_path != asset.ref_procfile_path
Expand All @@ -647,6 +652,12 @@ def _open_ref_procfile(self, asset, fifo_mode):

if fifo_mode:
os.mkfifo(asset.ref_procfile_path)
else:
with open(asset.ref_procfile_path, 'wb'):
pass

if open_sem is not None:
open_sem.release()

quality_width, quality_height = self._get_quality_width_height(asset)
yuv_type = asset.workfile_yuv_type
Expand All @@ -662,7 +673,7 @@ def _open_ref_procfile(self, asset, fifo_mode):
except StopIteration:
break

def _open_dis_procfile(self, asset, fifo_mode):
def _open_dis_procfile(self, asset, fifo_mode, open_sem=None):

# only need to open dis procfile if the path is different from dis path
assert asset.use_workpath_as_procpath is False and asset.dis_workfile_path != asset.dis_procfile_path
Expand All @@ -671,6 +682,12 @@ def _open_dis_procfile(self, asset, fifo_mode):

if fifo_mode:
os.mkfifo(asset.dis_procfile_path)
else:
with open(asset.dis_procfile_path, 'wb'):
pass

if open_sem is not None:
open_sem.release()

quality_width, quality_height = self._get_quality_width_height(asset)
yuv_type = asset.workfile_yuv_type
Expand Down Expand Up @@ -914,28 +931,6 @@ def _get_workfile_yuv_type(asset):
else:
return asset.dis_yuv_type

@override(Executor)
def _wait_for_workfiles(self, asset):
# wait til workfile paths being generated
for i in range(10):
if os.path.exists(asset.dis_workfile_path):
break
sleep(0.1)
else:
raise RuntimeError("dis video workfile path {} is missing.".format(
asset.dis_workfile_path))

@override(Executor)
def _wait_for_procfiles(self, asset):
# wait til procfile paths being generated
for i in range(10):
if os.path.exists(asset.dis_procfile_path):
break
sleep(0.1)
else:
raise RuntimeError("dis video procfile path {} is missing.".format(
asset.dis_procfile_path))

@override(Executor)
def _assert_paths(self, asset):
assert os.path.exists(asset.dis_path) or match_any_files(asset.dis_path), \
Expand All @@ -947,21 +942,33 @@ def _open_workfiles(self, asset):

@override(Executor)
def _open_workfiles_in_fifo_mode(self, asset):
sem = multiprocessing.Semaphore(0)
dis_p = multiprocessing.Process(target=self._open_dis_workfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
dis_p.start()
self._wait_for_workfiles(asset)

if not sem.acquire(timeout=5):
if self.logger:
self.logger.warn(f">5 seconds elapsed waiting for distorted workfile {asset.dis_workfile_path} to be created to be created; now blocking until created")
sem.acquire()

@override(Executor)
def _open_procfiles(self, asset):
self._open_dis_procfile(asset, fifo_mode=False)

@override(Executor)
def _open_procfiles_in_fifo_mode(self, asset):
sem = multiprocessing.Semaphore(0)
dis_p = multiprocessing.Process(target=self._open_dis_procfile,
args=(asset, True))
args=(asset, True),
kwargs={'open_sem': sem})
dis_p.start()
self._wait_for_procfiles(asset)

if not sem.acquire(timeout=5):
if self.logger:
self.logger.warn(f">5 seconds elapsed waiting for distorted procfile {asset.dis_procfile_path} to be created to be created; now blocking until created")
sem.acquire()

@override(Executor)
def _close_workfiles(self, asset):
Expand Down
33 changes: 9 additions & 24 deletions python/vmaf/core/raw_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ def _assert_an_asset(cls, asset):
pass

@override(Executor)
def _open_ref_workfile(self, asset, fifo_mode):
# do nothing
pass
def _open_ref_workfile(self, asset, fifo_mode, open_sem=None):
if open_sem is not None:
open_sem.release()

@override(Executor)
def _open_dis_workfile(self, asset, fifo_mode):
# do nothing
pass

@override(Executor)
def _wait_for_workfiles(self, asset):
pass
def _open_dis_workfile(self, asset, fifo_mode, open_sem=None):
if open_sem is not None:
open_sem.release()

def _generate_result(self, asset):
# do nothing
Expand Down Expand Up @@ -102,21 +98,10 @@ def _assert_args(self):
self.assert_h5py_file()

@override(Executor)
def _open_ref_workfile(self, asset, fifo_mode):
def _open_ref_workfile(self, asset, fifo_mode, open_sem=None):
# do nothing
pass

@override(Executor)
def _wait_for_workfiles(self, asset):
# Override Executor._wait_for_workfiles to skip ref_workfile_path
# wait til workfile paths being generated
for i in range(10):
if os.path.exists(asset.dis_workfile_path):
break
sleep(0.1)
else:
raise RuntimeError("dis video workfile path {} is missing.".format(
asset.dis_workfile_path))
if open_sem is not None:
open_sem.release()

def _generate_result(self, asset):
quality_w, quality_h = asset.quality_width_height
Expand Down
Loading