diff --git a/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQIdentity.java b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQIdentity.java new file mode 100644 index 0000000..5afdc24 --- /dev/null +++ b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQIdentity.java @@ -0,0 +1,21 @@ +package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; + + +class FsMQIdentity implements java.io.Serializable { + private String address = null; + private String qDir = null; + + FsMQIdentity(String address, String qDir) { + this.address = address; + this.qDir = qDir; + } + + public String getAddress() { + return address; + } + + public String getqDir() { + return qDir; + } + +} \ No newline at end of file diff --git a/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQManager.java b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQManager.java index b0676e3..5a52726 100644 --- a/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQManager.java +++ b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/FsMQManager.java @@ -1,76 +1,204 @@ package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; + +import foundation.pEp.jniadapter.Identity; import foundation.pEp.jniadapter.test.framework.TestUtils; import foundation.pEp.jniadapter.test.utils.transport.fsmsgqueue.FsMsgQueue; -import java.util.HashMap; -import java.util.Map; +import java.io.*; +import java.util.*; +import java.util.stream.Collectors; import static foundation.pEp.jniadapter.test.framework.TestLogger.log; public class FsMQManager { - private String ownAddress; - private FsMsgQueue ownQueue; - private Map peerQueues = new HashMap<>(); + private FsMQIdentity self = null; + private List identities = new ArrayList<>(); + private Map identityAddressQueues = new HashMap(); - private static String SIGNALONLINEMSG = "SIGONLINE"; + private static String SYNMSG = "SYN"; + private static String SYNACKMSG = "SYNACK"; + private static String ACKMSG = "ACK"; public FsMQManager(String ownAddr, String ownQueueDir) { - ownAddress = ownAddr; - ownQueue = new FsMsgQueue(ownQueueDir); + self = new FsMQIdentity(ownAddr, ownQueueDir); + addOrUpdateIdentity(self); } - public void addPeer(String address, String queueDir) { - FsMsgQueue q = new FsMsgQueue(queueDir); - peerQueues.put(address, q); + // Identity address must be unique + public void addOrUpdateIdentity(FsMQIdentity ident) { + try { + getIdentityForAddress(ident.getAddress()); + } catch (UnknownIdentityException e) { + // Good, add new ident + addIdent(ident); + return; + } + // Ok, update ident + removeIdent(ident); + addIdent(ident); + return; } - public void sendMsgToPeer(String address, String msg) throws UnknownPeerException { - getQueueForPeer(address).add(msg); + public void sendMsgToIdentity(FsMQIdentity ident, String msg) throws UnknownIdentityException, IOException { + FsMQMessage mqMsg = new FsMQMessage(self, msg); + String serializedStr = mqMsg.serialize(); + getQueueForIdentity(ident).add(serializedStr); } - public void waitForPeerOnline(String address) { - String msg = ""; - while (msg != "startup from " + address) { - log("Waiting for " + address); - msg = waitForMsg(); - } + public void clearOwnQueue() { + getQueueForIdentity(self).clear(); + } + + public String waitForMsg() throws UnknownIdentityException, IOException, ClassNotFoundException { + String ret = null; + FsMsgQueue onwQueue = getQueueForIdentity(self); + FsMQMessage mqMsg = null; + do { + while (onwQueue.isEmpty()) { + TestUtils.sleep(100); + } + String serializedMsg = onwQueue.remove(); + mqMsg = FsMQMessage.deserialize(serializedMsg); + } while (doHandshakeProtocol(mqMsg)); + ret = mqMsg.msg; + return ret; + } + + // undefined behaviour if already existing + private void addIdent(FsMQIdentity ident) { + identities.add(ident); + createQueueForIdent(ident); + } + + // undefined behaviour if already existing + // Removes the identity from identities and identityQueues by address + private void removeIdent(FsMQIdentity ident) { + identities.removeIf(i -> i.getAddress().equals(ident.getAddress())); + identityAddressQueues.entrySet().removeIf(iq -> iq.getKey().equals(ident.getAddress())); } - public void clearOwnQueue() { - ownQueue.clear(); + private void createQueueForIdent(FsMQIdentity ident) { + FsMsgQueue q = new FsMsgQueue(ident.getqDir()); + identityAddressQueues.put(ident.getAddress(), q); } - public String waitForMsg() { - while (ownQueue.isEmpty()) { - TestUtils.sleep(100); + private FsMsgQueue getQueueForIdentity(FsMQIdentity ident) throws UnknownIdentityException { + FsMsgQueue ret = null; + ret = identityAddressQueues.get(ident.getAddress()); + if (ret != null) { + throw new UnknownIdentityException("Unknown identity address: " + ident.getAddress()); } - return ownQueue.remove(); + return ret; } - public void sendSigOnlineToPeer(String address) { - String msg = SIGNALONLINEMSG + " " + ownAddress; - log("Sending SIGONLINE to: " + address); - sendMsgToPeer(address, msg); + private FsMQIdentity getIdentityForAddress(String address) throws UnknownIdentityException, IllegalStateException { + FsMQIdentity ret = null; + List matches = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()); + if (matches.size() <= 0) { + throw new UnknownIdentityException("No identity with address:" + address); + } + if (matches.size() > 1) { + throw new IllegalStateException("Identity address not unique: " + address); + } + ret = matches.get(0); + return ret; } - public void broadcastSigOnline() { - for (String k : peerQueues.keySet()) { - sendSigOnlineToPeer(k); + +// public void handshake(FsMQIdentity ident) { +// String msg = ""; +// sendSYN(ident); +// while (msg != SYNACKMSG + " " + ident.getAddress()) { +// log("Waiting for SYNACK from " + ident.getAddress()); +// msg = waitForMsg(); +// } +// sendACK(ident); +// } + + private boolean doHandshakeProtocol(FsMQMessage msg) { + boolean ret = false; +// +// if(msg.matches(SYNMSG)) { +// +// } +// if(msg.matches(SYNACK)) { +// +// } + + return ret; + } + +// public void sendSYN(FsMQIdentity ident) { +// String msg = SYNMSG + " " + self.getAddress(); +// log("Sending SYN to: " + ident.getAddress()); +// sendMsgToIdentity(ident, msg); +// } +// +// public void sendACK(FsMQIdentity ident) { +// String msg = ACKMSG + " " + self.getAddress(); +// log("Sending ACK to: " + ident.getAddress()); +// sendMsgToIdentity(ident, msg); +// } + +} + +class FsMQMessage implements java.io.Serializable { + FsMQIdentity from = null; + FsMQHandshakeHeader header = null; + String msg = null; + + FsMQMessage(FsMQIdentity from, String msg) throws IllegalStateException{ + if(from == null || msg == null) { + throw new IllegalStateException("from and msg cant be null"); } + this.from = from; + this.msg = msg; + } + + public String getMsg() { + return msg; + } + + public FsMQHandshakeHeader getHeader() { + return header; } - private FsMsgQueue getQueueForPeer(String address) throws UnknownPeerException { - FsMsgQueue ret = peerQueues.get(address); - if (ret == null) { - throw new UnknownPeerException("No peer with address:" + address); + public void setHeader(FsMQHandshakeHeader header) { + this.header = header; + } + + public static FsMQMessage deserialize(String serializedMsg) throws IOException, ClassNotFoundException { + FsMQMessage ret = null; + byte[] data = Base64.getDecoder().decode(serializedMsg); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); + Object obj = ois.readObject(); + ois.close(); + if(!(obj instanceof FsMQMessage)) { + throw new ClassNotFoundException("Unvalid serialized string"); + } else { + ret = (FsMQMessage) obj; } return ret; } + + public String serialize() throws IOException { + String ret = null; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(this); + oos.close(); + ret = Base64.getEncoder().encodeToString(baos.toByteArray()); + return ret; + } + + class FsMQHandshakeHeader implements java.io.Serializable { + String operation = null; + } } -class UnknownPeerException extends RuntimeException { - UnknownPeerException(String message) { +class UnknownIdentityException extends RuntimeException { + UnknownIdentityException(String message) { super(message); } -} \ No newline at end of file +} diff --git a/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/test/regression/TestMain.java b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/test/regression/TestMain.java index 98e8f1f..0501f14 100644 --- a/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/test/regression/TestMain.java +++ b/test/java/foundation/pEp/jniadapter/test/utils/transport/fsmqmanager/test/regression/TestMain.java @@ -4,24 +4,18 @@ import static foundation.pEp.jniadapter.test.framework.TestLogger.*; import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.*; import foundation.pEp.jniadapter.test.framework.*; -import java.util.HashMap; -import java.util.Map; - class FsMsgQueueTestContext extends AbstractTestContext { - Map peers; String ownAddress = "Alice"; String ownQDir = "../resources/fsmsgqueue-test/alice"; - String addressBob = "Bob"; - String addressCarol = "Carol"; + + FsMQIdentity bob = null; FsMQManager qm; @Override public void init() throws Throwable { - peers = new HashMap<>(); - peers.put(addressBob, "../resources/fsmsgqueue-test/bob"); - peers.put(addressCarol, "../resources/fsmsgqueue-test/carol"); + bob = new FsMQIdentity("Bob","../resources/fsmsgqueue-test/bob"); } } @@ -39,25 +33,11 @@ class TestMain { ctx.qm.clearOwnQueue(); }).add(); - new TestUnit("Add peer bob", testCtx, ctx -> { - for(String k : ctx.peers.keySet()) { - log("Adding peer: " + k); - ctx.qm.addPeer(k, ctx.peers.get(k)); - } - }).add(); - - new TestUnit("Broadcast online", testCtx, ctx -> { - ctx.qm.broadcastSigOnline(); + new TestUnit("Add ident bob", testCtx, ctx -> { + log("Adding ident: " + ctx.bob.getAddress()); + ctx.qm.addOrUpdateIdentity(ctx.bob); }).add(); - new TestUnit("Wait for bob", testCtx, ctx -> { - log("Waiting for Bob to signal online"); - ctx.qm.waitForPeerOnline(ctx.addressBob); - log("Bob is online"); - }).add(); - - - TestSuite.run(); }