package com.atolio.connector.core.feeder;

import com.atolio.connector.core.model.dto.EntityDTOBase;
import com.atolio.pbingest.DocumentOuterClass;
import com.atolio.pbingest.Feed;
import com.atolio.pbingest.Operation;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/lib/core-dc-connector-0.0.0.2.jar:com/atolio/connector/core/feeder/FeederStream.class */
/**
 * Wraps a single gRPC {@code publish} stream obtained from the {@link Feeder}'s stub.
 *
 * <p>The stream is opened lazily on the first publish/delete and re-opened on the next publish
 * after it has been marked closed (either by {@link #onCompletedRequest()} or by an error
 * reported through the response observer). Responses are forwarded to the response observer
 * supplied at construction time.
 *
 * <p>Threading: {@link #initStream()} is synchronized and the {@code closed} flag is volatile
 * because it is written from the response-observer callbacks, which gRPC normally delivers on a
 * thread other than the one publishing documents. No further thread-safety is provided.
 */
public class FeederStream {
    private static final Logger LOG = LoggerFactory.getLogger(FeederStream.class);

    /** Pause between two polls of the request observer's readiness flag, in milliseconds. */
    private static final long READY_POLL_INTERVAL_MS = 100L;

    /** Message of the RuntimeException raised when publishing a document fails. */
    private static final String PUBLISH_FAILURE_MESSAGE =
            "Unexpected Exception appeared while publishing Document to Feeder";

    private final Feeder feeder;
    private final StreamObserver<Feed.PublishResponse> responseObserver;
    private ClientCallStreamObserver<Feed.PublishRequest> requestObserver;

    /**
     * {@code true} while there is no usable stream. Volatile so that a close triggered from the
     * response callback is visible to the publishing thread. It is written after
     * {@code requestObserver} in {@link #initStream()}, so a reader that observes {@code false}
     * also observes the matching request observer.
     */
    private volatile boolean closed = true;

    /**
     * Creates a stream wrapper; no gRPC call is started until the first document is published.
     *
     * @param feeder         owner providing the channel, stub, transformers and data accessor
     * @param streamObserver receiver of every response, error and completion signal of the stream
     */
    public FeederStream(Feeder feeder, StreamObserver<Feed.PublishResponse> streamObserver) {
        this.responseObserver = streamObserver;
        this.feeder = feeder;
    }

    /**
     * Signals the server that no more requests will be sent and marks this stream closed.
     *
     * <p>Does nothing when there is no open stream (never opened, already completed, or closed
     * after an error), so the call is safe to repeat.
     */
    public void onCompletedRequest() {
        // Read the volatile flag first; see the field comment for the ordering guarantee.
        if (this.closed || this.requestObserver == null) {
            LOG.debug("No open grpc stream, skipping request observer onCompleted()");
            return;
        }
        LOG.debug("Calling request observer onCompleted() on grpc stream");
        this.requestObserver.onCompleted();
        close();
    }

    /**
     * Transforms the entity into a document and publishes it as an upsert.
     * A {@code null} entity is logged and skipped.
     *
     * @param e   entity to publish, may be {@code null}
     * @param <E> concrete entity DTO type
     * @throws RuntimeException if transforming or sending the document fails
     */
    public <E extends EntityDTOBase> void publishEntity(E e) {
        if (e == null) {
            LOG.warn("skipped publishing NULL entity. Possible error during DTO mapping.");
            return;
        }
        publishDocument(
                this.feeder.transformersFactory.getTransformer(e, this.feeder.dataAccessor.getSource()).transform(),
                Operation.OperationType.OPERATION_TYPE_UPSERT);
    }

    /**
     * Transforms the entity into a document and publishes it as a delete.
     * A {@code null} entity is logged and skipped.
     *
     * @param e   entity to delete, may be {@code null}
     * @param <E> concrete entity DTO type
     * @throws RuntimeException if transforming or sending the document fails
     */
    public <E extends EntityDTOBase> void deleteEntity(E e) {
        if (e == null) {
            LOG.warn("skipped deleting NULL entity. Possible error during DTO mapping.");
            return;
        }
        publishDocument(
                this.feeder.transformersFactory.getTransformer(e, this.feeder.dataAccessor.getSource()).transform(),
                Operation.OperationType.OPERATION_TYPE_DELETE);
    }

