1 package org.jmux.app.component;
2
3 import org.jmux.Log;
4 import org.jmux.app.App;
5 import org.jmux.app.component.config.Config;
6 import org.jmux.app.component.jmx.Access;
7 import org.jmux.app.component.jmx.JMX;
8
9 import java.io.IOException;
10 import java.io.ObjectInputStream;
11 import java.io.ObjectOutputStream;
12 import java.net.InetSocketAddress;
13 import java.net.ServerSocket;
14 import java.net.Socket;
15
16 /******************************************************************************
17 jmux - Java Modules Using XML
18 Copyright © 2006 jmux.org
19
20 This library is free software; you can redistribute it and/or
21 modify it under the terms of the GNU Lesser General Public
22 License as published by the Free Software Foundation; either
23 version 2.1 of the License, or (at your option) any later version.
24
25 This library is distributed in the hope that it will be useful,
26 but WITHOUT ANY WARRANTY; without even the implied warranty of
27 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
28 Lesser General Public License for more details.
29
30 You should have received a copy of the GNU Lesser General Public
31 License along with this library; if not, write to the Free Software
32 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
33 *****************************************************************************/
34
35 /***
36 * @author donaldw
37 */
38 public class ClusterProcess extends AbstractThreadedControl {
39 public static final int DEFAULT_PORT = 8080;
40
41 private InetSocketAddress clusterAddress;
42 private ServerSocket serverSocket;
43 private Socket clientSocket;
44 private boolean stopped = false;
45
46 @JMX(Access.READ)
47 private int read;
48
49 @JMX(Access.READ)
50 private int written;
51
52 private long lastReadTime;
53
54 public void configure(Config config, App app) {
55 super.configure(config, app);
56 this.clusterAddress = new InetSocketAddress("localhost", config.getInteger(DEFAULT_PORT, "port"));
57 }
58
59 private void connect() throws InterruptedException {
60 try {
61 serverSocket = new ServerSocket();
62 serverSocket.bind(clusterAddress);
63 log("Created cluster");
64 } catch (IOException e) {
65 serverSocket = null;
66 try {
67 clientSocket = new Socket();
68 clientSocket.connect(clusterAddress);
69 Thread thread = handleSocket(clientSocket, false);
70 log("Connected to cluster");
71 thread.join();
72 } catch (IOException e1) {
73 clientSocket = null;
74 logError("Error connecting to cluster");
75 }
76 }
77
78 if (serverSocket != null) {
79 try {
80 while (!Thread.interrupted()) {
81 Socket socket = serverSocket.accept();
82 handleSocket(socket, true);
83 }
84 } catch (IOException e) {
85 logError("Error accepting socket", e);
86 }
87 }
88 }
89
90
91
92
93
94 private Thread handleSocket(Socket socket, boolean serverSocket) throws IOException {
95 Thread retVal;
96 if (serverSocket) {
97 handleOutputSocket(socket);
98 retVal = handleInputSocket(socket);
99 } else {
100 retVal = handleInputSocket(socket);
101 handleOutputSocket(socket);
102 }
103
104 return retVal;
105 }
106
107 private Thread handleInputSocket(Socket socket) throws IOException {
108 final ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
109 Thread thread =
110 new Thread() {
111 public void run() {
112 try {
113 Object obj;
114 while (!this.isInterrupted()) {
115 obj = ois.readObject();
116 Log.info(obj.toString());
117 read++;
118
119 if (read % 10000 == 0) {
120 if (lastReadTime != 0) {
121 long thisReadTime = System.currentTimeMillis();
122
123 Log.info(10000000.0 / (thisReadTime - lastReadTime) + "msgs/sec%n");
124
125 lastReadTime = thisReadTime;
126 } else {
127 lastReadTime = System.currentTimeMillis();
128 }
129 }
130
131 }
132 } catch (Exception e) {
133 logDebug("Exception reading object", e);
134 } finally {
135 try {
136 if (ois != null) {
137 ois.close();
138 }
139 } catch (IOException e) {
140
141 }
142 }
143 }
144 };
145
146 thread.start();
147 thread.setName("ClusterProcess:handleInputSocket");
148
149 return thread;
150 }
151
152 private void handleOutputSocket(Socket socket) throws IOException {
153 final ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
154 Thread thread =
155 new Thread() {
156 public void run() {
157 try {
158 while (!this.isInterrupted()) {
159 oos.writeObject(new byte[1]);
160 if (++written % 1000 == 0) {
161 oos.reset();
162 }
163
164 }
165 } catch (Exception e) {
166 logDebug("Exception writing object", e);
167 } finally {
168 try {
169 if (oos != null) {
170 oos.close();
171 }
172 } catch (IOException e) {
173
174 }
175 }
176 }
177 };
178
179 thread.setName("ClusterProcess:handleOutputStream");
180 thread.start();
181 }
182
183
184 public void doStart() {
185 stopped = false;
186
187 while (!stopped) {
188 try {
189 connect();
190 } catch (InterruptedException e) {
191 stopped = true;
192 }
193 }
194 }
195
196 public void doStop() {
197 stopped = true;
198 try {
199 if (serverSocket != null) {
200 serverSocket.close();
201 }
202 } catch (IOException e) {
203 logError("Error closing serverSocket", e);
204 }
205 try {
206 if (clientSocket != null) {
207 clientSocket.close();
208 }
209 } catch (IOException e) {
210 logError("Error closing clientSocket", e);
211 }
212 }
213 }