1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.indi.reactor;
19
20 import java.io.IOException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.util.ArrayList;
25 import java.util.Date;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.PriorityQueue;
29 import java.util.Queue;
30 import java.util.Set;
31
32
33
34
35
36
37
38
39
40
41
42
43 public class Reactor {
44
45
46
47 protected Selector selector;
48
49
50
51 private final List<QueueHandler> queueHandlers;
52
53
54
55 private final Queue<TimerHandler> timerQueue;
56
57
58
59
60
61
62 public Reactor() throws IOException {
63 this.selector = Selector.open();
64 this.queueHandlers = new ArrayList<QueueHandler>();
65 this.timerQueue = new PriorityQueue<TimerHandler>();
66 }
67
68
69
70
71
72
73
74
75
76 public void register(EventHandler h, int ops) {
77 try {
78 h.channel().register(this.selector, ops, h);
79 } catch (ClosedChannelException e) {
80 unregister(h);
81 } catch (java.nio.channels.CancelledKeyException e) {
82 unregister(h);
83 }
84 }
85
86
87
88
89
90
91
92 public void register(QueueHandler h) {
93 this.queueHandlers.add(h);
94 }
95
96
97
98
99
100
101
102 public void register(TimerHandler h) {
103 this.timerQueue.add(h);
104 }
105
106
107
108
109
110
111 public void unregister(EventHandler h) {
112 Set keys = this.selector.keys();
113 Iterator it = keys.iterator();
114 while (it.hasNext()) {
115 SelectionKey k = (SelectionKey) (it.next());
116 if (k.attachment() == h) {
117 k.cancel();
118 }
119 }
120 h.onClose();
121 }
122
123
124
125
126
127
128
129
130
131
132 public int handleEvents(long timeout) throws IOException {
133 long selectTimeout = timeout;
134 long now = new Date().getTime();
135 TimerHandler nextTimer = this.timerQueue.peek();
136 if (!(nextTimer == null)) {
137 long next = nextTimer.getExpirationDate().getTime();
138 if ((next - now) < selectTimeout) {
139 selectTimeout = next - now;
140 if (selectTimeout <= 0) {
141 selectTimeout = 1;
142 }
143 }
144 }
145 this.selector.select(selectTimeout);
146
147 now = new Date().getTime();
148 while (null != (nextTimer = this.timerQueue.peek())) {
149 long next = nextTimer.getExpirationDate().getTime();
150 if ((next - now) < 0) {
151 nextTimer = this.timerQueue.poll();
152 nextTimer.onTimer();
153 } else {
154 break;
155 }
156 }
157 Set selected = this.selector.selectedKeys();
158 Iterator it = selected.iterator();
159 int count = 0;
160 while (it.hasNext()) {
161 SelectionKey k = (SelectionKey) (it.next());
162 EventHandler h = (EventHandler) k.attachment();
163 int ops = k.readyOps();
164 if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
165 h.onWrite();
166 count++;
167 }
168 if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
169 h.onRead();
170 count++;
171 }
172 if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
173 h.onAccept();
174 count++;
175 }
176 if ((ops & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {
177 h.onConnect();
178 count++;
179 }
180 }
181 for (QueueHandler h : this.queueHandlers) {
182 Queue<Object> q = h.getQueue();
183 Object o = q.poll();
184 if (o != null) {
185 h.onRead(o);
186 }
187 }
188 selected.clear();
189 return count;
190 }
191
192 void close() {
193 };
194 }