package com.allanbank.mongodb.client.connection.socket;

import com.allanbank.mongodb.MongoClientConfiguration;
import com.allanbank.mongodb.MongoDbException;
import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
import com.allanbank.mongodb.bson.io.StringDecoderCache;
import com.allanbank.mongodb.bson.io.StringEncoderCache;
import com.allanbank.mongodb.client.Message;
import com.allanbank.mongodb.client.callback.AddressAware;
import com.allanbank.mongodb.client.callback.ReplyCallback;
import com.allanbank.mongodb.client.connection.Connection;
import com.allanbank.mongodb.client.message.BuildInfo;
import com.allanbank.mongodb.client.message.PendingMessage;
import com.allanbank.mongodb.client.message.PendingMessageQueue;
import com.allanbank.mongodb.client.state.Server;
import com.allanbank.mongodb.client.state.ServerUpdateCallback;
import com.allanbank.mongodb.util.IOUtils;
import java.io.IOException;
import java.net.SocketException;

/* loaded from: input_file:com/allanbank/mongodb/client/connection/socket/TwoThreadSocketConnection.class */
public class TwoThreadSocketConnection extends AbstractSocketConnection {
    protected final BufferingBsonOutputStream myBsonOut;
    protected final PendingMessageQueue myToSendQueue;
    private final Thread myReceiver;
    private final Thread mySender;

    /* loaded from: input_file:com/allanbank/mongodb/client/connection/socket/TwoThreadSocketConnection$SendRunnable.class */
    protected class SendRunnable implements Runnable {
        private boolean myNeedToFlush = false;
        private final PendingMessage myPendingMessage = new PendingMessage();

        protected SendRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            while (TwoThreadSocketConnection.this.myOpen.get() && !z) {
                try {
                    try {
                        try {
                            try {
                                sendOne();
                                this.myPendingMessage.clear();
                            } catch (Throwable th) {
                                this.myPendingMessage.clear();
                                throw th;
                            }
                        } catch (Error e) {
                            TwoThreadSocketConnection.this.myLog.error(e, "Error sending a message.", new Object[0]);
                            TwoThreadSocketConnection.this.raiseError(e, this.myPendingMessage.getReplyCallback());
                            z = true;
                            this.myPendingMessage.clear();
                        } catch (InterruptedException e2) {
                            TwoThreadSocketConnection.this.raiseError(e2, this.myPendingMessage.getReplyCallback());
                            this.myPendingMessage.clear();
                        }
                    } catch (IOException e3) {
                        TwoThreadSocketConnection.this.myLog.warn(e3, "I/O Error sending a message.", new Object[0]);
                        TwoThreadSocketConnection.this.raiseError(e3, this.myPendingMessage.getReplyCallback());
                        z = true;
                        this.myPendingMessage.clear();
                    } catch (RuntimeException e4) {
                        TwoThreadSocketConnection.this.myLog.warn(e4, "Runtime error sending a message.", new Object[0]);
                        TwoThreadSocketConnection.this.raiseError(e4, this.myPendingMessage.getReplyCallback());
                        z = true;
                        this.myPendingMessage.clear();
                    }
                } catch (Throwable th2) {
                    try {
                        try {
                            if (TwoThreadSocketConnection.this.myOpen.get()) {
                                doFlush();
                            }
                            IOUtils.close(TwoThreadSocketConnection.this);
                        } catch (IOException e5) {
                            TwoThreadSocketConnection.this.myLog.warn(e5, "I/O Error on final flush of messages.", new Object[0]);
                            IOUtils.close(TwoThreadSocketConnection.this);
                            throw th2;
                        }
                        throw th2;
                    } finally {
                        IOUtils.close(TwoThreadSocketConnection.this);
                    }
                }
            }
            try {
                try {
                    if (TwoThreadSocketConnection.this.myOpen.get()) {
                        doFlush();
                    }
                } catch (IOException e6) {
                    TwoThreadSocketConnection.this.myLog.warn(e6, "I/O Error on final flush of messages.", new Object[0]);
                    IOUtils.close(TwoThreadSocketConnection.this);
                }
            } finally {
                IOUtils.close(TwoThreadSocketConnection.this);
            }
        }

        protected final void doFlush() throws IOException {
            if (this.myNeedToFlush) {
                TwoThreadSocketConnection.this.flush();
                this.myNeedToFlush = false;
            }
        }

