Hi,
I have a question regarding timeout handling in a stream that lists the content of an S3 bucket. Basically, I have an actor that periodically starts an Akka Stream which lists the content of an S3 bucket and sends the contained objects as messages to the actor itself:
/**
 * Execute the polling of the source
 */
@Override
protected UniqueKillSwitch executePolling() {
    ActorRef self = getSelf();
    Timeout timeout = Timeout.apply(5, TimeUnit.SECONDS);
    self.tell(SourceS3ActorAPI.PollingStarted.create(), getSelf());
    SourceS3ConfigData config = (SourceS3ConfigData) getConfig();
    Pair<UniqueKillSwitch, CompletionStage<Done>> pair =
        S3.listBucket(config.getBucket(), Option.apply(config.getPrefix()))
            .withAttributes(s3Attributes)
            .mapAsync(5, e -> FutureConverters.toJava(
                Patterns.ask(
                    self,
                    S3Object.create(e.getKey(), e.getSize(), e.getStorageClass(), e.getLastModified()),
                    timeout)).toCompletableFuture())
            .viaMat(KillSwitches.single(), Keep.right())
            .toMat(Sink.ignore(), Keep.both())
            .run(config.getMaterializer());
    pair.second().toCompletableFuture().whenComplete(
        (r, t) -> {
            if (r != null) {
                self.tell(SourceS3ActorAPI.PollingFinished.create(), getSelf());
            } else {
                self.tell(SourceS3ActorAPI.PollingFailed.create(t), getSelf());
            }
        });
    return pair.first();
}
That method returns a UniqueKillSwitch which I want to use to abort the stream if necessary.
Today my S3 provider seems to be having problems, to the point that even the provider's web console is unresponsive. However, I cannot terminate the Akka Stream above using the KillSwitch. It seems as if the dispatcher thread is blocked inside the listBucket source, so the KillSwitch is never evaluated.
From my logs I am getting the following output:
10:41:07.207 INFO Layline.Workflow.S3ToLocalFile - [LAY-02510] workflow 'S3ToLocalFile' is running
10:41:07.215 INFO Layline.Workflow.S3ToLocalFile.P1 - [LAY-02533] processor 1 for workflow 'S3ToLocalFile' started
10:41:07.216 INFO Layline.Source.SourceS3 - [LAY-01113] actor 'Actor[akka://layline/user/controller/workflows/S3ToLocalFile/Processor-1#1067835194]' subscribed to source 'SourceS3'
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-38] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$a] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-41] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$d] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-40] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$c] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-39] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$b] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:11.931] [layline-akka.actor.default-dispatcher-29] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 100 milliseconds.
[WARN] [01/25/2019 10:41:13.388] [layline-akka.actor.default-dispatcher-30] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 200 milliseconds.
[WARN] [01/25/2019 10:41:14.918] [layline-akka.actor.default-dispatcher-34] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 400 milliseconds.
[WARN] [01/25/2019 10:41:16.669] [layline-akka.actor.default-dispatcher-34] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 800 milliseconds.
[WARN] [01/25/2019 10:41:18.952] [layline-akka.actor.default-dispatcher-35] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 1600 milliseconds.
[WARN] [01/25/2019 10:41:22.034] [layline-akka.actor.default-dispatcher-17] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 3200 milliseconds.
10:42:07.238 ERROR Layline.Source.SourceS3 - [LAY-03123] missing heartbeat from polling operation
10:42:07.238 ERROR Layline.Source.SourceS3 - [LAY-03124] killing polling operation
10:43:07.236 ERROR Layline.Source.SourceS3 - [LAY-03123] missing heartbeat from polling operation
10:44:07.249 ERROR Layline.Source.SourceS3 - [LAY-03123] missing heartbeat from polling operation
10:45:07.237 ERROR Layline.Source.SourceS3 - [LAY-03123] missing heartbeat from polling operation
Are there any timeout settings I need to configure in the S3 settings?
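To make clearer what I am after, here is a minimal sketch of the behaviour I would expect, written with plain JDK futures only (Java 9+), not with Akka or Alpakka. The class PollingTimeoutSketch, the helper awaitOutcome and the status strings are hypothetical names used for illustration; the never-completed future merely stands in for a listing that hangs, and orTimeout is a JDK feature, not an S3 setting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public class PollingTimeoutSketch {

    /**
     * Hypothetical helper: bounds a polling future with a timeout and maps
     * the outcome to a status string, similar to the whenComplete callback
     * in executePolling().
     */
    static String awaitOutcome(CompletableFuture<String> polling, long timeoutMillis) {
        return polling
            // Fails the future with a TimeoutException if it has not completed in time (Java 9+).
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .handle((result, error) -> {
                if (error == null) {
                    return "PollingFinished";
                }
                // Unwrap CompletionException in case the error was propagated from another stage.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                return "PollingFailed: " + cause.getClass().getSimpleName();
            })
            .join();
    }

    public static void main(String[] args) {
        // A future that is never completed simulates a listing that hangs.
        CompletableFuture<String> stalled = new CompletableFuture<>();
        System.out.println(awaitOutcome(stalled, 200));

        // An already completed future simulates a successful poll.
        System.out.println(awaitOutcome(CompletableFuture.completedFuture("done"), 200));
    }
}
```

In other words, a stalled listing should end up on the failure branch after a bounded time instead of hanging forever. What I am looking for is the equivalent setting on the S3 source or on the stream itself.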
Regards,
Lay