Platform.sh User Documentation

Kafka (Message queue service)

Upsun Beta

Access our newest offering - Upsun!

Get your free trial by clicking the link below.

Get your Upsun free trial

Apache Kafka is an open-source stream-processing software platform.

It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.

Supported versions Anchor to this heading

You can select the major and minor version.

Patch versions are applied periodically for bug fixes and the like. When you deploy your app, you always get the latest available patches.

Grid Dedicated Gen 3 Dedicated Gen 2
  • 3.2
  • 3.4
None available None available

Relationship reference Anchor to this heading

Example information available through the PLATFORM_RELATIONSHIPS environment variable or by running platform relationships.

Note that the information about the relationship can change when an app is redeployed or restarted or the relationship is changed. So your apps should only rely on the PLATFORM_RELATIONSHIPS environment variable directly rather than hard coding any values.

{
    "service": "kafka25",
    "ip": "169.254.27.10",
    "hostname": "t7lv3t3ttyh3vyrzgqguj5upwy.kafka25.service._.eu-3.platformsh.site",
    "cluster": "rjify4yjcwxaa-master-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:3.4",
    "port": 9092
}

Usage example Anchor to this heading

1. Configure the service Anchor to this heading

To define the service, use the kafka type:

.platform/services.yaml
# The name of the service container. Must be unique within a project.
<SERVICE_NAME>:

    type: kafka:<VERSION>
    disk: 512

Note that changing the name of the service replaces it with a brand new service and all existing data is lost. Back up your data before changing the service.

2. Add the relationship Anchor to this heading

To define the relationship, use the kafka endpoint :

.platform.app.yaml
# Relationships enable access from this app to a given service.
relationships:
    <RELATIONSHIP_NAME>: "<SERVICE_NAME>:kafka"

You can define <SERVICE_NAME> and <RELATIONSHIP_NAME> as you like, but it’s best if they’re distinct. With this definition, the application container now has access to the service via the relationship <RELATIONSHIP_NAME>.

Example Configuration Anchor to this heading

Service definition Anchor to this heading

.platform/services.yaml
# The name of the service container. Must be unique within a project.
queuekafka:

    type: kafka:3.4
    disk: 512

App configuration Anchor to this heading

.platform.app.yaml
# Relationships enable access from this app to a given service.
relationships:
    kafkaqueue: "queuekafka:kafka"

Use in app Anchor to this heading

To use the configured service in your app, add a configuration file similar to the following to your project.

package sh.platform.languages.sample;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class KafkaSample implements Supplier<String> {

    @Override
    public String get() {
        StringBuilder logger = new StringBuilder();

        // Create a new config object to ease reading the Platform.sh environment variables.
        // You can alternatively use getenv() yourself.
        Config config = new Config();

        try {
            // Get the credentials to connect to the Kafka service.
            final Kafka kafka = config.getCredential("kafka", Kafka::new);
            Map<String, Object> configProducer = new HashMap<>();
            configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");
            final Producer<Long, String> producer = kafka.getProducer(configProducer);

            logger.append("<ul>");
            // Sending data into the stream.
            RecordMetadata metadata = producer.send(new ProducerRecord<>("animals", "lion")).get();
            logger.append("<li>Record sent with to partition <code>").append(metadata.partition())
                    .append("</code> with offset <code>").append(metadata.offset()).append("</code></li>");

            metadata = producer.send(new ProducerRecord<>("animals", "dog")).get();
            logger.append("<li>Record sent with to partition <code>").append(metadata.partition())
                    .append("</code> with offset <code>").append(metadata.offset()).append("</code></li>");

            metadata = producer.send(new ProducerRecord<>("animals", "cat")).get();
            logger.append("<li>Record sent with to partition <code>").append(metadata.partition())
                    .append("</code> with offset <code>").append(metadata.offset()).append("</code></li>");
            logger.append("</ul>");

            // Consumer, read data from the stream.
            final HashMap<String, Object> configConsumer = new HashMap<>();
            configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
            configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals");
            ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));

            logger.append("<ul>");
            // Print each record.
            consumerRecords.forEach(record -> {
                logger.append("<li>Record: Key <code>" + record.key());
                logger.append("</code> value <code>" + record.value());
                logger.append("</code> partition <code>" + record.partition());
                logger.append("</code> offset <code>" + record.offset()).append("</code></li>");
            });
            logger.append("</ul>");

            // Commits the offset of record to broker.
            consumer.commitSync();

            return logger.toString();
        } catch (Exception exp) {
            throw new RuntimeException("An error when execute Kafka", exp);
        }
    }
}
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')
    
    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
        
        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        for e in range(10):
            data = {'number' : e}
            producer.send('numtest', value=data)
        
        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )
        
        consumer.subscribe(['numtest'])
        
        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += loads(message.value.decode('UTF-8'))["number"]

        return output
    
    except Exception as e:
        return e

## With the ruby-kafka gem

# Producer
require "kafka"
kafka = Kafka.new(["kafka.internal:9092"], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end

(The specific way to inject configuration into your application varies. Consult your application or framework’s documentation.)

Is this page helpful?