Files
jurong_circle_chat_black/src/service/chat.gateway.ts
Sun_sun 51052e59a2 2025-09-30
redis存储未读数据
2025-09-30 13:41:41 +08:00

162 lines
5.3 KiB
TypeScript

import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
ConnectedSocket, OnGatewayInit
} from '@nestjs/websockets';
import {Server, Socket} from 'socket.io';
import {ProgramGroupEntity, ProgramGroupMessageEntity, UsersEntity} from "../entity";
import {InjectRepository} from "@nestjs/typeorm";
import {Repository} from "typeorm";
import {RedisService} from "./redis.service";
/**
 * Socket.IO gateway for group chat.
 *
 * Handles joining/leaving a group room, relaying client messages to the room,
 * persisting every message through the message repository, and recording
 * unread messages in Redis for group members that are not connected.
 *
 * NOTE(review): CORS is open to every origin and none of the handlers below
 * authenticate the caller; the `userId` used for presence is taken from the
 * handshake query string as-is. Confirm this is acceptable for the deployment.
 */
@WebSocketGateway({
    cors: {
        origin: '*',
    }
})
export class ChatGateway implements OnGatewayInit {
    constructor(
        @InjectRepository(ProgramGroupMessageEntity) private readonly messageRepository: Repository<ProgramGroupMessageEntity>,
        @InjectRepository(ProgramGroupEntity) private readonly groupRepository: Repository<ProgramGroupEntity>,
        @InjectRepository(UsersEntity) private readonly usersRepository: Repository<UsersEntity>,
        private readonly redisService: RedisService
    ) {
    }

    @WebSocketServer()
    server: Server;

    /**
     * Lifecycle hook invoked once the gateway has been initialised.
     * @param server the underlying server instance (unused)
     */
    afterInit(server: any) {
        console.log("ChatGateway === 初始化成功")
    }

    /**
     * Join a group room.
     *
     * The socket joins the room named after the group's id, but only when the
     * group exists in the database.
     *
     * @param client the socket that sent the event
     * @param groupEntity payload carrying the `groupId` to join
     */
    @SubscribeMessage('addGroup')
    async createGroup(@ConnectedSocket() client: Socket, @MessageBody() groupEntity: ProgramGroupEntity): Promise<any> {
        // A missing groupId must not reach findOne: TypeORM is known to ignore
        // `undefined` where-values, which would match an arbitrary group.
        if (groupEntity == null || groupEntity.groupId == null) {
            return;
        }
        const group = await this.groupRepository.findOne({
            where: {
                groupId: groupEntity.groupId
            }
        });
        if (group != null) {
            await client.join(group.groupId.toString());
            console.log(`客户端 ${client.id} 加入连接`)
        }
    }

    /**
     * Receive a message from a client.
     *
     * Steps, in order:
     *  1. record the message as unread (in Redis) for every group member that
     *     is not currently connected;
     *  2. persist the message;
     *  3. attach the sender's user record and broadcast the saved message to
     *     the group room as `serverMsg`.
     *
     * @param data the message sent by the client; must carry a `groupId`
     */
    @SubscribeMessage('clientMsg')
    async clientMessage(@MessageBody() data: ProgramGroupMessageEntity): Promise<any> {
        // Without a group id there is no room to deliver to and no key to store under.
        if (data == null || data.groupId == null) {
            return;
        }

        // Group members that are not connected right now.
        const offlineUserIds = await this.activeUser(data.groupId);
        if (offlineUserIds.length != 0) {
            console.log(offlineUserIds)
            for (const userId of offlineUserIds) {
                await this.appendUnread(`${data.groupId}_${userId}`, data);
            }
        }

        // Persist the message.
        const saveMessage = await this.messageRepository.save(data);
        // Only look the sender up when an id is present; an absent id would
        // otherwise turn into an unfiltered query.
        if (saveMessage.createId != null) {
            saveMessage.userInfo = await this.usersRepository.findOne({
                where: {
                    id: saveMessage.createId
                }
            });
        }
        this.server.to(data.groupId.toString()).emit('serverMsg', saveMessage);
    }

