
3 changed files with 194 additions and 65 deletions
@ -0,0 +1,21 @@ |
|||
package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; |
|||
|
|||
|
|||
class FsMQIdentity implements java.io.Serializable { |
|||
private String address = null; |
|||
private String qDir = null; |
|||
|
|||
FsMQIdentity(String address, String qDir) { |
|||
this.address = address; |
|||
this.qDir = qDir; |
|||
} |
|||
|
|||
public String getAddress() { |
|||
return address; |
|||
} |
|||
|
|||
public String getqDir() { |
|||
return qDir; |
|||
} |
|||
|
|||
} |
@ -1,76 +1,204 @@ |
|||
package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; |
|||
|
|||
import foundation.pEp.jniadapter.Identity; |
|||
import foundation.pEp.jniadapter.test.framework.TestUtils; |
|||
import foundation.pEp.jniadapter.test.utils.transport.fsmsgqueue.FsMsgQueue; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.io.*; |
|||
import java.util.*; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static foundation.pEp.jniadapter.test.framework.TestLogger.log; |
|||
|
|||
public class FsMQManager { |
|||
private String ownAddress; |
|||
private FsMsgQueue ownQueue; |
|||
private Map<String, FsMsgQueue> peerQueues = new HashMap<>(); |
|||
private FsMQIdentity self = null; |
|||
private List<FsMQIdentity> identities = new ArrayList<>(); |
|||
private Map<String, FsMsgQueue> identityAddressQueues = new HashMap<String, FsMsgQueue>(); |
|||
|
|||
private static String SIGNALONLINEMSG = "SIGONLINE"; |
|||
private static String SYNMSG = "SYN"; |
|||
private static String SYNACKMSG = "SYNACK"; |
|||
private static String ACKMSG = "ACK"; |
|||
|
|||
public FsMQManager(String ownAddr, String ownQueueDir) { |
|||
ownAddress = ownAddr; |
|||
ownQueue = new FsMsgQueue(ownQueueDir); |
|||
self = new FsMQIdentity(ownAddr, ownQueueDir); |
|||
addOrUpdateIdentity(self); |
|||
} |
|||
|
|||
public void addPeer(String address, String queueDir) { |
|||
FsMsgQueue q = new FsMsgQueue(queueDir); |
|||
peerQueues.put(address, q); |
|||
// Identity address must be unique
|
|||
public void addOrUpdateIdentity(FsMQIdentity ident) { |
|||
try { |
|||
getIdentityForAddress(ident.getAddress()); |
|||
} catch (UnknownIdentityException e) { |
|||
// Good, add new ident
|
|||
addIdent(ident); |
|||
return; |
|||
} |
|||
// Ok, update ident
|
|||
removeIdent(ident); |
|||
addIdent(ident); |
|||
return; |
|||
} |
|||
|
|||
public void sendMsgToPeer(String address, String msg) throws UnknownPeerException { |
|||
getQueueForPeer(address).add(msg); |
|||
public void sendMsgToIdentity(FsMQIdentity ident, String msg) throws UnknownIdentityException, IOException { |
|||
FsMQMessage mqMsg = new FsMQMessage(self, msg); |
|||
String serializedStr = mqMsg.serialize(); |
|||
getQueueForIdentity(ident).add(serializedStr); |
|||
} |
|||
|
|||
public void waitForPeerOnline(String address) { |
|||
String msg = ""; |
|||
while (msg != "startup from " + address) { |
|||
log("Waiting for " + address); |
|||
msg = waitForMsg(); |
|||
} |
|||
public void clearOwnQueue() { |
|||
getQueueForIdentity(self).clear(); |
|||
} |
|||
|
|||
public String waitForMsg() throws UnknownIdentityException, IOException, ClassNotFoundException { |
|||
String ret = null; |
|||
FsMsgQueue onwQueue = getQueueForIdentity(self); |
|||
FsMQMessage mqMsg = null; |
|||
do { |
|||
while (onwQueue.isEmpty()) { |
|||
TestUtils.sleep(100); |
|||
} |
|||
String serializedMsg = onwQueue.remove(); |
|||
mqMsg = FsMQMessage.deserialize(serializedMsg); |
|||
} while (doHandshakeProtocol(mqMsg)); |
|||
ret = mqMsg.msg; |
|||
return ret; |
|||
} |
|||
|
|||
// undefined behaviour if already existing
|
|||
private void addIdent(FsMQIdentity ident) { |
|||
identities.add(ident); |
|||
createQueueForIdent(ident); |
|||
} |
|||
|
|||
// undefined behaviour if already existing
|
|||
// Removes the identity from identities and identityQueues by address
|
|||
private void removeIdent(FsMQIdentity ident) { |
|||
identities.removeIf(i -> i.getAddress().equals(ident.getAddress())); |
|||
identityAddressQueues.entrySet().removeIf(iq -> iq.getKey().equals(ident.getAddress())); |
|||
} |
|||
|
|||
public void clearOwnQueue() { |
|||
ownQueue.clear(); |
|||
private void createQueueForIdent(FsMQIdentity ident) { |
|||
FsMsgQueue q = new FsMsgQueue(ident.getqDir()); |
|||
identityAddressQueues.put(ident.getAddress(), q); |
|||
} |
|||
|
|||
public String waitForMsg() { |
|||
while (ownQueue.isEmpty()) { |
|||
TestUtils.sleep(100); |
|||
private FsMsgQueue getQueueForIdentity(FsMQIdentity ident) throws UnknownIdentityException { |
|||
FsMsgQueue ret = null; |
|||
ret = identityAddressQueues.get(ident.getAddress()); |
|||
if (ret != null) { |
|||
throw new UnknownIdentityException("Unknown identity address: " + ident.getAddress()); |
|||
} |
|||
return ownQueue.remove(); |
|||
return ret; |
|||
} |
|||
|
|||
public void sendSigOnlineToPeer(String address) { |
|||
String msg = SIGNALONLINEMSG + " " + ownAddress; |
|||
log("Sending SIGONLINE to: " + address); |
|||
sendMsgToPeer(address, msg); |
|||
private FsMQIdentity getIdentityForAddress(String address) throws UnknownIdentityException, IllegalStateException { |
|||
FsMQIdentity ret = null; |
|||
List<FsMQIdentity> matches = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()); |
|||
if (matches.size() <= 0) { |
|||
throw new UnknownIdentityException("No identity with address:" + address); |
|||
} |
|||
if (matches.size() > 1) { |
|||
throw new IllegalStateException("Identity address not unique: " + address); |
|||
} |
|||
ret = matches.get(0); |
|||
return ret; |
|||
} |
|||
|
|||
public void broadcastSigOnline() { |
|||
for (String k : peerQueues.keySet()) { |
|||
sendSigOnlineToPeer(k); |
|||
|
|||
// public void handshake(FsMQIdentity ident) {
|
|||
// String msg = "";
|
|||
// sendSYN(ident);
|
|||
// while (msg != SYNACKMSG + " " + ident.getAddress()) {
|
|||
// log("Waiting for SYNACK from " + ident.getAddress());
|
|||
// msg = waitForMsg();
|
|||
// }
|
|||
// sendACK(ident);
|
|||
// }
|
|||
|
|||
private boolean doHandshakeProtocol(FsMQMessage msg) { |
|||
boolean ret = false; |
|||
//
|
|||
// if(msg.matches(SYNMSG)) {
|
|||
//
|
|||
// }
|
|||
// if(msg.matches(SYNACK)) {
|
|||
//
|
|||
// }
|
|||
|
|||
return ret; |
|||
} |
|||
|
|||
// public void sendSYN(FsMQIdentity ident) {
|
|||
// String msg = SYNMSG + " " + self.getAddress();
|
|||
// log("Sending SYN to: " + ident.getAddress());
|
|||
// sendMsgToIdentity(ident, msg);
|
|||
// }
|
|||
//
|
|||
// public void sendACK(FsMQIdentity ident) {
|
|||
// String msg = ACKMSG + " " + self.getAddress();
|
|||
// log("Sending ACK to: " + ident.getAddress());
|
|||
// sendMsgToIdentity(ident, msg);
|
|||
// }
|
|||
|
|||
} |
|||
|
|||
class FsMQMessage implements java.io.Serializable { |
|||
FsMQIdentity from = null; |
|||
FsMQHandshakeHeader header = null; |
|||
String msg = null; |
|||
|
|||
FsMQMessage(FsMQIdentity from, String msg) throws IllegalStateException{ |
|||
if(from == null || msg == null) { |
|||
throw new IllegalStateException("from and msg cant be null"); |
|||
} |
|||
this.from = from; |
|||
this.msg = msg; |
|||
} |
|||
|
|||
public String getMsg() { |
|||
return msg; |
|||
} |
|||
|
|||
public FsMQHandshakeHeader getHeader() { |
|||
return header; |
|||
} |
|||
|
|||
private FsMsgQueue getQueueForPeer(String address) throws UnknownPeerException { |
|||
FsMsgQueue ret = peerQueues.get(address); |
|||
if (ret == null) { |
|||
throw new UnknownPeerException("No peer with address:" + address); |
|||
public void setHeader(FsMQHandshakeHeader header) { |
|||
this.header = header; |
|||
} |
|||
|
|||
public static FsMQMessage deserialize(String serializedMsg) throws IOException, ClassNotFoundException { |
|||
FsMQMessage ret = null; |
|||
byte[] data = Base64.getDecoder().decode(serializedMsg); |
|||
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); |
|||
Object obj = ois.readObject(); |
|||
ois.close(); |
|||
if(!(obj instanceof FsMQMessage)) { |
|||
throw new ClassNotFoundException("Unvalid serialized string"); |
|||
} else { |
|||
ret = (FsMQMessage) obj; |
|||
} |
|||
return ret; |
|||
} |
|||
|
|||
public String serialize() throws IOException { |
|||
String ret = null; |
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
|||
ObjectOutputStream oos = new ObjectOutputStream(baos); |
|||
oos.writeObject(this); |
|||
oos.close(); |
|||
ret = Base64.getEncoder().encodeToString(baos.toByteArray()); |
|||
return ret; |
|||
} |
|||
|
|||
class FsMQHandshakeHeader implements java.io.Serializable { |
|||
String operation = null; |
|||
} |
|||
} |
|||
|
|||
|
|||
class UnknownPeerException extends RuntimeException { |
|||
UnknownPeerException(String message) { |
|||
class UnknownIdentityException extends RuntimeException { |
|||
UnknownIdentityException(String message) { |
|||
super(message); |
|||
} |
|||
} |
|||
} |
|||
|
Loading…
Reference in new issue