View Javadoc

1   /*
2   Copyright (C) 2007 Dirk Huenniger
3   
4   This library is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Lesser General Public
6   License as published by the Free Software Foundation; either
7   version 2.1 of the License, or (at your option) any later version.
8   
9   This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  Lesser General Public License for more details.
13  
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17   */
18  package org.indi.reactor;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.SelectableChannel;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.WritableByteChannel;
25  import java.util.LinkedList;
26  import java.util.Queue;
27  
28  /**
29   * An Output queue that can filled with as many ByteBuffers as requiered. It is
30   * registered with a reactor and associated with a SelectableChannel. It empties
31   * itself though the channel as the the channel is detected writable by the
32   * reactor.
33   * 
34   * @author Dirk Hünniger
35   */
36  public class OutputQueue extends SimpleEventHandler implements
37  	WritableByteChannel {
38      /**
39       * The queue containing the ByteBuffers to be written into the channel.
40       */
41      protected Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
42      /**
43       * true until close has been called. Used to implement the
44       * WritableChannel contract
45       */
46      private boolean open;
47      /**
48       * true if data if waiting in the queue to be writen into the channel.
49       */
50      protected boolean active;
51      /**
52       * false until onClose has been called. (usually to be called by the
53       * reactor)
54       */
55      protected boolean dead = false;
56  
57      /**
58       * class constructor
59       * 
60       * @param r
61       *                the reactor to register with.
62       * @param ch
63       *                the SelectableChannel to write the information to
64       */
65      public OutputQueue(Reactor r, SelectableChannel ch) {
66  	super(r, ch);
67      }
68  
69      /**
70       * Called by the reactor when channel is ready for writing.
71       */
72      @Override
73      public void onWrite() throws IOException {
74  	ByteBuffer e = this.queue.peek();
75  	if (e != null) {
76  	    int remaining = e.remaining();
77  	    int written = 0;
78  	    try {
79  		written = ((WritableByteChannel) this.channel).write(e);
80  	    } catch (IOException e1) {
81  		int temp = this.registeredOperations & SelectionKey.OP_WRITE;
82  		register(this.registeredOperations - temp);
83  	    }
84  	    if (remaining == written) {
85  		// we are done writing this buffer
86  		this.queue.poll();
87  	    } else {
88  		// there are still bytes to be written
89  		e.position(e.position() + remaining - written);
90  	    }
91  	}
92  	if (null == this.queue.peek()) {
93  	    // if the queue is empty we dont want to write anymore
94  	    int temp = this.registeredOperations & SelectionKey.OP_WRITE;
95  	    register(this.registeredOperations - temp);
96  	    this.active = false;
97  	}
98      }
99  
100     /**
101      * Put a new ByteBuffer into the queue. It will be writen to the channel
102      * as soon as it becomes writable.
103      * 
104      * @return the number of Byte put into the queue. -1 if failed.
105      */
106     public int write(ByteBuffer src) {
107 	if (this.dead) {
108 	    return -1;
109 	}
110 	this.queue.add(src);
111 	if (!this.active) {
112 	    register(SelectionKey.OP_WRITE | this.registeredOperations);
113 	    this.active = true;
114 	}
115 	return src.remaining();
116     }
117 
118     /**
119      * return whether the channel is open. (part of the channel interface)
120      */
121     public boolean isOpen() {
122 	return this.open;
123     }
124 
125     /**
126      * close the channel. (part of the channel interface)
127      */
128     public void close() {
129 	this.open = false;
130     }
131 
132     /**
133      * Called by the reactor when the channel is deregistered
134      */
135     @Override
136     public void onClose() {
137 	this.dead = true;
138     }
139 }