package net.jxta.impl.util;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.Vector;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.Message;
import net.jxta.impl.util.BidirectionalPipeService;
import net.jxta.impl.util.SchedulerService;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.protocol.PipeAdvertisement;
import net.sf.saxon.style.StandardNames;
import org.apache.log4j.Category;

/* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService.class */
public class ReliablePipeService {
    /** log4j category for this class; assigned in this class's static initializer. */
    private static final Category LOG;
    /** Underlying bidirectional pipe service used to bind/connect raw pipes and to create messages. */
    BidirectionalPipeService bidirPipeService;
    /** Scheduler on which retransmit actions are scheduled and cancelled; run on its own thread by the constructor. */
    SchedulerService schedulerService = new SchedulerService();
    /** Name of the message element carrying the reliability header (long sequence, int type, int window). */
    static final String RELIABLE_HEADER = "RELIABLE_HEADER";

    // ---- Message type codes (second field of the header) ----
    /** Connection request; value sent by Pipe.establish(). */
    static final int SYN = 1;
    /** Reply to a SYN; value sent by Pipe.process() when a SYN arrives while establishing. */
    static final int SYN_ACK = 2;
    /** Application payload; value written by Pipe.sendMsg(Message, long). */
    static final int MSG = 3;
    /** Acknowledgement; value sent by Pipe.process() for every non-SYN/SYN_ACK/ACK message. */
    static final int ACK = 4;
    /** First half of the close handshake; value sent by Pipe.close(). */
    static final int FIN1 = 5;
    /** Second half of the close handshake; value sent by Pipe.Sender once the pipe is in CLOSE_WAIT. */
    static final int FIN2 = 6;

    // ---- Pipe state codes (values of Pipe.state) ----
    /** Initial state set by the Pipe constructor; left when a SYN_ACK is processed. */
    static final int ESTABLISHING = 1;
    /** Set when a SYN_ACK has been processed; the only state in which Pipe.send() enqueues. */
    static final int CONNECTED = 2;
    /** Set by Pipe.RetransmitAction once its retransmit budget is exhausted. */
    static final int TIMED_OUT = 3;
    /** Set by Pipe.close() or when a FIN1 is received while connected. */
    static final int CLOSE_WAIT = 4;
    /** Set when a FIN2 is received while in CLOSE_WAIT; terminates Pipe.Receiver. */
    static final int CLOSED = 5;

    // ---- Tuning values ----
    /** Value of the retransmit limit checked by Pipe.RetransmitAction before it gives up. */
    static final int MAX_RETRANSMITS = 5;
    /** Value of the timeout Pipe.Receiver passes to InputPipe.poll (presumably milliseconds -- confirm). */
    static final int MSG_TIMEOUT = 30000;
    /** Value of the delay passed to SchedulerService.scheduleAction for retransmits (presumably milliseconds -- confirm). */
    static final int RETRANSMIT_TIMEOUT = 1000;
    /** Value of the per-pass time budget Pipe.Sender compares against Timer.elapsed() (presumably milliseconds -- confirm). */
    static final int WINDOW_FLUSH_TIMEOUT = 500;
    /** Compiler-generated cache for the class literal used by the static initializer. */
    static Class class$net$jxta$impl$util$ReliablePipeService;

    /* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService$AcceptPipe.class */
    /**
     * Listening side of a reliable pipe: wraps a {@code BidirectionalPipeService.AcceptPipe}
     * and turns each accepted raw pipe into a reliable {@link Pipe}.
     */
    public class AcceptPipe {
        /** Raw accept pipe that incoming connections arrive on. */
        BidirectionalPipeService.AcceptPipe acceptPipe;
        /** Enclosing service instance (explicit outer reference). */
        private final ReliablePipeService this$0;

        AcceptPipe(ReliablePipeService reliablePipeService, BidirectionalPipeService.AcceptPipe acceptPipe) {
            this.acceptPipe = acceptPipe;
            this.this$0 = reliablePipeService;
        }

        /** @return the advertisement of the wrapped accept pipe */
        public PipeAdvertisement getAdvertisement() {
            return this.acceptPipe.getAdvertisement();
        }

