服务端非阻塞式通信(Java Server Socket Channel)



服务端非阻塞式通信(Java Server Socket Channel)

这是我调试了一天的代码,出现很多问题,报了很多错,最后才调试通过。我看网上给的一些例子,都有问题,要不就是太简单(现实中设计的通信协议是不会那么简单的)。请大家放到Eclipse里面运行一下看看吧,希望对大家有帮助。

本示例中实现了一个通信协议:访客端向座席端发送数据,以BYTE_BUFFER_CAPACITY为块大小发送数据。每个块的前8个字节(一个long型)代表块内实际包含的有效字节数,之后的字节都是潜在的有效字节数。当long型指定的有效字节数不足BYTE_BUFFER_CAPACITY-8时,认为本次通信的消息结束。

访客端每次接收9个字节作为一条消息,超过9字节的部分认为是下一条服务器的回复消息。

另外,还有一点需要注意的是,Java非阻塞式通信是“同步非阻塞式”的。比如说,在读消息的时候,如果不使用线程来处理,就会把当前操作卡住,无法继续处理其他客户端连接。

有时间我会不断更新这篇博文,争取把程序做成真实可用的二进制通信协议。

服务端代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* 服务器端非阻塞式IO(Java Server Socket Channel)
* @author Bright Lee
*/
public class ServerSocketChannelTest {

public static final int BYTE_BUFFER_CAPACITY = 1024;

public static void main(String[] args) {
Selector selector = null;
ServerSocketChannel serverSocketChannel = null;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
SelectionKey selectionKey = null;
Iterator<SelectionKey> it = null;
try {
if (selector.select() == 0) {
continue;
}
it = selector.selectedKeys().iterator();
while (it.hasNext()) {
selectionKey = it.next();
it.remove();
if (!selectionKey.isValid()) {
System.out.println(“valid!”);
it.remove();
continue;
}
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
}
if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
if (selectionKey.isWritable()) {
handleWrite(selectionKey);
}
if (selectionKey.isConnectable()) {
System.out.println(“connectable!”);
}
}
} catch (Exception e) {
e.printStackTrace();
try {
if (selectionKey != null) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.close();
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
close(selector, serverSocketChannel);
}
}

private static void handleAccept(SelectionKey selectionKey) throws IOException {
System.out.println(“acceptable!”);
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, new Attachment());
}

private static void handleRead(SelectionKey selectionKey) throws IOException {
//System.out.println(“readable!”);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

Attachment readObject = (Attachment) selectionKey.attachment();
ByteArrayOutputStream out = readObject.getByteArrayOutputStream();
ByteBuffer buffer = readObject.getByteBuffer();

socketChannel.read(buffer);

if (buffer.capacity() – buffer.remaining() < 8) {
return;
}

buffer.flip();
long size = IOUtil.readLong(buffer);
long limit = buffer.limit();
if (limit == BYTE_BUFFER_CAPACITY) {
for (int i = 0; i < size; i++) {
out.write(buffer.get());
}
buffer.clear();
if (size < BYTE_BUFFER_CAPACITY – 8) {
String message = new String(out.toByteArray(), “UTF-8″);
System.out.println(“接收到客户端发来的数据============>” + message);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE);
return;
}
} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IOUtil.writeLong(baos, size);
while (buffer.hasRemaining()) {
byte b = buffer.get();
baos.write(b);
}
buffer.clear();
buffer.put(baos.toByteArray());
}
}

private static void handleWrite(SelectionKey selectionKey) throws IOException {
//System.out.println(“writable!”);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

StringBuilder buf = new StringBuilder(9999);
for (int i = 0; i < 50; i++) {
buf.append(‘A’);
}
ByteBuffer buffer = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
buffer.put(buf.toString().getBytes(“UTF-8″));

buffer.flip();
int limit = buffer.limit();
int count = 0;

while (true) {
int size = socketChannel.write(buffer);
count += size;
if (count == limit) {
break;
}
}

Attachment attachment = new Attachment();
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, attachment);
}

