Skip to content

Commit

Permalink
Improving message for access to unknown objects
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo E <pabloem@apache.org>

Fixes #28341. Generates an error instead of hanging on ray.get of unknown object

Signed-off-by: Pablo E <pabloem@apache.org>

format

Signed-off-by: Pablo E <pabloem@apache.org>

fix build error -Wunused-result

Signed-off-by: Pablo E <pabloem@apache.org>

Apply clang-format

Signed-off-by: Pablo E <pabloem@apache.org>

Fix jni import

fixup

Signed-off-by: Pablo E <pabloem@apache.org>
  • Loading branch information
pabloem committed Sep 9, 2022
1 parent 5e24379 commit 7c22aca
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ std::vector<std::unique_ptr<::ray::TaskArg>> TransformArgs(
auto owner_address = ray::rpc::Address{};
if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
owner_address = core_worker.GetOwnerAddress(id);
owner_address = core_worker.GetOwnerAddressOrDie(id);
}
ray_arg = absl::make_unique<ray::TaskArgByReference>(id,
owner_address,
Expand Down
17 changes: 13 additions & 4 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ from ray.exceptions import (
TaskCancelledError,
AsyncioActorExit,
PendingCallsLimitExceeded,
PlasmaObjectNotAvailable,
)
from ray._private import external_storage
from ray.util.scheduling_strategies import (
Expand Down Expand Up @@ -399,6 +400,8 @@ cdef prepare_args_internal(
c_vector[CObjectID] inlined_ids
c_string put_arg_call_site
c_vector[CObjectReference] inlined_refs
CAddress c_owner_address
CRayStatus op_status

worker = ray._private.worker.global_worker
put_threshold = RayConfig.instance().max_direct_call_object_size()
Expand All @@ -407,11 +410,15 @@ cdef prepare_args_internal(
for arg in args:
if isinstance(arg, ObjectRef):
c_arg = (<ObjectRef>arg).native()
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg, c_owner_address)
if not op_status.ok():
raise PlasmaObjectNotAvailable(
'Object %r could not be found in this Ray session.' % arg)
args_vector.push_back(
unique_ptr[CTaskArg](new CTaskArgByReference(
c_arg,
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg),
c_owner_address,
arg.call_site())))

else:
Expand Down Expand Up @@ -2024,8 +2031,10 @@ cdef class CoreWorker:
def get_owner_address(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id).SerializeAsString()
CAddress c_owner_address
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id, c_owner_address)
return c_owner_address.SerializeAsString()

def serialize_object_ref(self, ObjectRef object_ref):
cdef:
Expand Down
7 changes: 4 additions & 3 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,14 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void PutObjectIntoPlasma(const CRayObject &object,
const CObjectID &object_id)
const CAddress &GetRpcAddress() const
CAddress GetOwnerAddress(const CObjectID &object_id) const
CRayStatus GetOwnerAddress(const CObjectID &object_id,
CAddress &owner_address) const
c_vector[CObjectReference] GetObjectRefs(
const c_vector[CObjectID] &object_ids) const

void GetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address,
c_string *object_status)
CAddress *owner_address,
c_string *object_status)
void RegisterOwnershipInfoAndResolveFuture(
const CObjectID &object_id,
const CObjectID &outer_object_id,
Expand Down
55 changes: 55 additions & 0 deletions python/ray/tests/test_ray_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,61 @@ def f():
wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown_then_call(short_gcs_publish_timeout, shutdown_only):
    """Passing an ObjectRef from an earlier session to a task must raise.

    An object is put, Ray is shut down and re-initialized, and the stale ref
    is then handed to a remote function. The submission is expected to fail
    with a RayError rather than take down the CPython process.
    """
    # The dashboard is disabled to keep startup fast.
    ray.init(num_cpus=1, include_dashboard=False)
    stale_ref = ray.put("anystring")

    @ray.remote
    def f(s):
        print(s)

    # Restart Ray so that `stale_ref` was created by the previous session.
    ray.shutdown()
    ray.init(num_cpus=1, include_dashboard=False)

    try:
        f.remote(stale_ref)  # This would cause full CPython death.
    except ray.exceptions.RayError:
        exception_thrown = True
    else:
        exception_thrown = False

    # Tear everything down before asserting so no worker processes linger.
    ray.shutdown()
    wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)
    assert exception_thrown


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown_then_get(short_gcs_publish_timeout, shutdown_only):
    """ray.get on an ObjectRef from an earlier session must fail, not hang.

    An object is put, Ray is shut down and re-initialized, and the stale ref
    is fetched with a 30 second timeout. Only a RayError other than
    GetTimeoutError counts as success; a timeout means the unknown object
    was not recognized as such.
    """
    # The dashboard is disabled to keep startup fast.
    ray.init(include_dashboard=False)
    stale_ref = ray.put("anystring")

    # Restart Ray so that `stale_ref` was created by the previous session.
    ray.shutdown()
    ray.init(include_dashboard=False)

    try:
        ray.get(stale_ref, timeout=30)  # This would cause ray to hang
    except ray.exceptions.GetTimeoutError:
        # Handled before RayError so a timeout is never counted as the
        # expected failure (presumably GetTimeoutError derives from RayError
        # -- confirm against ray.exceptions).
        appropriate_exception_thrown = False
    except ray.exceptions.RayError:
        appropriate_exception_thrown = True
    else:
        # The get unexpectedly succeeded.
        appropriate_exception_thrown = False

    # Tear everything down before asserting so no worker processes linger.
    ray.shutdown()
    wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)
    assert (
        appropriate_exception_thrown
    ), "ray.get is hanging on unknown object retrieval"


