
Kafka Transport: KafkaRecordFetcher

KafkaRecordFetcher<K, V> is the only class in ktestify-core that knows about Kafka internals. It implements RecordFetcher<V> and is the sole implementation of the transport contract today.


Fetch lifecycle

KafkaRecordFetcher.fetch()
│
├─ ① subscribeAndAwaitAssignment()
│     Subscribe to the topic and poll until partitions are assigned
│     (bounded number of attempts, then timeout).
│
├─ ② calculateDeltaTime()
│     Priority (result in ms):
│       1. context.consumerDeltaTime (from DataTable, already converted to ms)
│       2. properties["consumerDeltaTime"] (seconds, converted here)
│       3. ktestify.framework.timeouts.consumer-delta-time (HOCON duration)
│
├─ ③ seekToOffset(T₀ = now - deltaTime)
│     For each assigned partition:
│       offsetsForTimes(partition → T₀) → seek to that offset
│       If no offset exists at or after T₀ → seekToEnd
│
└─ ④ pollUntilRecordFound()
      SINGLE mode (isBatchConsumer = false):
        Return as soon as ≥1 record passes the key filter and the dedup check.
      BATCH mode (isBatchConsumer = true):
        Accumulate records until batchSize is reached or the timeout expires.
      Dedup check: skip the record if MATCHED_RECORDS.contains(record.toMatchedRecord())
      Key filter:  context.expectedRecordKey must match record.key (if set)
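Steps ② and ③ can be approximated without a broker. The sketch below is a stdlib-only model, not the actual KafkaRecordFetcher code. `resolveDeltaMs` applies the documented priority order: the context value in ms first, then the `consumerDeltaTime` property in seconds, then the configured default. `startOffset` mimics what `KafkaConsumer.offsetsForTimes` yields for one partition: the earliest offset whose timestamp is at or after T₀, falling back to the end offset (the `seekToEnd` case) when no such offset exists. The class and method names are hypothetical. The parameter types are assumptions too: the HOCON default is assumed to arrive already parsed into a `java.time.Duration`, and the property is assumed to hold whole seconds.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free model of steps 2 and 3; not the real KafkaRecordFetcher.
final class FetchWindowSketch {

    private FetchWindowSketch() {}

    /** Step 2: resolve the look-back window in milliseconds, highest priority first. */
    static long resolveDeltaMs(Long contextDeltaMs,
                               Map<String, String> properties,
                               Duration configDefault) {
        if (contextDeltaMs != null) {
            return contextDeltaMs;                          // 1. DataTable value, already ms
        }
        String seconds = properties.get("consumerDeltaTime");
        if (seconds != null) {
            // 2. property assumed to hold whole seconds; convert to ms here
            return Duration.ofSeconds(Long.parseLong(seconds.trim())).toMillis();
        }
        return configDefault.toMillis();                    // 3. framework default
    }

    /**
     * Step 3 for one partition: return the earliest offset whose timestamp is >= t0,
     * or the end offset (what seekToEnd would position at) if there is none.
     * recordTimestamps.get(i) is the timestamp of the record at offset i (model assumption).
     */
    static long startOffset(List<Long> recordTimestamps, long t0) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= t0) {
                return offset;
            }
        }
        return recordTimestamps.size();  // end offset = next offset to be written
    }
}
```

For example, with no context value, a `consumerDeltaTime` property of `"30"` and a one-minute default, `resolveDeltaMs` returns 30000.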

Deduplication registry

// Static, shared across all KafkaRecordFetcher instances in the JVM
private static final Set<MatchedRecord> MATCHED_RECORDS =
        ConcurrentHashMap.newKeySet();

public static void clearMatchedRecords() {
    MATCHED_RECORDS.clear();
}

The registry prevents two steps in the same scenario from claiming the same Kafka record. KtestifyHooks clears it in a Cucumber @Before hook, so the registry is empty at the start of every scenario.
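The interaction between the registry, the key filter and the two polling modes is easier to see in a simplified model. The sketch below is hypothetical:

- `SimpleRecord` stands in for both the Kafka record and `MatchedRecord`. Java record equality plays the role of the registry identity.
- A list of pre-fetched batches stands in for successive `poll()` calls.
- Running out of batches stands in for the timeout.
- A record is assumed to be added to the registry when it is claimed. That is what lets a later step skip it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a consumed record and its MatchedRecord identity.
record SimpleRecord(String topic, int partition, long offset, String key, String value) {}

// Broker-free model of step 4 plus the dedup registry; not the real implementation.
final class PollLoopSketch {

    // Static, JVM-wide registry, mirroring MATCHED_RECORDS.
    static final Set<SimpleRecord> MATCHED = ConcurrentHashMap.newKeySet();

    private PollLoopSketch() {}

    static void clearMatchedRecords() {
        MATCHED.clear();
    }

    /**
     * @param polls       successive poll() results (stand-in for the consumer)
     * @param expectedKey key filter; null means any key is accepted
     * @param batch       false = SINGLE mode, true = BATCH mode
     * @param batchSize   number of records wanted in BATCH mode
     * @return claimed records; fewer than batchSize if the polls run out (stand-in for a timeout)
     */
    static List<SimpleRecord> pollUntilRecordFound(Iterator<List<SimpleRecord>> polls,
                                                   String expectedKey,
                                                   boolean batch,
                                                   int batchSize) {
        int wanted = batch ? batchSize : 1;
        List<SimpleRecord> claimed = new ArrayList<>();
        while (polls.hasNext()) {
            for (SimpleRecord r : polls.next()) {
                boolean keyOk = expectedKey == null || Objects.equals(expectedKey, r.key());
                // Set.add returns false if an earlier step already claimed this record.
                if (keyOk && MATCHED.add(r)) {
                    claimed.add(r);
                    if (claimed.size() == wanted) {
                        return claimed;
                    }
                }
            }
        }
        return claimed;
    }
}
```

This sketch uses the return value of `Set.add` as the dedup check. On a concurrent set, check-and-claim is then a single atomic step. A `contains` check followed by a separate `add` leaves a small window in which two parallel steps could claim the same record. Whether that matters depends on whether steps ever run in parallel, which this page does not say.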

Warning

If you use KafkaRecordFetcher directly (outside Cucumber), you must call KafkaRecordFetcher.clearMatchedRecords() before each test. Otherwise the registry still holds records claimed by earlier tests, and those records are skipped as duplicates.


Creating a KafkaConsumer / KafkaProducer

KafkaClientFactory creates Kafka client instances from ConsumerContext or ProducerContext properties. It is the only place in ktestify-core that calls new KafkaConsumer<>() / new KafkaProducer<>().

KafkaConsumer<String, String> consumer =
        KafkaClientFactory.createConsumer(context.getProperties());

KafkaProducer<String, String> producer =
        KafkaClientFactory.createProducer(context.getProperties());

Properties are derived from KtestifyConfig and merged with any per-context overrides from the DataTable.
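The merge can be pictured with plain `java.util.Properties`. This is a minimal sketch that relies on one assumption this page does not state: DataTable overrides take precedence over values derived from KtestifyConfig. The class and method names are hypothetical. The example keys (`bootstrap.servers`, `group.id`, `auto.offset.reset`) are standard Kafka client settings, but this page does not document which ones ktestify actually sets.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the real KafkaClientFactory may merge differently.
final class ClientPropertiesSketch {

    private ClientPropertiesSketch() {}

    /** Copies the framework defaults, then lets per-context overrides replace them. */
    static Properties merge(Map<String, String> frameworkDefaults,
                            Map<String, String> dataTableOverrides) {
        Properties merged = new Properties();
        merged.putAll(frameworkDefaults);   // values derived from KtestifyConfig
        merged.putAll(dataTableOverrides);  // assumed to win on key collisions
        return merged;
    }
}
```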


Avro deserialisation

AvroKafkaConsumer passes a KafkaAvroDeserializer (Confluent) as the value deserialiser. The Schema Registry URL and auth settings flow from SchemaRegistryConfig into the deserialiser properties automatically via KafkaClientFactory.
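For orientation, the sketch below lists the Confluent settings that typically end up in the consumer properties when Schema Registry is used with basic authentication. The keys are standard Kafka/Confluent client settings: `value.deserializer`, `schema.registry.url`, `basic.auth.credentials.source`, `basic.auth.user.info` and `specific.avro.reader`. Several parts are assumptions, because this page does not show which fields SchemaRegistryConfig exposes:

- the class and method names;
- the String key deserializer;
- the choice of `specific.avro.reader=false`.

Class names are passed as strings, so the sketch compiles without the Confluent jars.

```java
import java.util.Properties;

// Hypothetical mapping from Schema Registry settings to consumer properties.
final class AvroConsumerPropertiesSketch {

    private AvroConsumerPropertiesSketch() {}

    static Properties avroValueDeserializerProps(String registryUrl,
                                                 String user,
                                                 String password) {
        Properties props = new Properties();
        // Key type assumed to be String, matching KafkaConsumer<String, String> above.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", registryUrl);
        // Return GenericRecord instead of generated SpecificRecord classes (assumption).
        props.setProperty("specific.avro.reader", "false");
        if (user != null && password != null) {
            // Basic auth: credentials read from basic.auth.user.info as "user:password".
            props.setProperty("basic.auth.credentials.source", "USER_INFO");
            props.setProperty("basic.auth.user.info", user + ":" + password);
        }
        return props;
    }
}
```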


Error handling

| Situation | Exception thrown |
|---|---|
| Timeout with 0 records | FetchException("Timed out after Nms waiting for a record on topic '...'") |
| Timeout with partial batch | FetchException("Timed out after Nms waiting for N record(s), only M collected.") |
| WakeupException during poll | FetchException (logged, consumer closed cleanly) |
| Broker unreachable | FetchException wrapping the Kafka exception |

All FetchExceptions are caught by AbstractKafkaConsumer and re-thrown as ConsumerException.
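The two-level error model can be sketched as follows. This page names `FetchException` and `ConsumerException` but not their constructors or superclasses. The versions below are assumptions: both unchecked, both with a message-and-cause constructor. The `timeout` and `consume` helpers are hypothetical too. The message formats follow the table, with N and M filled in.

```java
import java.util.function.Supplier;

// Assumed shapes; the real ktestify exception classes may differ.
class FetchException extends RuntimeException {
    FetchException(String message) { super(message); }
    FetchException(String message, Throwable cause) { super(message, cause); }
}

class ConsumerException extends RuntimeException {
    ConsumerException(String message, Throwable cause) { super(message, cause); }
}

final class ErrorHandlingSketch {

    private ErrorHandlingSketch() {}

    /** Builds the two timeout messages from the error-handling table. */
    static FetchException timeout(long timeoutMs, String topic, int wanted, int collected) {
        if (collected == 0) {
            return new FetchException("Timed out after " + timeoutMs
                    + "ms waiting for a record on topic '" + topic + "'");
        }
        return new FetchException("Timed out after " + timeoutMs + "ms waiting for "
                + wanted + " record(s), only " + collected + " collected.");
    }

    /** Mirrors AbstractKafkaConsumer: any FetchException surfaces as a ConsumerException. */
    static <T> T consume(Supplier<T> fetch) {
        try {
            return fetch.get();
        } catch (FetchException e) {
            throw new ConsumerException(e.getMessage(), e);
        }
    }
}
```

Keeping the original `FetchException` as the cause preserves the transport-level detail, such as a wrapped broker exception, for anyone inspecting the stack trace.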


See also