private static void close(Selector selector, ServerSocketChannel serverSocketChannel) {
try {
if (selector != null) {
selector.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

客户端代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;


/**
* 客户端阻塞式IO(Java Socket)。
* @author Bright Lee
*/
public class SocketTest {

public static void main(String[] args) throws UnknownHostException, IOException {
while (true) {
Socket socket = null;
OutputStream out = null;
InputStream in = null;
try {
socket = new Socket(“127.0.0.1″, 6666);
socket.setKeepAlive(true);

out = socket.getOutputStream();
output(out);
out.flush();

in = socket.getInputStream();
byte[] bs = input(in);
String message = new String(bs, “UTF-8″);
System.out.println(“接收到服务器端发来的消息=================>”+message);
} catch (Exception e) {
e.printStackTrace();
break;
} finally {
close(out, in, socket);
}
}
}

private static void output(OutputStream out) throws IOException {
StringBuilder buf = new StringBuilder(99999);
for (int i = 0; i < 1234; i++) {
buf.append(“你好”).append(i).append(‘,’);
}
String s = buf.toString();
byte[] bs = s.getBytes(“UTF-8″);
writeByteArray(out, bs);
}

private static void writeByteArray(OutputStream out, byte[] bs) throws IOException {
if (bs == null) {
return;
}
int segmentSize = ServerSocketChannelTest.BYTE_BUFFER_CAPACITY – 8;
int segmentCount = bs.length / segmentSize + 1;
int lastSegmentSize = bs.length % segmentSize;
int index = -1;
int length = 0;
for (int i = 0; i < segmentCount; i++) {
if (i < segmentCount – 1) {
IOUtil.writeLong(out, segmentSize);
length = segmentSize;
index = i * segmentSize;
try {
out.write(bs, index, length);
} catch (Exception e) {
e.printStackTrace();
}
} else {
IOUtil.writeLong(out, lastSegmentSize);
length = lastSegmentSize;
index = i * segmentSize;
out.write(bs, index, length);
out.write(new byte[segmentSize - length]);
}
}
}

private static byte[] input(InputStream in) throws IOException {
ByteArrayOutputStream o = new ByteArrayOutputStream();
int value = -1;
int count = 0;
while ((value = in.read()) != -1) {
o.write(value);
count++;
if (count == 50) {
break;
}
}

byte[] bs2 = o.toByteArray();
return bs2;
}

private static void close(OutputStream out, InputStream in, Socket socket) {
try {
if (out != null) {
out.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (in != null) {
in.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (socket != null) {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
附件代码:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/**
* 读数据专用对象。
* @author Bright Lee
*/
public class Attachment {

private ByteArrayOutputStream out = new ByteArrayOutputStream(9999);
private ByteBuffer buffer = ByteBuffer.allocate(ServerSocketChannelTest.BYTE_BUFFER_CAPACITY);
private long need = 0;
private long count = 0;

public Attachment() {
}

public ByteArrayOutputStream getByteArrayOutputStream() {
return out;
}

public ByteBuffer getByteBuffer() {
return buffer;
}

public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}

public long getNeed() {
return need;
}

public void setNeed(long need) {
this.need = need;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
IO实用程序类

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class IOUtil {

public static void writeLong(OutputStream out, long value) throws IOException {
byte[] bs = new byte[8];
for (int i = 0; i < bs.length; i++) {
value = value >> (i * 8);
byte b = (byte) (0x00FF & value);
bs[bs.length - i - 1] = b;
}
out.write(bs);
}

public static long readLong(ByteBuffer buffer) {
long value = 0;
for (int i = 0; i < 8; i++) {
byte b = buffer.get();
value = (value << 8) | (0xFF & b);;
}
return value;
}

}
———————
作者:look4liming
来源:CSDN
原文:https://blog.csdn.net/look4liming/article/details/86539896
版权声明:本文为博主原创文章,转载请附上博文链接!