@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_driver_dead(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is killed."""
Expand Down
43 changes: 32 additions & 11 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,18 +814,33 @@ CoreWorker::GetAllReferenceCounts() const {

const rpc::Address &CoreWorker::GetRpcAddress() const { return rpc_address_; }

rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const {
// Returns the owner address of `object_id` as reported by GetOwnerAddress.
// If that lookup does not return an OK status, the RAY_CHECK below fails and
// emits the explanatory message (including the object ID) to help the user
// find where the unknown object reference came from.
rpc::Address CoreWorker::GetOwnerAddressOrDie(const ObjectID &object_id) const {
rpc::Address owner_address;
auto has_owner = reference_counter_->GetOwner(object_id, &owner_address);
RAY_CHECK(has_owner)
<< "Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
auto status = GetOwnerAddress(object_id, owner_address);
RAY_CHECK(status.ok())
<< "An application is trying to access a Ray object whose owner is unknown ("
<< object_id
<< "). "
"Please make sure that all Ray objects you are trying to access are part of "
"the current Ray session. Note that "
"object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray "
"does not know which task created them. "
"If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/";
return owner_address;
}

// Asks the reference counter for the owner of `object_id` and writes it into
// `owner_address`. Returns Status::OK when an owner is recorded, and
// Status::ObjectNotFound otherwise, so callers can decide how to react
// instead of the process aborting.
Status CoreWorker::GetOwnerAddress(const ObjectID &object_id,
                                   rpc::Address &owner_address) const {
  if (reference_counter_->GetOwner(object_id, &owner_address)) {
    return Status::OK();
  }
  return Status::ObjectNotFound(
      "Unable to get ownership information for requested object");
}

std::vector<rpc::ObjectReference> CoreWorker::GetObjectRefs(
const std::vector<ObjectID> &object_ids) const {
std::vector<rpc::ObjectReference> refs;
Expand All @@ -847,12 +862,16 @@ void CoreWorker::GetOwnershipInfo(const ObjectID &object_id,
std::string *serialized_object_status) {
auto has_owner = reference_counter_->GetOwner(object_id, owner_address);
RAY_CHECK(has_owner)
<< "Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be serialized because Ray does not know "
"which task will create them. "
<< "An application is trying to access a Ray object whose owner is unknown ("
<< object_id
<< "). "
"Please make sure that all Ray objects you are trying to access are part of "
"the current Ray session. Note that "
"object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray "
"does not know which task created them. "
"If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/: "
<< object_id;
"at https://github.com/ray-project/ray/issues/";

rpc::GetObjectStatusReply object_status;
// Optimization: if the object exists, serialize and inline its status. This also
Expand Down Expand Up @@ -1319,7 +1338,8 @@ Status CoreWorker::GetLocationFromOwner(
std::make_shared<absl::flat_hash_map<ObjectID, std::shared_ptr<ObjectLocation>>>();

for (const auto &object_id : object_ids) {
auto owner_address = GetOwnerAddress(object_id);
rpc::Address owner_address;
RAY_RETURN_NOT_OK(GetOwnerAddress(object_id, owner_address));
auto client = core_worker_client_pool_->GetOrConnect(owner_address);
rpc::GetObjectLocationsOwnerRequest request;
auto object_location_request = request.mutable_object_location_request();
Expand Down Expand Up @@ -3290,7 +3310,8 @@ void CoreWorker::PlasmaCallback(SetResultCallback success,
// Ask raylet to subscribe to object notification. Raylet will call this core worker
// when the object is local (and it will fire the callback immediately if the object
// exists). CoreWorker::HandlePlasmaObjectReady handles such request.
local_raylet_client_->SubscribeToPlasma(object_id, GetOwnerAddress(object_id));
auto owner_address = GetOwnerAddressOrDie(object_id);
local_raylet_client_->SubscribeToPlasma(object_id, owner_address);
}

void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request,
Expand Down
10 changes: 9 additions & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// us, or the caller previously added the ownership information (via
/// RegisterOwnershipInfoAndResolveFuture).
/// \param[out] The RPC address of the worker that owns this object.
rpc::Address GetOwnerAddress(const ObjectID &object_id) const;
Status GetOwnerAddress(const ObjectID &object_id, rpc::Address &owner_address) const;

/// Get the RPC address of the worker that owns the given object, aborting
/// (RAY_CHECK failure) if no owner can be determined for it.
///
/// \param[in] object_id The object ID. The object must either be owned by
/// us, or the caller previously added the ownership information (via
/// RegisterOwnershipInfoAndResolveFuture).
/// \return The RPC address of the worker that owns this object.
rpc::Address GetOwnerAddressOrDie(const ObjectID &object_id) const;

/// Get the RPC address of the worker that owns the given object.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetOwnerAddress(JNIEnv *env,
jclass,
jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
const auto &rpc_address = CoreWorkerProcess::GetCoreWorker().GetOwnerAddress(object_id);
return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString());
// GetOwnerAddressOrDie returns the owner address by value (and aborts when the
// owner is unknown). Serialize the value it returned; a separately declared,
// never-assigned rpc::Address would always serialize as an empty address.
const rpc::Address owner_address =
    CoreWorkerProcess::GetCoreWorker().GetOwnerAddressOrDie(object_id);
return NativeStringToJavaByteArray(env, owner_address.SerializeAsString());
}

JNIEXPORT jbyteArray JNICALL
Expand Down
21 changes: 21 additions & 0 deletions src/ray/core_worker/store_provider/memory_store/memory_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,30 @@ Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
/*abort_if_any_object_is_exception=*/true);
}

// Reports whether `object_id` has a known owner, but only when the caller
// treats a missing owner as fatal. With abort_if_missing == false the check is
// skipped and OK is returned; otherwise the reference counter is consulted and
// ObjectNotFound is returned when it has no owner for the object.
Status CoreWorkerMemoryStore::ObjectHasOwner(const ObjectID &object_id,
                                             bool abort_if_missing) {
  // A missing owner is tolerated by this caller, so there is nothing to check.
  if (!abort_if_missing) {
    return Status::OK();
  }

  // GetOwner requires an output argument; the address itself is not needed.
  rpc::Address ignored_owner_address;
  if (ref_counter_->GetOwner(object_id, &ignored_owner_address)) {
    return Status::OK();
  }
  return Status::ObjectNotFound(
      "Unable to get ownership information for requested object");
}

Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
int num_objects,
int64_t timeout_ms,
const WorkerContext &ctx,
bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception) {
// TODO(pabloem): This function never returns, even for objects that do not exist.
(*results).resize(object_ids.size(), nullptr);

std::shared_ptr<GetRequest> get_request;
Expand All @@ -311,6 +328,10 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
count += 1;
} else {
remaining_ids.insert(object_id);
Status status = ObjectHasOwner(object_id, abort_if_any_object_is_exception);
if (!status.ok()) {
return status;
}
}
}
RAY_CHECK(count <= num_objects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ class CoreWorkerMemoryStore {
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception);

// Checks whether the given object ID has a known owner. Returns OK if
// it does, or if abort_if_missing = false, as that indicates that a missing
// owner is no reason to abort. Otherwise returns an ObjectNotFound status.
Status ObjectHasOwner(const ObjectID &object_id, bool abort_if_missing);

/// Called when an object is deleted from the store.
void OnDelete(std::shared_ptr<RayObject> obj);

Expand Down

0 comments on commit 7c22aca

Please sign in to comment.