Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support empty AvailableData structs #138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 47 additions & 47 deletions .github/workflows/test_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,53 +103,53 @@ jobs:
- name: Test
run: |
python -m pytest -k test_dcam

egrabber:
name: Python ${{ matrix.python }} (eGrabber)
runs-on:
- self-hosted
- egrabber
- VC-151MX-M6H00
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
python: [ "3.8", "3.9", "3.10" ]

permissions:
actions: write
env:
GH_TOKEN: ${{ github.token }}
steps:
- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@0.11.0
with:
access_token: ${{ github.token }}

- uses: actions/checkout@v3
with:
submodules: true
ref: ${{ github.event.pull_request.head.sha }}

- name: Get CMake 3.24
uses: lukka/get-cmake@latest
with:
cmakeVersion: 3.24.3

- name: Set up Python ${{ matrix.python }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}

- name: Install
run: |
pip install --upgrade pip
pip install -e .[testing]

- name: Test
run: |
python -m pytest -k test_egrabber

# TODO (aliddell): uncomment when we get an eGrabber runner up again
# egrabber:
# name: Python ${{ matrix.python }} (eGrabber)
# runs-on:
# - self-hosted
# - egrabber
# - VC-151MX-M6H00
# timeout-minutes: 20
# strategy:
# fail-fast: false
# matrix:
# python: [ "3.8", "3.9", "3.10" ]

# permissions:
# actions: write
# env:
# GH_TOKEN: ${{ github.token }}
# steps:
# - name: Cancel Previous Runs
# uses: styfle/cancel-workflow-action@0.11.0
# with:
# access_token: ${{ github.token }}

# - uses: actions/checkout@v3
# with:
# submodules: true
# ref: ${{ github.event.pull_request.head.sha }}

# - name: Get CMake 3.24
# uses: lukka/get-cmake@latest
# with:
# cmakeVersion: 3.24.3

# - name: Set up Python ${{ matrix.python }}
# uses: actions/setup-python@v4
# with:
# python-version: ${{ matrix.python }}

# - name: Install
# run: |
# pip install --upgrade pip
# pip install -e .[testing]

# - name: Test
# run: |
# python -m pytest -k test_egrabber

spinnaker:
name: Python ${{ matrix.python }} (Spinnaker)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "acquire-imaging"
authors = ["Nathan Clack <nclack@chanzuckerberg.com>"]
version = "0.3.0"
version = "0.3.0-rc1"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion drivers.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"acquire-driver-common": "0.1.6",
"acquire-driver-zarr": "0.1.7",
"acquire-driver-zarr": "0.1.8",
"acquire-driver-egrabber": "0.1.5",
"acquire-driver-hdcam": "0.1.7",
"acquire-driver-spinnaker": "0.1.1"
Expand Down
21 changes: 10 additions & 11 deletions python/acquire/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,16 @@ def next_frame() -> Optional[npt.NDArray[Any]]:
"""Get the next frame from the current stream."""
if nframes[stream_id] < p.video[stream_id].max_frame_count:
with runtime.get_available_data(stream_id) as packet:
if packet:
n = packet.get_frame_count()
nframes[stream_id] += n
logging.info(
f"[stream {stream_id}] frame count: {nframes}"
)
f = next(packet.frames())
logging.debug(
f"stream {stream_id} frame {f.metadata().frame_id}"
)
return f.data().squeeze().copy()
n = packet.get_frame_count()
nframes[stream_id] += n
logging.info(
f"[stream {stream_id}] frame count: {nframes}"
)
f = next(packet.frames())
logging.debug(
f"stream {stream_id} frame {f.metadata().frame_id}"
)
return f.data().squeeze().copy()
return None

while is_not_done(): # runtime.get_state()==DeviceState.Running:
Expand Down
3 changes: 1 addition & 2 deletions python/acquire/acquire.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ from numpy.typing import NDArray

@final
class AvailableDataContext:
def __enter__(self) -> Optional[AvailableData]: ...
def __enter__(self) -> AvailableData: ...
def __exit__(
self, exc_type: Any, exc_value: Any, traceback: Any
) -> None: ...
Expand All @@ -23,7 +23,6 @@ class AvailableDataContext:
class AvailableData:
def frames(self) -> Iterator[VideoFrame]: ...
def get_frame_count(self) -> int: ...
def invalidate(self) -> None: ...
def __iter__(self) -> Iterator[VideoFrame]: ...

