问题描述
我有一个带有以下引导程序的带有Spring Boot 2.3.1
的Netty TCP服务器:
I have a Netty TCP Server with Spring Boot 2.3.1
with the following handler :
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final CarParkPermissionService permissionService;
private final Gson gson = new Gson();
private String remoteAddress;
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
remoteAddress = ctx.channel().remoteAddress().toString();
if (log.isDebugEnabled()) {
log.debug(remoteAddress);
}
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("CLIENT_IP: {}", remoteAddress);
String stringMsg = (String) msg;
log.info("CLIENT_REQUEST: {}", stringMsg);
String lowerCaseMsg = stringMsg.toLowerCase();
if (RequestType.HEARTBEAT.containsName(lowerCaseMsg)) {
HeartbeatRequest heartbeatRequest = gson.fromJson(stringMsg, HeartbeatRequest.class);
log.debug("heartbeat request: {}", heartbeatRequest);
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
}
}
请求DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatRequest {
private String messageID;
}
响应DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatResponse {
private String responseCode;
}
逻辑很简单.只有我必须知道客户端的IP地址.
Logic is quite simple. Only I have to know the IP address of the client.
我也需要对其进行测试.
I need to test it as well.
我一直在寻找许多资源来测试Netty的处理程序,例如
I have been looking for many resources for testing handlers for Netty, like
- Testing Netty with EmbeddedChannel
- How to unit test netty handler
但是,它对我不起作用.
However, it didn't work for me.
对于EmbeddedChannel,我遇到以下错误-Your remote address is embedded.
For EmbeddedChannel I have following error - Your remote address is embedded.
这是代码:
@ActiveProfiles("test")
@RunWith(MockitoJUnitRunner.class)
public class ProcessingHandlerTest_Embedded {
@Mock
private PermissionService permissionService;
private EmbeddedChannel embeddedChannel;
private final Gson gson = new Gson();
private ProcessingHandler processingHandler;
@Before
public void setUp() {
processingHandler = new ProcessingHandler(permissionService);
embeddedChannel = new EmbeddedChannel(processingHandler);
}
@Test
public void testHeartbeatMessage() {
// given
HeartbeatRequest heartbeatMessage = HeartbeatRequest.builder()
.messageID("heartbeat")
.build();
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
String request = gson.toJson(heartbeatMessage).concat("\r\n");
String expected = gson.toJson(response).concat("\r\n");
// when
embeddedChannel.writeInbound(request);
// then
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
}
}
输出:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
org.junit.ComparisonFailure:
<Click to see difference>
但是,我不知道如何对这种情况进行精确测试.
However, I don't know how to do exact testing for such a case.
以下是配置的摘录:
@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
// for now, hostname is: localhost/127.0.0.1:9090
return new InetSocketAddress("localhost", nettyProperties.getTcpPort());
// for real client devices: A05264/172.28.1.162:9090
// return new InetSocketAddress(InetAddress.getLocalHost(), nettyProperties.getTcpPort());
}
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// Add the text line codec combination first
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
如何使用客户端的IP地址测试处理程序?
推荐答案
两件事可能有帮助:
-
如果处理程序不可共享,请不要使用
@ChannelHandler.Sharable
进行注释.这可能会产生误导.从处理程序中删除不必要的状态.在您的情况下,应删除remoteAddress
成员变量,并确保Gson
和CarParkPermissionService
可以重用并且是线程安全的.
Do not annotate with
@ChannelHandler.Sharable
if your handler is NOT sharable. This can be misleading. Remove unnecessary state from handlers. In your case you should remove theremoteAddress
member variable and ensure thatGson
andCarParkPermissionService
can be reused and are thread-safe.
"Your remote address is embedded"
不是错误.它实际上是您的处理程序在出站通道上写的消息(请参见您的channelActive()
方法)
"Your remote address is embedded"
is NOT an error. It actually is the message written by your handler onto the outbound channel (cf. your channelActive()
method)
所以看起来它可以工作.
So it looks like it could work.
在这里,您的意见是关于第二点的一些说明.我的意思是:
Following your comments here are some clarifications regarding the second point. I mean that:
- 您使用
EmbeddedChannel
的代码几乎是正确的.预期结果(声明)只是个误会.
- your code making use of
EmbeddedChannel
is almost correct. There is just a misunderstanding on the expected results (assert).
要使单元测试成功,您只需:
To make the unit test successful, you just have either:
- 在
channelActive()
中注释此行:ctx.writeAndFlush("Your remote ...")
- 或轮询
testHeartbeatMessage()
中来自
Queue<Object> outboundMessages
的第二条消息- to comment this line in
channelActive()
:ctx.writeAndFlush("Your remote ...")
- or to poll the second message from
Queue<Object> outboundMessages
intestHeartbeatMessage()
实际上,当您这样做时:
Indeed, when you do this:
// when
embeddedChannel.writeInbound(request);
(1)实际上,您一次打开了一个频道,这会触发一个channelActive()
事件.您没有日志,但是我们看到变量remoteAddress
之后不为空,这意味着它是在channelActive()
方法中分配的.
(1) You actually open the channel once, which fires a channelActive()
event. You don't have a log in it but we see that the variable remoteAddress
is not null afterwards, meaning that it was assigned in the channelActive()
method.
(2)在channelActive()
方法的最后,您最终已经通过在通道管道上进行写操作来发送回消息,如以下行所示:
(2) At the end of the channelActive()
method, you eventually already send back a message by writing on the channel pipeline, as seen at this line:
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
// In fact, this is the message you see in your failed assertion.
(3)然后,接收并读取由embeddedChannel.writeInbound(request)
编写的消息,这将引发channelRead()
事件.这次,我们在您的日志输出中看到了这一点:
(3) Then the message written by embeddedChannel.writeInbound(request)
is received and can be read, which fires a channelRead()
event. This time, we see this in your log output:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
(4),在channelRead(ChannelHandlerContext ctx, Object msg)
的结尾,您将发送 second 消息(预期的消息):
(4) At the end of channelRead(ChannelHandlerContext ctx, Object msg)
, you will then send a second message (the expected one):
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
因此,使用下面的单元测试代码...
Therefore, with the following code of your unit test...
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
...您应该能够poll()
两条消息:
... you should be able to poll()
two messages:
-
"Your remote address is embedded"
-
"{ResponseCode":"ok"}
"Your remote address is embedded"
"{ResponseCode":"ok"}
这对您有意义吗?
这篇关于Netty如何测试使用客户端远程地址的处理程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!