package com.atlassian.pipelines.runner.core.util.file.upload;

import com.atlassian.bitbucketci.common.base.uuid.Uuid;
import com.atlassian.pipelines.runner.api.file.File;
import com.atlassian.pipelines.runner.core.exception.S3UploadException;
import com.atlassian.pipelines.runner.core.service.S3UrlGenerator;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

/* loaded from: input_file:com/atlassian/pipelines/runner/core/util/file/upload/S3MultiPartUploaderImpl.class */
/**
 * {@link S3Uploader} that splits a file into fixed-size chunks and uploads them as the parts of a
 * multipart upload, then issues a final "complete" request listing every uploaded part.
 *
 * <p>Upload URLs are requested from the supplied {@link S3UrlGenerator} in batches of
 * {@code urlBatchSize}; batches are processed sequentially while the chunks inside a batch are
 * uploaded with at most {@code maxConcurrentUploads} requests in flight. Every HTTP call is
 * re-subscribed according to the configured retry strategy before a failure is surfaced as an
 * {@link S3UploadException}.
 */
public class S3MultiPartUploaderImpl implements S3Uploader {
    private static final Logger logger = LoggerFactory.getLogger(S3MultiPartUploaderImpl.class);

    /** Maximum number of parts accepted in a single upload; larger files are rejected up front. */
    private static final int MAX_PARTS = 10000;

    private final int urlBatchSize;
    private final int maxPartSizeBytes;
    private final WebClient s3WebClient;
    private final int maxConcurrentUploads;
    private final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryStrategy;

    /**
     * Creates an uploader.
     *
     * @param s3WebClient          client used for the part uploads and the completion request
     * @param maxPartSizeBytes     size of every chunk except possibly the last one, in bytes
     * @param maxConcurrentUploads maximum number of chunk uploads in flight within one batch
     * @param urlBatchSize         number of upload URLs requested from the generator per batch
     * @param retryStrategy        handler passed to {@code retryWhen} for every HTTP call
     */
    public S3MultiPartUploaderImpl(
            WebClient s3WebClient,
            int maxPartSizeBytes,
            int maxConcurrentUploads,
            int urlBatchSize,
            Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryStrategy) {
        this.s3WebClient = s3WebClient;
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.maxConcurrentUploads = maxConcurrentUploads;
        this.urlBatchSize = urlBatchSize;
        this.retryStrategy = retryStrategy;
    }

    /**
     * Uploads {@code file} in chunks and completes the multipart upload.
     *
     * @param file           file to upload
     * @param uuid           identifier emitted once the upload has completed
     * @param s3UrlGenerator source of the per-part upload URLs and of the completion URL
     * @return a {@link Single} emitting {@code uuid} after the completion request succeeded, or
     *         failing with the error raised by any of the upload steps
     */
    @Override // com.atlassian.pipelines.runner.core.util.file.upload.S3Uploader
    public Single<Uuid> upload(File file, Uuid uuid, S3UrlGenerator s3UrlGenerator) {
        return fileChunksInfo(file)
                .flatMapCompletable(chunksInfo ->
                        chunkAndUpload(file, s3UrlGenerator, chunksInfo)
                                .flatMapCompletable(etags -> completeUpload(s3UrlGenerator, etags))
                                .doOnSubscribe(disposable ->
                                        logger.info("Uploading {} chunks to s3", chunksInfo.getTotalChunks())))
                .andThen(Single.just(uuid))
                .doOnSuccess(uploaded -> logger.info("Successfully uploaded file to s3 (file uuid {})", uuid))
                .doOnError(error -> logger.error("Error while uploading file to s3", error));
    }

    /** Fetches the completion URL from the generator and completes the upload with {@code etags}. */
    private Completable completeUpload(S3UrlGenerator s3UrlGenerator, List<ChunkEtag> etags) {
        return s3UrlGenerator.getCompleteUrl()
                .flatMapCompletable(completeUrl -> completeUpload(completeUrl, etags));
    }

    /**
     * POSTs the sorted list of uploaded parts to {@code completeUrl}.
     *
     * <p>The request is built inside {@code defer} so that every retry issues a fresh HTTP call.
     * Any error remaining after the retry strategy gives up is wrapped in an
     * {@link S3UploadException}.
     */
    private Completable completeUpload(String completeUrl, List<ChunkEtag> etags) {
        // Chunks within a batch finish in arbitrary order, so the parts are sorted before sending.
        List<ChunkEtag> sortedParts = etags.stream().sorted().collect(Collectors.toList());
        CompleteMultipartUpload payload = ImmutableCompleteMultipartUpload.builder()
                .withParts(sortedParts)
                .build();
        return Completable.defer(() ->
                        Completable.fromPublisher(
                                        this.s3WebClient.post()
                                                .uri(URI.create(completeUrl))
                                                .bodyValue(payload)
                                                .retrieve()
                                                .toBodilessEntity())
                                .observeOn(Schedulers.io()))
                .retryWhen(this.retryStrategy)
                .doOnSubscribe(disposable -> logger.info("Completing multi part upload"))
                .doOnComplete(() -> logger.info("Successfully completed multi part upload"))
                .onErrorResumeNext(error ->
                        Completable.error(new S3UploadException("Failed to complete multipart upload", error)));
    }

    /**
     * Uploads every chunk of {@code file} and collects the resulting ETags.
     *
     * <p>Chunk indices are grouped into batches of {@code urlBatchSize}; {@code concatMap} makes
     * the batches run one after another.
     */
    private Single<List<ChunkEtag>> chunkAndUpload(File file, S3UrlGenerator s3UrlGenerator, FileChunksInfo chunksInfo) {
        return Flowable.range(0, chunksInfo.getTotalChunks())
                .buffer(this.urlBatchSize)
                .concatMap(batch -> chunkAndUploadBatch(file, s3UrlGenerator, chunksInfo, batch))
                .toList();
    }

