目录MelodyCentrifugoMelodyMelody是一个用Go语言编写的WebSocket服务器库,它提供了高性能且易于使用的接口来处理WebSocket连接。适用于构建实时应用如聊天室、在线协作工具等。安装Melody首先确保你的环境中已安装了Go语
Melody 是一个用 Go 语言编写的 WebSocket 服务器库,它提供了高性能且易于使用的接口来处理 WebSocket 连接。适用于构建实时应用如聊天室、在线协作工具等。
首先确保你的环境中已安装了 Go 语言环境。然后通过以下命令安装 Melody:
go get github.com/gorilla/websocket
go get github.com/dgrijalva/jwt-go
go get github.com/olahol/melody
创建一个简单的 WebSocket 服务器:
package main
import (
"log"
"net/http"
"github.com/olahol/melody"
)
func main() {
m := melody.New()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
m.HandleRequest(w, r)
})
go func() {
for {
conn, err := m.Accept()
if err != nil {
log.Println(err)
continue
}
go func(conn *melody.Session) {
defer conn.Close()
for {
msg, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
}
log.Printf("recv: %s", msg)
conn.WriteMessage(msg)
}
}(conn)
}
}()
log.Fatal(http.ListenAndServe(":8080", nil))
}
这段代码创建了一个监听 8080 端口的 WebSocket 服务器,每当有新连接时,都会读取消息并将其原样返回给客户端。
在多用户环境中,我们通常希望将一条消息发送给所有连接的客户端:
m := melody.New()
...
go func() {
for {
conn, err := m.Accept()
if err != nil {
log.Println(err)
continue
}
go func(conn *melody.Session) {
defer conn.Close()
for {
msg, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
}
log.Printf("recv: %s", msg)
m.Broadcast([]byte(msg)) // 广播消息
}
}(conn)
}
}()
在实际应用中,需要优雅地处理各种错误情况,并且当客户端断开连接时能够及时响应:
for {
conn, err := m.Accept()
if err != nil {
log.Println(err)
continue
}
go func(conn *melody.Session) {
defer func() {
if err := recover(); err != nil {
log.Println("Recovered in f", err)
conn.Close()
}
}()
defer conn.Close()
for {
msg, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
}
log.Printf("recv: %s", msg)
m.Broadcast([]byte(msg))
}
}(conn)
}
为了保证安全性,可以添加基于 JWT 的身份验证机制:
import "github.com/dgrijalva/jwt-go"
...
func parseToken(tokenString string) (*jwt.Token, error) {
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
return []byte("my_secret_key"), nil
})
if err != nil || !token.Valid {
return nil, errors.New("invalid token")
}
return token, nil
}
...
go func() {
for {
conn, err := m.Accept()
if err != nil {
log.Println(err)
continue
}
tokenHeader := conn.Request().Header.Get("Authorization")
token, err := parseToken(strings.Replace(tokenHeader, "Bearer ", "", 1))
if err != nil {
conn.CloseWithError(websocket.ClosePolicyViolation, "Invalid token")
continue
}
go func(conn *melody.Session) {
defer conn.Close()
for {
msg, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
}
log.Printf("recv: %s", msg)
m.Broadcast([]byte(msg))
}
}(conn)
}
}()
Centrifugo 是一个高性能的实时消息推送系统,支持 WebSocket 和 HTTP 长轮询等多种协议。它可以轻松集成到任何后端系统中,非常适合用于构建实时应用,如聊天室、在线协作工具等。
首先确保你的环境中已安装了 Go 语言环境。然后按照以下步骤安装 Centrifugo:
下载并安装 Centrifugo:
wget https://github.com/centrifugal/centrifugo/releases/download/v4.1.1/centrifugo-4.1.1-linux-amd64.tar.gz
tar -xzf centrifugo-4.1.1-linux-amd64.tar.gz
mv centrifugo-4.1.1-linux-amd64/centrifugo /usr/local/bin/
启动 Centrifugo
服务:
centrifugo --config=centrifugo.json
配置文件 centrifugo.json
:
{
"api": {
"listen": ":8000",
"cors_origins": ["*"]
},
"node": {
"listen": ":9000"
},
"auth": {
"secret": "my_secret_key"
}
}
安装 Go 客户端库:
go get github.com/centrifugal/centrifuge
创建一个简单的 WebSocket 服务器:
package main
import (
"context"
"fmt"
"log"
"net/http"
"github.com/centrifugal/centrifuge"
)
func main() {
node, err := centrifuge.New(
centrifuge.Config{
API: centrifuge.APIConfig{
Host: "localhost",
Port: 8000,
},
Secret: "my_secret_key",
},
)
if err != nil {
log.Fatalf("error creating node: %v", err)
}
http.HandleFunc("/connect", func(w http.ResponseWriter, r *http.Request) {
token, err := node.Token(¢rifuge.ClientInfo{
User: "user1",
})
if err != nil {
http.Error(w, "Error generating token", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Token: %s", token)
})
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
h, _ := upgrader.Upgrade(w, r, nil)
client, err := node.NewClient(ctx, h, centrifuge.ClientInfo{
User: "user1",
})
if err != nil {
http.Error(w, "Error creating client", http.StatusInternalServerError)
return
}
go func() {
defer client.Disconnect()
for {
select {
case <-ctx.Done():
return
default:
message, err := client.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return
}
log.Printf("Received: %s", message)
client.WriteMessage(message)
}
}
}()
})
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
log.Fatal(http.ListenAndServe(":8080", nil))
}
在多用户环境中,我们通常希望将一条消息发送给所有连接的客户端:
node.On(centrifuge.ConnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
log.Printf("Client connected: %s", client.UserID())
})
node.On(centrifuge.DisconnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
log.Printf("Client disconnected: %s", client.UserID())
})
node.On(centrifuge.PublishEvent, func(client *centrifuge.Client, publish *centrifuge.Publish, ctx context.Context) error {
log.Printf("Publish event: %s", publish.Data)
return nil
})
node.On(centrifuge.SubscribeEvent, func(client *centrifuge.Client, subscribe *centrifuge.Subscribe, ctx context.Context) error {
log.Printf("Subscribe event: %s", subscribe.Channel)
return nil
})
node.On(centrifuge.UnsubscribeEvent, func(client *centrifuge.Client, unsubscribe *centrifuge.Unsubscribe, ctx context.Context) error {
log.Printf("Unsubscribe event: %s", unsubscribe.Channel)
return nil
})
为了保证安全性,可以添加基于 JWT 的身份验证机制:
import (
"github.com/dgrijalva/jwt-go"
"net/http"
"strings"
)
func parseToken(tokenString string) (*jwt.Token, error) {
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
return []byte("my_secret_key"), nil
})
if err != nil || !token.Valid {
return nil, errors.New("invalid token")
}
return token, nil
}
http.HandleFunc("/connect", func(w http.ResponseWriter, r *http.Request) {
tokenHeader := r.Header.Get("Authorization")
token, err := parseToken(strings.Replace(tokenHeader, "Bearer ", "", 1))
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
token, err = node.Token(¢rifuge.ClientInfo{
User: "user1",
})
if err != nil {
http.Error(w, "Error generating token", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Token: %s", token)
})
实现频道订阅和发布的功能:
http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
channel := r.URL.Query().Get("channel")
message := r.URL.Query().Get("message")
err := node.Publish(context.Background(), channel, message)
if err != nil {
http.Error(w, "Error publishing message", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Message published"))
})
http.HandleFunc("/subscribe", func(w http.ResponseWriter, r *http.Request) {
channel := r.URL.Query().Get("channel")
subscribe, err := node.Subscribe(context.Background(), channel, centrifuge.SubscribeConfig{})
if err != nil {
http.Error(w, "Error subscribing to channel", http.StatusInternalServerError)
return
}
go func() {
for {
select {
case <-ctx.Done():
return
default:
message, err := subscribe.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return
}
log.Printf("Received: %s", message)
}
}
}()
w.WriteHeader(http.StatusOK)
w.Write([]byte("Subscribed to channel"))
})
优雅地处理各种错误情况,并且当客户端断开连接时能够及时响应:
node.On(centrifuge.DisconnectedEvent, func(client *centrifuge.Client, presence *centrifuge.PresenceUpdate, ctx context.Context) {
log.Printf("Client disconnected: %s", client.UserID())
})
node.On(centrifuge.PublishEvent, func(client *centrifuge.Client, publish *centrifuge.Publish, ctx context.Context) error {
log.Printf("Publish event: %s", publish.Data)
return nil
})
node.On(centrifuge.SubscribeEvent, func(client *centrifuge.Client, subscribe *centrifuge.Subscribe, ctx context.Context) error {
log.Printf("Subscribe event: %s", subscribe.Channel)
return nil
})
node.On(centrifuge.UnsubscribeEvent, func(client *centrifuge.Client, unsubscribe *centrifuge.Unsubscribe, ctx context.Context) error {
log.Printf("Unsubscribe event: %s", unsubscribe.Channel)
return nil
})
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!