名字
看到很多人问t-io怎么做集群,正好自己也要实现集群,那就开干吧。隐约记得老谭说过t-io内置了集群功能,就去翻了下代码,果然有!
第一步:集群需要配置一个redis,这里使用redisson,redis可以设置成集群或者单机模式
/**
 * Lazily initialised, process-wide holder for a single {@link RedissonClient}.
 *
 * <p>Configuration is read through {@code P}:
 * <ul>
 *   <li>{@code redisCluster} - {@code "0"} selects single-server mode; any other value
 *       (including a missing property) selects cluster mode;</li>
 *   <li>{@code redis.host} / {@code redis.port} - single-server address;</li>
 *   <li>{@code redis.urls} - comma separated {@code host:port} list for cluster mode;</li>
 *   <li>{@code redis.password} - optional; passed through {@code AES256Utils.decrypt} before use;</li>
 *   <li>{@code redis.timeout} - optional command timeout in milliseconds.</li>
 * </ul>
 *
 * <p>When the configuration is invalid or the connection cannot be created, the problem is logged
 * and {@link #newInstance()} returns {@code null}; a later call retries the initialisation.
 */
public class RedissonConfig {
    private static final Logger logger = LoggerFactory.getLogger(RedissonConfig.class);

    /** Single-server command timeout (ms) used when {@code redis.timeout} is not configured. */
    private static final int DEFAULT_SINGLE_SERVER_TIMEOUT_MS = 2000;

    // volatile: required so the unsynchronised first check in newInstance() is safe,
    // and so that the write to redissonClient made just before it is visible to readers.
    private static volatile RedissonConfig redissonConfig;
    private static RedissonClient redissonClient;

    /**
     * Returns the shared instance, creating the Redisson client on first use.
     *
     * @return the shared instance, or {@code null} if initialisation failed (the cause is logged)
     */
    public static RedissonConfig newInstance() {
        if (redissonConfig == null) {
            synchronized (RedissonConfig.class) {
                if (redissonConfig == null) {
                    init();
                }
            }
        }
        return redissonConfig;
    }

    /**
     * Builds the Redisson configuration for the selected mode and connects.
     * Must be called while holding the class lock.
     */
    private static void init() {
        if (redissonConfig != null) {
            return;
        }
        Config config = "0".equals(P.get("redisCluster"))
                ? buildSingleServerConfig()
                : buildClusterConfig();
        if (config == null) {
            // Invalid configuration was already logged; do not try to connect to a bogus address.
            return;
        }
        try {
            redissonClient = Redisson.create(config);
            redissonConfig = new RedissonConfig();
        } catch (Exception e) {
            logger.error("redis连接失败", e);
        }
    }

    /**
     * Builds the single-server configuration from {@code redis.host} and {@code redis.port}.
     *
     * @return the configuration, or {@code null} when host or port is missing
     */
    private static Config buildSingleServerConfig() {
        String host = P.get("redis.host");
        Integer port = P.getInt("redis.port");
        boolean valid = true;
        if (StrUtil.isEmpty(host)) {
            logger.error("redis服务器IP不能为空");
            valid = false;
        }
        if (port == null) {
            logger.error("redis端口不能为空");
            valid = false;
        }
        if (!valid) {
            return null;
        }

        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setIdleConnectionTimeout(5000);
        singleServerConfig.setConnectTimeout(5000);
        singleServerConfig.setConnectionMinimumIdleSize(300);
        singleServerConfig.setConnectionPoolSize(1000);
        singleServerConfig.setAddress("redis://" + host + ":" + port);

        // A missing redis.timeout used to fail with an NPE on unboxing; fall back to the default.
        Integer timeout = P.getInt("redis.timeout");
        singleServerConfig.setTimeout(timeout != null ? timeout : DEFAULT_SINGLE_SERVER_TIMEOUT_MS);

        String password = P.get("redis.password");
        if (!StrUtil.isEmpty(password)) {
            singleServerConfig.setPassword(AES256Utils.decrypt(password));
        }
        return config;
    }

    /**
     * Builds the cluster configuration from the comma separated {@code redis.urls} list.
     *
     * @return the configuration, or {@code null} when no node address is configured
     */
    private static Config buildClusterConfig() {
        String urls = P.get("redis.urls");
        if (StrUtil.isEmpty(urls)) {
            logger.error("redis服务器IP不能为空");
            return null;
        }

        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers();
        clusterServersConfig.setIdleConnectionTimeout(5000);
        clusterServersConfig.setConnectTimeout(5000);
        clusterServersConfig.setReadMode(ReadMode.SLAVE);
        clusterServersConfig.setMasterConnectionPoolSize(1000);
        clusterServersConfig.setSlaveConnectionPoolSize(1000);
        clusterServersConfig.setSubscriptionConnectionPoolSize(1000);
        clusterServersConfig.setSubscriptionMode(SubscriptionMode.SLAVE);

        int nodeCount = 0;
        for (String host : urls.split(",")) {
            String node = host.trim();
            if (node.isEmpty()) {
                // Tolerate stray commas such as "a:6379,,b:6379".
                continue;
            }
            clusterServersConfig.addNodeAddress("redis://" + node);
            nodeCount++;
        }
        if (nodeCount == 0) {
            logger.error("redis服务器IP不能为空");
            return null;
        }

        // Leave Redisson's own default in place when redis.timeout is not configured.
        Integer timeout = P.getInt("redis.timeout");
        if (timeout != null) {
            clusterServersConfig.setTimeout(timeout);
        }

        String password = P.get("redis.password");
        if (!StrUtil.isEmpty(password)) {
            clusterServersConfig.setPassword(AES256Utils.decrypt(password));
        }
        return config;
    }