    /**
     * Requests the upload URLs for one batch of chunk indices, pairs each URL with its chunk and
     * uploads the chunks with bounded concurrency.
     *
     * @param chunkIndices zero-based chunk indices of this batch, in ascending order
     */
    private Flowable<ChunkEtag> chunkAndUploadBatch(
            File file, S3UrlGenerator s3UrlGenerator, FileChunksInfo chunksInfo, List<Integer> chunkIndices) {
        // Batch numbers passed to the generator are 1-based and derived from the first index of the batch.
        int batchNumber = 1 + (chunkIndices.get(0) / this.urlBatchSize);
        return s3UrlGenerator.getS3UploadUrls(this.urlBatchSize, batchNumber)
                .flattenAsFlowable(urls -> urls)
                // zipWith stops at the shorter sequence, so surplus URLs of a final partial batch are ignored.
                .zipWith(chunkIndices, (url, chunkIndex) -> toS3Chunk(chunkIndex, file.getPath(), url, chunksInfo))
                .flatMapSingle(this::uploadChunk, false, this.maxConcurrentUploads);
    }

    /**
     * PUTs a single chunk to its upload URL and returns the ETag reported in the response.
     *
     * <p>The input stream is opened per subscription via {@code Single.using}, so each retry reads
     * the chunk from a fresh stream and the stream is closed when the attempt terminates. Any
     * error remaining after the retry strategy gives up is wrapped in an {@link S3UploadException}.
     */
    private Single<ChunkEtag> uploadChunk(S3Chunk chunk) {
        return Single.using(
                        () -> new BufferedInputStream(chunk.newInputStream()),
                        stream -> Single.fromPublisher(
                                        this.s3WebClient.put()
                                                .uri(URI.create(chunk.getUrl()))
                                                .contentLength(chunk.getChunkSize().toBytes())
                                                .body(BodyInserters.fromResource(new InputStreamResource(stream)))
                                                .retrieve()
                                                .toBodilessEntity())
                                .observeOn(Schedulers.io())
                                .map(this::toEtag)
                                .map(etag -> ChunkEtag.from(chunk, etag)),
                        BufferedInputStream::close)
                .retryWhen(this.retryStrategy)
                .onErrorResumeNext(error -> Single.error(new S3UploadException(
                        String.format("Failed to upload chunk, part number %s", chunk.getPartNumber()), error)));
    }

    /**
     * Extracts the ETag header from an upload response.
     *
     * @throws S3UploadException if the response carries no ETag header
     */
    private String toEtag(ResponseEntity<Void> response) {
        // The exception is created lazily so no stack trace is captured on the success path.
        return Optional.ofNullable(response.getHeaders().getETag())
                .orElseThrow(() -> new S3UploadException("ETag not present on header"));
    }

    /**
     * Computes the file size and the number of chunks needed to upload it.
     *
     * <p>Fails with an {@link S3UploadException} when the length cannot be read or when more than
     * {@link #MAX_PARTS} chunks would be required.
     */
    private Single<FileChunksInfo> fileChunksInfo(File file) {
        return Single.fromCallable(() -> {
                    long totalBytes = length(file).toBytes();
                    // Ceiling division: a trailing partial chunk counts as one more part.
                    long fullChunks = totalBytes / this.maxPartSizeBytes;
                    long partialChunk = totalBytes % this.maxPartSizeBytes == 0 ? 0 : 1;
                    int totalChunks = Math.toIntExact(fullChunks + partialChunk);
                    if (totalChunks > MAX_PARTS) {
                        throw new S3UploadException(String.format(
                                "Too many chunks in a single upload. The file is too big or chunks are too small. Number of Chunks: %s, Max allowed: %s",
                                totalChunks, MAX_PARTS));
                    }
                    return FileChunksInfo.from(DataSize.ofBytes(totalBytes), totalChunks);
                })
                .doOnSuccess(chunksInfo -> logger.info("Successfully got total chunks {}.", chunksInfo))
                .onErrorResumeNext(error -> Single.error(new S3UploadException("Failed to get total chunks", error)));
    }

    /**
     * Describes the chunk at zero-based {@code chunkIndex}: its start offset in the file, its size
     * (the last chunk may be shorter than {@code maxPartSizeBytes}), its 1-based part number and
     * the URL it is uploaded to.
     */
    private S3Chunk toS3Chunk(int chunkIndex, Path path, String url, FileChunksInfo chunksInfo) {
        // Widen before multiplying: an int product overflows once the offset exceeds 2 GiB.
        long startPosition = (long) chunkIndex * this.maxPartSizeBytes;
        long remainingBytes = chunksInfo.getDataSize().toBytes() - startPosition;
        long chunkBytes = Math.min(this.maxPartSizeBytes, remainingBytes);
        return ImmutableS3Chunk.builder()
                .withFilePath(path)
                .withStartPosition(startPosition)
                .withChunkSize(DataSize.ofBytes(chunkBytes))
                .withPartNumber(chunkIndex + 1)
                .withUrl(url)
                .build();
    }

    /**
     * Returns the length of {@code file}, converting the checked {@link IOException} into an
     * {@link S3UploadException}.
     */
    private static DataSize length(File file) {
        try {
            return file.getLength();
        } catch (IOException e) {
            throw new S3UploadException("Unable to calculate file length", e);
        }
    }
}
