Beta branch refactoring#41
Conversation
…slogRecord format.
…set interface to KafkaRecord. Renamed RecordOffset to KafkaRecordImpl. Renamed NullOffset to NullKafkaRecord. Implemented KafkaRecordConverter for converting kafka records to SyslogRecords. Refactored WriteableQueue to be immutable.
…Renamed DatabaseOutput.java to BatchDistribution.java. Removed unused record() and isNull() methods from KafkaRecord interface and replaced record() usage in tests.
…o allow appending via SyslogAvroWriter. Improved exception handling in PartitionFile. Refactoring ProcessingTest.java
…dded preliminary version of method for writing the file managed by PartitionFile to HDFS while skipping file size checks.
…erface and renamed original PartitionFile.java to PartitionFileImpl.java.
…interface and PartitionRecordsImpl class which generates SyslogReccords list from KafkaRecords.
… to use secondary constructor instead. Added Config tests.
…rther refactoring.
…d Config.java by replacing missed setter with secondary constructor. Beginning refactoring tests to mirror the refactoring changes.
…rface, renamed BatchDistribution class to BatchDistributionImpl. Implemented IngestionRebalanceListener for handling kafka consumer group rebalance.
…rebalance() method to PartitionFile interface.
…e it to Ingestion0FilesLowSizeTest.java file.
…itialization in PartitionFileImpl.java and refactored ProcessingFailureTest.java which was affected by the fix. Cleaned comments.
51-code
left a comment
There was a problem hiding this comment.
Here are some general notes:
Multiple objects have long constructors with code in them, maybe that could be refactored?
Noticed that the useMockKafkaConsumer is a bit odd. Multiple objects have an if statement for checking this, could the objects instead implement an interface and provide a MockKafka version of the object?
Check which objects could be made final (public final class), probably most of them.
…mpl.java. Refactored ConsumerRebalanceListenerImpl to support topic partition offset tracking. Fixed listener registration for the consumer in ReadCoordinator.java. Refactored MockKafkaConsumerFactory to use subscribe method instead of assign.
…e) to load(String configurationPath).
|
Pair session result: Minor naming change to improve naming conventions. |
| if (LOGGER.isDebugEnabled()) { | ||
| LOGGER | ||
| .debug( | ||
| "Fuura searching your batch for <[{}]> with records <{}> and took <{}> milliseconds. <{}> EPS. ", |
There was a problem hiding this comment.
perhaps we should drop the joke about Fuura and have a proper message now?
| this.topicPartition = topicPartition; | ||
| this.batchOffsets = new ArrayList<>(); | ||
| this.partitionRecords = new PartitionRecordsImpl(config); | ||
| try (SyslogAvroWriter syslogAvroWriter = new SyslogAvroWriter(syslogFile)) { |
There was a problem hiding this comment.
could you please elaborate this? try-with-resources to create a new syslogAvroWriter and then a close for it so it gets a double close. other thing is that no code in constructor
There was a problem hiding this comment.
After some further testing and rechecking the logic how the syslogFile is handled, I deemed that initializing the avro-file in the constructor is redundant as the commitRecords() method will initialize the file if it doesn't exist yet.
Solved in commit 0a31fdd
…va. Refactored tests to check if initialized avro-file exists or not as expected.
…nfiguration files to prevent unintentional triggering of the consumer timeout.
| import java.util.Properties; | ||
|
|
||
| // This class will only hold the common configuration parameters. | ||
| public final class ConfigurationImpl implements Configuration { |
|
|
||
| public interface ConfigurationValidation { | ||
|
|
||
| public abstract void validate(Properties properties); |
There was a problem hiding this comment.
validators are not necessary, use rather configuration in the methods like
public int timeout() {
String timeoutString = map.get("timeout");
if (timeout == null) {
throw new ConfigurationException("timeout not configured");
}
int timeout;
try {
timeout = Integer.valueOf()
}
catch (FormatException e) {
LOGGER.error("not a number");
throw e
}
if (timeout <= 0) {
...
}
return timeout
}
There was a problem hiding this comment.
Implemented refactored configurations to the main code on commit 37d2a70
| if (!partitionFileMap.containsKey(recordOffset.get("partition").getAsString())) { | ||
| try { | ||
| partitionFileMap | ||
| .put(recordOffset.get("partition").getAsString(), new PartitionFileImpl(config, recordOffset)); |
There was a problem hiding this comment.
use immutable partitionFileFactory or so, so the object created is no longer configurable but gets always immutable configuration and only the recordOffset provided to the partitionFileFactory.partitionFor(recordOffset). the partitionFileFactory must be either initialized in secondary constructor or be passed from the object creating this one directly to the constructor.
There was a problem hiding this comment.
apply similar pattern to the other at the moment configurable objects too, because objects must not be configurable.
…a and tests that use cnf_01.
… to egress.properties. Disabled ConfigurationTest.java to prepare for refactoring.
…med NewHdfsConfiguration.java to HdfsConfiguration.java. Renamed NewKafkaConfiguration.java to KafkaConfiguration.java. Renamed related tests.
…r and moving all logic to PartitionFileFactory.
…nfiguration, reducing encapsulation in PartitionFileImpl. Refactored all tests according to the change.
…guration. Refactored code according to the change.
…rtition() and offset() methods. Refactored code to match the change.
|
Infrastructure side changes are ongoing for Kafka, which will require changes to the project once they are complete. |
Contains:
Also contains after changes: