How do Alpakka and Pub/Sub work?

This topic is based on a conversation between an anonymous user of Akka and our “Ask AI about Akka” bot in the documentation.

Alpakka is a reactive integration library for Java and Scala, built on top of Akka Streams. It provides connectors for various technologies, including Google Cloud Pub/Sub.

Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers. It allows for secure and highly available communication between independently written applications.

Alpakka provides two connectors for Google Cloud Pub/Sub:

  1. Alpakka Google Cloud Pub/Sub: This connector communicates with Pub/Sub via HTTP requests.

Here is an example of how to use it in Scala:

implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"

And in Java:

ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";

  2. Alpakka Google Cloud Pub/Sub gRPC: This connector communicates with Pub/Sub via the gRPC protocol. The integration between Akka Streams and gRPC is handled by Akka gRPC.

Here is an example of how to use it in Scala:

val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val publishRequest: PublishRequest =
  PublishRequest()
    .withTopic(s"projects/$projectId/topics/$topic")
    .addMessages(publishMessage)

val source: Source[PublishRequest, NotUsed] =
  Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub.publish(parallelism = 1)

val publishedMessageIds: Future[Seq[PublishResponse]] = source.via(publishFlow).runWith(Sink.seq)

And in Java:

final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final PublishRequest publishRequest =
    PublishRequest.newBuilder()
        .setTopic("projects/" + projectId + "/topics/" + topic)
        .addMessages(publishMessage)
        .build();

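final Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow = GooglePubSub.publish(1);

// Assumes an ActorSystem named system is in scope
final CompletionStage<List<PublishResponse>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);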

To run the example code, you will need to configure a project and pub/sub in Google Cloud and provide your own credentials.

How do gRPC and HTTP differ?

gRPC itself runs on top of HTTP/2, so the useful comparison is between gRPC and plain HTTP APIs such as REST. Both are used for communication between services in a distributed system, but they differ in several ways (source):

  1. Encoding: While HTTP is more flexible about encoding and can use different formats like XML, JSON, etc., gRPC standardizes on Protobuf, which is a binary protocol and is more efficient in terms of both speed and size.
  2. Schema: HTTP can be either schemaless or use a 3rd-party schema. On the other hand, gRPC always declares the service and messages in a Protobuf schema definition. This schema-first design favors well-defined and decoupled service interfaces over brittle ad-hoc solutions.
  3. Transport: Plain HTTP APIs can be served over HTTP/1.1 or HTTP/2, whereas gRPC standardizes on HTTP/2. HTTP/2 allows multiplexing several data streams over a single connection, which is more efficient.
  4. Streaming: gRPC has first-class support for streaming requests and responses. Plain HTTP APIs over HTTP/1.1 typically lack this, although HTTP/2 itself supports multiple streams.
  5. Interoperability: gRPC provides tools for many languages allowing seamless interoperability between clients and services written in different languages.
  6. Synchronous vs Asynchronous: While built on an efficient non-blocking implementation, gRPC is still ‘synchronous’ in the sense that it requires both ‘sides’ of the communication to be available at the same time. Plain HTTP request/response has the same requirement; decoupling sender and receiver in time requires a message broker such as Pub/Sub.
  7. Usage: gRPC is well-suited for connections between internal services, connecting to external services that expose a gRPC API (even ones written in other languages), and serving data to web or mobile front-ends. HTTP, being a more general-purpose protocol, can be used for a wider range of applications.

I want to subscribe to messages with high performance, source control, and backpressure.

To subscribe to messages with high performance and source control, you can use Akka Streams with backpressure. Akka Streams is a library that is designed to process and transfer a sequence of elements using bounded buffer space. It implements backpressure, which is a form of flow control to ensure that fast producers do not overwhelm slow consumers.

Here is an example of how to use Akka Streams with backpressure in Scala, using Alpakka’s MQTT connector:

val mqttSource: Source[MqttMessage, Future[Done]] =
  MqttSource.atMostOnce(
    connectionSettings.withClientId(clientId = "source-spec/source"),
    MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
    bufferSize = 8
  )

val (subscribed, streamResult) = mqttSource
  .take(messages.size)
  .toMat(Sink.seq)(Keep.both)
  .run()

In this example, MqttSource.atMostOnce creates a source that connects to the MQTT server and receives messages from the subscribed topics. The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies. This means that if the source produces messages faster than they can be consumed, it will start to slow down once the buffer is full, preventing the consumer from being overwhelmed (source).
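
The effect of a bounded buffer can be sketched with plain JDK classes. This is only an illustration of the idea, not how the Alpakka MQTT connector is implemented; the class BoundedBufferSketch and its fill method are hypothetical names, and the capacity of 8 simply mirrors bufferSize = 8 from the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of a bounded buffer; not the Alpakka MQTT implementation.
final class BoundedBufferSketch {

  /** Tries to enqueue messages; returns how many were accepted before the buffer was full. */
  static int fill(BlockingQueue<String> buffer, int attempts) {
    int accepted = 0;
    for (int i = 0; i < attempts; i++) {
      // offer() returns false instead of blocking when the buffer is full
      if (!buffer.offer("msg-" + i)) {
        break;
      }
      accepted++;
    }
    return accepted;
  }

  public static void main(String[] args) {
    BlockingQueue<String> buffer = new ArrayBlockingQueue<>(8);
    System.out.println(fill(buffer, 20));      // 8: the producer has to pause here
    buffer.poll();                              // the consumer frees one slot
    System.out.println(buffer.offer("next"));  // true: the producer may continue
  }
}
```

Once the buffer is full, the producer cannot hand over more messages until the consumer has taken some out, which is the behaviour that back-pressure gives you in a stream.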

For Google Cloud Pub/Sub, you can use Alpakka’s Google Cloud Pub/Sub connector:

val subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
  .run() // materialize and start the stream; needs an implicit ActorSystem in scope

In this example, GooglePubSub.subscribe creates a source that receives messages from a Pub/Sub subscription. The ackSink is a sink that acknowledges the received messages, signaling that they have been processed and do not need to be redelivered. The groupedWithin operator batches the acknowledgements, emitting a group once it holds 1000 ack IDs or one minute has passed, whichever comes first. This reduces the number of acknowledge requests and improves performance (source).
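
To make the benefit of batching concrete, here is a minimal plain-Java sketch of the size-based half of that grouping. It deliberately leaves out the time window, and the class AckBatchingSketch and its batch method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-based batching; the time window is omitted.
final class AckBatchingSketch {

  /** Splits ack IDs into consecutive batches of at most maxBatchSize elements. */
  static List<List<String>> batch(List<String> ackIds, int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < ackIds.size(); start += maxBatchSize) {
      int end = Math.min(start + maxBatchSize, ackIds.size());
      batches.add(new ArrayList<>(ackIds.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> ackIds = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      ackIds.add("ack-" + i);
    }
    // 2500 acknowledgements become 3 requests (1000 + 1000 + 500)
    System.out.println(batch(ackIds, 1000).size()); // 3
  }
}
```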

Remember that to run these examples, you will need to configure your MQTT server or Google Cloud Pub/Sub and provide your own credentials.

I need the code in java.

Sure, here is an example of how to use Akka Streams with backpressure in Java, using Alpakka’s Google Cloud Pub/Sub connector:

import akka.Done;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.stream.alpakka.googlecloud.pubsub.AcknowledgeRequest;
import akka.stream.alpakka.googlecloud.pubsub.PubSubConfig;
import akka.stream.alpakka.googlecloud.pubsub.ReceivedMessage;
import akka.stream.alpakka.googlecloud.pubsub.javadsl.GooglePubSub;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;

ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String subscription = "subscription1";

// Signatures follow the Scala example above; they may differ between Alpakka versions
Source<ReceivedMessage, Cancellable> subscriptionSource =
  GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
  GooglePubSub.acknowledge(subscription, config);

subscriptionSource
  .map(message -> {
    // do something with the message

    return message.ackId();
  })
  // emit a batch after 1000 ack IDs or one minute, whichever comes first
  .groupedWithin(1000, Duration.ofMinutes(1))
  .map(AcknowledgeRequest::create)
  .to(ackSink)
  .run(system); // materialize and start the stream

As in the Scala version, GooglePubSub.subscribe creates a source that receives messages from a Pub/Sub subscription, and ackSink acknowledges them so that they are not redelivered. The groupedWithin operator batches the acknowledgements, which improves performance (source).

Remember that to run this example, you will need to configure your Google Cloud Pub/Sub and provide your own credentials.

Where are resource buffering, backpressure, and restarts handled?

In Akka, resource buffering and backpressure are handled in the Akka Streams library. Akka Streams implements an asynchronous non-blocking back-pressure protocol standardized by the Reactive Streams specification. The user of the library does not have to write any explicit back-pressure handling code; it is built in and dealt with automatically by all of the provided Akka Streams operators (source).
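
The core idea of that protocol is demand signalling: a publisher may only emit as many elements as the subscriber has requested. The JDK ships the Reactive Streams interfaces as java.util.concurrent.Flow, so the idea can be sketched without Akka. The following is a simplified, single-threaded illustration and not a specification-compliant implementation (for example, it does not validate the requested amount); DemandSketch, RangePublisher, and CollectingSubscriber are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded illustration of demand signalling.
final class DemandSketch {

  /** Publishes 0..count-1, but never more than the subscriber has requested. */
  static final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
      this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private int next = 0;
        private boolean finished = false;

        @Override
        public void request(long n) {
          // emit at most n elements: this is where demand limits the producer
          for (long i = 0; i < n && next < count && !finished; i++) {
            subscriber.onNext(next++);
          }
          if (next >= count && !finished) {
            finished = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          finished = true;
        }
      });
    }
  }

  /** Stores received elements and lets the caller decide when to signal demand. */
  static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new RuntimeException(t); }
    @Override public void onComplete() { completed = true; }

    void requestMore(long n) { subscription.request(n); }
  }

  public static void main(String[] args) {
    CollectingSubscriber consumer = new CollectingSubscriber();
    new RangePublisher(5).subscribe(consumer);
    System.out.println(consumer.received); // []: nothing is sent without demand
    consumer.requestMore(2);
    System.out.println(consumer.received); // [0, 1]
    consumer.requestMore(10);
    System.out.println(consumer.received + " " + consumer.completed); // [0, 1, 2, 3, 4] true
  }
}
```

A slow consumer simply requests less often, and the publisher has no way to push past that limit. Akka Streams operators exchange this kind of demand for you.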

Here is an example of adding an explicit buffer to a stream in Java:

tweets
    .buffer(10, OverflowStrategy.dropHead())
    .map(t -> slowComputation(t))
    .runWith(Sink.ignore(), system);

In this example, the buffer operator adds an explicit buffer of 10 elements. OverflowStrategy.dropHead() determines what happens when the buffer is full: instead of backpressuring the upstream, the oldest element in the buffer is dropped to make room for the new one. To slow the producer down rather than lose elements, use OverflowStrategy.backpressure() instead (source).
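
The drop-oldest behaviour can be sketched in plain Java as follows. This is an illustration of the strategy only, not Akka's implementation; DropHeadBufferSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of a drop-oldest buffer; not Akka's implementation.
final class DropHeadBufferSketch<T> {
  private final int capacity;
  private final Deque<T> elements = new ArrayDeque<>();

  DropHeadBufferSketch(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  /** Adds an element; if the buffer is full, the oldest element is discarded first. */
  void push(T element) {
    if (elements.size() == capacity) {
      elements.removeFirst(); // drop the head (the oldest element)
    }
    elements.addLast(element);
  }

  /** Returns the buffered elements, oldest first. */
  List<T> snapshot() {
    return new ArrayList<>(elements);
  }

  public static void main(String[] args) {
    DropHeadBufferSketch<Integer> buffer = new DropHeadBufferSketch<>(3);
    for (int i = 1; i <= 5; i++) {
      buffer.push(i);
    }
    System.out.println(buffer.snapshot()); // [3, 4, 5]: elements 1 and 2 were dropped
  }
}
```

The producer is never slowed down here, which is why this strategy trades completeness for freshness.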

As for restarts, they are handled by Akka’s supervision model. When an actor fails (throws an exception), its supervision strategy decides what happens next, which could be to restart the actor. In Akka Typed, you declare this by wrapping the behavior with Behaviors.supervise. Here is an example of how to handle restarts in Java:

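// Resource and claimResource() stand in for whatever the actor manages
return Behaviors.supervise(
        Behaviors.<String>setup(
            ctx -> {
              final Resource resource = claimResource();
              return Behaviors.receive(String.class)
                  .onMessage(
                      String.class,
                      msg -> {
                        // message handling that might throw an exception
                        String[] parts = msg.split(" ");
                        resource.process(parts);
                        return Behaviors.same();
                      })
                  .onSignal(
                      PreRestart.class,
                      signal -> {
                        // release the resource before the actor is restarted
                        resource.close();
                        return Behaviors.same();
                      })
                  .onSignal(
                      PostStop.class,
                      signal -> {
                        // release the resource when the actor stops
                        resource.close();
                        return Behaviors.same();
                      })
                  .build();
            }))
    .onFailure(Exception.class, SupervisorStrategy.restart());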

In this example, the SupervisorStrategy.restart() strategy is used, which means that if the actor throws an exception, it will be restarted (source).
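
What a restart means for the actor's state can be sketched without Akka. In this simplified, hypothetical model (RestartSketch, Counter, and run are names invented for the illustration), a failing worker is replaced by a fresh instance, its state is reset, and the message that caused the failure is not retried.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of restart semantics; not Akka's supervision code.
final class RestartSketch {

  /** Worker with internal state: counts handled messages and fails on "boom". */
  static final class Counter {
    int handled = 0;

    void handle(String message) {
      if (message.equals("boom")) {
        throw new IllegalStateException("failed on " + message);
      }
      handled++;
    }
  }

  /** Feeds messages to a worker; returns {number of restarts, messages handled by the final worker}. */
  static int[] run(List<String> messages) {
    Counter worker = new Counter();
    int restarts = 0;
    for (String message : messages) {
      try {
        worker.handle(message);
      } catch (RuntimeException e) {
        // restart: fresh instance, state is reset, the failing message is skipped
        worker = new Counter();
        restarts++;
      }
    }
    return new int[] {restarts, worker.handled};
  }

  public static void main(String[] args) {
    int[] result = run(Arrays.asList("a", "b", "boom", "c"));
    System.out.println(result[0] + " " + result[1]); // 1 1
  }
}
```

Because the state is lost on restart, anything that must be released or re-acquired belongs in signal handlers such as PreRestart and PostStop, as in the Akka example.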