1 package org.indi.client;
2
3 import java.io.IOException;
4 import java.io.PipedInputStream;
5 import java.io.PipedOutputStream;
6 import java.nio.ByteBuffer;
7 import java.nio.channels.ReadableByteChannel;
8 import java.nio.channels.SelectableChannel;
9 import java.nio.channels.SelectionKey;
10 import java.util.Queue;
11
12 import javax.xml.parsers.ParserConfigurationException;
13 import javax.xml.parsers.SAXParser;
14 import javax.xml.parsers.SAXParserFactory;
15
16 import org.indi.reactor.OutputQueue;
17 import org.indi.reactor.Reactor;
18 import org.xml.sax.SAXException;
19
20 public class ServerHandler extends OutputQueue implements Runnable {
21 private final PipedInputStream fromServer;
22 private final PipedOutputStream toThread;
23 private final SAXParser parser;
24 private Queue<Object> threadToClientQueue;
25
26 public ServerHandler(Reactor r, SelectableChannel ch) throws IOException,
27 ParserConfigurationException, SAXException {
28 super(r, ch);
29 this.fromServer = new PipedInputStream();
30 this.toThread = new PipedOutputStream(this.fromServer);
31 this.toThread.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?> <doc>"
32 .getBytes());
33 this.parser = SAXParserFactory.newInstance().newSAXParser();
34 }
35
36 public void setQueue(Queue<Object> toClientQueue) {
37 this.threadToClientQueue = toClientQueue;
38 (new Thread(this)).start();
39 }
40
41 public void run() {
42 SaxHandler h = new SaxHandler(this.threadToClientQueue);
43
44 try {
45 this.parser.parse(this.fromServer, h);
46 } catch (SAXException e) {
47 e.printStackTrace();
48 } catch (IOException e) {
49 e.printStackTrace();
50 }
51 }
52
53 @Override
54 public void onRead() throws IOException {
55 ByteBuffer input = ByteBuffer.allocate(10000);
56 try {
57 ((ReadableByteChannel) this.channel).read(input);
58 } catch (IOException e) {
59 register(this.registeredOperations
60 - (SelectionKey.OP_READ & this.registeredOperations));
61 return;
62 }
63 input.flip();
64 if (input.position() == input.limit()) {
65 onClientDisconnected();
66 return;
67 }
68 if (input.position() == input.limit()) {
69 register(this.registeredOperations
70 - (SelectionKey.OP_READ & this.registeredOperations));
71 }
72 this.toThread.write(input.array(), input.position(), input.limit());
73 input.clear();
74 }
75
76 private void onClientDisconnected() {
77 this.reactor.unregister(this);
78 }
79 }