Kafka (Message queue service)
Contents:
Apache Kafka is an open-source stream-processing software platform.
It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.
Supported versions
| Grid | Dedicated |
|---|---|
| | None available |
Relationship
The format exposed in the $PLATFORM_RELATIONSHIPS
environment variable:
{
"service": "kafka25",
"ip": "169.254.46.170",
"hostname": "t7lv3t3ttyh3vyrzgqguj5upwy.kafka25.service._.eu-3.platformsh.site",
"cluster": "rjify4yjcwxaa-master-7rqtwti",
"host": "kafka.internal",
"rel": "kafka",
"scheme": "kafka",
"type": "kafka:2.5",
"port": 9092
}
Usage example
In your .platform/services.yaml:
queuekafka:
type: kafka:2.5
disk: 512
In your .platform.app.yaml:
relationships:
kafkaqueue: "queuekafka:kafka"
Note:
You will need to use the kafka type when defining the service:
# .platform/services.yaml
service_name:
type: kafka:version
disk: 512
and the endpoint kafka when defining the relationship:
# .platform.app.yaml
relationships:
relationship_name: "service_name:kafka"
Your service_name and relationship_name are defined by you, but we recommend making them distinct from each other.
You can then use the service in a configuration file of your application with something like:
package sh.platform.languages.sample;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sh.platform.config.Config;
import sh.platform.config.Kafka;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
/**
 * Sample that sends a few records to the Kafka service and reads the topic back.
 *
 * <p>The connection is obtained from the Platform.sh relationship named {@code kafka}.
 */
public class KafkaSample implements Supplier<String> {

    /**
     * Sends three records to the {@code animals} topic, polls that topic once for up to
     * three seconds, and commits the consumed offsets.
     *
     * @return a text log with one line per record sent and one line per record received
     * @throws RuntimeException if any step fails; the original exception is kept as the cause
     */
    @Override
    public String get() {
        StringBuilder logger = new StringBuilder();

        // Create a new config object to ease reading the Platform.sh environment variables.
        // You can alternatively use getenv() yourself.
        Config config = new Config();

        try {
            // Get the credentials to connect to the Kafka service.
            final Kafka kafka = config.getCredential("kafka", Kafka::new);

            Map<String, Object> configProducer = new HashMap<>();
            configProducer.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "animals");

            // try-with-resources so the producer is closed even when a send fails.
            try (Producer<Long, String> producer = kafka.getProducer(configProducer)) {
                // Sending data into the stream. get() blocks until the broker acknowledges.
                for (String animal : new String[] {"lion", "dog", "cat"}) {
                    RecordMetadata metadata =
                            producer.send(new ProducerRecord<>("animals", animal)).get();
                    logger.append("Record sent with to partition ").append(metadata.partition())
                            .append(" with offset ").append(metadata.offset()).append('\n');
                }
            }

            // Consumer, read data from the stream.
            final HashMap<String, Object> configConsumer = new HashMap<>();
            configConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
            configConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // try-with-resources so the consumer is closed even when poll/commit fails.
            try (Consumer<Long, String> consumer = kafka.getConsumer(configConsumer, "animals")) {
                ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(3));

                // Print each record.
                consumerRecords.forEach(record -> {
                    logger.append("Record: Key " + record.key());
                    logger.append(" value " + record.value());
                    logger.append(" partition " + record.partition());
                    logger.append(" offset " + record.offset()).append('\n');
                });

                // Commits the offset of record to broker.
                consumer.commitSync();
            }

            return logger.toString();
        } catch (InterruptedException exp) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("An error when execute Kafka", exp);
        } catch (Exception exp) {
            throw new RuntimeException("An error when execute Kafka", exp);
        }
    }
}
from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config
def usage_example():
    """Send ten JSON messages to the 'numtest' topic and read ten messages back.

    Returns:
        A string with the consumed numbers, each followed by ', ', or the
        caught exception object if any step inside the try block fails.
    """
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()

    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')

    # Tracked outside the try block so the finally clause can close whatever was opened.
    producer = None
    consumer = None

    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])

        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )

        for e in range(10):
            data = {'number': e}
            producer.send('numtest', value=data)

        # send() only queues the message; flush so everything is delivered
        # before the consumer starts reading.
        producer.flush()

        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )

        consumer.subscribe(['numtest'])

        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        return output

    except Exception as e:
        return e

    finally:
        # Release the network connections held by the clients.
        if consumer is not None:
            consumer.close()
        if producer is not None:
            producer.close()
## With the ruby-kafka gem
# Producer: connect to the broker and publish one message to the "greetings" topic.
require "kafka"

brokers = ["kafka.internal:9092"]
kafka = Kafka.new(brokers, client_id: "my-application")
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer: print the offset, key and value of each message on the topic.
kafka.each_message(topic: "greetings") { |msg| puts msg.offset, msg.key, msg.value }
(The specific way to inject configuration into your application will vary. Consult your application or framework’s documentation.)