cache教程5.分布式节点的通信
- 互联网
- 2025-07-21 19:13:49

0.对原教程的一些见解
其回顾完请求流程就是抽象了两个接口,PeerPicker和PeerGetter。这样操作,读者阅读时可能很难快速明白其含义,不好理解为什么就创建出两个接口,感觉会比较疑惑。原教程的评论中也有讨论这点。
本教程就先不创建接口,而是使用struct方式,这样可能好理解点。
1.节点请求处理的流程先弄清楚我们查询缓存的逻辑。
单节点:
客户发送查询请求到节点A,该节点有缓存就立即返回,若是没有就执行用户设置的回调函数获取值并添加到缓存中,然后返回。
分布式节点:
客户端发送查询请求到某个缓存节点,该节点会判断该key是否在本地,若是不在本地,使用一致性哈希选择节点,若不是在远程节点,则就退回到本地节点处理;若在远程节点,该节点会发送请求去访问其他 node 节点。(不是客户端再去访问其他节点)
从这可以看出,一个node要处理两种请求,一个是来自客户端的外部请求,一个是来自其他远端节点的内部请求。
为了清晰,划分职责,我们可以在一个node中启动两种HTTP服务,一个处理客户端请求(APIServer), 一个处理节点之间的请求(CacheServer)。
2.HTTP客户端之前我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来先实现客户端的功能。这个客户端是节点作为客户端去访问其他节点。
baseURL 表示将要访问的远程节点的地址,例如 http://example /geecache/。 type httpGetter struct { baseURL string } func (h *httpGetter) Get(group string, key string) ([]byte, error) { //QueryEscape 对字符串进行转义,以便可以将其安全地放置在 URL 查询中。 u := fmt.Sprintf("%v/%v/%v", h.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return nil, err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("server returned: %v", res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("reading response body: %v", err) } return bytes, nil } 3.回顾上一章节实现的单节点的访问流程 func (g *Group) Get(key string) (ByteView, error) { //现在本地查询 if v, ok := g.mainCache.get(key); ok { return v, nil } return g.load(key) } func (g *Group) load(key string) (ByteView, error) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{b: cloneByte(bytes)} g.mainCache.add(key, value) return value, nil }那很明显是需要修改load方法,让其可以去访问远程节点。
在load方法中,伪代码如下。
func func (g *Group) load(key string) (ByteView, error){ if 有远程节点 { if 找到key所在的远程节点 { 本地作为客户端去访问该远程节点 } } 没有远程节点,只能在本地调用回调函数去源地方获取 }要想在Group中访问节点,那么就要在Group中存储节点集合。
节点结合结构体Peers那节点集合是不是又要创建一个结构体?那先试试创建一个结构体Peers。
因为 hash 环的 map 不是线程安全的,所以这里要加锁。
成员变量 httpGetters,映射远程节点与对应的 httpGetter。(httpGetter就是个客户端,是一个节点作为客户端),每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关,map的key是远程节点的地址,比如"http://localhost:10000"
type Peers struct { addr string //这个是用于进行选择节点时用来判断是不是本地节点 basePath string mutex sync.Mutex //guards peersHashRing and httpGetters peersHashRing *consistenthash.HashRing httpGetters map[string]*httpGetter } //这是HTTP服务端章节的HTTPPool,这是很相似的 type HTTPPool struct { addr string basePath string }那么该结构体Peers就要有添加远程节点和通过key去获取远程节点的方法。
增添远程节点方法Set
通过该方法可以知道其map的key是远程节点的地址。
// 使用用例:Set("http://localhost:8001","http://localhost:8002") func (p *Peers) Set(peers ...string) { p.mutex.Lock() defer p.mutex.Unlock() p.peersHashRing = consistenthash.NewHash(50, nil) p.peersHashRing.Add(peers...) //在 hash 环上添加真实节点和虚拟节点 //存储远端节点信息 p.httpGetters = make(map[string]*httpGetter) for _, peer := range peers { p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath} } }通过key去获取远程节点的方法PickPeer
Peers结构体中的变量addr在这里派上用场了,返回的地址要是等于本身addr,那就返回false,不用自己作为客户端再去访问自己。
func (p *Peers) PickPeer(key string) (*httpGetter, bool) { p.mutex.Lock() defer p.mutex.Unlock() //这里返回的peer是个地址,可以查看(Peers).Set函数中的参数 if peer := p.peersHashRing.Get(key); peer != "" && peer != p.addr { fmt.Println("pick peer ", peer) return p.httpGetters[peer], true } return &httpGetter{}, false }Peers这个结构体就实现了,可以看到其与HTTPPool是很相似的。对比HTTPPool,就是成员变量添加了一些,方法也添加了一些,也没有改变HTTPPool原有的逻辑,只是扩张了。所以可以把Peers的内容添加到HTTPPool中去,具体的代码就不在这里显示了。
type HTTPPool struct { addr string basePath string //新添加的,把Peers内容增添到HTTPPool中 mutex sync.Mutex peersHashRing *consistenthash.HashRing httpGetters map[string]*httpGetter } 4.集成,实现主流程最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。
在Group结构体中有改变。
新增 RegisterPeers() 方法,将 peers 注入到 Group 中。
type Group struct { name string mainCache cache getter Getter peers *Peers //添加了节点集合 } // 往分组内注册节点集合 func (g *Group) RegisterPeers(peers *Peers) { if g.peers != nil { panic("RegisterPeerPicker called more than once") } g.peers = peers }最终再回到load函数,这个函数是需要修改的。
func (g *Group) load(key string) (value ByteView, err error) { if g.peers != nil { //有远程节点的情况 if peer, ok := g.peers.PickPeer(key); ok { //通过key找到该远程节点 if value, err = g.getFromPeer(peer, key); err == nil { return value, nil //找到值 } log.Println("[GeeCache] Failed to get from peer", err) } } return g.getLocally(key) //回到本地处理 } func (g *Group) getFromPeer(peer *httpGetter, key string) (ByteView, error) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{b: bytes}, nil } func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{b: cloneByte(bytes)} g.mainCache.add(key, value) return value, nil } 新增 getFromPeer() 方法,使用httpGetter 访问远程节点,获取缓存值。修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()。 5. 测试 总结——缓存节点启动的流程 创建 Group 对象.(用于存储我们的缓存数据)启动缓存 http 服务.(创建 HTTPPool,添加节点信息,注册到缓存分组中)启动 API 服务.(用于与客户端进行交互)测试代码:
var db = map[string]string{ "Tom": "630", "Jack": "589", "Sam": "567", } func main() { var port int var api bool flag.IntVar(&port, "port", 8001, "Geecache server port") flag.BoolVar(&api, "api", false, "Start a api server?") flag.Parse() apiAddr := "http://localhost:9999" addrMap := map[int]string{ 8001: "http://localhost:8001", 8002: "http://localhost:8002", 8003: "http://localhost:8003", } var addrs []string for _, v := range addrMap { addrs = append(addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], addrs, gee) time.Sleep(time.Second * 1000) } func createGroup() *cache.Group { return cache.NewGroup("scores", 2<<10, cache.GetterFunc(func(key string) ([]byte, error) { if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exit", key) })) } func startCacheServer(addr string, addrs []string, groups *cache.Group) { //HTTPPool是节点结合和HTTP服务端 peers := cache.NewHTTPPool(addr, cache.DefaultBasePath) peers.Set(addrs...) //添加节点 groups.RegisterPeers(peers) //注册节点集合 log.Println("geecache is running at", addr) http.ListenAndServe(addr[7:], peers) } func startAPIServer(apiAddr string, groups *cache.Group) { http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") view, err := groups.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/octet-stream") w.Write(view.ByteSlice()) }) log.Println("fontend server is running at", apiAddr) http.ListenAndServe(apiAddr[7:], nil) }为了方便,我们将启动的命令封装为一个 shell 脚本:
我们开启了三个节点(都是在同一个台机器上的,只是用不同端口来当做一个节点,进行区分)。
在端口8003的节点上开启APIServer,用户去访问时候,都是访问端口8003的那个节点。
#!/bin/bash #trap 命令用于在 shell 脚本退出时,删掉临时文件,结束在该shell脚本运行的后台程序 trap "rm server;kill 0" EXIT go build -o server ./server -port=8001 & ./server -port=8002 & ./server -port=8003 -api=1 & sleep 2 echo ">>> start test" curl "http://localhost:9999/api?key=Tom" & curl "http://localhost:9999/api?key=Tom" & curl "http://localhost:9999/api?key=Tom" & wait结果
测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。
但是会有一个问题,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。
三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。
6.多节点的访问流程图
完整代码: github /liwook/Go-projects/tree/main/go-cache/5-multi-nodes
cache教程5.分布式节点的通信由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“cache教程5.分布式节点的通信”