1 /*
2 Copyright (C) 2007 Dirk Huenniger
3
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 package org.indi.reactor;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.SelectableChannel;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.WritableByteChannel;
25 import java.util.LinkedList;
26 import java.util.Queue;
27
28 /**
29 * An Output queue that can filled with as many ByteBuffers as requiered. It is
30 * registered with a reactor and associated with a SelectableChannel. It empties
31 * itself though the channel as the the channel is detected writable by the
32 * reactor.
33 *
34 * @author Dirk Hünniger
35 */
36 public class OutputQueue extends SimpleEventHandler implements
37 WritableByteChannel {
38 /**
39 * The queue containing the ByteBuffers to be written into the channel.
40 */
41 protected Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
42 /**
43 * true until close has been called. Used to implement the
44 * WritableChannel contract
45 */
46 private boolean open;
47 /**
48 * true if data if waiting in the queue to be writen into the channel.
49 */
50 protected boolean active;
51 /**
52 * false until onClose has been called. (usually to be called by the
53 * reactor)
54 */
55 protected boolean dead = false;
56
57 /**
58 * class constructor
59 *
60 * @param r
61 * the reactor to register with.
62 * @param ch
63 * the SelectableChannel to write the information to
64 */
65 public OutputQueue(Reactor r, SelectableChannel ch) {
66 super(r, ch);
67 }
68
69 /**
70 * Called by the reactor when channel is ready for writing.
71 */
72 @Override
73 public void onWrite() throws IOException {
74 ByteBuffer e = this.queue.peek();
75 if (e != null) {
76 int remaining = e.remaining();
77 int written = 0;
78 try {
79 written = ((WritableByteChannel) this.channel).write(e);
80 } catch (IOException e1) {
81 int temp = this.registeredOperations & SelectionKey.OP_WRITE;
82 register(this.registeredOperations - temp);
83 }
84 if (remaining == written) {
85 // we are done writing this buffer
86 this.queue.poll();
87 } else {
88 // there are still bytes to be written
89 e.position(e.position() + remaining - written);
90 }
91 }
92 if (null == this.queue.peek()) {
93 // if the queue is empty we dont want to write anymore
94 int temp = this.registeredOperations & SelectionKey.OP_WRITE;
95 register(this.registeredOperations - temp);
96 this.active = false;
97 }
98 }
99
100 /**
101 * Put a new ByteBuffer into the queue. It will be writen to the channel
102 * as soon as it becomes writable.
103 *
104 * @return the number of Byte put into the queue. -1 if failed.
105 */
106 public int write(ByteBuffer src) {
107 if (this.dead) {
108 return -1;
109 }
110 this.queue.add(src);
111 if (!this.active) {
112 register(SelectionKey.OP_WRITE | this.registeredOperations);
113 this.active = true;
114 }
115 return src.remaining();
116 }
117
118 /**
119 * return whether the channel is open. (part of the channel interface)
120 */
121 public boolean isOpen() {
122 return this.open;
123 }
124
125 /**
126 * close the channel. (part of the channel interface)
127 */
128 public void close() {
129 this.open = false;
130 }
131
132 /**
133 * Called by the reactor when the channel is deregistered
134 */
135 @Override
136 public void onClose() {
137 this.dead = true;
138 }
139 }