Skip to content

Commit

Permalink
Improving error messages, matching in tests, thrown error, and includ…
Browse files Browse the repository at this point in the history
…ing ray.wait to improve ObjectNotFound errors
  • Loading branch information
pabloem committed Oct 10, 2022
1 parent 7c22aca commit de776e0
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 59 deletions.
84 changes: 75 additions & 9 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ from ray.exceptions import (
TaskCancelledError,
AsyncioActorExit,
PendingCallsLimitExceeded,
PlasmaObjectNotAvailable,
)
from ray._private import external_storage
from ray.util.scheduling_strategies import (
Expand Down Expand Up @@ -413,8 +412,19 @@ cdef prepare_args_internal(
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg, c_owner_address)
if not op_status.ok():
raise PlasmaObjectNotAvailable(
'Object %r could not be found in this Ray session.' % arg)
msg = (
f"Object {repr(arg)} could not be found in this Ray "
"session."
"Please make sure that all Ray objects used by the "
"application are part of the current Ray session. "
"Note that object IDs generated randomly "
"(ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task "
"arguments because Ray does not know which task created them."
"If this was not how your object ID was generated, please "
"file an issue at "
"https://github.com/ray-project/ray/issues/")
raise ValueError(msg)
args_vector.push_back(
unique_ptr[CTaskArg](new CTaskArgByReference(
c_arg,
Expand Down Expand Up @@ -1276,8 +1286,22 @@ cdef class CoreWorker:
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results))
op_status = CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results)
if not op_status.ok():
msg = (
f"One or more objects in {repr(object_refs)} could not be "
"found in this Ray session. "
"Please make sure that all Ray objects used by the "
"application are part of the current Ray session. "
"Note that object IDs generated randomly "
"(ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task "
"arguments because Ray does not know which task created them."
"If this was not how your object ID was generated, please "
"file an issue at "
"https://github.com/ray-project/ray/issues/")
raise ValueError(msg)

return RayObjectsToDataMetadataPairs(results)

Expand Down Expand Up @@ -1479,8 +1503,22 @@ cdef class CoreWorker:

wait_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Wait(
wait_ids, num_returns, timeout_ms, &results, fetch_local))
op_status = CCoreWorkerProcess.GetCoreWorker().Wait(
wait_ids, num_returns, timeout_ms, &results, fetch_local)
if not op_status.ok():
msg = (
f"None of the objects in {repr(object_refs)} were "
"found in this Ray session. "
"Please make sure that all Ray objects used by the "
"application are part of the current Ray session. "
"Note that object IDs generated randomly "
"(ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task "
"arguments because Ray does not know which task created them."
"If this was not how your object ID was generated, please "
"file an issue at "
"https://github.com/ray-project/ray/issues/")
raise ValueError(msg)

assert len(results) == len(object_refs)

Expand Down Expand Up @@ -2032,17 +2070,45 @@ cdef class CoreWorker:
cdef:
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id, c_owner_address)
if not op_status.ok():
msg = (
f"Object {repr(object_ref)} could not be found in this Ray "
"session."
"Please make sure that all Ray objects used by the "
"application are part of the current Ray session. "
"Note that object IDs generated randomly "
"(ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task "
"arguments because Ray does not know which task created them."
"If this was not how your object ID was generated, please "
"file an issue at "
"https://github.com/ray-project/ray/issues/")
raise ValueError(msg)
return c_owner_address.SerializeAsString()

def serialize_object_ref(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address = CAddress()
c_string serialized_object_status
CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
c_object_id, &c_owner_address, &serialized_object_status)
if not op_status.ok():
msg = (
f"Object {repr(object_ref)} could not be found in this Ray "
"session."
"Please make sure that all Ray objects used by the "
"application are part of the current Ray session. "
"Note that object IDs generated randomly "
"(ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task "
"arguments because Ray does not know which task created them."
"If this was not how your object ID was generated, please "
"file an issue at "
"https://github.com/ray-project/ray/issues/")
raise ValueError(msg)
return (object_ref,
c_owner_address.SerializeAsString(),
serialized_object_status)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_vector[CObjectReference] GetObjectRefs(
const c_vector[CObjectID] &object_ids) const

void GetOwnershipInfo(const CObjectID &object_id,
CRayStatus GetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address,
c_string *object_status)
void RegisterOwnershipInfoAndResolveFuture(
Expand Down
78 changes: 58 additions & 20 deletions python/ray/tests/test_ray_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,44 +66,82 @@ def f(s):
ray.shutdown()

ray.init(num_cpus=1, include_dashboard=False)
exception_thrown = False
try:
with pytest.raises(ValueError, match="could not be found in this Ray session"):
f.remote(my_ref) # This would cause full CPython death.
except ray.exceptions.RayError:
# Ignore exception
exception_thrown = True

ray.shutdown()
wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)
assert exception_thrown


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown_then_call_list(short_gcs_publish_timeout, shutdown_only):
"""Make sure ray will not kill cpython when using unrecognized ObjectId"""
# Set include_dashboard=False to have faster startup.
ray.init(num_cpus=1, include_dashboard=False)

my_ref = ray.put("anystring")

@ray.remote
def f(s):
print(s)

ray.shutdown()

ray.init(num_cpus=1, include_dashboard=False)
with pytest.raises(ValueError, match="could not be found in this Ray session"):
f.remote([my_ref]) # This would cause full CPython death.

ray.shutdown()
wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown_then_get(short_gcs_publish_timeout, shutdown_only):
"""Make sure ray will not hang when trying to Get an unrecognized Obj."""
# Set include_dashboard=False to have faster startup.
ray.init(include_dashboard=False)
ray.init(num_cpus=1, include_dashboard=False)

