antonyoni/PsThreading

PowerShell Threading Module

This module is designed to simplify multi-threading in PowerShell. PowerShell jobs are a great way to run background tasks, but they lack throttling and a way to share resources between tasks. Using a RunspacePool also performs better than using jobs, as detailed in this blog post.
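For context, here is a minimal sketch of what working with a RunspacePool directly looks like, using only the standard System.Management.Automation types. It is not taken from this module's source; the pool size of 4 and the squaring script are arbitrary choices for illustration.

```powershell
# Create a pool that runs at most 4 runspaces at once (this is the throttle).
$pool = [runspacefactory]::CreateRunspacePool(1, 4)
$pool.Open()

# Queue 8 small tasks; each PowerShell instance borrows a runspace from the pool.
$jobs = foreach ($n in 1..8) {
    $ps = [powershell]::Create()
    $ps.RunspacePool = $pool
    [void]$ps.AddScript({ param($n) $n * $n }).AddArgument($n)
    [pscustomobject]@{ Shell = $ps; Handle = $ps.BeginInvoke() }
}

# Collect results in submission order, then release each instance.
$squares = foreach ($job in $jobs) {
    $job.Shell.EndInvoke($job.Handle)
    $job.Shell.Dispose()
}

$pool.Close()
$pool.Dispose()
```

The bookkeeping around BeginInvoke and EndInvoke is the kind of plumbing the functions below are meant to hide.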

Threading with this module

Use the New-Thread function to create a thread template:

$ScriptBlock = {
    Param($ThreadId, $WorkQueue)
    $item = ""
    while ($WorkQueue.TryDequeue([ref]$item)) {
        # do work here
        Write-Output "$ThreadId -> $item"
    }
}

$worker = New-Thread -ScriptBlock $ScriptBlock -Number $PsThreading.Utility.LogicalCpus

Then use the Invoke-ThreadPool function to create and execute the threads, and wait (poll) for them to complete:

$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
1..10000 | ForEach-Object { $workQueue.Enqueue("item number $_") }

$params = @{
    WorkQueue = $workQueue
}

Invoke-ThreadPool -Thread $worker -Parameters $params
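The drain loop inside the worker script block can be tried on its own, without the module, to see how TryDequeue behaves. This is a single-threaded sketch; the variable $seen is introduced here only for illustration.

```powershell
# Fill a thread-safe queue with a few items.
$queue = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
1..5 | ForEach-Object { $queue.Enqueue("item number $_") }

# TryDequeue returns $false once the queue is empty, which ends the loop.
$item = $null
$seen = @()
while ($queue.TryDequeue([ref]$item)) {
    $seen += $item
}
```

Because TryDequeue is atomic, several threads can run the same loop against one queue and each item is handed to exactly one of them.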

Helper functions

The helper function Split-FileToStream splits a file into n MemoryStreams, cutting each chunk at the nearest delimiter so that no record is broken in two. By default, it produces one chunk per logical CPU and uses the newline as the delimiter.

$Path = '\path\to\large-file.csv'

$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
Split-FileToStream -Path $Path | ForEach-Object { $workQueue.Enqueue($_) }
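To illustrate the idea of splitting at the nearest newline, the sketch below computes chunk start offsets for a file. Get-ChunkOffset is a hypothetical helper written for this example, not part of PsThreading, and it is only an assumption about how such a split could work: it jumps to each even split point and scans forward to just past the next line feed.

```powershell
function Get-ChunkOffset {
    # Hypothetical helper for illustration; not part of PsThreading.
    param(
        [string]$Path,
        [int]$Chunks = [Environment]::ProcessorCount
    )

    $stream = [System.IO.File]::OpenRead($Path)
    try {
        $offsets = [System.Collections.Generic.List[long]]::new()
        $offsets.Add(0)
        for ($i = 1; $i -lt $Chunks; $i++) {
            # Ideal even split point for chunk $i.
            $target = [long][math]::Floor($stream.Length * $i / $Chunks)
            # Skip split points already covered by the previous chunk.
            if ($target -le $offsets[$offsets.Count - 1]) { continue }
            $stream.Position = $target
            # Scan forward to just past the next line feed (byte 10).
            while (($b = $stream.ReadByte()) -ne -1 -and $b -ne 10) { }
            if ($b -ne -1 -and $stream.Position -lt $stream.Length) {
                $offsets.Add($stream.Position)
            }
        }
        $offsets
    } finally {
        $stream.Dispose()
    }
}
```

Each returned offset is the first byte of a line, so a reader that starts there never sees a partial record. A file with very long lines can yield fewer chunks than requested.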

The PsThreading module variable

The $PsThreading variable contains two helper properties:

$PsThreading.Utility.CpuCores     # Number of physical cores
$PsThreading.Utility.LogicalCpus  # Number of logical CPUs

and some sample parameter sets and thread script blocks for various threading patterns:

$PsThreading.Parameter.WorkerOnly
$PsThreading.Parameter.ProducerConsumer
$PsThreading.Parameter.ProducerWorkerWriter

$PsThreading.Thread.Consumer
$PsThreading.Thread.Producer
$PsThreading.Thread.Worker
$PsThreading.Thread.Writer

Have a look at the PsThreading.Patterns.ps1 and PsThreading.Patterns.Tests.ps1 files for some example implementations.
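For reference, counts like the two Utility properties can be obtained as sketched below. This is an assumption about one way to get them, not a description of how the module does it. The physical core query relies on CIM and is therefore guarded to run on Windows only.

```powershell
# Logical CPUs: available on every platform through .NET.
$logical = [Environment]::ProcessorCount

# Physical cores: summed over all sockets via CIM (Windows only).
$physical = $null
if ($IsWindows -or $PSVersionTable.PSEdition -eq 'Desktop') {
    $physical = (Get-CimInstance -ClassName Win32_Processor |
        Measure-Object -Property NumberOfCores -Sum).Sum
}
```

On machines with simultaneous multithreading the logical count is typically higher than the physical one, which matters when choosing a value for -Number.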

Examples

The following producer-consumer example uses a single producer thread that calls Split-FileToStream to create file chunks, and one consumer thread per logical CPU that counts the lines in each chunk.

$FileToProcess   = '\path\to\large-file.csv'
$PsThreadingPath = '\path\to\PsThreading'

$threads = $PsThreading.Utility.LogicalCpus
$params  = $PsThreading.Parameter.ProducerConsumer

$params.Settings['FileToProcess'] = $FileToProcess
$params.Settings['NumberToProduce'] = $threads

$params.ResultSet = New-Object 'System.Collections.Concurrent.ConcurrentDictionary`2[string,int]'

$producer = New-Thread -Type "Producer" -Weight 100 -ScriptBlock {
    Param($ThreadId, $Settings, $WorkQueue)
    $path  = $Settings['FileToProcess']
    $split = $Settings['NumberToProduce']
    Split-FileToStream -Path $path -SplitNumber $split | ForEach-Object {
        $WorkQueue.Enqueue($_)
    }
    $Settings['ProducerIsDone'] = $true
}

$consumer = New-Thread -Type "Consumer" -Number $threads -ScriptBlock {
    Param($ThreadId, $WorkQueue, $Settings, $ResultSet)
    $item = ""
    while (!$Settings['ProducerIsDone'] -or $WorkQueue.Count -gt 0) {
        if ($WorkQueue.TryDequeue([ref]$item)) {
            $reader = New-Object System.IO.StreamReader($item.Stream)
            $count  = 0
            while ($null -ne ($line = $reader.ReadLine())) {
                $count++
            }
            $ResultSet.AddOrUpdate($ThreadId, $count, { param($key, $val) $val + $count }) | Out-Null
        } else {
            Start-Sleep -Milliseconds 10
        }
    }
}

Invoke-ThreadPool -Thread $producer, $consumer -Parameters $params -PathsToImport $PsThreadingPath

$params.ResultSet
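The result set maps each thread ID to the number of lines that thread counted. The sketch below shows, on made-up sample data, how AddOrUpdate accumulates per-key counts and how the values can be summed into a grand total. The IDs and counts are invented for illustration.

```powershell
$resultSet = [System.Collections.Concurrent.ConcurrentDictionary[string,int]]::new()

# Sample data: thread '1' reports two chunks, thread '2' reports one.
$reports = @(
    @{ Id = '1'; Count = 10 },
    @{ Id = '2'; Count = 7 },
    @{ Id = '1'; Count = 5 }
)

foreach ($report in $reports) {
    $count = $report.Count
    # Insert $count for a new key, or add it to the existing value.
    $adder = [Func[string,int,int]]{ param($key, $val) $val + $count }
    [void]$resultSet.AddOrUpdate($report.Id, $count, $adder)
}

# Total number of lines across all threads.
$total = ($resultSet.Values | Measure-Object -Sum).Sum
```

Applied to the example above, the same Measure-Object call on $params.ResultSet.Values would give the line count of the whole file.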

License

This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.
