diff --git a/test/java/foundation/pEp/jniadapter/test/jni153/CTXMultiNode.java b/test/java/foundation/pEp/jniadapter/test/jni153/CTXMultiNode.java index 1f2d99c..83f8ec7 100644 --- a/test/java/foundation/pEp/jniadapter/test/jni153/CTXMultiNode.java +++ b/test/java/foundation/pEp/jniadapter/test/jni153/CTXMultiNode.java @@ -5,15 +5,13 @@ import foundation.pEp.jniadapter.Message; import foundation.pEp.jniadapter.decrypt_message_Return; import foundation.pEp.jniadapter.test.utils.TestCallbacks; import foundation.pEp.jniadapter.test.utils.model.*; +import foundation.pEp.jniadapter.test.utils.transport.Transport; import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQIdentity; -import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQManager; -import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQMessage; import foundation.pEp.pitytest.AbstractTestContext; import foundation.pEp.pitytest.TestContextInterface; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; import static foundation.pEp.pitytest.TestLogger.log; @@ -23,19 +21,31 @@ public class CTXMultiNode extends AbstractTestContext { public Transport transport; // Model - public TestModel model; + public TestModel> model; // Mappings private NodeName ownNodeName; - private TestNode ownNode; - public TestIdentity myself; - public TestIdentity partner; - + private TestNode ownNode; + public pEpTestIdentity myself; + public pEpTestIdentity partner; CTXMultiNode(NodeName ownNodeName) { this.ownNodeName = ownNodeName; } + private TestModel> setupModel() { + TestModel> ret = new TestModel(pEpTestIdentity::new, TestNode::new); + + ret.getNode(NodeName.NODE_A1).setDefaultRole(Role.ALICE); + ret.getNode(NodeName.NODE_B1).setDefaultRole(Role.BOB); + + ret.getIdent(Role.ALICE).setDefaultPartner(Role.BOB); + ret.getIdent(Role.BOB).setDefaultPartner(Role.ALICE); +// ret.getIdent(Role.CAROL).setDefaultPartner(Role.ALICE); + + return ret; + } + @Override public TestContextInterface init() throws Throwable { // pEp @@ -50,134 +60,24 @@ public class CTXMultiNode extends AbstractTestContext { // Setup Perspective ownNode = model.getNode(ownNodeName); myself = ownNode.getIdent(); - partner = myself.getDefaultPartner(); + partner = (pEpTestIdentity) myself.getDefaultPartner(); // Transport // Create own transport identity and Transport FsMQIdentity transportIdent = myself.getTransportIdent(ownNodeName); - transport = new Transport(transportIdent, model.getAllIdents()); - + List peers = new ArrayList<>(); + peers.addAll(model.getAllIdents()); + transport = new Transport(transportIdent, peers); + transport.setAsyncTxProcessor(this::asyncEnc); + transport.setAsyncRxProcessor(this::asyncDec); return this; } - private TestModel setupModel() { - TestModel ret = new TestModel(); - - ret.getNode(NodeName.NODE_A1).setRole(Role.ALICE); - ret.getNode(NodeName.NODE_B1).setRole(Role.BOB); - - ret.getIdent(Role.ALICE).setDefaultPartner(Role.BOB); - ret.getIdent(Role.BOB).setDefaultPartner(Role.ALICE); -// ret.getIdent(Role.CAROL).setDefaultPartner(Role.ALICE); - - return ret; - } - -} - -class Transport { - private FsMQManager fsMQTransport = null; - private EncryptingSenderThread sender = null; - private DecryptingReceiverThread receiver = null; - private FsMQIdentity myself = null; - private List peers = null; - - // Message queues - private LinkedBlockingQueue txQueue = new LinkedBlockingQueue<>(); - private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue<>(); - - public Transport(FsMQIdentity ownIdent, List peers) { - this.myself = ownIdent; - this.peers = peers; - this.sender = new EncryptingSenderThread(this, txQueue); - this.receiver = new DecryptingReceiverThread(this, rxQueue); - this.fsMQTransport = new FsMQManager(ownIdent); - - for (TestIdentity ti : peers) { - fsMQTransport.identities.addAll(ti.getAllTransportIdents()); - } - } - - public void clearOwnQueue() { - fsMQTransport.clearOwnQueue(); - } - - public boolean canReceiveAsync() { - return !rxQueue.isEmpty(); - } - - public void sendAsync(Message msg) { - txQueue.add(msg); - } - - public Message receiveAsyncNonBlocking() { - return rxQueue.remove(); - } - - public void sendRaw(String pEpAddress, String msg) { - //Find identity for address - List res = peers.stream().filter(i -> { - return i.pEpIdent.address.equals(pEpAddress); - }).collect(Collectors.toList()); - - if (res.size() > 1) { - throw new RuntimeException("Unknown Error"); - } else if (res.size() <= 0) { - throw new RuntimeException("Unknown address"); - } - TestIdentity id = res.get(0); - - for (FsMQIdentity tID : id.getAllTransportIdents()) { - fsMQTransport.sendMessage(tID.getAddress(), msg); -// log("send() to: " + tID.getAddress()); - } - } - - public String receiveRaw() { - FsMQMessage rx = fsMQTransport.receiveMessage(2000); - return rx.getMsg(); - } - - public void start() { - sender.start(); - receiver.start(); - } -} - - -class EncryptingSenderThread extends Thread { - private Engine engine = null; - private Transport transport = null; - private LinkedBlockingQueue queue; - - public EncryptingSenderThread(Transport transport, LinkedBlockingQueue queue) { - this.transport = transport; - this.queue = queue; - } - - @Override - public void run() { - engine = new Engine(); - Message msg; - String msgEnc; - while (true) { - try { - msg = queue.take(); - if (msg.getTo().size() != 1) { - throw new RuntimeException("Sorry, msg.To has to have exactly 1 receiver for now"); - } - String to = msg.getTo().get(0).address; - msgEnc = encryptMessage(msg); - transport.sendRaw(to, msgEnc); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - public String encryptMessage(Message msg) { + private String asyncEnc(String mime_text) { + Message msg = new Message(mime_text); + msg.setDir(Message.Direction.Outgoing); String longMessage = msg.getLongmsg(); - +// log(AdapterTestUtils.msgToString(msg, true)); Message msgEnc = engine.encrypt_message(msg, null, Message.EncFormat.PEP); String encFormat; String transportMsg; @@ -191,36 +91,8 @@ class EncryptingSenderThread extends Thread { log("<- : [" + encFormat + "] - " + longMessage); return transportMsg; } -} - - -class DecryptingReceiverThread extends Thread { - private Engine engine = null; - private Transport transport = null; - private LinkedBlockingQueue queue; - - public DecryptingReceiverThread(Transport transport, LinkedBlockingQueue queue) { - this.transport = transport; - this.queue = queue; - } - - @Override - public void run() { - engine = new Engine(); - String msg; - Message msgDec; - while (true) { - msg = transport.receiveRaw(); - msgDec = decryptMessage(msg); - try { - queue.put(msgDec); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - private Message decryptMessage(String msg) { + private String asyncDec(String msg) { Message msgIn = new Message(msg); Message msgInDec = null; @@ -234,6 +106,8 @@ class DecryptingReceiverThread extends Thread { } log("-> : [" + encFormat + "] - " + msgInDec.getLongmsg()); - return msgInDec; + return msgInDec.encodeMIME(); } + + } \ No newline at end of file