Kafka Transport, KafkaRecordFetcher
KafkaRecordFetcher<K, V> is the only class in ktestify-core that knows about Kafka internals. It implements RecordFetcher<V> and is the sole implementation of the transport contract today.
Fetch lifecycle
KafkaRecordFetcher.fetch()
│
├─ ① subscribeAndAwaitAssignment()
│     Subscribe to topic, poll until partitions are assigned (max attempts before timeout).
│
├─ ② calculateDeltaTime()
│     Priority (ms):
│       1. context.consumerDeltaTime (from DataTable, already converted to ms)
│       2. properties["consumerDeltaTime"] (seconds, converted here)
│       3. ktestify.framework.timeouts.consumer-delta-time (HOCON duration)
│
├─ ③ seekToOffset(T₀ = now - deltaTime)
│     For each assigned partition:
│       offsetsForTimes(partition → T₀) → seek to that offset
│       If no offset found after T₀ → seekToEnd
│
└─ ④ pollUntilRecordFound()
      SINGLE mode (isBatchConsumer = false):
        Return as soon as ≥1 record passes key filter + dedup check.
      BATCH mode (isBatchConsumer = true):
        Accumulate records until batchSize reached or timeout expires.
      Dedup check: MATCHED_RECORDS.contains(record.toMatchedRecord())
      Key filter: context.expectedRecordKey matches record.key (if set)
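The priority order used by calculateDeltaTime() can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the ktestify source: the class `DeltaTimeSketch`, the methods `resolveDeltaMillis` and `seekTimestamp`, and the use of a nullable `Long` for the DataTable value are hypothetical. Only the three sources and their units are taken from the lifecycle above.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of ktestify: mirrors the documented priority order.
final class DeltaTimeSketch {

    /**
     * @param contextDeltaMillis value from the DataTable, already in ms (null if absent)
     * @param properties         per-context properties; "consumerDeltaTime" is in seconds
     * @param configuredDefault  value of ktestify.framework.timeouts.consumer-delta-time
     */
    static long resolveDeltaMillis(Long contextDeltaMillis,
                                   Map<String, String> properties,
                                   Duration configuredDefault) {
        // 1. The DataTable value wins and needs no conversion.
        if (contextDeltaMillis != null) {
            return contextDeltaMillis;
        }
        // 2. The property is expressed in seconds, so convert it to ms here.
        String seconds = properties.get("consumerDeltaTime");
        if (seconds != null) {
            return Duration.ofSeconds(Long.parseLong(seconds.trim())).toMillis();
        }
        // 3. Fall back to the configured duration.
        return configuredDefault.toMillis();
    }

    /** Seek timestamp: now - deltaTime, both in epoch milliseconds. */
    static long seekTimestamp(long nowMillis, long deltaMillis) {
        return nowMillis - deltaMillis;
    }

    public static void main(String[] args) {
        Duration fallback = Duration.ofSeconds(30);
        System.out.println(resolveDeltaMillis(1500L, Map.of("consumerDeltaTime", "10"), fallback)); // 1500
        System.out.println(resolveDeltaMillis(null, Map.of("consumerDeltaTime", "10"), fallback));  // 10000
        System.out.println(resolveDeltaMillis(null, Map.of(), fallback));                           // 30000
    }
}
```

The resulting timestamp is the value the seek step would look up per partition. In the Kafka client, `KafkaConsumer.offsetsForTimes` maps a partition to `null` when no offset exists at or after the requested timestamp, which corresponds to the seekToEnd fallback described above.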
Deduplication registry
// Static, shared across all KafkaRecordFetcher instances in the JVM
private static final Set<MatchedRecord> MATCHED_RECORDS =
    ConcurrentHashMap.newKeySet();

public static void clearMatchedRecords() {
    MATCHED_RECORDS.clear();
}
The registry prevents two steps in the same scenario from claiming the same Kafka record. It is cleared in the @Before hook via KtestifyHooks at the start of every Cucumber scenario.
If you use KafkaRecordFetcher directly (outside Cucumber), you must call KafkaRecordFetcher.clearMatchedRecords() before each test. Otherwise entries left in the registry by earlier tests persist, and any record they already claimed is skipped as a duplicate.
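The effect of a stale registry can be shown with a small stand-alone sketch. Everything here is hypothetical: the class `DedupRegistrySketch`, the `tryClaim` method, and the `MatchedRecord` fields (topic, partition, offset) are assumptions, since the real fields are not documented here. The sketch also uses `Set.add` as a combined check-and-claim, whereas the fetcher is described as checking with `contains`; that substitution is a choice made for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the static registry; not the ktestify implementation.
final class DedupRegistrySketch {

    // Assumed identity of a claimed record: topic, partition and offset.
    record MatchedRecord(String topic, int partition, long offset) {}

    // Static, so every caller in the JVM shares the same set.
    private static final Set<MatchedRecord> MATCHED_RECORDS = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that claims this record. */
    static boolean tryClaim(MatchedRecord candidate) {
        return MATCHED_RECORDS.add(candidate);
    }

    static void clearMatchedRecords() {
        MATCHED_RECORDS.clear();
    }

    public static void main(String[] args) {
        MatchedRecord order = new MatchedRecord("orders", 0, 42L);

        clearMatchedRecords();
        System.out.println(tryClaim(order)); // true: the first step claims the record
        System.out.println(tryClaim(order)); // false: a second step cannot claim it again

        clearMatchedRecords();               // what a per-test reset does
        System.out.println(tryClaim(order)); // true: the record is claimable again
    }
}
```

Outside Cucumber, the reset would typically sit in whatever per-test setup hook the test framework provides.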
Creating a KafkaConsumer / KafkaProducer
KafkaClientFactory creates Kafka client instances from ConsumerContext or ProducerContext properties. It is the only place in ktestify-core that calls new KafkaConsumer<>() / new KafkaProducer<>().
KafkaConsumer<String, String> consumer =
    KafkaClientFactory.createConsumer(context.getProperties());

KafkaProducer<String, String> producer =
    KafkaClientFactory.createProducer(context.getProperties());
Properties are derived from KtestifyConfig and merged with any per-context overrides from the DataTable.
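A merge of this kind can be sketched as follows. The class `PropertyMergeSketch` and its `merge` method are hypothetical, and the rule that per-context overrides win over config defaults is an assumption inferred from the word "overrides". The property keys are standard Kafka client settings used only as sample data.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of config defaults merged with per-context overrides.
final class PropertyMergeSketch {

    static Properties merge(Map<String, String> configDefaults, Map<String, String> overrides) {
        Properties merged = new Properties();
        merged.putAll(configDefaults);
        merged.putAll(overrides); // applied last, so an override replaces a default
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of(
                "bootstrap.servers", "localhost:9092",
                "auto.offset.reset", "latest");
        Map<String, String> overrides = Map.of(
                "group.id", "scenario-42",
                "auto.offset.reset", "earliest");

        Properties merged = merge(defaults, overrides);
        System.out.println(merged.getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(merged.getProperty("auto.offset.reset")); // earliest
        System.out.println(merged.getProperty("group.id"));          // scenario-42
    }
}
```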
Avro deserialisation
AvroKafkaConsumer passes a KafkaAvroDeserializer (Confluent) as the value deserialiser. The Schema Registry URL and auth settings flow from SchemaRegistryConfig into the deserialiser properties automatically via KafkaClientFactory.
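As a rough picture of what ends up in the deserialiser properties, the sketch below assembles the settings a Confluent Avro value deserialiser commonly needs. The class `AvroConsumerPropsSketch`, its method, and its parameters are hypothetical, and how SchemaRegistryConfig actually exposes the URL and credentials is not shown here. The String key deserialiser and the use of basic auth are also assumptions. The property keys themselves are standard Kafka and Confluent configuration names.

```java
import java.util.Properties;

// Hypothetical sketch of consumer properties for Avro values; not ktestify code.
final class AvroConsumerPropsSketch {

    /**
     * @param registryUrl       Schema Registry base URL
     * @param basicAuthUserInfo "user:password", or null when the registry needs no auth
     */
    static Properties avroConsumerProperties(String registryUrl, String basicAuthUserInfo) {
        Properties props = new Properties();
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", registryUrl);
        if (basicAuthUserInfo != null) {
            // Confluent basic-auth settings for the Schema Registry client.
            props.setProperty("basic.auth.credentials.source", "USER_INFO");
            props.setProperty("basic.auth.user.info", basicAuthUserInfo);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = avroConsumerProperties("http://localhost:8081", null);
        System.out.println(props.getProperty("value.deserializer"));
        System.out.println(props.containsKey("basic.auth.user.info")); // false
    }
}
```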
Error handling
| Situation | Exception thrown |
|---|---|
| Timeout with 0 records | FetchException("Timed out after Nms waiting for a record on topic '...'") |
| Timeout with partial batch | FetchException("Timed out after Nms waiting for N record(s), only M collected.") |
| WakeupException during poll | FetchException (logged, consumer closed cleanly) |
| Broker unreachable | FetchException wrapping the Kafka exception |
All FetchExceptions are caught by AbstractKafkaConsumer and re-thrown as ConsumerException.
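The catch-and-rethrow pattern can be sketched as below. Only the names FetchException, ConsumerException and RecordFetcher come from this page. Their constructors, the choice of unchecked exceptions, the single-method `fetch()` shape of the interface, and the `consume` wrapper are assumptions made so the example compiles on its own.

```java
// Hypothetical types for illustration; the real ktestify signatures may differ.
final class ErrorTranslationSketch {

    static class FetchException extends RuntimeException {
        FetchException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class ConsumerException extends RuntimeException {
        ConsumerException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    interface RecordFetcher<V> {
        V fetch();
    }

    /** Translates transport-level failures into the consumer-level exception. */
    static <V> V consume(RecordFetcher<V> fetcher) {
        try {
            return fetcher.fetch();
        } catch (FetchException e) {
            // Keep the original exception as the cause so the timeout details survive.
            throw new ConsumerException("Consume failed: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            consume(() -> {
                throw new FetchException(
                        "Timed out after 5000ms waiting for a record on topic 'orders'", null);
            });
        } catch (ConsumerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Test code that calls a consumer therefore only needs to handle ConsumerException; the FetchException remains available through `getCause()` in this sketch.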
See also
- Adding a transport: how to replace Kafka with a different broker
- Architecture: three-layer overview
- Timeout tuning: controlling timeouts from feature files