Go语言源码sync.Map解读,一种高性能并发安全的字典类型

首先来看一个例子,在我们刚开始对写 GO 的代码的过程一定遇到这个错误。通过阅读 Map源码实现知道map是不支持线程安全的,所以一般并发场景下都是 加锁 来解决,相对的话性能上就会有一定的损耗。

package main

import "time"

func main() {
	m := make(map[int]string)
	m[1] = `test`
	go func() {
		for {
			m[1] = `test`
		}
	}()
	go func() {
		for {
			_ = m[1]
		}
	}()
	time.Sleep(time.Second)
}
fatal error: concurrent map read and map write
goroutine 6 [running]:
runtime.throw(0x107c19c, 0x21)
.....

官方在 go1.9 的引入 sync.Map 为并发map提供了解决方案。

下面通过一个例子,来看看 sync.Map 的使用:

func main() {
	var m sync.Map
	// 新增
	m.Store(1, `add`)
	// 获取
	if value, ok := m.Load(1); ok {
		fmt.Println(value.(string))
	}
	// 遍历
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})
	// 删除
	m.Delete(1)
}

来看看 sync.Map 结构

type Map struct { 
	mu Mutex
	// read map的 k v(entry) 是不变的,删除只是打标记,插入新key会加锁写到dirty中
	// 因此对read的读取无需加锁
	read atomic.Value // readOnly
	// 读写需要加锁
	dirty map[interface{}]*entry
 	// 当load操作在 read.m 中未找到,尝试从dirty中进行 load 时(不管是否存在),misses++
	// 当misses >= dirty map len时,dirty被提升为read 并且重新分配dirty == nil
	misses int
}

type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 如果是 true 的话说明 dirty 里面有些 key 在 m 是没有的。
}

type entry struct {	
	// p  指向 *interface{}
	// 如果 p == nil, 说明 kv 已经被删除并且 m.dirty == nil
	// 如果 p == expunged, 说明 kv 已经删除,并且 m.dirty != nil, 这个 key 在 dirty 中未找到
	// 其他情况 p 是 interface{}地址,m.dirty != nil 的话说明 read.m[key] 和 dirty[key] 指向一个 *entry
	p unsafe.Pointer // *interface{}
}
graph TB; subgraph read.m key1[key1] key2[key2] end subgraph dirty key_a[key1] key_b end key1-->entry[*entry] key_a-->entry
当 read.m 和 dirty 中都有key1时,因为 *entry 为指针类型,所以一处修改都会受到影响 。

Load 查找

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	read, _ := m.read.Load().(readOnly) // 1
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly) // 2
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// m.misses++
			// 当 miss 数量满足一定条件时,会触发dirty 提升为 read
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

获取的话这块比较简单,先是在未加锁的情况下从 read 中查找,找到立即返回 ,反之加锁再从 dirty 中查找,并且 miss 数递增,当 miss 数量满足一定条件时,也就是miss > len(dirty),会触发dirty 提升为 read。不太容易理解可能的就是 1 2 中为什么两次从 read 获取数据?这里其实就是为了避免加锁时,dirty 提升为 read。

Store 新增

func (m *Map) Store(key, value interface{}) {
	read, _ := m.read.Load().(readOnly)
	// 如果 read 有的话,直接尝试修改 read.m[key] = value
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// 如果 read 有这个 key,并且 p == expunged, 说明 m.dirty != nil 并且dirty里面没有这个 key 
			// 1. 设置 p = nil
 			// 2. dirty 中新增 key
			// 3. 让dirty[key] 和 read.m[key] 指向同一个 *entry
			m.dirty[key] = e
		}
 		// e.p = &value
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
 		// read 中没有,dirty中有,直接修改
		e.storeLocked(&value)
	} else {
		if !read.amended {
			// 新增的key 在dirty 中有, 但是read中没有,标记下
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}
	// m.dirty == nil的话,将read里面不是expunged kv 复制到dirty
	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

Delete 删除元素

func (m *Map) Delete(key interface{}) {
	m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}

// 删除的话,不是直接删除kv,而是让 e.p == nil
func (e *entry) delete() (value interface{}, ok bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return nil, false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return *(*interface{})(p), true
		}
	}
}

Range 遍历读取

func (m *Map) Range(f func(key, value interface{}) bool) {
	// 如果所有的 key 都在read 中是不用加锁的
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		// m.dirty包含一些不在read.m中的kv,遍历的时候就会触发触发dirty 提升为 read
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

其他还有一些像LoadAndDelete LoadOrStore 等方法,就是上述代码的整合操作,这里就不做解释了。

通过阅读学习源码和官方文档可以了解 sync.Map 的设计特性,整体设计上其实并不复杂。里面一些好的设计思路值得我们在平时编码过程中借鉴应用的。

  • 以空间换效率,通过read和dirty两个map来提高读取效率
  • 优先从read中读取(无锁),否则再从dirty map中读取(加锁)
  • 动态调整,当misses次数过多时,将dirty提升为read
  • 延迟删除,删除只是为value打一个标记,在dirty map提升时才执行真正的删除

下面是map操作非常详细的图解,图片搭配代码更好理解代码,map图解 来源 地址

Map图解

comments powered by Disqus

相关