Skip to content

Commit

Permalink
[207]: Increased retry period for background job that sets the skip flag up to 5 mins
Browse files Browse the repository at this point in the history

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
  • Loading branch information
stevanbz committed Sep 22, 2022
1 parent 51d9bc7 commit 47b7a24
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
Expand Down Expand Up @@ -501,7 +501,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
LegacyOpenDistroManagedIndexSettings.SWEEP_SKIP_PERIOD,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
LegacyOpenDistroManagedIndexSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.threadpool.Scheduler
Expand All @@ -23,9 +25,10 @@ class PluginVersionSweepCoordinator(
private val skipExecution: SkipExecution,
settings: Settings,
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
LifecycleListener() {
LifecycleListener(),
ClusterStateListener {
private val logger = LogManager.getLogger(javaClass)

private var scheduledSkipExecution: Scheduler.Cancellable? = null
Expand All @@ -38,6 +41,7 @@ class PluginVersionSweepCoordinator(

init {
clusterService.addLifecycleListener(this)
clusterService.addListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) {
sweepSkipPeriod = it
initBackgroundSweepISMPluginVersionExecution()
Expand All @@ -52,34 +56,36 @@ class PluginVersionSweepCoordinator(
scheduledSkipExecution?.cancel()
}

/**
 * Reacts to a cluster state change.
 *
 * When [event] reports that the set of nodes changed or that this is a new cluster,
 * the ISM plugin version sweep is triggered right away through `skipExecution`, and
 * `initBackgroundSweepISMPluginVersionExecution()` is invoked to set up the periodic
 * background sweep again. Any other cluster change is ignored.
 *
 * @param event the cluster change notification delivered by the cluster service
 */
override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
// Sweep immediately so the skip flag reflects the current set of nodes.
skipExecution.sweepISMPluginVersion()
initBackgroundSweepISMPluginVersionExecution()
}
}

@OpenForTesting
fun initBackgroundSweepISMPluginVersionExecution() {
// If ISM is disabled return early
if (!isIndexStateManagementEnabled()) return
// Cancel existing background sweep
scheduledSkipExecution?.cancel()
val scheduledJob = Runnable {
// Starting job without coroutine - in order to avoid thread leak error
try {
if (!skipExecution.flag) {
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
skipExecution.sweepISMPluginVersion()
launch {
try {
if (!skipExecution.flag) {
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
skipExecution.sweepISMPluginVersion()
}
} catch (e: Exception) {
logger.error("Failed to sweep ism plugin version", e)
}
} catch (e: Exception) {
logger.error("Failed to sweep ism plugin version", e)
}
}
scheduledSkipExecution =
threadPool.scheduleWithFixedDelay(scheduledJob,
TimeValue.timeValueMinutes(RETRY_PERIOD_IN_MINUTES),
ThreadPool.Names.MANAGEMENT)
threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT)
}

/**
 * Returns `true` only when `indexStateManagementEnabled` is exactly `true`.
 *
 * NOTE(review): the explicit `== true` comparison suggests the property may be nullable;
 * its declaration is not visible here, so confirm before simplifying.
 */
private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true

companion object {
// Number of minutes converted with TimeValue.timeValueMinutes and handed to
// threadPool.scheduleWithFixedDelay as the delay of the background plugin-version sweep.
private const val RETRY_PERIOD_IN_MINUTES = 5L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
private val clusterService: ClusterService,
) : ClusterStateListener {
private val client: Client
) {
private val logger = LogManager.getLogger(javaClass)

@Volatile
Expand All @@ -35,16 +31,6 @@ class SkipExecution(
final var hasLegacyPlugin: Boolean = false
private set

init {
clusterService.addListener(this)
}

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
sweepISMPluginVersion()
}
}

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,6 @@ class LegacyOpenDistroManagedIndexSettings {
Setting.Property.Deprecated
)

val SWEEP_SKIP_PERIOD: Setting<TimeValue> = Setting.timeSetting(
"opendistro.index_state_management.coordinator.sweep_skip_period",
TimeValue.timeValueMinutes(10),
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic,
Setting.Property.Deprecated
)

val COORDINATOR_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
"opendistro.index_state_management.coordinator.backoff_millis",
TimeValue.timeValueMillis(50),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ManagedIndexSettings {

val SWEEP_SKIP_PERIOD: Setting<TimeValue> = Setting.timeSetting(
"plugins.index_state_management.coordinator.sweep_skip_period",
LegacyOpenDistroManagedIndexSettings.SWEEP_SKIP_PERIOD,
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

Expand All @@ -75,7 +75,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,24 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution

class SkipExecutionTests : OpenSearchAllocationTestCase() {

private lateinit var client: Client
private lateinit var clusterService: ClusterService
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
clusterService = Mockito.mock(ClusterService::class.java)
skip = SkipExecution(client, clusterService)
skip = SkipExecution(client)
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
skip.clusterChanged(event)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
}
}

0 comments on commit 47b7a24

Please sign in to comment.