golang系列(5)-ini配置文件解析, strconv, goroutine与并发, GMP与并行, sync, 通道与CSP,work pool

marvin

marvin

golang系列(5)-ini配置文件解析, strconv, goroutine与并发, GMP与并行, sync, 通道与CSP,work pool

配置文件在任何一个项目里都是必要的, 最简单的配置文件类型是 ini, 它里面只有键值对和分组没有复杂的数据结构, 不像 yaml 有数组, 对象, 引用, 嵌套等, 下面就来看看如何通过反射来解析一个 ini 配置文件.

ini 配置文件解析

为了读取配置文件, 我们需要定义一个配置结构体去保存这些值, 代码如下:

type RedisConfig struct {
	Host string `ini:"host"`
	Port int `ini:"port"`
	Password string `ini:"password"`
	Database int `ini:"database"`
}

type MysqlConfig struct {
	Address string `ini"address"`
	Port int `ini:"port"`
	Username string `ini:"username"`
	Password string `ini:"password"`
}

type Config struct {
	MysqlConfig `ini:"mysql"`
	RedisConfig `ini:"redis"`
}

我们这里通过定义 tag 告诉解析器, 这个字段对应的配置文件中的名称是什么, 接下来我们需要准备一个配置文件:

[mysql]
address=192.168.0.1
port=3306
username=root
password=root

[redis]
host=127.0.0.1
port=6379
password=root
database=0

在这里我们定义了两个组, 一个 mysql 一个 redis, 我们要获取的是整个配置文件的内容 config, 接下来我们需要编写解析器去读取配置文件并解析:

func loadIni(fileName string, data interface{}) (err error) {
	// 校验参数
	t := reflect.Typeof(data)
	if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct {
		err := errors.New("data should be a struct pointer")
		return
	}
	// 读文件, 获取每一行的数据
	b, err := ioutil.ReadFile(fileName)
	if err != nil {
		return
	}
	lines := strings.Split(string(b), "\n")
	// 当前的分组名称
	var structName string
	// 遍历每一行的数据
	for idx, line := range lines {
		// 去掉每一首尾的空格
		line = strings.TrimSpace(line)
		// 忽略注释 和 空行
		if strings.HasPrefix(line, ";") || strings.HasPrefix(line, "#") || len(line) == 0 {
			continue
		}
		// 当前行是分组
		if strings.HasPrefix(line, "[") {
			// 分组没有 ] 结尾
			if line[len(line) - 1] != ']' {
				err = fmt.Errorf("line: %d syntax error", idx + 1)
				return
			}
			// 分组没有名称
			sectionName := strings.TrimSpace(line[1:len(line) - 1])
			if len(sectionName) == 0 {
				err := fmt.Errorf("line: %d syntax error", idx + 1)
				return
			}
			// 分组有名称, 根据名称找到对应的结构体, 存储其名称到当前分组名里
			v := reflect.ValudOf(data)
			for i := 0; i < t.Elem().NumField(); i++ {
				field := t.Elem().Field(i)
				if sectionName == field.Tag.Get("ini") {
					structName = field.Name()
				}
			}
		// 当前行不是分组则是配置的值
		} else {
			// 等号的位置不对
			if strings.Index(line, "=") == -1 || strings.HasPrefix(line, "=") {
				err = fmt.Errorf("line: %d syntax error", idx + 1)
				return 
			}
			// 根据当前的分组名称找到对应的结构体
			v := reflect.ValueOf(data)
			sValue := v.Elem().FieldByName(structName)
			sType := sValue.Type()
			if sType.Kind() != reflect.Struct {
				fmt.Errorf("field %s should be a struct", structName)
				return
			}
			// 获取配置的 key 与 value
			index := strings.Index(line, "=")
			key := strings.TrimSpace(line[:index])
			value := strings.TrimSpace(line[index + 1:])
			// 遍历嵌套结构体的字段匹配 key
			var fieldName string
			var fileType reflect.StructField
			for i := 0; i < sValue.NumField(); i++ {
				field := sType.Field(i)
				fileType : = field
				if filed.Tag.Get("ini") == key {
					filedName = field.Name
					break
				}
			}
			fileObj := sValue.fieldByName(fieldName)
			if len(fieldName) == 0 {
				continue
			}
			// 判断字段的类型, 做不同类型的赋值
			switch fileType.Type.Kind() {
				case reflect.String:
					fileObj.SetString(value)
				case reflect.Int,reflect.Int8,reflect.Int16,reflect.Int32,reflect.Int64:
					var valueInt int64
					valueInt, err = strconv.ParseInt(value, 10, 64)
					if err != nil {
						err = fmt.Errorf("line:%d value type error", idx + 1)
						return
					}
					fileObj.setInt(valueInt)
				case reflect.Bool:
					var valueBool bool
					valueBool, err = strconv.ParseBool(value)
					if err != nil {
						err = fmt.Errorf("line:%d value type error", idx + 1)
						return
					}
					fileObj.SetBool(valueBool)
				case reflect.Float32,reflect.Float64:
					var valueFloat float64
					valueFloat, err = strconv.ParseFloat(value, 64)
					if err != nil {
						err = fmt.Errorf("line:%d value type error", idx + 1)
						return
					}
					fileObj.SetFloat(valueFloat)
			}
		}
	}
	return
}

