Forums > Java

Trying to create a mxj socket listener

October 30, 2007 | 8:40 pm

First the SBOD then crash. Is this because of a threading issue? I basically created a while(true){listen to the socket} method. Any advice? Anywhere I should look in particular?


October 30, 2007 | 8:53 pm

On 30 Oct 2007, at 20:40, Timothy Dalbey wrote:

> First the SBOD then crash. Is this because of a threading issue?

Probably: I’ve written socket listeners for MXJ without any problems.

Here’s a hack to an O’Reilly demo I did some years ago to demo how to
build a TCP/IP server for MXJ. (Apologies to everyone else for the
noise…)

// This example is from _Java Examples in a Nutshell_. (http://
http://www.oreilly.com)
// Copyright (c) 1997 by David Flanagan
// This example is provided WITHOUT ANY WARRANTY either expressed or
implied.
// You may study, use, modify, and distribute it for non-commercial
purposes.
// For any commercial use, see http://www.davidflanagan.com/javaexamples

package net.loadbang.mxj.demo;

import java.io.*;
import java.net.*;
import java.util.*;
import com.cycling74.max.*;

/**
* This class is a generic framework for a flexible, multi-threaded
server.
* It listens on any number of specified ports, and, when it receives a
* connection on a port, passes input and output streams to a
specified Service
* object which provides the actual service. It can limit the number of
* concurrent connections, and logs activity to a specified stream.
**/
public class Server extends MaxObject {
/**
* A main() method for running the server as a standalone program.
* The command-line arguments to the program should be pairs of
servicenames
* and port numbers. For each pair, the program will dynamically
load the
* named Service class, instantiate it, and tell the server to
provide that
* Service on the specified port. The special -control argument
should be
* followed by a password and port, and will start special server
control
* service running on the specified port, protected by the specified
* password.
**/

/*
public static void main(String[] args) {
try {
if (args.length < 2) // Check number of arguments
throw new IllegalArgumentException("Must start at least one
service");

// Create a Server object that uses standard out as its log and
// has a limit of ten concurrent connections at once.
Server s = new Server(System.out, 10);

// Parse the argument list
int i = 0;
while(i < args.length) {
if (args[i].equals("-control")) { // Handle the -control
argument
i++;
String password = args[i++];
int port = Integer.parseInt(args[i++]);
s.addService(new Control(s, password), port); // add
control service
}
else {
// Otherwise start a named service on the specified port.
// Dynamically load and instantiate a class that
implements Service.
String serviceName = args[i++];
Class serviceClass = Class.forName(serviceName); //
dynamic load
Service service = (Service)serviceClass.newInstance(); //
instantiate
int port = Integer.parseInt(args[i++]);
s.addService(service, port);
}
}
}
catch (Exception e) { // Display a message if anything goes wrong
System.err.println("Server: " + e);
System.err.println("Usage: java Server [-control

] " +
"[

... ]");
System.exit(1);
}
}*/

public Server() {
createInfoOutlet(false);
declareTypedIO("A","M");
init(null, 10);
}

public void serve(String name, int port)
throws ClassNotFoundException,
InstantiationException,
IllegalAccessException,
IOException
{
Class serviceClass = Class.forName(name); // dynamic load
Service service = (Service) serviceClass.newInstance(); //
instantiate
addService(service, port);
}

// This is the state for the server
ConnectionManager connectionManager; // The ConnectionManager object
Hashtable services; // The current
services and their ports
ThreadGroup threadGroup; // The threadgroup for all
our threads
PrintWriter logStream; // Where we send our logging
output to

/**
* This is the Server() constructor. It must be passed a stream
* to send log output to (may be null), and the limit on the
number of
* concurrent connections. It creates and starts a ConnectionManager
* thread which enforces this limit on connections.
**/
private void init(OutputStream logStream, int maxConnections) {
log("Starting server");
threadGroup = new ThreadGroup("Server");
connectionManager = new ConnectionManager(threadGroup,
maxConnections);
connectionManager.start();
services = new Hashtable();
}

/** Write the specified string to the log */
protected synchronized void log(String s) {
outlet(0, "[" + s + "]");
}

/** Write the specified object to the log */
protected void log(Object o) { log(o.toString()); }

/**
* This method makes the server start providing a new service.
* It runs the specified Service object on the specified port.
**/
public void addService(Service service, int port) throws
IOException {
Integer key = new Integer(port); // the hashtable key
// Check whether a service is already on that port
if (services.get(key) != null)
throw new IllegalArgumentException("Port " + port + " already
in use.");
// Create a Listener object to listen for connections on the port
Listener listener = new Listener(threadGroup, port, service);
// Store it in the hashtable
services.put(key, listener);
// Log it
log("Starting service " + service.getClass().getName() +
" on port " + port);
// Start the listener running.
listener.start();
}

/**
* This method makes the server stop providing a service on a port.
* It does not terminate any pending connections to that service,
merely
* causes the server to stop accepting new connections
**/
public void removeService(int port) {
Integer key = new Integer(port); // hashtable key
// Look up the Listener object for the port in the hashtable of
services
final Listener listener = (Listener) services.get(key);
if (listener == null) return;
// Ask the listener to stop
listener.pleaseStop();
// Remove it from the hashtable
services.remove(key);
// And log it.
log("Stopping service " + listener.service.getClass().getName() +
" on port " + port);
}

/**
* This nested Thread subclass is a "listener". It listens for
connections
* on a specified port (using a ServerSocket) and when it gets a
connection
* request, it calls a method of the ConnectionManager to accept
(or reject)
* the connection. There is one Listener for each Service being
provided
* by the Server. The Listener passes the Server object to the
* ConnectionManager, but doesn’t do anything with it itself.
*/
public class Listener extends Thread {
ServerSocket listen_socket; // The socket we listen for
connections on
int port; // The port we’re listening on
Service service; // The service to provide on that
port
boolean stop = false; // Whether we’ve been asked to stop

/**
* The Listener constructor creates a thread for itself in the
specified
* threadgroup. It creates a ServerSocket to listen for
connections
* on the specified port. It arranges for the ServerSocket to be
* interruptible, so that services can be removed from the server.
**/
public Listener(ThreadGroup group, int port, Service service)
throws IOException {
super(group, "Listener:" + port);
listen_socket = new ServerSocket(port);
// give the socket a non-zero timeout so accept() can be
interrupted
listen_socket.setSoTimeout(600000);
this.port = port;
this.service = service;
}

/** This is the nice way to get a Listener to stop accepting
connections */
public void pleaseStop() {
this.stop = true; // set the stop flag
this.interrupt(); // and make the accept() call stop blocking
}

/**
* A Listener is a Thread, and this is its body.
* Wait for connection requests, accept them, and pass the
socket on
* to the ConnectionManager object of this server
**/
@Override
public void run() {
while(!stop) { // loop until we’re asked to stop.
try {
Socket client = listen_socket.accept();
connectionManager.addConnection(client, service);
}
catch (InterruptedIOException e) {}
catch (IOException e) {log(e);}
}
}
}
/**
* This nested class manages client connections for the server.
* It maintains a list of current connections and enforces the
* maximum connection limit. It creates a separate thread (one per
* server) that sits around and wait()s to be notify()’d that a
connection
* has terminated. When this happens, it updates the list of
connections.
**/
public class ConnectionManager extends Thread {
int maxConnections; // The maximum number of allowed connections
Vector connections; // The current list of connections

/**
* Create a ConnectionManager in the specified thread group to
enforce
* the specified maximum connection limit. Make it a daemon
thread so
* the interpreter won’t wait around for it to exit.
**/
public ConnectionManager(ThreadGroup group, int maxConnections) {
super(group, "ConnectionManager");
this.setDaemon(true);
this.maxConnections = maxConnections;
connections = new Vector(maxConnections);
log("Starting connection manager. Max connections: " +
maxConnections);
}

/**
* This is the method that Listener objects call when they accept a
* connection from a client. It either creates a Connection object
* for the connection and adds it to the list of current
connections,
* or, if the limit on connections has been reached, it closes the
* connection.
**/
synchronized void addConnection(Socket s, Service service) {
// If the connection limit has been reached
if (connections.size() >= maxConnections) {
try {
PrintWriter out = new PrintWriter(s.getOutputStream());
// Then tell the client it is being rejected.
out.println("Connection refused; " +
"server has reached maximum number of clients.");
out.flush();
// And close the connection to the rejected client.
s.close();
// And log it, of course
log("Connection refused to " + s.getInetAddress
().getHostAddress() +
":" + s.getPort() + ": max connections reached.");
} catch (IOException e) {log(e);}
}
else { // Otherwise, if the limit has not been reached
// Create a Connection thread to handle this connection
Connection c = new Connection(s, service);
// Add it to the list of current connections
connections.addElement(c);
// Log this new connection
log("Connected to " + s.getInetAddress().getHostAddress() +
":" + s.getPort() + " on port " + s.getLocalPort() +
" for service " + service.getClass().getName());
// And start the Connection thread running to provide the
service
c.start();
}
}

/**
* A Connection object calls this method just before it exits.
* This method uses notify() to tell the ConnectionManager thread
* to wake up and delete the thread that has exited.
**/
public synchronized void endConnection() { this.notify(); }

/** Change the current connection limit */
public synchronized void setMaxConnections(int max)
{ maxConnections=max; }

/**
* Output the current list of connections to the specified stream.
* This method is used by the Control service defined below.
**/
public synchronized void printConnections(PrintWriter out) {
for(int i = 0; i < connections.size(); i++) {
Connection c = (Connection)connections.elementAt(i);
out.println("CONNECTED TO " +
c.client.getInetAddress().getHostAddress() + ":" +
c.client.getPort() + " ON PORT " +
c.client.getLocalPort()+
" FOR SERVICE " + c.service.getClass().getName());
}
}

/**
* The ConnectionManager is a thread, and this is the body of that
* thread. While the ConnectionManager methods above are called
by other
* threads, this method is run in its own thread. The job of
this thread
* is to keep the list of connections up to date by removing
connections
* that are no longer alive. It uses wait() to block until
notify()’d by
* the endConnection() method.
**/
@Override
public void run() {
while(true) { // infinite loop
// Check through the list of connections, removing dead ones
for(int i = 0; i < connections.size(); i++) {
Connection c = (Connection)connections.elementAt(i);
if (!c.isAlive()) {
connections.removeElementAt(i);
log("Connection to " + c.client.getInetAddress
().getHostAddress() +
":" + c.client.getPort() + " closed.");
}
}
// Now wait to be notify()’d that a connection has exited
// When we wake up we’ll check the list of connections again.
try { synchronized(this) { this.wait(); } }
catch(InterruptedException e) {}
}
}
}

/**
* This class is a subclass of Thread that handles an individual
connection
* between a client and a Service provided by this server.
Because each
* such connection has a thread of its own, each Service can have
multiple
* connections pending at once. Despite all the other threads in
use, this
* is the key feature that makes this a multi-threaded server
implementation.
**/
public class Connection extends Thread {
Socket client; // The socket to talk to the client through
Service service; // The service being provided to that client

/**
* This constructor just saves some state and calls the superclass
* constructor to create a thread to handle the connection.
Connection
* objects are created by Listener threads. These threads are
part of
* the server’s ThreadGroup, so all Connection threads are part
of that
* group, too.
**/
public Connection(Socket client, Service service) {
super("Server.Connection:" + client.getInetAddress
().getHostAddress() +
":" + client.getPort());
this.client = client;
this.service = service;
}

/**
* This is the body of each and every Connection thread.
* All it does is pass the client input and output streams to the
* serve() method of the specified Service object. That method
* is responsible for reading from and writing to those streams to
* provide the actual service. Recall that the Service object
has been
* passed from the Server.addService() method to a Listener object
* to the ConnectionManager.addConnection() to this Connection
object,
* and is now finally getting used to provide the service.
* Note that just before this thread exits it calls the
* ConnectionManager.endConnection() method to wake up the
* ConnectionManager thread so that it can remove this Connection
* from its list of active connections.
**/
@Override
public void run() {
try {
InputStream in = client.getInputStream();
OutputStream out = client.getOutputStream();
service.serve(in, out, itsLogger);
}
catch (IOException e) {log(e);}
finally { connectionManager.endConnection(); }
}
}

/** A logging (outlet) interface. */

public interface Logger {
public void log(String line);
}
/**
* Here is the Service interface that we have seen so much of.
* It defines only a single method which is invoked to provide the
service.
* serve() will be passed an input stream and an output stream to
the client.
* It should do whatever it wants with them, and should close them
before
* returning.
*
* All connections through the same port to this service share a
single
* Service object. Thus, any state local to an individual
connection must
* be stored in local variables within the serve() method. State
that should
* be global to all connections on the same port should be stored in
* instance variables of the Service class. If the same Service
is running
* on more than one port, there will typically be different
Service instances
* for each port. Data that should be global to all connections
on any port
* should be stored in static variables.
*
* Note that implementations of this interface must have a no-
argument
* constructor if they are to be dynamically instantiated by the
main()
* method of the Server class.
**/
public interface Service {
public void serve(InputStream in, OutputStream out, Logger
logger) throws IOException;
}

private Logger itsLogger = new Logger() {
public void log(String line) {
outlet(0, line);
}
};

/**
* This service is an HTTP mirror, just like the HttpMirror class
* implemented earlier in this chapter. It echos back the client’s
* HTTP request
**/
public static class HTTPMirror implements Service {
public void serve(InputStream i, OutputStream o, Logger logger)
throws IOException {
BufferedReader in = new BufferedReader(new InputStreamReader(i));
PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
out.println("HTTP/1.0 200 ");
out.println("Content-Type: text/plain");
out.println();
String line;
while((line = in.readLine()) != null) {
if (line.length() == 0) break;
out.println(line);
}
out.close();
in.close();
}
}

/**
* This service demonstrates how to maintain state across connections
* by saving it in instance variables and using synchronized
access to
* those variables. It maintains a count of how many clients have
connected
* and tells each client what number it is
**/
public static class UniqueID implements Service {
public int id=0;
public synchronized int nextId() { return id++; }
public void serve(InputStream i, OutputStream o, Logger logger)
throws IOException {
PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
out.println("You are client #: " + nextId());
out.close();
i.close();
}
}

/**
* This is a non-trivial service. It implements a command-based
protocol
* that gives password-protected runtime control over the
operation of the
* server. See the main() method of the Server class to see how this
* service is started.
*
* The recognized commands are:
* password: give password; authorization is required for most
commands
* add: dynamically add a named service on a specified port
* remove: dynamically remove the service running on a
specified port
* max: change the current maximum connection limit.
* status: display current services, connections, and
connection limit
* help: display a help message
* quit: disconnect
*
* This service displays a prompt, and sends all of its output to
the user
* in capital letters. Only one client is allowed to connect to
this service
* at a time.
**/
public static class Control implements Service {
Server server; // The server we control
String password; // The password we require
boolean connected = false; // Whether a client is already
connected to us
/**
* Create a new Control service. It will control the specified
Server
* object, and will require the specified password for
authorization
* Note that this Service does not have a no argument
constructor, which
* means that it cannot be dynamically instantiated and added as
the other,
* generic services above can be.
**/
public Control(Server server, String password) {
this.server = server;
this.password = password;
}

/**
* This is the serve method that provides the service. It reads
a line
* the client, and uses java.util.StringTokenizer to parse it into
* commands and arguments. It does various things depending on the
* command.
**/
public void serve(InputStream i, OutputStream o, Logger logger)
throws IOException {
// Setup the streams
BufferedReader in = new BufferedReader(new InputStreamReader(i));
PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
String line;
boolean authorized = false; // Has the user has given the
password yet?

// If there is already a client connected to this service,
display a
// message to this client and close the connection. We use a
// synchronized block to prevent a race condition.
synchronized(this) {
if (connected) {
out.println("ONLY ONE CONTROL CONNECTION ALLOWED AT A
TIME.");
out.close();
return;
}
else connected = true;
}

for(;;) { // infinite loop
out.print("> "); // Display a prompt
out.flush(); // Make it appear right away
line = in.readLine(); // Get the user’s input
if (line == null) break; // Quit if we get EOF.
try {
// Use a StringTokenizer to parse the user’s command
StringTokenizer t = new StringTokenizer(line);
if (!t.hasMoreTokens()) continue; // if input was blank line
// Get the first word of the input and convert to lower case
String command = t.nextToken().toLowerCase();
// Now compare it to each of the possible commands, doing the
// appropriate thing for each command
if (command.equals("password")) { // Password command
String p = t.nextToken(); // Get the next
word of input
if (p.equals(this.password)) { // Does it equal
the password
out.println("OK"); // Say so
authorized = true; // Grant
authorization
}
else out.println("INVALID PASSWORD"); // Otherwise fail
}
else if (command.equals("add")) { // Add Service
command
// Check whether password has been given
if (!authorized) out.println("PASSWORD REQUIRED");
else {
// Get the name of the service and try to dynamically
load
// and instantiate it. Exceptions will be handled below
String serviceName = t.nextToken();
Class serviceClass = Class.forName(serviceName);
Service service;
try { service = (Service) serviceClass.newInstance(); }
catch (NoSuchMethodError e) {
throw new IllegalArgumentException("Service must
have a " +
"no-argument
constructor");
}
int port = Integer.parseInt(t.nextToken());
// If no exceptions occurred, add the service
server.addService(service, port);
out.println("SERVICE ADDED"); // acknowledge
}
}
else if (command.equals("remove")) { // Remove service
command
if (!authorized) out.println("PASSWORD REQUIRED");
else {
int port = Integer.parseInt(t.nextToken()); // get port
server.removeService(port); // remove the service
on it
out.println("SERVICE REMOVED"); // acknowledge
}
}
else if (command.equals("max")) { // Set max
connection limit
if (!authorized) out.println("PASSWORD REQUIRED");
else {
int max = Integer.parseInt(t.nextToken()); //
get limit
server.connectionManager.setMaxConnections(max); //
set limit
out.println("MAX CONNECTIONS CHANGED"); //
acknowledge
}
}
else if (command.equals("status")) { // Status Display
command
if (!authorized) out.println("PASSWORD REQUIRED");
else {
// Display a list of all services currently running
Enumeration keys = server.services.keys();
while(keys.hasMoreElements()) {
Integer port = (Integer) keys.nextElement();
Listener listener = (Listener)server.services.get
(port);
out.println("SERVICE " + listener.service.getClass
().getName()+
" ON PORT " + port);
}
// Display a list of all current connections
server.connectionManager.printConnections(out);
// Display the current connection limit
out.println("MAX CONNECTIONS: " +
server.connectionManager.maxConnections);
}
}
else if (command.equals("help")) { // Help command
// Display command syntax. Password not required
out.println("COMMANDS:n" +
"tpassword

n" +
"tadd

n" +
"tremove
n" +
"tmax n" +
"tstatusn" +
"thelpn" +
"tquit");
}
else if (command.equals("quit")) break; // Quit
command. Exit.
else out.println("UNRECOGNIZED COMMAND"); // Unknown
command error
}
catch (Exception e) {
// If an exception occurred during the command, print an
error
// message, then output details of the exception.
out.println("EXCEPTION WHILE PARSING OR EXECUTING COMMAND:");
out.println(e);
}
}
// Finally, when the loop command loop ends, close the streams
// and set our connected flag to false so that other clients can
// now connect.
out.close();
in.close();
connected = false;
}
}
}

Nick Rothwell / Cassiel.com Limited
http://www.cassiel.com
http://www.myspace.com/cassieldotcom
http://www.last.fm/music/cassiel
http://www.loadbang.net


Viewing 2 posts - 1 through 2 (of 2 total)