1 构建一个类似聊天室
每个计算模版相当于一个聊天房间,用户进入这个模版计算页面,就是进入了一个聊天房间,可以和服务器发送消息。例如填好参数,点击计算按钮,那么就是把计算参数作为消息发送给服务器,服务器根据情况,将结果发送给房间内所有用户,例如排队情况,前面还有多少人在等待计算。
2 并发消息
既然是websocket,还有多个房间(各种计算模版),除了math类的计算书模版,还有excel和ansys类的计算书模板,以后是不是还有midas之类的,所以要将消息里加锁。借助deepseek,它提供的解决方案是:解决方案:使用共享锁保护共享资源
核心思路是将 连接(*websocket.Conn) 和 对应的互斥锁 绑定成一个原子单元,确保所有操作都通过同一个锁进行同步。
// 将锁加入连接里
type SafeConn struct {
conn *websocket.Conn
mu sync.Mutex
}
// 新建连接
func NewSafeConn(conn *websocket.Conn) *SafeConn {
return &SafeConn{conn: conn}
}
// 线程安全的 WriteJSON 方法——根据不同分类,不同的消息类型,分别写一个对应的发送消息的方法
func (sc *SafeConn) WriteMathcalJSON(message MathcalMessage) error {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn.WriteJSON(message)
}
// 线程安全的 WriteJSON 方法——根据不同分类,不同的消息类型,分别写一个对应的发送消息的方法
func (sc *SafeConn) WriteMathJSON(message Message) error {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn.WriteJSON(message)
}
// 1.广播新接入连接信息(用户名)到页面
func sendfirstconnect() {
for {
msg := <-broadcast
for _, v := range clients {
for j, w := range v.Wscon {
if msg.MathTempleID == v.MathTempleID {
err := w.Connect.WriteMathJSON(msg)
if err != nil {
log.Printf("sendfirstconnect client.WriteJSON error: %v", err)
w.Connect.conn.Close()
Pollindex(clients, v.MathTempleID, j)
}
}
}
}
}
}
// 2.广播断开信息,分为排在前面的人退出,和后面的人退出
func sendendconnect() {
for {
msg := <-broadcast2
for _, v := range clients {
for j, w := range v.Wscon {
if msg.MathTempleID == v.MathTempleID {
// 不用给自己发
if msg.UserId != w.UserId {
err := w.Connect.WriteMathJSON(msg)
if err != nil {
log.Printf("sendendconnect client.WriteJSON error: %v", err)
w.Connect.conn.Close()
Pollindex(clients, v.MathTempleID, j)
}
}
}
}
}
}
}
// 3.中间计算过程信息,只发给本人
func sendmiddleconnect() {
for {
msg := <-broadcast3
for _, v := range clients {
for j, w := range v.Wscon {
if msg.MathTempleID == v.MathTempleID {
// 只给自己发
if msg.UserId == w.UserId {
err := w.Connect.WriteMathJSON(msg)
if err != nil {
log.Printf("sendmiddleconnect client.WriteJSON error: %v", err)
w.Connect.conn.Close()
Pollindex(clients, v.MathTempleID, j)
}
}
}
}
}
}
}
// 4.发送存活心跳——检查出来未连接的直接删除队列和关闭ws连接
func procLoop() {
msg := Message{Message: "heartbeat from mathserver"}
// 启动一个gouroutine发送心跳
go func() {
for {
time.Sleep(30 * time.Second) //30s发送一次
for _, v := range clients {
for j, w := range v.Wscon {
err := w.Connect.WriteMathJSON(msg)
if err != nil {
log.Printf("client.WriteJSON error: %v", err)
user, err := models.GetUserByUserId(w.UserId)
if err != nil {
logs.Error(err)
}
msg = Message{MathTempleID: v.MathTempleID, UserNickname: user.Nickname, Message: "断开啦~"}
broadcast2 <- msg //通知其他人,断线了
w.Connect.conn.Close() //端口客户端
Pollindex(clients, v.MathTempleID, j) //删除客户端
}
}
}
}
}()
}
3 并发安全的map存储计算软件实例
要想办法把某个模版打开的math软件这个实例存起来,其他用户进入这个模版计算页面时,得到这个math软件实例,进行计算,不像之前,要通过打开这个math软件来获得实例。
那么存储这个math软件实例,开始用struct,也是一个加锁的并发安全的结构体,发现不是最优的,因为结构体里要比对一个元素,需要循环,而map就方便了,直接key value,把模版的id作为key,value就是math软件的实例,一个指针(地址)对象
func (m *Map) CompareAndDelete(key, old any) bool
func (m *Map) CompareAndSwap(key, old, new any) bool
func (m *Map) Delete(key any)
func (m *Map) Load(key any) (any, bool)
func (m *Map) LoadAndDelete(key any) (any, bool)
func (m *Map) LoadOrStore(key, value any) (any, bool)
func (m *Map) Range(f func(key, value any) bool)
func (m *Map) Store(key, value any)
func (m *Map) Swap(key, value any) (any, bool)
package main
import (
"fmt"
"sync"
)
//声明sync.Map
var syncmap sync.Map
func main() {
//Store方法将键值对保存到sync.Map
syncmap.Store("zhangsan", 97)
syncmap.Store("lisi", 100)
syncmap.Store("wangmazi", 200)
// LoadOrStore key不存在
v, ok := syncmap.LoadOrStore(3, "three")
fmt.Println(v, ok) // three false
// LoadOrStore key存在
v, ok = syncmap.LoadOrStore(1, "thisOne")
fmt.Println(v, ok) // one ture
// Load方法获取sync.Map 键所对应的值
fmt.Println(syncmap.Load("lisi"))
// Delete方法键删除对应的键值对
syncmap.Delete("lisi")
var syncmap sync.Map
// LoadAndDelete key不存在
v, ok := syncmap.LoadAndDelete("xiaomi")
fmt.Println(v, ok) // <nil> false
syncmap.Store("xiaomi", "xiaomi")
// LoadAndDelete key存在
v, ok = syncmap.LoadAndDelete("xiaomi")
fmt.Println(v, ok) // xiaomi true
// Range遍历所有sync.Map中的键值对
syncmap.Range(func(k, v interface{}) bool {
fmt.Println(k, v)
return true
})
}
注意
声明 score,类型为 sync.Map,注意,sync.Map 不能使用 make 创建。
将一系列键值对保存到 sync.Map 中,sync.Map 将键和值以 interface{} 类型进行保存。
Range() 方法可以遍历 sync.Map,遍历需要提供一个匿名函数,参数为 k、v,类型为 interface{},每次 Range() 在遍历一个元素时,都会调用这个匿名函数把结果返回。
sync.Map 没有提供获取 map 数量的方法,替代方法是在获取 sync.Map 时遍历自行计算数量,sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。
Go 语言原生 map 并不是线程安全的,对它进行并发读写操作的时候,需要加锁。而 sync.map 则是一种并发安全的 map,在 Go 1.9 引入。
sync.map 是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。
sync.map 的零值是有效的,并且零值是一个空的 map。在第一次使用之后,不允许被拷贝。
有什么用
一般情况下解决并发读写 map 的思路是加一把大锁,或者把一个 map 分成若干个小 map,对 key 进行哈希,只操作相应的小 map。前者锁的粒度比较大,影响效率;后者实现起来比较复杂,容易出错。
而使用 sync.map 之后,对 map 的读写,不需要加锁。并且它通过空间换时间的方式,使用 read 和 dirty 两个 map 来进行读写分离,降低锁时间来提高效率。
如何使用
使用非常简单,和普通 map 相比,仅遍历的方式略有区别:
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 1. 写入
m.Store("qcrao", 18)
m.Store("stefno", 20)
// 2. 读取
age, _ := m.Load("qcrao")
fmt.Println(age.(int))
// 3. 遍历
m.Range(func(key, value interface{}) bool {
name := key.(string)
age := value.(int)
fmt.Println(name, age)
return true
})
// 4. 删除
m.Delete("qcrao")
age, ok := m.Load("qcrao")
fmt.Println(age, ok)
// 5. 读取或写入
m.LoadOrStore("stefno", 100)
age, _ = m.Load("stefno")
fmt.Println(age)
}
第 1 步,写入两个 k-v 对;
第 2 步,使用 Load 方法读取其中的一个 key;
第 3 步,遍历所有的 k-v 对,并打印出来;
第 4 步,删除其中的一个 key,再读这个 key,得到的就是 nil;
第 5 步,使用 LoadOrStore,尝试读取或写入 “Stefno”,因为这个 key 已经存在,因此写入不成功,并且读出原值。
程序输出:
18
stefno 20
qcrao 18
<nil> false
20
Golang中使用interface实现任意类型指针的灵活操作与转换技巧
要从interface{}中提取具体的指针类型,我们需要使用类型断言。类型断言的语法是value, ok := x.(Type),其中x是interface{}类型的变量,Type是我们希望提取的具体类型。
type MyStruct struct {
Name string
}
func main() {
var x interface{} = &MyStruct{Name: "Alice"}
if ptr, ok := x.(*MyStruct); ok {
fmt.Println(ptr.Name) // 输出: Alice
// 将 *MyStruct 转换为 *string
strPtr := &ptr.Name
fmt.Println(*strPtr) // 输出: Alice
} else {
fmt.Println("类型断言失败")
}
}
其他例子
Args := make(map[string]interface{})//可以存放string的key和任意类型的value
Args["id"] = 1
Args["plan_id"] = "第一个"
//fmt.Println(Args["id"]+1) 会报错(mismatched types interface {} and int)
fmt.Println(Args["id"].(int) +1)
results := make(map[string]interface {})
results :=
[
{...},
{
"children": [
{
"children": null,
"key": "6_label_temp",
"label": "label_temp",
"value": "6"
},
{
"children": null,
"key": "7_label_temp",
"label": "label_temp",
"value": "7"
}
],
"key": "5_label_temp",
"label": "label_temp",
"value": "5"
},
...
{...}
]
fmt.Println("results[1]: ", results[1])
//以下3个变量的数据类型都是string,其实也需要转换一下(results[1]["key"].(string)),不过直接打印的话就不用;
//如果是int类型,想要进行基础运算操作,就需要转换类型(例如results["num"].(int)+1)
fmt.Println("key: ", results[1]["key"])
fmt.Println("label: ", results[1]["label"])
fmt.Println("value, ", results[1]["value"])
//注意下面这个数据类型,要先把interface{}转换为[]map[string]interface{}
child := results[1]["children"].([]map[string]interface{})
fmt.Println("children, ", child[0]) //child是一个数组,此处取第0个元素
4 模拟多用户连接测试
同样依靠deepseek
const (
numUsers = 2 // 模拟用户数量
loginURL = "http://127.0.0.1:8081/loginpost?uname=qin.xc&pwd=****" // 登录接口
wsURL = "ws://127.0.0.1:8081/v1/math/wsmathcal/7" // WebSocket地址
)
var wg sync.WaitGroup
wg.Add(numUsers)
for i := 0; i < numUsers; i++ {
// 在main函数中添加连接间隔
time.Sleep(50 * time.Millisecond) // 控制每秒20连接
// userID := 9
go func(userID int) {
defer wg.Done()
// 1. 创建带Cookie管理的HTTP客户端
jar, _ := cookiejar.New(nil)
client := &http.Client{
Jar: jar,
Timeout: 10 * time.Second,
}
// url := "http://127.0.0.1:8081/loginpost"
// jsonMap := map[string]interface{}{"uname": "qin.xc", "pwd": "957873qxc"}
jsonMap := make(map[string]interface{})
jsonMap["uname"] = "qin.xc" //账号
jsonMap["pwd"] = "957873qxc" //密码
// 转换成postBody
bytesData, err := json.Marshal(jsonMap)
if err != nil {
fmt.Println(err.Error())
}
postBody := bytes.NewReader(bytesData)
method := "POST"
formData := url.Values{}
formData.Set("uname", "qin.xc")
formData.Set("pwd", "957873qxc")
req, err := http.NewRequest(method, loginURL, postBody)
if err != nil {
fmt.Println(err)
return
}
req.PostForm = formData // 放header里无效,放body里也无效,应该放路由里
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := client.Do(req)
if err != nil || resp.StatusCode != http.StatusOK {
fmt.Printf("用户%d登录失败: %v\n", userID, err)
return
}
defer resp.Body.Close()
cookies := resp.Cookies()
for _, cookie := range cookies {
fmt.Printf("Cookie: %s=%s\n", cookie.Name, cookie.Value)
}
// 3. 创建WebSocket连接(携带登录后的Cookie)
dialer := websocket.Dialer{
Jar: jar, // 复用Cookie
HandshakeTimeout: 5 * time.Second,
}
// 添加连接耗时统计
start := time.Now()
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
fmt.Printf("用户%d连接WebSocket失败: %v\n", userID, err)
return
}
defer conn.Close()
fmt.Printf("连接耗时: %v\n", time.Since(start))
// 在连接成功后记录状态
stats := struct {
sync.Mutex
connected int
failed int
}{}
stats.Lock()
stats.connected++
stats.Unlock()
// 在连接成功后启动心跳
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := conn.WriteControl(
websocket.PingMessage,
[]byte{},
time.Now().Add(5*time.Second),
); err != nil {
return
}
}
}
}()
// 单独goroutine处理接收消息
go func() {
for {
_, msg, err := conn.ReadMessage()
if err != nil {
break
}
fmt.Printf("收到消息: %s\n", msg)
}
}()
// 4. 模拟消息交互
// 发送消息
msg := []byte(fmt.Sprintf("Hello from user%d", userID))
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
fmt.Printf("用户%d发送消息失败: %v\n", userID, err)
return
}
// 接收响应(可选)
_, respMsg, err := conn.ReadMessage()
if err != nil {
fmt.Printf("用户%d接收消息失败: %v\n", userID, err)
return
}
fmt.Printf("用户%d收到响应: %s\n", userID, respMsg)
}(i)
}
wg.Wait()
fmt.Println("所有WebSocket连接测试完成")
最后编辑:秦晓川 更新时间:2025-03-03 22:01