Spring Cloud GCP Pub/Sub Support

Spring Cloud GCP provides idiomatic support for Google Cloud Pub/Sub, so Spring developers can build event-driven and message-based applications without calling the Pub/Sub API directly. This page covers two of its abstractions: the Spring Integration channel adapters and the Spring Cloud Stream binder. Both handle the details of the Pub/Sub client, allowing developers to focus on business logic.

Spring Integration for Google Cloud Pub/Sub

Spring Integration provides a framework for building message-driven architectures. Spring Cloud GCP supplies channel adapters that connect Spring Integration message channels to Pub/Sub topics and subscriptions. This approach suits developers who prefer to assemble message flows from configured components rather than call a client API directly.

Inbound and Outbound Adapters

The integration provides both inbound and outbound channel adapters.

  • Inbound Channel Adapter: Receives messages from a Pub/Sub subscription and forwards them to a Spring Integration message channel.

  • Outbound Channel Adapter: Takes messages from a Spring Integration message channel and publishes them to a Pub/Sub topic.
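The direction of each adapter can be illustrated with a toy, in-memory model. This is only a sketch of the data flow: InMemoryTopic, Channel, and roundTrip are hypothetical names invented for illustration, and no Spring Integration or Pub/Sub classes are involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: none of these types belong to Spring Integration or Pub/Sub.
final class AdapterFlowSketch {

    // Hypothetical stand-in for a Pub/Sub topic with a single subscription.
    static final class InMemoryTopic {
        private final Queue<String> subscription = new ArrayDeque<>();

        void publish(String payload) {
            subscription.add(payload);
        }

        String pull() {
            return subscription.poll(); // returns null when nothing is pending
        }
    }

    // Hypothetical stand-in for a message channel: forwards each message to its handlers.
    static final class Channel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        void send(String payload) {
            handlers.forEach(handler -> handler.accept(payload));
        }
    }

    // Sends payloads through the outbound side and collects them on the inbound side.
    static List<String> roundTrip(String... payloads) {
        InMemoryTopic topic = new InMemoryTopic();
        Channel outputChannel = new Channel();
        Channel inputChannel = new Channel();
        List<String> received = new ArrayList<>();

        // Outbound adapter role: message channel -> topic.
        outputChannel.subscribe(topic::publish);
        // Application handler attached to the inbound channel.
        inputChannel.subscribe(received::add);

        for (String payload : payloads) {
            outputChannel.send(payload);
        }

        // Inbound adapter role: subscription -> message channel.
        String next;
        while ((next = topic.pull()) != null) {
            inputChannel.send(next);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("first", "second"));
    }
}
```

In the real library the adapters do this work against actual topics and subscriptions, and the application only interacts with the two channels.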

Code Sample: Spring Integration

The following Java configuration demonstrates a simple setup for a Pub/Sub publisher and subscriber.

import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
public class PubSubIntegrationConfig {

    // --- Outbound Configuration (Publisher) ---
    @Bean
    public MessageChannel pubsubOutputChannel() {
        return new DirectChannel();
    }

    // Publishes every message sent to pubsubOutputChannel to the topic.
    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public PubSubMessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        return new PubSubMessageHandler(pubSubTemplate, "example-topic");
    }

    // Application code calls this gateway instead of the Pub/Sub client.
    @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
    public interface PubsubGateway {
        void sendToPubsub(String text);
    }

    // --- Inbound Configuration (Subscriber) ---
    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    // Forwards messages from the subscription to pubsubInputChannel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel messageChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "example-subscription");
        adapter.setOutputChannel(messageChannel);
        adapter.setPayloadType(String.class);
        // MANUAL leaves acknowledgement to the handler below; the default is AUTO.
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    // Not a @Bean: a bean method cannot return void.
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(
            String payload,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        System.out.println("Received message: " + payload);
        message.ack();
    }
}

Spring Cloud Stream Binder for Google Cloud Pub/Sub

Spring Cloud Stream provides an abstraction for building event-driven microservices. The Pub/Sub binder hides the details of the underlying message broker, so producer and consumer code does not reference Pub/Sub directly. This is particularly useful for applications that might need to move to a different messaging system, because the switch is made mostly through the binder dependency and configuration rather than through application code.

Functional Programming Model

The binder uses Spring Cloud Stream's functional programming model, in which message producers, processors, and consumers are plain java.util.function Supplier, Function, and Consumer beans. Each bean is bound to a destination through configuration, so the code itself contains no Pub/Sub-specific calls.
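To make the three roles concrete, here is a minimal sketch in plain Java with no Spring or Pub/Sub involved. The class FunctionalModelSketch, its method names, and the sample payload are hypothetical; the pipeline is wired by hand to stand in for the connections a binder would make through topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy illustration only: in a real application these would be @Bean methods
// and the binder, not runOnce(), would move data between them.
final class FunctionalModelSketch {

    // Source role: produces a payload each time it is invoked.
    static Supplier<String> source() {
        return () -> "order-created";
    }

    // Processor role: turns an incoming payload into an outgoing one.
    static Function<String, String> processor() {
        return payload -> payload.toUpperCase(Locale.ROOT);
    }

    // Sink role: consumes a payload; here it records it in the given list.
    static Consumer<String> sink(List<String> received) {
        return received::add;
    }

    // Hand-wired pipeline: source -> processor -> sink.
    static List<String> runOnce() {
        List<String> received = new ArrayList<>();
        String produced = source().get();
        String transformed = processor().apply(produced);
        sink(received).accept(transformed);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [ORDER-CREATED]
    }
}
```

Because each role is an ordinary functional interface, the business logic can be unit-tested without a broker, which is one practical benefit of this model.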

Code Sample: Spring Cloud Stream

The following example shows how to use the binder with a functional approach.

import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class PubSubStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(PubSubStreamApplication.class, args);
    }

    // --- Outbound Configuration (Publisher) ---
    @Bean
    public Supplier<Flux<Message<String>>> producer() {
        return () -> Flux.just(
            MessageBuilder.withPayload("Hello from Spring Cloud Stream!").build(),
            MessageBuilder.withPayload("Another message!").build()
        );
    }

    // --- Inbound Configuration (Subscriber) ---
    @Bean
    public Consumer<Message<String>> consumer() {
        return message -> {
            System.out.println("Received message from Pub/Sub: " + message.getPayload());
            // Acknowledgement is handled automatically by the binder
        };
    }
}

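Configuration in application.properties maps each function to a destination. The binding names follow the Spring Cloud Stream convention <functionName>-out-0 for outputs and <functionName>-in-0 for inputs:

spring.cloud.function.definition=producer;consumer
spring.cloud.stream.bindings.producer-out-0.destination=example-topic
spring.cloud.stream.bindings.consumer-in-0.destination=example-topic
spring.cloud.stream.bindings.consumer-in-0.group=example-subscription

The consumer group determines which Pub/Sub subscription the consumer reads from. The binder acknowledges messages automatically by default, so the consumer above needs no acknowledgement setting.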

References and Further Reading

For detailed guides and configuration options, refer to the official Spring Cloud GCP reference documentation.