        /**
         * Waits for an incoming raw connection and wraps it in a reliable pipe.
         *
         * @param i overall time budget; the time spent accepting the raw pipe is subtracted
         *          and the remainder is handed to the {@link Pipe} constructor
         * @return the newly created reliable pipe
         * @throws IOException if the budget is used up by the raw accept, or if creating the pipe fails
         * @throws InterruptedException propagated from the wrapped accept pipe
         */
        public Pipe accept(int i) throws IOException, InterruptedException {
            ReliablePipeService.LOG.debug("WATCH: REALLY waiting for connections....");
            Timer stopwatch = new Timer();
            stopwatch.start();
            BidirectionalPipeService.Pipe rawPipe = this.acceptPipe.accept(i);
            ReliablePipeService.LOG.debug("WATCH: got bidirPipe..." + rawPipe);
            int remaining = (int) (i - stopwatch.elapsed());
            ReliablePipeService.LOG.debug("WATCH: maxTimeout = " + remaining);
            if (remaining > 0) {
                ReliablePipeService.LOG.debug("WATCH: creating new Pipe...");
                return new Pipe(this.this$0, rawPipe, remaining);
            }
            throw new IOException("Connect timed out.");
        }
    }

    /* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService$Pipe.class */
    public class Pipe implements InputPipe, OutputPipe {
        /** Queue read by poll()/notifyListeners(); process() puts MSG-type messages into it. */
        IncomingMessageQueue incomingQueue;
        /**
         * Second incoming queue; process() uses it to compute the highest in-sequence number.
         * NOTE(review): the name looks decompiler-generated -- confirm the original field name/type.
         */
        IncomingMessageQueue incomingQueue2;
        /** Queue filled by send() and drained by the Sender. */
        OutgoingMessageQueue outgoingQueue;
        /** Maps a sequence number (Long) to the scheduler handle of its pending retransmit. */
        HashMap pendingRetransmits;
        /** Sequence number for the next outgoing message; seeded randomly when a SYN is processed. */
        long outgoingSeq;
        /** Latest value returned by incomingQueue2.popUntilBreakInSequence(); sent back in ACKs. */
        long highestInSeq;
        /** Sequence number of the last SYN_ACK/ACK that was matched; only written and logged in this file. */
        long lastAckedSeq;
        /** Raw input pipe obtained from the underlying bidirectional pipe. */
        InputPipe inputPipe;
        /** Raw output pipe obtained from the underlying bidirectional pipe. */
        OutputPipe outputPipe;
        /** Runnable that reads the raw input pipe; started by the constructor. */
        Receiver receiver;
        /** Runnable that drains outgoingQueue; started by establish(). */
        Sender sender;
        /** Upper bound on messages the Sender sends per pass; taken from the third header field of SYN_ACK/ACK. */
        int outgoingWindow;
        /** Current connection state; holds one of the ReliablePipeService state code values. */
        int state;
        /** Receiver exits when this is false; set to true by the constructor before the receiver starts. */
        boolean running;
        /** Enclosing service instance (explicit outer reference). */
        private final ReliablePipeService this$0;
        /** Sequence number used for the SYN sent by establish(). */
        long synSeq = -1;
        /** Monitor used to wait for / signal state changes. */
        Object lock = new Object();
        /** Registered PipeMsgListener instances. */
        Vector listeners = new Vector();

        /* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService$Pipe$Receiver.class */
        /**
         * Background task that polls the raw input pipe and hands every received message to
         * {@code Pipe.process(Message)} until the pipe reaches the CLOSED state, the
         * {@code running} flag is cleared, or the thread is interrupted.
         */
        class Receiver implements Runnable {
            /** Enclosing pipe (explicit outer reference). */
            private final Pipe this$1;

            Receiver(Pipe pipe) {
                this.this$1 = pipe;
            }