        protected final void sendOne() throws InterruptedException, IOException {
            boolean z;
            if (this.myNeedToFlush) {
                z = TwoThreadSocketConnection.this.myToSendQueue.poll(this.myPendingMessage);
            } else {
                TwoThreadSocketConnection.this.myToSendQueue.take(this.myPendingMessage);
                z = true;
            }
            if (!z) {
                doFlush();
                return;
            }
            this.myNeedToFlush = true;
            this.myPendingMessage.getMessage().write(this.myPendingMessage.getMessageId(), TwoThreadSocketConnection.this.myBsonOut);
            TwoThreadSocketConnection.this.send(this.myPendingMessage, TwoThreadSocketConnection.this.myBsonOut.getOutput());
            this.myPendingMessage.clear();
        }
    }

    public TwoThreadSocketConnection(Server server, MongoClientConfiguration mongoClientConfiguration) throws SocketException, IOException {
        this(server, mongoClientConfiguration, new StringEncoderCache(), new StringDecoderCache());
    }

    public TwoThreadSocketConnection(Server server, MongoClientConfiguration mongoClientConfiguration, StringEncoderCache stringEncoderCache, StringDecoderCache stringDecoderCache) throws SocketException, IOException {
        super(server, mongoClientConfiguration, stringEncoderCache, stringDecoderCache);
        this.myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream(stringEncoderCache));
        this.myToSendQueue = new PendingMessageQueue(mongoClientConfiguration.getMaxPendingOperationsPerConnection(), mongoClientConfiguration.getLockType());
        this.myReceiver = mongoClientConfiguration.getThreadFactory().newThread(new ReceiveRunnable(this));
        this.myReceiver.setDaemon(true);
        this.myReceiver.setName("MongoDB " + this.mySocket.getLocalPort() + "<--" + this.myServer.getCanonicalName());
        this.mySender = mongoClientConfiguration.getThreadFactory().newThread(new SendRunnable());
        this.mySender.setDaemon(true);
        this.mySender.setName("MongoDB " + this.mySocket.getLocalPort() + "-->" + this.myServer.getCanonicalName());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        boolean z = this.myOpen.get();
        this.myOpen.set(false);
        this.mySender.interrupt();
        this.myReceiver.interrupt();
        try {
            if (Thread.currentThread() != this.mySender) {
                this.mySender.join();
            }
        } catch (InterruptedException e) {
        } finally {
            this.myOutput.close();
            this.myInput.close();
            this.mySocket.close();
        }
        try {
            if (Thread.currentThread() != this.myReceiver) {
                this.myReceiver.join();
            }
        } catch (InterruptedException e2) {
        }
        this.myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME, z, false);
    }

    @Override // com.allanbank.mongodb.client.connection.socket.AbstractSocketConnection, com.allanbank.mongodb.client.connection.Connection
    public int getPendingCount() {
        return super.getPendingCount() + this.myToSendQueue.size();
    }

    @Override // com.allanbank.mongodb.client.connection.socket.AbstractSocketConnection, com.allanbank.mongodb.client.connection.Connection
    public boolean isIdle() {
        return super.isIdle() && this.myToSendQueue.isEmpty();
    }

    @Override // com.allanbank.mongodb.client.connection.socket.AbstractSocketConnection, com.allanbank.mongodb.client.connection.Connection
    public void raiseErrors(MongoDbException mongoDbException) {
        PendingMessage pendingMessage = new PendingMessage();
        while (this.myToSendQueue.poll(pendingMessage)) {
            raiseError(mongoDbException, pendingMessage.getReplyCallback());
        }
        super.raiseErrors(mongoDbException);
    }

    @Override // com.allanbank.mongodb.client.connection.Connection
    public void send(Message message, Message message2, ReplyCallback replyCallback) throws MongoDbException {
        validate(message, message2);
        if (replyCallback instanceof AddressAware) {
            ((AddressAware) replyCallback).setAddress(this.myServer.getCanonicalName());
        }
        try {
            this.myToSendQueue.put(message, null, message2, replyCallback);
        } catch (InterruptedException e) {
            throw new MongoDbException(e);
        }
    }

    @Override // com.allanbank.mongodb.client.connection.Connection
    public void send(Message message, ReplyCallback replyCallback) throws MongoDbException {
        validate(message, null);
        if (replyCallback instanceof AddressAware) {
            ((AddressAware) replyCallback).setAddress(this.myServer.getCanonicalName());
        }
        try {
            this.myToSendQueue.put(message, replyCallback);
        } catch (InterruptedException e) {
            throw new MongoDbException(e);
        }
    }

    @Override // com.allanbank.mongodb.client.connection.socket.AbstractSocketConnection
    public void start() {
        this.myReceiver.start();
        this.mySender.start();
        if (this.myServer.needBuildInfo()) {
            send(new BuildInfo(), new ServerUpdateCallback(this.myServer));
        }
    }
}
