Java asynchronous programming

Writing a socket-based interprocess communication program with asynchronous I/O

Merlin (J2SE 1.4) adds API packages that implement an asynchronous input/output mechanism: java.nio (the new I/O package, which defines buffers for the basic types), java.nio.channels (channels, selectors, and related classes used for asynchronous I/O), and java.nio.charset (character encoding and decoding). A channel first registers the events it is interested in with a selector. When such an event occurs, the selector notifies the registered channel through a SelectionKey. The channel then packs the data to be processed into a buffer, encodes or decodes it, and completes the I/O operation.
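As a rough illustration of the buffer and charset steps described above, the following sketch encodes a string into a ByteBuffer and decodes it back. The class name BufferCharsetSketch and its two helper methods are hypothetical, and no channel is involved; it only shows the java.nio and java.nio.charset parts of the mechanism.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;

// Hypothetical helper class, not part of the article's program.
class BufferCharsetSketch {
    // Encode text into a byte buffer, as a sender would before writing to a channel.
    static ByteBuffer encode(String text, Charset charset) throws CharacterCodingException {
        return charset.newEncoder().encode(CharBuffer.wrap(text));
    }

    // Decode the bytes of a buffer back into text, as a receiver would after reading.
    static String decode(ByteBuffer bytes, Charset charset) throws CharacterCodingException {
        return charset.newDecoder().decode(bytes).toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        Charset latin1 = Charset.forName("ISO-8859-1");
        ByteBuffer wire = encode("hello", latin1);
        System.out.println(wire.remaining() + " bytes ready to be written");
        System.out.println("decoded: " + decode(wire, latin1));
    }
}
```

ISO-8859-1 is used here only because it is the charset that the server program in this article decodes with; any charset known to the runtime could be substituted.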

Introduction to channels:

This article mainly introduces ServerSocketChannel and SocketChannel. Both are selectable channels and can work in either synchronous (blocking) or asynchronous (non-blocking) mode. Note that "selectable" does not mean that you can select between the two working modes; it means that you can selectively register the events you are interested in. Call channel.configureBlocking(boolean) to set the working mode. Compared with the earlier API, ServerSocketChannel corresponds to ServerSocket and SocketChannel corresponds to Socket. In synchronous mode, programming is much the same as before, so this article concentrates on the asynchronous mode.
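The effect of the working mode can be seen in a small experiment. The sketch below is only an illustration under assumptions: the class name BlockingModeSketch is hypothetical, it binds to an OS-chosen port on the loopback interface, and it uses try-with-resources, so it needs a Java 7 or later runtime rather than J2SE 1.4.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the article's program.
class BlockingModeSketch {
    // Returns true if a non-blocking accept() came back at once with no connection.
    static boolean acceptReturnsImmediately() throws IOException {
        try (ServerSocketChannel serverChl = ServerSocketChannel.open()) {
            // A newly opened channel starts out in synchronous (blocking) mode.
            if (!serverChl.isBlocking()) {
                throw new IllegalStateException("expected blocking mode by default");
            }
            serverChl.configureBlocking(false); // switch to asynchronous mode
            // Port 0 asks the operating system for any free port.
            serverChl.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // No client has connected, so accept() returns null instead of waiting.
            SocketChannel pending = serverChl.accept();
            return !serverChl.isBlocking() && pending == null;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned immediately: " + acceptReturnsImmediately());
    }
}
```

In synchronous mode the same accept() call would wait until a client connected, which is why the asynchronous mode is normally paired with a selector.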

With the asynchronous I/O mechanism, a call that performs I/O does not have to wait until the I/O has finished before it returns; in this sense, asynchronous is synonymous with non-blocking. On the server side, the static method ServerSocketChannel.open() returns an instance, serverChl. The channel is then bound to the server port with serverChl.socket().bind(), and register(Selector sel, SelectionKey.OP_ACCEPT) registers the OP_ACCEPT event with the selector (a ServerSocketChannel can register only the OP_ACCEPT event). When a client requests a connection, the selector notifies the channel so that it can carry out the corresponding I/O handling. On the client side, the clientChl instance registers the events it is interested in (any combination of OP_CONNECT, OP_READ, and OP_WRITE) and then calls clientChl.connect(InetSocketAddress) to connect to the server and proceed from there. Note that this connection is asynchronous: connect() returns immediately and execution continues with the code that follows.
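The two sides described above can be tried out in a single process. This is a minimal sketch, not the article's program: the class name ConnectAcceptSketch is hypothetical, both channels share one selector for brevity, the server listens on an OS-chosen loopback port, and the loop gives up after an arbitrary five seconds. It assumes Java 7 or later.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo class, not part of the article's program.
class ConnectAcceptSketch {
    // Returns true once the server side has accepted and the client side has connected.
    static boolean connectOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverChl = ServerSocketChannel.open();
             SocketChannel clientChl = SocketChannel.open()) {

            // Server side: open, bind, register OP_ACCEPT.
            serverChl.configureBlocking(false);
            serverChl.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            serverChl.register(selector, SelectionKey.OP_ACCEPT);

            // Client side: register interest first, then start the connection.
            clientChl.configureBlocking(false);
            SelectionKey connectKey = clientChl.register(selector, SelectionKey.OP_CONNECT);
            // connect() returns at once; true means a local connection completed immediately.
            boolean connected = clientChl.connect(serverChl.socket().getLocalSocketAddress());
            if (connected) {
                connectKey.interestOps(0); // nothing left to wait for on the client
            }

            SocketChannel accepted = null;
            long deadline = System.currentTimeMillis() + 5000; // arbitrary safety limit
            while ((accepted == null || !connected) && System.currentTimeMillis() < deadline) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        if (accepted == null) {
                            accepted = serverChl.accept();
                        }
                    } else if (key.isConnectable() && clientChl.finishConnect()) {
                        key.interestOps(0); // connected; stop watching OP_CONNECT
                        connected = true;
                    }
                }
            }
            boolean ok = accepted != null && connected;
            if (accepted != null) {
                accepted.close();
            }
            return ok;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted and connected: " + connectOnce());
    }
}
```

Because connect() may return before the connection exists, the client has to call finishConnect() when the selector reports OP_CONNECT; skipping that step is a common mistake with asynchronous connections.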

Introduction to Selector and SelectionKey:

The selector puts the events that channels are interested in into a queue instead of delivering them to the application immediately, and waits for the registered channels to ask to handle them. In other words, the selector reports the channels that are ready, in first-in, first-out order. What does it report them with? The SelectionKey. A selection key indicates which channel is ready and what it is ready to do and, as you may have guessed, this is always an event that the channel registered an interest in. For example, for the server-side serverChl, key.isAcceptable() tells you that a client connection request is waiting. The corresponding methods for the other events are SelectionKey.isReadable() and SelectionKey.isWritable(). Events of interest are usually polled in a loop (see the example program below for details). If none of the events registered with the selector has occurred, a call to Selector.select() blocks until one does. You can also call selectNow() or select(long timeout): the former returns immediately, with 0 if no event is pending; the latter returns after waiting at most timeout milliseconds. A selector can be used by at most 63 channels at the same time.
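The difference between the select variants can be observed directly. The following sketch is an illustration only: the class name SelectModesSketch is hypothetical, the timeouts of 100 ms and 2000 ms are arbitrary, and the code assumes Java 7 or later and a usable loopback interface.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the article's program.
class SelectModesSketch {
    // Returns { selectNow() while idle, select(timeout) while idle, select(timeout) with a client waiting }.
    static int[] probe() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverChl = ServerSocketChannel.open()) {
            serverChl.configureBlocking(false);
            serverChl.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            serverChl.register(selector, SelectionKey.OP_ACCEPT);

            int idleNow = selector.selectNow();   // no event pending: returns 0 immediately
            int idleTimed = selector.select(100); // waits up to 100 ms, then returns 0

            // A blocking client connects, which makes the server channel acceptable.
            try (SocketChannel clientChl =
                     SocketChannel.open(serverChl.socket().getLocalSocketAddress())) {
                int ready = selector.select(2000); // returns as soon as the event is reported
                for (SelectionKey key : selector.selectedKeys()) {
                    System.out.println("acceptable: " + key.isAcceptable());
                }
                selector.selectedKeys().clear(); // handled keys must be removed by the caller
                return new int[] { idleNow, idleTimed, ready };
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = probe();
        System.out.println(counts[0] + ", " + counts[1] + ", " + counts[2]);
    }
}
```

The selector never removes keys from the selected-key set on its own, so a loop that forgets to remove them will see the same keys again on every pass.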

Application example:

The following is a sample client/server program that uses the asynchronous I/O mechanism, program listing 1 (for reasons of space, only the server-side implementation is given; readers can write the client code by analogy):

Program class diagram

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;

public class NBlockingServer {
    int port = 8000;
    int bufferSize = 1024;
    Selector selector = null;
    ServerSocketChannel serverChannel = null;
    HashMap clientChannelMap = null; // stores the socket and channel instance for each client connection

    public NBlockingServer(int port) {
        this.clientChannelMap = new HashMap();
        this.port = port;
    }

    public void initialize() throws IOException {
        // Initialization: create one selector and one selectable server channel.
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        InetAddress localhost = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(localhost, this.port);
        this.serverChannel.socket().bind(isa); // bind the socket to the server's port
    }

    // Release resources at the end.
    public void finalize() throws IOException {
        this.serverChannel.close();
        this.selector.close();
    }

    // Decode the bytes that were read into the byte buffer.
    public String decode(ByteBuffer byteBuffer) throws CharacterCodingException {
        Charset charset = Charset.forName("ISO-8859-1");
        CharsetDecoder decoder = charset.newDecoder();
        CharBuffer charBuffer = decoder.decode(byteBuffer);
        String result = charBuffer.toString();
        return result;
    }

    // Listen on the port and act whenever a channel becomes ready.
    public void portListening() throws IOException, InterruptedException {
        // The server-side channel registers the OP_ACCEPT event.
        SelectionKey acceptKey = this.serverChannel.register(this.selector,
                SelectionKey.OP_ACCEPT);
        // When a registered event occurs, select() returns a value greater than 0.
        while (acceptKey.selector().select() > 0) {
            System.out.println("event occurs");
            // Get all the ready selection keys.
            Set readyKeys = this.selector.selectedKeys();
            // Poll the selection keys with an iterator.
            Iterator i = readyKeys.iterator();
            // NOTE: the source text breaks off after "while (i". The loop header, the
            // key retrieval, and the accept branch below are a reconstruction inferred
            // from how the attachment and clientChannelMap are used later; they are an
            // assumption, not the author's verified code.
            while (i.hasNext()) {
                SelectionKey key = (SelectionKey) i.next();
                i.remove();
                if (key.isAcceptable()) { // a client connection is waiting to be accepted
                    System.out.println("acceptable");
                    SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                    channel.configureBlocking(false);
                    Socket socket = channel.socket();
                    SelectionKey clientKey = channel.register(this.selector,
                            SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    clientKey.attach(socket);
                    this.clientChannelMap.put(socket, new ClientChInstance(channel));
                }
                else if (key.isReadable()) { // the channel is ready for reading
                    System.out.println("readable");
                    // Get the channel and socket that belong to this selection key.
                    SelectableChannel nextReady =
                            (SelectableChannel) key.channel();
                    Socket socket = (Socket) key.attachment();
                    // Handle the event; the handling is encapsulated in the ClientChInstance class.
                    this.readFromChannel(socket.getChannel(),
                            (ClientChInstance)
                            this.clientChannelMap.get(socket));
                }
                else if (key.isWritable()) { // the channel is ready for writing
                    System.out.println("writeable");
                    // Get the socket and then handle the event, as above.
                    Socket socket = (Socket) key.attachment();
                    SocketChannel channel = (SocketChannel)
                            socket.getChannel();
                    this.writeToChannel(channel, "This is from the server!");
                }
            }
        }
    }

    // Write to the channel.
    public void writeToChannel(SocketChannel channel, String message)
            throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(message.getBytes());
        int nbytes = channel.write(buf);
    }

    // Read from the channel.
    public void readFromChannel(SocketChannel channel, ClientChInstance clientInstance)
            throws IOException, InterruptedException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
        int nbytes = channel.read(byteBuffer);
        // read() returns -1 once the client has closed the connection.
        if (nbytes < 0) {
            channel.close();
            return;
        }
        byteBuffer.flip();
        String result = this.decode(byteBuffer);
        // When the client sends the "@exit" command, close its channel.
        if (result.indexOf("@exit") >= 0) {
            channel.close();
        }
        else {
            clientInstance.append(result.toString());
            // After a full line has been read, perform the corresponding operation.
            if (result.indexOf("\n") >= 0) {
                System.out.println("client input" + result);
                clientInstance.execute();
            }
        }
    }

    // This class encapsulates how a client channel is handled; customize it by overriding execute().
    public class ClientChInstance {
        SocketChannel channel;
        StringBuffer buffer = new StringBuffer();

        public ClientChInstance(SocketChannel channel) {
            this.channel = channel;
        }

        public void execute() throws IOException {
            String message = "This is the response after reading from the channel!";
            writeToChannel(this.channel, message);
            buffer = new StringBuffer();
        }

        // While a line is still incomplete, append the current text to the end of the buffer.
        public void append(String value) {
            buffer.append(value);
        }
    }

    // Main program
    public static void main(String[] args) {
        NBlockingServer nbServer = new NBlockingServer(8000);
        try {
            nbServer.initialize();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            nbServer.portListening();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Program listing 1

Summary:

As the program above shows, the server handles multiple clients in client/server mode without introducing extra threads. The program uses a callback style. Note that you should not mix the original I/O package with the newly added one, because the two are incompatible for some reason; that is, when you use channels, use buffers to carry out the input and output. The program was tested successfully with telnet under Windows 2000 and J2SE 1.4.
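To make the advice about buffers concrete, here is a small sketch in which every byte that enters or leaves a channel travels through a ByteBuffer. It is an illustration under assumptions: the class name ChannelBufferSketch is hypothetical, a java.nio.channels.Pipe stands in for a socket so that no network is needed, and the message must be small enough to fit in the pipe's internal buffer because the writer and reader share one thread. It assumes Java 7 or later.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the article's program.
class ChannelBufferSketch {
    // Sends a short text through a Pipe (an in-process channel pair) and returns what arrives.
    static String roundTrip(String text) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.ISO_8859_1));
            while (out.hasRemaining()) {
                sink.write(out); // a single write() may not drain the whole buffer
            }
            ByteBuffer in = ByteBuffer.allocate(out.capacity());
            while (in.hasRemaining() && source.read(in) >= 0) {
                // keep reading until the expected number of bytes has arrived
            }
            in.flip(); // switch the buffer from being filled to being drained
            return StandardCharsets.ISO_8859_1.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("This is from the server!"));
    }
}
```

The write loop matters for sockets in asynchronous mode in particular, where write() can return after sending only part of the buffer.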