|
|
@ -116,7 +116,7 @@ public class FsMQManager { |
|
|
|
public void sendMessage(String address, String msg) throws UnknownIdentityException, IOException, NullPointerException { |
|
|
|
if (address != null) { |
|
|
|
if (msg != null) { |
|
|
|
FsMQMessage mqMsg = new FsMQMessage(self, msg); |
|
|
|
FsMQMessageInternal mqMsg = new FsMQMessageInternal(self, msg); |
|
|
|
String serializedStr = mqMsg.serialize(); |
|
|
|
getQueueForIdentity(address).add(serializedStr); |
|
|
|
} else { |
|
|
@ -131,15 +131,15 @@ public class FsMQManager { |
|
|
|
getQueueForIdentity(self.getAddress()).clear(); |
|
|
|
} |
|
|
|
|
|
|
|
public String receiveMessage() throws UnknownIdentityException, IOException, ClassNotFoundException, TimeoutException { |
|
|
|
public FsMQMessage receiveMessage() throws IOException, ClassNotFoundException, TimeoutException { |
|
|
|
return receiveMessage(0); |
|
|
|
} |
|
|
|
|
|
|
|
// Blocks until timeout
|
|
|
|
public String receiveMessage(int timeoutSec) throws UnknownIdentityException, IOException, ClassNotFoundException, TimeoutException { |
|
|
|
String ret = null; |
|
|
|
public FsMQMessage receiveMessage(int timeoutSec) throws IOException, ClassNotFoundException, TimeoutException { |
|
|
|
FsMQMessage ret = null; |
|
|
|
FsMsgQueue onwQueue = getQueueForIdentity(self.getAddress()); |
|
|
|
FsMQMessage mqMsg = null; |
|
|
|
FsMQMessageInternal mqMsg = null; |
|
|
|
int pollInterval = 100; |
|
|
|
int pollRepeats = timeoutSec * 1000 / pollInterval; |
|
|
|
int pollCounter = 0; |
|
|
@ -152,16 +152,16 @@ public class FsMQManager { |
|
|
|
} |
|
|
|
} |
|
|
|
String serializedMsg = onwQueue.remove(); |
|
|
|
mqMsg = FsMQMessage.deserialize(serializedMsg); |
|
|
|
mqMsg = FsMQMessageInternal.deserialize(serializedMsg); |
|
|
|
} while (doHandshakeProtocol(mqMsg)); |
|
|
|
ret = mqMsg.msg; |
|
|
|
ret = mqMsg.toFsMQMessage(); |
|
|
|
return ret; |
|
|
|
} |
|
|
|
|
|
|
|
// True if existing
|
|
|
|
// False if not
|
|
|
|
// Exception on not unique
|
|
|
|
public boolean identityExists(String address) { |
|
|
|
public boolean identityExists(String address) throws IllegalStateException, NullPointerException{ |
|
|
|
boolean ret = false; |
|
|
|
if (address != null) { |
|
|
|
List<FsMQIdentity> matches = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()); |
|
|
@ -178,14 +178,14 @@ public class FsMQManager { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// // Return null if not existing
|
|
|
|
// public FsMQIdentity getIdentityForAddress(String address) throws UnknownIdentityException, IllegalStateException {
|
|
|
|
// FsMQIdentity ret = null;
|
|
|
|
// if (identityExists(address)) {
|
|
|
|
// ret = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()).get(0);
|
|
|
|
// }
|
|
|
|
// return ret;
|
|
|
|
// }
|
|
|
|
// Returns null if not existing
|
|
|
|
public FsMQIdentity getIdentityForAddress(String address) { |
|
|
|
FsMQIdentity ret = null; |
|
|
|
if (identityExists(address)) { |
|
|
|
ret = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()).get(0); |
|
|
|
} |
|
|
|
return ret; |
|
|
|
} |
|
|
|
|
|
|
|
public List<FsMQIdentity> getIdentities() { |
|
|
|
return new ArrayList<FsMQIdentity>(identities); |
|
|
@ -225,7 +225,6 @@ public class FsMQManager { |
|
|
|
return ret; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// public void handshake(FsMQIdentity ident) {
|
|
|
|
// String msg = "";
|
|
|
|
// sendSYN(ident);
|
|
|
@ -236,7 +235,7 @@ public class FsMQManager { |
|
|
|
// sendACK(ident);
|
|
|
|
// }
|
|
|
|
|
|
|
|
private boolean doHandshakeProtocol(FsMQMessage msg) { |
|
|
|
private boolean doHandshakeProtocol(FsMQMessageInternal msg) { |
|
|
|
boolean ret = false; |
|
|
|
//
|
|
|
|
// if(msg.matches(SYNMSG)) {
|
|
|
@ -263,21 +262,11 @@ public class FsMQManager { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
class FsMQMessage implements java.io.Serializable { |
|
|
|
FsMQIdentity from = null; |
|
|
|
class FsMQMessageInternal extends FsMQMessage implements java.io.Serializable { |
|
|
|
FsMQHandshakeHeader header = null; |
|
|
|
String msg = null; |
|
|
|
|
|
|
|
FsMQMessage(FsMQIdentity from, String msg) throws IllegalStateException { |
|
|
|
if (from == null || msg == null) { |
|
|
|
throw new IllegalStateException("from and msg cant be null"); |
|
|
|
} |
|
|
|
this.from = from; |
|
|
|
this.msg = msg; |
|
|
|
} |
|
|
|
|
|
|
|
public String getMsg() { |
|
|
|
return msg; |
|
|
|
FsMQMessageInternal(FsMQIdentity from, String msg) throws IllegalStateException { |
|
|
|
super(from,msg); |
|
|
|
} |
|
|
|
|
|
|
|
public FsMQHandshakeHeader getHeader() { |
|
|
@ -288,16 +277,22 @@ class FsMQMessage implements java.io.Serializable { |
|
|
|
this.header = header; |
|
|
|
} |
|
|
|
|
|
|
|
public static FsMQMessage deserialize(String serializedMsg) throws IOException, ClassNotFoundException { |
|
|
|
FsMQMessage ret = null; |
|
|
|
public FsMQMessage toFsMQMessage() throws NullPointerException { |
|
|
|
FsMQMessage ret = new FsMQMessage(this.getFrom(),this.getMsg()); |
|
|
|
return ret; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public static FsMQMessageInternal deserialize(String serializedMsg) throws IOException, ClassNotFoundException { |
|
|
|
FsMQMessageInternal ret = null; |
|
|
|
byte[] data = Base64.getDecoder().decode(serializedMsg); |
|
|
|
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); |
|
|
|
Object obj = ois.readObject(); |
|
|
|
ois.close(); |
|
|
|
if (!(obj instanceof FsMQMessage)) { |
|
|
|
throw new ClassNotFoundException("Unvalid serialized string"); |
|
|
|
if (!(obj instanceof FsMQMessageInternal)) { |
|
|
|
throw new ClassNotFoundException("Invalid serialized string"); |
|
|
|
} else { |
|
|
|
ret = (FsMQMessage) obj; |
|
|
|
ret = (FsMQMessageInternal) obj; |
|
|
|
} |
|
|
|
return ret; |
|
|
|
} |
|
|
|