使用Go搭建并行排序处理管道笔记

一、并行管道搭建:

总结下实现思路:

创新互联长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为资源企业提供专业的成都网站设计、成都网站建设资源网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。

  1. 归并排序:进行集合元素排序(节点),并两两节点归并排序;每个节点元素要求有序的(排序),当然终点最小节点元数个数为1必是有序的;
  2. 节点:任务处理单元,归并排序节点是处理输出有序集合任务的单元;文件过大单台机排不了需要多台机集群;
  3. 根据粒度,单机版:单任务版每个节点可以是排序方法,并发版每个节点可以是一个线程/协程去处理(异步排序),集群版节点是一个主机;
  4. 单机版,不管并发还是非并发,节点采用的是内存共享数据;集群版节点则需要网络连接请求应答来共享数据;
  5. go语言异步数据传输通道通过channel实现的;
  6. 每个节点将处理的数据异步发送到各自channel中,等待一个主节点获取归并,集群版多了网络的数据传输(在并发版加上网络的接口)。

二、代码实现:
  1. 本地节点 nodes.go:
    package pipeline
    
    import (
    	"encoding/binary"
    	"fmt"
    	"io"
    	"math/rand"
    	"sort"
    	"time"
    )
    
    var startTime time.Time
    
    func Init() {
    	startTime = time.Now()
    }
    
    //内部处理方法
    //这里是排序:异步处理容器元素排序
    func InMemSort(in <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		a := []int{}
    		for v := range in {
    			a = append(a, v)
    		}
    		fmt.Println("Read done:", time.Since(startTime))
    
    		sort.Ints(a)
    		fmt.Println("InMemSort done:", time.Since(startTime))
    
    		for _, v := range a {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    
    //两路和并,每路通过内部方法异步处理
    //这里是排序:in1,in2元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
    func Merge(in1, in2 <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	// go func() {
    	// 	v1, ok1 := <-in1
    	// 	v2, ok2 := <-in2
    	// 	for {
    	// 		if ok1 || ok2 {
    	// 			if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
    	// 				out <- v1
    	// 				v1, ok1 = <-in1
    	// 			} else {
    	// 				out <- v2
    	// 				v2, ok2 = <-in2
    	// 			}
    	// 		} else {
    	// 			close(out)
    	// 			break
    	// 		}
    	// 	}
    	// }()
    	go func() {
    		v1, ok1 := <-in1
    		v2, ok2 := <-in2
    		for ok1 || ok2 {
    			if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
    				out <- v1
    				v1, ok1 = <-in1
    			} else {
    				out <- v2
    				v2, ok2 = <-in2
    			}
    		}
    		close(out)
    
    		fmt.Println("Merge done:", time.Since(startTime))
    	}()
    	return out
    }
    
    //读取原数据
    //chunkSize=-1全读
    func ReadSource(r io.Reader, chunkSize int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		buffer := make([]byte, 8) //int长度根据操作系统来的,64位为int64,64位8个字节
    		bytesRead := 0
    		for { //持续读取
    			n, err := r.Read(buffer) //读取一个int 8byte
    			bytesRead += n
    			if n > 0 {
    				out <- int(binary.BigEndian.Uint64(buffer)) //字节数组转int
    			}
    			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全读
    				break
    			}
    		}
    		close(out)
    	}()
    	return out
    }
    
    //写处理后(排序)数据
    func WriteSink(w io.Writer, in <-chan int) {
    	for v := range in {
    		buffer := make([]byte, 8)
    		binary.BigEndian.PutUint64(buffer, uint64(v))
    		w.Write(buffer)
    	}
    }
    
    //随机生成数据源
    func RandomSource(count int) <-chan int {
    	out := make(chan int)
    	go func() {
    		for i := 0; i < count; i++ {
    			out <- rand.Int()
    		}
    		close(out)
    	}()
    	return out
    }
    
    //多路两两归并,每路通过内部方法异步处理
    //这里是排序:ins元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
    func MergeN(ins ...<-chan int) <-chan int {
    	if len(ins) == 1 {
    		return ins[0]
    	}
    	m := len(ins) / 2
    	return Merge(
    		MergeN(ins[:m]...),
    		MergeN(ins[m:]...)) //chennel异步并发归并
    }
    

    本文题目:使用Go搭建并行排序处理管道笔记
    URL地址:http://scyanting.com/article/dsoisog.html