1 /* 2 Copyright (C) 2007 Dirk Huenniger 3 4 This library is free software; you can redistribute it and/or 5 modify it under the terms of the GNU Lesser General Public 6 License as published by the Free Software Foundation; either 7 version 2.1 of the License, or (at your option) any later version. 8 9 This library is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 Lesser General Public License for more details. 13 14 You should have received a copy of the GNU Lesser General Public 15 License along with this library; if not, write to the Free Software 16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17 */ 18 package org.indi.reactor; 19 20 import java.io.IOException; 21 import java.nio.ByteBuffer; 22 import java.nio.channels.SelectableChannel; 23 import java.nio.channels.SelectionKey; 24 import java.nio.channels.WritableByteChannel; 25 import java.util.LinkedList; 26 import java.util.Queue; 27 28 /** 29 * An Output queue that can filled with as many ByteBuffers as requiered. It is 30 * registered with a reactor and associated with a SelectableChannel. It empties 31 * itself though the channel as the the channel is detected writable by the 32 * reactor. 33 * 34 * @author Dirk Hünniger 35 */ 36 public class OutputQueue extends SimpleEventHandler implements 37 WritableByteChannel { 38 /** 39 * The queue containing the ByteBuffers to be written into the channel. 40 */ 41 protected Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>(); 42 /** 43 * true until close has been called. Used to implement the 44 * WritableChannel contract 45 */ 46 private boolean open; 47 /** 48 * true if data if waiting in the queue to be writen into the channel. 49 */ 50 protected boolean active; 51 /** 52 * false until onClose has been called. (usually to be called by the 53 * reactor) 54 */ 55 protected boolean dead = false; 56 57 /** 58 * class constructor 59 * 60 * @param r 61 * the reactor to register with. 62 * @param ch 63 * the SelectableChannel to write the information to 64 */ 65 public OutputQueue(Reactor r, SelectableChannel ch) { 66 super(r, ch); 67 } 68 69 /** 70 * Called by the reactor when channel is ready for writing. 71 */ 72 @Override 73 public void onWrite() throws IOException { 74 ByteBuffer e = this.queue.peek(); 75 if (e != null) { 76 int remaining = e.remaining(); 77 int written = 0; 78 try { 79 written = ((WritableByteChannel) this.channel).write(e); 80 } catch (IOException e1) { 81 int temp = this.registeredOperations & SelectionKey.OP_WRITE; 82 register(this.registeredOperations - temp); 83 } 84 if (remaining == written) { 85 // we are done writing this buffer 86 this.queue.poll(); 87 } else { 88 // there are still bytes to be written 89 e.position(e.position() + remaining - written); 90 } 91 } 92 if (null == this.queue.peek()) { 93 // if the queue is empty we dont want to write anymore 94 int temp = this.registeredOperations & SelectionKey.OP_WRITE; 95 register(this.registeredOperations - temp); 96 this.active = false; 97 } 98 } 99 100 /** 101 * Put a new ByteBuffer into the queue. It will be writen to the channel 102 * as soon as it becomes writable. 103 * 104 * @return the number of Byte put into the queue. -1 if failed. 105 */ 106 public int write(ByteBuffer src) { 107 if (this.dead) { 108 return -1; 109 } 110 this.queue.add(src); 111 if (!this.active) { 112 register(SelectionKey.OP_WRITE | this.registeredOperations); 113 this.active = true; 114 } 115 return src.remaining(); 116 } 117 118 /** 119 * return whether the channel is open. (part of the channel interface) 120 */ 121 public boolean isOpen() { 122 return this.open; 123 } 124 125 /** 126 * close the channel. (part of the channel interface) 127 */ 128 public void close() { 129 this.open = false; 130 } 131 132 /** 133 * Called by the reactor when the channel is deregistered 134 */ 135 @Override 136 public void onClose() { 137 this.dead = true; 138 } 139 }