Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce expire_tags action and procedure with olderThan user custom time #4138

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ All available procedures are listed below.
CALL sys.delete_tag('default.T', 'my_tag')
</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
CALL [catalog.]sys.expire_tags('identifier', 'expiration_time')
</td>
<td>
To expire tags by time. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>expiration_time: tags whose creation time is earlier than this timestamp will be removed.</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

older_than

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

</td>
<td>
CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
<td>merge_into</td>
<td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ This section introduces all available Spark procedures for Paimon.
</td>
<td>CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
To expire tags by time. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: tags whose creation time is earlier than this timestamp will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
<td>rollback</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -194,6 +195,12 @@ public ExpireSnapshots newExpireChangelog() {
return wrapped.newExpireChangelog();
}

/**
 * Returns the wrapped table's {@link TagTimeExpire} after running the privilege checker's
 * insert assertion for this table's identifier.
 */
@Override
public TagTimeExpire newExpireTags() {
// Checked first, before the call is forwarded to the wrapped table.
privilegeChecker.assertCanInsert(identifier);
return wrapped.newExpireTags();
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -372,6 +373,15 @@ public ExpireSnapshots newExpireChangelog() {
snapshotManager(), tagManager(), store().newChangelogDeletion());
}

/**
 * Builds a {@link TagTimeExpire} through {@code TagTimeExpire.create(...)} from this table's
 * snapshot manager and tag manager plus the store's tag deletion and tag callbacks.
 */
@Override
public TagTimeExpire newExpireTags() {
return TagTimeExpire.create(
snapshotManager(),
tagManager(),
store().newTagDeletion(),
store().createTagCallbacks());
}

@Override
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -199,6 +200,11 @@ public ExpireSnapshots newExpireChangelog() {
return wrapped.newExpireChangelog();
}

/** Forwards to the wrapped table's {@code newExpireTags()} without any extra handling. */
@Override
public TagTimeExpire newExpireTags() {
return wrapped.newExpireTags();
}

@Override
public DataTableScan newScan() {
return wrapped.newScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;

Expand Down Expand Up @@ -306,6 +307,11 @@ default ExpireSnapshots newExpireChangelog() {
throw new UnsupportedOperationException();
}

/**
 * Tag expiration is not provided by this default implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
default TagTimeExpire newExpireTags() {
throw new UnsupportedOperationException();
}

@Override
default ReadBuilder newReadBuilder() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.WriteSelector;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.SimpleFileReader;

import java.time.Duration;
Expand Down Expand Up @@ -252,4 +253,12 @@ default ExpireSnapshots newExpireChangelog() {
"Readonly Table %s does not support expireChangelog.",
this.getClass().getSimpleName()));
}

/**
 * Always rejects the call; the error message names the concrete table class.
 *
 * @throws UnsupportedOperationException always
 */
@Override
default TagTimeExpire newExpireTags() {
    String tableType = this.getClass().getSimpleName();
    String message =
            String.format("Readonly Table %s does not support expireTags.", tableType);
    throw new UnsupportedOperationException(message);
}
}
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;

