diff --git a/api/bulk/client.go b/api/bulk/client.go new file mode 100644 index 0000000..5a68483 --- /dev/null +++ b/api/bulk/client.go @@ -0,0 +1,134 @@ +package bulk + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +// Client is a Salesforce Bulk API 2.0 client. +type Client struct { + httpClient *http.Client + instanceURL string + apiVersion string + baseURL string +} + +// ClientConfig contains configuration for creating a new Bulk API client. +type ClientConfig struct { + InstanceURL string + HTTPClient *http.Client + APIVersion string +} + +// New creates a new Bulk API client. +func New(cfg ClientConfig) (*Client, error) { + if cfg.InstanceURL == "" { + return nil, fmt.Errorf("instance URL is required") + } + if cfg.HTTPClient == nil { + return nil, fmt.Errorf("HTTP client is required") + } + + instanceURL := strings.TrimSuffix(cfg.InstanceURL, "/") + apiVersion := cfg.APIVersion + if apiVersion == "" { + apiVersion = "v62.0" + } + + return &Client{ + httpClient: cfg.HTTPClient, + instanceURL: instanceURL, + apiVersion: apiVersion, + baseURL: fmt.Sprintf("%s/services/data/%s", instanceURL, apiVersion), + }, nil +} + +// doRequest performs an HTTP request and returns the response body. +func (c *Client) doRequest(ctx context.Context, method, path string, body interface{}) ([]byte, error) { + var bodyReader io.Reader + contentType := "application/json" + + if body != nil { + switch v := body.(type) { + case string: + bodyReader = strings.NewReader(v) + contentType = "text/csv" + case []byte: + bodyReader = bytes.NewReader(v) + contentType = "text/csv" + default: + jsonBody, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + bodyReader = bytes.NewReader(jsonBody) + } + } + + fullURL := path + if !strings.HasPrefix(path, "http") { + fullURL = c.baseURL + path + } + + req, err := http.NewRequestWithContext(ctx, method, fullURL, bodyReader) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", contentType) + req.Header.Set("Accept", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(respBody)) + } + + return respBody, nil +} + +// doCSVRequest performs an HTTP request expecting CSV response. +func (c *Client) doCSVRequest(ctx context.Context, method, path string) ([]byte, error) { + fullURL := path + if !strings.HasPrefix(path, "http") { + fullURL = c.baseURL + path + } + + req, err := http.NewRequestWithContext(ctx, method, fullURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Accept", "text/csv") + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(respBody)) + } + + return respBody, nil +} diff --git a/api/bulk/client_test.go b/api/bulk/client_test.go new file mode 100644 index 0000000..d8d7966 --- /dev/null +++ b/api/bulk/client_test.go @@ -0,0 +1,503 @@ +package bulk + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + tests := []struct { + name string + cfg ClientConfig + wantErr bool + }{ + { + name: "valid config", + cfg: ClientConfig{ + InstanceURL: "https://test.salesforce.com", + HTTPClient: &http.Client{}, + }, + wantErr: false, + }, + { + name: "missing instance URL", + cfg: ClientConfig{ + HTTPClient: &http.Client{}, + }, + wantErr: true, + }, + { + name: "missing HTTP client", + cfg: ClientConfig{ + InstanceURL: "https://test.salesforce.com", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := New(tt.cfg) + if tt.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.NotNil(t, client) + }) + } +} + +func TestCreateJob(t *testing.T) { + expectedJob := JobInfo{ + ID: "750xx000000001", + Operation: OperationInsert, + Object: "Account", + State: StateOpen, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/ingest") + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.CreateJob(context.Background(), JobConfig{ + Object: "Account", + Operation: OperationInsert, + }) + require.NoError(t, err) + assert.Equal(t, expectedJob.ID, job.ID) + assert.Equal(t, StateOpen, job.State) +} + +func TestUploadJobData(t *testing.T) { + csvData := []byte("Name,Industry\nAcme,Technology") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/ingest/750xx000000001/batches") + assert.Equal(t, "text/csv", r.Header.Get("Content-Type")) + w.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + err = client.UploadJobData(context.Background(), "750xx000000001", csvData) + require.NoError(t, err) +} + +func TestCloseJob(t *testing.T) { + expectedJob := JobInfo{ + ID: "750xx000000001", + State: StateUploadComplete, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPatch, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/ingest/750xx000000001") + + var req UpdateJobRequest + _ = json.NewDecoder(r.Body).Decode(&req) + assert.Equal(t, StateUploadComplete, req.State) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.CloseJob(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, StateUploadComplete, job.State) +} + +func TestGetJob(t *testing.T) { + expectedJob := JobInfo{ + ID: "750xx000000001", + State: StateJobComplete, + NumberRecordsProcessed: 100, + NumberRecordsFailed: 2, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.GetJob(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, 100, job.NumberRecordsProcessed) + assert.Equal(t, 2, job.NumberRecordsFailed) +} + +func TestListJobs(t *testing.T) { + expected := JobsResponse{ + Done: true, + Records: []JobInfo{ + {ID: "750xx000000001", State: StateJobComplete}, + {ID: "750xx000000002", State: StateInProgress}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/ingest") + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + resp, err := client.ListJobs(context.Background()) + require.NoError(t, err) + assert.Len(t, resp.Records, 2) +} + +func TestGetSuccessfulResults(t *testing.T) { + csvData := "sf__Id,sf__Created,Name\n001xx000001,true,Acme" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/successfulResults") + assert.Equal(t, "text/csv", r.Header.Get("Accept")) + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + data, err := client.GetSuccessfulResults(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, csvData, string(data)) +} + +func TestGetFailedResults(t *testing.T) { + csvData := "sf__Id,sf__Error,Name\n,REQUIRED_FIELD_MISSING,Acme" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/failedResults") + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + data, err := client.GetFailedResults(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Contains(t, string(data), "REQUIRED_FIELD_MISSING") +} + +func TestCreateQueryJob(t *testing.T) { + expectedJob := QueryJobInfo{ + ID: "750xx000000001", + Operation: OperationQuery, + Query: "SELECT Id FROM Account", + State: StateUploadComplete, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query") + + var req CreateQueryJobRequest + _ = json.NewDecoder(r.Body).Decode(&req) + assert.Equal(t, "SELECT Id FROM Account", req.Query) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.CreateQueryJob(context.Background(), QueryConfig{ + Query: "SELECT Id FROM Account", + }) + require.NoError(t, err) + assert.Equal(t, expectedJob.ID, job.ID) + assert.Equal(t, OperationQuery, job.Operation) +} + +func TestGetQueryResults(t *testing.T) { + csvData := "Id,Name\n001xx000001,Acme\n001xx000002,Test" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query/750xx000000001/results") + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + data, err := client.GetQueryResults(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Contains(t, string(data), "Acme") + assert.Contains(t, string(data), "Test") +} + +func TestAbortJob(t *testing.T) { + expectedJob := JobInfo{ + ID: "750xx000000001", + State: StateAborted, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPatch, r.Method) + + var req UpdateJobRequest + _ = json.NewDecoder(r.Body).Decode(&req) + assert.Equal(t, StateAborted, req.State) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.AbortJob(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, StateAborted, job.State) +} + +func TestDeleteJob(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/ingest/750xx000000001") + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + err = client.DeleteJob(context.Background(), "750xx000000001") + require.NoError(t, err) +} + +func TestGetUnprocessedRecords(t *testing.T) { + csvData := "Name,Industry\nUnprocessed,Tech" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/unprocessedrecords") + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + data, err := client.GetUnprocessedRecords(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Contains(t, string(data), "Unprocessed") +} + +func TestGetQueryJob(t *testing.T) { + expectedJob := QueryJobInfo{ + ID: "750xx000000001", + Operation: OperationQuery, + State: StateJobComplete, + NumberRecordsProcessed: 50, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query/750xx000000001") + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.GetQueryJob(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, StateJobComplete, job.State) + assert.Equal(t, 50, job.NumberRecordsProcessed) +} + +func TestAbortQueryJob(t *testing.T) { + expectedJob := QueryJobInfo{ + ID: "750xx000000001", + State: StateAborted, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPatch, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query/750xx000000001") + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + job, err := client.AbortQueryJob(context.Background(), "750xx000000001") + require.NoError(t, err) + assert.Equal(t, StateAborted, job.State) +} + +func TestDeleteQueryJob(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query/750xx000000001") + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + err = client.DeleteQueryJob(context.Background(), "750xx000000001") + require.NoError(t, err) +} + +func TestListQueryJobs(t *testing.T) { + expected := QueryJobsResponse{ + Done: true, + Records: []QueryJobInfo{ + {ID: "750xx000000001", State: StateJobComplete}, + {ID: "750xx000000002", State: StateInProgress}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + assert.Contains(t, r.URL.Path, "/jobs/query") + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + resp, err := client.ListQueryJobs(context.Background()) + require.NoError(t, err) + assert.Len(t, resp.Records, 2) +} + +func TestAPIError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"errorCode":"INVALID_FIELD","message":"Invalid field"}`)) + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + _, err = client.GetJob(context.Background(), "invalid") + require.Error(t, err) + assert.Contains(t, err.Error(), "400") +} + +func TestContextCancellation(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Slow response + select {} + })) + defer server.Close() + + client, err := New(ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + _, err = client.GetJob(ctx, "750xx000000001") + require.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") +} diff --git a/api/bulk/job.go b/api/bulk/job.go new file mode 100644 index 0000000..d64163b --- /dev/null +++ b/api/bulk/job.go @@ -0,0 +1,287 @@ +package bulk + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +// CreateJob creates a new bulk ingest job. +func (c *Client) CreateJob(ctx context.Context, cfg JobConfig) (*JobInfo, error) { + contentType := cfg.ContentType + if contentType == "" { + contentType = ContentTypeCSV + } + + req := CreateJobRequest{ + Object: cfg.Object, + Operation: cfg.Operation, + ExternalIDFieldName: cfg.ExternalID, + ContentType: contentType, + } + + body, err := c.doRequest(ctx, http.MethodPost, "/jobs/ingest", req) + if err != nil { + return nil, err + } + + var job JobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse job response: %w", err) + } + + return &job, nil +} + +// UploadJobData uploads CSV data to a bulk job. +func (c *Client) UploadJobData(ctx context.Context, jobID string, data []byte) error { + path := fmt.Sprintf("/jobs/ingest/%s/batches", jobID) + _, err := c.doRequest(ctx, http.MethodPut, path, data) + return err +} + +// CloseJob marks a job as UploadComplete to start processing. +func (c *Client) CloseJob(ctx context.Context, jobID string) (*JobInfo, error) { + path := fmt.Sprintf("/jobs/ingest/%s", jobID) + req := UpdateJobRequest{State: StateUploadComplete} + + body, err := c.doRequest(ctx, http.MethodPatch, path, req) + if err != nil { + return nil, err + } + + var job JobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse job response: %w", err) + } + + return &job, nil +} + +// GetJob retrieves information about a bulk ingest job. +func (c *Client) GetJob(ctx context.Context, jobID string) (*JobInfo, error) { + path := fmt.Sprintf("/jobs/ingest/%s", jobID) + body, err := c.doRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, err + } + + var job JobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse job response: %w", err) + } + + return &job, nil +} + +// AbortJob aborts a bulk job. +func (c *Client) AbortJob(ctx context.Context, jobID string) (*JobInfo, error) { + path := fmt.Sprintf("/jobs/ingest/%s", jobID) + req := UpdateJobRequest{State: StateAborted} + + body, err := c.doRequest(ctx, http.MethodPatch, path, req) + if err != nil { + return nil, err + } + + var job JobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse job response: %w", err) + } + + return &job, nil +} + +// DeleteJob deletes a bulk job. +func (c *Client) DeleteJob(ctx context.Context, jobID string) error { + path := fmt.Sprintf("/jobs/ingest/%s", jobID) + _, err := c.doRequest(ctx, http.MethodDelete, path, nil) + return err +} + +// ListJobs lists bulk ingest jobs. +func (c *Client) ListJobs(ctx context.Context) (*JobsResponse, error) { + body, err := c.doRequest(ctx, http.MethodGet, "/jobs/ingest", nil) + if err != nil { + return nil, err + } + + var resp JobsResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("failed to parse jobs response: %w", err) + } + + return &resp, nil +} + +// GetSuccessfulResults retrieves successful results from a completed job. +func (c *Client) GetSuccessfulResults(ctx context.Context, jobID string) ([]byte, error) { + path := fmt.Sprintf("/jobs/ingest/%s/successfulResults", jobID) + return c.doCSVRequest(ctx, http.MethodGet, path) +} + +// GetFailedResults retrieves failed results from a completed job. +func (c *Client) GetFailedResults(ctx context.Context, jobID string) ([]byte, error) { + path := fmt.Sprintf("/jobs/ingest/%s/failedResults", jobID) + return c.doCSVRequest(ctx, http.MethodGet, path) +} + +// GetUnprocessedRecords retrieves unprocessed records from a job. +func (c *Client) GetUnprocessedRecords(ctx context.Context, jobID string) ([]byte, error) { + path := fmt.Sprintf("/jobs/ingest/%s/unprocessedrecords", jobID) + return c.doCSVRequest(ctx, http.MethodGet, path) +} + +// PollJob polls a job until it reaches a terminal state or timeout. +func (c *Client) PollJob(ctx context.Context, jobID string, cfg PollConfig) (*JobInfo, error) { + if cfg.Interval == 0 { + cfg = DefaultPollConfig() + } + + deadline := time.Now().Add(cfg.Timeout) + ticker := time.NewTicker(cfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + if time.Now().After(deadline) { + return nil, fmt.Errorf("timeout waiting for job to complete") + } + + job, err := c.GetJob(ctx, jobID) + if err != nil { + return nil, err + } + + switch job.State { + case StateJobComplete, StateFailed, StateAborted: + return job, nil + } + } + } +} + +// CreateQueryJob creates a new bulk query job. +func (c *Client) CreateQueryJob(ctx context.Context, cfg QueryConfig) (*QueryJobInfo, error) { + contentType := cfg.ContentType + if contentType == "" { + contentType = ContentTypeCSV + } + + req := CreateQueryJobRequest{ + Operation: OperationQuery, + Query: cfg.Query, + ContentType: contentType, + } + + body, err := c.doRequest(ctx, http.MethodPost, "/jobs/query", req) + if err != nil { + return nil, err + } + + var job QueryJobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse query job response: %w", err) + } + + return &job, nil +} + +// GetQueryJob retrieves information about a bulk query job. +func (c *Client) GetQueryJob(ctx context.Context, jobID string) (*QueryJobInfo, error) { + path := fmt.Sprintf("/jobs/query/%s", jobID) + body, err := c.doRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, err + } + + var job QueryJobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse query job response: %w", err) + } + + return &job, nil +} + +// GetQueryResults retrieves results from a bulk query job. +func (c *Client) GetQueryResults(ctx context.Context, jobID string) ([]byte, error) { + path := fmt.Sprintf("/jobs/query/%s/results", jobID) + return c.doCSVRequest(ctx, http.MethodGet, path) +} + +// AbortQueryJob aborts a bulk query job. +func (c *Client) AbortQueryJob(ctx context.Context, jobID string) (*QueryJobInfo, error) { + path := fmt.Sprintf("/jobs/query/%s", jobID) + req := UpdateJobRequest{State: StateAborted} + + body, err := c.doRequest(ctx, http.MethodPatch, path, req) + if err != nil { + return nil, err + } + + var job QueryJobInfo + if err := json.Unmarshal(body, &job); err != nil { + return nil, fmt.Errorf("failed to parse query job response: %w", err) + } + + return &job, nil +} + +// DeleteQueryJob deletes a bulk query job. +func (c *Client) DeleteQueryJob(ctx context.Context, jobID string) error { + path := fmt.Sprintf("/jobs/query/%s", jobID) + _, err := c.doRequest(ctx, http.MethodDelete, path, nil) + return err +} + +// ListQueryJobs lists bulk query jobs. +func (c *Client) ListQueryJobs(ctx context.Context) (*QueryJobsResponse, error) { + body, err := c.doRequest(ctx, http.MethodGet, "/jobs/query", nil) + if err != nil { + return nil, err + } + + var resp QueryJobsResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("failed to parse query jobs response: %w", err) + } + + return &resp, nil +} + +// PollQueryJob polls a query job until it reaches a terminal state or timeout. +func (c *Client) PollQueryJob(ctx context.Context, jobID string, cfg PollConfig) (*QueryJobInfo, error) { + if cfg.Interval == 0 { + cfg = DefaultPollConfig() + } + + deadline := time.Now().Add(cfg.Timeout) + ticker := time.NewTicker(cfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + if time.Now().After(deadline) { + return nil, fmt.Errorf("timeout waiting for query job to complete") + } + + job, err := c.GetQueryJob(ctx, jobID) + if err != nil { + return nil, err + } + + switch job.State { + case StateJobComplete, StateFailed, StateAborted: + return job, nil + } + } + } +} diff --git a/api/bulk/types.go b/api/bulk/types.go new file mode 100644 index 0000000..5d57499 --- /dev/null +++ b/api/bulk/types.go @@ -0,0 +1,149 @@ +// Package bulk provides a client for the Salesforce Bulk API 2.0. +package bulk + +import "time" + +// Operation represents a bulk job operation type. +type Operation string + +// Bulk job operations. +const ( + OperationInsert Operation = "insert" + OperationUpdate Operation = "update" + OperationUpsert Operation = "upsert" + OperationDelete Operation = "delete" + OperationQuery Operation = "query" +) + +// State represents a bulk job state. +type State string + +// Bulk job states. +const ( + StateOpen State = "Open" + StateUploadComplete State = "UploadComplete" + StateInProgress State = "InProgress" + StateJobComplete State = "JobComplete" + StateFailed State = "Failed" + StateAborted State = "Aborted" +) + +// ContentType represents the content type for bulk data. +type ContentType string + +// Content types. +const ( + ContentTypeCSV ContentType = "CSV" + ContentTypeJSON ContentType = "JSON" +) + +// JobInfo represents information about a bulk job. +type JobInfo struct { + ID string `json:"id,omitempty"` + Operation Operation `json:"operation"` + Object string `json:"object"` + CreatedByID string `json:"createdById,omitempty"` + CreatedDate string `json:"createdDate,omitempty"` + SystemModstamp string `json:"systemModstamp,omitempty"` + State State `json:"state,omitempty"` + ExternalIDFieldName string `json:"externalIdFieldName,omitempty"` + ConcurrencyMode string `json:"concurrencyMode,omitempty"` + ContentType ContentType `json:"contentType,omitempty"` + APIVersion float64 `json:"apiVersion,omitempty"` + JobType string `json:"jobType,omitempty"` + LineEnding string `json:"lineEnding,omitempty"` + ColumnDelimiter string `json:"columnDelimiter,omitempty"` + NumberRecordsProcessed int `json:"numberRecordsProcessed,omitempty"` + NumberRecordsFailed int `json:"numberRecordsFailed,omitempty"` + Retries int `json:"retries,omitempty"` + TotalProcessingTime int `json:"totalProcessingTime,omitempty"` + APIActiveProcessingTime int `json:"apiActiveProcessingTime,omitempty"` + ApexProcessingTime int `json:"apexProcessingTime,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` +} + +// QueryJobInfo represents information about a bulk query job. +type QueryJobInfo struct { + ID string `json:"id,omitempty"` + Operation Operation `json:"operation"` + Object string `json:"object,omitempty"` + CreatedByID string `json:"createdById,omitempty"` + CreatedDate string `json:"createdDate,omitempty"` + SystemModstamp string `json:"systemModstamp,omitempty"` + State State `json:"state,omitempty"` + ConcurrencyMode string `json:"concurrencyMode,omitempty"` + ContentType ContentType `json:"contentType,omitempty"` + APIVersion float64 `json:"apiVersion,omitempty"` + LineEnding string `json:"lineEnding,omitempty"` + ColumnDelimiter string `json:"columnDelimiter,omitempty"` + NumberRecordsProcessed int `json:"numberRecordsProcessed,omitempty"` + Retries int `json:"retries,omitempty"` + TotalProcessingTime int `json:"totalProcessingTime,omitempty"` + Query string `json:"query,omitempty"` +} + +// JobsResponse represents a list of bulk jobs. +type JobsResponse struct { + Done bool `json:"done"` + Records []JobInfo `json:"records"` + NextRecordsURL string `json:"nextRecordsUrl,omitempty"` +} + +// QueryJobsResponse represents a list of bulk query jobs. +type QueryJobsResponse struct { + Done bool `json:"done"` + Records []QueryJobInfo `json:"records"` + NextRecordsURL string `json:"nextRecordsUrl,omitempty"` +} + +// CreateJobRequest represents a request to create a bulk ingest job. +type CreateJobRequest struct { + Object string `json:"object"` + Operation Operation `json:"operation"` + ExternalIDFieldName string `json:"externalIdFieldName,omitempty"` + ContentType ContentType `json:"contentType,omitempty"` + LineEnding string `json:"lineEnding,omitempty"` + ColumnDelimiter string `json:"columnDelimiter,omitempty"` +} + +// CreateQueryJobRequest represents a request to create a bulk query job. +type CreateQueryJobRequest struct { + Operation Operation `json:"operation"` + Query string `json:"query"` + ContentType ContentType `json:"contentType,omitempty"` + LineEnding string `json:"lineEnding,omitempty"` + ColumnDelimiter string `json:"columnDelimiter,omitempty"` +} + +// UpdateJobRequest represents a request to update a bulk job state. +type UpdateJobRequest struct { + State State `json:"state"` +} + +// JobConfig contains configuration for creating a bulk job. +type JobConfig struct { + Object string + Operation Operation + ExternalID string + ContentType ContentType +} + +// QueryConfig contains configuration for creating a bulk query job. +type QueryConfig struct { + Query string + ContentType ContentType +} + +// PollConfig contains configuration for polling job status. +type PollConfig struct { + Interval time.Duration + Timeout time.Duration +} + +// DefaultPollConfig returns default polling configuration. +func DefaultPollConfig() PollConfig { + return PollConfig{ + Interval: 5 * time.Second, + Timeout: 10 * time.Minute, + } +} diff --git a/cmd/sfdc/main.go b/cmd/sfdc/main.go index 2ab2d4f..1117e6f 100644 --- a/cmd/sfdc/main.go +++ b/cmd/sfdc/main.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + "github.com/open-cli-collective/salesforce-cli/internal/cmd/bulkcmd" "github.com/open-cli-collective/salesforce-cli/internal/cmd/completion" "github.com/open-cli-collective/salesforce-cli/internal/cmd/configcmd" "github.com/open-cli-collective/salesforce-cli/internal/cmd/initcmd" @@ -45,5 +46,8 @@ func run() error { objectcmd.Register(rootCmd, opts) limitscmd.Register(rootCmd, opts) + // Bulk API commands + bulkcmd.Register(rootCmd, opts) + return rootCmd.Execute() } diff --git a/internal/cmd/bulkcmd/bulk.go b/internal/cmd/bulkcmd/bulk.go new file mode 100644 index 0000000..55bb6ea --- /dev/null +++ b/internal/cmd/bulkcmd/bulk.go @@ -0,0 +1,32 @@ +// Package bulkcmd provides commands for Salesforce Bulk API 2.0 operations. +package bulkcmd + +import ( + "github.com/spf13/cobra" + + "github.com/open-cli-collective/salesforce-cli/internal/cmd/root" +) + +// Register registers the bulk command with the root command. +func Register(parent *cobra.Command, opts *root.Options) { + cmd := &cobra.Command{ + Use: "bulk", + Short: "Bulk API 2.0 operations for large data import/export", + Long: `Bulk API 2.0 commands for handling large data operations. + +Use bulk operations when working with thousands or millions of records. +For smaller datasets, use the record command instead. + +Examples: + sfdc bulk import Account --file accounts.csv --operation insert + sfdc bulk export "SELECT Id, Name FROM Account" --output accounts.csv + sfdc bulk job list + sfdc bulk job status 750xx000000001`, + } + + cmd.AddCommand(newImportCommand(opts)) + cmd.AddCommand(newExportCommand(opts)) + cmd.AddCommand(newJobCommand(opts)) + + parent.AddCommand(cmd) +} diff --git a/internal/cmd/bulkcmd/bulk_test.go b/internal/cmd/bulkcmd/bulk_test.go new file mode 100644 index 0000000..a3e4846 --- /dev/null +++ b/internal/cmd/bulkcmd/bulk_test.go @@ -0,0 +1,568 @@ +package bulkcmd + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-cli-collective/salesforce-cli/api/bulk" + "github.com/open-cli-collective/salesforce-cli/internal/cmd/root" +) + +func TestImportCommand(t *testing.T) { + expectedJob := bulk.JobInfo{ + ID: "750xx000000001", + Operation: bulk.OperationInsert, + Object: "Account", + State: bulk.StateJobComplete, + } + + callCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/services/data/v62.0/jobs/ingest": + // Create job + expectedJob.State = bulk.StateOpen + _ = json.NewEncoder(w).Encode(expectedJob) + case r.Method == http.MethodPut: + // Upload data + w.WriteHeader(http.StatusCreated) + case r.Method == http.MethodPatch: + // Close job + expectedJob.State = bulk.StateUploadComplete + _ = json.NewEncoder(w).Encode(expectedJob) + } + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + // Create temp CSV file + tmpDir := t.TempDir() + csvFile := filepath.Join(tmpDir, "accounts.csv") + err = os.WriteFile(csvFile, []byte("Name,Industry\nAcme,Technology"), 0644) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: stderr, + } + opts.SetBulkClient(client) + + cmd := newImportCommand(opts) + cmd.SetArgs([]string{"Account", "--file", csvFile, "--operation", "insert"}) + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "Creating bulk insert job") + assert.Contains(t, output, "750xx000000001") +} + +func TestImportCommand_UpsertRequiresExternalID(t *testing.T) { + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: stderr, + } + + // Create temp CSV file + tmpDir := t.TempDir() + csvFile := filepath.Join(tmpDir, "contacts.csv") + err := os.WriteFile(csvFile, []byte("Email,Name\ntest@test.com,Test"), 0644) + require.NoError(t, err) + + cmd := newImportCommand(opts) + cmd.SetArgs([]string{"Contact", "--file", csvFile, "--operation", "upsert"}) + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + err = cmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "--external-id is required") +} + +func TestExportCommand(t *testing.T) { + csvData := "Id,Name\n001xx000001,Acme\n001xx000002,Test" + expectedJob := bulk.QueryJobInfo{ + ID: "750xx000000001", + Operation: bulk.OperationQuery, + State: bulk.StateJobComplete, + NumberRecordsProcessed: 2, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/services/data/v62.0/jobs/query": + // Create query job + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + case r.Method == http.MethodGet && r.URL.Path == "/services/data/v62.0/jobs/query/750xx000000001": + // Get job status + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + case r.Method == http.MethodGet && r.URL.Path == "/services/data/v62.0/jobs/query/750xx000000001/results": + // Get results + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + } + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: stderr, + } + opts.SetBulkClient(client) + + cmd := newExportCommand(opts) + cmd.SetArgs([]string{"SELECT Id, Name FROM Account"}) + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "Id,Name") + assert.Contains(t, output, "Acme") +} + +func TestJobListCommand(t *testing.T) { + expected := bulk.JobsResponse{ + Done: true, + Records: []bulk.JobInfo{ + {ID: "750xx000000001", Object: "Account", Operation: bulk.OperationInsert, State: bulk.StateJobComplete}, + {ID: "750xx000000002", Object: "Contact", Operation: bulk.OperationUpdate, State: bulk.StateInProgress}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobListCommand(opts) + cmd.SetArgs([]string{}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "750xx000000001") + assert.Contains(t, output, "Account") + assert.Contains(t, output, "2 job(s)") +} + +func TestJobStatusCommand(t *testing.T) { + expectedJob := bulk.JobInfo{ + ID: "750xx000000001", + Object: "Account", + Operation: bulk.OperationInsert, + State: bulk.StateJobComplete, + NumberRecordsProcessed: 100, + NumberRecordsFailed: 2, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobStatusCommand(opts) + cmd.SetArgs([]string{"750xx000000001"}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "JobComplete") + assert.Contains(t, output, "100") + assert.Contains(t, output, "2") +} + +func TestJobAbortCommand(t *testing.T) { + expectedJob := bulk.JobInfo{ + ID: "750xx000000001", + State: bulk.StateAborted, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPatch, r.Method) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobAbortCommand(opts) + cmd.SetArgs([]string{"750xx000000001"}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "aborted") +} + +func TestJobResultsCommand(t *testing.T) { + csvData := "sf__Id,sf__Created,Name\n001xx000001,true,Acme" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobResultsCommand(opts) + cmd.SetArgs([]string{"750xx000000001"}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "sf__Id") + assert.Contains(t, output, "Acme") +} + +func TestJobErrorsCommand(t *testing.T) { + csvData := "sf__Id,sf__Error,Name\n,REQUIRED_FIELD_MISSING,Acme" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobErrorsCommand(opts) + cmd.SetArgs([]string{"750xx000000001"}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "REQUIRED_FIELD_MISSING") +} + +func TestImportCommand_InvalidOperation(t *testing.T) { + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: stderr, + } + + tmpDir := t.TempDir() + csvFile := filepath.Join(tmpDir, "test.csv") + err := os.WriteFile(csvFile, []byte("Name\nTest"), 0644) + require.NoError(t, err) + + cmd := newImportCommand(opts) + cmd.SetArgs([]string{"Account", "--file", csvFile, "--operation", "invalid"}) + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + err = cmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid operation") +} + +func TestImportCommand_FileNotFound(t *testing.T) { + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: stderr, + } + + cmd := newImportCommand(opts) + cmd.SetArgs([]string{"Account", "--file", "/nonexistent/file.csv", "--operation", "insert"}) + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + err := cmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to read file") +} + +func TestJobListCommand_Empty(t *testing.T) { + expected := bulk.JobsResponse{ + Done: true, + Records: []bulk.JobInfo{}, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobListCommand(opts) + cmd.SetArgs([]string{}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, "No bulk jobs found") +} + +func TestJobListCommand_JSONOutput(t *testing.T) { + expected := bulk.JobsResponse{ + Done: true, + Records: []bulk.JobInfo{ + {ID: "750xx000000001", Object: "Account", Operation: bulk.OperationInsert, State: bulk.StateJobComplete}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expected) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "json", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobListCommand(opts) + cmd.SetArgs([]string{}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + var result bulk.JobsResponse + err = json.Unmarshal(stdout.Bytes(), &result) + require.NoError(t, err) + assert.Len(t, result.Records, 1) +} + +func TestExportCommand_ToFile(t *testing.T) { + csvData := "Id,Name\n001xx000001,Acme" + expectedJob := bulk.QueryJobInfo{ + ID: "750xx000000001", + Operation: bulk.OperationQuery, + State: bulk.StateJobComplete, + NumberRecordsProcessed: 1, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost: + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + case r.URL.Path == "/services/data/v62.0/jobs/query/750xx000000001": + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(expectedJob) + case r.URL.Path == "/services/data/v62.0/jobs/query/750xx000000001/results": + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + } + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + tmpDir := t.TempDir() + outputFile := filepath.Join(tmpDir, "output.csv") + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newExportCommand(opts) + cmd.SetArgs([]string{"SELECT Id, Name FROM Account", "--output", outputFile}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + // Check file was written + data, err := os.ReadFile(outputFile) + require.NoError(t, err) + assert.Contains(t, string(data), "Acme") +} + +func TestJobResultsCommand_ToFile(t *testing.T) { + csvData := "sf__Id,sf__Created,Name\n001xx000001,true,Acme" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/csv") + _, _ = w.Write([]byte(csvData)) + })) + defer server.Close() + + client, err := bulk.New(bulk.ClientConfig{ + InstanceURL: server.URL, + HTTPClient: server.Client(), + }) + require.NoError(t, err) + + tmpDir := t.TempDir() + outputFile := filepath.Join(tmpDir, "results.csv") + + stdout := &bytes.Buffer{} + opts := &root.Options{ + Output: "table", + Stdout: stdout, + Stderr: &bytes.Buffer{}, + } + opts.SetBulkClient(client) + + cmd := newJobResultsCommand(opts) + cmd.SetArgs([]string{"750xx000000001", "--output", outputFile}) + cmd.SetOut(stdout) + + err = cmd.Execute() + require.NoError(t, err) + + // Check file was written + data, err := os.ReadFile(outputFile) + require.NoError(t, err) + assert.Contains(t, string(data), "Acme") + assert.Contains(t, stdout.String(), "Results written to") +} diff --git a/internal/cmd/bulkcmd/export.go b/internal/cmd/bulkcmd/export.go new file mode 100644 index 0000000..c921f9f --- /dev/null +++ b/internal/cmd/bulkcmd/export.go @@ -0,0 +1,91 @@ +package bulkcmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/open-cli-collective/salesforce-cli/api/bulk" + "github.com/open-cli-collective/salesforce-cli/internal/cmd/root" +) + +func newExportCommand(opts *root.Options) *cobra.Command { + var ( + output string + ) + + cmd := &cobra.Command{ + Use: "export ", + Short: "Export data using a bulk query", + Long: `Export data from Salesforce using Bulk API 2.0 query. + +Use this for exporting large datasets. For smaller queries, use the query command. + +Examples: + sfdc bulk export "SELECT Id, Name, Industry FROM Account" + sfdc bulk export "SELECT Id, Name FROM Account" --output accounts.csv + sfdc bulk export "SELECT * FROM Contact" --output contacts.csv`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runExport(cmd.Context(), opts, args[0], output) + }, + } + + cmd.Flags().StringVarP(&output, "output", "o", "", "Output file path (prints to stdout if not specified)") + + return cmd +} + +func runExport(ctx context.Context, opts *root.Options, soql, output string) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + v := opts.View() + + // Create query job + v.Info("Creating bulk query job...") + job, err := client.CreateQueryJob(ctx, bulk.QueryConfig{ + Query: soql, + }) + if err != nil { + return fmt.Errorf("failed to create query job: %w", err) + } + + v.Info("Job created: %s", job.ID) + + // Poll until complete + v.Info("Waiting for query to complete...") + job, err = client.PollQueryJob(ctx, job.ID, bulk.DefaultPollConfig()) + if err != nil { + return fmt.Errorf("failed waiting for query job: %w", err) + } + + if job.State != bulk.StateJobComplete { + return fmt.Errorf("query job failed with state: %s", job.State) + } + + v.Info("Query completed. Records: %d", job.NumberRecordsProcessed) + + // Get results + v.Info("Downloading results...") + data, err := client.GetQueryResults(ctx, job.ID) + if err != nil { + return fmt.Errorf("failed to get query results: %w", err) + } + + // Write to file or stdout + if output != "" { + if err := os.WriteFile(output, data, 0644); err != nil { + return fmt.Errorf("failed to write output file: %w", err) + } + v.Info("Results written to %s", output) + } else { + fmt.Fprintln(opts.Stdout, string(data)) + } + + return nil +} diff --git a/internal/cmd/bulkcmd/import.go b/internal/cmd/bulkcmd/import.go new file mode 100644 index 0000000..65d54be --- /dev/null +++ b/internal/cmd/bulkcmd/import.go @@ -0,0 +1,146 @@ +package bulkcmd + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + + "github.com/open-cli-collective/salesforce-cli/api/bulk" + "github.com/open-cli-collective/salesforce-cli/internal/cmd/root" +) + +func newImportCommand(opts *root.Options) *cobra.Command { + var ( + file string + operation string + externalID string + wait bool + ) + + cmd := &cobra.Command{ + Use: "import ", + Short: "Import data from a CSV file using Bulk API 2.0", + Long: `Import data from a CSV file into Salesforce using Bulk API 2.0. + +The CSV file must have a header row with field names matching the Salesforce object. + +Operations: + insert - Create new records + update - Update existing records (requires Id column) + upsert - Insert or update based on external ID field + delete - Delete records (requires Id column) + +Examples: + sfdc bulk import Account --file accounts.csv --operation insert + sfdc bulk import Contact --file contacts.csv --operation upsert --external-id Email + sfdc bulk import Account --file accounts.csv --operation update --wait + sfdc bulk import Account --file delete-ids.csv --operation delete`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runImport(cmd.Context(), opts, args[0], file, operation, externalID, wait) + }, + } + + cmd.Flags().StringVarP(&file, "file", "f", "", "Path to CSV file (required)") + cmd.Flags().StringVar(&operation, "operation", "insert", "Operation: insert, update, upsert, delete") + cmd.Flags().StringVar(&externalID, "external-id", "", "External ID field for upsert operation") + cmd.Flags().BoolVar(&wait, "wait", false, "Wait for job to complete") + + _ = cmd.MarkFlagRequired("file") + + return cmd +} + +func runImport(ctx context.Context, opts *root.Options, object, file, operation, externalID string, wait bool) error { + // Validate operation + op := bulk.Operation(strings.ToLower(operation)) + switch op { + case bulk.OperationInsert, bulk.OperationUpdate, bulk.OperationUpsert, bulk.OperationDelete: + // Valid + default: + return fmt.Errorf("invalid operation: %s (must be insert, update, upsert, or delete)", operation) + } + + // Upsert requires external ID + if op == bulk.OperationUpsert && externalID == "" { + return fmt.Errorf("--external-id is required for upsert operation") + } + + // Read CSV file + data, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + // Create bulk client + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + v := opts.View() + + // Create job + v.Info("Creating bulk %s job for %s...", operation, object) + job, err := client.CreateJob(ctx, bulk.JobConfig{ + Object: object, + Operation: op, + ExternalID: externalID, + }) + if err != nil { + return fmt.Errorf("failed to create job: %w", err) + } + + v.Info("Job created: %s", job.ID) + + // Upload data + v.Info("Uploading data...") + if err := client.UploadJobData(ctx, job.ID, data); err != nil { + return fmt.Errorf("failed to upload data: %w", err) + } + + // Close job to start processing + v.Info("Starting job processing...") + job, err = client.CloseJob(ctx, job.ID) + if err != nil { + return fmt.Errorf("failed to close job: %w", err) + } + + if !wait { + v.Info("Job %s is processing. Use 'sfdc bulk job status %s' to check progress.", job.ID, job.ID) + return nil + } + + // Poll until complete + v.Info("Waiting for job to complete...") + job, err = client.PollJob(ctx, job.ID, bulk.DefaultPollConfig()) + if err != nil { + return fmt.Errorf("failed waiting for job: %w", err) + } + + // Show results + return renderJobResult(opts, job) +} + +func renderJobResult(opts *root.Options, job *bulk.JobInfo) error { + v := opts.View() + + if opts.Output == "json" { + return v.JSON(job) + } + + v.Info("Job completed:") + v.Info(" ID: %s", job.ID) + v.Info(" State: %s", job.State) + v.Info(" Records Processed: %d", job.NumberRecordsProcessed) + v.Info(" Records Failed: %d", job.NumberRecordsFailed) + + if job.NumberRecordsFailed > 0 { + v.Info("\nUse 'sfdc bulk job errors %s' to see failed records.", job.ID) + } + + return nil +} diff --git a/internal/cmd/bulkcmd/job.go b/internal/cmd/bulkcmd/job.go new file mode 100644 index 0000000..03c834c --- /dev/null +++ b/internal/cmd/bulkcmd/job.go @@ -0,0 +1,244 @@ +package bulkcmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/open-cli-collective/salesforce-cli/internal/cmd/root" +) + +func newJobCommand(opts *root.Options) *cobra.Command { + cmd := &cobra.Command{ + Use: "job", + Short: "Manage bulk jobs", + Long: `Manage Salesforce Bulk API 2.0 jobs. + +Examples: + sfdc bulk job list + sfdc bulk job status 750xx000000001 + sfdc bulk job results 750xx000000001 + sfdc bulk job errors 750xx000000001 + sfdc bulk job abort 750xx000000001`, + } + + cmd.AddCommand(newJobListCommand(opts)) + cmd.AddCommand(newJobStatusCommand(opts)) + cmd.AddCommand(newJobResultsCommand(opts)) + cmd.AddCommand(newJobErrorsCommand(opts)) + cmd.AddCommand(newJobAbortCommand(opts)) + + return cmd +} + +func newJobListCommand(opts *root.Options) *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List recent bulk jobs", + Long: `List recent bulk ingest jobs. + +Examples: + sfdc bulk job list + sfdc bulk job list -o json`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return runJobList(cmd.Context(), opts) + }, + } +} + +func runJobList(ctx context.Context, opts *root.Options) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + resp, err := client.ListJobs(ctx) + if err != nil { + return fmt.Errorf("failed to list jobs: %w", err) + } + + v := opts.View() + + if len(resp.Records) == 0 { + v.Info("No bulk jobs found") + return nil + } + + if opts.Output == "json" { + return v.JSON(resp) + } + + headers := []string{"ID", "Object", "Operation", "State", "Processed", "Failed"} + rows := make([][]string, 0, len(resp.Records)) + for _, job := range resp.Records { + rows = append(rows, []string{ + job.ID, + job.Object, + string(job.Operation), + string(job.State), + fmt.Sprintf("%d", job.NumberRecordsProcessed), + fmt.Sprintf("%d", job.NumberRecordsFailed), + }) + } + + if err := v.Table(headers, rows); err != nil { + return err + } + + v.Info("\n%d job(s)", len(resp.Records)) + return nil +} + +func newJobStatusCommand(opts *root.Options) *cobra.Command { + return &cobra.Command{ + Use: "status ", + Short: "Get bulk job status", + Long: `Get the status of a bulk ingest job. + +Examples: + sfdc bulk job status 750xx000000001 + sfdc bulk job status 750xx000000001 -o json`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runJobStatus(cmd.Context(), opts, args[0]) + }, + } +} + +func runJobStatus(ctx context.Context, opts *root.Options, jobID string) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + job, err := client.GetJob(ctx, jobID) + if err != nil { + return fmt.Errorf("failed to get job: %w", err) + } + + return renderJobResult(opts, job) +} + +func newJobResultsCommand(opts *root.Options) *cobra.Command { + var output string + + cmd := &cobra.Command{ + Use: "results ", + Short: "Get successful results from a bulk job", + Long: `Get the successful results from a completed bulk ingest job. + +Examples: + sfdc bulk job results 750xx000000001 + sfdc bulk job results 750xx000000001 --output results.csv`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runJobResults(cmd.Context(), opts, args[0], output) + }, + } + + cmd.Flags().StringVarP(&output, "output", "o", "", "Output file path") + + return cmd +} + +func runJobResults(ctx context.Context, opts *root.Options, jobID, output string) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + data, err := client.GetSuccessfulResults(ctx, jobID) + if err != nil { + return fmt.Errorf("failed to get results: %w", err) + } + + if output != "" { + if err := os.WriteFile(output, data, 0644); err != nil { + return fmt.Errorf("failed to write output file: %w", err) + } + opts.View().Info("Results written to %s", output) + return nil + } + + fmt.Fprintln(opts.Stdout, string(data)) + return nil +} + +func newJobErrorsCommand(opts *root.Options) *cobra.Command { + var output string + + cmd := &cobra.Command{ + Use: "errors ", + Short: "Get failed records from a bulk job", + Long: `Get the failed records from a completed bulk ingest job. + +Examples: + sfdc bulk job errors 750xx000000001 + sfdc bulk job errors 750xx000000001 --output errors.csv`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runJobErrors(cmd.Context(), opts, args[0], output) + }, + } + + cmd.Flags().StringVarP(&output, "output", "o", "", "Output file path") + + return cmd +} + +func runJobErrors(ctx context.Context, opts *root.Options, jobID, output string) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + data, err := client.GetFailedResults(ctx, jobID) + if err != nil { + return fmt.Errorf("failed to get failed results: %w", err) + } + + if output != "" { + if err := os.WriteFile(output, data, 0644); err != nil { + return fmt.Errorf("failed to write output file: %w", err) + } + opts.View().Info("Errors written to %s", output) + return nil + } + + fmt.Fprintln(opts.Stdout, string(data)) + return nil +} + +func newJobAbortCommand(opts *root.Options) *cobra.Command { + return &cobra.Command{ + Use: "abort ", + Short: "Abort a bulk job", + Long: `Abort a running bulk ingest job. + +Examples: + sfdc bulk job abort 750xx000000001`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runJobAbort(cmd.Context(), opts, args[0]) + }, + } +} + +func runJobAbort(ctx context.Context, opts *root.Options, jobID string) error { + client, err := opts.BulkClient() + if err != nil { + return fmt.Errorf("failed to create bulk client: %w", err) + } + + job, err := client.AbortJob(ctx, jobID) + if err != nil { + return fmt.Errorf("failed to abort job: %w", err) + } + + v := opts.View() + v.Info("Job %s aborted", job.ID) + return nil +} diff --git a/internal/cmd/root/root.go b/internal/cmd/root/root.go index a2d9dd9..620d9e7 100644 --- a/internal/cmd/root/root.go +++ b/internal/cmd/root/root.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" "github.com/open-cli-collective/salesforce-cli/api" + "github.com/open-cli-collective/salesforce-cli/api/bulk" "github.com/open-cli-collective/salesforce-cli/internal/auth" "github.com/open-cli-collective/salesforce-cli/internal/config" "github.com/open-cli-collective/salesforce-cli/internal/version" @@ -27,6 +28,8 @@ type Options struct { // testClient is used for testing; if set, APIClient() returns this instead testClient *api.Client + // testBulkClient is used for testing; if set, BulkClient() returns this instead + testBulkClient *bulk.Client } // View returns a configured View instance @@ -65,6 +68,34 @@ func (o *Options) SetAPIClient(client *api.Client) { o.testClient = client } +// BulkClient creates a new Bulk API client from config +func (o *Options) BulkClient() (*bulk.Client, error) { + if o.testBulkClient != nil { + return o.testBulkClient, nil + } + + cfg, err := config.Load() + if err != nil { + return nil, err + } + + httpClient, err := auth.GetHTTPClient(context.Background()) + if err != nil { + return nil, err + } + + return bulk.New(bulk.ClientConfig{ + InstanceURL: cfg.InstanceURL, + HTTPClient: httpClient, + APIVersion: o.APIVersion, + }) +} + +// SetBulkClient sets a test bulk client (for testing only) +func (o *Options) SetBulkClient(client *bulk.Client) { + o.testBulkClient = client +} + // NewCmd creates the root command and returns the options struct func NewCmd() (*cobra.Command, *Options) { opts := &Options{