Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds plugin version sweep background job #434

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementH
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -370,7 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
Expand Down Expand Up @@ -428,6 +429,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)

val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)

return listOf(
managedIndexRunner,
rollupRunner,
Expand All @@ -436,7 +439,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
managedIndexCoordinator,
indexStateManagementHistory,
indexMetadataProvider,
smRunner
smRunner,
pluginVersionSweepCoordinator
)
}

Expand All @@ -461,6 +465,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.SWEEP_SKIP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool

class PluginVersionSweepCoordinator(
private val skipExecution: SkipExecution,
settings: Settings,
private val threadPool: ThreadPool,
clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
LifecycleListener(),
ClusterStateListener {
private val logger = LogManager.getLogger(javaClass)

private var scheduledSkipExecution: Scheduler.Cancellable? = null

@Volatile
private var sweepSkipPeriod = ManagedIndexSettings.SWEEP_SKIP_PERIOD.get(settings)

@Volatile
private var indexStateManagementEnabled = ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

init {
    // Receive node lifecycle callbacks (afterStart / beforeStop) and cluster state change events.
    clusterService.addLifecycleListener(this)
    clusterService.addListener(this)

    // A changed sweep period only takes effect once the background job is rescheduled.
    clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) {
        sweepSkipPeriod = it
        initBackgroundSweepISMPluginVersionExecution()
    }

    // Keep the cached ISM-enabled flag in sync with the cluster setting. Without this consumer the
    // flag keeps the value read at construction time, so toggling ISM at runtime would neither
    // start nor stop the background sweep.
    // NOTE(review): assumes INDEX_STATE_MANAGEMENT_ENABLED is registered as a dynamic setting — confirm.
    clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED) {
        indexStateManagementEnabled = it
        if (it) {
            initBackgroundSweepISMPluginVersionExecution()
        } else {
            scheduledSkipExecution?.cancel()
        }
    }
}

/** Lifecycle callback: once the component has started, schedule the background plugin version sweep. */
override fun afterStart() = initBackgroundSweepISMPluginVersionExecution()

/** Lifecycle callback: before the component stops, cancel the scheduled sweep if one exists. */
override fun beforeStop() {
    val pendingSweep = scheduledSkipExecution ?: return
    pendingSweep.cancel()
}

/**
 * Cluster state callback. When the set of nodes changed or the event reports a new cluster,
 * sweeps the ISM plugin version immediately and reschedules the periodic background sweep.
 * All other cluster state changes are ignored.
 */
override fun clusterChanged(event: ClusterChangedEvent) {
    val nothingRelevantChanged = !event.nodesChanged() && !event.isNewCluster
    if (nothingRelevantChanged) return

    skipExecution.sweepISMPluginVersion()
    initBackgroundSweepISMPluginVersionExecution()
}

/**
 * (Re)schedules the periodic background sweep of the ISM plugin version.
 *
 * Returns without doing anything when ISM is disabled. Otherwise cancels the previously
 * scheduled sweep (if any) and schedules a new one with a fixed delay of [sweepSkipPeriod]
 * on the MANAGEMENT thread pool. Each run either sweeps again (skip flag set) or cancels
 * the scheduled job (skip flag not set).
 */
@OpenForTesting
fun initBackgroundSweepISMPluginVersionExecution() {
// If ISM is disabled return early; a sweep that was scheduled earlier is not cancelled here
if (!isIndexStateManagementEnabled()) return
// Cancel existing background sweep so that only the job scheduled below stays active
scheduledSkipExecution?.cancel()
val scheduledJob = Runnable {
// Each scheduled run launches its work in this coordinator's coroutine scope
launch {
try {
if (!skipExecution.flag) {
// Skip flag is not set: stop the periodic sweep
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
// Skip flag is set: sweep the plugin versions again
skipExecution.sweepISMPluginVersion()
}
Comment on lines +75 to +80
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to cancel this job or let it run forever?

} catch (e: Exception) {
// Log and swallow the failure; the exception is not rethrown
logger.error("Failed to sweep ism plugin version", e)
}
}
}
// Keep the handle so later calls, the job itself and beforeStop() can cancel the sweep
scheduledSkipExecution =
threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT)
}

/** Returns true only when the cached [indexStateManagementEnabled] value is true. */
private fun isIndexStateManagementEnabled(): Boolean {
    return indexStateManagementEnabled == true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,25 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
private val clusterService: ClusterService
) : ClusterStateListener {
private val client: Client
) {
private val logger = LogManager.getLogger(javaClass)

@Volatile final var flag: Boolean = false
@Volatile
final var flag: Boolean = false
private set

// To track if there are any legacy IM plugin nodes part of the cluster
@Volatile final var hasLegacyPlugin: Boolean = false
@Volatile
final var hasLegacyPlugin: Boolean = false
private set

init {
clusterService.addListener(this)
}

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
sweepISMPluginVersion()
}
}

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

/**
 * Dynamic, node-scoped time setting `plugins.index_state_management.coordinator.sweep_skip_period`
 * with a default of 5 minutes.
 *
 * NOTE(review): declared via `Setting.timeSetting` without an explicit minimum, whereas
 * COORDINATOR_BACKOFF_MILLIS uses `Setting.positiveTimeSetting` — confirm whether zero or
 * negative values should be accepted for a period that is used as a scheduling delay.
 */
val SWEEP_SKIP_PERIOD: Setting<TimeValue> = Setting.timeSetting(
"plugins.index_state_management.coordinator.sweep_skip_period",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val COORDINATOR_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
"plugins.index_state_management.coordinator.backoff_millis",
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

Expand All @@ -75,7 +75,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,24 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution

/**
 * Tests for [SkipExecution] that run against a mocked [Client].
 *
 * NOTE(review): this text appears to interleave removed and added lines of a diff hunk
 * (see the notes below) — confirm against the actual file before relying on it.
 */
class SkipExecutionTests : OpenSearchAllocationTestCase() {

private lateinit var client: Client
// NOTE(review): presumably a removed diff line; the last `SkipExecution(client)` call does not use it
private lateinit var clusterService: ClusterService
private lateinit var skip: SkipExecution

// Creates fresh mocks and the SkipExecution instance under test before each test
@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
clusterService = Mockito.mock(ClusterService::class.java)
// NOTE(review): `skip` is assigned twice in a row; only the second assignment takes effect.
// The two-argument form looks like the removed side of the diff — confirm.
skip = SkipExecution(client, clusterService)
skip = SkipExecution(client)
}

// Verifies that the client is asked to execute a NodesInfoAction request
fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
// NOTE(review): the next two calls look like the removed and added sides of the diff — confirm
skip.clusterChanged(event)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
}
}