Kafka (Message Queue service)

Apache Kafka is an open-source stream-processing platform. It provides a framework for storing, reading, and analysing streaming data.

See the Kafka documentation for more information.

Supported versions

  • 2.1
  • 2.2

Relationship

The format exposed in the $PLATFORM_RELATIONSHIPS environment variable:

{
    "service": "kafka",
    "ip": "169.254.252.225",
    "hostname": "spvcs2jx7mu63nhq2cb7iblsim.kafka.service._.eu-3.platformsh.site",
    "cluster": "rjify4yjcwxaa-master-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:2.2",
    "port": 9092
}
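
At runtime, $PLATFORM_RELATIONSHIPS holds this structure as base64-encoded JSON, with one entry per relationship. As a minimal sketch, assuming a relationship named kafkaqueue as in the usage example below, you can decode it by hand in Python:

import base64
import json
import os

# Decode the base64-encoded JSON document.
relationships = json.loads(base64.b64decode(os.environ['PLATFORM_RELATIONSHIPS']))

# Each relationship name maps to a list of endpoints like the one shown above.
kafka = relationships['kafkaqueue'][0]
print('{}:{}'.format(kafka['host'], kafka['port']))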

Usage example

In your .platform/services.yaml:

queuekafka:
    type: kafka:2.2
    disk: 512

In your .platform.app.yaml:

relationships:
    kafkaqueue: "queuekafka:kafka"

You can then use the service in your application with something like:

Java:

package sh.platform.languages.sample;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class KafkaSample implements Supplier<String> {

    @Override
    public String get() {
        StringBuilder logger = new StringBuilder();

        // Create a new config object to ease reading the Platform.sh environment variables.
        // You can alternatively use getenv() yourself.
        Config config = new Config();

        try {
            // Get the credentials to connect to the Kafka service,
            // keyed by the relationship name (kafkaqueue).
            final Kafka kafka = config.getCredential("kafkaqueue", Kafka::new);
            Map<String, Object> configProducer = new HashMap<>();
            configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");
            final Producer<Long, String> producer = kafka.getProducer(configProducer);

            // Sending data into the stream.
            RecordMetadata metadata = producer.send(new ProducerRecord<>("animals", "lion")).get();
            logger.append("Record sent with to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            metadata = producer.send(new ProducerRecord<>("animals", "dog")).get();
            logger.append("Record sent with to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            metadata = producer.send(new ProducerRecord<>("animals", "cat")).get();
            logger.append("Record sent with to partition ").append(metadata.partition())
                    .append(" with offset ").append(metadata.offset()).append('\n');

            // Consumer, read data from the stream.
            final HashMap<String, Object> configConsumer = new HashMap<>();
            configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
            configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals");
            ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));

            // Print each record.
            consumerRecords.forEach(record -> {
                logger.append("Record: Key ").append(record.key());
                logger.append(" value ").append(record.value());
                logger.append(" partition ").append(record.partition());
                logger.append(" offset ").append(record.offset()).append('\n');
            });

            // Commits the offset of record to broker.
            consumer.commitSync();

            return logger.toString();
        } catch (Exception exp) {
            throw new RuntimeException("An error occurred while executing the Kafka sample", exp);
        }
    }
}

Python:

from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service,
    # keyed by the relationship name (kafkaqueue).
    credentials = config.credentials('kafkaqueue')
    
    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
        
        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        for e in range(10):
            data = {'number' : e}
            producer.send('numtest', value=data)
        
        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )
        
        consumer.subscribe(['numtest'])
        
        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        return output
    
    except Exception as e:
        return e

Ruby:

## With the ruby-kafka gem

# Producer
require "kafka"
# The broker address comes from the kafkaqueue relationship (host and port above).
kafka = Kafka.new(["kafkaqueue.internal:9092"], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end

(The specific way to inject configuration into your application will vary. Consult your application's or framework's documentation.)