View Javadoc

1   package org.indi.client;
2   
3   import java.io.IOException;
4   import java.io.PipedInputStream;
5   import java.io.PipedOutputStream;
6   import java.nio.ByteBuffer;
7   import java.nio.channels.ReadableByteChannel;
8   import java.nio.channels.SelectableChannel;
9   import java.nio.channels.SelectionKey;
10  import java.util.Queue;
11  
12  import javax.xml.parsers.ParserConfigurationException;
13  import javax.xml.parsers.SAXParser;
14  import javax.xml.parsers.SAXParserFactory;
15  
16  import org.indi.reactor.OutputQueue;
17  import org.indi.reactor.Reactor;
18  import org.xml.sax.SAXException;
19  
20  public class ServerHandler extends OutputQueue implements Runnable {
21      private final PipedInputStream fromServer;
22      private final PipedOutputStream toThread;
23      private final SAXParser parser;
24      private Queue<Object> threadToClientQueue;
25  
26      public ServerHandler(Reactor r, SelectableChannel ch) throws IOException,
27  	    ParserConfigurationException, SAXException {
28  	super(r, ch);
29  	this.fromServer = new PipedInputStream();
30  	this.toThread = new PipedOutputStream(this.fromServer);
31  	this.toThread.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?> <doc>"
32  		.getBytes());
33  	this.parser = SAXParserFactory.newInstance().newSAXParser();
34      }
35  
36      public void setQueue(Queue<Object> toClientQueue) {
37  	this.threadToClientQueue = toClientQueue;
38  	(new Thread(this)).start();
39      }
40  
41      public void run() {
42  	SaxHandler h = new SaxHandler(this.threadToClientQueue);
43  
44  	try {
45  	    this.parser.parse(this.fromServer, h);
46  	} catch (SAXException e) {
47  	    e.printStackTrace();
48  	} catch (IOException e) {
49  	    e.printStackTrace();
50  	}
51      }
52  
53      @Override
54      public void onRead() throws IOException {
55  	ByteBuffer input = ByteBuffer.allocate(10000);
56  	try {
57  	    ((ReadableByteChannel) this.channel).read(input);
58  	} catch (IOException e) {
59  	    register(this.registeredOperations
60  		    - (SelectionKey.OP_READ & this.registeredOperations));
61  	    return;
62  	}
63  	input.flip();
64  	if (input.position() == input.limit()) {
65  	    onClientDisconnected();
66  	    return;
67  	}
68  	if (input.position() == input.limit()) {
69  	    register(this.registeredOperations
70  		    - (SelectionKey.OP_READ & this.registeredOperations));
71  	}
72  	this.toThread.write(input.array(), input.position(), input.limit());
73  	input.clear();
74      }
75  
76      private void onClientDisconnected() {
77  	this.reactor.unregister(this);
78      }
79  }