Kafka (Message queue service)

Apache Kafka is an open-source stream-processing software platform.

It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.

Supported versions 

| Grid | Dedicated Gen 3 | Dedicated Gen 2 |
|------|-----------------|-----------------|
| 3.2  | None available  | None available  |

The following versions aren’t available in the EU-1 and US-1 regions:

  • 3.2

Consider region migration if your project is in those regions.

Relationship 

The format exposed in the $PLATFORM_RELATIONSHIPS environment variable:

{
    "service": "kafka25",
    "ip": "169.254.27.10",
    "hostname": "t7lv3t3ttyh3vyrzgqguj5upwy.kafka25.service._.eu-3.platformsh.site",
    "cluster": "rjify4yjcwxaa-master-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:2.5",
    "port": 9092
}

Usage example 

1. Configure the service 

To define the service, use the kafka type:

.platform/services.yaml
<SERVICE_NAME>:
    type: kafka:<VERSION>
    disk: 512

Note that if you later change the name, it’s treated as an entirely new service. This removes all data from your service. Always back up your data before changing the service.

2. Add the relationship 

To define the relationship, use the kafka endpoint:

.platform.app.yaml
relationships:
    <RELATIONSHIP_NAME>: "<SERVICE_NAME>:kafka"

You can define <SERVICE_NAME> and <RELATIONSHIP_NAME> as you like, but it’s best if they’re distinct.

Example Configuration 

Service definition

.platform/services.yaml
queuekafka:
    type: kafka:3.2
    disk: 512

App configuration

.platform.app.yaml
relationships:
    kafkaqueue: "queuekafka:kafka"

Use in app 

Then use the service in your app with code like the following examples (Java, Python, and Ruby, in that order):

package sh.platform.languages.sample;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Sample that writes three records to the {@code animals} Kafka topic, reads the topic back,
 * and reports what happened as an HTML fragment.
 */
public class KafkaSample implements Supplier<String> {

    /**
     * Produces three records, polls the topic once, and commits the consumed offsets.
     *
     * @return an HTML fragment with one list of sent records (partition and offset) followed by
     *         one list of the records returned by a single three-second poll
     * @throws RuntimeException if connecting, sending, polling or committing fails; the original
     *         failure is kept as the cause
     */
    @Override
    public String get() {
        StringBuilder logger = new StringBuilder();

        // Create a new config object to ease reading the Platform.sh environment variables.
        // You can alternatively use getenv() yourself.
        Config config = new Config();

        try {
            // Get the credentials to connect to the Kafka service.
            // NOTE(review): "kafka" is presumably the relationship name and must match the key
            // declared under `relationships` in .platform.app.yaml — confirm for your project.
            final Kafka kafka = config.getCredential("kafka", Kafka::new);
            Map<String, Object> configProducer = new HashMap<>();
            configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");

            // try-with-resources closes the producer even when a send fails.
            try (Producer<Long, String> producer = kafka.getProducer(configProducer)) {
                logger.append("<ul>");
                // Sending data into the stream; get() waits for each send to complete.
                for (String animal : new String[] {"lion", "dog", "cat"}) {
                    RecordMetadata metadata =
                            producer.send(new ProducerRecord<>("animals", animal)).get();
                    logger.append("<li>Record sent with to partition <code>").append(metadata.partition())
                            .append("</code> with offset <code>").append(metadata.offset()).append("</code></li>");
                }
                logger.append("</ul>");
            }

            // Consumer, read data from the stream.
            final HashMap<String, Object> configConsumer = new HashMap<>();
            configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
            configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // try-with-resources closes the consumer once the offsets are committed.
            try (Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals")) {
                ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));

                logger.append("<ul>");
                // Print each record.
                consumerRecords.forEach(record -> {
                    logger.append("<li>Record: Key <code>" + record.key());
                    logger.append("</code> value <code>" + record.value());
                    logger.append("</code> partition <code>" + record.partition());
                    logger.append("</code> offset <code>" + record.offset()).append("</code></li>");
                });
                logger.append("</ul>");

                // Commits the offset of record to broker.
                consumer.commitSync();
            }

            return logger.toString();
        } catch (InterruptedException exp) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("An error when execute Kafka", exp);
        } catch (Exception exp) {
            throw new RuntimeException("An error when execute Kafka", exp);
        }
    }
}
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    """Send ten JSON messages to the ``numtest`` topic and read them back.

    Returns:
        A string with the ``number`` field of each of the ten consumed
        messages, each followed by ``', '``. If anything raises, the
        exception object itself is returned instead of being propagated.
    """
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')

    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])

        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        try:
            for number in range(10):
                producer.send('numtest', value={'number': number})
            # send() is asynchronous: flush so every record has been handed
            # to the broker before the consumer starts reading.
            producer.flush()
        finally:
            producer.close()

        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )
        try:
            consumer.subscribe(['numtest'])

            output = ''
            # For demonstration purposes, read a fixed number of messages
            # instead of iterating over the consumer indefinitely.
            # Each next() call still waits until a message is available.
            for _ in range(10):
                message = next(consumer)
                output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

            # What a real implementation would do instead.
            # for message in consumer:
            #     output += str(loads(message.value.decode('UTF-8'))["number"])

            return output
        finally:
            consumer.close()

    except Exception as e:
        # The failure is returned rather than raised, so the caller receives
        # an exception object instead of a string in that case.
        return e
## With the ruby-kafka gem

require "kafka"

# Connect to the broker and publish a single message to the "greetings" topic.
brokers = ["kafka.internal:9092"]
kafka = Kafka.new(brokers, client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Read messages from the "greetings" topic, printing offset, key and value for each.
kafka.each_message(topic: "greetings") { |msg| puts msg.offset, msg.key, msg.value }

(The specific way to inject configuration into your application varies. Consult your application or framework’s documentation.)