Expand Down Expand Up @@ -164,6 +165,9 @@ default void deleteBranches(String branchNames) {
@Experimental
ExpireSnapshots newExpireChangelog();

/**
 * Returns a {@link TagTimeExpire} for expiring tags of this table.
 *
 * <p>Implementations that do not support tag expiration may throw {@link
 * UnsupportedOperationException}.
 */
TagTimeExpire newExpireTags();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont introduce this method to Table, it is not so useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed. Removed it, just depends on TagAutoManager.


// =============== Read & Write Operations ==================

/** Returns a new read builder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void run() {
tagAutoCreation.run();
}
if (tagTimeExpire != null) {
tagTimeExpire.run();
tagTimeExpire.expire();
}
}

Expand Down
17 changes: 15 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** A manager to expire tags by time. */
Expand All @@ -41,6 +42,8 @@ public class TagTimeExpire {
private final TagDeletion tagDeletion;
private final List<TagCallback> callbacks;

private LocalDateTime expirationTime;

private TagTimeExpire(
SnapshotManager snapshotManager,
TagManager tagManager,
Expand All @@ -52,8 +55,9 @@ private TagTimeExpire(
this.callbacks = callbacks;
}

public void run() {
public List<String> expire() {
List<Pair<Tag, String>> tags = tagManager.tagObjects();
List<String> expired = new ArrayList<>();
for (Pair<Tag, String> pair : tags) {
Tag tag = pair.getLeft();
String tagName = pair.getRight();
Expand All @@ -62,14 +66,23 @@ public void run() {
if (createTime == null || timeRetained == null) {
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For tag without createTime, can we just check its file creation time?

}
if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
if ((expirationTime == null
&& LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (expirationTime != null && expirationTime.isAfter(createTime))) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {} or" +
" its createTime {} is olderThan olderThanTime {}.",
tagName,
timeRetained,
createTime,
olderThanTime);

tagName,
timeRetained);
tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
expired.add(tagName);
}
}
return expired;
}

/**
 * Sets the explicit expiration time consulted by {@link #expire()}.
 *
 * <p>When the value is non-null, {@code expire()} deletes tags whose create time is before it
 * instead of comparing the current time against each tag's create time plus time retained.
 * Tags that lack a create time or a time-retained value are skipped in either mode.
 *
 * @param expirationTime the cut-off time, or {@code null} to fall back to per-tag retention
 * @return this instance, to allow call chaining
 */
public TagTimeExpire withExpirationTime(LocalDateTime expirationTime) {
this.expirationTime = expirationTime;
return this;
}

public static TagTimeExpire create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.ExpireTagsProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Flink action that expires the tags of a table by invoking {@link ExpireTagsProcedure}. */
public class ExpireTagsAction extends ActionBase {

    private final String table;
    private final String expirationTime;

    /**
     * @param warehouse warehouse location handed to {@link ActionBase}
     * @param table table argument passed through to the procedure
     * @param expirationTime expiration-time argument passed through to the procedure
     * @param catalogConfig catalog options handed to {@link ActionBase}
     */
    public ExpireTagsAction(
            String warehouse,
            String table,
            String expirationTime,
            Map<String, String> catalogConfig) {
        super(warehouse, catalogConfig);
        this.expirationTime = expirationTime;
        this.table = table;
    }

    /** Binds a fresh procedure to this action's catalog and calls it with the stored arguments. */
    @Override
    public void run() throws Exception {
        ExpireTagsProcedure procedure = new ExpireTagsProcedure();
        procedure.withCatalog(catalog);
        DefaultProcedureContext context = new DefaultProcedureContext(env);
        procedure.call(context, table, expirationTime);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link ExpireTagsAction}. */
public class ExpireTagsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_tags";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private


private static final String EXPIRATION_TIME = "expiration_time";

/** Returns the action name under which this factory is registered ({@code "expire_tags"}). */
@Override
public String identifier() {
return IDENTIFIER;
}

/**
 * Reads the warehouse, table, expiration time and catalog configuration from {@code params}
 * and wraps a new {@link ExpireTagsAction} built from them.
 */
@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
    String warehousePath = params.get(WAREHOUSE);
    String tableName = params.get(TABLE);
    String expiration = params.get(EXPIRATION_TIME);
    Map<String, String> catalogOptions = optionalConfigMap(params, CATALOG_CONF);

    return Optional.of(
            new ExpireTagsAction(warehousePath, tableName, expiration, catalogOptions));
}

/** Prints a short description of the action followed by its command-line syntax to stdout. */
@Override
public void printHelp() {
    String syntax =
            " expire_tags --warehouse <warehouse_path> "
                    + "--table <database.table_name> "
                    + "[--expiration_time <expiration_time>]";

    System.out.println("Action \"expire_tags\" expire tags by time.");
    System.out.println();
    System.out.println("Syntax:");
    System.out.println(syntax);
}
}
Loading
Loading