From fb7e0614b8cf035166da5696f195f9f3446f4a43 Mon Sep 17 00:00:00 2001 From: heck Date: Thu, 20 May 2021 19:19:41 +0200 Subject: [PATCH] Test: PityTest - Add sync/async transport with plugin processing (enc/dec) --- .../transport/StringProcessorInterface.java | 7 + .../test/utils/transport/Transport.java | 153 ++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 test/java/foundation/pEp/jniadapter/test/utils/transport/StringProcessorInterface.java create mode 100644 test/java/foundation/pEp/jniadapter/test/utils/transport/Transport.java diff --git a/test/java/foundation/pEp/jniadapter/test/utils/transport/StringProcessorInterface.java b/test/java/foundation/pEp/jniadapter/test/utils/transport/StringProcessorInterface.java new file mode 100644 index 0000000..54fa034 --- /dev/null +++ b/test/java/foundation/pEp/jniadapter/test/utils/transport/StringProcessorInterface.java @@ -0,0 +1,7 @@ +package foundation.pEp.jniadapter.test.utils.transport; + +@FunctionalInterface +public interface StringProcessorInterface { + // generic method + T func(T t); +} diff --git a/test/java/foundation/pEp/jniadapter/test/utils/transport/Transport.java b/test/java/foundation/pEp/jniadapter/test/utils/transport/Transport.java new file mode 100644 index 0000000..d7bebbb --- /dev/null +++ b/test/java/foundation/pEp/jniadapter/test/utils/transport/Transport.java @@ -0,0 +1,153 @@ +package foundation.pEp.jniadapter.test.utils.transport; + +import foundation.pEp.jniadapter.test.utils.model.TestIdentity; +import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQIdentity; +import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQManager; +import foundation.pEp.jniadapter.test.utils.transport.fsmqmanager.FsMQMessage; +import foundation.pEp.pitytest.utils.Pair; + +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +public class Transport { + private FsMQManager fsMQTransport = null; + private ProcessingSenderThread sender = null; + private ProcessingReceiverThread receiver = null; + private FsMQIdentity myself = null; + private List peers = null; + + // StringMsg queues + private LinkedBlockingQueue> txQueue = new LinkedBlockingQueue<>(); + private LinkedBlockingQueue rxQueue = new LinkedBlockingQueue<>(); + + public Transport(FsMQIdentity ownIdent, List peers) { + this.myself = ownIdent; + this.peers = peers; + this.sender = new ProcessingSenderThread(this, txQueue); + this.receiver = new ProcessingReceiverThread(this, rxQueue); + this.fsMQTransport = new FsMQManager(ownIdent); + + for (TestIdentity ti : peers) { + fsMQTransport.identities.addAll(ti.getAllTransportIdents()); + } + } + + public void setAsyncTxProcessor(StringProcessorInterface asyncTxProcessor) { + this.sender.setMsgProcessor(asyncTxProcessor); + } + + public void setAsyncRxProcessor(StringProcessorInterface asyncRxProcessor) { + this.receiver.setMsgProcessor(asyncRxProcessor); + } + + public void clearOwnQueue() { + fsMQTransport.clearOwnQueue(); + } + + public boolean canReceiveAsync() { + return !rxQueue.isEmpty(); + } + + public void sendAsync(String msg, TestIdentity receiver) { + txQueue.add(new Pair<>(msg, receiver)); + } + + public String receiveAsyncNonBlocking() { + return rxQueue.remove(); + } + + public void sendRaw(TestIdentity receiver, String msg) { + TestIdentity id = receiver; + for (FsMQIdentity tID : id.getAllTransportIdents()) { + fsMQTransport.sendMessage(tID.getAddress(), msg); +// log("send() to: " + tID.getAddress()); + } + } + + public String receiveRaw() { + FsMQMessage rx = fsMQTransport.receiveMessage(2000); + return rx.getMsg(); + } + + public void start() { + sender.start(); + receiver.start(); + } +} + + +abstract class ProcessingTransportThread extends Thread { + protected Transport transport = null; + protected StringProcessorInterface msgProcessor = null; + + public ProcessingTransportThread(Transport transport) { + this.transport = transport; + } + + public void setMsgProcessor(StringProcessorInterface msgProcessor) { + this.msgProcessor = msgProcessor; + } + + @Override + public void run() { + while (true) { + doTransport(); + } + } + + abstract protected void doTransport(); +} + +class ProcessingSenderThread extends ProcessingTransportThread { + private LinkedBlockingQueue> queue; + + public ProcessingSenderThread(Transport transport, LinkedBlockingQueue> queue) { + super(transport); + this.queue = queue; + } + + protected void doTransport() { + String msgProc; + try { + Pair msgRcv = queue.take(); + if (msgProcessor != null) { + msgProc = msgProcessor.func(msgRcv.getKey()); + } else { + msgProc = msgRcv.getKey(); + } + transport.sendRaw(msgRcv.getValue(), msgProc); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } +} + +class ProcessingReceiverThread extends ProcessingTransportThread { + private LinkedBlockingQueue queue; + + public ProcessingReceiverThread(Transport transport, LinkedBlockingQueue queue) { + super(transport); + this.queue = queue; + } + + public void setMsgProcessor(StringProcessorInterface msgProcessor) { + this.msgProcessor = msgProcessor; + } + + @Override + protected void doTransport() { + String msg = transport.receiveRaw(); + String msgProc; + if (msgProcessor != null) { + msgProc = msgProcessor.func(msg); + } else { + msgProc = msg; + } + try { + queue.put(msgProc); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file