[core] Introduce expire_tags action and procedure with olderThan user custom time #4138
base: master
docs/content/flink/procedures.md
Outdated
<td>
To expire tags by time. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>expiration_time: tagCreateTime before which tags will be removed.</li>
older_than
done
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link ExpireTagsAction}. */
public class ExpireTagsActionITTest extends ActionITCaseBase {
ExpireTagsActionTest
@@ -62,14 +66,23 @@ public void run() {
if (createTime == null || timeRetained == null) {
continue;
}
if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
if ((olderThanTime == null
if ((LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (olderThanTime != null && olderThanTime.isAfter(createTime)))
For tag without createTime, can we just check its file creation time?
Added a check of the file creation time for tags without createTime.
The if logic has been adjusted; please review it again.
@@ -62,14 +66,23 @@ public void run() {
if (createTime == null || timeRetained == null) {
continue;
For tag without createTime, can we just check its file creation time?
@@ -164,6 +165,9 @@ default void deleteBranches(String branchNames) {
@Experimental
ExpireSnapshots newExpireChangelog();

/** Expire tags. */
TagTimeExpire newExpireTags();
Don't introduce this method in Table; it is not very useful.
Addressed. Removed it; the change now depends only on TagAutoManager.
@JingsongLi Thanks for the review, updated.
if (createTime == null) {
FileStatus tagFileStatus;
try {
tagFileStatus = fileIO.getFileStatus(tagManager.tagPath(tagName));
if (timeRetained != null && olderThanTime != null), in this case, we get file status for this tag.
Maybe if (createTime == null && olderThanTime != null)?
If timeRetained != null, the tag has its own createTime, so there is no need to get the file status.
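To make the suggested guard concrete, here is a minimal sketch of the fallback, not the PR's code. It uses java.nio.file.Files in place of Paimon's FileIO, and the class name, method name, and the choice to rethrow as UncheckedIOException are all assumptions for illustration. The file is only touched when the tag has no createTime and an olderThan cutoff was actually supplied.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper (not in the PR): resolves the time used for the olderThan check. */
class TagCreateTimeFallbackSketch {

    /**
     * Returns createTime if present. Falls back to the file's last-modified time only when
     * createTime is missing and a cutoff was given; otherwise no file access happens.
     */
    public static LocalDateTime effectiveCreateTime(
            LocalDateTime createTime, LocalDateTime olderThanTime, Path tagFile) {
        if (createTime != null || olderThanTime == null) {
            // Either the tag carries its own time, or nobody asked for an olderThan check.
            return createTime;
        }
        try {
            // Stand-in for fileIO.getFileStatus(...): read the modification time of the file.
            return LocalDateTime.ofInstant(
                    Files.getLastModifiedTime(tagFile).toInstant(), ZoneId.systemDefault());
        } catch (IOException e) {
            // Assumption: surface the failure; the real code may prefer to log and skip.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path existing = Paths.get("."); // any existing path works for the demo
        LocalDateTime cutoff = LocalDateTime.of(2024, 9, 5, 0, 0);
        System.out.println(effectiveCreateTime(null, null, existing));   // no cutoff: stays null
        System.out.println(effectiveCreateTime(null, cutoff, existing)); // falls back to file time
    }
}
```

Keeping the guard in one place means the file system is not queried for tags that already carry a createTime.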
}
if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
Duration timeRetained = tag.getTagTimeRetained();
1. The method invocation 'plus' may produce a NullPointerException, because timeRetained in Tag is nullable. You should check and handle it.
2. The argument 'createTime' might be null, because createTime in Tag is nullable. You should check and handle it.
3. Change the if statement on line 81 to:
boolean xxx = timeRetained != null && LocalDateTime.now().isAfter(createTime.plus(timeRetained));
boolean yyy = olderThanTime != null && olderThanTime.isAfter(createTime);
if (xxx || yyy) {
...
}
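The three points above can be combined into one null-safe predicate. The following is only a sketch under stated assumptions: the class and method names are hypothetical, `now` is passed in for testability, and a tag with no createTime is simply never expired here (the PR instead falls back to the tag file's status).

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical helper (not in the PR): isolates the expiry decision for a single tag. */
class TagExpireSketch {

    /**
     * Returns true when the tag should be expired.
     * createTime, timeRetained and olderThanTime may each be null.
     */
    public static boolean shouldExpire(
            LocalDateTime now,
            LocalDateTime createTime,
            Duration timeRetained,
            LocalDateTime olderThanTime) {
        if (createTime == null) {
            // Simplification: without a creation time neither condition can be evaluated.
            return false;
        }
        // Condition 1: the tag has outlived its own retention period.
        boolean retentionReached =
                timeRetained != null && now.isAfter(createTime.plus(timeRetained));
        // Condition 2: the tag was created before the user-supplied cutoff.
        boolean olderThanCutoff = olderThanTime != null && olderThanTime.isAfter(createTime);
        return retentionReached || olderThanCutoff;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 9, 10, 0, 0);
        LocalDateTime created = LocalDateTime.of(2024, 9, 1, 0, 0);
        LocalDateTime cutoff = LocalDateTime.of(2024, 9, 5, 0, 0);
        System.out.println(shouldExpire(now, created, Duration.ofDays(1), null)); // true
        System.out.println(shouldExpire(now, created, null, cutoff));             // true
        System.out.println(shouldExpire(now, created, null, null));               // false
    }
}
```

Naming the two booleans, as the review suggests with xxx and yyy, also makes it easy to log which condition triggered the deletion.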
if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
Duration timeRetained = tag.getTagTimeRetained();
if ((timeRetained != null && LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (olderThanTime != null && olderThanTime.isAfter(createTime))) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {} or" +
" its createTime {} is olderThan olderThanTime {}.",
tagName,
timeRetained,
createTime,
olderThanTime);
ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
FileStore fileStore = fileStoreTable.store();
inline line 56 to line 57
/** A procedure to expire tags by time. */
public class ExpireTagsProcedure extends ProcedureBase {

public static final String IDENTIFIER = "expire_tags";
private
List<String> expired = tagTimeExpire.expire();
return expired.isEmpty()
? new Row[] {Row.of("No expired tags.")}
: expired.stream().map(x -> Row.of(x)).toArray(Row[]::new);
Change to: expired.stream().map(Row::of).toArray(Row[]::new);
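For reference, a small self-contained sketch of the suggested form. The nested Row class below is a stand-in written only so the example compiles without Flink on the classpath; it mimics the varargs factory `Row.of(Object...)` and is not Flink's class. The enclosing class and method names are likewise hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch (not in the PR): maps expired tag names to result rows. */
class ExpiredRowsSketch {

    /** Stand-in for Flink's Row, assumed here only to keep the example self-contained. */
    public static final class Row {
        public final Object[] fields;

        private Row(Object[] fields) {
            this.fields = fields;
        }

        public static Row of(Object... values) {
            return new Row(values);
        }
    }

    /** One row per expired tag, or a single message row when nothing expired. */
    public static Row[] toRows(List<String> expired) {
        return expired.isEmpty()
                ? new Row[] {Row.of("No expired tags.")}
                // The method reference resolves to the varargs factory with one argument.
                : expired.stream().map(Row::of).toArray(Row[]::new);
    }

    public static void main(String[] args) {
        System.out.println(toRows(Collections.<String>emptyList()).length);     // 1
        System.out.println(toRows(Arrays.asList("tag-1", "tag-2")).length);     // 2
    }
}
```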
/** Factory to create {@link ExpireTagsAction}. */
public class ExpireTagsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_tags";
private
List<Pair<Tag, String>> tags = tagManager.tagObjects();
FileIO fileIO = snapshotManager.fileIO();
inline this to line 73:
tagFileStatus = snapshotManager.fileIO().getFileStatus(tagManager.tagPath(tagName));
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
inline line 77 to line 79
Suggest changing the title to "Introduce expire_tags action and procedure with olderThan user custom time".
@wwj6591812 Thank you, updated.
Purpose
Introduce the expire_tags procedure to support expiring tags by time.
Tests
API and Format
Documentation