Go语言实现实时通讯

目录MelodyCentrifugoMelodyMelody是一个用Go语言编写的WebSocket服务器库,它提供了高性能且易于使用的接口来处理WebSocket连接。适用于构建实时应用如聊天室、在线协作工具等。安装Melody首先确保你的环境中已安装了Go语

目录


Melody

Melody 是一个用 Go 语言编写的 WebSocket 服务器库,它提供了高性能且易于使用的接口来处理 WebSocket 连接。适用于构建实时应用如聊天室、在线协作工具等。

安装 Melody

首先确保你的环境中已安装了 Go 语言环境。然后通过以下命令安装 Melody:

go get github.com/gorilla/websocket
go get github.com/dgrijalva/jwt-go
go get github.com/olahol/melody

基础使用

创建一个简单的 WebSocket 服务器:

package main

import (
    "log"
    "net/http"

    "github.com/olahol/melody"
)

func main() {
    m := melody.New()

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        m.HandleRequest(w, r)
    })

    go func() {
        for {
            conn, err := m.Accept()
            if err != nil {
                log.Println(err)
                continue
            }

            go func(conn *melody.Session) {
                defer conn.Close()
                for {
                    msg, err := conn.ReadMessage()
                    if err != nil {
                        log.Println(err)
                        break
                    }
                    log.Printf("recv: %s", msg)
                    conn.WriteMessage(msg)
                }
            }(conn)
        }
    }()

    log.Fatal(http.ListenAndServe(":8080", nil))
}

这段代码创建了一个监听 8080 端口的 WebSocket 服务器,每当有新连接时,都会读取消息并将其原样返回给客户端。

消息广播

在多用户环境中,我们通常希望将一条消息发送给所有连接的客户端:

m := melody.New()
...
go func() {
    for {
        conn, err := m.Accept()
        if err != nil {
            log.Println(err)
            continue
        }

        go func(conn *melody.Session) {
            defer conn.Close()
            for {
                msg, err := conn.ReadMessage()
                if err != nil {
                    log.Println(err)
                    break
                }
                log.Printf("recv: %s", msg)
                m.Broadcast([]byte(msg)) // 广播消息
            }
        }(conn)
    }
}()

处理错误与断开连接

在实际应用中,需要优雅地处理各种错误情况,并且当客户端断开连接时能够及时响应:

for {
    conn, err := m.Accept()
    if err != nil {
        log.Println(err)
        continue
    }

    go func(conn *melody.Session) {
        defer func() {
            if err := recover(); err != nil {
                log.Println("Recovered in f", err)
                conn.Close()
            }
        }()

        defer conn.Close()
        for {
            msg, err := conn.ReadMessage()
            if err != nil {
                log.Println(err)
                break
            }
            log.Printf("recv: %s", msg)
            m.Broadcast([]byte(msg))
        }
    }(conn)
}

客户端认证

为了保证安全性,可以添加基于 JWT 的身份验证机制:

import "github.com/dgrijalva/jwt-go"

...

func parseToken(tokenString string) (*jwt.Token, error) {
    token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
        }
        return []byte("my_secret_key"), nil
    })
    if err != nil || !token.Valid {
        return nil, errors.New("invalid token")
    }
    return token, nil
}

...

go func() {
    for {
        conn, err := m.Accept()
        if err != nil {
            log.Println(err)
            continue
        }

        tokenHeader := conn.Request().Header.Get("Authorization")
        token, err := parseToken(strings.Replace(tokenHeader, "Bearer ", "", 1))
        if err != nil {
            conn.CloseWithError(websocket.ClosePolicyViolation, "Invalid token")
            continue
        }

        go func(conn *melody.Session) {
            defer conn.Close()
            for {
                msg, err := conn.ReadMessage()
                if err != nil {
                    log.Println(err)
                    break
                }
                log.Printf("recv: %s", msg)
                m.Broadcast([]byte(msg))
            }
        }(conn)
    }
}()

Centrifugo

Centrifugo 是一个高性能的实时消息推送系统,支持 WebSocket 和 HTTP 长轮询等多种协议。它可以轻松集成到任何后端系统中,非常适合用于构建实时应用,如聊天室、在线协作工具等。

安装 Centrifugo

首先确保你的环境中已安装了 Go 语言环境。然后按照以下步骤安装 Centrifugo:

下载并安装 Centrifugo

wget https://github.com/centrifugal/centrifugo/releases/download/v4.1.1/centrifugo-4.1.1-linux-amd64.tar.gz
tar -xzf centrifugo-4.1.1-linux-amd64.tar.gz
mv centrifugo-4.1.1-linux-amd64/centrifugo /usr/local/bin/

启动 Centrifugo 服务:

centrifugo --config=centrifugo.json

配置文件 centrifugo.json

{
  "api": {
    "listen": ":8000",
    "cors_origins": ["*"]
  },
  "node": {
    "listen": ":9000"
  },
  "auth": {
    "secret": "my_secret_key"
  }
}

Go客户端库安装

安装 Go 客户端库:

go get github.com/centrifugal/centrifuge

基础使用

创建一个简单的 WebSocket 服务器:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"

    "github.com/centrifugal/centrifuge"
)

