golang系列(6)-多路复用, 异步写, 锁, sync与并发安全, 原子操作, 互联网协议

marvin

select 多路复用
在某些场景下我们需要同时从多个通道接收数据, 通道在接收数据时, 如果没有数据可以接收将会发生阻塞, 你也许会写出如下代码使用遍历的方式来实现:
for {
data, ok := <-ch1
data, ok := <-ch2
}
这种方式虽然可以实现从多个通道接收值, 但是运行的性能会差很多, 为了应对这种场景, go 内置了 select 关键字, 可以同时响应多个通道的操作, select 的使用类似 switch 语句, 它有一些 case 分支和一个默认分支. 每个 case 会对应一个通道的通信(接收或发送) 过程. select 会一直等待, 直到某个 case 的通信操作完成时, 就会执行 case 分支对应的语句. 下面是语法格式:
select {
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默认操作
}
下面是使用多路复用的例子:
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
这里使用 select 语句随机的向 ch 通道里面写入数据和读取数据, 使用 select 语句能提高代码的可读性, 如果多个 case 同时满足, select 会随机选择一个, 对于没有 case 的 select {} 会一直等待, 可以用来阻塞 main 函数.
异步写日志
之前的日志是同步写入的, 一般的日志应该不能够影响到正常的业务, 因此日志应该是异步的, 特别是将日志写入文件的时候, 这里需要将之前的同步写日志做一下改造, 首先定义一个通道, 用来存放需要保存的日志信息:
type FileLogger struct {
// ...
logChan chan *logMsg
}
type logMsg struct {
level LogLevel
msg string
funcName string
fileName string
time string
lineNo int
}
这里定义了一个 logChan 其中保存的是 logMsg 的指针, logMsg 是一个日志信息结结构体, 之所以使用结构体指针, 是因为指针节省空间, 而字符串会耗费大量的内存空间.
然后在初始化 FileLogger 的时候初始化这个通道, 通道的缓冲区大小可以随意指定, 这里指定50000:
func NewFileLogger(levelStr, filePath, fileName string, maxSize int64) *FileLogger {
// ...
fl := &FileLogger{
// ...
logChan: make(chan *logMsg, 50000)
}
// ...
}
接下来需要将生成的日志向通道里面写入, 因此这里将之前的写入文件的逻辑单独提取到一个方法中:
func (l *Logger) writeLogBackground() {
for {
select {
logTmp := <-f.logChan:
fmt.Fprintf(f.file, "[%s] [INFO] [%s:%s:%s] %s\n", time, logTmp.fileName, logTmp.funcName, logTmp.lineNo, logTmp.msg)
default:
time.Sleep(time.Millisecond * 500)
}
}
}
func (l Logger) Info(format string, arg ...interface{}) {
if l.level > INFO {
return nil
}
msg := fmt.Sprintf(format, arg...)
now := time.Now()
funcName, fileName, lineNo := getInfo()
logTmp := &logMsg{
level: INFO,
msg: msg,
funcName: funcName,
fileName: fileName,
time: now.Format("2006-01-02 15:04:05"),
lineNo: lineNo,
}
select {
case f.logChan <- logTmp:
default:
}
}
这里我们将文件操作相关的部分放到后台执行, 在写入文件的时候通过 f.logChan ← logTemp 将我们要输出日志的信息放到这个通道里面, 等待后台 goroutine 从这个通道里面读取数据输出到文件. 另外这里通过多路复用防止因通道满了而导致阻塞. 在后台 goroutine 函数 writeLogBackground 中我们使用循环从通道里面取值, 同样的这里使用多路复用防止阻塞发生. 如果取到值就输出到文件, 如果取不到值就等待 500 毫秒, 需要注意的是在等待期间是不会阻塞 CPU 的.
最后, 我们需要启动后台 goroutine, 时机可以放到, 初始化文件句柄完成之后:
func (f *FileLogger) initFile (error) {
// ...
go writeLogBackground()
return nil
}
我们在这里开启了一个 goroutine 在后台执行读取和写入的任务, 至此就完成了日志文件的异步写入操作.
锁
有时候在 go 代码中可能会存在多个 goroutine 同时操作一个资源, 这种情况会发生竞态, 类比现实生活中的例子有十字路口被各个方向的汽车竞争, 还有火车的卫生间被车厢里的人竞争等等
为了说明问题, 下面来看一个例子:
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上面的代码中我们开启了两个 goroutine 去累加变量 x 的值, 这两个 goroutine 在访问和修改 x 变量的时候就会存在数据竞争, 导致最后的结果和预期的不符.
互斥锁
互斥锁是一种常用的控制共享资源访问的方式, 它能够保证同时只有一个 goroutine 可以访问共享资源. go 语言中使用 sync 和 Mutex 类型来实现互斥锁, 使用互斥锁来修复上面代码的问题:
var x int64
var wg Sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥锁能够保证同一时间只有一个 goroutine 能进入临界区, 其他的 goroutine 则在等待锁, 当互斥锁释放之后, 等待的 goroutine 才能获取锁进入临界区, 多个 goroutine 同时等待一个锁时, 唤醒的策略是随机的.
读写互斥锁
互斥锁是完全互斥的, 但是有很多实际场景是读多写少, 当我们并发去读取一个资源不涉及资源的修改的时候是没有必要加锁的, 这种场景下使用读写锁是一种更好的选择, 读写锁在 go 语言中使用 sync 包中的 RWMutex 类型.
读写锁分成两种, 读锁和写锁, 当一个 goroutine 获取读锁之后, 其他的 goroutine 如果是获取读锁会继续获得锁, 如果是获取写锁则会等待, 当一个 goroutine 获取写锁之后, 其他的 goroutine 无论是获取读锁还是写锁都会等待.
下面是一个使用读写锁的例子:
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
rwlock.Lock() // 加写锁
time.Sleep(10 * time.Millisecond) // 假设读操作耗时 10 毫秒
rwlock.Unlock() // 解写锁
wg.Done()
}
func read() {
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
需要注意的是读写锁非常适合读多写少的场景, 如果读和写操作的差别不大, 读写锁的优势就发挥不出来.
sync.Once 与 sync.Map
在编程的很多场景下我们需要确保某些操作在高并发场景下只执行一次, 例如只加载一次配置文件, 只执行一次通道等等, go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案 sync.Once. 它只有一个 Do 方法, 其签名如下:
func (o *Once) Do(f func()) {}
需要注意的是, 如果这里要执行的函数 f 需要传递参数就需要搭配闭包来使用, 下面是一个加载配置文件的例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image {
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个 goroutine 调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践, 因为预先初始化一个变量(比如在 init 函数中完成初始化) 会增加程序的启动耗时, 而且有可能时机执行过程中这个变量没有用上, 那么这个初始化操作就不是必要的. 多个 goroutine 并发调用 Icon 函数时并非并发安全的, 现代的编译器和 CPU 可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排内存地访问顺序, loadIcons 函数可能会被重排为以下结果:
func loadIcons() {
icons = make(map[string][image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在这种情况下就会出现即使判断了 icons 不是 nil 也不意味着变量初始化完成了. 考虑到这种情况, 我们能想到地办法就是添加互斥锁, 保证初始化 icons 地时候不会被其他地 goroutine 操作, 但是这样做又会引发性能问题. 而这里我们可以使用 sync.Once 进行优化, 改造如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image {
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
另外 go 语言中内置的 map 不是并发安全的, 下面是一个例子:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Agg(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=%v, v=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
这里的代码开启少量几个 goroutine 的时候可能没有什么问题, 当并发多了之后执行上面的代码就会报 fatal error: concurrent map writes 错误,像这种场景下就需要为 map 加锁来保证并发的安全性了, go 语言的 sync 包中提供了一个开箱即用的并发安全版的 map 即 sync.Map. 开箱即用, 不用像内置的 map 一样使用 make 函数初始化就能直接使用. 同时 sync.Map 内置了诸如 Store, Load, LoadOrStore, Delete, Range 等操作方法, 下面是使用 sync.Map 的例子:
var m = sync.Map{}
func main() {
wg := sync.WaitGroup()
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("key := %v, v := %v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
sync.Map 自带锁, 在操作的时候必须通过它本身的方法去访问和设置值
atomic 原子性操作
代码中加锁操作因为涉及内核态的上下文切换, 因此耗时比较多, 代价也比较高, 针对基本数据类型我们还可以使用原子操作来保证并发安全, 因为原子操作是 go 语言提供的方法, 因此在用户态就能可以完成. go 中原子操作由内置的标准库 sync/atomic 提供. 它提供了诸如读取, 写入, 修改, 交换, 比较并交换等操作, 为了说明我们来看一个例子:
var x int64
var wg sync.WaitGroup
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
go atomicAdd()
}
wg.Wait()
end := time.Now()
fmt.Println(x)
}
这里使用 atomic.AddInt64 对我们的 int64 类型的 x 变量进行加一操作, 避免了手动加锁.
互联网协议
现在我们几乎每天都在使用互联网, 我们前面已经学习了如何编写 go 语言程序, 但是如何才能让我们的程序通过网络互相通信呢? 本章我们就一起来学习下 go 语言中的网络编程. 我们网络编程其实是一个很庞大的领域, 其中包含不同的互联网协议, 这些协议规定了电脑如何连接和组网, 如果理解了这些协议, 也就理解了互联网的原理
互联网分层模型
互联网的逻辑实现被分为好几层. 每一层都有自己的功能, 就像建筑物一样, 每一层都靠下一层支持. 用户接触到的只是最上面的那一层, 根本不会感觉到下面的几层. 要理解互联网就需要自下而上理解每一层的实现的功能.
通过以上表来看实际上就是『应表会传网数物』, 从上往下其抽象度越来越低, 这里的物理层对应的就是网线, 应用层对应到用户. 在软件领域我们一般将7层模型划分为中间的 5 层模型.
物理层
我们的电脑要与外接通信, 需要先把电脑接入到网络, 为了实现这个目标我们可以使用双绞线, 光纤, 无线电波等方式, 这就是物理层所做的事情, 指的是将电脑连接起来的物理手段. 它主要规定了网络的一些电气特性, 作用是负责传送 0 和 1 的电信号.
数据链路层
单纯的 0 和 1 是没有任何意义的, 所以我们需要为其赋予一些特定的含义, 规定解读电信号的方式, 例如: 多少个电信号算一组? 每个电信号有何意义? 这就是数据链路层的功能, 它在物理层的上层, 确定了物理层传输的 0 和 1 的分组方式及它所代表的意义. 早期的时候, 每家公司都有自己的电信号分组方式. 逐渐地, 一种叫做以太网的协议, 占据了主导地位.
以太网规定, 一组电信号构成一个数据包, 叫做帧, 每一帧分成两个部分, 标头和数据, 其中标头包含数据包地一些说明项, 比如发送者, 接收者, 数据类型等, 数据则是数据包地具体内容. 标头地长度顾定为18个字节. 数据地长度最短46个字节, 最长1500个字节. 因此每个帧最短64个字节, 最长1518个字节. 如果数据很长, 就必须分割成多个帧进行发送.
那么发送者和接收者是如何进行标识地呢? 以太网规定, 连入网络地所有设备都必须具有一个网卡接口, 数据包必须是从一块网卡传输到另一块网卡. 网卡地地址就是数据包地发送地址和接收地址, 这就是 MAC 地址. 每块网卡出厂地时候都有一个全世界独一无二的 MAC 地址. 长度是 48 个二进制位, 通常用12个十六进制数表示, 前6个是厂商编号, 后6个是该厂商的网卡流水号, 有了 MAC 地址, 就可以定位网卡和数据包的路径了
我们会通过 ARP 协议来获取接收方的 MAC 地址, 有了 MAC 地址之后, 如何把数据准确的发送给接收方呢? 其实这里以太网才用了一种很原始的方式, 它不是把数据包准确的发送给接收方, 而是向网络内所有的计算机发送, 让每台计算机读取这个包的标头, 找到接收方的 MAC 地址, 然后与自身的 MAC 地址相比较, 如果两者相同, 就接收这个包, 做进一步处理, 否则就丢弃这个包, 这种发送方式就叫做广播
网络层
按照以太网协议的规则我们可以依靠 MAC 地址来向外发送数据. 理论上依靠 MAC 地址, 你电脑的网卡就可以找到身在世界另一个角落的某台电脑的网卡了, 但是这种做法有一个重大缺陷就是以太网采用广播方式发送数据包, 所有成员人手一个包, 不仅效率低, 而且发送的数据包只能局限在发送者所在的子网络中. 也就是如果两台计算机不在同一个子网中, 广播是传不过去的, 这种设计是合理且必要的, 因为如果互联网上每台计算机都会收到互联网上收发的所有数据包, 那是不现实的.
因此, 必须找到一种方法区分哪些 MAC 地址属于同一个子网, 哪些不是. 如果是同一个子网, 就采用广播的方式发送, 否则就采用路由的方式发送. 这就导致了网络层的诞生, 它的作用是引进一套新的地址, 使得我们能够区分不同的计算机是否属于同一个子网络. 这套地址就叫做网络地址, 简称网址.
网络层出现以后, 每台计算机有了两种地址, 一种是 MAC 地址, 一种是网络地址. 两种地址之间没有任何联系, MAC 地址是绑定在网卡上的, 网络地址则是由网络管理员分配的. 网络地址帮助我们确定计算机所在的子网络, MAC 地址则将数据包发送到该子网络中的目标网卡, 因此, 从逻辑上可以推断, 必定是先处理网络地址, 然后再处理 MAC 地址.
规定网络地址的协议叫做 IP 协议, 它所定义的地址就被称为 ip 地址, 目前广泛采用的是 IP 协议的第四版, 简称 IPV4. IPV4 这个版本规定. 网络地址由32个二进制位组成, 我们通常习惯用分成四段的十进制数表示 IP 地址, 从 0.0.0.0 一直到 255.255.255.255
根据 IP 协议发送的数据, 就叫做 IP 数据包. IP 数据包也分为标头和数据两个部分, 标头部分主要包括版本, 长度, IP 地址等信息, 数据部分则是 IP 数据包的具体内容. IP 数据包的标头部分的长度是20到60字节, 整个数据包的总长度最大为 65535字节
传输层
有了 MAC 地址和 IP 地址, 我们已经可以在互联网上任意两台主机上建立通信了. 但问题是同一台主机上会有许多程序都需要用网络收发数据, 比如 QQ 和浏览器这两个程序都需要连接互联网并收发数据, 我们如何区分某个数据包到底归哪个程序呢? 也就是说, 我们还需要一个参数, 表示这个数据包到底提供哪个程序使用. 这个参数就叫做端口, 它其实是每一个使用网卡的程序的编号. 每个数据包都发到主机的特定端口. 所以不同的程序就能取到自己所需要的数据.
端口是 0 到 65535 之间的一个整数, 正好 16 个二进制位. 0 到 1023 的端口是被系统占用的, 用户只能使用大于 1023 的端口, 有了 IP 和端口后我们就能实现唯一确定互联网上一个程序, 进而实现不同网络之间的程序通信
我们必须在数据包中加入端口信息, 这就需要新的协议. 最简单的实现叫做 UDP 协议, 它的格式几乎就是在数据前面加上端口号. UDP 数据包, 也是由标头和数据两部分组成, 标头部分主要定义了发出端口和接收端口, 数据部分就是具体的内容, UDP 数据包非常简单, 标头部分一共只有8个字节, 总长度不超过65535字节, 正好放进一个 IP 数据包.
UDP 协议的优点比较简单, 容易实现, 但是缺点是可靠性比较差, 一旦数据包发出去了, 无法知道对方是否收到. 为了解决这个问题, 提高网络可靠性, TCP 协议就诞生了. TCP 协议能够保证数据不会丢失, 它的缺点是过程复杂, 实现困难, 消耗较多的资源. TCP 数据包没有长度限制, 理论上可以无限长, 但是为了保证网络效率, 通常 TCP 数据包的长度不会超过 IP 数据包的长度, 以确保 TCP 数据包不必再分割.
应用层
应用程序收到传输层的数据, 接下来就要对数据进行解包, 由于互联网是开放架构, 数据来源五花八门, 必须事先规定好通信的数据格式, 否则接收方根本无法获得真正发送的数据内容, 应用层的作用就是规定程序使用的数据格式, 例如我们 TCP 协议之上常见的 Email, http, ftp 等协议就属于应用层
socket 编程
socket 是 BSD UNIX 的进程通信机制, 通常也称为套接字, 用于描述 IP 地址和端口, 是一个通信链的句柄. socket 可以理解为 TCP/IP 网络的 API, 它定义了许多函数和例程, 程序员可以用它们来开发 TCP / IP 网络上的应用程序. 电脑上运行的应用程序通常通过套接字向网络发出请求或则应答网络请求.
socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层. 在设计模式中, socket 其实就是一个门面模式, 它把复杂的 TCP/IP 协议族隐藏在 socket 后面, 对用户来说只需要调用 socket 规定的相关函数, 让 socket 去组织符合指定的协议的数据然后进行通信.
TCP
TCP/IP 叫传输控制协议/网间协议, 是一种面向连接的, 可靠的, 基于字节流的传输层通信协议, 因为是面向连接的协议, 数据像水流一样传输, 还会存在黏包问题
TCP 服务端
一个 TCP服务端可以同时连接很多个客户端, 例如世界各地的用户使用自己电脑上的浏览器访问淘宝网, 因为 go 语言中创建多个 goroutine 实现并发非常方便和高效, 所以我们可以在每次建立一个链接的时候就去创建一个 goroutine 去处理客户的连接.
tcp 服务端程序的处理流程如下:
- 监听端口
- 接收客户端的请求建立连接
- 创建 goroutine 处理连接
下面是一个使用 go 语言内置的 net 包实现的 TCP 服务端例子:
// 处理函数
func process(conn net.Conn) {
defer conn.Close() // 关闭连接
for {
reader := bufio.NewReader(conn)
var buf [128]byte
n, err := reader.Read(buf[:]) // 读取数据
if err != nil {
fmt.Println("read from client failed, err: ", err)
break
}
recvStr := string(buf[:n])
fmt.Println("收到客户端发来的数据: ", recvStr)
conn.Write([]byte(recvStr)) // 发送数据
}
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:20000")
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
defer listen.Close()
for {
conn, err := listen.Accept() // 建立连接
if err != nil {
fmt.Println("获取连接失败, err", err)
continue
}
go process(conn) // 启动一个 goroutine 去处理连接
}
}
这里首先在 20000 端口上监听 tcp 服务, 然后等待建立连接, 当收到连接之后开启一个后台的 goroutine 去处理这个连接, 因为我们的 tcp 服务端支持多个用户建立连接, 这里需要使用 for 循环, 最后为了确保主程序退出之后 tcp 服务被关闭使用 defer 声明了关闭逻辑
TCP 客户端
一个 TCP 客户端进行 TCP 通信的流程如下:
- 建立与服务端的连接
- 进行数据收发
- 关闭连接
下面是一个使用 go 语言内置的 net 包实现的 TCP 客户端的例子:
package main
func main() {
// 与 server 建立连接
conn, err := net.Dial("tcp", "127.0.0.1:20000")
if err != nil {
fmt.Println("dial failed, err:", err)
return
}
// 发送数据
conn.Write([]byte("hello"))
// 关闭连接
conn.Close()
}
这里我们通过 Dial 与服务端建立了一个 tcp 连接, 同时服务端会获取到这个连接, 等待读取数据 , 然后我们通过客户端发送 hello, 服务端读取后输出客户端发送的内容, 最后客户端关闭连接, 服务端因读取数据失败而关闭连接, 处理结束.
TCP 黏包
之所以会出现黏包, 主要原因是 tcp 数据传递模式是流模式, 在保持长连接的时候可以进行多次的收和发, 黏包可以发生在发送端, 也可以发生在接收端, 下面是两种黏包情况:
- 由 Nagle 算法造成的发送端的黏包, Nagle 算法是一种改善网络传输效率的算法, 简单来说就是当我们提交一段数据给 TCP 的时候, TCP 并不立即发送此段数据, 而是等待一小段时间看看在等待期间是否还有要发送的数据, 若有则会一次把这两端数据发送出去
- 接收端接收不及时造成的接收端黏包, tcp 会把接收到的数据存在自己的缓冲区中, 然后通知应用层取数据, 当应用层由于某些原因不能及时把 tcp 的数据取出来, 就会造成 tcp 缓冲区中存放几段数据
出现黏包的关键在于接收方不确定将要传输的数据包的大小, 因此我们可以对数据包进行封包和拆包的操作. 封包就是给一段数据加上包头, 这样一来数据包就分为包头和包体两部分内容(过滤非法包时封包会加入包尾内容), 包头部分的长度是固定的, 并且它存储了包体的长度, 根据包头长度固定以及包头中含有包体长度的变量就能正确地拆分出一个完整的包数据, 为此我们可以自己定义一个协议, 比如数据包的前4个字节为包头, 里面存储的是发送的数据的长度:
package proto
import (
"bufio"
"bytes"
"encoding/binary"
)
// 将消息编码
func Encode(message string)([]byte, error) {
// 读取消息的长度, 转换成 int32 类型(占4个字节)
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头, 小端方式, 低位在内存的低地址
err := binary.Write(pkg, binary.LittleEndian, length)
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.LittleEndian, []byte(message))
if err != nil {
return nil, err
}
// 最后返回缓冲区中的所有字节内容
return pkg.Bytes(), nil
}
// 解码消息
func Decode(reader *bufio.Reader) (string, error) {
// 读取消息的长度
lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
lengthBuff := bytes.NewBuffer(lengthByte)
var length int32
err := binary.Read(lengthBuff, binary.LiggleEndian, &length)
if err != nil {
return "", err
}
// Buffered 返回缓冲区中现有的可读取的字节数
if int32(reader.Buffered()) < length + 4 {
return "", err
}
// 读取真正的消息数据
pack := make([]byte, int(4 + length))
_, err = reader.Read(pack)
if err != nil {
return "", err
}
return string(pack[4:]), nil
}
需要注意的是在使用的是小端写的时候,也要用小端读.
UDP
UDP 协议中文名称是用户数据协议, 是 OSI 参考模型中一种无连接的传输层协议, 不需要建立连接就能直接进行数据的发送和接收, 属于不可靠的, 没有时序的通信, 但是 UDP 协议的实时性比较好, 通常用于视频直播相关领域.
UDP 服务器
使用 go 语言的 net 包就能实现一个 UDP 的 server 端:
func main() {
listen, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: 30000,
})
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
defer listen.Close()
for {
var data [1024]byte
n, addr, err := listen.ReadFromUDP(data[:]) // 接收数据
if err != nil {
fmt.Println("read udp failed, err:", err)
continue
}
fmt.Printf("data:%v addr:%v count:%v\n", string(data[:n]), addr, n)
_, err = listen.WriteToUDP(data[:n], addr)
if err != nil {
fmt.Println("write to udp failed, err:", err)
continue
}
}
}
这里我们开启了一个 udp 服务, 指定本地端口和地址, 然后循环读取1024个字节, 最后将数据写回到指定的远程地址, 可以看到我们这里并没有建立连接, 有了服务端, 接下来我们来看看如何实现客户端:
func main() {
socket, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: 30000,
})
if err != nil {
fmt.Println("连接服务器失败: err", err)
return
}
defer socket.Close()
sendData := []byte("Hello server")
_, err := socket.Write(sendData) // 发送数据
if err != nil {
fmt.Println("发送数据失败, err", err)
return
}
data := make([]byte, 4096)
n, remoteAddr, err := socket.ReadFromUDP(data[:]) // 接收数据
if err := nil {
fmt.Println("接收数据失败, err:", err)
return
}
fmt.Printf("recv: %v, addr: %v, count: %v\n", string(data[:n]), remoteAddr, n)
}
这里我们通过拨号创建了一个 UDP 的客户端, 并且指定了服务端的地址, 然后通过 Write 向服务端写入数据, 读取数据与服务端一样都是通过 ReadFromUDP 方法
HTTP
go 语言内置的 net/http 包提供了 HTTP 客户端和服务端的实现, http 协议也叫做超文本传输协议, 是互联网上应用最广泛的一种网络传输协议, 所有的 www 文件都必须遵守这个标准, 设计 http 最初的目的是为了提供一种发布和接收 HTML 页面的方法
HTTP 客户端
下面是一个 GET 请求示例:
package main
import (
"fmt"
"io/ioutil"
"net/http"
)
func main() {
resp, err := http.Get("https://www.baidu.com")
if err != nil {
fmt.Println("get failed, err:", err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("read from resp.Body failed, err:", err)
return
}
fmt.Println(string(body))
}
这里通过 http.Get 发送了一个 get 请求, 在获取响应之后读取响应体 body 中的数据输出到. 需要注意的是为了确保 Body 流被关闭, 我们通过 defer 指定了 main 函数退出之前的关闭逻辑.
有时候我们还需要对 url 的参数进行编码, 特别是 url 的路径中的 query 比较复杂的时候, 下面是一个例子:
func main() {
apiUrl := "http://127.0.0.1:9090/get"
data := url.Values{}
data.Set("name", "name")
data.Set("age", 19)
u, err := url.ParseRequestURI(apiUrl)
if err != nil {
fmt.Printf("parse url requestUrl failed, err: %v\n", err)
}
u.RawQuery = data.Encode()
fmt.Println(u.String())
resp, err := http.Get(u.String())
if err != nil {
fmt.Println("post failed, err:%v\n", err)
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("get resp failed, err:%v\n", err)
return
}
fmt.Println(string(b))
}
在这里我们首先定义了一个请求的 url 地址, 为了对 url 的 query 参数进行编码我们声明了一个 url.Values 结构体, 分别设置了 name, age 两个 query 参数, 最后解析 url 地址并设置其 RawQuery 参数为编码之后的值, 编码的 query 字符串通过调用 url.Values 的 Encode 方法获取. 设置了编码后的url之后我们就能直接通过 Get 方法发送请求了.
除了Get请求我们还可以发送Post, PostForm请求, 也能通过 http.NewRequest 自定义请求, 下面是其简单的示例:
resp1, err1 := http.Post("http://upload", "image/jpeg", &buf)
resp2, err2 := http.PostForm("http://a.com", url.Values{"key", {"value"}, "id", {"123"}})
req, err := http.NewRequest("Post", "http:127.0.0.1", &buf)
req.Header.Add("X-Forward-For", "xxx")
resp3, err3 := http.DefaultClient.Do(req)
这里 Post 接收一个 url, 文件类型和 io.Reader 类型的指针, PostForm 只需要指定 url 和 Values 即可, 最后我们通过 NewRequest 构建了一个 Post 请求, 其参数分别为请求方法, 请求 url 和一个 io.Reader 类型的指针, 然后通过Header.Add方法自定义其请求头并通过 DefautlClient.Do 发送这个请求. 需要注意的是, 程序在使用完 response 之后必须关闭回复的主体:
defer resp.Body.Close()
当我们需要管理代理, TSL 设置, keep-alive, 压缩和其他设置的时候, 可以创建一个 Transport, 下面是一个使用 Transport 的例子:
tr := &http.Transport{
TSLClientConfig: &tsl.Config(RootCAs: pool},
DisableCompression: true,
}
client := &http.Client{Transport: tr}
resp, err := client.Get("https://exampole.com")
这里的 Client 和 Transport 类型都可以安全地被多个 go 进程同时使用, 出于效率考虑, 我们应该只建立一次, 然后进行重用
http 服务端
package main
import (
"net/http"
)
func f1(w http.ResponseWriter, r *http.Request) {
str := `<h1 style="color: red">Hello world!</h1>`
w.Write([]byte(str))
}
func main() {
http.HandleFunc("/", f1)
http.ListenAndServe("127.0.0.1:9090", nil)
}
我们这里通过指定一个监听的路径和处理函数 f1来响应本地9090端口根路径下的请求, 每当有一个请求来的时候 http 都会开启一个 goroutine 去处理. 打开浏览器访问9090端口便可看到 Hello world! 的红色字体.
和客户端一样, 我们也可以定义一些服务端地行为, 例如超时等设置:
s := &http.Server{
Addr: ":8080",
Handler: myHandler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
log.Faltal(s.ListenAndServe())
这里的 ReadTimeout 和 WriteTimeout 分别对应读写的超时时间, MaxHeaderBytes 指定最大的请求头大小.