    /**
     * @return the shared Redisson client created during initialisation
     */
    public RedissonClient getRedissonClient() {
        return redissonClient;
    }
}
第二步:开启内置集群能力。t-io已经内置了消息集群分发,在ShowcaseWebsocketStarter类的构造方法中加入以下代码即可:
// Enable t-io's built-in cluster support on the server config.
// Create the cluster topic on ShowcaseServerConfig.CHANNEL using the shared Redisson client.
// NOTE(review): RedissonConfig.newInstance() may return null if Redis initialisation failed,
// in which case this line throws a NullPointerException — confirm Redis is reachable at startup.
RedissonTioClusterTopic redissonTioClusterTopic = new RedissonTioClusterTopic(ShowcaseServerConfig.CHANNEL, RedissonConfig.newInstance().getRedissonClient());
TioClusterConfig tioClusterConfig = new TioClusterConfig(redissonTioClusterTopic);
// Presumably turns on cluster forwarding for group messages — confirm against TioClusterConfig.
tioClusterConfig.setCluster4group(true);
// Attach the cluster configuration to the server's TioConfig.
serverTioConfig.setTioClusterConfig(tioClusterConfig);
在pom.xml中引入org.redisson的redisson依赖,这里使用的版本是3.10.4
app.properties增加redis配置(单机模式还需要设置redisCluster=0;否则会走集群模式,此时需要配置redis.urls,多个节点用逗号分隔):
redis.port=6379
redis.host=localhost
redis.password=
redis.timeout=3000
这样就激活了内置的消息集群分发,如果需要获取群组内的组员则需要自己实现此业务,这里附上我自己的实现,在ShowcaseWsMsgHandler.onAfterHandshaked中:
// After the handshake: bind the user to their group, record the user in a Redis-backed
// group -> members multimap, announce the login to the group, then send the member list
// back to the connecting user.

// Look up the current user's information
JSONObject user = getUser(channelContext.userid);
if (user == null) {
// Unknown user: remove the channel and stop processing
Tio.remove(channelContext,"User does not exist");
return;
}
String groupId = user.getString("groupId");
// Bind the channel to the group; group broadcasts follow below
Tio.bindGroup(channelContext, groupId);
// Add the current user to the member set of the corresponding group
RSetMultimap<String, JSONObject> groupUserList = redissonClient.getSetMultimap(Const.GROUP_ID);
groupUserList.put(groupId, user);
// Build the message body and send the login notice to everyone in the group
JSONObject messageObject = new JSONObject();
messageObject.put("fromId",user.getString("userId"));
messageObject.put("fromName",user.getString("realName"));
messageObject.put("userInfo",user);
messageObject.put("type",MessageType.SYSADMINMESSAGE.getCode());
// NOTE(review): Tio.groupCount is given this node's tioConfig — confirm whether the count
// is per-node or cluster-wide.
messageObject.put("message", String.format(Const.userInTemplate, String.format(Const.FROMUSERNAME,user.getString("realName")),
Tio.groupCount(channelContext.tioConfig, groupId)));
messageObject.put("time", System.currentTimeMillis());
WsResponse wsResponse = WsResponse.fromText(messageObject.toJSONString(), ShowcaseServerConfig.CHARSET);
Tio.sendToGroup(channelContext.tioConfig, groupId, wsResponse);
// Fetch the members of the group
Set<JSONObject> userInfos = groupUserList.get(groupId);
// Send a message to the current user so their member list is refreshed.
// The same messageObject is reused: "userInfo" is dropped, "type" and "time" are overwritten,
// and the remaining keys ("fromId", "fromName", "message") are carried over unchanged.
messageObject.remove("userInfo");
messageObject.put("type",MessageType.FLISHUSERLISTNOTIC.getCode());
messageObject.put("userInfos",userInfos);
messageObject.put("time", System.currentTimeMillis());
WsResponse wsResponse2 = WsResponse.fromText(messageObject.toJSONString(), ShowcaseServerConfig.CHARSET);
Tio.send(channelContext, wsResponse2);
log.info("{} a list of members sent to you: {}, userSize: {}", user.getString("userId"), userInfos.toString(), Tio.groupCount(channelContext.tioConfig, groupId));
至此,一个简单的消息集群就完成了!
最新评论 我的评论
t-io为本站提供HTTP、WebSocket、Socket、页面渲染与压缩等服务,nginx为本站提供反向代理服务
© 2017-2021 钛特云 版权所有 | 浙ICP备17032976号 | 浙公网安备 33011802002129号