Hello everyone
I am facing a weird issue with the Alpakka S3 sink/flow: I am not getting any feedback (errors) and no document is stored, but the stream continues.
I should note that I am running against a local MinIO instance, not AWS. The bucket was created in advance and a read-write policy has been set in MinIO. I can use the API via Postman. This is the code that builds the sink:
private Sink<MyClass, NotUsed> createS3Flow() {
    final S3Settings s3Settings =
        S3Ext.get(system).settings()
            .withEndpointUrl("http://localhost:30004")
            .withS3RegionProvider(() -> Region.EU_CENTRAL_1)
            .withAccessStyle(AccessStyle.pathAccessStyle())
            .withCredentialsProvider(StaticCredentialsProvider.create(
                AwsBasicCredentials.create("FOO", "BAR")))
            .withListBucketApiVersion(
                ApiVersion.getListBucketVersion1());
    return Flow.of(MyClass.class)
        .log("x", MyClass::getX)
        .addAttributes(Attributes.createLogLevels(Attributes.logLevelInfo()))
        .map(x -> ByteString.fromString(x.getKundennummerArbeitgeber()))
        .to(S3.multipartUpload("testbucket", "x", ContentTypes.TEXT_PLAIN_UTF8)
            .withAttributes(S3Attributes.settings(s3Settings)));
}
What strikes me is that even when I configure a wrong password, the stream continues without complaining.
Another thing to note: we are running Quarkus, so our source is actually provided by Mutiny (shipped with Quarkus). Since I prefer the Akka Streams API, I plugged the two together:
var source = Source.fromPublisher(mutinySource);
source.to(createS3Flow()).run(system);
The source is configured to pull an item from the DB every 2 seconds. I can see each element in the log operator, but the S3 sink isn't doing anything, even though new items keep being pulled.