Skip to content

Commit

Permalink
Fix broker to specify a list of bookie groups. (apache#6349)
Browse files Browse the repository at this point in the history
### Motivation

Fixes apache#6343

### Modifications

Add a method to cast object value to `String`.
  • Loading branch information
murong00 authored and huangdx0726 committed Aug 24, 2020
1 parent c09fe0a commit ba992ac
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
StatsLogger statsLogger) {
if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
String isolationGroupsString = (String) conf.getProperty(ISOLATION_BOOKIE_GROUPS);
String isolationGroupsString = castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
if (!isolationGroupsString.isEmpty()) {
for (String isolationGroup : isolationGroupsString.split(",")) {
primaryIsolationGroups.add(isolationGroup);
Expand All @@ -80,7 +80,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
}
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = (String) conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS);
String secondaryIsolationGroupsString = castToString(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS));
if (!secondaryIsolationGroupsString.isEmpty()) {
for (String isolationGroup : secondaryIsolationGroupsString.split(",")) {
secondaryIsolationGroups.add(isolationGroup);
Expand All @@ -90,6 +90,18 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
}

private String castToString(Object obj) {
if (obj instanceof List<?>) {
List<String> result = new ArrayList<>();
for (Object o : (List<?>) obj) {
result.add(String.class.cast(o));
}
return String.join(",", result);
} else {
return obj.toString();
}
}

private ZooKeeperCache getAndSetZkCache(Configuration conf) {
ZooKeeperCache zkCache = null;
if (conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
Expand Down

0 comments on commit ba992ac

Please sign in to comment.