func main() {
	var config Config
	err := loadIni("./conf.ini", &config)
	if err != nil {
		fmt.Printf("load ini failed, err: %v\n", err)
		return
	}
	fmt.Prinln(config)  
}

这里的 data 传入的必须是结构体的指针, 因为我们要在函数中对其内部的值做出修改. 整体的思路是读取配置文件中的每一行, 如果是分组名称, 则通过分组名称找到 tag 和分组名称对应的当前结构体, 如果不是分组名称, 则找到 key 和 value, 然后遍历当前结构体中的字段, 找到 tag 和 key 对应的字段, 然后获取字段的类型, 再通过 strconv 的解析方法按不同的类型进行解析, 最后把 value 赋值给这个字段.

strconv 标准库

这个库主要用于将字符串解析为不同的数据类型, 是官方提供的标准库, 可以直接使用

ParseInt

ParseInt 用于解析整型, 其方法签名为:

func ParseInt(s string, base int, bitSize int) (i int64, err error)

这里的 s 表示要解析的字符串, base 表示解析到的进制数, bitSize 表示位数, 比如是 16 位还是 32 位, 返回的 i 统一都是 int64 类型的, 这样就可以保证即使转换为 int32 等位数更小的类型的时候精度不会丢失.

Atoi

Atoi 也是一个将字符串转换为 int 类型的方法, 其方法签名为:

func Atoi(s string) (int, error)

这里的返回值是 int 类型

Itoa

Itoa 用于将一个整型转换为字符串, 其方法签名为:

func Itoa(i int) string

ParseBool

ParseBool 用于将字符串解析为布尔值, 其方法签名为:

func ParseBool(str string) (bool, error)

ParseFloat

ParseFloat 用于将字符串解析为浮点数, 其方法签名为:

func ParseFloat(s string, bitSize int) (float64, error)

同样这里需要指定浮点数的位数, 最后统一返回float64以保证精度

并发

并发是编程里面一个非常重要的概念, go 语言在语言层面天生支持并发, 这也是 go 语言流行的一个很重要的原因, 并发和并行是容易搞混的一个概念, 并发是指同一个时间段内执行多个任务, 并行是指同一时刻执行多个任务, go 语言的并发通过 goroutine 实现. goroutine 类似线程, 它属于用户态线程, 我们可以更具需要创建成千上万个 goroutine 并发工作. goroutine 是由 go 语言的运行时调度完成, 而线程是由操作系统调度完成. go 语言还提供 channel 在多个 goroutine 之间进行通信. goroutine 和 channel 是 go 语言秉承的 CSP (Communicating Sequential Process) 并发模式的重要实现基础.

goroutine

在 java/c++ 中我们要实现并发编程的时候, 我们通常需要自己维护一个线程池, 并且需要自己包装一个又一个的任务, 同时需要自己去调度线程执行任务并维护上下文切换, 这一切通常会耗费程序员大量的心智, 那么能不能有那么一种机制, 程序员只需要定义很多个任务, 让系统去帮助我们把这些任务分配到 CPU 上去实现并发执行呢?

go 语言中的 goroutine 就是这样一种机制, goroutine 的概念类似线程,但 goroutine 是由 go 的运行时调度和管理的. go 程序会智能地将 goroutine 中地任务合理分配给每个 CPU, go 语言之所以被称为现代化地编程语言, 就是因为它在语言层面已经内置了调度和上下文切换机制, 在 go 中你不需要自己写进程, 线程, 协程, 你得技能包里只有一个技能-goroutine, 当你需要让某个任务并发执行地时候, 你只需要把这个任务包装成一个函数, 开启一个 goroutine 去执行这个函数就可以了, 就是这么简单.