my_ref = ray.put("anystring")

ray.shutdown()

ray.init(num_cpus=1, include_dashboard=False)
with pytest.raises(ValueError, match="could not be found in this Ray session"):
# This used to cause ray to hang indefinitely (without timeout) or
# throw a timeout exception if a timeout was provided. Now it is expected to
# throw an exception reporting the unknown object.
ray.get(my_ref, timeout=30)

ray.shutdown()
wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown_then_wait(short_gcs_publish_timeout, shutdown_only):
"""Make sure ray will not hang when trying to Get an unrecognized Obj."""
# Set include_dashboard=False to have faster startup.
ray.init(num_cpus=1, include_dashboard=False)

my_ref = ray.put("anystring")

ray.shutdown()

ray.init(include_dashboard=False)
appropriate_exception_thrown = False
try:
ray.get(my_ref, timeout=30) # This would cause ray to hang
except ray.exceptions.GetTimeoutError:
# Get timed out, which means it failed to recognize unknown object
appropriate_exception_thrown = False
except ray.exceptions.RayError:
# Ignore exception
appropriate_exception_thrown = True
ray.init(num_cpus=1, include_dashboard=False)
my_new_ref = ray.put("anyotherstring")

# If we have some known and some unknown references, we allow the
# function to wait for the valid references; however, if all the
# references are unknown, we expect an error.
ready, not_ready = ray.wait([my_new_ref, my_ref])
with pytest.raises(ValueError, match="were found in this Ray session"):
# This used to cause ray to hang indefinitely (without timeout) or
# forever return all tasks as not-ready if a timeout was provided.
# Now it is expected to throw an exception reporting if all objects are
# unknown.
ray.wait(not_ready, timeout=30)

ray.shutdown()
wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)
assert (
appropriate_exception_thrown
), "ray.get is hanging on unknown object retrieval"


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
Expand Down
18 changes: 15 additions & 3 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -857,11 +857,12 @@ std::vector<rpc::ObjectReference> CoreWorker::GetObjectRefs(
return refs;
}

void CoreWorker::GetOwnershipInfo(const ObjectID &object_id,

void CoreWorker::GetOwnershipInfoOrDie(const ObjectID &object_id,
rpc::Address *owner_address,
std::string *serialized_object_status) {
auto has_owner = reference_counter_->GetOwner(object_id, owner_address);
RAY_CHECK(has_owner)
auto status = GetOwnershipInfo(object_id, owner_address, serialized_object_status);
RAY_CHECK(status.ok())
<< "An application is trying to access a Ray object whose owner is unknown ("
<< object_id
<< "). "
Expand All @@ -872,6 +873,16 @@ void CoreWorker::GetOwnershipInfo(const ObjectID &object_id,
"does not know which task created them. "
"If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/";
}

Status CoreWorker::GetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address,
std::string *serialized_object_status) {
auto has_owner = reference_counter_->GetOwner(object_id, owner_address);
if (!has_owner) {
return Status::ObjectNotFound(
"Unable to get ownership information for requested object");
}

rpc::GetObjectStatusReply object_status;
// Optimization: if the object exists, serialize and inline its status. This also
Expand All @@ -883,6 +894,7 @@ void CoreWorker::GetOwnershipInfo(const ObjectID &object_id,
}
}
*serialized_object_status = object_status.SerializeAsString();
return Status::OK();
}

void CoreWorker::RegisterOwnershipInfoAndResolveFuture(
Expand Down
26 changes: 24 additions & 2 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[out] The RPC address of the worker that owns this object.
Status GetOwnerAddress(const ObjectID &object_id, rpc::Address &owner_address) const;

/// Get the RPC address of the worker that owns the given object.
/// Get the RPC address of the worker that owns the given object. If the
/// object has no owner, then we terminate the process.
///
/// \param[in] object_id The object ID. The object must either be owned by
/// us, or the caller previously added the ownership information (via
Expand Down Expand Up @@ -225,7 +226,28 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[out] owner_address The address of the object's owner. This should
/// be appended to the serialized object ID.
/// \param[out] serialized_object_status The serialized object status protobuf.
void GetOwnershipInfo(const ObjectID &object_id,
Status GetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address,
std::string *serialized_object_status);

/// Get the owner information of an object. This should be
/// called when serializing an object ID, and the returned information should
/// be stored with the serialized object ID. If the ownership of the object
/// cannot be established, then we terminate the process.
///
/// This can only be called on object IDs that we created via task
/// submission, ray.put, or object IDs that we deserialized. It cannot be
/// called on object IDs that were created randomly, e.g.,
/// ObjectID::FromRandom.
///
/// Postcondition: Get(object_id) is valid.
///
/// \param[in] object_id The object ID to serialize.
/// appended to the serialized object ID.
/// \param[out] owner_address The address of the object's owner. This should
/// be appended to the serialized object ID.
/// \param[out] serialized_object_status The serialized object status protobuf.
void GetOwnershipInfoOrDie(const ObjectID &object_id,
rpc::Address *owner_address,
std::string *serialized_object_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetOwnershipInfo(JNIEnv *env,
rpc::Address address;
// TODO(ekl) send serialized object status to Java land.
std::string serialized_object_status;
CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo(
CoreWorkerProcess::GetCoreWorker().GetOwnershipInfoOrDie(
object_id, &address, &serialized_object_status);
auto address_str = address.SerializeAsString();
auto arr = NativeStringToJavaByteArray(env, address_str);
Expand Down
Loading

0 comments on commit de776e0

Please sign in to comment.