对象资源池 ObjectPool 为Redis客户端、Rpc客户端等提供连接池能力。此外还实现了轻量级对象池 Pool<T>,无锁高性能。

Nuget包:NewLife.Core

源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Collections/ObjectPool.cs


池接口IPool

有借有还,IPool定义了获取Get和归还Put方法。

/// <summary>对象池接口</summary>
/// <typeparam name="T"></typeparam>
public interface IPool<T> where T : class
{
    /// <summary>对象池大小</summary>
    Int32 Max { get; set; }

    /// <summary>获取</summary>
    /// <returns></returns>
    T Get();

    /// <summary>归还</summary>
    /// <param name="value"></param>
    Boolean Put(T value);

    /// <summary>清空</summary>
    Int32 Clear();
}


对象池ObjectPool

ObjectPool 是比较完整的对象池设计,具有所需各种功能,包括定时清理闲置资源。

Redis客户端内维持了多个到服务器的连接,用RedisClient表示,需要用一个对象池来管理,每次读写操作时取一个连接出来使用,用完以后还回去。

Redis连接池实现如下,每次借出时,如果池里RedisClient对象不够,则调用外部OnCreate创建连接对象。同时,借出对象前,做一次Reset清理操作,避免因上一次失败而导致残留返回数据影响这一次操作。

private class MyPool : ObjectPool<RedisClient>
{
    public Redis Instance { get; set; }

    protected override RedisClient OnCreate() => Instance.OnCreate();

    protected override Boolean OnGet(RedisClient value)
    {
        // 借出时清空残留
        value?.Reset();

        return base.OnGet(value);
    }
}

private MyPool _Pool;
/// <summary>连接池</summary>
public IPool<RedisClient> Pool
{
    get
    {
        if (_Pool != null) return _Pool;
        lock (this)
        {
            if (_Pool != null) return _Pool;

            var pool = new MyPool
            {
                Name = Name + "Pool",
                Instance = this,
                Min = 2,
                Max = 1000,
                IdleTime = 20,
                AllIdleTime = 120,
                Log = Log,
            };

            return _Pool = pool;
        }
    }
}

这里我们看到,连接池设置了最小2个连接,最大1000个连接。空闲时间20秒,超过该空闲时间的连接将被销毁释放,直到留下最后2个。完全空闲时间120秒,把最后2个连接都干掉。

再来看看Redis连接池用法,对实际情况作了一些简化:

/// <summary>执行命令</summary>
/// <typeparam name="TResult">返回类型</typeparam>
/// <param name="key">命令key,用于选择集群节点</param>
/// <param name="func">回调函数</param>
/// <param name="write">是否写入操作</param>
/// <returns></returns>
public virtual TResult Execute<TResult>(String key, Func<RedisClient, TResult> func, Boolean write = false)
{
    // 统计性能
    var sw = Counter?.StartCount();

        // 每次重试都需要重新从池里借出连接
        var client = Pool.Get();
        try
        {
            client.Reset();
            return func(client);
        }
        finally
        {
            Pool.Put(client);

            Counter?.StopCount(sw);
        }
}


轻量级对象池Pool

Pool<T> 借助数组实现,无锁操作,具有超高性能,适用于高频调用场合,常用于规避GC。Pool 没有过期机制,只有最基本的借还。

内置了两个最常见的池,分别是用于字符串拼接的StringBuilder池和用于内存流处理的MemoryStream池,主要目的就是减少内存分配,减少GC。


StringBuilder池

/// <summary>字符串构建器池</summary>
public static IPool<StringBuilder> StringBuilder { get; set; } = new StringBuilderPool();

/// <summary>归还一个字符串构建器到对象池</summary>
/// <param name="sb"></param>
/// <param name="requireResult">是否需要返回结果</param>
/// <returns></returns>
public static String Put(this StringBuilder sb, Boolean requireResult = false)
{
    if (sb == null) return null;

    var str = requireResult ? sb.ToString() : null;

    Pool.StringBuilder.Put(sb);

    return str;
}

/// <summary>字符串构建器池</summary>
public class StringBuilderPool : Pool<StringBuilder>
{
    /// <summary>初始容量。默认100个</summary>
    public Int32 InitialCapacity { get; set; } = 100;

    /// <summary>最大容量。超过该大小时不进入池内,默认4k</summary>
    public Int32 MaximumCapacity { get; set; } = 4 * 1024;

    /// <summary>创建</summary>
    /// <returns></returns>
    protected override StringBuilder OnCreate() => new StringBuilder(InitialCapacity);

    /// <summary>归还</summary>
    /// <param name="value"></param>
    /// <returns></returns>
    public override Boolean Put(StringBuilder value)
    {
        if (value.Capacity > MaximumCapacity) return false;

        value.Clear();

        return true;
    }
}

在RedisClient中多次用到StringBuilder池,主要用于解析返回数据时,需要逐个字符拼接,不方便每次都分配新的StringBuilder实例,那样会带来很大GC压力。

private static String ReadLine(Stream ms)
{
    var sb = Pool.StringBuilder.Get();
    while (true)
    {
        var b = ms.ReadByte();
        if (b < 0) break;

        if (b == '\r')
        {
            var b2 = ms.ReadByte();
            if (b2 < 0) break;

            if (b2 == '\n') break;

            sb.Append((Char)b);
            sb.Append((Char)b2);
        }
        else
            sb.Append((Char)b);
    }

    return sb.Put(true);
}


MemoryStream池

/// <summary>内存流池</summary>
public static IPool<MemoryStream> MemoryStream { get; set; } = new MemoryStreamPool();

/// <summary>归还一个内存流到对象池</summary>
/// <param name="ms"></param>
/// <param name="requireResult">是否需要返回结果</param>
/// <returns></returns>
public static Byte[] Put(this MemoryStream ms, Boolean requireResult = false)
{
    if (ms == null) return null;

    var buf = requireResult ? ms.ToArray() : null;

    Pool.MemoryStream.Put(ms);

    return buf;
}

/// <summary>内存流池</summary>
public class MemoryStreamPool : Pool<MemoryStream>
{
    /// <summary>初始容量。默认1024个</summary>
    public Int32 InitialCapacity { get; set; } = 1024;

    /// <summary>最大容量。超过该大小时不进入池内,默认64k</summary>
    public Int32 MaximumCapacity { get; set; } = 64 * 1024;

    /// <summary>创建</summary>
    /// <returns></returns>
    protected override MemoryStream OnCreate() => new MemoryStream(InitialCapacity);

    /// <summary>归还</summary>
    /// <param name="value"></param>
    /// <returns></returns>
    public override Boolean Put(MemoryStream value)
    {
        if (value.Capacity > MaximumCapacity) return false;

        value.Position = 0;
        value.SetLength(0);

        return true;
    }
}

在RedisClient中有使用内存池,处理请求命令到内存流中,然后整体写入到网络流ns。如果不使用内存流池,就会导致每一次Redis操作都需要分配一个MemoryStream对象,用完后释放等待GC回收。在Redis高并发操作时,将会给GC系统带来很大压力。

var ms = Pool.MemoryStream.Get();
GetRequest(ms, cmd, args, oriArgs);

if (ms.Length > 0) ms.WriteTo(ns);
ms.Put();


总结

Redis 是对象池最大用户,除了ObjectPool,还有两个内置轻量级池都用到了。

此处为文档,点击链接查看:https://newlifex.com/go/doc/5966794


作者:大石头 发布:2021-02-19 00:22:52 浏览:658)