View Javadoc

1   /*
2   Copyright (C) 2007 Dirk Huenniger
3   
4   This library is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Lesser General Public
6   License as published by the Free Software Foundation; either
7   version 2.1 of the License, or (at your option) any later version.
8   
9   This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  Lesser General Public License for more details.
13  
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17   */
18  package org.indi.reactor;
19  
20  import java.io.IOException;
21  import java.nio.channels.ClosedChannelException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.util.ArrayList;
25  import java.util.Date;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.PriorityQueue;
29  import java.util.Queue;
30  import java.util.Set;
31  
32  /**
33   * A simple implementation of the reactor pattern (similar to eventloop /event
34   * driven system / event demultiplexer) around the java newIO system. Just
35   * register event handlers with the reactor and call handle events frequntly.
36   * The callbackmethods of the event handlers will be called as soon as the
37   * corresponding events have been detected. See
38   * {@link http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf} for a basic
39   * introduction to the concept.
40   * 
41   * @author Dirk Hünniger
42   */
43  public class Reactor {
44      /**
45       * The Selector used by the Reactor to listen for IO events
46       */
47      protected Selector selector;
48      /**
49       * List of handlers for 1ueue events
50       */
51      private final List<QueueHandler> queueHandlers;
52      /**
53       * Priority queue of handlers for timer events
54       */
55      private final Queue<TimerHandler> timerQueue;
56  
57      /**
58       * Class constructor
59       * 
60       * @throws IOException
61       */
62      public Reactor() throws IOException {
63  	this.selector = Selector.open();
64  	this.queueHandlers = new ArrayList<QueueHandler>();
65  	this.timerQueue = new PriorityQueue<TimerHandler>();
66      }
67  
68      /**
69       * Registers a new IO-EventHandler with the reactor
70       * 
71       * @param h
72       *                EventHandler to be installed
73       * @param ops
74       *                Bitmask describing the IO Events to listen to.
75       */
76      public void register(EventHandler h, int ops) {
77  	try {
78  	    h.channel().register(this.selector, ops, h);
79  	} catch (ClosedChannelException e) {
80  	    unregister(h);
81  	} catch (java.nio.channels.CancelledKeyException e) {
82  	    unregister(h);
83  	}
84      }
85  
86      /**
87       * Register a new Queue-EventHandler with the reactor
88       * 
89       * @param h
90       *                QueueHandler to be regsitered
91       */
92      public void register(QueueHandler h) {
93  	this.queueHandlers.add(h);
94      }
95  
96      /**
97       * Register a new Timer-EventHanlder with the reactor
98       * 
99       * @param h
100      *                the TimeHandler to be registered
101      */
102     public void register(TimerHandler h) {
103 	this.timerQueue.add(h);
104     }
105 
106     /**
107      * Unregister a IO-EventHandler
108      * 
109      * @param h
110      */
111     public void unregister(EventHandler h) {
112 	Set keys = this.selector.keys();
113 	Iterator it = keys.iterator();
114 	while (it.hasNext()) {
115 	    SelectionKey k = (SelectionKey) (it.next());
116 	    if (k.attachment() == h) {
117 		k.cancel();
118 	    }
119 	}
120 	h.onClose();
121     }
122 
123     /**
124      * Call the methods frequently. In this methods all events are detected
125      * and dispached to the approriate EventHandlers
126      * 
127      * @param timeout
128      *                the maximum time to wait for events in miliseconds
129      * @return Number of IO events handled
130      * @throws IOException
131      */
132     public int handleEvents(long timeout) throws IOException {
133 	long selectTimeout = timeout;
134 	long now = new Date().getTime();
135 	TimerHandler nextTimer = this.timerQueue.peek();
136 	if (!(nextTimer == null)) {
137 	    long next = nextTimer.getExpirationDate().getTime();
138 	    if ((next - now) < selectTimeout) {
139 		selectTimeout = next - now;
140 		if (selectTimeout <= 0) {
141 		    selectTimeout = 1;
142 		}
143 	    }
144 	}
145 	this.selector.select(selectTimeout);
146 
147 	now = new Date().getTime();
148 	while (null != (nextTimer = this.timerQueue.peek())) {
149 	    long next = nextTimer.getExpirationDate().getTime();
150 	    if ((next - now) < 0) {
151 		nextTimer = this.timerQueue.poll();
152 		nextTimer.onTimer();
153 	    } else {
154 		break;
155 	    }
156 	}
157 	Set selected = this.selector.selectedKeys();
158 	Iterator it = selected.iterator();
159 	int count = 0;
160 	while (it.hasNext()) {
161 	    SelectionKey k = (SelectionKey) (it.next());
162 	    EventHandler h = (EventHandler) k.attachment();
163 	    int ops = k.readyOps();
164 	    if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
165 		h.onWrite();
166 		count++;
167 	    }
168 	    if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
169 		h.onRead();
170 		count++;
171 	    }
172 	    if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
173 		h.onAccept();
174 		count++;
175 	    }
176 	    if ((ops & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {
177 		h.onConnect();
178 		count++;
179 	    }
180 	}
181 	for (QueueHandler h : this.queueHandlers) {
182 	    Queue<Object> q = h.getQueue();
183 	    Object o = q.poll();
184 	    if (o != null) {
185 		h.onRead(o);
186 	    }
187 	}
188 	selected.clear();
189 	return count;
190     }
191 
192     void close() {
193     };
194 }