服务端非阻塞式通信(Java Server Socket Channel)



服务端非阻塞式通信(Java Server Socket Channel)

这是我调试了一天的代码,出现很多问题,报了很多错,最后才调试通过。我看网上给的一些例子,都有问题,要不就是太简单(现实中设计的通信协议是不会那么简单的)。请大家放到Eclipse里面运行一下看看吧,希望对大家有帮助。

本示例中实现了一个通信协议:访客端向座席端发送数据,以BYTE_BUFFER_CAPACITY为块大小发送数据。每个块的前8个字节(一个long型)代表块内实际包含的有效字节数,之后的字节都是潜在的有效字节数。当long型指定的有效字节数不足BYTE_BUFFER_CAPACITY-8时,认为本次通信的消息结束。

访客端每次接收9个字节作为一条消息,超过9字节的部分认为是下一条服务器的回复消息。

另外,还有一点需要注意的是,Java非阻塞式通信是“同步非阻塞式”的。比如说,在读消息的时候,如果不使用线程来处理,就会把当前操作卡住,无法继续处理其他客户端连接。

有时间我会不断更新这篇博文,争取把程序做成真实可用的二进制通信协议。

服务端代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* 服务器端非阻塞式IO(Java Server Socket Channel)
* @author Bright Lee
*/
public class ServerSocketChannelTest {

public static final int BYTE_BUFFER_CAPACITY = 1024;

public static void main(String[] args) {
Selector selector = null;
ServerSocketChannel serverSocketChannel = null;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
SelectionKey selectionKey = null;
Iterator<SelectionKey> it = null;
try {
if (selector.select() == 0) {
continue;
}
it = selector.selectedKeys().iterator();
while (it.hasNext()) {
selectionKey = it.next();
it.remove();
if (!selectionKey.isValid()) {
System.out.println(“valid!”);
it.remove();
continue;
}
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
}
if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
if (selectionKey.isWritable()) {
handleWrite(selectionKey);
}
if (selectionKey.isConnectable()) {
System.out.println(“connectable!”);
}
}
} catch (Exception e) {
e.printStackTrace();
try {
if (selectionKey != null) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.close();
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
close(selector, serverSocketChannel);
}
}

private static void handleAccept(SelectionKey selectionKey) throws IOException {
System.out.println(“acceptable!”);
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, new Attachment());
}

private static void handleRead(SelectionKey selectionKey) throws IOException {
//System.out.println(“readable!”);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

Attachment readObject = (Attachment) selectionKey.attachment();
ByteArrayOutputStream out = readObject.getByteArrayOutputStream();
ByteBuffer buffer = readObject.getByteBuffer();

socketChannel.read(buffer);

if (buffer.capacity() – buffer.remaining() < 8) {
return;
}

buffer.flip();
long size = IOUtil.readLong(buffer);
long limit = buffer.limit();
if (limit == BYTE_BUFFER_CAPACITY) {
for (int i = 0; i < size; i++) {
out.write(buffer.get());
}
buffer.clear();
if (size < BYTE_BUFFER_CAPACITY – 8) {
String message = new String(out.toByteArray(), “UTF-8″);
System.out.println(“接收到客户端发来的数据============>” + message);
socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE);
return;
}
} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IOUtil.writeLong(baos, size);
while (buffer.hasRemaining()) {
byte b = buffer.get();
baos.write(b);
}
buffer.clear();
buffer.put(baos.toByteArray());
}
}

private static void handleWrite(SelectionKey selectionKey) throws IOException {
//System.out.println(“writable!”);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

StringBuilder buf = new StringBuilder(9999);
for (int i = 0; i < 50; i++) {
buf.append(‘A’);
}
ByteBuffer buffer = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
buffer.put(buf.toString().getBytes(“UTF-8″));

buffer.flip();
int limit = buffer.limit();
int count = 0;

while (true) {
int size = socketChannel.write(buffer);
count += size;
if (count == limit) {
break;
}
}

Attachment attachment = new Attachment();
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, attachment);
}

