广播的个人理解
可以发送到多台主机,但是主机的监听端口号要相同。
但是端口也可以不同:比如广播的时候,分别向多个 host+port 各发送一次,应该也是可以的。
定义传输对象
/**
 * Transfer object exchanged over UDP; it carries a single time string.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class MessageBean {

    /** Time text carried by this message; {@code null} until set. */
    private String time;

    /**
     * Returns the stored time text.
     *
     * @return the time text, or {@code null} if it was never set
     */
    public String getTime() {
        return this.time;
    }

    /**
     * Stores the time text.
     *
     * @param time the time text to carry
     */
    public void setTime(String time) {
        this.time = time;
    }

    /**
     * Renders the bean as {@code MessageBean{time='<value>'}}.
     *
     * @return the textual form of this bean
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("MessageBean{");
        text.append("time='").append(time).append('\'');
        text.append('}');
        return text.toString();
    }
}
服务器
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
 * UDP server that periodically broadcasts the current server time.
 *
 * <p>Each {@link MessageBean} written to the channel is turned into a datagram
 * by {@link UdpServerEncoder}, which also decides the destination address.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class UdpServer {

    /** Number of messages sent before the server stops. */
    private static final int BROADCAST_COUNT = 10000;

    /** Pause between two consecutive broadcasts, in seconds. */
    private static final long BROADCAST_INTERVAL_SECONDS = 5L;

    /**
     * Starts the server, broadcasts the time {@link #BROADCAST_COUNT} times and
     * then releases the channel and the event loop group.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              between two broadcasts
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap serverBootstrap = new Bootstrap();
            serverBootstrap.group(eventExecutors)
                    .channel(NioDatagramChannel.class)
                    // Allow sending to a broadcast address.
                    .option(ChannelOption.SO_BROADCAST, true)
                    // Encoder that converts MessageBean into a datagram.
                    .handler(new UdpServerEncoder());

            // Bind to port 0: the server only sends, so any free local port will do.
            Channel channel = serverBootstrap.bind(0).syncUninterruptibly().channel();
            try {
                // Push the server time once every BROADCAST_INTERVAL_SECONDS.
                for (int i = 0; i < BROADCAST_COUNT; i++) {
                    MessageBean messageBean = new MessageBean();
                    messageBean.setTime(LocalDateTime.now().toString());
                    channel.writeAndFlush(messageBean);
                    System.out.println("[Server] broadcast: " + messageBean);
                    TimeUnit.SECONDS.sleep(BROADCAST_INTERVAL_SECONDS);
                }
            } finally {
                // Actually close the channel. Waiting on closeFuture() without
                // closing would block forever, because nothing else closes it.
                channel.close().syncUninterruptibly();
            }
        } finally {
            // Always release the event loop threads, including when the loop is
            // interrupted or the bind fails.
            eventExecutors.shutdownGracefully();
        }
    }
}
编码器
package com.github.houbb.netty.inaction.chap13.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Encoder that converts a server-side {@link MessageBean} into a
 * {@link DatagramPacket} addressed to a fixed remote address.
 *
 * <p>The no-arg constructor targets the broadcast address on
 * {@link UdpClient#PORT}; use {@link #UdpServerEncoder(InetSocketAddress)} to
 * send to a different ip+port.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class UdpServerEncoder extends MessageToMessageEncoder<MessageBean> {

    /** Broadcast address used by the no-arg constructor. */
    private static final String BROADCAST_HOST = "255.255.255.255";

    /**
     * Remote address every encoded datagram is sent to.
     */
    private final InetSocketAddress remoteAddress;

    /**
     * Creates an encoder that sends to the broadcast address on
     * {@link UdpClient#PORT}.
     */
    public UdpServerEncoder() {
        this(new InetSocketAddress(BROADCAST_HOST, UdpClient.PORT));
    }

    /**
     * Creates an encoder that sends to the given address.
     *
     * @param remoteAddress destination of every encoded datagram; must not be
     *                      {@code null}
     * @throws NullPointerException if {@code remoteAddress} is {@code null}
     */
    public UdpServerEncoder(InetSocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        this.remoteAddress = remoteAddress;
    }

    /**
     * Wraps the UTF-8 bytes of the message's time text in a datagram addressed
     * to the configured remote address.
     *
     * @param ctx the handler context (unused)
     * @param msg the message to encode
     * @param out receives the resulting {@link DatagramPacket}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageBean msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getTime(), StandardCharsets.UTF_8);
        System.out.println("[Server] encode to " + remoteAddress);
        out.add(new DatagramPacket(byteBuf, remoteAddress));
    }
}
这里做了两件事:
- 对象编码为 DatagramPacket
- 指定了广播的地址。
如果想发送给不同的 ip+port,把 out.add(new DatagramPacket(byteBuf, remoteAddress));
这行代码中的 remoteAddress 换成目标地址即可。
客户端
引导类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
/**
 * UDP client that listens on a fixed port and prints the messages it receives.
 *
 * <p>Incoming datagrams pass through {@link UdpClientDecoder} and are then
 * handled by {@link UdpClientHandler}.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class UdpClient {

    /**
     * Port the client listens on.
     */
    public static final int PORT = 8080;

    /**
     * Binds to {@link #PORT} and blocks until the channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioDatagramChannel.class)
                    // Explicitly allow broadcast.
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new UdpClientDecoder())
                                    .addLast(new UdpClientHandler());
                        }
                    });

            // Listen on the fixed port.
            ChannelFuture channelFuture = bootstrap.bind(PORT).syncUninterruptibly();
            channelFuture.channel().closeFuture().syncUninterruptibly();
        } finally {
            // Always release the event loop threads, even if binding fails
            // (e.g. the port is already in use); otherwise they may keep the
            // JVM alive after main has thrown.
            eventExecutors.shutdownGracefully();
        }
    }
}
解码
将 udp 消息转换为对象。
package com.github.houbb.netty.inaction.chap13.udp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
 * Decoder that converts an incoming {@link DatagramPacket} into a
 * {@link MessageBean}.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class UdpClientDecoder extends MessageToMessageDecoder<DatagramPacket> {

    /**
     * Reads the datagram content as UTF-8 text, logs it and emits a
     * {@link MessageBean} whose time is that text.
     *
     * @param ctx the handler context (unused)
     * @param msg the received datagram
     * @param out receives the decoded {@link MessageBean}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
        final String payload = msg.content().toString(CharsetUtil.UTF_8);
        System.out.println("[Client] decode msg: " + payload);
        out.add(toMessageBean(payload));
    }

    /** Builds a message bean whose time is the given text. */
    private static MessageBean toMessageBean(String timeText) {
        final MessageBean decoded = new MessageBean();
        decoded.setTime(timeText);
        return decoded;
    }
}
handler 处理类
package com.github.houbb.netty.inaction.chap13.udp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Inbound handler that prints every decoded {@link MessageBean}.
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class UdpClientHandler extends SimpleChannelInboundHandler<MessageBean> {

    /**
     * Prints the received message to standard output.
     *
     * @param ctx the handler context (unused)
     * @param msg the decoded message
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageBean msg) throws Exception {
        final String line = "[Client] received from server: " + msg;
        System.out.println(line);
    }

    /**
     * Prints the stack trace of the failure; the channel is left open.
     *
     * @param ctx   the handler context (unused)
     * @param cause the error raised in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final Throwable failure = cause;
        failure.printStackTrace();
    }
}
测试
服务器
[Server] broadcast: MessageBean{time='2019-05-03T10:26:33.607'}
[Server] encode to /255.255.255.255:8080
[Server] broadcast: MessageBean{time='2019-05-03T10:26:38.624'}
[Server] encode to /255.255.255.255:8080
....
客户端
[Client] decode msg: 2019-05-03T10:26:43.628
[Client] received from server: MessageBean{time='2019-05-03T10:26:43.628'}
[Client] decode msg: 2019-05-03T10:26:48.629
[Client] received from server: MessageBean{time='2019-05-03T10:26:48.629'}
...
参考资料
《Netty in Action》 P185