/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.jdbc.internal.google.cloud.storage;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Objects;
import java.util.stream.Collector;
import net.snowflake.client.jdbc.internal.google.api.core.ApiFuture;
import net.snowflake.client.jdbc.internal.google.api.core.ApiFutureCallback;
import net.snowflake.client.jdbc.internal.google.api.core.ApiFutures;
import net.snowflake.client.jdbc.internal.google.api.core.BetaApi;
import net.snowflake.client.jdbc.internal.google.api.core.InternalApi;
import net.snowflake.client.jdbc.internal.google.api.gax.grpc.GrpcCallContext;
import net.snowflake.client.jdbc.internal.google.api.gax.rpc.ApiExceptions;
import net.snowflake.client.jdbc.internal.google.api.gax.rpc.ClientStreamingCallable;
import net.snowflake.client.jdbc.internal.google.cloud.storage.BlobInfo;
import net.snowflake.client.jdbc.internal.google.cloud.storage.BlobWriteSessionConfig;
import net.snowflake.client.jdbc.internal.google.cloud.storage.BlobWriteSessionConfigs;
import net.snowflake.client.jdbc.internal.google.cloud.storage.BufferHandle;
import net.snowflake.client.jdbc.internal.google.cloud.storage.BufferedWritableByteChannelSession;
import net.snowflake.client.jdbc.internal.google.cloud.storage.ByteStringStrategy;
import net.snowflake.client.jdbc.internal.google.cloud.storage.Conversions;
import net.snowflake.client.jdbc.internal.google.cloud.storage.CrossTransportUtils;
import net.snowflake.client.jdbc.internal.google.cloud.storage.GrpcStorageImpl;
import net.snowflake.client.jdbc.internal.google.cloud.storage.Hasher;
import net.snowflake.client.jdbc.internal.google.cloud.storage.RecoveryFile;
import net.snowflake.client.jdbc.internal.google.cloud.storage.RecoveryFileManager;
import net.snowflake.client.jdbc.internal.google.cloud.storage.ResumableMedia;
import net.snowflake.client.jdbc.internal.google.cloud.storage.ResumableWrite;
import net.snowflake.client.jdbc.internal.google.cloud.storage.StorageInternal;
import net.snowflake.client.jdbc.internal.google.cloud.storage.ThroughputMovingWindow;
import net.snowflake.client.jdbc.internal.google.cloud.storage.ThroughputSink;
import net.snowflake.client.jdbc.internal.google.cloud.storage.TransportCompatibility;
import net.snowflake.client.jdbc.internal.google.cloud.storage.UnifiedOpts;
import net.snowflake.client.jdbc.internal.google.cloud.storage.WritableByteChannelSession;
import net.snowflake.client.jdbc.internal.google.cloud.storage.WriteCtx;
import net.snowflake.client.jdbc.internal.google.common.annotations.VisibleForTesting;
import net.snowflake.client.jdbc.internal.google.common.collect.ImmutableList;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.MoreExecutors;
import net.snowflake.client.jdbc.internal.google.storage.v2.WriteObjectRequest;
import net.snowflake.client.jdbc.internal.google.storage.v2.WriteObjectResponse;
import net.snowflake.client.jdbc.internal.javax.annotation.concurrent.Immutable;
import net.snowflake.client.jdbc.internal.org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@BetaApi
@TransportCompatibility(value={TransportCompatibility.Transport.GRPC})
@Immutable
public final class JournalingBlobWriteSessionConfig
extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
    private static final long serialVersionUID = 9059242302276891867L;
    private transient @MonotonicNonNull ImmutableList<Path> paths;
    private final boolean includeLoggingSink;
    private volatile @MonotonicNonNull ArrayList<String> absolutePaths;

    @InternalApi
    JournalingBlobWriteSessionConfig(ImmutableList<Path> paths, boolean includeLoggingSink) {
        this.paths = paths;
        this.includeLoggingSink = includeLoggingSink;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof JournalingBlobWriteSessionConfig)) {
            return false;
        }
        JournalingBlobWriteSessionConfig that = (JournalingBlobWriteSessionConfig)o;
        return this.includeLoggingSink == that.includeLoggingSink && Objects.equals(this.paths, that.paths) && Objects.equals(this.absolutePaths, that.absolutePaths);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.paths, this.includeLoggingSink, this.absolutePaths);
    }

    @InternalApi
    @VisibleForTesting
    JournalingBlobWriteSessionConfig withIncludeLoggingSink() {
        return new JournalingBlobWriteSessionConfig(this.paths, true);
    }

    @Override
    @InternalApi
    BlobWriteSessionConfig.WriterFactory createFactory(Clock clock) throws IOException {
        Duration window = Duration.ofMinutes(10L);
        RecoveryFileManager recoveryFileManager = RecoveryFileManager.of(this.paths, this.getRecoverVolumeSinkFactory(clock, window));
        ThroughputSink gcs = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
        gcs = this.includeLoggingSink ? ThroughputSink.tee(ThroughputSink.logged("gcs", clock), gcs) : gcs;
        return new Factory(recoveryFileManager, clock, gcs);
    }

    private RecoveryFileManager.RecoveryVolumeSinkFactory getRecoverVolumeSinkFactory(Clock clock, Duration window) {
        return path -> {
            ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
            if (this.includeLoggingSink) {
                return ThroughputSink.tee(ThroughputSink.logged(path.toAbsolutePath().toString(), clock), windowed);
            }
            return windowed;
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        if (this.absolutePaths == null) {
            JournalingBlobWriteSessionConfig journalingBlobWriteSessionConfig = this;
            synchronized (journalingBlobWriteSessionConfig) {
                if (this.absolutePaths == null) {
                    this.absolutePaths = this.paths.stream().map(Path::toAbsolutePath).map(Path::toString).collect(Collector.of(ArrayList::new, ArrayList::add, (left, right) -> {
                        left.addAll(right);
                        return left;
                    }, new Collector.Characteristics[0]));
                }
            }
        }
        out.defaultWriteObject();
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.paths = this.absolutePaths.stream().map(x$0 -> Paths.get(x$0, new String[0])).collect(ImmutableList.toImmutableList());
    }

    private static final class Factory
    implements BlobWriteSessionConfig.WriterFactory {
        private final RecoveryFileManager recoveryFileManager;
        private final Clock clock;
        private final ThroughputSink gcs;

        private Factory(RecoveryFileManager recoveryFileManager, Clock clock, ThroughputSink gcs) {
            this.recoveryFileManager = recoveryFileManager;
            this.clock = clock;
            this.gcs = gcs;
        }

        @Override
        @InternalApi
        public WritableByteChannelSession<?, BlobInfo> writeSession(StorageInternal storage, BlobInfo info, UnifiedOpts.Opts<UnifiedOpts.ObjectTargetOpt> opts) {
            if (storage instanceof GrpcStorageImpl) {
                GrpcStorageImpl grpcStorage = (GrpcStorageImpl)storage;
                RecoveryFile recoveryFile = this.recoveryFileManager.newRecoveryFile(info);
                GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
                ApiFuture<ResumableWrite> f = grpcStorage.startResumableWrite(grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts);
                ApiFuture<WriteCtx<ResumableWrite>> start = ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor());
                ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write = grpcStorage.storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);
                BufferedWritableByteChannelSession<WriteObjectResponse> session = ResumableMedia.gapic().write().byteChannel(write).setHasher(Hasher.noop()).setByteStringStrategy(ByteStringStrategy.copy()).journaling().withRetryConfig(grpcStorage.getOptions(), grpcStorage.retryAlgorithmManager.idempotent(), grpcStorage.storageClient.queryWriteStatusCallable()).withBuffer(BufferHandle.allocate(0x200000)).withRecoveryBuffer(BufferHandle.allocate(0x200000)).withRecoveryFile(recoveryFile).setStartAsync(start).build();
                return new JournalingUpload(session, start);
            }
            return (WritableByteChannelSession)CrossTransportUtils.throwGrpcOnly(BlobWriteSessionConfigs.class, "journaling");
        }

        private final class JournalingUpload<WBC extends WritableByteChannel>
        implements WritableByteChannelSession<WBC, BlobInfo> {
            private final WritableByteChannelSession<WBC, WriteObjectResponse> session;
            private final ApiFuture<WriteCtx<ResumableWrite>> start;
            private final Conversions.Decoder<WriteObjectResponse, BlobInfo> decoder;

            public JournalingUpload(WritableByteChannelSession<WBC, WriteObjectResponse> session, ApiFuture<WriteCtx<ResumableWrite>> start) {
                this.session = session;
                this.start = start;
                this.decoder = Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);
            }

            @Override
            public ApiFuture<WBC> openAsync() {
                final Instant begin = Factory.this.clock.instant();
                ApiFutures.addCallback(this.session.getResult(), new ApiFutureCallback<WriteObjectResponse>(){

                    @Override
                    public void onFailure(Throwable t2) {
                        Instant end = Factory.this.clock.instant();
                        WriteCtx writeCtx = (WriteCtx)ApiExceptions.callAndTranslateApiException(JournalingUpload.this.start);
                        long totalSentBytes = writeCtx.getTotalSentBytes().get();
                        Factory.this.gcs.recordThroughput(ThroughputSink.Record.of(totalSentBytes, begin, end, true));
                    }

                    @Override
                    public void onSuccess(WriteObjectResponse result) {
                        Instant end = Factory.this.clock.instant();
                        long totalSentBytes = -1L;
                        if (result.hasResource()) {
                            totalSentBytes = result.getResource().getSize();
                        } else if (result.hasPersistedSize()) {
                            totalSentBytes = result.getPersistedSize();
                        }
                        if (totalSentBytes > -1L) {
                            Factory.this.gcs.recordThroughput(ThroughputSink.Record.of(totalSentBytes, begin, end, false));
                        }
                    }
                }, MoreExecutors.directExecutor());
                return this.session.openAsync();
            }

            @Override
            public ApiFuture<BlobInfo> getResult() {
                return ApiFutures.transform(this.session.getResult(), this.decoder::decode, MoreExecutors.directExecutor());
            }
        }
    }
}