    /**
     * Append one message to the JSON-encoded unread list stored under `key`.
     *
     * Failures are logged and swallowed so that a Redis or decoding problem
     * for one recipient never prevents the message from being saved and
     * broadcast. On failure the stored value is left untouched.
     *
     * NOTE(review): this is a read-modify-write sequence and is not atomic;
     * two messages handled concurrently for the same key can overwrite each
     * other. Consider a native Redis list if RedisService can expose one.
     *
     * @param key Redis key, `<groupId>_<userId>`
     * @param message the message to append
     */
    private async appendUnread(key: string, message: ProgramGroupMessageEntity): Promise<void> {
        try {
            const stored = await this.redisService.getValue(key);
            const unread: unknown = stored == null ? [] : JSON.parse(stored);
            if (!Array.isArray(unread)) {
                throw new Error(`unread value under ${key} is not a JSON array`);
            }
            unread.push(message);
            await this.redisService.setValue(key, JSON.stringify(unread));
        } catch (err) {
            console.error(`Failed to store unread message under ${key}`, err);
        }
    }

    /**
     * Return the ids (as strings) of the group's members that are NOT
     * currently connected. Despite the method name, the result is the
     * offline set.
     *
     * The members of a group are its `userId`, `chargeId` and `customerId`.
     * A member counts as online when any engine-level connection carries the
     * member's id in the `userId` handshake query parameter; room membership
     * is not considered.
     *
     * Returns an empty array when `roomId` is missing or the group does not
     * exist, so users unrelated to the group never receive unread entries.
     *
     * @param roomId id of the group to inspect
     * @returns de-duplicated ids of offline group members
     */
    async activeUser(roomId: any): Promise<string[]> {
        if (roomId == null) {
            return [];
        }
        const group = await this.groupRepository.findOne({
            where: {
                groupId: roomId
            }
        });
        if (group == null) {
            return [];
        }

        // Collect the user ids announced by every open connection.
        const onlineIds = new Set<string>();
        for (const connection of Object.values((this.server.engine as any).clients)) {
            const request = (connection as any).request;
            const query = request == null ? undefined : request._query;
            if (query != null && query.userId != null) {
                onlineIds.add(String(query.userId));
            }
        }

        // The same person may hold several roles in one group; de-duplicate
        // so that they are evaluated (and stored for) only once.
        const memberIds = [group.userId, group.chargeId, group.customerId]
            .filter(id => id != null)
            .map(id => String(id));
        return Array.from(new Set(memberIds)).filter(id => !onlineIds.has(id));
    }

    /**
     * Leave a group and close the connection.
     *
     * The socket is always disconnected, even when the payload carries no
     * usable `groupId`.
     *
     * @param client the socket that sent the event
     * @param groupEntity payload carrying the `groupId` to leave
     */
    @SubscribeMessage('leaveGroup')
    async handleLeaveGroup(@ConnectedSocket() client: Socket, @MessageBody() groupEntity: ProgramGroupEntity): Promise<any> {
        if (groupEntity != null && groupEntity.groupId != null) {
            await client.leave(groupEntity.groupId.toString());
        }
        client.disconnect(true)
        console.log(`客户端 ${client.id} 断开连接`)
    }

    /**
     * Manually tear down every connection: logs the current room and socket
     * counts, then force-disconnects every socket in the default namespace.
     */
    cleanupConnections() {
        const rooms = this.server.sockets.adapter.rooms;
        const sids = this.server.sockets.adapter.sids;
        console.log('Current rooms:', rooms.size);
        console.log('Current sids:', sids.size);
        // Force-close every connection.
        this.server.sockets.sockets.forEach((socket) => {
            socket.disconnect(true);
        });
    }

    /**
     * Client-triggered cleanup of all connections.
     *
     * NOTE(review): any connected client can send `cleanup` and disconnect
     * every other user; this should probably be restricted to trusted callers.
     *
     * @param client the socket that requested the cleanup
     */
    @SubscribeMessage('cleanup')
    handleCleanup(@ConnectedSocket() client: Socket) {
        // Acknowledge first: cleanupConnections() also disconnects the
        // requesting socket, so an emit issued afterwards would target a
        // socket that has already been disconnected.
        client.emit('cleanup_complete');
        this.cleanupConnections();
    }
}