@final
Expand Down
95 changes: 55 additions & 40 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use numpy::{
Ix4, ToPyArray,
};
use parking_lot::Mutex;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -196,7 +197,14 @@ impl Runtime {
Ok(AvailableDataContext {
inner: self.inner.clone(),
stream_id,
available_data: None,
available_data: Python::with_gil(|py| {
Py::new(
py,
AvailableData {
inner: Arc::new(Mutex::new(None)),
},
)
})?,
})
}
}
Expand Down Expand Up @@ -273,13 +281,13 @@ impl Drop for RawAvailableData {

#[pyclass]
pub(crate) struct AvailableData {
inner: Option<Arc<RawAvailableData>>,
inner: Arc<Mutex<Option<RawAvailableData>>>,
}

#[pymethods]
impl AvailableData {
fn get_frame_count(&self) -> usize {
if let Some(inner) = &self.inner {
if let Some(inner) = &*self.inner.lock() {
inner.get_frame_count()
} else {
0
Expand All @@ -288,9 +296,9 @@ impl AvailableData {

fn frames(&self) -> VideoFrameIterator {
VideoFrameIterator {
inner: if let Some(frames) = &self.inner {
inner: if let Some(frames) = &*self.inner.lock() {
Some(VideoFrameIteratorInner {
store: frames.clone(),
store: self.inner.clone(),
cur: Mutex::new(frames.beg),
end: frames.end,
})
Expand All @@ -303,24 +311,26 @@ impl AvailableData {
fn __iter__(slf: PyRef<'_, Self>) -> PyResult<Py<VideoFrameIterator>> {
Py::new(slf.py(), slf.frames())
}
}

impl AvailableData {
fn invalidate(&mut self) {
// Will drop the RawAvailableData and cause Available data to act like
// an empty iterator.
self.inner = None;
*self.inner.lock() = None;
}
}

#[pyclass]
pub(crate) struct AvailableDataContext {
inner: Arc<RawRuntime>,
stream_id: u32,
available_data: Option<Py<AvailableData>>,
available_data: Py<AvailableData>,
}

#[pymethods]
impl AvailableDataContext {
fn __enter__(&mut self) -> PyResult<Option<Py<AvailableData>>> {
fn __enter__(&mut self) -> PyResult<Py<AvailableData>> {
let AvailableDataContext {
inner,
stream_id,
Expand All @@ -329,45 +339,40 @@ impl AvailableDataContext {
let stream_id = *stream_id;
let (beg, end) = inner.map_read(stream_id)?;
let nbytes = unsafe { byte_offset_from(beg, end) };
if nbytes > 0 {
log::trace!(
"[stream {}] ACQUIRED {:p}-{:p}:{} bytes",
stream_id,
beg,
end,
nbytes

log::trace!(
"[stream {}] ACQUIRED {:p}-{:p}:{} bytes",
stream_id,
beg,
end,
nbytes
);
*available_data = Python::with_gil(|py| {
Py::new(
py,
AvailableData {
inner: Arc::new(Mutex::new(Some(RawAvailableData {
runtime: self.inner.clone(),
beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?,
end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?,
stream_id,
consumed_bytes: None,
}))),
},
)
};
if nbytes > 0 {
*available_data = Some(Python::with_gil(|py| {
Py::new(
py,
AvailableData {
inner: Some(Arc::new(RawAvailableData {
runtime: self.inner.clone(),
beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?,
end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?,
stream_id,
consumed_bytes: None,
})),
},
)
})?);
}
})?;
return Ok(self.available_data.clone());
}

fn __exit__(&mut self, _exc_type: &PyAny, _exc_value: &PyAny, _traceback: &PyAny) {
Python::with_gil(|py| {
if let Some(a) = &self.available_data {
a.as_ref(py).borrow_mut().invalidate()
};
(&self.available_data).as_ref(py).borrow_mut().invalidate();
nclack marked this conversation as resolved.
Show resolved Hide resolved
});
}
}

struct VideoFrameIteratorInner {
store: Arc<RawAvailableData>,
store: Arc<Mutex<Option<RawAvailableData>>>,
cur: Mutex<NonNull<capi::VideoFrame>>,
end: NonNull<capi::VideoFrame>,
}
Expand All @@ -379,7 +384,7 @@ impl Iterator for VideoFrameIteratorInner {

fn next(&mut self) -> Option<Self::Item> {
let mut cur = self.cur.lock();
if *cur < self.end {
if (*self.store.lock()).is_some() && *cur < self.end {
let out = VideoFrame {
_store: self.store.clone(),
cur: *cur,
Expand Down Expand Up @@ -503,7 +508,7 @@ impl IntoDimension for capi::ImageShape {

#[pyclass]
pub(crate) struct VideoFrame {
_store: Arc<RawAvailableData>,
_store: Arc<Mutex<Option<RawAvailableData>>>,
cur: NonNull<capi::VideoFrame>,
}

Expand All @@ -512,6 +517,11 @@ unsafe impl Send for VideoFrame {}
#[pymethods]
impl VideoFrame {
fn metadata(slf: PyRef<'_, Self>) -> PyResult<VideoFrameMetadata> {
if (*slf._store.lock()).is_none() {
return Err(PyRuntimeError::new_err(
"VideoFrame is not valid outside of context",
));
}
let cur = slf.cur.as_ptr();
let meta = unsafe {
VideoFrameMetadata {
Expand All @@ -522,7 +532,12 @@ impl VideoFrame {
Ok(meta)
}

fn data<'py>(&self, py: Python<'py>) -> Py<PyAny> {
fn data<'py>(&self, py: Python<'py>) -> PyResult<Py<PyAny>> {
if (*self._store.lock()).is_none() {
return Err(PyRuntimeError::new_err(
"VideoFrame is not valid outside of context",
));
}
let cur = self.cur.as_ptr();

macro_rules! gen_match {
Expand Down Expand Up @@ -556,7 +571,7 @@ impl VideoFrame {
}
.unwrap();

array.to_pyobject(py)
Ok(array.to_pyobject(py))
}
}

Expand Down
Loading
Loading