Proof Modifications to QuickFIX J

prerak-proof edited this page May 14, 2020 · 3 revisions

Background

The FIX protocol is a sequenced message protocol, in the sense that the messages sent using this protocol are assigned incrementing sequence numbers. When two parties communicate using the FIX protocol, there are a total of 4 sequence numbers being tracked. Each party keeps track of two sequence numbers: 1) its sender sequence number, and 2) the counterparty’s sequence number, also known as the target sequence number.

The most basic responsibility of any FIX application is to keep track of its own sender sequence number. If a FIX application loses track of the target sequence number, it is possible to recover the connection without any loss of messages by using standard resend mechanisms. However, if the sender sequence number is lost, no recovery is possible, and this is considered a fatal error with the connection. Even if both parties agree to recover the session by manually resetting the sequence numbers to 1, there is a significant potential for message loss.
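
The asymmetry between the two counters can be illustrated with a minimal sketch of the receiving side. The class and method names here are hypothetical and are not part of QuickFIX/J; the sketch only shows how an incoming MsgSeqNum (tag 34) is compared against the expected target sequence number.

```java
// Hypothetical sketch, not QuickFIX/J code: classify an incoming MsgSeqNum
// (tag 34) against the expected target sequence number.
final class TargetSeqCheck {
    enum Result { EXPECTED, TOO_HIGH, TOO_LOW }

    private int nextTargetSeqNum = 1;

    /** Classifies the incoming sequence number and advances only on an exact match. */
    Result onIncoming(int msgSeqNum) {
        if (msgSeqNum == nextTargetSeqNum) {
            nextTargetSeqNum++;
            return Result.EXPECTED;
        }
        // Too high: messages were missed, which a Resend Request can recover.
        // Too low: the counterparty appears to have lost its sender sequence number.
        return msgSeqNum > nextTargetSeqNum ? Result.TOO_HIGH : Result.TOO_LOW;
    }

    /** First sequence number to ask for in a Resend Request after a gap. */
    int resendFrom() {
        return nextTargetSeqNum;
    }
}
```

A gap on the target side is repaired by asking for everything from `resendFrom()` onward, whereas a Too Low condition has no equivalent repair, which is why losing the sender sequence number is fatal.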

Most FIX applications use a FIX engine library (such as QuickFIX/J or b2bits), which typically handles the session-level concerns of the FIX protocol, including persistence and recovery of the FIX session state such as sender sequence number using local file-based stores. In addition, the library often sends administrative (aka session-level) messages to the counterparty on its own, either in response to incoming messages or because of the passage of (local wallclock) time.

Problem Statement

We would like to create a highly-available FIX application while continuing to use QuickFIX/J as the FIX engine library for convenience. In more technical terms, we would like to be able to replicate or recreate the local file-based stores used by the FIX engine on a backup server.

Problem Description

There are a few brute force methods for achieving this replication of file-based stores:

  1. Use a shared file system like NFS. This would cause high latencies as data is stored over the network. Since we do not intend to use NFS or such for any other production application, this would mean that the FIX applications would have special infrastructure needs.
  2. Use a network-based synchronous replication mechanism. This would require a backup instance of the application to be running at all times, which would need to be discovered by the primary instance, and a network connection would need to be maintained between the two. If the network connectivity fails, there would need to be a recovery mechanism.
  3. Use block-level replication tools for file copy. This would be an asynchronous copy, which means the most recent writes may not have reached the backup at the moment of failure. And again, this would create special infrastructure and deployment hassles.

We reject the above solutions and propose that the application be written in such a way that the store can be recreated simply by replaying the inputs into the application. This would allow multiple instances of an application to run in parallel, consume the same inputs and create the same session store.
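
The idea of recreating the store by replay can be sketched as follows. This is a simplified illustration under the assumption that every outbound message is driven by an ordered application input; `ReplayedStore` and its methods are hypothetical names, not QuickFIX/J classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the store is a pure function of the ordered inputs,
// so every instance that replays the same inputs ends up with the same store.
final class ReplayedStore {
    private int nextSenderSeqNum = 1;
    private final List<String> sentMessages = new ArrayList<>();

    /** Applies one input: records the message under the next sequence number. */
    void apply(String outboundPayload) {
        sentMessages.add(nextSenderSeqNum + ":" + outboundPayload);
        nextSenderSeqNum++;
    }

    /** Builds a fresh store by applying the inputs in order. */
    static ReplayedStore replay(List<String> inputs) {
        ReplayedStore store = new ReplayedStore();
        for (String input : inputs) {
            store.apply(input);
        }
        return store;
    }

    int nextSenderSeqNum() { return nextSenderSeqNum; }

    List<String> sentMessages() { return sentMessages; }
}
```

Two instances that call `replay` with the same input list hold identical sequence numbers and message logs without exchanging any data with each other.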

Session-Level Messages Get In The Way

The main problem with this approach is that at least some of the information in the store comes directly from the FIX engine library and it is not possible to recreate this information from the inputs of the application.

Ideally, we would change the FIX engine to generate new messages exclusively through application callbacks. In other words, instead of generating and sending new messages on its own, the FIX engine would generate the new messages but not send them, and instead hand off these messages to the application. The application would then ensure that the messages are sent out after they have been recorded in The System™. We can call this roundtripping the message through The System, and we can call such messages deferred session-level messages.

Session Messages

If we want to ensure we intercept all session messages generated by the library, we need to understand the circumstances under which they are generated and the impact of not sending them immediately. Incidentally, the discussion below is valid for all versions of FIX from 4.0 to 5.0.

FIX Message Type: A (Logon)
  Send Reasons:
    • Initiator: Based on Timer
    • Acceptor: In response to an incoming Logon
  Considerations / Impact when intercepting:
    • Ensure the session is not marked as fully logged on until the Logon is fully sent
    • Any message replay resulting from tag 789 (NextExpectedMsgSeqNum) in the incoming Logon must be suppressed until Logon is sent

FIX Message Type: 0 (Heartbeat)
  Send Reasons:
    • Based on Timer
    • In response to a Test Request
  Considerations / Impact when intercepting:
    • No adverse impact

FIX Message Type: 1 (Test Request)
  Send Reasons:
    • Based on Timer
  Considerations / Impact when intercepting:
    • No adverse impact

FIX Message Type: 2 (Resend Request)
  Send Reasons:
    • In response to an incoming message
  Considerations / Impact when intercepting:
    • No adverse impact

FIX Message Type: 3 (Session Reject)
  Send Reasons:
    • In response to an incoming message
  Considerations / Impact when intercepting:
    • No adverse impact

FIX Message Type: 4 (Sequence Reset)
  Send Reasons:
    • Gap Fill mode: Allows skipping over certain messages during normal resend processing
    • Reset mode: Allows reestablishing a FIX session after an unrecoverable application failure (afaik, QuickFIX/J does not use this)
  Considerations / Impact when intercepting:
    • No adverse impact
    • Sequence Reset messages do not “burn” a sequence number, so as such they do not even need to be intercepted. In Gap Fill mode, a Sequence Reset message always uses the sequence number of a previously transmitted message, with an indicator for what the next sequence number should be.

FIX Message Type: 5 (Logout)
  Send Reasons:
    • Based on user request (for example, the app is shutting down)
    • Based on an illegal state (such as when an incoming sequence number is too low, or a garbled message is received)
    • In response to an incoming Logout
  Considerations / Impact when intercepting:
    • The tricky part here is that the connection should typically be disconnected after sending a Logout in some cases, but not all. The Logout message that is intercepted and roundtripped through The System should include enough information to make a decision on whether to disconnect.

FIX Message Type: j (Business Message Reject)
  Send Reasons:
    • Strictly speaking, this is not a session-level message but QuickFIX/J uses it in response to certain types of malformed messages
  Considerations / Impact when intercepting:
    • No adverse impact

Key concepts when intercepting messages:

  1. For an incoming message that triggers an outbound session-level message, fully complete the incoming message processing which can include items such as:

    1. Validation: missing sequence number field, incorrect FIX version, data dictionary validation
    2. Sequence number validation: Too Low or Too High conditions
    3. Increment target sequence number, if the sequence number is equal to the expected target sequence number
    4. Collect any session state such as Logon Received, Logon Sent, Logout Received, Logout Sent
  2. When handing off the intercepted session-level message to the application, don't increment the sender sequence number until the message is ready to be transmitted to the counterparty. The idea is that a sender sequence number can only be “burned” due to some app input from The System.
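
The second concept above can be sketched in isolation: generating a session-level message and transmitting it are separate steps, and only the second one touches the sender sequence number. All names in this sketch are hypothetical stand-ins rather than QuickFIX/J API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of deferral: names are illustrative, not QuickFIX/J API.
final class DeferredAdminSketch {
    private int nextSenderSeqNum = 1;
    private final Deque<String> deferred = new ArrayDeque<>(); // handed to the app
    private final List<String> wire = new ArrayList<>();       // actually transmitted

    /** The engine generates an admin message: hand it off, leave the counter alone. */
    void generateAdmin(String msgType) {
        deferred.add(msgType);
    }

    /** Called after the roundtrip: only now is a sequence number burned. */
    void sendDeferred() {
        String msgType = deferred.remove();
        int seqNum = nextSenderSeqNum++;
        wire.add(seqNum + "|" + msgType);
    }

    int nextSenderSeqNum() { return nextSenderSeqNum; }

    List<String> wire() { return wire; }
}
```

Because the counter moves only inside `sendDeferred`, a replica that sees the same roundtripped inputs assigns the same sequence numbers in the same order.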

Code Repository

For the following description, refer to the forked QuickFIX/J code repository here: https://github.com/prerak-proof/quickfixj

A sample application demonstrating the principle is located here: https://github.com/prerak-proof/quickfixj/tree/master/quickfixj-examples/executor/src/main/java/com/prooftrading/demo

Intercepting Generated Session Messages

  • The quickfix.Session class implements nearly all of the FIX protocol handling.
  • First, we observe that the sender sequence number is incremented using the this.state.incrNextSenderMsgSeqNum() method call, where state is an object of the class quickfix.SessionState.
  • If we search through all usages of this method, we are delighted to find that there is a single invocation of this method in the entire library: Session.sendRaw.
  • Following the sendRaw usages further, we find that all of the generated session-level messages are sent using this method.
  • If we replace the session-message-related usages of sendRaw with a new method sendAdmin, we can install a hook to intercept these messages:
    /**
     * @return true, if message was sent using sendRaw; false, if it was deferred to the application
     */
    private boolean sendAdmin(Message message, int num, boolean forceSend) {
        // allow the application to handle the admin message being sent if:
        // 1) it will burn a sequence number
        // 2) we have an app that cares about this
        // 3) this is an admin message
        if (!forceSend && num == 0 && isAsyncAdmin() && message.isAsyncAdminEligible()) {
            ((ApplicationAsyncAdmin) application).beforeAdminSend(message, sessionID);
            return false;
        }
        
        // continue sending the message; we don't use sendRaw return value as:
        // 1) it was already not being used anywhere
        // 2) sendAdmin return value has its own meaning, as described above
        sendRaw(message, num);
        return true;
    }
  • Note that for the interception to work, the Application class provided to the library must implement the ApplicationAsyncAdmin interface.
  • Note the addition of the message.isAsyncAdminEligible method, which returns true for all session-level messages (message types A, 0, 1, 2, 3, 4, and 5) and the j (Business Message Reject) message type.
  • Once we successfully intercept the session-level messages and roundtrip them through The System before sending them out, we realize (as the table above suggests) that the processing of certain messages requires special considerations.
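
The eligibility rule described above amounts to a check on the MsgType (tag 35) value. The following is a guess at its shape for illustration only; the fork's actual implementation of isAsyncAdminEligible may differ, and the class name here is hypothetical.

```java
// Hypothetical sketch of the eligibility rule; the fork's actual
// implementation of isAsyncAdminEligible may differ.
final class AsyncAdminEligibility {
    /** True for session-level types A and 0 to 5, and for j (Business Message Reject). */
    static boolean isAsyncAdminEligible(String msgType) {
        return msgType != null
                && msgType.length() == 1
                && "A012345j".indexOf(msgType.charAt(0)) >= 0;
    }
}
```

The length check matters because multi-character message types such as AE begin with one of the eligible characters but are application messages.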

Handling Deferred Session Messages

Logon Processing

If the generated Logon message is not immediately sent, the following processing must be altered and moved to after the Logon is actually sent:

  1. Call state.setLogonSent(true) in sendRaw only after a successful Logon send. This ensures that the session is never fully “logged on” until such time.
  2. For an acceptor, if the incoming Logon has a tag 789 (NextExpectedMsgSeqNum), it can be used to infer a Resend Request. The detection of this implicit Resend Request must be done before the generated Logon is intercepted, but the actual replay of messages is done after the Logon is sent. This implicit Resend Request can be saved in the session’s SessionState cache.
  3. The Application.onLogon callback must not be invoked until after the Logon message is sent.
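
The split between detecting the implicit Resend Request and acting on it can be sketched as below. This is a simplified illustration, not the fork's code: the class, its fields, and the rule for computing the range (everything from the peer's tag 789 value up to the last sequence number we had used when the Logon arrived) are assumptions.

```java
// Hypothetical sketch: detect the implicit Resend Request when the Logon
// arrives, stash it, and replay only once the deferred Logon has been sent.
final class ImplicitResendSketch {
    private int pendingBegin = 0; // 0 means nothing is pending
    private int pendingEnd = 0;

    /**
     * @param nextExpectedByPeer value of tag 789 in the incoming Logon
     * @param nextSenderSeqNum our next sender sequence number when the Logon arrived
     */
    void onLogonReceived(int nextExpectedByPeer, int nextSenderSeqNum) {
        if (nextExpectedByPeer > nextSenderSeqNum) {
            throw new IllegalStateException("peer expects a sequence number we never sent");
        }
        if (nextExpectedByPeer < nextSenderSeqNum) {
            pendingBegin = nextExpectedByPeer;
            pendingEnd = nextSenderSeqNum - 1;
        }
    }

    /** Called after the deferred Logon is sent; returns {begin, end} or null. */
    int[] onLogonSent() {
        if (pendingBegin == 0) {
            return null;
        }
        int[] range = { pendingBegin, pendingEnd };
        pendingBegin = 0;
        pendingEnd = 0;
        return range;
    }
}
```

Keeping the pending range in session state means the replay decision survives the roundtrip without having to re-read the incoming Logon.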

Logout Processing

This is easily the most complicated message type to handle. We observe that there are two patterns for sending Logout messages:

  1. The library sends the Logout message and waits for an acknowledgment from the counterparty. In this case, no special handling is required when the deferred Logout message is eventually sent.

  2. The library either receives an orderly Logout message from the counterparty, or it detects a problem with an incoming message, which causes it to:

    1. send the Logout message
    2. increment the target sequence number, if the sequence number is the next expected number
    3. disconnect immediately

We were able to extract the second pattern into a single method called generateLogoutAndDisconnect and most usages of generateLogout followed by disconnect were replaced with this new method.

There was a tricky part to this, which we solved in a rather hacky way. Once a Logout message has been deferred, the message by itself no longer indicates whether it should be followed by a disconnect. We modified the Text (tag 58) field of the Logout message to include a “disconnect marker”, which we could detect after the roundtrip and use to decide whether to issue a disconnect.
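
A marker of this kind could look like the following sketch. The marker string and the helper names are assumptions made for illustration; they are not the values used in the fork.

```java
// Hypothetical sketch: the marker string and method names are assumptions,
// not the values used in the fork.
final class DisconnectMarker {
    static final String MARKER = "[disconnect]";

    /** Appends the marker to the Logout Text (tag 58) value before deferral. */
    static String add(String text) {
        return text == null || text.isEmpty() ? MARKER : text + " " + MARKER;
    }

    /** True if the Text value carries the marker. */
    static boolean isMarked(String text) {
        return text != null && text.endsWith(MARKER);
    }

    /** Removes the marker so it never reaches the counterparty. */
    static String strip(String text) {
        if (!isMarked(text)) {
            return text;
        }
        return text.substring(0, text.length() - MARKER.length()).trim();
    }
}
```

One limitation of any in-band marker is that a Text value that legitimately ends with the marker string would be misread, so the marker should be something unlikely to occur in normal reject text.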

Sending Deferred Messages

After a deferred message is roundtripped and ready to be sent, the application can call the following method:

    public void sendDeferredAdmin(Message message) throws FieldNotFound, IOException, InvalidMessage {
        // can the below if be false ever? as of now, nothing calls this other than an async app
        if (isAsyncAdmin()) {
            switch (message.getHeader().getString(MsgType.FIELD)) {
                case MsgType.LOGON:
                    sendAdmin(message, 0, true);
                    finishLogon(null);
                    return;
                case MsgType.LOGOUT:
                    boolean doDisconnect = removeDisconnectMarker(message);
                    sendAdmin(message, 0, true);
                    if (state.isLogoutReceived() || doDisconnect) {
                        finishLogoutAndDisconnect(null, false);
                    }
                    return;
            }
        }
        sendAdmin(message, 0, true);
    }

Session Reset Processing

QuickFIX/J allows resetting the session state (primarily sender and target sequence numbers) at various points in the session lifecycle through the following configuration settings: ResetOnLogon, ResetOnLogout, ResetOnDisconnect, ResetOnError.

In order to be able to recreate the exact final state of the session on a backup server, these reset events need to be captured by the application and roundtripped through The System as well. We didn’t add another callback to the ApplicationAsyncAdmin interface for this purpose, because a suitable event already exists: SessionStateListener.onReset. The sample application linked above demonstrates the correct usage of this callback (see com.prooftrading.demo.apps.ClientGateway.SessionResetSequencer).

Note that there exists another similar callback ApplicationExtended.onBeforeSessionReset, but it is not suitable for capturing the session reset event, because it is only called if the session is reset while connected and logged on.
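
Why the reset has to be journaled can be seen in a small replay sketch. The event model here is a hypothetical simplification and is unrelated to the SessionResetSequencer class in the sample application.

```java
import java.util.List;

// Hypothetical sketch: a reset is journaled like any other input so that a
// replica replaying the journal arrives at the same sequence numbers.
final class ResetReplaySketch {
    enum Event { SENT, RECEIVED, RESET }

    /** Returns {nextSenderSeqNum, nextTargetSeqNum} after replaying the journal. */
    static int[] replay(List<Event> journal) {
        int sender = 1;
        int target = 1;
        for (Event e : journal) {
            switch (e) {
                case SENT:
                    sender++;
                    break;
                case RECEIVED:
                    target++;
                    break;
                case RESET:
                    sender = 1;
                    target = 1;
                    break;
            }
        }
        return new int[] { sender, target };
    }
}
```

If the RESET entry were missing from the journal, the replica would keep counting from the pre-reset values and diverge from the primary on the very next message.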

Unit Testing

  • The acceptance test harness was enhanced to also test the library in an async-admin mode. All current tests pass.
  • For the rest of the unit tests, the UnitTestApplication has been enhanced to optionally work in async mode. All but 45 tests pass, and the only ones that fail are the ones that expect responses to messages to be sent out synchronously. All of these tests passed after adding a short Thread.sleep(10) after the session.next() calls, or after adding a CountDownLatch to prevent tests from proceeding until a requested async send operation is complete.
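
The latch-based variant can be sketched with the standard java.util.concurrent.CountDownLatch. The asynchronous send below is a stand-in running on an executor, not the actual test harness or any QuickFIX/J call, and the class name is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait for an asynchronous send with a latch instead of
// a fixed sleep. The "send" is a stand-in, not a QuickFIX/J call.
final class AsyncSendLatchSketch {
    /** Runs a fake async send; returns true if it completed within the timeout. */
    static boolean sendAndAwait(AtomicInteger sentCount, long timeoutMillis) {
        CountDownLatch sent = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                sentCount.incrementAndGet(); // stand-in for the deferred send
                sent.countDown();            // signal the waiting test thread
            });
            return sent.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

Compared with a fixed sleep, the latch returns as soon as the send completes and fails deterministically on timeout, which tends to make such tests both faster and less flaky.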

Known issues:

  • If the sender or target sequence numbers are reset either using the API or using the JMX Bean admin functionality, the changes are not represented in a replicated store. ApplicationAsyncAdmin could be enhanced to provide additional callbacks to help defer these administrative actions until after they’ve roundtripped through The System.