Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tidb] add tidb cdc connector #898

Merged
merged 8 commits into from
Mar 16, 2022
Merged

[tidb] add tidb cdc connector #898

merged 8 commits into from
Mar 16, 2022

Conversation

teckick
Copy link
Contributor

@teckick teckick commented Mar 1, 2022

This PR fixes issue #243.

  • cherry-pick initial commit from TiLaker.
  • support metadata.
  • add more tests.

@teckick teckick changed the title [WIP] add tidb cdc connector [WIP][tidb] add tidb cdc connector Mar 1, 2022
@leonardBang leonardBang self-requested a review March 2, 2022 12:15
Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @eastfisher for the contribution, I left some comments. Please let me know If you need any help.

# See the License for the specific language governing permissions and
# limitations under the License.

com.ververica.cdc.connectors.tidb.table.TiDBTableSourceFactory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please help add one more blank line

pom.xml Outdated Show resolved Hide resolved
Comment on lines 75 to 137
public static final ConfigOption<String> TIKV_PD_ADDRESSES =
ConfigOptions.key(ConfigUtils.TIKV_PD_ADDRESSES)
.stringType()
.noDefaultValue()
.withDescription("TiKV cluster's PD address");

public static final ConfigOption<Long> TIKV_GRPC_TIMEOUT =
ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
.longType()
.noDefaultValue()
.withDescription("TiKV GRPC timeout in ms");

public static final ConfigOption<Long> TIKV_GRPC_SCAN_TIMEOUT =
ConfigOptions.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
.longType()
.noDefaultValue()
.withDescription("TiKV GRPC scan timeout in ms");

public static final ConfigOption<Integer> TIKV_BATCH_GET_CONCURRENCY =
ConfigOptions.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
.intType()
.noDefaultValue()
.withDescription("TiKV GRPC batch get concurrency");

public static final ConfigOption<Integer> TIKV_BATCH_PUT_CONCURRENCY =
ConfigOptions.key(ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY)
.intType()
.noDefaultValue()
.withDescription("TiKV GRPC batch put concurrency");

public static final ConfigOption<Integer> TIKV_BATCH_SCAN_CONCURRENCY =
ConfigOptions.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
.intType()
.noDefaultValue()
.withDescription("TiKV GRPC batch scan concurrency");

public static final ConfigOption<Integer> TIKV_BATCH_DELETE_CONCURRENCY =
ConfigOptions.key(ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY)
.intType()
.noDefaultValue()
.withDescription("TiKV GRPC batch delete concurrency");

public static TiConfiguration getTiConfiguration(final Map<String, String> options) {
final Configuration configuration = Configuration.fromMap(options);

final TiConfiguration tiConf =
configuration
.getOptional(TIKV_PD_ADDRESSES)
.map(TiConfiguration::createDefault)
.orElseGet(TiConfiguration::createDefault);

configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
configuration
.getOptional(TIKV_BATCH_GET_CONCURRENCY)
.ifPresent(tiConf::setBatchGetConcurrency);
configuration
.getOptional(TIKV_BATCH_PUT_CONCURRENCY)
.ifPresent(tiConf::setBatchPutConcurrency);
configuration
.getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
.ifPresent(tiConf::setBatchScanConcurrency);
configuration
.getOptional(TIKV_BATCH_DELETE_CONCURRENCY)
.ifPresent(tiConf::setBatchDeleteConcurrency);
return tiConf;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot list all valida TiKV parameters here, I think we should enable user to pass tikv.xx options just like Flink Kafka Connector enable user to properties.xx.

@leonardBang leonardBang added this to the V2.2.0 milestone Mar 7, 2022
FactoryUtil.createTableFactoryHelper(this, context);

final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option TDBSourceOptions.TIKV_PD_ADDRESSES is required,but you do not add to TiDBTableSource‘s constructor parameter.

@teckick teckick changed the title [WIP][tidb] add tidb cdc connector [tidb] add tidb cdc connector Mar 15, 2022
@teckick
Copy link
Contributor Author

teckick commented Mar 16, 2022

cherry-pick and squash commits, then force push.

@leonardBang
Copy link
Contributor

Thanks @eastfisher and @GOODBOY008 for the great work! Looks pretty good to me as our first version, I'll help organize the commits and merge them.

@leonardBang leonardBang merged commit 39a9974 into apache:master Mar 16, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants