View Javadoc

1   package org.jmux.app.component;
2   
import org.jmux.Log;
import org.jmux.app.App;
import org.jmux.app.component.config.Config;
import org.jmux.app.component.jmx.Access;
import org.jmux.app.component.jmx.JMX;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
15  
16  /******************************************************************************
17   jmux - Java Modules Using XML
18   Copyright © 2006 jmux.org
19  
20   This library is free software; you can redistribute it and/or
21   modify it under the terms of the GNU Lesser General Public
22   License as published by the Free Software Foundation; either
23   version 2.1 of the License, or (at your option) any later version.
24  
25   This library is distributed in the hope that it will be useful,
26   but WITHOUT ANY WARRANTY; without even the implied warranty of
27   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
28   Lesser General Public License for more details.
29  
30   You should have received a copy of the GNU Lesser General Public
31   License along with this library; if not, write to the Free Software
32   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
33   *****************************************************************************/
34  
35  /***
36   * @author donaldw
37   */
38  public class ClusterProcess extends AbstractThreadedControl {
39      public static final int DEFAULT_PORT = 8080;
40  
41      private InetSocketAddress clusterAddress;
42      private ServerSocket serverSocket;
43      private Socket clientSocket;
44      private boolean stopped = false;
45  
46      @JMX(Access.READ)
47      private int read;
48  
49      @JMX(Access.READ)
50      private int written;
51  
52      private long lastReadTime;
53  
54      public void configure(Config config, App app) {
55          super.configure(config, app);
56          this.clusterAddress = new InetSocketAddress("localhost", config.getInteger(DEFAULT_PORT, "port"));
57      }
58  
59      private void connect() throws InterruptedException {
60          try {
61              serverSocket = new ServerSocket();
62              serverSocket.bind(clusterAddress);
63              log("Created cluster");
64          } catch (IOException e) {
65              serverSocket = null;
66              try {
67                  clientSocket = new Socket();
68                  clientSocket.connect(clusterAddress);
69                  Thread thread = handleSocket(clientSocket, false);
70                  log("Connected to cluster");
71                  thread.join();
72              } catch (IOException e1) {
73                  clientSocket = null;
74                  logError("Error connecting to cluster");
75              }
76          }
77  
78          if (serverSocket != null) {
79              try {
80                  while (!Thread.interrupted()) {
81                      Socket socket = serverSocket.accept();
82                      handleSocket(socket, true);
83                  }
84              } catch (IOException e) {
85                  logError("Error accepting socket", e);
86              }
87          }
88      }
89  
90      /*
91       * Must setup streams in different orders on each side of the socket, since the constructors
92       * are blocking
93       */
94      private Thread handleSocket(Socket socket, boolean serverSocket) throws IOException {
95          Thread retVal;
96          if (serverSocket) {
97              handleOutputSocket(socket);
98              retVal = handleInputSocket(socket);
99          } else {
100             retVal = handleInputSocket(socket);
101             handleOutputSocket(socket);
102         }
103 
104         return retVal;
105     }
106 
107     private Thread handleInputSocket(Socket socket) throws IOException {
108         final ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
109         Thread thread =
110                 new Thread() {
111                     public void run() {
112                         try {
113                             Object obj;
114                             while (!this.isInterrupted()) {
115                                 obj = ois.readObject();
116                                 Log.info(obj.toString());
117                                 read++;
118 
119                                 if (read % 10000 == 0) {
120                                     if (lastReadTime != 0) {
121                                         long thisReadTime = System.currentTimeMillis();
122 
123                                         Log.info(10000000.0 / (thisReadTime - lastReadTime) + "msgs/sec%n");
124 
125                                         lastReadTime = thisReadTime;
126                                     } else {
127                                         lastReadTime = System.currentTimeMillis();
128                                     }
129                                 }
130 
131                             }
132                         } catch (Exception e) {
133                             logDebug("Exception reading object", e);
134                         } finally {
135                             try {
136                                 if (ois != null) {
137                                     ois.close();
138                                 }
139                             } catch (IOException e) {
140                                 // oh well;
141                             }
142                         }
143                     }
144                 };
145 
146         thread.start();
147         thread.setName("ClusterProcess:handleInputSocket");
148 
149         return thread;
150     }
151 
152     private void handleOutputSocket(Socket socket) throws IOException {
153         final ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
154         Thread thread =
155                 new Thread() {
156                     public void run() {
157                         try {
158                             while (!this.isInterrupted()) {
159                                 oos.writeObject(new byte[1]);
160                                 if (++written % 1000 == 0) {
161                                     oos.reset();
162                                 }
163                                 //Thread.sleep(10);
164                             }
165                         } catch (Exception e) {
166                             logDebug("Exception writing object", e);
167                         } finally {
168                             try {
169                                 if (oos != null) {
170                                     oos.close();
171                                 }
172                             } catch (IOException e) {
173                                 // oh well;
174                             }
175                         }
176                     }
177                 };
178 
179         thread.setName("ClusterProcess:handleOutputStream");
180         thread.start();
181     }
182 
183 
184     public void doStart() {
185         stopped = false;
186 
187         while (!stopped) {
188             try {
189                 connect();
190             } catch (InterruptedException e) {
191                 stopped = true;
192             }
193         }
194     }
195 
196     public void doStop() {
197         stopped = true;
198         try {
199             if (serverSocket != null) {
200                 serverSocket.close();
201             }
202         } catch (IOException e) {
203             logError("Error closing serverSocket", e);
204         }
205         try {
206             if (clientSocket != null) {
207                 clientSocket.close();
208             }
209         } catch (IOException e) {
210             logError("Error closing clientSocket", e);
211         }
212     }
213 }