import {
  KafkaClientBindings,
  KafkaClientComponent,
  KafkaClientOptions,
} from 'loopback4-kafka-client';
// ...

/**
 * Example application that configures and registers the Kafka client
 * component.
 *
 * The constructor:
 * - configures the component (observer initialisation, producer topics and
 *   the broker connection, read from `KAFKA_SERVER`),
 * - binds the producer and consumer configuration objects
 *   (the consumer group id is read from `KAFKA_CONSUMER_GROUP`),
 * - registers `KafkaClientComponent` with the application.
 */
export class MyApplication extends BootMixin(
  ServiceMixin(RepositoryMixin(RestApplication)),
) {
  constructor(options: ApplicationConfig = {}) {
    // A derived class must call super() before it can use `this`.
    super(options);

    this.configure<KafkaClientOptions>(KafkaClientBindings.Component).to({
      initObservers: true, // if you want to init consumer lifeCycleObserver
      topics: [Topics.First], // if you want to use producers for given topics
      connection: {
        // refer https://kafka.js.org/docs/configuration
        // falls back to an empty broker string when KAFKA_SERVER is unset
        brokers: [process.env.KAFKA_SERVER ?? ''],
      },
    });

    this.bind(KafkaClientBindings.ProducerConfiguration).to({
      // your producer config
      // refer https://kafka.js.org/docs/producing#options
    });

    this.bind(KafkaClientBindings.ConsumerConfiguration).to({
      // refer https://kafka.js.org/docs/consuming#options
      groupId: process.env.KAFKA_CONSUMER_GROUP,
    });

    this.component(KafkaClientComponent);
    // ...
  }
  // ...
}
Producers and Consumers work on a Stream, which defines the topic and the events used by the application. You can implement the IStreamDefinition interface to create your own stream class.
/**
 * Example stream definition: ties the `Topics.First` topic to the events
 * that can be sent on it and the payload type of each event.
 */
export class TestStream implements IStreamDefinition {
  /** Topic this stream is bound to. */
  topic = Topics.First;

  /** Maps each event key to the payload type carried by that event. */
  messages: {
    // [<event type key from enum>] : <event type or interface>
    [Events.start]: StartEvent;
    [Events.stop]: StopEvent;
  };
}
A Consumer is a loopback extension that is used by the KafkaConsumerService to initialize consumers. It must implement the IConsumer interface and should use the @consumer() decorator. If you want the consumers to start when your application starts, set initObservers to true in the Component configuration.
// start.consumer.ts

/**
 * Example consumer for the `Events.start` event on the `Topics.First` topic.
 *
 * The handler is written as a method here. A class cannot declare `handler`
 * both as a constructor parameter property and as a method (duplicate
 * identifier), so pick exactly one of the two forms.
 */
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
  // Alternative: inject a bound handler instead of writing the method below.
  // If you use this form, remove the `handler` method.
  //
  // constructor(
  //   @inject('test.handler.start')
  //   public handler: StreamHandler<TestStream, Events.start>,
  // ) {}

  topic: Topics.First = Topics.First;
  event: Events.start = Events.start;

  // you can write the handler as a method
  handler(payload: StartEvent) {
    console.log(payload);
  }
}
If you want to write a shared handler for different events, you can use the eventHandlerKey to bind a handler in the application -
A Producer is a loopback service for producing messages for a particular topic. You can inject a producer using the @producer(TOPIC_NAME) decorator.
Note: The topic name passed to the decorator must first be configured in the Component configuration's topics property -