restart = new SynchronousQueue<>();
- private volatile ServerModel model;
+ private volatile ServerModel model;
private volatile CommandShell shell;
private String current_path = "";
private static final String COMMANDS =
- "Commands:\n\n" +
- "Note: '.' and '..' will be interpreted as the current directory and the parent directory respectively.\n" +
- "Spaces within a path do not need to be quoted.\n\n" +
- "\tls - List all alarm tree items in the current directory.\n" +
- "\tls -disconnected - List all the disconnected PVs in the entire alarm tree.\n" +
- "\tls -disabled - List all the disabled PVs in the entire alarm tree.\n" +
- "\tls -all - List all alarm tree PVs in the entire alarm tree.\n" +
- "\tls -active - .. which are in active alarm.\n" +
- "\tls -alarm - .. alarm, active or acknowledged.\n" +
- "\tls dir - List all alarm tree items in the specified directory contained in the current directory.\n" +
- "\tls /path/to/dir - List all alarm tree items in the specified directory at the specified path.\n" +
- "\tcd - Change to the root directory.\n" +
- "\tcd dir - Change to the specified directory contained in the current directory.\n" +
- "\tcd /path/to/dir - Change to the specified directory at the specified path.\n" +
- "\tpv pv - Print the specified PV in the current directory.\n" +
- "\tpv /path/to/pv - Print the specified PV at the specified path.\n" +
- "\tmode - Show mode.\n" +
- "\tmode normal - Select normal mode.\n" +
- "\tmode maintenance - Select maintenance mode.\n" +
- "\tresend - Re-send all PV states to clients (for tests after network issues).\n" +
- "\trestart - Re-load alarm configuration and restart.\n" +
- "\tshutdown - Shut alarm server down and exit.\n";
-
- /**
- * Ensure that the required Kafka topics exist and are correctly configured.
- *
- * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
- * For more details on alarm topic configuration, see:
- * Refer to Configure Alarm Topics
- *
- * @param server Kafka server
- * @param topic Base topic name
- * @param kafka_props_file Extra Kafka properties file
- * @throws Exception
- */
- private static void ensureKafkaTopics(String server, String topic, String kafka_props_file) throws Exception {
- var kafka_props = KafkaHelper.loadPropsFromFile(kafka_props_file);
- kafka_props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
- try (AdminClient admin = AdminClient.create(kafka_props)) {
- Set topics = admin.listTopics().names().get(60, TimeUnit.SECONDS);
- // Compacted topic
- String compactedTopic = topic;
- if (!topics.contains(compactedTopic)) {
- createTopic(admin, compactedTopic);
- }
- setCompactedConfig(admin, compactedTopic);
-
- // Deleted topics
- for (String suffix : List.of("Command", "Talk")) {
- String deletedTopic = topic + suffix;
- if (!topics.contains(deletedTopic)) {
- createTopic(admin, deletedTopic);
- }
- setDeletedConfig(admin, deletedTopic);
- }
- }
- }
-
- /**
- * Create topics
- *
- * @param admin Admin client
- * @param topic Topic name
- * @throws Exception
- */
- private static void createTopic(AdminClient admin, String topic) throws Exception {
- NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
- try {
- admin.createTopics(List.of(newTopic)).all().get();
- logger.info("Created topic: " + topic);
- } catch (Exception e) {
- if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
- logger.info("Topic already exists: " + topic);
- } else {
- throw e;
- }
- }
- }
-
- /**
- * Configure topic for alarm state storage with compaction to retain latest state.
- * For configuration information, see:
- *
- * Refer to Configure Alarm Topics
- *
- * @param admin Admin client
- * @param topic Topic name
- * @throws Exception
- */
- private static void setCompactedConfig(AdminClient admin, String topic) throws Exception {
- ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
- List configOps = List.of(
- new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET)
- );
- admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
- logger.info("Set compacted config for topic: " + topic);
- }
-
- /**
- * Configure topic for command/talk messages with time-based deletion.
- * For configuration information, see:
- *
- * Refer to Configure Alarm Topics
- *
- * @param admin Admin client
- * @param topic Topic name
- * @throws Exception
- */
- private static void setDeletedConfig(AdminClient admin, String topic) throws Exception {
- ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
- List configOps = List.of(
- new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("retention.ms", "20000"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("delete.retention.ms", "1000"), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("file.delete.delay.ms", "1000"), AlterConfigOp.OpType.SET)
- );
- admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
- logger.info("Set deleted config for topic: " + topic);
- }
-
- private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file)
- {
+ "Commands:\n\n" +
+ "Note: '.' and '..' will be interpreted as the current directory and the parent directory respectively.\n" +
+ "Spaces within a path do not need to be quoted.\n\n" +
+ "\tls - List all alarm tree items in the current directory.\n" +
+ "\tls -disconnected - List all the disconnected PVs in the entire alarm tree.\n" +
+ "\tls -disabled - List all the disabled PVs in the entire alarm tree.\n" +
+ "\tls -all - List all alarm tree PVs in the entire alarm tree.\n" +
+ "\tls -active - .. which are in active alarm.\n" +
+ "\tls -alarm - .. alarm, active or acknowledged.\n" +
+ "\tls dir - List all alarm tree items in the specified directory contained in the current directory.\n" +
+ "\tls /path/to/dir - List all alarm tree items in the specified directory at the specified path.\n" +
+ "\tcd - Change to the root directory.\n" +
+ "\tcd dir - Change to the specified directory contained in the current directory.\n" +
+ "\tcd /path/to/dir - Change to the specified directory at the specified path.\n" +
+ "\tpv pv - Print the specified PV in the current directory.\n" +
+ "\tpv /path/to/pv - Print the specified PV at the specified path.\n" +
+ "\tmode - Show mode.\n" +
+ "\tmode normal - Select normal mode.\n" +
+ "\tmode maintenance - Select maintenance mode.\n" +
+ "\tresend - Re-send all PV states to clients (for tests after network issues).\n" +
+ "\trestart - Re-load alarm configuration and restart.\n" +
+ "\tshutdown - Shut alarm server down and exit.\n";
+
+ private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file) {
logger.info("Server: " + server);
logger.info("Config: " + config);
logger.info("Extra Kafka Properties: " + kafka_props_file);
- try
- {
+ try {
// 'main' loop that keeps performing a full startup and shutdown
// whenever a 'restart' is requested.
boolean run = true;
- while (run)
- {
+ while (run) {
logger.info("Verify topics exists and are correctly configured...");
// Create/verify topics before using Kafka
- ensureKafkaTopics(server, config, kafka_props_file);
+ TopicUtils.ensureKafkaTopics(server, config, kafka_props_file);
logger.info("Fetching past alarm states...");
final AlarmStateInitializer init = new AlarmStateInitializer(server, config, kafka_props_file);
@@ -212,8 +97,7 @@ private AlarmServerMain(final String server, final String config, final boolean
model = new ServerModel(server, config, initial_states, this, kafka_props_file);
model.start();
- if (use_shell)
- {
+ if (use_shell) {
shell = new CommandShell(COMMANDS, this::handleShellCommands);
// Start the command shell at the root node.
@@ -235,9 +119,7 @@ private AlarmServerMain(final String server, final String config, final boolean
model.shutdown();
}
- }
- catch (final Throwable ex)
- {
+ } catch (final Throwable ex) {
logger.log(Level.SEVERE, "Alarm Server main loop error", ex);
}
@@ -248,16 +130,15 @@ private AlarmServerMain(final String server, final String config, final boolean
/**
* Handle shell commands. Passed to command shell.
+ *
* @param args - variadic String
* @return result - boolean result of executing the command.
* @throws Throwable
*/
- private boolean handleShellCommands(final String... args) throws Throwable
- {
+ private boolean handleShellCommands(final String... args) throws Throwable {
if (args == null)
restart.offer(false);
- else if (args.length == 1)
- {
+ else if (args.length == 1) {
if (args[0].startsWith("shut"))
restart.offer(false);
else if (args[0].equals("restart"))
@@ -273,18 +154,14 @@ else if (args[0].equals("cd")) // cd with no argument goes to root directory.
{
current_path = model.getRoot().getPathName();
shell.setPrompt(current_path);
- }
- else if (args[0].equals("ls")) // List alarm tree items in current directory. _Not_ recursive descent.
+ } else if (args[0].equals("ls")) // List alarm tree items in current directory. _Not_ recursive descent.
{
List> children = model.findNode(current_path).getChildren();
for (final AlarmTreeItem> child : children)
System.out.println(child.getName() + " - " + child.getState());
- }
- else
+ } else
return false;
- }
- else if (args.length >= 2)
- {
+ } else if (args.length >= 2) {
// Concatenate all the tokens whose index is > 0 into a single string.
// This allows for spaces in PV and Node names. They would have been split on whitespace by the CommandShell.
String args1 = "";
@@ -292,30 +169,26 @@ else if (args.length >= 2)
args1 += " " + args[i];
args1 = args1.trim();
- try
- {
+ try {
if (args[0].equals("cd")) // Change directory to specified location.
{
final String new_path = determinePath(args1);
final AlarmTreeItem> new_loc = model.findNode(new_path);
- if (null == new_loc)
- {
+ if (null == new_loc) {
System.out.println("Node not found: " + new_path);
return false;
}
// Can't change location to leaves.
- if (new_loc instanceof AlarmTreeLeaf)
- {
+ if (new_loc instanceof AlarmTreeLeaf) {
System.out.println("Node not a directory: " + new_loc.getPathName());
return false;
}
current_path = new_loc.getPathName();
shell.setPrompt(current_path);
- }
- else if (args[0].equals("ls")) // List the alarm tree items at the specified location.
+ } else if (args[0].equals("ls")) // List the alarm tree items at the specified location.
{
if (args1.startsWith("-disc")) // Print all disconnected PVs in tree.
listPVs(model.getRoot(), PVMode.Disconnected);
@@ -332,8 +205,7 @@ else if (args1.startsWith("-ala")) // Print all the PVs in the tree that are in
final String path = determinePath(args1);
final AlarmTreeItem> node = model.findNode(path);
- if (null == node)
- {
+ if (null == node) {
System.out.println("Node not found: " + path);
return false;
}
@@ -342,32 +214,26 @@ else if (args1.startsWith("-ala")) // Print all the PVs in the tree that are in
for (final AlarmTreeItem> child : children)
System.out.println(child.getName() + " - " + child.getState());
}
- }
- else if (args[0].equals("pv")) // Print the specified PV.
+ } else if (args[0].equals("pv")) // Print the specified PV.
{
final String pvPath = determinePath(args1);
final AlarmTreeItem> node = model.findNode(pvPath);
- if (node instanceof AlarmServerNode)
- {
+ if (node instanceof AlarmServerNode) {
System.out.println("Specified alarm tree item is not a PV: " + pvPath);
return false;
}
final AlarmServerPV pv = (AlarmServerPV) node;
System.out.println(pv);
- }
- else if (args[0].equals("mode"))
- {
+ } else if (args[0].equals("mode")) {
setMaintenanceMode(args1.startsWith("maint"));
System.out.println(AlarmLogic.getMaintenanceMode() ? "Maintenance mode" : "Normal mode");
}
} // Catch the exceptions caused by findNode searching a path that doesn't start with the root directory.
- catch (Exception ex)
- {
+ catch (Exception ex) {
System.out.println(ex.getMessage());
return false;
}
- }
- else
+ } else
return false;
return true;
@@ -383,32 +249,28 @@ else if (args[0].equals("mode"))
* /dir -> current_path/dir
* dir -> current_path/dir
*
+ *
* @param arg - String to be examined.
* @return new_path
* @throws Exception
*/
- private String determinePath(final String arg) throws Exception
- {
+ private String determinePath(final String arg) throws Exception {
String new_path = current_path;
if (arg.equals(".")) // Current directory.
{
return new_path;
- }
- else if (arg.equals("..")) // Parent directory.
+ } else if (arg.equals("..")) // Parent directory.
{
AlarmTreeItem> parent = model.findNode(current_path).getParent();
if (null != parent)
new_path = parent.getPathName();
- }
- else if (arg.startsWith(model.getRoot().getPathName())) // If starts from root, treat it as a whole path.
+ } else if (arg.startsWith(model.getRoot().getPathName())) // If starts from root, treat it as a whole path.
{
new_path = arg;
- }
- else if (arg.startsWith("/")) // Allow for "command /dir".
+ } else if (arg.startsWith("/")) // Allow for "command /dir".
{
new_path = current_path + arg;
- }
- else // Allow for "command dir".
+ } else // Allow for "command dir".
{
new_path = current_path + "/" + arg;
}
@@ -419,62 +281,58 @@ else if (arg.startsWith("/")) // Allow for "command /dir".
return new_path;
}
- /** Handle commands
+ /**
+ * Handle commands
*
- *
- * - acknowledge /some/path -
- * Acknowledge alarms in subtree
- *
- unacknowledge /some/path -
- * Un-Acknowledge alarms in subtree
- *
- mode [normal|maintenance] -
- * Select normal or maintenance mode
- *
- dump -
- * Dumps complete alarm tree
- *
- dump /some/path -
- * Dumps subtree
- *
- pvs -
- * Prints all PVs
- *
- pvs /some/path -
- * Prints PVs in subtree
- *
- pv name_of_PV -
- * Prints that PV
- *
- disconnected -
- * Prints all disconnected PVs
- *
- restart -
- * Re-load configuration
- *
- shutdown -
- * Quit
- *
+ *
+ * - acknowledge /some/path -
+ * Acknowledge alarms in subtree
+ *
- unacknowledge /some/path -
+ * Un-Acknowledge alarms in subtree
+ *
- mode [normal|maintenance] -
+ * Select normal or maintenance mode
+ *
- dump -
+ * Dumps complete alarm tree
+ *
- dump /some/path -
+ * Dumps subtree
+ *
- pvs -
+ * Prints all PVs
+ *
- pvs /some/path -
+ * Prints PVs in subtree
+ *
- pv name_of_PV -
+ * Prints that PV
+ *
- disconnected -
+ * Prints all disconnected PVs
+ *
- restart -
+ * Re-load configuration
+ *
- shutdown -
+ * Quit
+ *
+ *
*
- * @param path Alarm tree path
- * @param json Command
+ * @param path Alarm tree path
+ * @param json Command
*/
@Override
- public void handleCommand(final String path, final String json)
- {
- try
- {
+ public void handleCommand(final String path, final String json) {
+ try {
final JsonNode jsonNode = (JsonNode) JsonModelReader.parseJsonText(json);
final JsonNode commandNode = jsonNode.get(JsonTags.COMMAND);
if (null == commandNode)
throw new Exception("Command parsing failed.");
final String command = commandNode.asText();
- if (command.startsWith("ack"))
- {
+ if (command.startsWith("ack")) {
final AlarmTreeItem> node = model.findNode(path);
if (node == null)
throw new Exception("Unknown alarm tree node '" + path + "'");
acknowledge(node, true);
- }
- else if (command.startsWith("unack"))
- {
+ } else if (command.startsWith("unack")) {
final AlarmTreeItem> node = model.findNode(path);
if (node == null)
throw new Exception("Unknown alarm tree node '" + path + "'");
acknowledge(node, false);
- }
- else if (JsonTags.MAINTENANCE.equals(command))
+ } else if (JsonTags.MAINTENANCE.equals(command))
setMaintenanceMode(true);
else if (JsonTags.NORMAL.equals(command))
setMaintenanceMode(false);
@@ -482,60 +340,45 @@ else if (JsonTags.DISABLE_NOTIFY.equals(command))
setDisableNotify(true);
else if (JsonTags.ENABLE_NOTIFY.equals(command))
setDisableNotify(false);
- else if (command.equalsIgnoreCase("dump"))
- {
+ else if (command.equalsIgnoreCase("dump")) {
final AlarmTreeItem> node;
node = model.findNode(path);
if (node == null)
throw new Exception("Unknown alarm tree node '" + path + "'");
System.out.println(node.getPathName() + ":");
ModelPrinter.print(node);
- }
- else if (command.equalsIgnoreCase("pvs"))
- {
+ } else if (command.equalsIgnoreCase("pvs")) {
final AlarmTreeItem> node;
node = model.findNode(path);
if (node == null)
throw new Exception("Unknown alarm tree node '" + path + "'");
System.out.println("PVs for " + node.getPathName() + ":");
listPVs(node, PVMode.All);
- }
- else if (command.equalsIgnoreCase("disconnected"))
- {
+ } else if (command.equalsIgnoreCase("disconnected")) {
final AlarmTreeItem> node;
node = model.findNode(path);
if (node == null)
throw new Exception("Unknown alarm tree node '" + path + "'");
System.out.println("PVs for " + node.getPathName() + ":");
listPVs(node, PVMode.Disconnected);
- }
- else if (command.equalsIgnoreCase("pv"))
- {
+ } else if (command.equalsIgnoreCase("pv")) {
final AlarmServerPV pv = model.findPV(path);
if (pv == null)
throw new Exception("Unknown PV '" + path + "'");
listPVs(pv, PVMode.All);
- }
- else if (command.equals("shutdown"))
- {
+ } else if (command.equals("shutdown")) {
restart.offer(false);
- }
- else if (command.equalsIgnoreCase("restart"))
- {
+ } else if (command.equalsIgnoreCase("restart")) {
logger.log(Level.INFO, "Restart requested");
restart.offer(true);
- }
- else
+ } else
throw new Exception("Unknown command.");
- }
- catch (Exception ex)
- {
+ } catch (Exception ex) {
logger.log(Level.WARNING, "Error for command. path: '" + path + "', JSON: '" + json + "'", ex);
}
}
- private void setDisableNotify(final boolean disable_notify)
- {
+ private void setDisableNotify(final boolean disable_notify) {
// Any change?
if (disable_notify == AlarmLogic.getDisableNotify())
return;
@@ -546,8 +389,7 @@ private void setDisableNotify(final boolean disable_notify)
model.sendStateUpdate(model.getRoot().getPathName(), model.getRoot().getState());
}
- private void setMaintenanceMode(final boolean maintenance_mode)
- {
+ private void setMaintenanceMode(final boolean maintenance_mode) {
// Any change?
if (maintenance_mode == AlarmLogic.getMaintenanceMode())
return;
@@ -561,82 +403,68 @@ private void setMaintenanceMode(final boolean maintenance_mode)
model.sendStateUpdate(model.getRoot().getPathName(), model.getRoot().getState());
}
- /** @param node Node where to start ack'ing all INVALID or UNDEFINED alarms */
- private void acknowledgeInvalidUndefined(final AlarmTreeItem> node)
- {
- if (node instanceof AlarmServerPV)
- {
- final AlarmServerPV pv_node = (AlarmServerPV) node;
+ /**
+ * @param node Node where to start ack'ing all INVALID or UNDEFINED alarms
+ */
+ private void acknowledgeInvalidUndefined(final AlarmTreeItem> node) {
+ if (node instanceof AlarmServerPV pv_node) {
if (pv_node.getState().severity.ordinal() >= SeverityLevel.INVALID.ordinal())
pv_node.acknowledge(true);
- }
- else
+ } else
for (final AlarmTreeItem> child : node.getChildren())
acknowledgeInvalidUndefined(child);
}
- private void acknowledge(final AlarmTreeItem> node, final boolean acknowledge)
- {
- if (node instanceof AlarmServerPV)
- {
- final AlarmServerPV pv_node = (AlarmServerPV) node;
+ private void acknowledge(final AlarmTreeItem> node, final boolean acknowledge) {
+ if (node instanceof AlarmServerPV pv_node) {
pv_node.acknowledge(acknowledge);
- }
- else
+ } else
for (final AlarmTreeItem> child : node.getChildren())
acknowledge(child, acknowledge);
}
- enum PVMode
- {
+ enum PVMode {
All,
InActiveAlarm,
InAlarm,
Disconnected,
Disabled
- };
+ }
- private void listPVs(final AlarmTreeItem> node, final PVMode which)
- {
+ private void listPVs(final AlarmTreeItem> node, final PVMode which) {
listPVs(new AtomicInteger(), node, which);
}
- private void listPVs(final AtomicInteger count, final AlarmTreeItem> node, final PVMode which)
- {
- if (node instanceof AlarmServerPV)
- {
- final AlarmServerPV pv_node = (AlarmServerPV) node;
- switch (which)
- {
- case Disabled:
- if (pv_node.isEnabled())
- return;
- break;
- case Disconnected:
- if (!pv_node.isEnabled() || pv_node.isConnected())
- return;
- break;
- case InActiveAlarm:
- if (!pv_node.isEnabled() || !pv_node.getState().severity.isActive())
- return;
- break;
- case InAlarm:
- if (!pv_node.isEnabled() || pv_node.getState().severity == SeverityLevel.OK)
- return;
- break;
- default:
- break;
+ private void listPVs(final AtomicInteger count, final AlarmTreeItem> node, final PVMode which) {
+ if (node instanceof AlarmServerPV pv_node) {
+ switch (which) {
+ case Disabled:
+ if (pv_node.isEnabled())
+ return;
+ break;
+ case Disconnected:
+ if (!pv_node.isEnabled() || pv_node.isConnected())
+ return;
+ break;
+ case InActiveAlarm:
+ if (!pv_node.isEnabled() || !pv_node.getState().severity.isActive())
+ return;
+ break;
+ case InAlarm:
+ if (!pv_node.isEnabled() || pv_node.getState().severity == SeverityLevel.OK)
+ return;
+ break;
+ default:
+ break;
}
System.out.format("%3d : ", count.incrementAndGet());
System.out.println(pv_node);
- }
- else
+ } else
for (final AlarmTreeItem> child : node.getChildren())
listPVs(count, child, which);
}
- private static void help()
- {
+ private static void help() {
// http://patorjk.com/software/taag/#p=display&f=Epic&t=Alarm%20Server
System.out.println(" _______ _ _______ _______ _______ _______ _______ _______ _______ _______");
System.out.println("( ___ )( \\ ( ___ )( ____ )( ) ( ____ \\( ____ \\( ____ )|\\ /|( ____ \\( ____ )");
@@ -652,8 +480,6 @@ private static void help()
System.out.println("-help - This text");
System.out.println("-server localhost:9092 - Kafka server with port number");
System.out.println("-config Accelerator - Alarm configuration");
- // Don't mention this option, prefer examples/create_topics.sh
- // System.out.println("-create_topics - Create Kafka topics for alarm configuration?");
System.out.println("-settings settings.{xml,ini} - Import preferences (PV connectivity) from property format file");
System.out.println("-noshell - Disable the command shell for running without a terminal");
System.out.println("-export config.xml - Export alarm configuration to file");
@@ -666,8 +492,7 @@ private static void help()
}
- public static void main(final String[] original_args) throws Exception
- {
+ public static void main(final String[] original_args) throws Exception {
LogManager.getLogManager().readConfiguration(AlarmServerMain.class.getResourceAsStream("/alarm_server_logging.properties"));
String server = "localhost:9092";
@@ -679,91 +504,81 @@ public static void main(final String[] original_args) throws Exception
final List args = new ArrayList<>(List.of(original_args));
final Iterator iter = args.iterator();
HashMap parsed_args = new HashMap();
- try
- {
+ try {
// define command line arguments
- String help_arg = "-help";
- String help_alt_arg = "-h";
- String server_arg = "-server";
- String config_arg = "-config";
- String create_topics_arg = "-create_topics";
- String settings_arg = "-settings";
- String noshell_arg = "-noshell";
- String export_arg = "-export";
- String import_arg = "-import";
- String logging_arg = "-logging";
- String connect_secs_arg = "-connect_secs";
- String stable_secs_arg = "-stable_secs";
- String kafka_props_arg = "-kafka_properties";
+ String help_arg = "-help";
+ String help_alt_arg = "-h";
+ String server_arg = "-server";
+ String config_arg = "-config";
+ String settings_arg = "-settings";
+ String noshell_arg = "-noshell";
+ String export_arg = "-export";
+ String import_arg = "-import";
+ String logging_arg = "-logging";
+ String connect_secs_arg = "-connect_secs";
+ String stable_secs_arg = "-stable_secs";
+ String kafka_props_arg = "-kafka_properties";
Set options = Set.of(
- server_arg,
- config_arg,
- settings_arg,
- export_arg,
- import_arg,
- logging_arg,
- connect_secs_arg,
- stable_secs_arg,
- kafka_props_arg);
+ server_arg,
+ config_arg,
+ settings_arg,
+ export_arg,
+ import_arg,
+ logging_arg,
+ connect_secs_arg,
+ stable_secs_arg,
+ kafka_props_arg);
Set flags = Set.of(
- help_arg,
- help_alt_arg,
- noshell_arg,
- create_topics_arg
+ help_arg,
+ help_alt_arg,
+ noshell_arg
);
// to handle arguments that may be provided via a settings file
// as well as directly on the commandline, map their relationship
Map args_to_prefs = Map.ofEntries(
- Map.entry(config_arg, "config_names"),
- Map.entry(server_arg, "server"),
- Map.entry(kafka_props_arg, "kafka_properties")
+ Map.entry(config_arg, "config_names"),
+ Map.entry(server_arg, "server"),
+ Map.entry(kafka_props_arg, "kafka_properties")
);
- while (iter.hasNext())
- {
+ while (iter.hasNext()) {
final String cmd = iter.next();
- if (options.contains(cmd))
- {
- if (! iter.hasNext())
- throw new Exception("Missing argument for " + cmd);
+ if (options.contains(cmd)) {
+ if (!iter.hasNext())
+ throw new Exception("Missing argument for " + cmd);
final String arg = iter.next();
parsed_args.put(cmd, arg);
- }
- else if (flags.contains(cmd))
+ } else if (flags.contains(cmd))
parsed_args.put(cmd, "");
else
throw new Exception("Unknown option " + cmd);
}
- if (parsed_args.containsKey(help_arg) || parsed_args.containsKey(help_alt_arg))
- {
+ if (parsed_args.containsKey(help_arg) || parsed_args.containsKey(help_alt_arg)) {
help();
return;
}
- if (parsed_args.containsKey(logging_arg))
+ if (parsed_args.containsKey(logging_arg)) {
LogManager.getLogManager().readConfiguration(new FileInputStream(parsed_args.get(logging_arg)));
- if (parsed_args.containsKey(settings_arg))
- {
+ }
+ if (parsed_args.containsKey(settings_arg)) {
final String filename = parsed_args.get(settings_arg);
logger.info("Loading settings from " + filename);
PropertyPreferenceLoader.load(new FileInputStream(filename));
- final Preferences userPrefs = Preferences.userRoot().node("org/phoebus/applications/alarm");
+ final Preferences userPrefs = Preferences.userRoot().node("org/phoebus/applications/alarm");
- for (Map.Entry entry: args_to_prefs.entrySet())
- {
+ for (Map.Entry entry : args_to_prefs.entrySet()) {
final String prefKey = entry.getValue();
final String arg = entry.getKey();
-
- if (parsed_args.containsKey(arg))
- {
- logger.log(Level.WARNING,"Potentially conflicting setting: -settings/"+prefKey+": " + userPrefs.get(prefKey, "") + " and " + arg + ":" + parsed_args.get(arg));
- logger.log(Level.WARNING,"Using argument " + arg + " instead of -settings");
- logger.log(Level.WARNING,prefKey + ": " + parsed_args.get(arg));
- }
- else if (Set.of(userPrefs.keys()).contains(prefKey))
+
+ if (parsed_args.containsKey(arg)) {
+ logger.log(Level.WARNING, "Potentially conflicting setting: -settings/" + prefKey + ": " + userPrefs.get(prefKey, "") + " and " + arg + ":" + parsed_args.get(arg));
+ logger.log(Level.WARNING, "Using argument " + arg + " instead of -settings");
+ logger.log(Level.WARNING, prefKey + ": " + parsed_args.get(arg));
+ } else if (Set.of(userPrefs.keys()).contains(prefKey))
parsed_args.put(arg, userPrefs.get(prefKey, ""));
}
}
@@ -772,40 +587,29 @@ else if (Set.of(userPrefs.keys()).contains(prefKey))
server = parsed_args.getOrDefault(server_arg, server);
kafka_properties = parsed_args.getOrDefault(kafka_props_arg, kafka_properties);
use_shell = !parsed_args.containsKey(noshell_arg);
-
- if (parsed_args.containsKey(connect_secs_arg))
- AlarmStateInitializer.CONNECTION_SECS = AlarmConfigTool.CONNECTION_SECS
- = Long.parseLong(parsed_args.get(connect_secs_arg));
- if (parsed_args.containsKey(stable_secs_arg))
+ if (parsed_args.containsKey(connect_secs_arg)) {
+ AlarmStateInitializer.CONNECTION_SECS = AlarmConfigTool.CONNECTION_SECS
+ = Long.parseLong(parsed_args.get(connect_secs_arg));
+ }
+ if (parsed_args.containsKey(stable_secs_arg)) {
AlarmStateInitializer.STABILIZATION_SECS = AlarmConfigTool.STABILIZATION_SECS
- = Long.parseLong(parsed_args.get(stable_secs_arg));
-
- if (parsed_args.containsKey(create_topics_arg))
- {
- logger.info("Discovering and creating any missing topics at " + server);
- CreateTopics.discoverAndCreateTopics(server, true, List.of(config,
- config + AlarmSystemConstants.COMMAND_TOPIC_SUFFIX,
- config + AlarmSystemConstants.TALK_TOPIC_SUFFIX),
- kafka_properties);
+ = Long.parseLong(parsed_args.get(stable_secs_arg));
}
- if (parsed_args.containsKey(export_arg))
- {
+ if (parsed_args.containsKey(export_arg)) {
final String filename = parsed_args.get(export_arg);
logger.info("Exporting model to " + filename);
new AlarmConfigTool().exportModel(filename, server, config, kafka_properties);
}
- if (parsed_args.containsKey(import_arg))
- {
+ if (parsed_args.containsKey(import_arg)) {
final String filename = parsed_args.get(import_arg);
logger.info("Import model from " + filename);
new AlarmConfigTool().importModel(filename, server, config, kafka_properties);
}
- if (parsed_args.containsKey(export_arg) || parsed_args.containsKey(import_arg))
+ if (parsed_args.containsKey(export_arg) || parsed_args.containsKey(import_arg)) {
return;
- }
- catch (final Exception ex)
- {
+ }
+ } catch (final Exception ex) {
help();
System.out.println();
ex.printStackTrace();
diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/TopicUtils.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/TopicUtils.java
new file mode 100644
index 0000000000..bc21c97b20
--- /dev/null
+++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/TopicUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (C) 2025 European Spallation Source ERIC.
+ */
+
+package org.phoebus.applications.alarm.server;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigResource;
+import org.phoebus.applications.alarm.client.KafkaHelper;
+import org.phoebus.framework.preferences.AnnotatedPreferences;
+import org.phoebus.framework.preferences.Preference;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * Provides a utility to create the Kafka topics if needed, and then
+ * configure them based on preferences.
+ */
+public class TopicUtils {
+
+ private static final Logger logger = Logger.getLogger(TopicUtils.class.getName());
+
+ @SuppressWarnings("unused")
+ @Preference private static int numberOfPartitions;
+ @SuppressWarnings("unused")
+ @Preference private static int replicationFactor;
+
+ static {
+ AnnotatedPreferences.initialize(TopicUtils.class, "/alarm_server.properties");
+ }
+
+ /**
+ * Ensure that the required Kafka topics exist and are correctly configured.
+ *
+ * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
+ * For more details on alarm topic configuration, see:
+ * Refer to Configure Alarm Topics
+ *
+ * @param server Kafka server
+ * @param topic Base topic name
+ * @param kafkaPropsFile Extra Kafka properties file
+ * @throws Exception If for instance an admin client could not be created or
+ * if the request to Kafka times out.
+ */
+ public static void ensureKafkaTopics(String server, String topic, String kafkaPropsFile) throws Exception {
+ var kafkaProps = KafkaHelper.loadPropsFromFile(kafkaPropsFile);
+ kafkaProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+ try (AdminClient admin = AdminClient.create(kafkaProps)) {
+ Set topics = admin.listTopics().names().get(60, TimeUnit.SECONDS);
+ // Compacted topic
+ if (!topics.contains(topic)) {
+ createTopic(admin, topic);
+ }
+ setCompactedConfig(admin, topic);
+
+ // Deleted topics
+ for (String suffix : List.of("Command", "Talk")) {
+ String deletedTopic = topic + suffix;
+ if (!topics.contains(deletedTopic)) {
+ createTopic(admin, deletedTopic);
+ }
+ setDeletedConfig(admin, deletedTopic);
+ }
+ }
+ }
+
+ /**
+ * Create topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception If topic could not be created
+ */
+ private static void createTopic(AdminClient admin, String topic) throws Exception {
+ NewTopic newTopic = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
+ try {
+ admin.createTopics(List.of(newTopic)).all().get();
+ logger.info("Created topic: " + topic);
+ } catch (Exception e) {
+ if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+ logger.info("Topic already exists: " + topic);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Configure topic for alarm state storage with compaction to retain latest state.
+ * For configuration information, see:
+ *
+ * Refer to Configure Alarm Topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception If topic could not be configured
+ */
+ private static void setCompactedConfig(AdminClient admin, String topic) throws Exception {
+ ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+ List configOps = List.of(
+ new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET)
+ );
+ admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
+ logger.info("Set compacted config for topic: " + topic);
+ }
+
+ /**
+ * Configure topic for command/talk messages with time-based deletion.
+ * For configuration information, see:
+ *
+ * Refer to Configure Alarm Topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception If topic could not be configured
+ */
+ private static void setDeletedConfig(AdminClient admin, String topic) throws Exception {
+ ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+ List configOps = List.of(
+ new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("retention.ms", "20000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("delete.retention.ms", "1000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("file.delete.delay.ms", "1000"), AlterConfigOp.OpType.SET)
+ );
+ admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
+ logger.info("Set deleted config for topic: " + topic);
+ }
+}
diff --git a/services/alarm-server/src/main/resources/alarm_server.properties b/services/alarm-server/src/main/resources/alarm_server.properties
new file mode 100644
index 0000000000..66f14f9af0
--- /dev/null
+++ b/services/alarm-server/src/main/resources/alarm_server.properties
@@ -0,0 +1,9 @@
+###############################################
+# Package org.phoebus.applications.alarm.server
+###############################################
+
+# Kafka topic replication factor
+replicationFactor=1
+
+# Kafka partition count
+numberOfPartitions=1
\ No newline at end of file