[azservicebus, azeventhubs] Treat 'entity full' as a fatal error (#20722)

When the remote entity is full, the service returns a resource-limit-exceeded condition. Retrying on this condition does not help, so it is better to abort and report the error to the user immediately than to hope the entity eventually clears out.

This affected both Event Hubs and Service Bus.

Fixes #20647
richardpark-msft committed May 2, 2023
1 parent e2693bd commit 03f0ac3
Showing 6 changed files with 81 additions and 62 deletions.
24 changes: 13 additions & 11 deletions sdk/messaging/azeventhubs/CHANGELOG.md
@@ -1,6 +1,6 @@
# Release History

-## 1.0.0 (2023-04-11)
+## 1.0.0 (2023-05-09)

### Features Added

@@ -13,13 +13,14 @@
- Recovery now includes internal timeouts and also handles restarting a connection if AMQP primitives aren't closed cleanly.
- Potential leaks for $cbs and $management when there was a partial failure. (PR#20564)
- Latest go-amqp changes have been merged in with fixes for robustness.
- Sending a message to an entity that is full will no longer retry. (PR#20722)

## 0.6.0 (2023-03-07)

### Features Added

- Added the `ConsumerClientOptions.InstanceID` field. This optional field can enhance error messages from
Event Hubs. For example, error messages related to ownership changes for a partition will contain the
name of the link that has taken ownership, which can help with traceability.

### Breaking Changes
@@ -41,15 +42,15 @@
### Breaking Changes

- ProcessorOptions.OwnerLevel has been removed. The Processor uses 0 as the owner level.
- Uses the public release of `github.com/Azure/azure-sdk-for-go/sdk/storage/azblob` package rather than using an internal copy.
For an example, see [example_consuming_with_checkpoints_test.go](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go).

## 0.4.0 (2023-01-10)

### Bugs Fixed

- User-Agent was incorrectly formatted in our AMQP-based clients. (PR#19712)
- Connection recovery has been improved, removing some unnecessary retries as well as adding a bound around
some operations (Close) that could potentially block recovery for a long time. (PR#19683)

## 0.3.0 (2022-11-10)
@@ -78,7 +79,7 @@
- NewWebSocketConnArgs renamed to WebSocketConnParams
- Code renamed to ErrorCode, including associated constants like `ErrorCodeOwnershipLost`.
- OwnershipData, CheckpointData, and CheckpointStoreAddress have been folded into their individual structs: Ownership and Checkpoint.
- StartPosition and OwnerLevel were erroneously included in the ConsumerClientOptions struct - they've been removed. These can be
configured in the PartitionClientOptions.

### Bugs Fixed
@@ -90,8 +91,8 @@

### Features Added

- Adding in the new Processor type, which can be used to do distributed (and load balanced) consumption of events, using a
CheckpointStore. The built-in checkpoints.BlobStore uses Azure Blob Storage for persistence. A full example is
in [example_consuming_with_checkpoints_test.go](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go).

### Breaking Changes
@@ -101,15 +102,16 @@
instances (using ConsumerClient.NewPartitionClient), which allows you to share the same AMQP connection and receive from multiple
partitions simultaneously.
- Changes to EventData/ReceivedEventData:

- ReceivedEventData now embeds EventData for fields common between the two, making it easier to change and resend.
- `ApplicationProperties` renamed to `Properties`.
- `PartitionKey` removed from `EventData`. To send events using a PartitionKey you must set it in the options
when creating the EventDataBatch:

```go
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.NewEventDataBatchOptions{
    PartitionKey: to.Ptr("partition key"),
})
```

### Bugs Fixed
@@ -120,4 +122,4 @@

## 0.1.0 (2022-08-11)

- Initial preview for the new version of the Azure Event Hubs Go SDK.
3 changes: 3 additions & 0 deletions sdk/messaging/azeventhubs/internal/errors.go
@@ -139,6 +139,9 @@ var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
amqp.ErrCondInternalError: RecoveryKindConn, // "amqp:internal-error"

// No recovery possible - this operation is non retriable.

// ErrCondResourceLimitExceeded comes back if the entity is actually full.
amqp.ErrCondResourceLimitExceeded: RecoveryKindFatal, // "amqp:resource-limit-exceeded"
amqp.ErrCondMessageSizeExceeded: RecoveryKindFatal, // "amqp:link:message-size-exceeded"
amqp.ErrCondUnauthorizedAccess: RecoveryKindFatal, // creds are bad
amqp.ErrCondNotFound: RecoveryKindFatal, // "amqp:not-found"
1 change: 1 addition & 0 deletions sdk/messaging/azeventhubs/internal/errors_test.go
@@ -44,6 +44,7 @@ func TestGetRecoveryKind(t *testing.T) {
require.Equal(t, GetRecoveryKind(context.Canceled), RecoveryKindFatal)
require.Equal(t, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusUnauthorized}}), RecoveryKindFatal)
require.Equal(t, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusNotFound}}), RecoveryKindFatal)
require.Equal(t, GetRecoveryKind(&amqp.Error{Condition: amqp.ErrCondResourceLimitExceeded}), RecoveryKindFatal)
}

func Test_TransformError(t *testing.T) {