使用 goroutine

go 中使用 goroutine 非常简单, 只需要在调用函数的时候在前面加上 go 关键字, 就可以为一个函数创建一个 goroutine, 一个 goroutine 必须对应一个函数, 可以创建多个 goroutine 去执行相同的函数.

启动goroutine的方式非常简单, 只需要在调用函数(普通函数和匿名函数) 前面加上一个 go 关键字. 下面是一个启动 goroutine 的例子:

func hello(i) {
	fmt.Println("hello world", i)
}

func main() {
	for i := 0; i < 1000; i++ {
		go func(i int) {
			hello(i)
		}(i)
	}
	fmt.Println("main goroutine done!")
}

这里通过启动一个 goroutine 去执行 hello 这个函数, 而 main 函数在结束的时候由其启动的 goroutine 都会被关闭, 因此 hello 函数来不及执行. 这里之所以将 i 当做参数传递进去是为了避免 for 循环执行的太快而导致获取到的i不是我们想要的.

goroutine 的调度模型GMP

操作系统的线程一般都有固定的栈内存, 一个 goroutine 的栈在其声明周期开始时只有很小的栈(典型情况下2KB), goroutine 的栈不是固定的, 他可以按需增大和缩小, goroutine 的栈大小限制可以达到 1GB, 虽然很少会用到这么大。 因此在 go 语言中依次创建10万个左右的 goroutine 也是可以的.

GMP 是go语言运行时层面的实现, 是 go 语言自己实现的一套调度系统, 区别于操作系统调度线程的模型, 其有以下几个要点:

  • G 很好理解, 就是个goroutine的, 里面除了存放本goroutine信息外, 还有与所在P绑定的一些信息
  • M (machine) 是 go 运行时对操作系统内核线程的虚拟, M 与内核线程一般是一一映射的关系, 一个 goroutine 最终是要放到 M 上执行的
  • P 管理着一组 goroutine 队列, P 里面会存储当前goroutine运行的上下文环境(函数指针, 堆栈地址, 以及地址边界), P 会对自己管理的 goroutine 队列做一些调度(比如把占CPU时间长的goroutine暂停, 运行后续的goroutine等等) 当自己的队列消费完了就去全局队列里面取, 如果全局队列也消费完毕, 就会去其他P的队列里面抢任务

P 与 M 一般也是一一对应的, 它们的关系是 P 管理着一组 G 挂载在 M 上执行, 当一个 G 长久阻塞在一个 M 上时, runtime 会新建一个 M, 阻塞 G 所在的 P 会把其他 G 挂载在新建的 M 上. 当旧的 G 阻塞完成或者认为其已经死掉时回收M.

P的个数是通过 runtime.GOMAXPROCS 设置的(最大256), GO1.5版本之后默认为物理线程数, 在并发量大的时候会增加一些 P 和 M, 但不会太多, 切换太多反而影响性能, 单从线程调度讲, go 相比其他语言的优势在于 goroutine 完全由其运行时调度, 这个调度器使用一个称为 m:n 调度的技术(复用/调度m个goroutine到n个操作系统线程). 其一大特点是 goroutine 的调度是在用户态下完成的, 不涉及内核态和用户态之间的频繁切换, 包括内存的分配与释放, 都是在用户态维护着一块大的内存池, 不直接调用系统的 malloc 函数(除非内存池需要改变), 成本比调度操作系统线程低了很多, 另一方面充分利用了多核的硬件资源, 近似把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量, 足以保证go在调度方面的性能.

GOMAXPROCS

go 运行时的调度器使用 GOMAXPROCS 来确定需要使用多少个操作系统线程来同时执行go代码, 默认值是机器上的 CPU 核心数. 我们可以通过 runtime.GOMAXPROCS 函数来设置当前程序发生时占用的 CPU 逻辑核心数. GO 1.5 之前,默认使用的是单核心执行, 1.5之后默认使用全部 CPU 逻辑核心数量. 我们可以将任务分配到不同的 CPU 逻辑核心上实现并行的效果, 下面是一个并行的例子:

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second * 1)
}

sync.WaitGroup

