Skip to content

Commit

Permalink
483: Updated detekt plugin and snakeyaml dependency. Updated a code t… (
Browse files Browse the repository at this point in the history
opensearch-project#485)

* 483: Updated detekt plugin and snakeyaml dependency. Updated a code to reduce the number of issues after static analysis

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* 483: Updated snakeyaml version to use the latest

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
  • Loading branch information
stevanbz authored Sep 1, 2022
1 parent 7475cfd commit ed6ed10
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 58 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ buildscript {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.17.1"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.21.0"
classpath "org.jacoco:org.jacoco.agent:0.8.7"
}
}
Expand Down Expand Up @@ -95,7 +95,7 @@ configurations.all {
force 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3'
force 'com.fasterxml.jackson.core:jackson-databind:2.10.4'
force 'org.yaml:snakeyaml:1.26'
force 'org.yaml:snakeyaml:1.31'
force 'org.codehaus.plexus:plexus-utils:3.0.24'
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retr
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
Expand Down Expand Up @@ -280,15 +280,15 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
indexManagementExtensions.forEach { extension ->
val extensionName = extension.getExtensionName()
if (extensionName in extensions) {
throw IllegalStateException("Multiple extensions of IndexManagement have same name $extensionName - not supported")
error("Multiple extensions of IndexManagement have same name $extensionName - not supported")
}
extension.getISMActionParsers().forEach { parser ->
ISMActionsParser.instance.addParser(parser, extensionName)
}
indexMetadataServices.add(extension.getIndexMetadataService())
extension.overrideClusterStateIndexUuidSetting()?.let {
if (customIndexUUIDSetting != null) {
throw IllegalStateException(
error(
"Multiple extensions of IndexManagement plugin overriding ClusterStateIndexUUIDSetting - not supported"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ data class Channel(val id: String) : ToXContent, Writeable {
when (fieldName) {
ID -> id = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Channel destination")
error("Unexpected field: $fieldName, while parsing Channel destination")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed
import org.opensearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.contentParser
import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.migration
package org.opensearch.indexmanagement.indexstatemanagement.migration

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.lang.IllegalStateException

/**
* A value object that represents a Chime message. Chime message will be
Expand Down Expand Up @@ -58,7 +57,7 @@ data class Chime(val url: String) : ToXContent, Writeable {
when (fieldName) {
URL -> url = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Chime destination")
error("Unexpected field: $fieldName, while parsing Chime destination")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.lang.IllegalStateException

/**
* A value object that represents a Custom webhook message. Webhook message will be
Expand Down Expand Up @@ -121,7 +120,7 @@ data class CustomWebhook(
USERNAME_FIELD -> username = xcp.textOrNull()
PASSWORD_FIELD -> password = xcp.textOrNull()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing custom webhook destination")
error("Unexpected field: $fieldName, while parsing custom webhook destination")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.lang.IllegalStateException

/**
* A value object that represents a Slack message. Slack message will be
Expand Down Expand Up @@ -58,7 +57,7 @@ data class Slack(val url: String) : ToXContent, Writeable {
when (fieldName) {
URL -> url = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Slack destination")
error("Unexpected field: $fieldName, while parsing Slack destination")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
private fun getOriginalSettings(indexName: String, clusterService: ClusterService): Map<String, String> {
val indexSettings = clusterService.state().metadata.index(indexName).settings
val originalSettings = mutableMapOf<String, String>()
indexSettings.get(ROUTING_SETTING)?.let { it -> originalSettings.put(ROUTING_SETTING, it) }
indexSettings.get(SETTING_BLOCKS_WRITE)?.let { it -> originalSettings.put(SETTING_BLOCKS_WRITE, it) }
indexSettings.get(ROUTING_SETTING)?.let { originalSettings.put(ROUTING_SETTING, it) }
indexSettings.get(SETTING_BLOCKS_WRITE)?.let { originalSettings.put(SETTING_BLOCKS_WRITE, it) }
return originalSettings
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class RollupIndexer(
is InternalMin -> aggResults[it.name] = it.value
is InternalValueCount -> aggResults[it.name] = it.value
is InternalAvg -> aggResults[it.name] = it.value
else -> throw IllegalStateException("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]")
else -> error("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]")
}
}
mapOfKeyValues.putAll(aggResults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ object RollupRunner :
) {
client.suspendUntil { listener: ActionListener<GetRollupResponse> ->
execute(GetRollupAction.INSTANCE, GetRollupRequest(updatableJob.id, null, "_local"), listener)
}.rollup ?: throw IllegalStateException("Unable to get rollup job")
}.rollup ?: error("Unable to get rollup job")
}
}
is RollupResult.Failure -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ inline fun <reified T> Rollup.findMatchingMetricField(field: String): String {
}
}
}
throw IllegalStateException("Did not find matching rollup metric")
error("Did not find matching rollup metric")
}

@Suppress("NestedBlockDepth", "ComplexMethod")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ object SMRunner :
return this
}

private const val MAX_NUMBER_OF_RETRIES = 3
private const val EXPONENTIAL_BACKOFF_MILLIS = 1000L

private val backoffPolicy: BackoffPolicy = BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(1000L), 3
TimeValue.timeValueMillis(EXPONENTIAL_BACKOFF_MILLIS), MAX_NUMBER_OF_RETRIES
)

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class SMStateMachine(
val retryCount: Int
if (retry == null) {
log.warn("Starting to retry state [$currentState], remaining count 3.")
metadataBuilder.setRetry(3) // TODO SM 3 retry count could be customizable
metadataBuilder.setRetry(MAX_NUMBER_OF_RETRIES) // TODO SM 3 retry count could be customizable
} else {
retryCount = retry.count - 1
if (retryCount > 0) {
Expand Down Expand Up @@ -218,7 +218,8 @@ class SMStateMachine(

// TODO SM save a copy to history
}
private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(EXPONENTIAL_BACKOFF_MILLIS), MAX_NUMBER_OF_RETRIES)

/**
* Handle the policy change before job running
Expand All @@ -241,4 +242,9 @@ class SMStateMachine(
}
return this
}

companion object {
private const val MAX_NUMBER_OF_RETRIES = 3
private const val EXPONENTIAL_BACKOFF_MILLIS = 250L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object CreatingState : State {
if (snapshotName == null) {
val getSnapshotsResult = client.getSnapshots(
job, job.policyName + "*", metadataBuilder,
log, null, getSnapshotsErrorMessage(),
log, null, SNAPSHOT_ERROR_MESSAGE,
)
metadataBuilder = getSnapshotsResult.metadataBuilder
if (getSnapshotsResult.failed) {
Expand Down Expand Up @@ -86,10 +86,10 @@ object CreatingState : State {

private fun handleException(ex: Exception, snapshotName: String, metadataBuilder: SMMetadata.Builder, log: Logger): SMResult {
if (ex is ConcurrentSnapshotExecutionException) {
log.error(getConcurrentSnapshotMessage(), ex)
log.error(CONCURRENT_SNAPSHOT_MESSAGE, ex)
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.RETRYING,
message = getConcurrentSnapshotMessage(),
message = CONCURRENT_SNAPSHOT_MESSAGE,
)
return SMResult.Stay(metadataBuilder)
}
Expand All @@ -102,10 +102,10 @@ object CreatingState : State {
return SMResult.Fail(metadataBuilder, WorkflowType.CREATION)
}

fun getConcurrentSnapshotMessage() = "Concurrent snapshot exception happened, retrying..."
const val CONCURRENT_SNAPSHOT_MESSAGE = "Concurrent snapshot exception happened, retrying..."
private fun getSnapshotCreationStartedMessage(snapshotName: String) =
"Snapshot $snapshotName creation has been started and waiting for completion."
private fun getSnapshotsErrorMessage() =
private const val SNAPSHOT_ERROR_MESSAGE =
"Caught exception while getting snapshots to decide if snapshot has been created in previous execution period."
private fun getCreateSnapshotErrorMessage(snapshotName: String) =
"Caught exception while creating snapshot $snapshotName."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ object DeletingState : State {

private fun handleException(ex: Exception, snapshotsToDelete: List<String>, metadataBuilder: SMMetadata.Builder, log: Logger): SMResult {
if (ex is ConcurrentSnapshotExecutionException) {
log.error(CreatingState.getConcurrentSnapshotMessage(), ex)
log.error(CreatingState.CONCURRENT_SNAPSHOT_MESSAGE, ex)
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.RETRYING,
message = CreatingState.getConcurrentSnapshotMessage(),
message = CreatingState.CONCURRENT_SNAPSHOT_MESSAGE,
)
return SMResult.Stay(metadataBuilder)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ data class SMPolicy(
) : ScheduledJobParameter, Writeable {

init {
require(snapshotConfig["repository"] != null && snapshotConfig["repository"] != "") { "Must provide the repository in snapshot config." }
require(creation.schedule.getNextExecutionTime(now()) != null) { "Next execution time from the creation schedule is null, please provide a valid cron expression." }
require(deletion == null || (deletion.schedule.getNextExecutionTime(now()) != null)) { "Next execution time from the deletion schedule is null, please provide a valid cron expression." }
require(snapshotConfig["repository"] != null && snapshotConfig["repository"] != "") {
"Must provide the repository in snapshot config."
}
require(creation.schedule.getNextExecutionTime(now()) != null) {
"Next execution time from the creation schedule is null, please provide a valid cron expression."
}
require(deletion == null || (deletion.schedule.getNextExecutionTime(now()) != null)) {
"Next execution time from the deletion schedule is null, please provide a valid cron expression."
}
}

// This name is used by the job scheduler and needs to match the id to avoid namespace conflicts with ISM policies sharing the same name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.opensearch.indexmanagement.transform.model.TransformValidationResult
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.monitor.jvm.JvmService
import org.opensearch.transport.RemoteTransportException
import java.lang.IllegalStateException

@Suppress("SpreadOperator", "ReturnCount", "ThrowsCount")
class TransformValidator(
Expand Down Expand Up @@ -93,9 +92,7 @@ class TransformValidator(
private suspend fun validateIndex(index: String, transform: Transform): List<String> {
val request = GetMappingsRequest().indices(index)
val result: GetMappingsResponse =
client.admin().indices().suspendUntil { getMappings(request, it) } ?: throw IllegalStateException(
"GetMappingResponse for [$index] was null"
)
client.admin().indices().suspendUntil { getMappings(request, it) } ?: error("GetMappingResponse for [$index] was null")
return validateMappingsResponse(index, result, transform)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// assuming our ingestion is randomly split between the 20 primary shards
// then 250kb/20 gives around 12.5kb per primary shard which is below our 100kb condition
val KB_250 = 250_000
val kb250 = 250_000
var primaryStoreSizeBytes = 0
var count = 0
// Ingest data into the test index until the total size of the index is greater than our min primary size condition
while (primaryStoreSizeBytes < KB_250) {
while (primaryStoreSizeBytes < kb250) {
// this count should never get as high as 10... if it does just fail the test
if (count++ > 10) fail("Something is wrong with the data ingestion for testing rollover condition")
insertSampleData(index = firstIndex, docCount = 20, jsonString = "{ \"test_field\": \"${OpenSearchTestCase.randomAlphaOfLength(7000)}\" }", delay = 0)
Expand Down Expand Up @@ -271,12 +271,12 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
assertThat("Did not have min size current", minPrimarySize["current"], isA(String::class.java))
}

val KB_150 = 150_000
val kb150 = 150_000
var primaryShardSizeBytes = 0
count = 0
// Ingest data into the test index using custom routing so it always goes to a single shard until the size of the
// primary shard is over 150kb
while (primaryShardSizeBytes < KB_150) {
while (primaryShardSizeBytes < kb150) {
// this count should never get as high as 10... if it does just fail the test
if (count++ > 10) fail("Something is wrong with the data ingestion for testing rollover condition")
insertSampleData(index = firstIndex, docCount = 20, delay = 0, jsonString = "{ \"test_field\": \"${OpenSearchTestCase.randomAlphaOfLength(7000)}\" }", routing = "custom_routing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.test.ClusterServiceUtils
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.threadpool.Scheduler
Expand Down
Loading

0 comments on commit ed6ed10

Please sign in to comment.