Internally construct and use stream ARNs for all streams in multi-stream mode #1318

Merged: 1 commit merged on Apr 30, 2024

Conversation

furq-aws
Contributor

Issue #, if available:
N/A.

Description of changes:
Construct stream ARNs (using the Kinesis client's configured region) for all stream configs upon writing to the currentStreamConfigMap in multi-stream mode.
This should ensure that all Kinesis API requests from KinesisDataFetcher and KinesisShardDetector are provided with the stream ARN in multi-stream applications.

Note: Because stream ARNs are now used for Kinesis API calls, the accountId provided through the StreamTracker must be correct. Previously, applications could consume a stream as long as the provided streamName existed in the account that the Kinesis client was configured for.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
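
For readers following along, here is a minimal sketch of what constructing a Kinesis stream ARN from a region, account ID, and stream name amounts to. It is not the PR's code: the class `StreamArnSketch` and the method `buildStreamArn` are hypothetical, it uses plain strings instead of the SDK's `Arn` and `Region` types, and it assumes the standard `aws` partition.

```java
import java.util.Objects;

/** Hypothetical stand-in for an ARN helper; standard library only, no AWS SDK types. */
final class StreamArnSketch {
    // Assumption: the standard "aws" partition. Other partitions would need a different value.
    private static final String PARTITION = "aws";
    private static final String SERVICE = "kinesis";

    private StreamArnSketch() {}

    /** Builds arn:aws:kinesis:<region>:<accountId>:stream/<streamName>. */
    static String buildStreamArn(String region, String accountId, String streamName) {
        Objects.requireNonNull(region, "region");
        Objects.requireNonNull(accountId, "accountId");
        Objects.requireNonNull(streamName, "streamName");
        return String.join(":", "arn", PARTITION, SERVICE, region, accountId, "stream/" + streamName);
    }

    public static void main(String[] args) {
        // Prints arn:aws:kinesis:us-west-2:123456789012:stream/orders
        System.out.println(buildStreamArn("us-west-2", "123456789012", "orders"));
    }
}
```

Since the account ID is embedded in the ARN, a wrong account ID yields an ARN for a different (or nonexistent) stream, which is why the note above matters.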

Contributor

Generally lgtm, let's double check with @akidambisrinivasan though

* @return The {@link Arn} of the Kinesis stream.
*/
public static Arn constructStreamArn(
@NonNull final Region region, final long accountId, @NonNull final String streamName) {
Contributor

Should the accountId just be a String value instead? StreamIdentifier uses accountId as a string. That would avoid any extra parsing we have to do.

Contributor Author

Using a long for the accountId drops leading zeros (AWS account IDs are 12-digit strings, and some start with 0),
so this is actually a bug, I think.

Thanks for the callout!
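
To illustrate the leading-zero problem, here is a small standalone sketch. The class `AccountIdSketch` and its methods are hypothetical and exist only for this demonstration; the sample account ID is made up.

```java
import java.util.Locale;

/** Hypothetical demo of why a 12-digit account ID should not be carried as a long. */
final class AccountIdSketch {
    private AccountIdSketch() {}

    /** Round-trips an account ID through a long, as a long-typed parameter would force. */
    static String viaLong(String accountId) {
        return Long.toString(Long.parseLong(accountId));
    }

    /** One possible repair if a long must be kept: zero-pad back to 12 digits. */
    static String padded(long accountId) {
        return String.format(Locale.ROOT, "%012d", accountId);
    }

    public static void main(String[] args) {
        String original = "012345678901";
        System.out.println(viaLong(original));                // 12345678901 (11 digits, zero lost)
        System.out.println(padded(Long.parseLong(original))); // 012345678901
    }
}
```

Keeping the account ID as a String end to end avoids both the parsing and the padding.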

Comment on lines 1109 to 1121
final Optional<Arn> streamArnOptional = streamConfig.streamIdentifier().streamArnOptional();
if (streamArnOptional.isPresent()) {
    Validate.isTrue(kinesisRegion.id().equals(streamArnOptional.get().region().get()),
            "The provided streamARN " + streamArnOptional.get()
                    + " does not match the Kinesis client's configured region - " + kinesisRegion);
    return super.put(streamIdentifier, streamConfig);
}

if (isMultiStreamMode) {
    return super.put(streamIdentifier, withStreamArn(streamConfig, kinesisRegion));
}

return super.put(streamIdentifier, streamConfig);
Contributor

isPresent() followed by get() is an anti-pattern.
Use map or ifPresent instead, as appropriate. In this case, something like:

Suggested change
- final Optional<Arn> streamArnOptional = streamConfig.streamIdentifier().streamArnOptional();
- if (streamArnOptional.isPresent()) {
-     Validate.isTrue(kinesisRegion.id().equals(streamArnOptional.get().region().get()),
-             "The provided streamARN " + streamArnOptional.get()
-                     + " does not match the Kinesis client's configured region - " + kinesisRegion);
-     return super.put(streamIdentifier, streamConfig);
- }
- if (isMultiStreamMode) {
-     return super.put(streamIdentifier, withStreamArn(streamConfig, kinesisRegion));
- }
- return super.put(streamIdentifier, streamConfig);
+ StreamConfig cfg = streamConfig.streamIdentifier().streamArnOptional().map(streamArn -> {
+     Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()),
+             "The provided streamARN " + streamArn
+                     + " does not match the Kinesis client's configured region - " + kinesisRegion);
+     return streamConfig;
+ }).orElse(isMultiStreamMode ? withStreamArn(streamConfig, kinesisRegion) : streamConfig);
+ return super.put(streamIdentifier, cfg);

Contributor Author

Updated. Thanks!
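
The Optional pattern discussed in this thread can be tried in isolation with the sketch below. It is a simplified stand-in, not the merged code: `OptionalResolveSketch`, `resolveArn`, `requireRegion`, and `buildArn` are hypothetical names, ARNs are plain strings, and the `aws` partition is assumed. It uses `orElseGet` so the fallback ARN is only constructed when no ARN was provided, whereas `orElse` evaluates its argument eagerly.

```java
import java.util.Optional;

/** Hypothetical, stdlib-only model of "validate a provided ARN, else construct one". */
final class OptionalResolveSketch {
    private OptionalResolveSketch() {}

    /** Assumes the "aws" partition; illustrative only. */
    static String buildArn(String region, String accountId, String streamName) {
        return "arn:aws:kinesis:" + region + ":" + accountId + ":stream/" + streamName;
    }

    /** Returns the ARN unchanged if its region field matches, otherwise throws. */
    static String requireRegion(String arn, String clientRegion) {
        String[] parts = arn.split(":", 6);
        if (parts.length < 6 || !clientRegion.equals(parts[3])) {
            throw new IllegalArgumentException("The provided streamARN " + arn
                    + " does not match the client's configured region - " + clientRegion);
        }
        return arn;
    }

    /** A provided ARN is validated; a missing one is constructed only in multi-stream mode. */
    static Optional<String> resolveArn(Optional<String> providedArn, String clientRegion,
            boolean multiStreamMode, String accountId, String streamName) {
        return providedArn
                .map(arn -> Optional.of(requireRegion(arn, clientRegion)))
                .orElseGet(() -> multiStreamMode
                        ? Optional.of(buildArn(clientRegion, accountId, streamName))
                        : Optional.empty());
    }

    public static void main(String[] args) {
        // Multi-stream mode, no ARN supplied: one is constructed from the client region.
        System.out.println(resolveArn(Optional.empty(), "us-east-1", true, "123456789012", "orders"));
        // Single-stream mode, no ARN supplied: nothing is constructed.
        System.out.println(resolveArn(Optional.empty(), "us-east-1", false, "123456789012", "orders"));
    }
}
```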

furq-aws force-pushed the multistream-arn-construction branch from c582565 to 6846fe7 on April 30, 2024 12:24
lucienlu-aws previously approved these changes on Apr 30, 2024

@KinesisClientInternalApi
public final class ArnUtil {
private static final String SERVICE_NAME = "kinesis";
Contributor

Can you use something already existing e.g. KinesisAsyncClient.SERVICE_NAME ?

Contributor Author

Updated, thank you for pointing this out!

* Only to be used in multi-stream mode.
*
* @param streamConfig The {@link StreamConfig} object to return a copy of.
* @param kinesisRegion The {@link Region} the stream exists in to be used for constructing the {@link Arn}.
Contributor

nit: missing comma - exists in, to be used for

Contributor Author

Updated, thanks!

scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);

final Scheduler schedulerSpy = spy(scheduler);
Contributor

Why do you need a spy? Can you not invoke runProcessLoop on scheduler?

Contributor Author

Good callout! Spy is not needed here; must've copied from a previous unit test and forgotten to clean it up.

furq-aws merged commit ec34ed1 into awslabs:master on Apr 30, 2024
1 of 2 checks passed
furq-aws deleted the multistream-arn-construction branch on April 30, 2024 21:20

return super.put(streamIdentifier, streamConfig.streamIdentifier().streamArnOptional()
.map(streamArn -> {
Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()),
Contributor

Just realized that this is invoked in checkAndSyncStreamShardsAndLeases when new streams are discovered and a shard sync task is generated that updates the currentStreamConfigMap. If the validation fails in that case, it can short-circuit processing, because the exception is only caught in runProcessLoop. This can prevent other streams from being processed. Ideally, if there is an issue with one stream, the leader should log an error, skip that stream, and maybe even emit a metric, without affecting the processing of other streams. Can you fix this as a follow-up?
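
A rough sketch of the isolation being asked for, under stated assumptions: `PerStreamIsolationSketch`, `validateRegion`, and `syncStreams` are hypothetical names, streams are modeled as a name-to-region map, and the validation failure is assumed to surface as an IllegalArgumentException. It is not the follow-up fix, only an illustration of catching per stream instead of once around the whole loop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: one bad stream is logged and skipped, the rest are still processed. */
final class PerStreamIsolationSketch {
    private PerStreamIsolationSketch() {}

    /** Throws if the stream's ARN region differs from the client's configured region. */
    static void validateRegion(String streamName, String arnRegion, String clientRegion) {
        if (!clientRegion.equals(arnRegion)) {
            throw new IllegalArgumentException("Stream " + streamName + " is in " + arnRegion
                    + " but the client is configured for " + clientRegion);
        }
    }

    /** Returns the streams that passed validation, in iteration order. */
    static List<String> syncStreams(Map<String, String> streamToRegion, String clientRegion) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, String> entry : streamToRegion.entrySet()) {
            try {
                validateRegion(entry.getKey(), entry.getValue(), clientRegion);
                accepted.add(entry.getKey());
            } catch (IllegalArgumentException e) {
                // The failure stays local to this stream; a metric could be emitted here as well.
                System.err.println("Skipping stream: " + e.getMessage());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, String> streams = new LinkedHashMap<>();
        streams.put("orders", "us-east-1");
        streams.put("payments", "eu-west-1"); // mismatched region, should be skipped
        streams.put("audit", "us-east-1");
        System.out.println(syncStreams(streams, "us-east-1")); // [orders, audit]
    }
}
```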

5 participants