In this blog

What is KafkaJS?

KafkaJS is a Kafka client for NodeJS. If you are unfamiliar with Kafka, consider learning more about it. For the purposes of this article, Kafka can be viewed as a centralized queueing service, where each queue (more commonly called a topic) is an immutable ordered list of messages. Clients can read from and write to topics depending on their permissions which allows for asynchronous workloads, reprocessing, and parallelization.

KafkaJS gives JavaScript developers a way to interact with Kafka, typically by either reading or writing to topics but also through a few other methods. From its own website, KafkaJS claims it has no dependencies (a rarity in the Node ecosystem), is well tested in a production-like environment, and is battle-tested by the creators themselves.

TL;DR: Should you use KafkaJS?

To save you the trouble of reading the whole article, here are my overall recommendations for using KafkaJS based on personal experience and the feedback of other teammates:

Do NOT use KafkaJS if:

  • Your use case is better met by another Kafka client such as Kafka Streams or ksqlDB.
  • You are interested in learning the official Java client, which is what most documentation and training courses are based on.

Do use KafkaJS if:

  • Your related projects are already invested in JavaScript.
  • You want to easily integrate pushing to Kafka into a NodeJS API.
  • You need to write a custom integration similar to a connector.
  • You want a client that is reasonably debuggable.

Development

Overall, the development experience with KafkaJS has been a pleasant one. Besides needing some boilerplate code, KafkaJS gets out of the way very easily and allows you to write the code you want to, rather than forcing you into a particular structure. On my particular sub-team, we integrated KafkaJS with NestJS, although this was not the case for all sub-teams.

For our use case, we were able to wrap the core functionality of KafkaJS in a service class to abstract away common choices such as what compression algorithm to use. We also wrapped the Confluent Schema Registry library provided as a separate package to enable sending and receiving of AVRO encoded messages.

/* NestJS Controller */

@Controller('/')
class ApiToKafkaController {
	constructor(
		private kafka: KafkaService,
		private schemaRegistry: SchemaRegistryService,
		private topic: string,
	) {}
	
	@Post('/event')
	async eventHandler(@Body() body: EventData) {
	    const payload = await this.schemaRegistry.encode(event, this.topic)
	    await this.kafka.send(this.topic, payload)
	}
}

/* Schema Registry Service */

class SchemaRegistryService {
	private registry: SchemaRegistry

	constructor(config: SchemaRegistryAPIClientArgs) {
		this.registry = new SchemaRegistry(config)
	}

	await encode(data: unknown, topic: string) {
		const subject = `${topic}-value`
		const id = await this.registry.getLatestSchemaId(subject)
		const payload = await this.registry.encode(id, data)
		return payload
	}
}

/* KafkaService */

class KafkaService {
	private client: Kafka
	private producer: Producer
	
	constructor(config: KafkaConfig) {
		this.client = new Kafka(config)
		this.producer = this.client.producer()
	}
	
	async send(topic: String, value: Buffer) {
		await this.producer.send({
			topic,
			messages: [{ value }],
			compression: CompressionTypes.Snappy,
		})
	}
}

For consuming messages, KafkaJS has a bit of an odd interface. Instead of providing a simple subscribe(topic, listener, options) interface, you subscribe to a topic and then call run with an object containing your options and a eachMessage or eachBatch callback.

/* Schema Registry Service */

class SchemaRegistryService {
	/* ... */

	await decode(data: Buffer) {
		// schemaId is encoded in the message, no need to use the topic for lookup
		const message = await this.registry.decode(data)
		return message
	}
}

/* KafkaService */

class KafkaService {
	private consumer: Consumer
	
	async subscribe(topic: String, listener: (message: KafkaMessage) => unknown) {
		await this.consumer.subscribe({ topic })

		await this.consumer.run({
			eachMessage: (message) => onMessage(message.message),
      	})
	}
}

As you can see, it is fairly easy to wrap this interface to convert it to what I described, so even though it does not seem to be idiomatic JavaScript, it is simple and flexible. Incidentally, our original implementation used eachMessage to send API requests to a third-party service. When converted to use eachBatch as well as the batching endpoint of the service, we saw performance gains of around 8 to 10 times faster execution. This was a good reminder that while thinking and writing code about a single message is often easier, you may be missing out on performance.

Another library that became vital to our development process was this AVRO to TypeScript converter, which allowed us to easily convert not only our own schemas but also schemas provided by other teams. This converter meant that we could run a single build step to publish our schemas to the schema registry for local testing while also ensuring that our types were in sync with what Kafka was expecting while maintaining strong typing throughout our code.

