Reuse Large Buffers in MigrateSession #623

Open
wants to merge 36 commits into
base: main

Contributor

@vazois vazois commented Aug 29, 2024

This PR aims to improve memory utilization and reduce fragmentation by reusing large buffers across different migrate sessions instead of allocating them anew for each session.
The PR includes the following:

  • Augment the network stack to support separate send and receive buffer allocation sizes by providing separate buffer pool objects.
  • Declare a shared NetworkBuffers object in MigrationManager and ReplicationManager to reuse allocated buffers across different scenarios.
  • Use the shared NetworkBuffers object to allocate buffer space for managing keys that are actively being migrated.
  • Add the PURGEBP command. Issuing PURGEBP [MM|RM] attempts to release any unused buffers in the LFBP of the migration manager or replication manager.
  • Add INFO BPSTATS to list information about the shared buffer pools of the migration and replication managers.

Notes:

  • There is an upper limit on the number of entries per level in the LimitedBufferPool, which may cause fragmentation of the LOH due to the way we allocate and return buffers to the pool itself (shown below):

    if (Interlocked.Increment(ref pool[level].size) <= maxEntriesPerLevel)
    {
        Array.Clear(buffer.entry, 0, buffer.entry.Length);
        pool[level].items.Enqueue(buffer);
    }
    else
        Interlocked.Decrement(ref pool[level].size);

    The default limit is 16 entries, which should be enough for common scenarios (i.e., up to 16 parallel migrate sessions and up to 16 replication sessions).

  • Separate buffer pool from send and receive spec.

  • Remove unused parts of NetworkSenderBase.

  • Resize allocation for send/receive of replication code.
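The return path quoted in the notes above can be sketched in isolation. This is a minimal illustration, not the LimitedBufferPool implementation: `BoundedPoolLevel`, `Retained`, and `Get` are hypothetical names, and the sketch assumes the counter tracks buffers currently retained by the level and that all buffers in a level have the same length, neither of which the PR confirms.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical, simplified stand-in for one level of a bounded buffer pool.
public sealed class BoundedPoolLevel
{
    private readonly ConcurrentQueue<byte[]> items = new();
    private readonly int maxEntries;
    private int size; // assumption: number of buffers currently retained

    public BoundedPoolLevel(int maxEntries) => this.maxEntries = maxEntries;

    public int Retained => Volatile.Read(ref size);

    // Returns true if the buffer was kept for reuse, false if it was dropped
    // because the level is already at its limit.
    public bool Return(byte[] buffer)
    {
        if (Interlocked.Increment(ref size) <= maxEntries)
        {
            Array.Clear(buffer, 0, buffer.Length);
            items.Enqueue(buffer);
            return true;
        }
        // Over the limit: undo the reservation and let the GC reclaim the buffer.
        Interlocked.Decrement(ref size);
        return false;
    }

    // Reuses a retained buffer when one is available, otherwise allocates.
    public byte[] Get(int length)
    {
        if (items.TryDequeue(out var buffer))
        {
            Interlocked.Decrement(ref size);
            return buffer;
        }
        return new byte[length];
    }
}
```

With a limit of 2, a third returned buffer is dropped rather than retained. For large buffers, dropped and reallocated arrays of this kind are where the LOH fragmentation mentioned above would come from.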

@vazois vazois force-pushed the vazois/migration-reuse-buffers branch 4 times, most recently from 29aa345 to 146da1a Compare September 4, 2024 20:26
@vazois vazois force-pushed the vazois/migration-reuse-buffers branch 3 times, most recently from e13374e to 9d65b87 Compare September 10, 2024 02:23
@vazois vazois marked this pull request as ready for review September 10, 2024 16:35
@vazois vazois marked this pull request as draft September 10, 2024 17:24
@vazois vazois force-pushed the vazois/migration-reuse-buffers branch 3 times, most recently from 18e9c50 to d34f522 Compare September 12, 2024 00:52
@vazois vazois marked this pull request as ready for review September 12, 2024 00:52
@vazois vazois force-pushed the vazois/migration-reuse-buffers branch 2 times, most recently from 2ab3868 to c362324 Compare September 17, 2024 23:34
@vazois vazois force-pushed the vazois/migration-reuse-buffers branch from c362324 to dfc86d7 Compare September 18, 2024 17:10
@@ -224,15 +225,15 @@ private void PopulateObjectStoreStats(StoreWrapper storeWrapper)
         ];
     }

-    public void PopulateStoreHashDistribution(StoreWrapper storeWrapper) => storeHashDistrInfo = [new("", storeWrapper.store.DumpDistribution())];
+    private void PopulateStoreHashDistribution(StoreWrapper storeWrapper) => storeHashDistrInfo = [new("", storeWrapper.store.DumpDistribution())];

no need to explicitly call out "private" members in the codebase.

/// <summary>
/// Create a NetworkBuffers instance
/// </summary>
public struct NetworkBuffers

@badrishc badrishc Sep 19, 2024

This struct does not do much, can create challenges with single-copy as it is a struct with value semantics, creates another level of indirection everywhere, and generally pervades across the codebase. The main thing it does is to use two allocation sizes, which can be maintained by callers or within LFBP itself.

@badrishc badrishc left a comment

See comments.

/// <summary>
/// Used to free up buffer pool
/// </summary>
public void Purge() => networkBuffers.Purge();

It can be dangerous to call potentially mutating methods on a struct (Purge sounds mutating, although its implementation here is safe, as it probably passes through into the LFBP class). Because the call could be made on a copy of the struct, it can create subtle bugs as well as correctness and maintenance challenges.
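The copy hazard described in this comment can be reproduced with a small, self-contained sketch. `CounterStruct` and `Holder` are hypothetical types used only for illustration and are unrelated to NetworkBuffers. The sketch relies on standard C# behavior: calling a non-readonly method on a readonly struct field operates on a defensive copy, and assigning a struct to another variable copies it.

```csharp
using System;

// Hypothetical struct with a mutating method.
public struct CounterStruct
{
    public int Count;
    public void Increment() => Count++;
}

public sealed class Holder
{
    public readonly CounterStruct ReadonlyField = new CounterStruct();
    public CounterStruct MutableField = new CounterStruct();

    public int IncrementReadonly()
    {
        // The compiler invokes Increment() on a defensive copy of the
        // readonly field, so the stored value is never updated.
        ReadonlyField.Increment();
        return ReadonlyField.Count;
    }

    public int IncrementMutable()
    {
        // A non-readonly field is mutated in place.
        MutableField.Increment();
        return MutableField.Count;
    }

    public static int IncrementLocalCopy(CounterStruct original)
    {
        // Assignment copies the struct; the mutation affects only the copy.
        var copy = original;
        copy.Increment();
        return original.Count;
    }
}
```

A struct that only forwards calls to reference-type members would not lose updates this way, which matches the reviewer's remark that the Purge implementation is probably safe. The risk appears once the struct itself carries mutable state.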

@@ -212,7 +211,7 @@ public override void Dispose()
 {
     networkSender.ReturnResponseObject();
     networkHandler?.Dispose();
-    networkPool.Dispose();
+    networkBuffers.Dispose();

can't really reliably dispose structs.

@@ -77,6 +77,8 @@ private GarnetClient CreateConnection(string nodeId)
address,
port,
clusterProvider.serverOptions.TlsOptions?.TlsClientOptions,
sendPageSize: 1 << 17,

This is the 3rd time (so far) I've seen the magic # 17, so best to make it a const int somewhere

client = new GarnetClientSession(
address,
port,
new(Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength)),

just new() by itself (with or without args) requires an extra step to figure it out. Better to prefix with param name or include the type.

this.clusterProvider = clusterProvider;
var bufferSize = 1 << clusterProvider.serverOptions.PageSizeBits();
this.networkBufferSettings = new NetworkBufferSettings(bufferSize, 1 << 12);

where does 12 come from? These magic numbers should be a "const long" or a config
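One way to address the magic-number comments is to collect the shift amounts in a single place. The sketch below is illustrative only: `NetworkBufferDefaults` and its member names are hypothetical, and what each value means for a given call site (for example, which NetworkBufferSettings argument takes which size) is an assumption the PR does not spell out.

```csharp
// Hypothetical holder for the sizes that appear as literals in the diff.
public static class NetworkBufferDefaults
{
    // 1 << 17 = 131072 bytes (128 KiB); appears both as "1 << 17" and as 131072.
    public const int DefaultSendPageSizeBits = 17;
    public const int DefaultSendPageSize = 1 << DefaultSendPageSizeBits;

    // 1 << 12 = 4096 bytes (4 KiB); purpose at the call sites is an assumption.
    public const int SmallBufferSizeBits = 12;
    public const int SmallBufferSize = 1 << SmallBufferSizeBits;

    // 1 << 22 = 4194304 bytes (4 MiB); purpose at the call sites is an assumption.
    public const int LargeBufferSizeBits = 22;
    public const int LargeBufferSize = 1 << LargeBufferSizeBits;
}
```

A call site could then read `sendPageSize: NetworkBufferDefaults.DefaultSendPageSize`, and the Math.Max(131072, ...) expression could use the same constant so the two spellings cannot drift apart.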

@@ -46,8 +46,10 @@ public AofSyncTaskInfo(

public void Dispose()
{
iter?.Dispose();
cts.Cancel();

needs a ?

this.clusterProvider = clusterProvider;
this.storeWrapper = clusterProvider.storeWrapper;

this.networkBufferSettings = new NetworkBufferSettings(1 << 22, 1 << 12);

magic numberrrrrrrrs

@@ -61,8 +69,8 @@ public void Return(PoolEntry buffer)
Interlocked.Decrement(ref pool[level].size);

This needs commenting. My first thought on seeing .size was to wonder why we can't just use the queue.Count method. Now it looks like .size is total number of allocations? The PoolLevel field name comments are uninformative.

 {
 #if HANGDETECT
     if (++count % 10000 == 0)
         logger?.LogTrace("Dispose iteration {count}, {activeHandlerCount}", count, activeHandlerCount);
 #endif
     Thread.Yield();
 }
-for (int i = 0; i < numLevels; i++)
+for (var i = 0; i < numLevels; i++)
 {
     if (pool[i] == null) continue;
     while (pool[i].size > 0)

If I'm understanding correctly that .size is total # of allocations which may be > maxEntriesPerLevel, then this can spin forever if there are unReturned items. This might warrant an Assert or logging with an exit. (Holding an unReturned item and calling Dispose() (and Purge()?) is bad anyway, so let's make it easier to catch).
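The "logging with an exit" idea from this comment could look roughly like the sketch below. It is not the pool's actual Dispose logic: `DrainingPool`, `Rent`, `Return`, and `TryDispose` are hypothetical names, and the sketch assumes a single counter of outstanding (unreturned) buffers.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical pool that tracks outstanding buffers and refuses to spin forever.
public sealed class DrainingPool
{
    private int outstanding; // buffers handed out and not yet returned

    public void Rent() => Interlocked.Increment(ref outstanding);
    public void Return() => Interlocked.Decrement(ref outstanding);

    // Waits for all buffers to come back. Returns true once drained, or false
    // after logging if the timeout elapses while buffers are still outstanding.
    public bool TryDispose(TimeSpan timeout, Action<string> log)
    {
        var stopwatch = Stopwatch.StartNew();
        while (Volatile.Read(ref outstanding) > 0)
        {
            if (stopwatch.Elapsed > timeout)
            {
                log?.Invoke($"Dispose timed out with {Volatile.Read(ref outstanding)} unreturned buffer(s)");
                return false;
            }
            Thread.Yield();
        }
        return true;
    }
}
```

Returning false instead of blocking makes a leaked buffer visible to the caller, which can then decide whether to assert, log, or continue shutting down.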

/// <summary>
/// Initial allocation size for receive network buffer.
/// (NOTE: Receive buffers can automatically grow to accomodate larger payloads.)
/// </summary>

Good comments. "Accommodate"

/// MigrationManager Buffer Pool
/// </summary>
MM,
/// <summary>

Why not Migration, Replication, ServerSocket?

internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
private bool NetworkBurgeBP()
{

"PurgeBP"

3 participants