diff --git a/libs/WebSocket.jar b/libs/WebSocket.jar old mode 100644 new mode 100755 index ef36379..1daaf13 Binary files a/libs/WebSocket.jar and b/libs/WebSocket.jar differ diff --git a/src/io/socket/IOCallback.java b/src/io/socket/IOCallback.java index f45d32c..8f8b95d 100644 --- a/src/io/socket/IOCallback.java +++ b/src/io/socket/IOCallback.java @@ -12,6 +12,7 @@ /** * The Interface IOCallback. A callback interface to SocketIO + * WARNING: Do NOT use "Thread.sleep" in the callback methods */ public interface IOCallback { diff --git a/src/io/socket/IOConnection.java b/src/io/socket/IOConnection.java index 0febf78..863ae06 100644 --- a/src/io/socket/IOConnection.java +++ b/src/io/socket/IOConnection.java @@ -8,11 +8,15 @@ */ package io.socket; +import io.socket.IOMessage.TypeMessage; + +import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; +import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -20,14 +24,15 @@ import java.util.List; import java.util.Map.Entry; import java.util.Properties; -import java.util.Scanner; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; import org.json.JSONArray; import org.json.JSONException; @@ -74,6 +79,10 @@ class IOConnection implements IOCallback { /** The url for this connection. */ private URL url; + + public enum VersionSocketIO { V09x , V10x }; + /** The version of socket.io server. */ + private VersionSocketIO version; /** The transport for this connection. */ private IOTransport transport; @@ -170,9 +179,33 @@ private class ReconnectTask extends TimerTask { @Override public void run() { connectTransport(); - if (!keepAliveInQueue) { - sendPlain("2::"); - keepAliveInQueue = true; + if (!keepAliveInQueue) + { + sendPlain(IOMessage.createMessage(TypeMessage.TYPE_HEARTBEAT,"","", version).toString()); + keepAliveInQueue = true; + } + } + } + + /** The reconnect task. Null if no reconnection is in progress. */ + private HeartBeatTask heartBeatTask = null; + + /** + * The Class ReconnectTask. Handles reconnect attempts + */ + private class HeartBeatTask extends TimerTask { + + /* + * (non-Javadoc) + * + * @see java.util.TimerTask#run() + */ + @Override + public void run() + { + if (IOConnection.this.getState() == STATE_READY) + { + sendPlain(IOMessage.createMessage(TypeMessage.TYPE_HEARTBEAT,"","", version).toString()); } } } @@ -256,14 +289,16 @@ static public IOConnection register(String origin, SocketIO socket) { * @return true, if successfully registered on this transport, otherwise * false. */ - public synchronized boolean register(SocketIO socket) { + public synchronized boolean register(SocketIO socket) + { String namespace = socket.getNamespace(); if (sockets.containsKey(namespace)) return false; sockets.put(namespace, socket); socket.setHeaders(headers); - IOMessage connect = new IOMessage(IOMessage.TYPE_CONNECT, - socket.getNamespace(), ""); +// IOMessage connect = new IOMessage(IOMessage.TYPE_CONNECT, +// socket.getNamespace(), "",this.version); + IOMessage connect = IOMessage.createMessage(TypeMessage.TYPE_CONNECT, "", socket.getNamespace(), this.version); sendPlain(connect.toString()); return true; } @@ -275,8 +310,9 @@ public synchronized boolean register(SocketIO socket) { * @param socket * the socket to be shut down */ - public synchronized void unregister(SocketIO socket) { - sendPlain("0::" + socket.getNamespace()); + public synchronized void unregister(SocketIO socket) + { + sendPlain(IOMessage.createMessage(TypeMessage.TYPE_DISCONNECT, "", socket.getNamespace(), version).toString()); sockets.remove(socket.getNamespace()); socket.getCallback().onDisconnect(); @@ -291,11 +327,13 @@ public synchronized void unregister(SocketIO socket) { */ private void handshake() { URL url; - String response; + String response = ""; URLConnection connection; - try { + try + { setState(STATE_HANDSHAKE); - url = new URL(IOConnection.this.url.toString() + SOCKET_IO_1); + url = new URL(IOConnection.this.url.toString() + SOCKET_IO_1 + "?EIO=2&transport=polling"); + logger.info("URL: "+url.toString()); connection = url.openConnection(); if (connection instanceof HttpsURLConnection) { ((HttpsURLConnection) connection) @@ -305,19 +343,69 @@ private void handshake() { connection.setReadTimeout(connectTimeout); /* Setting the request headers */ - for (Entry entry : headers.entrySet()) { - connection.setRequestProperty((String) entry.getKey(), - (String) entry.getValue()); + for (Entry entry : headers.entrySet()) + { + connection.setRequestProperty((String) entry.getKey(),(String) entry.getValue()); } - - InputStream stream = connection.getInputStream(); - Scanner in = new Scanner(stream); - response = in.nextLine(); - String[] data = response.split(":"); - sessionId = data[0]; - heartbeatTimeout = Long.parseLong(data[1]) * 1000; - closingTimeout = Long.parseLong(data[2]) * 1000; - protocols = Arrays.asList(data[3].split(",")); + + + BufferedReader inB = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + while ((inputLine = inB.readLine()) != null) + { + response += inputLine; + } + inB.close(); + +// InputStream stream = connection.getInputStream(); +// Scanner in = new Scanner(stream); +// while (in.hasNext()) +// { +// response += in.next(); +// } +// in.close(); + + Pattern pattern = Pattern.compile("\\w+:\\w+:\\w+:\\w+"); + Matcher matcher = pattern.matcher(response); + if (matcher.find()) + { + logger.info("Response: "+response); + this.version = VersionSocketIO.V09x; + logger.info("Version: V09x"); + String[] data = response.split(":"); + sessionId = data[0]; + heartbeatTimeout = Long.parseLong(data[1]) * 1000; + closingTimeout = Long.parseLong(data[2]) * 1000; + protocols = Arrays.asList(data[3].split(",")); + } else + { + response = response.substring(response.indexOf('{')); + response = response.substring(0,response.lastIndexOf('}')+1); + logger.info("Response: "+response); + this.version = VersionSocketIO.V10x; + logger.info("Version: V10x"); + try + { + JSONObject data = null; + data = new JSONObject(response); + sessionId = data.getString("sid"); + heartbeatTimeout = data.getLong("pingInterval"); + closingTimeout = data.getLong("pingTimeout"); + +// JSONArray arr = data.getJSONArray("upgrades"); +// for (int i = 0; i < arr.length(); i++) { +// protocols.add(arr.getString(i)); +// } + protocols = new ArrayList(); + protocols.add("websocket"); //FIXME + + }catch (JSONException e) + { + logger.warning("Malformated JSON received"); + } + } + + } catch (Exception e) { error(new SocketIOException("Error while handshaking", e)); } @@ -326,12 +414,13 @@ private void handshake() { /** * Connect transport. */ - private synchronized void connectTransport() { + private synchronized void connectTransport() + { if (getState() == STATE_INVALID) return; setState(STATE_CONNECTING); if (protocols.contains(WebsocketTransport.TRANSPORT_NAME)) - transport = WebsocketTransport.create(url, this); + transport = WebsocketTransport.create(url, this,this.version); else if (protocols.contains(XhrTransport.TRANSPORT_NAME)) transport = XhrTransport.create(url, this); else { @@ -351,7 +440,8 @@ else if (protocols.contains(XhrTransport.TRANSPORT_NAME)) * @return an {@link IOAcknowledge} instance, may be null if * server doesn't request one. */ - private IOAcknowledge remoteAcknowledge(IOMessage message) { + private IOAcknowledge remoteAcknowledge(IOMessage message) + { String _id = message.getId(); if (_id.equals("")) return null; @@ -359,21 +449,24 @@ else if (_id.endsWith("+") == false) _id = _id + "+"; final String id = _id; final String endPoint = message.getEndpoint(); - return new IOAcknowledge() { + return new IOAcknowledge() + { @Override - public void ack(Object... args) { - JSONArray array = new JSONArray(); - for (Object o : args) { - try { - array.put(o == null ? JSONObject.NULL : o); - } catch (Exception e) { - error(new SocketIOException( - "You can only put values in IOAcknowledge.ack() which can be handled by JSONArray.put()", - e)); - } - } - IOMessage ackMsg = new IOMessage(IOMessage.TYPE_ACK, endPoint, - id + array.toString()); + public void ack(Object... args) + { +// JSONArray array = new JSONArray(); +// for (Object o : args) { +// try { +// array.put(o == null ? JSONObject.NULL : o); +// } catch (Exception e) { +// error(new SocketIOException( +// "You can only put values in IOAcknowledge.ack() which can be handled by JSONArray.put()", +// e)); +// } +// } + IOMessage ackMsg = IOMessage.createMessage(TypeMessage.TYPE_ACK, id, endPoint,IOConnection.this.version); +// new IOMessage(IOMessage.TYPE_ACK, endPoint, +// id + array.toString(),IOConnection.this.version); sendPlain(ackMsg.toString()); } }; @@ -454,15 +547,20 @@ private void error(SocketIOException e) { * @param text * the Text to be send. */ - private synchronized void sendPlain(String text) { + private synchronized void sendPlain(String text) + { if (getState() == STATE_READY) - try { - logger.info("> " + text); + { + try + { + logger.info("-->SEND " + text); transport.send(text); + } catch (Exception e) { logger.info("IOEx: saving"); outputBuffer.add(text); } + } else { outputBuffer.add(text); } @@ -484,10 +582,12 @@ private synchronized void resetTimeout() { if (heartbeatTimeoutTask != null) { heartbeatTimeoutTask.cancel(); } - if (getState() != STATE_INVALID) { + + if (getState() != STATE_INVALID) + { heartbeatTimeoutTask = new HearbeatTimeoutTask(); backgroundTimer.schedule(heartbeatTimeoutTask, closingTimeout - + heartbeatTimeout); + + heartbeatTimeout); } } @@ -516,12 +616,18 @@ private IOCallback findCallback(IOMessage message) throws SocketIOException { * * {@link IOTransport} calls this when a connection is established. */ - public synchronized void transportConnected() { + public synchronized void transportConnected() + { setState(STATE_READY); if (reconnectTask != null) { reconnectTask.cancel(); reconnectTask = null; } + if (heartBeatTask!= null) { + heartBeatTask.cancel(); + } + heartBeatTask = new HeartBeatTask(); // heartbeat loop + backgroundTimer.schedule(heartBeatTask,heartbeatTimeout,heartbeatTimeout); resetTimeout(); if (transport.canSendBulk()) { ConcurrentLinkedQueue outputBuffer = this.outputBuffer; @@ -609,18 +715,14 @@ public void transportData(String text) { * @param text * the text */ - public void transportMessage(String text) { - logger.info("< " + text); - IOMessage message; - try { - message = new IOMessage(text); - } catch (Exception e) { - error(new SocketIOException("Garbage from server: " + text, e)); - return; - } + public void transportMessage(String text) + { + logger.info("<--RCV: " + text); + IOMessage message = IOMessage.parseMessage(text, version); resetTimeout(); + switch (message.getType()) { - case IOMessage.TYPE_DISCONNECT: + case TYPE_DISCONNECT: try { findCallback(message).onDisconnect(); } catch (Exception e) { @@ -628,15 +730,14 @@ public void transportMessage(String text) { "Exception was thrown in onDisconnect()", e)); } break; - case IOMessage.TYPE_CONNECT: + case TYPE_CONNECT: try { - if (firstSocket != null && "".equals(message.getEndpoint())) { + if (firstSocket != null && "".equals(message.getEndpoint())) + { if (firstSocket.getNamespace().equals("")) { firstSocket.getCallback().onConnect(); } else { - IOMessage connect = new IOMessage( - IOMessage.TYPE_CONNECT, - firstSocket.getNamespace(), ""); + IOMessage connect = IOMessage.createMessage(TypeMessage.TYPE_CONNECT,"",firstSocket.getNamespace(),this.version); sendPlain(connect.toString()); } } else { @@ -648,11 +749,31 @@ public void transportMessage(String text) { "Exception was thrown in onConnect()", e)); } break; - case IOMessage.TYPE_HEARTBEAT: - sendPlain("2::"); - break; - case IOMessage.TYPE_MESSAGE: - try { + case TYPE_HEARTBEAT: + { + switch (this.version) + { + case V09x: + sendPlain("2::"); + break; + case V10x: + sendPlain("3"+message.getData()); + break; + } + }break; + case TYPE_PONG: + { + if(message.getData() == "probe") + { + sendPlain(IOMessage.createMessage( + TypeMessage.TYPE_UPGRADE, + "","probe", + message.getEndpoint(), version).toString()); + } + }break; + case TYPE_MESSAGE: + try + { findCallback(message).onMessage(message.getData(), remoteAcknowledge(message)); } catch (Exception e) { @@ -661,15 +782,16 @@ public void transportMessage(String text) { + "Message was: " + message.toString(), e)); } break; - case IOMessage.TYPE_JSON_MESSAGE: - try { + case TYPE_JSON_MESSAGE: + try + { JSONObject obj = null; String data = message.getData(); + logger.info("JSON to Parse: "+data); if (data.trim().equals("null") == false) obj = new JSONObject(data); try { - findCallback(message).onMessage(obj, - remoteAcknowledge(message)); + findCallback(message).onMessage(obj,remoteAcknowledge(message)); } catch (Exception e) { error(new SocketIOException( "Exception was thrown in onMessage(JSONObject).\n" @@ -679,34 +801,28 @@ public void transportMessage(String text) { logger.warning("Malformated JSON received"); } break; - case IOMessage.TYPE_EVENT: + case TYPE_EVENT: + { + try { - JSONObject event = new JSONObject(message.getData()); - Object[] argsArray; - if (event.has("args")) { - JSONArray args = event.getJSONArray("args"); - argsArray = new Object[args.length()]; - for (int i = 0; i < args.length(); i++) { - if (args.isNull(i) == false) - argsArray[i] = args.get(i); - } - } else - argsArray = new Object[0]; - String eventName = event.getString("name"); - try { - findCallback(message).on(eventName, - remoteAcknowledge(message), argsArray); - } catch (Exception e) { - error(new SocketIOException( - "Exception was thrown in on(String, JSONObject[]).\n" - + "Message was: " + message.toString(), e)); - } - } catch (JSONException e) { - logger.warning("Malformated JSON received"); + findCallback(message).on(message.getEvent(), + remoteAcknowledge(message), message.getArgs()); + } catch (Exception e) { + error(new SocketIOException( + "Exception was thrown in on(String, JSONObject[]).\n" + + "Message was: " + message.toString(), e)); } - break; - - case IOMessage.TYPE_ACK: +// try { +// findCallback(message).on(message.getEvent(), +// remoteAcknowledge(message), argsArray); +// } catch (Exception e) { +// error(new SocketIOException( +// "Exception was thrown in on(String, JSONObject[]).\n" +// + "Message was: " + message.toString(), e)); +// } + } break; + + case TYPE_ACK: String[] data = message.getData().split("\\+", 2); if (data.length == 2) { try { @@ -727,11 +843,19 @@ public void transportMessage(String text) { } catch (JSONException e) { logger.warning("Received malformated Acknowledge data!"); } - } else if (data.length == 1) { - sendPlain("6:::" + data[0]); + } else if (data.length == 1) + { + switch (this.version) + { + case V09x: + sendPlain("6:::" + data[0]); + break; + case V10x: + break; + } } break; - case IOMessage.TYPE_ERROR: + case TYPE_ERROR: try { findCallback(message).onError( new SocketIOException(message.getData())); @@ -743,10 +867,14 @@ public void transportMessage(String text) { cleanup(); } break; - case IOMessage.TYPE_NOOP: + case TYPE_NOOP: + break; + case TYPE_DISCONNECTED: + break; + case TYPE_CONNECTED: break; default: - logger.warning("Unkown type received" + message.getType()); + logger.warning("Unkown type received: " + message.getType()); break; } } @@ -756,7 +884,8 @@ public void transportMessage(String text) { * do not shut down TCP-connections when switching from HSDPA to Wifi */ public synchronized void reconnect() { - if (getState() != STATE_INVALID) { + if (getState() != STATE_INVALID) + { invalidateTransport(); setState(STATE_INTERRUPTED); if (reconnectTask != null) { @@ -775,6 +904,38 @@ public synchronized void reconnect() { public String getSessionId() { return sessionId; } + + /** + * Returns the version. This should be called from a {@link IOTransport} + * + * @return the version used (and null if not received yet) + */ + public IOConnection.VersionSocketIO getVersionSocket() { + return this.version; + } + /** + * Force the connection to wait to get connected + * + * @return state of connection + */ + public boolean canSend() + { + long startTime = System.nanoTime(); + long elapsedTime = System.nanoTime() - startTime; + while(!isConnected() && elapsedTime < 5000000) + { + try + { + Thread.sleep(100); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + elapsedTime = System.nanoTime() - startTime; + } + if(!isConnected()) + return false; + return true; + } /** * sends a String message from {@link SocketIO} to the {@link IOTransport}. @@ -786,11 +947,23 @@ public String getSessionId() { * @param text * the text */ - public void send(SocketIO socket, IOAcknowledge ack, String text) { - IOMessage message = new IOMessage(IOMessage.TYPE_MESSAGE, - socket.getNamespace(), text); - synthesizeAck(message, ack); - sendPlain(message.toString()); + public void send(SocketIO socket, IOAcknowledge ack, String text) + { + if(!canSend()) + { + logger.warning("An error occured to send your message"); + return; + } + if(this.version == VersionSocketIO.V10x ) + { + this.emit(socket, "message", ack, text); + } + else + { + IOMessage message = IOMessage.createMessage(TypeMessage.TYPE_MESSAGE,"",socket.getNamespace(),text,this.version); + synthesizeAck(message, ack); + sendPlain(message.toString()); + } } /** @@ -803,11 +976,23 @@ public void send(SocketIO socket, IOAcknowledge ack, String text) { * @param json * the json */ - public void send(SocketIO socket, IOAcknowledge ack, JSONObject json) { - IOMessage message = new IOMessage(IOMessage.TYPE_JSON_MESSAGE, - socket.getNamespace(), json.toString()); - synthesizeAck(message, ack); - sendPlain(message.toString()); + public void send(SocketIO socket, IOAcknowledge ack, JSONObject json) + { + if(!canSend()) + { + logger.warning("An error occured to send your message"); + return; + } + if(this.version == VersionSocketIO.V10x ) + { + this.emit(socket, "message", ack, json.toString()); + } + else + { + IOMessage message = IOMessage.createMessage(TypeMessage.TYPE_JSON_MESSAGE,"",socket.getNamespace(),json.toString(),this.version); + synthesizeAck(message, ack); + sendPlain(message.toString()); + } } /** @@ -823,19 +1008,22 @@ public void send(SocketIO socket, IOAcknowledge ack, JSONObject json) { * the arguments to be send */ public void emit(SocketIO socket, String event, IOAcknowledge ack, - Object... args) { - try { - JSONObject json = new JSONObject().put("name", event).put("args", - new JSONArray(Arrays.asList(args))); - IOMessage message = new IOMessage(IOMessage.TYPE_EVENT, - socket.getNamespace(), json.toString()); - synthesizeAck(message, ack); - sendPlain(message.toString()); - } catch (JSONException e) { - error(new SocketIOException( - "Error while emitting an event. Make sure you only try to send arguments, which can be serialized into JSON.")); + Object... args) + { + if(!canSend()) + { + logger.warning("An error occured to send your message"); + return; } - + IOMessage message = IOMessage.createMessage(TypeMessage.TYPE_EVENT,"", + socket.getNamespace(),this.version); + message.setEvent(event); + for (Object i : args) + { + message.addData(i); + } + synthesizeAck(message, ack); + sendPlain(message.toString()); } /** diff --git a/src/io/socket/IOMessage.java b/src/io/socket/IOMessage.java index a911c50..a088b2d 100644 --- a/src/io/socket/IOMessage.java +++ b/src/io/socket/IOMessage.java @@ -8,61 +8,130 @@ */ package io.socket; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.logging.Logger; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + /** * The Class IOMessage. */ -class IOMessage { - - /** Message type disconnect */ - public static final int TYPE_DISCONNECT = 0; - - /** Message type connect */ - public static final int TYPE_CONNECT = 1; - - /** Message type heartbeat */ - public static final int TYPE_HEARTBEAT = 2; - - /** Message type message */ - public static final int TYPE_MESSAGE = 3; - - /** Message type JSON message */ - public static final int TYPE_JSON_MESSAGE = 4; - - /** Message type event */ - public static final int TYPE_EVENT = 5; - - /** Message type acknowledge */ - public static final int TYPE_ACK = 6; - - /** Message type error */ - public static final int TYPE_ERROR = 7; - - /** Message type noop */ - public static final int TYPE_NOOP = 8; +class IOMessage +{ + /** Debug logger */ + static final Logger logger = Logger.getLogger("io.message"); + + public enum TypeMessage { TYPE_UNKNOWN, + TYPE_DISCONNECT, + TYPE_DISCONNECTED, + TYPE_CONNECT, + TYPE_CONNECTED, + TYPE_HEARTBEAT, + TYPE_PONG, + TYPE_MESSAGE, + TYPE_JSON_MESSAGE, + TYPE_EVENT, + TYPE_ACK, + TYPE_ERROR, + TYPE_NOOP, + TYPE_BINARY_EVENT, + TYPE_BINARY_ACK, + TYPE_UPGRADE, + }; /** Index of the type field in a message */ - public static final int FIELD_TYPE = 0; + public static final int FIELD_CONTROL = 0; /** Index of the id field in a message */ public static final int FIELD_ID = 1; /** Index of the end point field in a message */ public static final int FIELD_ENDPOINT = 2; - - /** Index of the data field in a message */ + + /** Index of the end point field in a message */ public static final int FIELD_DATA = 3; /** Number of fields in a message. */ - public static final int NUM_FIELDS = 4; + public static final int NUM_FIELDS = 3; - /** The field values */ - private final String[] fields = new String[NUM_FIELDS]; + public static final int NUM_FIELDS_MESSAGE = 4; + /** The field values */ + protected final String[] fields = new String[NUM_FIELDS]; + + protected JSONArray args = new JSONArray(); + protected String data; + + protected String separator = ":"; + + /** Type */ - private int type; + protected TypeMessage type; // CAN BE TYPE_DISCONNECT,TYPE_CONNECT, TYPE_MESSAGE + + /** Event name*/ + protected String eventName = ""; // CAN BE "message","myevent","anything" + + /** Acknoledgement*/ + protected String ack = ""; // CAN BE "message","myevent","anything" + +// /** +// * Instantiates a new IOMessage by given data. +// * +// * @param type +// * the type +// * @param id +// * the id +// * @param namespace +// * the namespace +// * @param data +// * the data +// */ +// protected IOMessage(TypeMessage type, String id, String namespace, String data) +// { +// this.type = type; +// this.fields[FIELD_ID] = id; +// this.fields[FIELD_CONTROL] = "" + type; +// this.fields[FIELD_ENDPOINT] = namespace; +// this.data = data; +// args.put(data); +// } + + protected final Map messages_codes = this.createMapMessages(); + + protected Map createMapMessages() + { + Map result= new HashMap() ; + result.put(0, IOMessage.TypeMessage.TYPE_DISCONNECT); + result.put(1, IOMessage.TypeMessage.TYPE_CONNECT); + result.put(2, IOMessage.TypeMessage.TYPE_HEARTBEAT); + result.put(3, IOMessage.TypeMessage.TYPE_MESSAGE); + result.put(4, IOMessage.TypeMessage.TYPE_JSON_MESSAGE); + result.put(5, IOMessage.TypeMessage.TYPE_EVENT); + result.put(6, IOMessage.TypeMessage.TYPE_ACK); + result.put(7, IOMessage.TypeMessage.TYPE_ERROR); + result.put(8, IOMessage.TypeMessage.TYPE_NOOP); + //return Collections.unmodifiableMap(result); + return result; + } /** * Instantiates a new IOMessage by given data. + */ + protected IOMessage() + { + this.type = TypeMessage.TYPE_UNKNOWN; + this.fields[FIELD_ID] = ""; + this.fields[FIELD_CONTROL] = ""; + this.fields[FIELD_ENDPOINT] = ""; + this.data = ""; + } + + /** + * Instantiates a new IOMessage without data. * * @param type * the type @@ -70,29 +139,67 @@ class IOMessage { * the id * @param namespace * the namespace - * @param data - * the data */ - public IOMessage(int type, String id, String namespace, String data) { + protected IOMessage(TypeMessage type, String id, String namespace) + { this.type = type; this.fields[FIELD_ID] = id; - this.fields[FIELD_TYPE] = "" + type; + this.fields[FIELD_CONTROL] = "" + getControlNumber(type); this.fields[FIELD_ENDPOINT] = namespace; - this.fields[FIELD_DATA] = data; } - - /** - * Instantiates a new IOMessage by given data. - * - * @param type - * the type - * @param namespace - * the name space - * @param data - * the data - */ - public IOMessage(int type, String namespace, String data) { - this(type, null, namespace, data); + + protected int getControlNumber(TypeMessage type) + { + for (Entry entry : this.messages_codes.entrySet()) + { + if (type.equals(entry.getValue())) { + return entry.getKey(); + } + } + return 0; + } + + public static IOMessage parseMessage(String message, IOConnection.VersionSocketIO version) + { + IOMessage messageOut; + switch (version) + { + case V09x: + messageOut = new IOMessage(message); + break; + case V10x: + messageOut = new IOMessageV10x(message); + break; + default: + messageOut = new IOMessage(message); + break; + } + return messageOut; + } + + public static IOMessage createMessage(TypeMessage type, String id, String namespace, String data, IOConnection.VersionSocketIO version) + { + IOMessage messageOut; + switch (version) + { + case V09x: + messageOut = new IOMessage(type,id,namespace); + break; + case V10x: + messageOut = new IOMessageV10x(type,id,namespace); + break; + default: + messageOut = new IOMessage(type,id,namespace); + break; + } + if(data != "") + messageOut.addData(data); + return messageOut; + } + + public static IOMessage createMessage(TypeMessage type, String id, String namespace,IOConnection.VersionSocketIO version) + { + return createMessage(type,id,namespace,"",version); } /** @@ -101,28 +208,82 @@ public IOMessage(int type, String namespace, String data) { * * @param message * the message + * @param version + * The parsing for a socket io 0.9.x and 1.0.x are different */ - public IOMessage(String message) { - String[] fields = message.split(":", NUM_FIELDS); - for (int i = 0; i < fields.length; i++) { - this.fields[i] = fields[i]; - if(i == FIELD_TYPE) - this.type = Integer.parseInt(fields[i]); + protected IOMessage(String message) + { + String[] fields = message.split(":", NUM_FIELDS_MESSAGE); + this.fields[FIELD_CONTROL] = fields[FIELD_CONTROL]; + this.fields[FIELD_ENDPOINT] = fields[FIELD_ENDPOINT]; + this.fields[FIELD_ID] = fields[FIELD_ID]; + this.type = messages_codes.get(Integer.parseInt(this.fields[FIELD_CONTROL])); + if(this.type == TypeMessage.TYPE_EVENT) + { + data = fields[FIELD_DATA]; + try + { + JSONObject event = new JSONObject(data); + if (event.has("args")) + { + args = event.getJSONArray("args"); + } + this.eventName = event.getString("name"); + + } catch (JSONException e) { + logger.warning("Malformated JSON received"); + } + } + else + { + data = fields[FIELD_DATA]; + args.put(data); } } - + /** * Generates a String representation of this object. */ @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - for(int i = 0; i < fields.length; i++) { - builder.append(':'); - if (fields[i] != null) - builder.append(fields[i]); + public String toString() + { + StringBuilder builder = new StringBuilder(); + builder.append(this.fields[FIELD_CONTROL]); + builder.append(this.separator); + + String pIdL = this.fields[FIELD_ID]; + if (ack == "data") + { + pIdL += "+"; + } + + // Do not write pid for acknowledgements + if (this.type != TypeMessage.TYPE_ACK) + { + builder.append(pIdL); } - return builder.substring(1); + builder.append(this.separator); + + // Add the end point for the namespace to be used, as long as it is not + // an ACK, heartbeat, or disconnect packet + if (this.type != TypeMessage.TYPE_ACK + && this.type != TypeMessage.TYPE_HEARTBEAT + && this.type != TypeMessage.TYPE_DISCONNECT) + builder.append(this.fields[FIELD_ENDPOINT]); + builder.append(this.separator); + + if (args.length() != 0) + { + String ackpId = ""; + // This is an acknowledgement packet, so, prepend the ack pid to the data + if (this.type == TypeMessage.TYPE_ACK) + { + ackpId += pIdL+"+"; + } + builder.append(ackpId); + builder.append(this.stringify()); + } + return builder.toString(); } /** @@ -130,7 +291,7 @@ public String toString() { * * @return the type */ - public int getType() { + public TypeMessage getType() { return type; } @@ -151,7 +312,34 @@ public String getId() { public void setId(String id) { fields[FIELD_ID] = id; } + + /** + * Returns the eventName of this IOMessage. + * + * @return the eventName + */ + public String getEvent() { + return eventName; + } + + /** + * Sets the event name of this IOMessage + * + * @param event + */ + public void setEvent(String event) { + eventName = event; + } + /** + * Sets the endpoint of this IOMessage. + * + * @param the endpoint + */ + private void setEndpoint(String endPoint) { + fields[FIELD_ENDPOINT] = endPoint; + } + /** * Returns the endpoint of this IOMessage. * @@ -161,13 +349,210 @@ public String getEndpoint() { return fields[FIELD_ENDPOINT]; } + /** + * Add data element + * Careful: one message type in V0.9.x can only have one element + * + * @param the data + */ + public void addData(String data) + { + this.data += data; + args.put(data); + } + + /** + * Add Data element + * + * @param the data + */ + public void addData(Object data) + { + args.put(data); + } + /** * Returns the data of this IOMessage. * * @return the data */ - public String getData() { - return fields[FIELD_DATA]; + public String getData() + { + return data; + } + + /** + * Returns the args. + * + * @return the data received in objet array form + */ + public Object[] getArgs() + { + Object[] argsArray; + argsArray = new Object[args.length()]; + try + { + for (int i = 0; i < args.length(); i++) + { + if (args.isNull(i) == false) + argsArray[i] = args.get(i); + } + } catch (JSONException e) + { + logger.warning("Error when exporting the data: "+e.toString()); + } + return argsArray; + } + + /** + * Stringify the data. + * + * @return the data stringified + */ + public String stringify() + { + String res = ""; + try + { + if(this.type == TypeMessage.TYPE_MESSAGE || this.type == TypeMessage.TYPE_JSON_MESSAGE ) + res = args.get(0).toString(); + else if(this.type == TypeMessage.TYPE_EVENT) + { + JSONObject event = new JSONObject(); + event.put("args", args); + event.put("name", eventName); + res = event.toString(); + } + else + res = this.getData(); + } catch (JSONException e) + { + logger.warning("Error when stringify data: "+e.toString()); + } + return res; } } + + +class IOMessageV10x extends IOMessage +{ + @Override + protected Map createMapMessages() + { + Map result= new HashMap() ; + result.put(0, IOMessage.TypeMessage.TYPE_DISCONNECTED); + result.put(1, IOMessage.TypeMessage.TYPE_CONNECTED); + result.put(2, IOMessage.TypeMessage.TYPE_HEARTBEAT); + result.put(3, IOMessage.TypeMessage.TYPE_PONG); + result.put(4, IOMessage.TypeMessage.TYPE_MESSAGE); + result.put(5, IOMessage.TypeMessage.TYPE_UPGRADE); + result.put(6, IOMessage.TypeMessage.TYPE_NOOP); + result.put(40, IOMessage.TypeMessage.TYPE_CONNECT); + result.put(41, IOMessage.TypeMessage.TYPE_DISCONNECT); + result.put(42, IOMessage.TypeMessage.TYPE_EVENT); + result.put(43, IOMessage.TypeMessage.TYPE_ACK); + result.put(44, IOMessage.TypeMessage.TYPE_ERROR); + result.put(45, IOMessage.TypeMessage.TYPE_BINARY_EVENT); + result.put(46, IOMessage.TypeMessage.TYPE_BINARY_ACK); + //return Collections.unmodifiableMap(result); + return result; + } + + + protected IOMessageV10x(TypeMessage type, String id, String namespace) + { + super(type,id,namespace); + ///OVERWRITE + separator = ""; + this.fields[FIELD_CONTROL] = "" + this.getControlNumber(type); + ///OVERWRITE END + } + + protected IOMessageV10x(String message) + { + ///OVERWRITE + separator = ""; + ///OVERWRITE END + //42["message","{\"type\":\"redirect\",\"url\":\"/logout\",\"rid\":\"test\",\"action\":\"reject\"}"] + int control = message.charAt(0) - '0'; + data = message.substring(1); + if(messages_codes.get(control) == IOMessage.TypeMessage.TYPE_MESSAGE) + { + control = 40; + control += data.charAt(0) - '0'; + data = data.substring(1); + } + this.fields[FIELD_CONTROL] = String.valueOf(control); + this.type = messages_codes.get(control); + + String endpoint = ""; + int nendpoint = data.indexOf("["); + if(nendpoint != -1) + { + endpoint = data.substring(0, nendpoint); + data = data.substring(nendpoint); + } + this.fields[FIELD_ENDPOINT] = endpoint; + this.fields[FIELD_ID] = ""; + + if(this.type == TypeMessage.TYPE_EVENT) + { + JSONArray arraydata; + try + { + arraydata = new JSONArray(data); + eventName = arraydata.getString(0); + for (int i = 1; i < arraydata.length(); ++i) + { + args.put(arraydata.get(i)); + } + } catch (JSONException e) + { + + } + + } + } + + protected int getControlNumber(TypeMessage type) + { + return super.getControlNumber(type); + } + + /** + * Returns the data of this IOMessage. + * + * @return the data + */ + @Override + public String stringify() + { + String res = ""; + if(this.type == TypeMessage.TYPE_EVENT) + { + JSONArray event = new JSONArray(); + event.put(eventName); + try { + for (int i=0; i