            /**
             * Receive loop. A failure while processing one message is logged and the loop
             * continues with the next message; an interrupt ends the loop.
             */
            @Override // java.lang.Runnable
            public void run() {
                while (this.this$1.state != ReliablePipeService.CLOSED) {
                    try {
                        Message incoming = this.this$1.inputPipe.poll(ReliablePipeService.MSG_TIMEOUT);
                        ReliablePipeService.LOG.debug("WATCH: got message " + incoming);
                        if (!this.this$1.running) {
                            return;
                        }
                        if (incoming != null) {
                            ReliablePipeService.LOG.debug("WATCH: message not null, processing");
                            // process() throws IOException, so it has to live inside the try block.
                            this.this$1.process(incoming);
                        }
                    } catch (IOException e) {
                        // One bad message must not kill the receiver; report it and keep polling.
                        ReliablePipeService.LOG.warn("Failed to process incoming message", e);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for whoever owns this thread, then stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService$Pipe$RetransmitAction.class */
        /**
         * Scheduler action that resends one message until it is cancelled (by an ACK) or the
         * retransmit budget is exhausted, in which case the pipe is marked TIMED_OUT.
         */
        public class RetransmitAction implements SchedulerService.Action {
            /** Message to resend; its RELIABLE_HEADER element carries the sequence number. */
            Message msg;
            /** Number of times this action has rescheduled itself so far. */
            int retransmits;
            /** Enclosing pipe (explicit outer reference). */
            private final Pipe this$1;

            RetransmitAction(Pipe pipe, Message message) {
                this.this$1 = pipe;
                this.msg = message;
            }

            /**
             * Resends the message, then either reschedules itself under the message's sequence
             * number or, once the retransmit limit is reached, flags the pipe as timed out and
             * wakes any thread waiting on the pipe's lock.
             *
             * @param schedulerService scheduler that invoked this action; used to reschedule
             */
            @Override // net.jxta.impl.util.SchedulerService.Action
            public void perform(SchedulerService schedulerService) {
                long seq = 0;
                try {
                    DataInputStream header = new DataInputStream(
                            this.msg.getElement(ReliablePipeService.RELIABLE_HEADER).getStream());
                    seq = header.readLong();
                    int type = header.readInt();
                    ReliablePipeService.LOG.debug("Retransmitting " + seq + " type " + type);
                    this.this$1.outputPipe.send(this.msg);
                } catch (IOException e) {
                    // Best effort: a failed attempt still counts against the retransmit budget
                    // below, but it must not go unnoticed.
                    ReliablePipeService.LOG.warn("Retransmit of message " + seq + " failed", e);
                }
                if (this.retransmits < ReliablePipeService.MAX_RETRANSMITS) {
                    // Re-register under the same key so an incoming ACK can cancel the new handle.
                    this.this$1.pendingRetransmits.put(
                            Long.valueOf(seq),
                            schedulerService.scheduleAction(this, (long) ReliablePipeService.RETRANSMIT_TIMEOUT));
                    this.retransmits++;
                } else {
                    this.this$1.state = ReliablePipeService.TIMED_OUT;
                    synchronized (this.this$1.lock) {
                        this.this$1.lock.notifyAll();
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:lib/ptolemy.jar:/ptII/vendors/sun/jxta/jxta.jar:net/jxta/impl/util/ReliablePipeService$Pipe$Sender.class */
        /**
         * Background task that drains the outgoing queue in passes of at most
         * {@code outgoingWindow} messages, and sends FIN2 once the pipe enters CLOSE_WAIT.
         */
        public class Sender implements Runnable {
            /** Enclosing pipe (explicit outer reference). */
            private final Pipe this$1;

            Sender(Pipe pipe) {
                this.this$1 = pipe;
            }

            /**
             * Send loop. Each pass has a time budget of WINDOW_FLUSH_TIMEOUT: while budget
             * remains the dequeue blocks for the remainder, afterwards only already-queued
             * messages are taken. The loop ends when the pipe is in CLOSE_WAIT or CLOSED;
             * without the CLOSED check a pipe that moves through CLOSE_WAIT to CLOSED between
             * two checks would keep this thread spinning forever.
             */
            @Override // java.lang.Runnable
            public void run() {
                ReliablePipeService.LOG.debug("WATCH: sending messages on queue");
                while (this.this$1.state != ReliablePipeService.CLOSE_WAIT
                        && this.this$1.state != ReliablePipeService.CLOSED) {
                    Timer timer = new Timer();
                    timer.start();
                    ReliablePipeService.LOG.debug("WATCH: outgoingWindow = " + this.this$1.outgoingWindow);
                    // NOTE(review): when outgoingWindow is 0 the inner loop never runs and this
                    // outer loop spins without blocking -- confirm whether a wait is wanted here.
                    int sent = 0;
                    while (sent < this.this$1.outgoingWindow) {
                        // Stays null when the budget is spent and nothing is queued.
                        Message next = null;
                        try {
                            ReliablePipeService.LOG.debug("WATCH: getting message off the queue, i = " + sent);
                            ReliablePipeService.LOG.debug("WATCH: elapsed time = " + timer.elapsed());
                            long remaining = ReliablePipeService.WINDOW_FLUSH_TIMEOUT - timer.elapsed();
                            if (remaining > 0) {
                                next = this.this$1.outgoingQueue.dequeue(remaining);
                            } else if (!this.this$1.outgoingQueue.isEmpty()) {
                                next = this.this$1.outgoingQueue.dequeue(0L);
                            }
                        } catch (InterruptedException e) {
                            // Interrupts are deliberately ignored here (the loop is controlled by
                            // the pipe state); retry the same slot.
                            ReliablePipeService.LOG.debug("Sender interrupted while dequeuing; retrying", e);
                            continue;
                        }
                        if (next == null) {
                            ReliablePipeService.LOG.debug("WATCH: No message :-(");
                            break;
                        }
                        try {
                            // NOTE(review): StandardNames.COUNT looks like a decompiler substitution
                            // for a plain string literal -- confirm and drop the Saxon dependency.
                            ReliablePipeService.LOG.debug("WATCH: sending message " + this.this$1.outgoingSeq
                                    + " ping = " + next.getString(StandardNames.COUNT));
                            long seq = this.this$1.outgoingSeq;
                            this.this$1.outgoingSeq = seq + 1;
                            this.this$1.sendMsg(next, seq);
                        } catch (IOException e) {
                            // The message still consumes a window slot, as before; just report it.
                            ReliablePipeService.LOG.warn("Failed to send queued message", e);
                        }
                        sent++;
                    }
                }
                if (this.this$1.state == ReliablePipeService.CLOSE_WAIT) {
                    try {
                        long finSeq = this.this$1.outgoingSeq;
                        this.this$1.outgoingSeq = finSeq + 1;
                        this.this$1.sendMsg(ReliablePipeService.FIN2, finSeq, 0);
                    } catch (IOException e) {
                        ReliablePipeService.LOG.warn("Failed to send FIN2", e);
                    }
                }
            }
        }

        /**
         * Wraps a raw bidirectional pipe, starts the receiver thread and performs the
         * connection handshake before returning.
         *
         * @param reliablePipeService enclosing service instance
         * @param pipe raw pipe supplying the underlying input and output pipes
         * @param j time budget handed to {@code establish(long)}
         * @throws IOException if the handshake does not complete
         */
        Pipe(ReliablePipeService reliablePipeService, BidirectionalPipeService.Pipe pipe, long j) throws IOException {
            final int queueCapacity = 512;

            this.this$0 = reliablePipeService;
            this.running = false;
            ReliablePipeService.LOG.debug("WATCH: Pipe constructor");

            this.inputPipe = pipe.getInputPipe();
            this.outputPipe = pipe.getOutputPipe();

            this.incomingQueue = new IncomingMessageQueue(queueCapacity);
            this.incomingQueue2 = new IncomingMessageQueue(queueCapacity);
            this.outgoingQueue = new OutgoingMessageQueue(queueCapacity);
            this.pendingRetransmits = new HashMap();
            this.state = ReliablePipeService.ESTABLISHING;

            ReliablePipeService.LOG.debug("WATCH: starting receiver");
            this.running = true;
            this.receiver = new Receiver(this);
            Thread receiverThread = new Thread(this.receiver);
            receiverThread.start();

            // The receiver must already be running so the handshake reply can be processed.
            establish(j);
        }

        /**
         * Returns the receiving side of this pipe.
         *
         * @return this object, which itself implements {@link InputPipe}
         */
        public InputPipe getInputPipe() {
            return this;
        }

        /**
         * Returns the sending side of this pipe.
         *
         * @return this object, which itself implements {@link OutputPipe}
         */
        public OutputPipe getOutputPipe() {
            return this;
        }

        /**
         * Performs the connection handshake: sends a SYN, waits until the state leaves
         * ESTABLISHING, and starts the sender thread once the pipe is CONNECTED.
         *
         * <p>The timeout is enforced as a single deadline across all wake-ups, and the state
         * is checked while holding {@code lock} so a notification cannot slip in between the
         * check and the wait. If the handshake fails, {@code running} is cleared so the
         * receiver started by the constructor stops after its current poll.
         *
         * @param j maximum time to wait, in milliseconds; for {@code j <= 0} the value is
         *          passed straight to {@code Object.wait(long)} (0 waits without limit)
         * @throws IOException if sending the SYN fails, the wait is interrupted, or the pipe
         *         is not CONNECTED when the wait ends
         */
        void establish(long j) throws IOException {
            boolean established = false;
            try {
                ReliablePipeService.LOG.debug("WATCH: sending SYN...");
                sendMsg(ReliablePipeService.SYN, this.synSeq, 0);
                final long deadline = System.currentTimeMillis() + j;
                try {
                    synchronized (this.lock) {
                        while (this.state == ReliablePipeService.ESTABLISHING) {
                            if (j > 0) {
                                long remaining = deadline - System.currentTimeMillis();
                                if (remaining <= 0) {
                                    break;
                                }
                                this.lock.wait(remaining);
                            } else {
                                this.lock.wait(j);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller's thread.
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while establishing connection.");
                }
                if (this.state != ReliablePipeService.CONNECTED) {
                    throw new IOException("Connect timed out.");
                }
                ReliablePipeService.LOG.debug("WATCH: starting sender...");
                this.sender = new Sender(this);
                new Thread(this.sender).start();
                established = true;
            } finally {
                if (!established) {
                    // Nobody will use this pipe; let the receiver thread wind down.
                    this.running = false;
                }
            }
        }

        /**
         * Starts the close handshake: moves the pipe to CLOSE_WAIT and sends FIN1.
         *
         * <p>Does nothing when the pipe is already in CLOSE_WAIT or CLOSED, so a pipe that has
         * finished closing is not moved back to CLOSE_WAIT and no FIN1 is resent. A failure
         * to send FIN1 is logged rather than thrown because this method declares no exception.
         */
        @Override // net.jxta.pipe.InputPipe
        public void close() {
            if (this.state == ReliablePipeService.CLOSE_WAIT || this.state == ReliablePipeService.CLOSED) {
                return;
            }
            this.state = ReliablePipeService.CLOSE_WAIT;
            try {
                sendMsg(ReliablePipeService.FIN1, -1L, 0);
            } catch (IOException e) {
                ReliablePipeService.LOG.warn("Failed to send FIN1 while closing", e);
            }
        }

        /**
         * Queues a message for reliable delivery. Messages are accepted only while the pipe
         * is CONNECTED; the Sender thread transmits them later.
         *
         * @param message message to enqueue
         * @throws IOException if the pipe is timed out, closing or closed, or if the calling
         *         thread is interrupted while enqueuing (the interrupt status is restored and
         *         the {@code InterruptedException} is attached as the cause)
         */
        @Override // net.jxta.pipe.OutputPipe
        public synchronized void send(Message message) throws IOException {
            switch (this.state) {
                case ReliablePipeService.ESTABLISHING:
                    throw new Error("Should never be here.  Theis state is only while the contructor is running.");
                case ReliablePipeService.CONNECTED:
                    ReliablePipeService.LOG.debug("WATCH: enqueuing message...");
                    try {
                        this.outgoingQueue.enqueue(message);
                        return;
                    } catch (InterruptedException e) {
                        // Restore the flag and keep the original exception reachable.
                        Thread.currentThread().interrupt();
                        IOException wrapped =
                                new IOException("Thread was interrupted while in send(): " + e.getMessage());
                        wrapped.initCause(e);
                        throw wrapped;
                    }
                case ReliablePipeService.TIMED_OUT:
                    throw new IOException("The peer at the other end has dropped the connection.");
                case ReliablePipeService.CLOSE_WAIT:
                    throw new IOException("Shutdown in progress.");
                case ReliablePipeService.CLOSED:
                    throw new IOException("Connection has closed.");
                default:
                    // Unknown state value: the message is dropped without error.
                    return;
            }
        }

        /**
         * Takes the next message from the incoming queue by delegating to {@code poll(-1)}.
         * The -1 timeout presumably means "wait without limit" -- confirm against
         * IncomingMessageQueue.pop.
         *
         * @return the message returned by {@code poll(-1)}
         * @throws InterruptedException propagated from {@code poll}
         */
        @Override // net.jxta.pipe.InputPipe
        public synchronized Message waitForMessage() throws InterruptedException {
            return poll(-1);
        }

        /**
         * Takes a message from the incoming queue via {@code incomingQueue.pop(i)}.
         *
         * <p>NOTE(review): this method is synchronized on the pipe, as are send() and the
         * listener registration methods. If pop() blocks while waiting for a message, those
         * methods are blocked for the same duration -- confirm whether that is intended.
         *
         * @param i timeout handed unchanged to {@code IncomingMessageQueue.pop}
         * @return whatever {@code pop} returns
         * @throws InterruptedException propagated from {@code pop}
         */
        @Override // net.jxta.pipe.InputPipe
        public synchronized Message poll(int i) throws InterruptedException {
            return this.incomingQueue.pop(i);
        }

        /**
         * Registers a listener for incoming messages; a listener that is already registered
         * is not added a second time.
         *
         * @param pipeMsgListener listener to register
         */
        public synchronized void addPipeMsgListener(PipeMsgListener pipeMsgListener) {
            boolean alreadyRegistered = this.listeners.contains(pipeMsgListener);
            if (!alreadyRegistered) {
                this.listeners.add(pipeMsgListener);
            }
        }

        /**
         * Unregisters a listener; has no effect if the listener is not registered.
         *
         * @param pipeMsgListener listener to remove
         */
        public synchronized void removePipeMsgListener(PipeMsgListener pipeMsgListener) {
            this.listeners.remove(pipeMsgListener);
        }

        /**
         * Delivers every message currently obtainable through {@code incomingQueue.popNext()}
         * to the registered listeners, one {@link PipeMsgEvent} per message.
         *
         * <p>Listeners are called on a snapshot of the listener list, so a listener may add
         * or remove listeners (including itself) from within its callback without causing a
         * {@code ConcurrentModificationException}; such changes take effect from the next
         * message on.
         */
        void notifyListeners() {
            for (Message msg = this.incomingQueue.popNext(); msg != null; msg = this.incomingQueue.popNext()) {
                PipeMsgEvent event = new PipeMsgEvent(this, msg, null);
                Object[] snapshot = this.listeners.toArray();
                for (int k = 0; k < snapshot.length; k++) {
                    ((PipeMsgListener) snapshot[k]).pipeMsgEvent(event);
                }
            }
        }

        /**
         * Handles one message received from the raw input pipe: reads its reliability header,
         * acknowledges it where applicable, and then acts on it according to the current
         * connection state and the message type.
         *
         * <p>The numeric literals below are message type codes (1 SYN, 2 SYN_ACK, 3 MSG,
         * 4 ACK, 5 FIN1, 6 FIN2) and state codes (1 ESTABLISHING, 2 CONNECTED, 3 TIMED_OUT,
         * 4 CLOSE_WAIT, 5 CLOSED), matching the constants declared on ReliablePipeService.
         *
         * @param message received message; must carry a RELIABLE_HEADER element
         * @throws IOException if the header cannot be read or a reply cannot be sent
         */
        void process(Message message) throws IOException {
            // Header layout: long sequence number, int message type, int window.
            DataInputStream dataInputStream = new DataInputStream(message.getElement(ReliablePipeService.RELIABLE_HEADER).getStream());
            long readLong = dataInputStream.readLong();
            int readInt = dataInputStream.readInt();
            int readInt2 = dataInputStream.readInt();
            // Everything except SYN, SYN_ACK and ACK is acknowledged, whatever the current state.
            // The ACK carries the highest in-sequence number seen and the free slots of incomingQueue.
            if (readInt != 1 && readInt != 2 && readInt != 4) {
                this.incomingQueue2.put(message, readLong);
                this.highestInSeq = this.incomingQueue2.popUntilBreakInSequence(this.highestInSeq);
                ReliablePipeService.LOG.debug(new StringBuffer().append("WATCH: Sending ACK ").append(this.highestInSeq).toString());
                ReliablePipeService.LOG.debug(new StringBuffer().append("WATCH: Free Slots = ").append(this.incomingQueue.getFreeSlots()).toString());
                sendMsg(4, this.highestInSeq, this.incomingQueue.getFreeSlots());
            }
            switch (this.state) {
                case 1:
                    // ESTABLISHING: only SYN and SYN_ACK are handled.
                    switch (readInt) {
                        case 1:
                            // SYN: pick a random starting sequence number and answer with a SYN_ACK
                            // whose window field is 1. The state is not changed here.
                            ReliablePipeService.LOG.debug("WATCH: Received SYN, Sending SYN_ACK");
                            this.outgoingSeq = new Random().nextLong();
                            long j = this.outgoingSeq;
                            this.outgoingSeq = j + 1;
                            sendMsg(2, j, 1);
                            return;
                        case 2:
                            // SYN_ACK: stop retransmitting our SYN, adopt the peer's sequence number
                            // and window, switch to CONNECTED and wake the thread waiting on lock.
                            ReliablePipeService.LOG.debug("WATCH: Received SYN_ACK");
                            Object obj = this.pendingRetransmits.get(new Long(this.synSeq));
                            if (obj != null) {
                                ReliablePipeService.LOG.debug(new StringBuffer().append("Canceling retransmit for ").append(this.synSeq).toString());
                                this.this$0.schedulerService.cancelAction(obj);
                                this.pendingRetransmits.remove(new Long(this.synSeq));
                            } else {
                                ReliablePipeService.LOG.debug(new StringBuffer().append("No retransmit scheduled for ").append(this.synSeq).toString());
                            }
                            this.highestInSeq = readLong;
                            this.lastAckedSeq = readLong;
                            this.state = 2;
                            this.incomingQueue.setNextExpected(readLong + 1);
                            this.outgoingWindow = readInt2;
                            synchronized (this.lock) {
                                this.lock.notifyAll();
                            }
                            return;
                        default:
                            return;
                    }
                case 2:
                    // CONNECTED: handle payload, ACK and FIN1.
                    switch (readInt) {
                        case 3:
                            // MSG: queue the payload and deliver whatever is now available to listeners.
                            ReliablePipeService.LOG.debug(new StringBuffer().append("WATCH: putting msg ").append(readLong).append(" on incoming queue *****").toString());
                            this.incomingQueue.put(message, readLong);
                            notifyListeners();
                            break;
                        case 4:
                            // ACK: cancel the matching pending retransmit, if any, and adopt the
                            // window advertised by the peer.
                            ReliablePipeService.LOG.debug(new StringBuffer().append("WATCH: got ACK ").append(readLong).toString());
                            Object obj2 = this.pendingRetransmits.get(new Long(readLong));
                            if (obj2 != null) {
                                ReliablePipeService.LOG.debug(new StringBuffer().append("Canceling retransmit for ").append(readLong).toString());
                                this.this$0.schedulerService.cancelAction(obj2);
                                this.pendingRetransmits.remove(new Long(readLong));
                                ReliablePipeService.LOG.debug(new StringBuffer().append("WATCH: seq = ").append(readLong).append(" lastAckedSeq = ").append(this.lastAckedSeq).toString());
                                this.lastAckedSeq = readLong;
                            } else {
                                ReliablePipeService.LOG.debug(new StringBuffer().append("No retransmit scheduled for ").append(readLong).toString());
                            }
                            this.outgoingWindow = readInt2;
                            break;
                        case 5:
                            // FIN1: the peer is closing; move to CLOSE_WAIT.
                            this.state = 4;
                            break;
                    }
                    // No break here: the CONNECTED case falls through into the return below.
                case 3:
                default:
                    // TIMED_OUT, CLOSED or unknown state: nothing further to do.
                    return;
                case 4:
                    // CLOSE_WAIT: continue with the FIN2 check after the switch.
                    break;
            }
            // Reached only in CLOSE_WAIT: a FIN2 completes the close handshake.
            switch (readInt) {
                case 6:
                    this.state = 5;
                    return;
                default:
                    return;
            }
        }

        /**
         * Creates and sends a header-only control message. ACK and SYN_ACK are sent once;
         * every other type is also registered for retransmission under its sequence number.
         *
         * <p>The retransmit handle is registered <em>before</em> the message is sent. If it
         * were registered afterwards, an ACK processed by the receiver thread in between
         * would find nothing to cancel and the message would be retransmitted until the pipe
         * is marked TIMED_OUT.
         *
         * @param i message type code written to the header
         * @param j sequence number written to the header and used as the retransmit key
         * @param i2 window value written to the header
         * @throws IOException if the header cannot be built or the send fails; in the latter
         *         case the retransmit registered for this message is cancelled again
         */
        void sendMsg(int i, long j, int i2) throws IOException {
            Message control = this.this$0.bidirPipeService.pipeService.createMessage();
            setHeader(control, i, j, i2);
            if (i == ReliablePipeService.ACK || i == ReliablePipeService.SYN_ACK) {
                this.outputPipe.send(control);
                return;
            }
            Long key = Long.valueOf(j);
            Object handle = this.this$0.schedulerService.scheduleAction(
                    new RetransmitAction(this, control), (long) ReliablePipeService.RETRANSMIT_TIMEOUT);
            this.pendingRetransmits.put(key, handle);
            try {
                this.outputPipe.send(control);
            } catch (IOException e) {
                // As before, a message whose first send failed is not retransmitted.
                this.this$0.schedulerService.cancelAction(handle);
                this.pendingRetransmits.remove(key);
                throw e;
            }
        }

        /**
         * Stamps a payload message with a MSG header, sends it and registers it for
         * retransmission under its sequence number.
         *
         * <p>The retransmit handle is registered <em>before</em> the message is sent. If it
         * were registered afterwards, an ACK processed by the receiver thread in between
         * would find nothing to cancel and the message would be retransmitted until the pipe
         * is marked TIMED_OUT.
         *
         * @param message payload message to send
         * @param j sequence number written to the header and used as the retransmit key
         * @throws IOException if the header cannot be built or the send fails; in the latter
         *         case the retransmit registered for this message is cancelled again
         */
        void sendMsg(Message message, long j) throws IOException {
            setHeader(message, ReliablePipeService.MSG, j, 0);
            Long key = Long.valueOf(j);
            Object handle = this.this$0.schedulerService.scheduleAction(
                    new RetransmitAction(this, message), (long) ReliablePipeService.RETRANSMIT_TIMEOUT);
            this.pendingRetransmits.put(key, handle);
            try {
                this.outputPipe.send(message);
            } catch (IOException e) {
                // As before, a message whose first send failed is not retransmitted.
                this.this$0.schedulerService.cancelAction(handle);
                this.pendingRetransmits.remove(key);
                throw e;
            }
        }

        /**
         * Adds the reliability header element to a message. The element is named
         * RELIABLE_HEADER and contains, in order: the sequence number (long), the message
         * type (int) and the window value (int).
         *
         * @param message message to add the header element to
         * @param i message type code
         * @param j sequence number
         * @param i2 window value
         * @throws IOException if writing the header bytes fails
         */
        void setHeader(Message message, int i, long j, int i2) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(12);
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeLong(j);
            out.writeInt(i);
            out.writeInt(i2);
            byte[] headerBytes = buffer.toByteArray();
            message.addElement(
                    message.newMessageElement(ReliablePipeService.RELIABLE_HEADER, (MimeMediaType) null, headerBytes));
        }
    }

    public ReliablePipeService(BidirectionalPipeService bidirectionalPipeService) {
        this.bidirPipeService = bidirectionalPipeService;
        new Thread(this.schedulerService).start();
    }

    /**
     * Connects to a remote accept pipe and wraps the resulting raw pipe in a reliable pipe.
     *
     * @param pipeAdvertisement advertisement of the pipe to connect to
     * @param i overall time budget; the time spent on the raw connect is subtracted and the
     *          remainder is handed to the {@link Pipe} constructor
     * @return the newly created reliable pipe
     * @throws IOException if the budget is used up by the raw connect, or if creating the pipe fails
     */
    public Pipe connect(PipeAdvertisement pipeAdvertisement, int i) throws IOException {
        LOG.debug("WATCH: REALLY Connecting...");
        Timer stopwatch = new Timer();
        stopwatch.start();
        BidirectionalPipeService.Pipe rawPipe = this.bidirPipeService.connect(pipeAdvertisement, i);
        LOG.debug("WATCH: got bidirPipe");
        int remaining = (int) (i - stopwatch.elapsed());
        if (remaining > 0) {
            LOG.debug("WATCH: creating new Pipe");
            return new Pipe(this, rawPipe, remaining);
        }
        throw new IOException("Connect timed out.");
    }

    /**
     * Binds an accept pipe through the underlying service and wraps it.
     *
     * @param str argument passed unchanged to {@code BidirectionalPipeService.bind(String)}
     * @return a reliable accept pipe wrapping the raw one
     * @throws IOException propagated from the underlying bind
     */
    public AcceptPipe bind(String str) throws IOException {
        BidirectionalPipeService.AcceptPipe rawAcceptPipe = this.bidirPipeService.bind(str);
        return new AcceptPipe(this, rawAcceptPipe);
    }

    /**
     * Binds an accept pipe for the given advertisement through the underlying service and wraps it.
     *
     * @param pipeAdvertisement advertisement passed unchanged to the underlying bind
     * @return a reliable accept pipe wrapping the raw one
     * @throws IOException propagated from the underlying bind
     */
    public AcceptPipe bind(PipeAdvertisement pipeAdvertisement) throws IOException {
        BidirectionalPipeService.AcceptPipe rawAcceptPipe = this.bidirPipeService.bind(pipeAdvertisement);
        return new AcceptPipe(this, rawAcceptPipe);
    }

    /**
     * Compiler-style helper that resolves a class by name (used for class literals).
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the original
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            NoClassDefFoundError error = new NoClassDefFoundError(e.getMessage());
            // Keep the original exception reachable for diagnostics.
            error.initCause(e);
            throw error;
        }
    }

    // Resolves (and caches) this class's Class object, then creates the log4j category for it.
    static {
        if (class$net$jxta$impl$util$ReliablePipeService == null) {
            class$net$jxta$impl$util$ReliablePipeService = class$("net.jxta.impl.util.ReliablePipeService");
        }
        LOG = Category.getInstance(class$net$jxta$impl$util$ReliablePipeService);
    }
}
