Skip to content

Commit 3c55ea2

Browse files
committed
Fix Reactive Producer Tests (Fencing)
Use a different `transactional.id` for each test.
1 parent 86a91b1 commit 3c55ea2

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.junit.jupiter.api.BeforeAll;
3838
import org.junit.jupiter.api.BeforeEach;
3939
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.TestInfo;
4041
import org.reactivestreams.Publisher;
4142
import org.reactivestreams.Subscription;
4243

@@ -104,17 +105,18 @@ public static void setUpBeforeClass() {
104105
}
105106

106107
@BeforeEach
107-
public void setUp() {
108-
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(),
108+
public void setUp(TestInfo info) {
109+
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(info),
109110
new MessagingMessageConverter());
110111
}
111112

112-
private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic() {
113+
private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic(TestInfo info) {
113114
Map<String, Object> senderProps =
114115
KafkaTestUtils.producerProps(EmbeddedKafkaCondition.getBroker().getBrokersAsString());
115116
SenderOptions<Integer, String> senderOptions = SenderOptions.create(senderProps);
116117
senderOptions = senderOptions
117-
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "reactive.transaction")
118+
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
119+
"reactive.transaction." + info.getDisplayName().replaceAll("\\(\\)", ""))
118120
.producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
119121
return senderOptions;
120122
}
@@ -279,7 +281,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
279281
.abort()
280282
.then(Mono.error(error))))
281283
.expectErrorMatches(throwable -> throwable instanceof IllegalStateException &&
282-
throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
284+
throwable.getMessage().equals("TransactionalId reactive.transaction."
285+
+ "shouldSendOneRecordTransactionallyViaTemplateAsSenderRecord"
286+
+ "AndReceiveItExactlyOnceWithException: Invalid transition " +
283287
"attempted from state READY to state ABORTING_TRANSACTION"))
284288
.verify(DEFAULT_VERIFY_TIMEOUT);
285289

0 commit comments

Comments
 (0)