diff --git a/pkg/client/client_cyberark.go b/pkg/client/client_cyberark.go new file mode 100644 index 00000000..af26e96c --- /dev/null +++ b/pkg/client/client_cyberark.go @@ -0,0 +1,9 @@ +package client + +import ( + "github.com/jetstack/preflight/pkg/internal/cyberark/dataupload" +) + +type CyberArkClient = dataupload.CyberArkClient + +var NewCyberArkClient = dataupload.NewCyberArkClient diff --git a/pkg/internal/cyberark/dataupload/dataupload.go b/pkg/internal/cyberark/dataupload/dataupload.go new file mode 100644 index 00000000..43f99cfc --- /dev/null +++ b/pkg/internal/cyberark/dataupload/dataupload.go @@ -0,0 +1,154 @@ +package dataupload + +import ( + "bytes" + "context" + "crypto/sha256" + "crypto/x509" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "k8s.io/client-go/transport" + + "github.com/jetstack/preflight/api" + "github.com/jetstack/preflight/pkg/version" +) + +const ( + // maxRetrievePresignedUploadURLBodySize is the maximum allowed size for a response body from the + // Retrieve Presigned Upload URL service. + maxRetrievePresignedUploadURLBodySize = 10 * 1024 +) + +type CyberArkClient struct { + baseURL string + client *http.Client + + authenticateRequest func(req *http.Request) error +} + +type Options struct { + ClusterName string + ClusterDescription string +} + +func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRequest func(req *http.Request) error) (*CyberArkClient, error) { + cyberClient := &http.Client{} + tr := http.DefaultTransport.(*http.Transport).Clone() + if trustedCAs != nil { + tr.TLSClientConfig.RootCAs = trustedCAs + } + cyberClient.Transport = transport.DebugWrappers(tr) + + return &CyberArkClient{ + baseURL: baseURL, + client: cyberClient, + authenticateRequest: authenticateRequest, + }, nil +} + +func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error { + if opts.ClusterName == "" { + return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty") + } + + encodedBody := &bytes.Buffer{} + checksum := sha256.New() + if err := json.NewEncoder(io.MultiWriter(encodedBody, checksum)).Encode(payload); err != nil { + return err + } + + presignedUploadURL, err := c.retrievePresignedUploadURL(ctx, hex.EncodeToString(checksum.Sum(nil)), opts) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, presignedUploadURL, encodedBody) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + version.SetUserAgent(req) + + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if code := res.StatusCode; code < 200 || code >= 300 { + body, _ := io.ReadAll(io.LimitReader(res.Body, 500)) + if len(body) == 0 { + body = []byte(``) + } + return fmt.Errorf("received response with status code %d: %s", code, bytes.TrimSpace(body)) + } + + return nil +} + +func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksum string, opts Options) (string, error) { + uploadURL, err := url.JoinPath(c.baseURL, "/api/data/kubernetes/upload") + if err != nil { + return "", err + } + + request := struct { + ClusterID string `json:"cluster_id"` + ClusterDescription string `json:"cluster_description"` + Checksum string `json:"checksum_sha256"` + }{ + ClusterID: opts.ClusterName, + ClusterDescription: opts.ClusterDescription, + Checksum: checksum, + } + + encodedBody := &bytes.Buffer{} + if err := json.NewEncoder(encodedBody).Encode(request); err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, encodedBody) + if err != nil { + return "", err + } + + req.Header.Set("Content-Type", "application/json") + if err := c.authenticateRequest(req); err != nil { + return "", fmt.Errorf("failed to authenticate request") + } + version.SetUserAgent(req) + + res, err := c.client.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + if code := res.StatusCode; code < 200 || code >= 300 { + body, _ := io.ReadAll(io.LimitReader(res.Body, 500)) + if len(body) == 0 { + body = []byte(``) + } + return "", fmt.Errorf("received response with status code %d: %s", code, bytes.TrimSpace(body)) + } + + response := struct { + URL string `json:"url"` + }{} + + if err := json.NewDecoder(io.LimitReader(res.Body, maxRetrievePresignedUploadURLBodySize)).Decode(&response); err != nil { + if err == io.ErrUnexpectedEOF { + return "", fmt.Errorf("rejecting JSON response from server as it was too large or was truncated") + } + + return "", fmt.Errorf("failed to parse JSON from otherwise successful request to start data upload: %s", err) + } + + return response.URL, nil +} diff --git a/pkg/internal/cyberark/dataupload/dataupload_test.go b/pkg/internal/cyberark/dataupload/dataupload_test.go new file mode 100644 index 00000000..43423152 --- /dev/null +++ b/pkg/internal/cyberark/dataupload/dataupload_test.go @@ -0,0 +1,120 @@ +package dataupload_test + +import ( + "context" + "crypto/x509" + "encoding/pem" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/jetstack/preflight/api" + "github.com/jetstack/preflight/pkg/internal/cyberark/dataupload" +) + +func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) { + fakeTime := time.Unix(123, 0) + defaultPayload := api.DataReadingsPost{ + AgentMetadata: &api.AgentMetadata{ + Version: "test-version", + ClusterID: "test", + }, + DataGatherTime: fakeTime, + DataReadings: []*api.DataReading{ + { + ClusterID: "success-cluster-id", + DataGatherer: "test-gatherer", + Timestamp: api.Time{Time: fakeTime}, + Data: map[string]interface{}{"test": "data"}, + SchemaVersion: "v1", + }, + }, + } + defaultOpts := dataupload.Options{ + ClusterName: "success-cluster-id", + ClusterDescription: "success-cluster-description", + } + + setToken := func(token string) func(*http.Request) error { + return func(req *http.Request) error { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + return nil + } + } + + tests := []struct { + name string + payload api.DataReadingsPost + authenticate func(req *http.Request) error + opts dataupload.Options + requireFn func(t *testing.T, err error) + }{ + { + name: "successful upload", + payload: defaultPayload, + opts: defaultOpts, + authenticate: setToken("success-token"), + requireFn: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, + { + name: "error when cluster name is empty", + payload: defaultPayload, + opts: dataupload.Options{ClusterName: ""}, + authenticate: setToken("success-token"), + requireFn: func(t *testing.T, err error) { + require.ErrorContains(t, err, "programmer mistake: the cluster name") + }, + }, + { + name: "error when bearer token is incorrect", + payload: defaultPayload, + opts: defaultOpts, + authenticate: setToken("fail-token"), + requireFn: func(t *testing.T, err error) { + require.ErrorContains(t, err, "received response with status code 500: should authenticate using the correct bearer token") + }, + }, + { + name: "invalid JSON from server (RetrievePresignedUploadURL step)", + payload: defaultPayload, + opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned", ClusterDescription: defaultOpts.ClusterDescription}, + authenticate: setToken("success-token"), + requireFn: func(t *testing.T, err error) { + require.ErrorContains(t, err, "rejecting JSON response from server as it was too large or was truncated") + }, + }, + { + name: "500 from server (PostData step)", + payload: defaultPayload, + opts: dataupload.Options{ClusterName: "invalid-response-post-data", ClusterDescription: defaultOpts.ClusterDescription}, + authenticate: setToken("success-token"), + requireFn: func(t *testing.T, err error) { + require.ErrorContains(t, err, "received response with status code 500: mock error") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + server := dataupload.MockDataUploadServer() + defer server.Close() + + certPool := x509.NewCertPool() + require.True(t, certPool.AppendCertsFromPEM(pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: server.Server.TLS.Certificates[0].Certificate[0], + }))) + + cyberArkClient, err := dataupload.NewCyberArkClient(certPool, server.Server.URL, tc.authenticate) + require.NoError(t, err) + + err = cyberArkClient.PostDataReadingsWithOptions(context.TODO(), tc.payload, tc.opts) + tc.requireFn(t, err) + }) + } +} diff --git a/pkg/internal/cyberark/dataupload/mock.go b/pkg/internal/cyberark/dataupload/mock.go new file mode 100644 index 00000000..af561ae8 --- /dev/null +++ b/pkg/internal/cyberark/dataupload/mock.go @@ -0,0 +1,159 @@ +package dataupload + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + + "github.com/jetstack/preflight/pkg/version" + + _ "embed" +) + +const ( + successBearerToken = "success-token" + + successClusterID = "success-cluster-id" + successClusterDescription = "success-cluster-description" +) + +type mockDataUploadServer struct { + Server *httptest.Server +} + +// MockDataUploadServer returns a mocked data upload server with default values. +func MockDataUploadServer() *mockDataUploadServer { + mds := &mockDataUploadServer{} + mds.Server = httptest.NewTLSServer(mds) + return mds +} + +func (mds *mockDataUploadServer) Close() { + mds.Server.Close() +} + +func (mds *mockDataUploadServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/data/kubernetes/upload": + mds.handlePresignedUpload(w, r) + return + case "/presigned-upload": + mds.handleUpload(w, r, false) + return + case "/presigned-upload-invalid-json": + mds.handleUpload(w, r, false) + return + default: + w.WriteHeader(http.StatusNotFound) + } +} + +func (mds *mockDataUploadServer) handlePresignedUpload(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + _, _ = w.Write([]byte(`{"message":"method not allowed"}`)) + return + } + + if r.Header.Get("User-Agent") != version.UserAgent() { + http.Error(w, "should set user agent on all requests", http.StatusInternalServerError) + return + } + + if r.Header.Get("Content-Type") != "application/json" { + http.Error(w, "should send JSON on all requests", http.StatusInternalServerError) + return + } + + if r.Header.Get("Authorization") != "Bearer "+successBearerToken { + http.Error(w, "should authenticate using the correct bearer token", http.StatusInternalServerError) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "failed to read post body", http.StatusInternalServerError) + return + } + + var req struct { + ClusterID string `json:"cluster_id"` + ClusterDescription string `json:"Cluster_description"` + Checksum string `json:"checksum_sha256"` + } + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, "failed to unmarshal post body", http.StatusInternalServerError) + return + } + + if req.ClusterDescription != successClusterDescription { + http.Error(w, "post body contains unexpected description", http.StatusInternalServerError) + return + } + + // Simulate invalid JSON response for RetrievePresignedUploadURL step + if req.ClusterID == "invalid-json-retrieve-presigned" { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"url":`)) // invalid JSON + return + } + + // Simulate invalid JSON response for RetrievePresignedUploadURL step + if req.ClusterID == "invalid-response-post-data" { + http.Error(w, "mock error", http.StatusInternalServerError) + return + } + + if req.ClusterID != successClusterID { + http.Error(w, "post body contains cluster ID", http.StatusInternalServerError) + return + } + + // Write response body + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + presignedURL := mds.Server.URL + "/presigned-upload?checksum=" + req.Checksum + _ = json.NewEncoder(w).Encode(struct { + URL string `json:"url"` + }{presignedURL}) +} + +func (mds *mockDataUploadServer) handleUpload(w http.ResponseWriter, r *http.Request, invalidJSON bool) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + _, _ = w.Write([]byte(`{"message":"method not allowed"}`)) + return + } + + if r.Header.Get("User-Agent") != version.UserAgent() { + http.Error(w, "should set user agent on all requests", http.StatusInternalServerError) + return + } + + if r.Header.Get("Content-Type") != "application/json" { + http.Error(w, "should send JSON on all requests", http.StatusInternalServerError) + return + } + + if invalidJSON { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"url":`)) // invalid JSON + return + } + + checksum := sha256.New() + _, _ = io.Copy(checksum, r.Body) + + if r.URL.Query().Get("checksum") != hex.EncodeToString(checksum.Sum(nil)) { + http.Error(w, "checksum is invalid", http.StatusInternalServerError) + } + + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"success":true}`)) +} diff --git a/pkg/internal/cyberark/identity/advance_authentication_test.go b/pkg/internal/cyberark/identity/advance_authentication_test.go index ef031daa..a670f542 100644 --- a/pkg/internal/cyberark/identity/advance_authentication_test.go +++ b/pkg/internal/cyberark/identity/advance_authentication_test.go @@ -136,15 +136,13 @@ func Test_IdentityAdvanceAuthentication(t *testing.T) { return } - val, ok := client.tokenCache[testSpec.username] - - if !ok { + if len(client.tokenCached) == 0 { t.Errorf("expected token for %s to be set to %q but wasn't found", testSpec.username, mockSuccessfulStartAuthenticationToken) return } - if val != mockSuccessfulStartAuthenticationToken { - t.Errorf("expected token for %s to be set to %q but was set to %q", testSpec.username, mockSuccessfulStartAuthenticationToken, val) + if client.tokenCached != mockSuccessfulStartAuthenticationToken { + t.Errorf("expected token for %s to be set to %q but was set to %q", testSpec.username, mockSuccessfulStartAuthenticationToken, client.tokenCached) } }) } diff --git a/pkg/internal/cyberark/identity/authenticated_http_client.go b/pkg/internal/cyberark/identity/authenticated_http_client.go new file mode 100644 index 00000000..901d14db --- /dev/null +++ b/pkg/internal/cyberark/identity/authenticated_http_client.go @@ -0,0 +1,19 @@ +package identity + +import ( + "fmt" + "net/http" +) + +func (c *Client) AuthenticateRequest(req *http.Request) error { + c.tokenCachedMutex.Lock() + defer c.tokenCachedMutex.Unlock() + + if len(c.tokenCached) == 0 { + return fmt.Errorf("no token cached") + } + + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", string(c.tokenCached))) + + return nil +} diff --git a/pkg/internal/cyberark/identity/identity.go b/pkg/internal/cyberark/identity/identity.go index da2d648a..12258cda 100644 --- a/pkg/internal/cyberark/identity/identity.go +++ b/pkg/internal/cyberark/identity/identity.go @@ -174,15 +174,15 @@ type advanceAuthenticationResponseResult struct { } // Client is an client for interacting with the CyberArk Identity API and performing a login using a username and password. -// For context on the behaviour of this client, see the Pytho SDK: https://github.com/cyberark/ark-sdk-python/blob/3be12c3f2d3a2d0407025028943e584b6edc5996/ark_sdk_python/auth/identity/ark_identity.py +// For context on the behaviour of this client, see the Python SDK: https://github.com/cyberark/ark-sdk-python/blob/3be12c3f2d3a2d0407025028943e584b6edc5996/ark_sdk_python/auth/identity/ark_identity.py type Client struct { client *http.Client endpoint string subdomain string - tokenCache map[string]token - tokenCacheMutex sync.Mutex + tokenCached token + tokenCachedMutex sync.Mutex } // token is a wrapper type for holding auth tokens we want to cache. @@ -218,15 +218,15 @@ func NewWithDiscoveryClient(ctx context.Context, discoveryClient *servicediscove endpoint: endpoint, subdomain: subdomain, - tokenCache: make(map[string]token), - tokenCacheMutex: sync.Mutex{}, + tokenCached: "", + tokenCachedMutex: sync.Mutex{}, }, nil } // LoginUsernamePassword performs a blocking call to fetch an auth token from CyberArk Identity using the given username and password. // The password is zeroed after use. -// Tokens are cached internally and are not directly accessible to code; use Client.AuthenticatedHTTPClient to add credentials -// to an *http.Client. +// Tokens are cached internally and are not directly accessible to code; use Client.AuthenticateRequest to add credentials +// to an *http.Request. func (c *Client) LoginUsernamePassword(ctx context.Context, username string, password []byte) error { defer func() { for i := range password { @@ -443,11 +443,11 @@ func (c *Client) doAdvanceAuthentication(ctx context.Context, username string, p klog.FromContext(ctx).Info("successfully completed AdvanceAuthentication request to CyberArk Identity; login complete", "username", username) - c.tokenCacheMutex.Lock() + c.tokenCachedMutex.Lock() - c.tokenCache[username] = token(advanceAuthResponse.Result.Token) + c.tokenCached = token(advanceAuthResponse.Result.Token) - c.tokenCacheMutex.Unlock() + c.tokenCachedMutex.Unlock() return nil }