    /**
     * Initializes the feeder's channel and, if there is no open stream, starts a new
     * {@code publish} call whose responses go through the wrapped response observer.
     *
     * @throws RuntimeException if the stub does not return a {@link ClientCallStreamObserver}
     */
    protected synchronized void initStream() {
        this.feeder.initChannel();
        if (this.requestObserver != null && !this.closed) {
            return;
        }
        LOG.debug("creating new grpc stream");
        StreamObserver<Feed.PublishRequest> publish =
                this.feeder.stub.publish(wrapResponseObserver(this.responseObserver));
        if (!(publish instanceof ClientCallStreamObserver)) {
            throw new RuntimeException("Unexpected StreamObserver type returned by grpc stub");
        }
        // Keep this order: observer first, then the volatile flag that publishes it.
        this.requestObserver = (ClientCallStreamObserver<Feed.PublishRequest>) publish;
        this.closed = false;
    }

    /** Marks the stream as closed so that the next publish opens a fresh one. */
    protected void close() {
        LOG.info("Closing stream");
        this.closed = true;
    }

    /**
     * Builds a publish request and sends it once the request observer reports readiness.
     *
     * @param document      document to send, must not be {@code null}
     * @param operationType operation to apply, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     * @throws RuntimeException     wrapping any failure while opening the stream, waiting or
     *                              sending; if the thread was interrupted its interrupt flag is
     *                              restored before the exception is thrown
     */
    private void publishDocument(DocumentOuterClass.Document document, Operation.OperationType operationType) {
        Objects.requireNonNull(document, "Document can't be published! Document is NULL.");
        Objects.requireNonNull(operationType, "Document can't be published! OperationType is NULL.");
        try {
            Feed.PublishRequest request = Feed.PublishRequest.newBuilder()
                    .setDocument(document)
                    .setOperationType(operationType)
                    .build();
            initStream();
            ClientCallStreamObserver<Feed.PublishRequest> observer = this.requestObserver;
            while (!observer.isReady()) {
                // A stream closed by the error callback is not expected to become ready again;
                // fail instead of polling forever.
                if (this.closed) {
                    throw new IllegalStateException(
                            "grpc stream was closed while waiting for the server to become ready");
                }
                LOG.trace("Waiting for server to be ready to send more messages");
                Thread.sleep(READY_POLL_INTERVAL_MS);
            }
            observer.onNext(request);
        } catch (InterruptedException ie) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(PUBLISH_FAILURE_MESSAGE, ie);
        } catch (Throwable th) {
            // Kept broad on purpose: callers have always received a RuntimeException here.
            throw new RuntimeException(PUBLISH_FAILURE_MESSAGE, th);
        }
    }

    /**
     * Decorates the caller's response observer with logging and stream-state bookkeeping.
     *
     * @param delegate observer that receives every signal after it has been handled here
     * @return observer to hand to the gRPC stub
     */
    private StreamObserver<Feed.PublishResponse> wrapResponseObserver(final StreamObserver<Feed.PublishResponse> delegate) {
        return new StreamObserver<Feed.PublishResponse>() {
            @Override
            public void onNext(Feed.PublishResponse publishResponse) {
                if (publishResponse.hasError()) {
                    FeederStream.LOG.error("Received error response from Feeder: {}", publishResponse);
                }
                delegate.onNext(publishResponse);
            }

            @Override
            public void onError(Throwable th) {
                FeederStream.LOG.error("Received error from grpc server", th);
                if (!FeederStream.this.closed) {
                    FeederStream.LOG.debug("Closing grpc stream due to response observer onError signal");
                    FeederStream.this.close();
                }
                delegate.onError(th);
            }

            @Override
            public void onCompleted() {
                // NOTE(review): a server-side completion does not mark the stream closed here;
                // confirm whether close() should also be called in this case.
                FeederStream.LOG.debug("Received completed signal from grpc server");
                delegate.onCompleted();
            }
        };
    }
}
