|
@ -1,17 +1,18 @@ |
|
|
package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; |
|
|
package foundation.pEp.jniadapter.test.utils.transport.fsmqmanager; |
|
|
|
|
|
|
|
|
import foundation.pEp.jniadapter.Identity; |
|
|
|
|
|
import foundation.pEp.jniadapter.test.framework.TestUtils; |
|
|
import foundation.pEp.jniadapter.test.framework.TestUtils; |
|
|
import foundation.pEp.jniadapter.test.utils.transport.fsmsgqueue.FsMsgQueue; |
|
|
import foundation.pEp.jniadapter.test.utils.transport.fsmsgqueue.FsMsgQueue; |
|
|
|
|
|
|
|
|
import java.io.*; |
|
|
import java.io.*; |
|
|
import java.util.*; |
|
|
import java.util.*; |
|
|
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
import java.util.stream.Collectors; |
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
import static foundation.pEp.jniadapter.test.framework.TestLogger.log; |
|
|
import static foundation.pEp.jniadapter.test.framework.TestLogger.log; |
|
|
|
|
|
|
|
|
public class FsMQManager { |
|
|
public class FsMQManager { |
|
|
private FsMQIdentity self = null; |
|
|
private FsMQIdentity self = null; |
|
|
|
|
|
|
|
|
private List<FsMQIdentity> identities = new ArrayList<>(); |
|
|
private List<FsMQIdentity> identities = new ArrayList<>(); |
|
|
private Map<String, FsMsgQueue> identityAddressQueues = new HashMap<String, FsMsgQueue>(); |
|
|
private Map<String, FsMsgQueue> identityAddressQueues = new HashMap<String, FsMsgQueue>(); |
|
|
|
|
|
|
|
@ -19,24 +20,27 @@ public class FsMQManager { |
|
|
private static String SYNACKMSG = "SYNACK"; |
|
|
private static String SYNACKMSG = "SYNACK"; |
|
|
private static String ACKMSG = "ACK"; |
|
|
private static String ACKMSG = "ACK"; |
|
|
|
|
|
|
|
|
public FsMQManager(String ownAddr, String ownQueueDir) { |
|
|
public FsMQManager(FsMQIdentity self) { |
|
|
self = new FsMQIdentity(ownAddr, ownQueueDir); |
|
|
this.self = self; |
|
|
addOrUpdateIdentity(self); |
|
|
addOrUpdateIdentity(self); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Identity address must be unique
|
|
|
// Identity address must be unique
|
|
|
public void addOrUpdateIdentity(FsMQIdentity ident) { |
|
|
// Returns
|
|
|
|
|
|
// - true for added
|
|
|
|
|
|
// - false for updated
|
|
|
|
|
|
public boolean addOrUpdateIdentity(FsMQIdentity ident) { |
|
|
try { |
|
|
try { |
|
|
getIdentityForAddress(ident.getAddress()); |
|
|
getIdentityForAddress(ident.getAddress()); |
|
|
} catch (UnknownIdentityException e) { |
|
|
} catch (UnknownIdentityException e) { |
|
|
// Good, add new ident
|
|
|
// Good, add new ident
|
|
|
addIdent(ident); |
|
|
addIdent(ident); |
|
|
return; |
|
|
return true; |
|
|
} |
|
|
} |
|
|
// Ok, update ident
|
|
|
// Ok, update ident
|
|
|
removeIdent(ident); |
|
|
removeIdent(ident); |
|
|
addIdent(ident); |
|
|
addIdent(ident); |
|
|
return; |
|
|
return false; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void sendMsgToIdentity(FsMQIdentity ident, String msg) throws UnknownIdentityException, IOException { |
|
|
public void sendMsgToIdentity(FsMQIdentity ident, String msg) throws UnknownIdentityException, IOException { |
|
@ -49,13 +53,24 @@ public class FsMQManager { |
|
|
getQueueForIdentity(self).clear(); |
|
|
getQueueForIdentity(self).clear(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public String waitForMsg() throws UnknownIdentityException, IOException, ClassNotFoundException { |
|
|
public String waitForMsg() throws UnknownIdentityException, IOException, ClassNotFoundException, TimeoutException { |
|
|
|
|
|
return waitForMsg(0); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public String waitForMsg(int timeoutSec) throws UnknownIdentityException, IOException, ClassNotFoundException, TimeoutException { |
|
|
String ret = null; |
|
|
String ret = null; |
|
|
FsMsgQueue onwQueue = getQueueForIdentity(self); |
|
|
FsMsgQueue onwQueue = getQueueForIdentity(self); |
|
|
FsMQMessage mqMsg = null; |
|
|
FsMQMessage mqMsg = null; |
|
|
|
|
|
int pollInterval = 100; |
|
|
|
|
|
int pollRepeats = timeoutSec * 1000 / pollInterval; |
|
|
|
|
|
int pollCounter = 0; |
|
|
do { |
|
|
do { |
|
|
while (onwQueue.isEmpty()) { |
|
|
while (onwQueue.isEmpty()) { |
|
|
TestUtils.sleep(100); |
|
|
TestUtils.sleep(100); |
|
|
|
|
|
pollCounter++; |
|
|
|
|
|
if (pollCounter >= pollRepeats) { |
|
|
|
|
|
throw new TimeoutException(""); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
String serializedMsg = onwQueue.remove(); |
|
|
String serializedMsg = onwQueue.remove(); |
|
|
mqMsg = FsMQMessage.deserialize(serializedMsg); |
|
|
mqMsg = FsMQMessage.deserialize(serializedMsg); |
|
@ -70,7 +85,6 @@ public class FsMQManager { |
|
|
createQueueForIdent(ident); |
|
|
createQueueForIdent(ident); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// undefined behaviour if already existing
|
|
|
|
|
|
// Removes the identity from identities and identityQueues by address
|
|
|
// Removes the identity from identities and identityQueues by address
|
|
|
private void removeIdent(FsMQIdentity ident) { |
|
|
private void removeIdent(FsMQIdentity ident) { |
|
|
identities.removeIf(i -> i.getAddress().equals(ident.getAddress())); |
|
|
identities.removeIf(i -> i.getAddress().equals(ident.getAddress())); |
|
@ -85,13 +99,13 @@ public class FsMQManager { |
|
|
private FsMsgQueue getQueueForIdentity(FsMQIdentity ident) throws UnknownIdentityException { |
|
|
private FsMsgQueue getQueueForIdentity(FsMQIdentity ident) throws UnknownIdentityException { |
|
|
FsMsgQueue ret = null; |
|
|
FsMsgQueue ret = null; |
|
|
ret = identityAddressQueues.get(ident.getAddress()); |
|
|
ret = identityAddressQueues.get(ident.getAddress()); |
|
|
if (ret != null) { |
|
|
if (ret == null) { |
|
|
throw new UnknownIdentityException("Unknown identity address: " + ident.getAddress()); |
|
|
throw new UnknownIdentityException("Unknown identity address: " + ident.getAddress()); |
|
|
} |
|
|
} |
|
|
return ret; |
|
|
return ret; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private FsMQIdentity getIdentityForAddress(String address) throws UnknownIdentityException, IllegalStateException { |
|
|
public FsMQIdentity getIdentityForAddress(String address) throws UnknownIdentityException, IllegalStateException { |
|
|
FsMQIdentity ret = null; |
|
|
FsMQIdentity ret = null; |
|
|
List<FsMQIdentity> matches = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()); |
|
|
List<FsMQIdentity> matches = identities.stream().filter(i -> i.getAddress().equals(address)).collect(Collectors.toList()); |
|
|
if (matches.size() <= 0) { |
|
|
if (matches.size() <= 0) { |
|
@ -147,8 +161,8 @@ class FsMQMessage implements java.io.Serializable { |
|
|
FsMQHandshakeHeader header = null; |
|
|
FsMQHandshakeHeader header = null; |
|
|
String msg = null; |
|
|
String msg = null; |
|
|
|
|
|
|
|
|
FsMQMessage(FsMQIdentity from, String msg) throws IllegalStateException{ |
|
|
FsMQMessage(FsMQIdentity from, String msg) throws IllegalStateException { |
|
|
if(from == null || msg == null) { |
|
|
if (from == null || msg == null) { |
|
|
throw new IllegalStateException("from and msg cant be null"); |
|
|
throw new IllegalStateException("from and msg cant be null"); |
|
|
} |
|
|
} |
|
|
this.from = from; |
|
|
this.from = from; |
|
@ -173,7 +187,7 @@ class FsMQMessage implements java.io.Serializable { |
|
|
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); |
|
|
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); |
|
|
Object obj = ois.readObject(); |
|
|
Object obj = ois.readObject(); |
|
|
ois.close(); |
|
|
ois.close(); |
|
|
if(!(obj instanceof FsMQMessage)) { |
|
|
if (!(obj instanceof FsMQMessage)) { |
|
|
throw new ClassNotFoundException("Unvalid serialized string"); |
|
|
throw new ClassNotFoundException("Unvalid serialized string"); |
|
|
} else { |
|
|
} else { |
|
|
ret = (FsMQMessage) obj; |
|
|
ret = (FsMQMessage) obj; |
|
@ -197,8 +211,3 @@ class FsMQMessage implements java.io.Serializable { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UnknownIdentityException extends RuntimeException { |
|
|
|
|
|
UnknownIdentityException(String message) { |
|
|
|
|
|
super(message); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|