Skip to content

Commit

Permalink
Adds plugin version sweep background job (opensearch-project#434)
Browse files Browse the repository at this point in the history
* [207]: Added 5 min scheduled job for sweeping ISM plugin version in the case of version discrepancy

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* [207]: Created the PluginVersionSweepCoordinator component, which is responsible for scheduling the skip execution task. Annotated tests to prevent thread leak errors during integration tests

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* [207]: Increased retry period for background job that sets the skip flag up to 5 mins

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* Empty-Commit

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Co-authored-by: Stevan Buzejic <buzejic.stevan@gmail.com>
  • Loading branch information
downsrob and stevanbz authored Oct 4, 2022
1 parent b09ec6a commit 4d844fa
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementH
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -370,7 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
Expand Down Expand Up @@ -428,6 +429,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)

val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)

return listOf(
managedIndexRunner,
rollupRunner,
Expand All @@ -436,7 +439,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
managedIndexCoordinator,
indexStateManagementHistory,
indexMetadataProvider,
smRunner
smRunner,
pluginVersionSweepCoordinator
)
}

Expand All @@ -461,6 +465,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.SWEEP_SKIP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool

/**
 * Periodically re-runs [SkipExecution.sweepISMPluginVersion] while the skip flag is set.
 *
 * A sweep is triggered immediately whenever the set of nodes changes (or a new cluster is seen).
 * In addition, a background job is scheduled on the management thread pool with a fixed delay of
 * [ManagedIndexSettings.SWEEP_SKIP_PERIOD]; each run re-sweeps while [SkipExecution.flag] is `true`
 * and cancels the job once the flag is `false`.
 *
 * Thread-safety: the scheduling handle is touched from lifecycle callbacks, cluster-state callbacks,
 * settings-update callbacks and from the coroutine started by the scheduled job, so it is `@Volatile`
 * and the cancel-then-reschedule sequence is serialized.
 *
 * @param skipExecution component that performs the plugin-version sweep and owns the skip flag
 * @param settings node settings used to read the initial sweep period and the ISM-enabled flag
 * @param threadPool thread pool on which the recurring sweep is scheduled
 * @param clusterService used to register this instance as a lifecycle and cluster-state listener and
 *   to subscribe to updates of the sweep period
 */
class PluginVersionSweepCoordinator(
    private val skipExecution: SkipExecution,
    settings: Settings,
    private val threadPool: ThreadPool,
    clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
    LifecycleListener(),
    ClusterStateListener {
    private val logger = LogManager.getLogger(javaClass)

    // Handle of the currently scheduled recurring sweep. Written by the (re)scheduling method and read
    // from the coroutine launched by the job itself, hence volatile.
    @Volatile
    private var scheduledSkipExecution: Scheduler.Cancellable? = null

    @Volatile
    private var sweepSkipPeriod = ManagedIndexSettings.SWEEP_SKIP_PERIOD.get(settings)

    // NOTE(review): this value is read once at construction and never refreshed in this class. If the
    // ISM-enabled setting can change at runtime, a settings update consumer is needed here - confirm.
    @Volatile
    private var indexStateManagementEnabled = ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

    init {
        clusterService.addLifecycleListener(this)
        clusterService.addListener(this)
        clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) {
            sweepSkipPeriod = it
            // Reschedule so that the new period takes effect.
            initBackgroundSweepISMPluginVersionExecution()
        }
    }

    /** Starts the recurring sweep once the cluster service has started. */
    override fun afterStart() {
        initBackgroundSweepISMPluginVersionExecution()
    }

    /** Cancels the recurring sweep, if one is scheduled, before the cluster service stops. */
    override fun beforeStop() {
        scheduledSkipExecution?.cancel()
    }

    /**
     * Sweeps immediately and restarts the recurring sweep when nodes were added or removed, or when
     * the event reports a new cluster. All other cluster-state changes are ignored.
     */
    override fun clusterChanged(event: ClusterChangedEvent) {
        if (event.nodesChanged() || event.isNewCluster) {
            skipExecution.sweepISMPluginVersion()
            initBackgroundSweepISMPluginVersionExecution()
        }
    }

    /**
     * Cancels any previously scheduled sweep and schedules a new one using the current sweep period.
     * Does nothing when ISM is disabled.
     *
     * Synchronized so that concurrent callers cannot both schedule a job and overwrite each other's
     * handle, which would leave an orphaned recurring task that is never cancelled.
     */
    @OpenForTesting
    @Synchronized
    fun initBackgroundSweepISMPluginVersionExecution() {
        // If ISM is disabled return early
        if (!isIndexStateManagementEnabled()) return
        // Cancel existing background sweep
        scheduledSkipExecution?.cancel()
        val scheduledJob = Runnable {
            launch {
                try {
                    if (!skipExecution.flag) {
                        // Skip flag is cleared, so periodic re-checking is no longer needed.
                        logger.info("Canceling sweep ism plugin version job")
                        scheduledSkipExecution?.cancel()
                    } else {
                        skipExecution.sweepISMPluginVersion()
                    }
                } catch (e: Exception) {
                    logger.error("Failed to sweep ism plugin version", e)
                }
            }
        }
        scheduledSkipExecution =
            threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT)
    }

    private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,25 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO: this could be moved to job scheduler so that all extending plugins
// can avoid running jobs in a cluster that is being upgraded
@OpenForTesting
class SkipExecution(
private val client: Client,
private val clusterService: ClusterService
) : ClusterStateListener {
private val client: Client
) {
private val logger = LogManager.getLogger(javaClass)

@Volatile final var flag: Boolean = false
@Volatile
final var flag: Boolean = false
private set

// To track if there are any legacy IM plugin nodes part of the cluster
@Volatile final var hasLegacyPlugin: Boolean = false
@Volatile
final var hasLegacyPlugin: Boolean = false
private set

init {
clusterService.addListener(this)
}

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
sweepISMPluginVersion()
}
}

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

/**
 * Delay between runs of the background job that re-sweeps the ISM plugin version.
 * Defaults to 5 minutes; node-scoped and dynamically updatable.
 */
val SWEEP_SKIP_PERIOD: Setting<TimeValue> = Setting.timeSetting(
"plugins.index_state_management.coordinator.sweep_skip_period",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val COORDINATOR_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
"plugins.index_state_management.coordinator.backoff_millis",
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

Expand All @@ -75,7 +75,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,24 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution

class SkipExecutionTests : OpenSearchAllocationTestCase() {

private lateinit var client: Client
private lateinit var clusterService: ClusterService
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
clusterService = Mockito.mock(ClusterService::class.java)
skip = SkipExecution(client, clusterService)
skip = SkipExecution(client)
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
skip.clusterChanged(event)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
}
}

0 comments on commit 4d844fa

Please sign in to comment.