Kafka (Message Queue service)

Apache Kafka is an open-source stream-processing software platform. It is a framework for storing, reading and analysing streaming data.

See the Kafka documentation for more information.

Supported versions

  • 2.1

Relationship

The format exposed in the $PLATFORM_RELATIONSHIPS environment variable:

{
    "service": "kafka",
    "ip": "169.254.244.224",
    "hostname": "spvcs2jx7mu63nhq2cb7iblsim.kafka.service._.eu-3.platformsh.site",
    "cluster": "rjify4yjcwxaa-master-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:2.1",
    "port": 9092
}

Usage example

In your .platform/services.yaml:

mykafka:
    type: kafka:2.1
    disk: 1024

In your .platform.app.yaml:

relationships:
    kafka: "mykafka:kafka"

You can then use the service in a configuration file of your application with something like:

Python
Ruby
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')
    
    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
        
        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        for e in range(10):
            data = {'number' : e}
            producer.send('numtest', value=data)
        
        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )
        
        consumer.subscribe(['numtest'])
        
        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += loads(message.value.decode('UTF-8'))["number"]

        return output
    
    except Exception as e:
        return e
## With the ruby-kafka gem

# Producer
require "kafka"
kafka = Kafka.new(["kafka.internal:9092"], client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end

(The specific way to inject configuration into your application will vary. Consult your application or framework's documentation.)