Skip to content

Commit

Permalink
fix proccess accounts function to be more effecient
Browse files Browse the repository at this point in the history
  • Loading branch information
nullpointer0x00 committed Oct 1, 2024
1 parent fc1d8b6 commit 699c43e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import io.provenance.explorer.model.base.USD_UPPER
import io.provenance.explorer.model.download.TxHistoryChartData
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.withPermit
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.andWhere
import org.jetbrains.exposed.sql.innerJoin
Expand Down Expand Up @@ -405,14 +406,14 @@ class AccountService(
}

suspend fun updateTokenCounts(addr: String) {
try {
val nftCount = metadataClient.getScopesByOwnerTotal(addr)
val ftCount = getBalances(addr, 0, 1).pagination.total.toInt()
AccountTokenCountRecord.upsert(addr, ftCount, nftCount)
} catch (e: Exception) {
logger.error("Failed to update token counts for $addr : ${e.message}")
try {
val nftCount = metadataClient.getScopesByOwnerTotal(addr)
val ftCount = getBalances(addr, 0, 1).pagination.total.toInt()
AccountTokenCountRecord.upsert(addr, ftCount, nftCount)
} catch (e: Exception) {
logger.error("Failed to update token counts for $addr : ${e.message}")
}
}
}
}

fun String.getAccountType() = this.split(".").last()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,32 +445,25 @@ class ScheduledTaskService(
}

@Scheduled(initialDelay = 5000L, fixedDelay = 5000L)
fun startAccountProcess() = runBlocking {
ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
val producer = startAccountProcess()
repeat(5) { accountProcessor(producer) }
fun startAccountProcess() {
processAccountRecords()
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.startAccountProcess() = produce {
while (true) {
ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT).firstOrNull()?.let {
fun processAccountRecords() {
ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
val records = ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT)
runBlocking {
for (record in records) {
try {
transaction { it.apply { this.processing = true } }
send(it.processValue)
transaction { record.apply { this.processing = true } }
accountService.updateTokenCounts(record.processValue)
ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, record.processValue)
} catch (_: Exception) {
}
}
}
}

fun CoroutineScope.accountProcessor(channel: ReceiveChannel<String>) = launch(Dispatchers.IO) {
for (msg in channel) {
accountService.updateTokenCounts(msg)
ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, msg)
}
}

@Scheduled(cron = "0 0 0 * * *") // Every beginning of every day
fun calculateValidatorMetrics() {
val (year, quarter) = DateTime.now().minusMinutes(5).let { it.year to it.monthOfYear.monthToQuarter() }
Expand Down

0 comments on commit 699c43e

Please sign in to comment.