func main() {
    node, err := centrifuge.New(
        centrifuge.Config{
            API: centrifuge.APIConfig{
                Host: "localhost",
                Port: 8000,
            },
            Secret: "my_secret_key",
        },
    )
    if err != nil {
        log.Fatalf("error creating node: %v", err)
    }

    http.HandleFunc("/connect", func(w http.ResponseWriter, r *http.Request) {
        token, err := node.Token(&centrifuge.ClientInfo{
            User: "user1",
        })
        if err != nil {
            http.Error(w, "Error generating token", http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "Token: %s", token)
    })

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        ctx := context.Background()
        h, _ := upgrader.Upgrade(w, r, nil)
        client, err := node.NewClient(ctx, h, centrifuge.ClientInfo{
            User: "user1",
        })
        if err != nil {
            http.Error(w, "Error creating client", http.StatusInternalServerError)
            return
        }

        go func() {
            defer client.Disconnect()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    message, err := client.ReadMessage()
                    if err != nil {
                        log.Println("Error reading message:", err)
                        return
                    }
                    log.Printf("Received: %s", message)
                    client.WriteMessage(message)
                }
            }
        }()
    })

    upgrader := websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }

    log.Fatal(http.ListenAndServe(":8080", nil))
}

消息广播

在多用户环境中,我们通常希望将一条消息发送给所有连接的客户端:

node.On(centrifuge.ConnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
    log.Printf("Client connected: %s", client.UserID())
})

node.On(centrifuge.DisconnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
    log.Printf("Client disconnected: %s", client.UserID())
})

node.On(centrifuge.PublishEvent, func(client *centrifuge.Client, publish *centrifuge.Publish, ctx context.Context) error {
    log.Printf("Publish event: %s", publish.Data)
    return nil
})

node.On(centrifuge.SubscribeEvent, func(client *centrifuge.Client, subscribe *centrifuge.Subscribe, ctx context.Context) error {
    log.Printf("Subscribe event: %s", subscribe.Channel)
    return nil
})

node.On(centrifuge.UnsubscribeEvent, func(client *centrifuge.Client, unsubscribe *centrifuge.Unsubscribe, ctx context.Context) error {
    log.Printf("Unsubscribe event: %s", unsubscribe.Channel)
    return nil
})

客户端认证

为了保证安全性,可以添加基于 JWT 的身份验证机制:

import (
    "github.com/dgrijalva/jwt-go"
    "net/http"
    "strings"
)

func parseToken(tokenString string) (*jwt.Token, error) {
    token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
        }
        return []byte("my_secret_key"), nil
    })
    if err != nil || !token.Valid {
        return nil, errors.New("invalid token")
    }
    return token, nil
}

http.HandleFunc("/connect", func(w http.ResponseWriter, r *http.Request) {
    tokenHeader := r.Header.Get("Authorization")
    token, err := parseToken(strings.Replace(tokenHeader, "Bearer ", "", 1))
    if err != nil {
        http.Error(w, "Invalid token", http.StatusUnauthorized)
        return
    }

    token, err = node.Token(&centrifuge.ClientInfo{
        User: "user1",
    })
    if err != nil {
        http.Error(w, "Error generating token", http.StatusInternalServerError)
        return
    }
    fmt.Fprintf(w, "Token: %s", token)
})

实现频道订阅和发布

实现频道订阅和发布的功能:

http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
    channel := r.URL.Query().Get("channel")
    message := r.URL.Query().Get("message")

    err := node.Publish(context.Background(), channel, message)
    if err != nil {
        http.Error(w, "Error publishing message", http.StatusInternalServerError)
        return
    }
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Message published"))
})

http.HandleFunc("/subscribe", func(w http.ResponseWriter, r *http.Request) {
    channel := r.URL.Query().Get("channel")

    subscribe, err := node.Subscribe(context.Background(), channel, centrifuge.SubscribeConfig{})
    if err != nil {
        http.Error(w, "Error subscribing to channel", http.StatusInternalServerError)
        return
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                message, err := subscribe.ReadMessage()
                if err != nil {
                    log.Println("Error reading message:", err)
                    return
                }
                log.Printf("Received: %s", message)
            }
        }
    }()

    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Subscribed to channel"))
})

错误处理和断开连接

优雅地处理各种错误情况,并且当客户端断开连接时能够及时响应:

node.On(centrifuge.DisconnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
    log.Printf("Client disconnected: %s", client.UserID())
})

node.On(centrifuge.PublishEvent, func(client *centrifuge.Client, publish *centrifuge.Publish, ctx context.Context) error {
    log.Printf("Publish event: %s", publish.Data)
    return nil
})

node.On(centrifuge.SubscribeEvent, func(client *centrifuge.Client, subscribe *centrifuge.Subscribe, ctx context.Context) error {
    log.Printf("Subscribe event: %s", subscribe.Channel)
    return nil
})

node.On(centrifuge.UnsubscribeEvent, func(client *centrifuge.Client, unsubscribe *centrifuge.Unsubscribe, ctx context.Context) error {
    log.Printf("Unsubscribe event: %s", unsubscribe.Channel)
    return nil
})
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
天涯学馆
天涯学馆
0x9d6d...50d5
资深大厂程序员,12年开发经验,致力于探索前沿技术!