In personal projects, I also explored using the Admin Client which allows for managing Kafka topics and consumers. Though my particular use case ended up running into some cumbersome code regarding pausing a subset of partitions while consuming, the admin client is fairly intuitive and has a few spots that have been well-designed to flow into one another, such as the ability to retrieve a list of offsets for a particular timestamp and then set a consumer group to those offsets.

 async function consumeTimeslot(topic: string, groupId: string, start: Date, end: Date) {
 	const admin = await initAdmin()
 	const consumer = await initConsumer(groupId)
 
 	await consumer.subscribe({ topic })

	const partitions = await admin.fetchTopicOffsetsByTimestamp(topic, start.getTime())
	await admin.setOffsets({ groupId, topic, partitions })

    const { promise, resolve, reject } = deferredPromise()
	const messages = []

	consumer.run({ /* ... */ })

	return promise // resolves with messages
 }

A Note on Co-Partitioning

In Kafka, topics can have multiple partitions in order to facilitate parallel processing. When processing partitions, it's often necessary to consider co-partitioning so that data from different sources ends up on the same partitions and any node processing that particular partition does not need to consider the others.

KafkaJS ships with a DefaultPartitioner that differs from what is used in Java. This can be a huge pain to deal with when working in both languages, such as a KStreams app consuming from a topic written to by KafkaJS. Instead, it is better to use the JavaCompatiblePartitioner when producing to topics:

kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner })

Testing

There are two main ways we have been interacting with KafkaJS when it comes to testing: unit testing and end-to-end testing.

With unit testing, the most common approach is to simply mock the KafkaJS layer as it is an edge to our application. Since we had already wrapped it in a service class, it was fairly easy to mock the service and mimic Kafka at that layer rather than mocking the larger surface area of KafkaJS itself.

describe('ApiToKafkaController', () => {
    const testTopic = 'test.topic'
	let kafkaMock = { send: jest.fn() }
	let registryMock = { encode: jest.fn() }
	let controller: ApiToKafkaController

	beforeEach(() => {
		controller = new ApiToKafkaController(kafkaMock, registryMock, testTopic)
	})

	it('should send the encoded message to kafka', async () => {
		const body = { field: 'value' }
		registryMock.encode.mockResolvedValue('encoded.value.string')
		
		await controller.eventHandler(body)
		
		expect(registryMock.encode).toHaveBeenCalledWith(body, testTopic)
		expect(kafkaMock.send).toHaveBeenCalledWith(testTopic, 'encoded.value.string')
	})
})

For end-to-end testing, we chose to use Cypress. This decision mostly comes from it being a familiar option to both developers and quality advocates on our team allowing both to contribute to the suite of tests. KafkaJS was a factor in this decision as it seemed like we could use both technologies together. Pairing Cypress with KafkaJS, we are able to test the flow of messages throughout our entire system in plain JavaScript.

To integrate KafkaJS into Cypress, we were able to set up tasks for various needs such as sending a message to a topic and asserting on a specific message coming across a different topic. By using a common interface for interacting with the system under test, we could seamlessly integrate our Kafka needs alongside other testing needs like reading from databases, listening to feeds and talking to third-party services. This meant our tests could focus on the conceptual flow of the system while abstracting the operations needed to invoke or confirm it working properly.

/* task */

// Cypress does not keep any state for us and any data passed from a test is by value,
// so we use a pseudo-reducer pattern by keeping a state object that is passed to every task as the first param
const state = {}

module.exports = (on) => {
	on('task', {
		// takes a topic to consume from, dumps to array in state as it reads
		startListener: (topic) => startListener(state, topic),
		// checks state array for a message that matches by correlationId, then asserts on equality
		hasExpectedMessage: (message) => hasExpectedMessage(state, message),
		// stops the consumer
		stopListener: () => stopListener(state),
	})
}
 
/* test */

describe('POST to /event', () => {
	before(() => cy.task('startListener', { topic: eventTopic }))

	after(() => cy.task('stopListener'))
	
	it('sends the event as expected', () => {
		const correlationId = uuid.v4()
		
		const event = { correlationId, /* ... */ }
		
		cy.request({
			method: 'POST',
			url: eventUrl,
			body: event,
		})
		
		cy.task('hasExpectedMessage', event, { timeout: 30000 }).should('eq', true)
	})
})

Conclusion

Overall, KafkaJS is a pretty amazing library that just works. It is not as batteries-included as some solutions like Kafka Streams, but the API it provides is robust and well documented. There are also a few other libraries that you may need to bring in to better take advantage of Kafka, but the integration between these libraries has been seamless in our experience.