sync.WaitGroup 表示等待组, 每启动一个 goroutine 的时候进行加一操作, 当 goroutine 结束的时候调用 Done 登记完成. 在main goroutine中我们可以用 wg.Wait 等到所有的 goroutine 执行完毕, 下面是一个使用等待组的例子:

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done()
	fmt.Println("hello Goroutine!", i)
}

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go hello(i)
	}
	wg.Wait()
}

这样就可以避免 goroutine 来不及执行的情况

channel

单纯地将函数并发执行是没有意义的, 函数与函数之间需要交换数据才能体现并发执行函数的意义. 虽然可以使用共享内存进行数据交换, 但是共享内存在不同的 goroutine 中容易发生竞态问题. 为了保证数据交换的正确性, 必须使用互斥量对内存进行加锁, 这种做法就会导致性能问题.

go 的并发模型是 CSP(communicating sequential processes), 提倡通过通信共享内存实现通信.

如果说 goroutine 是 go 程序并发的执行体, channel 就是它们之间的连接, channel 是可以让一个 goroutine 发送特征量到另一个 goroutine 的通信机制. go 语言中的通道是一种特殊的类型, 通道像一个传送带或者队列. 总是遵循先入先出的规则, 保证收发数据的顺序. 每一个通道都是一个具体类型的管道, 也就是声明 channel 的时候需要为其指定元素类型.

channel 类型

通道本身是一个引用类型, 声明通道类型的格式如下:

var 变量 chan 元素类型

下面是声明一个传递整型通道的例子:

var ch1 chan int
ch1 = make(chan int, 10)

这里的10指定了通道缓冲区的大小

通道的操作

发送值

将一个值发送到通道中语法为: 通道 ← 值

接收值

将一个值存入通道中语法为: 变量 := ← 通道, 忽略结果的时候可以为: ← 通道

关闭通道

我们通过调用内置的 close 函数来关闭通道

close(ch)

关闭通道需要注意的事情是, 只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭通道, 通道是可以被垃圾回收机制回收的, 它和关闭文件是不一样的, 在操作结束之后关闭文件是必须要做的, 但关闭通道不是必须的.

为了说明, 下面是一个使用通道操作的例子:

var wg sync.WaitGroup

func main() {
	b := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		x := <-b
		fmt.Prinln(x)
	}()
	b <- 100
	wg.Wait()
}

关闭后的通道有以下几个特点:

  1. 再发送值会导致 panic
  2. 接收值会一直获取值直到通道为空
  3. 没有值的时候执行接收操作会得到对应类型的零值
  4. 再次关闭会导致 panic

如何优雅地从 channel 里取值

当通过通道发送有限的数据时, 我们可以通过 close 函数关闭通道来告诉接收者停止等待,下面是一个循环取值的例子:

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 开启 goroutine 将0-100的数发送到 ch1 中
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// 开启 goroutine 从 ch1 中接收值, 并将该值的平方发送到 ch2 中
	go func() {
		for {
			i, ok := <- ch1
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// 在main goroutine 中从 ch2 总接收值打印
	for i := range ch2 {
		form.Println(i)
	}
}

这里首先创建了两个通道, 通道一放100个数, 通道而取每一个数得到其平方再放入通道二里面, 最后再从通道二中取出这些值打印到终端上.

单向通道

我们前面讲到的通道都是既可以发送也可以接收, 单向通道是指,某个通道只能用来发送值或接收值. 只能向其中发送值的通道的类型为 chan←, 只能向其中取值的通道的类型为 ←chan. 单向通道一般就是用来给函数限制参数的.

work pool

work pool 指的是goroutine池, 例如需要我们编写代码实现一个计算随机数各位数之和的程序, 要求使用 goroutine 和 channel 构建生产者和消费者模式, 可以指定启动的 goroutine 数量, 这个时候我们一般使用 work pool, 这样可以控制 goroutine 的数量, 防止 goroutine 泄漏和暴涨, 下面是一个使用 work pool 的例子:

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker: %d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker: %d end job: %d\n", id, j)
		results <- j * 2
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	for a := 1; a <= 5; a++ {
		<-results
	}
}

这里启动了5个任务, 通过3个goroutine去执行这5个任务. 谁先做完就先去从 jobs 里面取任务.

ini 配置文件解析
strconv 标准库
ParseInt
Atoi
Itoa
ParseBool
ParseFloat
并发
goroutine
使用 goroutine
goroutine 的调度模型GMP
GOMAXPROCS
sync.WaitGroup
channel
channel 类型
通道的操作
发送值
接收值
关闭通道
如何优雅地从 channel 里取值
单向通道
work pool