Go语言之并发示例-Pool(二)

针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。

瑞金ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!

package commonimport (
    "errors"
    "io"
    "sync"
    "log")//一个安全的资源池,被管理的资源必须都实现io.Close接口type Pool struct {
    m       sync.Mutex
    res     chan io.Closer
    factory func() (io.Closer, error)
    closed  bool}var ErrPoolClosed = errors.New("资源池已经被关闭。")//创建一个资源池func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 { 
           return nil, errors.New("size的值太小了。")
    }    
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil}//从资源池里获取一个资源func (p *Pool) Acquire() (io.Closer,error) { 
   select {
       case r,ok := <-p.res:
        log.Println("Acquire:共享资源")       
        if !ok {
                    return nil,ErrPoolClosed
        }        
        return r,nil
    default:
        log.Println("Acquire:新生成资源")        
        return p.factory()
    }}
 //关闭资源池,释放资源func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()    
    if p.closed { 
           return
    }

    p.closed = true

    //关闭通道,不让写入了
    close(p.res)    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }}func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()    
    defer p.m.Unlock()   

     //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()        
        return 
    }    
    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")    
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}


好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。

package mainimport (
    "flysnow.org/hello/common"
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time")const (    
    //模拟的最大goroutine
    maxGoroutine = 5
    //资源池的大小
    poolRes      = 2)func main() {    
    //等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p, err := common.New(createConnection, poolRes)    

    if err != nil {
        log.Println(err)        
        return
    }    
    //模拟好几个goroutine同时使用资源池查询数据
    for query := 0; query < maxGoroutine; query++ { 
       go func(q int) {
            dbQuery(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()
    log.Println("开始关闭资源池")
    p.Close()}//模拟数据库查询func dbQuery(query int, pool *common.Pool) {
    conn, err := pool.Acquire()
    if err != nil {
        log.Println(err)       
         return
    }    
    defer pool.Release(conn)    
    //模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)}//数据库连接type dbConnection struct {
    ID int32//连接的标志}//实现io.Closer接口func (db *dbConnection) Close() error {
    log.Println("关闭连接", db.ID)    
    return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用func createConnection() (io.Closer, error) {   
    //并发安全,给数据库连接生成唯一标志
    id := atomic.AddInt32(&idCounter, 1)    
    return &dbConnection{id}, nil
}


这时我们测试使用资源池的例子,首先定义了一个结构体dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection实现了io.Closer接口,这样才可以使用我们的资源池。


createConnection函数对应的是资源池中的factory字段,用来创建数据库连接dbConnection的,同时为其赋予了一个为止的标志。


接着我们就同时开了 5 个goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。


这里我们会创建 5 个数据库连接,但是我们设置的资源池大小只有 2 ,所以再释放了 2 个连接后,后面的 3 个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。


最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的Close方法即可。


2017/04/1722:25:08Acquire:新生成资源
2017/04/1722:25:08Acquire:新生成资源
2017/04/1722:25:08Acquire:新生成资源
2017/04/1722:25:08Acquire:新生成资源
2017/04/1722:25:08Acquire:新生成资源
2017/04/1722:25:08第2个查询,使用的是ID为4的数据库连接
2017/04/1722:25:08资源释放到池子里了
2017/04/1722:25:08第4个查询,使用的是ID为1的数据库连接
2017/04/1722:25:08资源释放到池子里了
2017/04/1722:25:08第3个查询,使用的是ID为5的数据库连接
2017/04/1722:25:08资源池满了,释放这个资源吧
2017/04/1722:25:08关闭连接5
2017/04/1722:25:09第1个查询,使用的是ID为3的数据库连接
2017/04/1722:25:09资源池满了,释放这个资源吧
2017/04/1722:25:09关闭连接3
2017/04/1722:25:09第0个查询,使用的是ID为2的数据库连接
2017/04/1722:25:09资源池满了,释放这个资源吧
2017/04/1722:25:09关闭连接2
2017/04/1722:25:09开始关闭资源池
2017/04/1722:25:09关闭连接4
2017/04/1722:25:09关闭连接1


到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。


资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用GC,为此Go为我们提供了原生的资源池管理,防止我们重复造轮子,这就是sync.Pool,我们看下刚刚我们的例子,如果用sync.Pool实现。


package mainimport(
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time")const(    
    //模拟的最大goroutine
    maxGoroutine=5)func main(){
    //等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p:=&sync.Pool{
        New:createConnection,
    }   
    //模拟好几个goroutine同时使用资源池查询数据
    for query:=0; query< maxGoroutine; query++{
      go func(qint){
            dbQuery(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()}//模拟数据库查询
func dbQuery(queryint, pool*sync.Pool){    conn:=pool.Get().(*dbConnection)        defer pool.Put(conn)        //模拟查询    time.Sleep(time.Duration(rand.Intn(1000))* time.Millisecond)    log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)}//数据库连接
type dbConnectionstruct{    ID int32//连接的标志}//实现io.Closer接口
func(db*dbConnection)Close() error{    log.Println("关闭连接", db.ID)    returnnil}var idCounter int32//生成数据库连接的方法,以供资源池使用
func createConnection()interface{}{      //并发安全,给数据库连接生成唯一标志    id:= atomic.AddInt32(&idCounter,1)    return&dbConnection{ID:id}
}


进行微小的改变即可,因为系统库没有提供New这类的工厂函数,所以我们使用字面量创建了一个sync.Pool,注意里面的New字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的factory字段,意思都是一样的,都是当没有可用资源的时候,生成一个。


这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。


资源的获取和释放对应的方法是Get和Put,也很简洁,返回任意对象interface{}。


2017/04/1722:42:43第0个查询,使用的是ID为2的数据库连接
2017/04/1722:42:43第2个查询,使用的是ID为5的数据库连接
2017/04/1722:42:43第4个查询,使用的是ID为1的数据库连接
2017/04/1722:42:44第3个查询,使用的是ID为4的数据库连接
2017/04/1722:42:44第1个查询,使用的是ID为3的数据库连接


关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次GC的时候,这些存放的对象都会被清除掉。


网站名称:Go语言之并发示例-Pool(二)
分享网址:http://scyanting.com/article/iidcso.html