11package clients ;
22
3- import clients .avro .PositionDistance ;
3+ import clients .avro .PositionString ;
44import clients .avro .PositionValue ;
55
66import io .confluent .kafka .streams .serdes .avro .SpecificAvroSerde ;
1010import java .util .Properties ;
1111import java .util .concurrent .CountDownLatch ;
1212
13- import net .sf .geographiclib .Geodesic ;
14-
1513import org .apache .kafka .common .serialization .Serde ;
1614import org .apache .kafka .common .serialization .Serdes ;
1715import org .apache .kafka .streams .KafkaStreams ;
2018import org .apache .kafka .streams .Topology ;
2119import org .apache .kafka .streams .kstream .Consumed ;
2220import org .apache .kafka .streams .kstream .KStream ;
23- import org .apache .kafka .streams .kstream .KTable ;
24- import org .apache .kafka .streams .kstream .Materialized ;
2521import org .apache .kafka .streams .kstream .Produced ;
2622
2723public class StreamsApp {
@@ -37,7 +33,7 @@ public static void main(String[] args) {
3733 settings .put (StreamsConfig .APPLICATION_ID_CONFIG , "streams-app-1" );
3834 settings .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , "kafka:9092" );
3935 settings .put (StreamsConfig .DEFAULT_KEY_SERDE_CLASS_CONFIG ,
40- Serdes .String ().getClass ().getName ());
36+ Serdes .String ().getClass ().getName ());
4137 // Disabling caching ensures we get a complete "changelog" from the
4238 // aggregate(...) (i.e. every input event will have a corresponding output event.
4339 // see
@@ -68,54 +64,52 @@ public static void main(String[] args) {
6864 }
6965
7066 private static Topology getTopology () {
67+
7168 // When you want to override serdes explicitly/selectively
7269 final Map <String , String > serdeConfig = Collections .singletonMap ("schema.registry.url" ,
73- "http://schema-registry:8081" );
70+ "http://schema-registry:8081" );
7471 final Serde <PositionValue > positionValueSerde = new SpecificAvroSerde <>();
75- positionValueSerde .configure (serdeConfig , false );
76- final Serde <PositionDistance > positionDistanceSerde = new SpecificAvroSerde <>();
77- positionDistanceSerde .configure (serdeConfig , false );
72+ positionValueSerde .configure (serdeConfig , false );
73+ final Serde <PositionString > positionStringSerde = new SpecificAvroSerde <>();
74+ positionStringSerde .configure (serdeConfig , false );
7875
76+ // Create the StreamsBuilder object to create our Topology
7977 final StreamsBuilder builder = new StreamsBuilder ();
8078
81- // create a KStream from the driver-positions-avro topic
79+ // Create a KStream from the ` driver-positions-avro` topic
8280 // configure a serdes that can read the string key, and avro value
8381 final KStream <String , PositionValue > positions = builder .stream (
84- "driver-positions-avro" ,
85- Consumed .with (Serdes .String (),
86- positionValueSerde ));
87-
88-
89- // We do a groupByKey on the ‘positions’ stream which returns an
90- // intermediate KGroupedStream, we then aggregate to return a KTable.
91- final KTable <String , PositionDistance > reduced = positions .groupByKey ().aggregate (
92- () -> null ,
93- (aggKey , newValue , aggValue ) -> {
94- final Double newLatitude = newValue .getLatitude ();
95- final Double newLongitude = newValue .getLongitude ();
96-
97- // initial record - no distance to calculate
98- if (aggValue == null ) {
99- return new PositionDistance (newLatitude , newLongitude , 0.0 );
100- }
101-
102- // cacluate the distance between the new value and the aggregate value
103- final Double aggLatitude = aggValue .getLatitude ();
104- final Double aggLongitude = aggValue .getLongitude ();
105- Double aggDistance = aggValue .getDistance ();
106- final Double distance = Geodesic .WGS84 .Inverse (aggLatitude , aggLongitude ,
107- newLatitude , newLongitude ).s12 ;
108- aggDistance += distance ;
109-
110- // return the new value and distance as the new aggregate
111- return new PositionDistance (newLatitude , newLongitude , aggDistance );
112- }, Materialized .with (
113- Serdes .String (),
114- positionDistanceSerde ));
115-
116- reduced .toStream ().to (
117- "driver-distance-avro" ,
118- Produced .with (Serdes .String (), positionDistanceSerde ));
82+ "driver-positions-avro" ,
83+ Consumed .with (Serdes .String (),
84+ positionValueSerde ));
85+
86+ // TO-DO: Use filter() method to filter out the events from `driver-2`.
87+ // Define the predicate in the lambda expression of the filter().
88+ final KStream <String , PositionValue > positionsFiltered = positions .filter (
89+ (key ,value ) -> ???);
90+
91+ // TO-DO: Use mapValues() method to change the value of each
92+ // event from PositionValue to PositionString class.
93+ // You can check the two schemas under src/main/avro/.
94+ // Notice that position_string.avsc contains a new field
95+ // `positionString` as String type.
96+ final KStream <String , PositionString > positionsString = positionsFiltered .mapValues (
97+ value -> {
98+ final Double latitude = ???;
99+ final Double longitude = ???;
100+ final String positionString = "Latitude: " + String .valueOf (???) +
101+ ", Longitude: " + String .valueOf (???);
102+ return new PositionString (???, ???, ???);
103+ }
104+ );
105+
106+ // Write the results to topic `driver-positions-string-avro`
107+ // configure a serdes that can write the string key, and new avro value
108+ positionsString .to (
109+ "driver-positions-string-avro" ,
110+ Produced .with (Serdes .String (), positionStringSerde ));
111+
112+ // Build the Topology
119113 final Topology topology = builder .build ();
120114 return topology ;
121115 }
0 commit comments