private static void close(Selector selector, ServerSocketChannel serverSocketChannel) {
try {
if (selector != null) {
selector.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
客户端代码:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

/**
* 客户端阻塞式IO(Java Socket)。
* @author Bright Lee
*/
public class SocketTest {


public static void main(String[] args) throws UnknownHostException, IOException {
while (true) {
Socket socket = null;
OutputStream out = null;
InputStream in = null;
try {
socket = new Socket(“127.0.0.1″, 6666);
socket.setKeepAlive(true);

out = socket.getOutputStream();
output(out);
out.flush();

in = socket.getInputStream();
byte[] bs = input(in);
String message = new String(bs, “UTF-8″);
System.out.println(“接收到服务器端发来的消息=================>”+message);
} catch (Exception e) {
e.printStackTrace();
break;
} finally {
close(out, in, socket);
}
}
}

private static void output(OutputStream out) throws IOException {
StringBuilder buf = new StringBuilder(99999);
for (int i = 0; i < 1234; i++) {
buf.append(“你好”).append(i).append(‘,’);
}
String s = buf.toString();
byte[] bs = s.getBytes(“UTF-8″);
writeByteArray(out, bs);
}

private static void writeByteArray(OutputStream out, byte[] bs) throws IOException {
if (bs == null) {
return;
}
int segmentSize = ServerSocketChannelTest.BYTE_BUFFER_CAPACITY – 8;
int segmentCount = bs.length / segmentSize + 1;
int lastSegmentSize = bs.length % segmentSize;
int index = -1;
int length = 0;
for (int i = 0; i < segmentCount; i++) {
if (i < segmentCount – 1) {
IOUtil.writeLong(out, segmentSize);
length = segmentSize;
index = i * segmentSize;
try {
out.write(bs, index, length);
} catch (Exception e) {
e.printStackTrace();
}
} else {
IOUtil.writeLong(out, lastSegmentSize);
length = lastSegmentSize;
index = i * segmentSize;
out.write(bs, index, length);
out.write(new byte[segmentSize - length]);
}
}
}

private static byte[] input(InputStream in) throws IOException {
ByteArrayOutputStream o = new ByteArrayOutputStream();
int value = -1;
int count = 0;
while ((value = in.read()) != -1) {
o.write(value);
count++;
if (count == 50) {
break;
}
}

byte[] bs2 = o.toByteArray();
return bs2;
}

private static void close(OutputStream out, InputStream in, Socket socket) {
try {
if (out != null) {
out.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (in != null) {
in.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (socket != null) {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
附件代码:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/**
* 读数据专用对象。
* @author Bright Lee
*/
public class Attachment {

private ByteArrayOutputStream out = new ByteArrayOutputStream(9999);
private ByteBuffer buffer = ByteBuffer.allocate(ServerSocketChannelTest.BYTE_BUFFER_CAPACITY);
private long need = 0;
private long count = 0;

public Attachment() {
}

public ByteArrayOutputStream getByteArrayOutputStream() {
return out;
}

public ByteBuffer getByteBuffer() {
return buffer;
}

public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}

public long getNeed() {
return need;
}

public void setNeed(long need) {
this.need = need;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
IO实用程序类

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class IOUtil {

public static void writeLong(OutputStream out, long value) throws IOException {
byte[] bs = new byte[8];
for (int i = 0; i < bs.length; i++) {
value = value >> (i * 8);
byte b = (byte) (0x00FF & value);
bs[bs.length - i - 1] = b;
}
out.write(bs);
}

public static long readLong(ByteBuffer buffer) {
long value = 0;
for (int i = 0; i < 8; i++) {
byte b = buffer.get();
value = (value << 8) | (0xFF & b);;
}
return value;
}

}
———————
作者:look4liming
来源:CSDN
原文:https://blog.csdn.net/look4liming/article/details/86539896
版权声明:本文为博主原创文章,转载请附上博文链接!