Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
1f75272
Merge pull request #14 from Root16/CompatibilityUpdates
csobrien Apr 1, 2024
2749f6c
Update SQLPagedQuery to handle users not having the required Order by
Apr 10, 2024
30b1284
Updated Code to match .Net8 syntactic sugar
Apr 10, 2024
5330f24
Merge pull request #15 from Root16/SQLUpdates
csobrien Apr 10, 2024
d639590
Updating compatibility to work with .Net6, 7, and 8.
Apr 11, 2024
13a8738
Updating namespaces
Apr 12, 2024
485a8a8
Making SQLConnection Public To Match DataverseDataSource
Apr 15, 2024
aa94c33
Merge pull request #16 from Root16/UpdatingCompatibility
csobrien Apr 16, 2024
ba15ca5
Use Task Parallel Library to process Dataverse requests
May 11, 2024
bbadd44
Add multiple retry policy to handle network errors
May 23, 2024
29f6921
Prefer EMR for single requests for consistent error handling
May 23, 2024
73db4ca
Only log exceptions if the message changes
May 24, 2024
faac4c2
Merge branch 'dev/parallel-requests' of https://github.com/Root16/spr…
May 24, 2024
c0192f4
Moved to primary constructor
May 24, 2024
933249a
Merge pull request #17 from Root16/dev/parallel-requests
csobrien Jun 6, 2024
0145341
Enable Creating A Generic Step That Can Be Called Multiple Times with…
Jun 25, 2024
9d662a3
Merge pull request #18 from Root16/EnableGenericizedSteps
csobrien Jun 25, 2024
8de6fcb
Adding pulling SQL Queries from a file
Jul 3, 2024
01e09a2
Merge pull request #19 from Root16/SQLUpdates
csobrien Jul 3, 2024
adc6f56
make operationcounts dictionary public for consumers to see progress …
Jul 25, 2024
0850839
Merge pull request #20 from Root16/dev/pre_post_step_overrides
csobrien Jul 25, 2024
c75cb47
Updating automatic Order By To Order By 1
Sep 25, 2024
00dff10
Merge pull request #21 from Root16/OrderByUpdate
csobrien Sep 25, 2024
614f951
add sql datasource
Jan 21, 2025
53ef5c0
add datarow extensions
Jan 22, 2025
50831bf
remove comment
Jan 23, 2025
5addf5c
update mapmonies for decimal and int types
Jan 23, 2025
1839f79
Entity extension updates and test cases
Jan 24, 2025
4e8df7a
add OrganizationRequest Information, DataverseFetchXmlReducingQuery, …
Jan 24, 2025
030d405
add dictionary & string extensions
Jan 27, 2025
223cc4f
add methods to directly query db
Jan 29, 2025
b855be8
update to task.delay
Jan 31, 2025
0db6319
default token
Jan 31, 2025
f8c84a1
update reducingquery
Jan 31, 2025
bf57b54
reuse more code from SQLPagedQuery
Jan 31, 2025
df20100
Merge pull request #22 from Root16/dev/sql-extensions
csobrien Jan 31, 2025
c80eb83
Merge pull request #23 from Root16/dev/dataverse-extensions
csobrien Jan 31, 2025
f665c62
Merge pull request #24 from Root16/dev/base-extensions
csobrien Jan 31, 2025
6818818
Merge pull request #25 from Root16/dev/sql_commands
csobrien Jan 31, 2025
0c309ba
add delayable batching
Feb 4, 2025
7476629
rename AddSproutWithBatchDelay
Feb 4, 2025
5ccf33c
Merge pull request #26 from Root16/dev/batch-delay
csobrien Feb 4, 2025
efd9d70
remove generic, add optionset mapping
Feb 4, 2025
81f9c8a
Added overload to allow a hook to be passed in to run step which allo…
Feb 11, 2025
431c07f
Changed structure of configurator and run step arguments
Feb 11, 2025
d6b74af
Merge pull request #29 from jacobherner/AddingHookForPassingStepState
csobrien Feb 11, 2025
2b4c56e
Entity extension updates and test cases
Jan 24, 2025
fe6cf76
Merge
TravisGilbert Feb 12, 2025
35164ca
move datarow extensions, remove dependency on dataverse from sql
Feb 13, 2025
983f2bf
Merge pull request #28 from Root16/tjg/EntityExtensionUpdates
csobrien Feb 14, 2025
a4d1f21
Merge pull request #27 from Root16/dev/datarow-ext-updates
csobrien Feb 14, 2025
f6d9d4e
Update deploy-to-nuget.yml
csobrien Feb 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy-to-nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Set variables
id: setvars
run: |
if [[ "${{github.base_ref}}" == "master" || "${{github.ref}}" == "refs/heads/master" ]]; then
if [[ "${{github.base_ref}}" == "main" || "${{github.ref}}" == "refs/heads/main" ]]; then
echo "VERSION_SUFFIX=" >> "$GITHUB_ENV"
fi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using Root16.Sprout.Extensions;
using System.Collections.Concurrent;
using System.Net;
using System.ServiceModel;

namespace Root16.Sprout.DataSources.Dataverse;

Expand All @@ -11,10 +13,18 @@ public class DataverseDataSource : IDataSource<Entity>
private readonly ILogger<DataverseDataSource> logger;
public string? ImpersonateUsingAttribute { get; set; }

const int MaxRetries = 10;

public DataverseDataSource(ServiceClient crmServiceClient, ILogger<DataverseDataSource> logger)
{
CrmServiceClient = crmServiceClient;
ServiceClient.MaxConnectionTimeout = TimeSpan.FromMinutes(11);
CrmServiceClient.EnableAffinityCookie = false;
// TODO: Is there a better place to do this?
ThreadPool.SetMinThreads(100, 100);
ServicePointManager.DefaultConnectionLimit = 65000;
ServicePointManager.Expect100Continue = false;
ServicePointManager.UseNagleAlgorithm = false;
this.logger = logger;
}

Expand Down Expand Up @@ -102,7 +112,7 @@ public async Task<IReadOnlyList<DataOperationResult<Entity>>> ExecuteMultipleAsy
{
if (!dryRun)
{
var response = await CrmServiceClient.ExecuteAsync(requestCollection[0]);
var response = await TryExecuteRequestAsync(requestCollection[0]);
}
results.Add(ResultFromRequestType(requestCollection[0], true));
}
Expand All @@ -123,42 +133,30 @@ public async Task<IReadOnlyList<DataOperationResult<Entity>>> ExecuteMultipleAsy
}
else
{
List<OrganizationRequestCollection> ListofRequestCollections = [];

foreach (var request in requestCollection.ChunkBy(1000))
{
var orgRequestCollection = new OrganizationRequestCollection();
orgRequestCollection.AddRange(request);
ListofRequestCollections.Add(orgRequestCollection);
}
ConcurrentBag<DataOperationResult<Entity>> parallelResults = [];

List<Task<OrganizationResponse>> requestTasks = [];
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = CrmServiceClient.RecommendedDegreesOfParallelism };

foreach (var requests in ListofRequestCollections)
await Parallel.ForEachAsync(requestCollection.Chunk(10), parallelOptions, async (batch, token) =>
{
requestTasks.Add(CrmServiceClient.ExecuteAsync(new ExecuteMultipleRequest
ExecuteMultipleRequest request = new()
{
Settings = new ExecuteMultipleSettings
{
ContinueOnError = true,
},
Requests = requests
}));
}
Requests = []
};
request.Requests.AddRange(batch);

OrganizationResponse[] organizationResponses = await Task.WhenAll(requestTasks);
List<ExecuteMultipleResponse> executeMultipleResponses = organizationResponses.Select(x => (ExecuteMultipleResponse)x).ToList();
ExecuteMultipleResponse batchResponse = await TryExecuteRequestAsync<ExecuteMultipleResponse>(request, token);

for (var i = 0; i < executeMultipleResponses.Count; i++)
{
var responses = executeMultipleResponses[i].Responses;
var matchingRequests = ListofRequestCollections[i];
for (var k = 0; k < matchingRequests.Count; k++)
for (var k = 0; k < batch.Length; k++)
{
var response = responses.FirstOrDefault(r => r.RequestIndex == k);
var response = batchResponse.Responses.FirstOrDefault(r => r.RequestIndex == k);
if (response?.Fault is not null)
{
results.Add(ResultFromRequestType(matchingRequests[k], false));
parallelResults.Add(ResultFromRequestType(batch[k], false));
if (response?.Fault?.InnerFault?.InnerFault?.Message is not null
&& response.Fault.InnerFault.InnerFault is OrganizationServiceFault innermostFault)
{
Expand All @@ -171,10 +169,12 @@ public async Task<IReadOnlyList<DataOperationResult<Entity>>> ExecuteMultipleAsy
}
else
{
results.Add(ResultFromRequestType(matchingRequests[k], true));
parallelResults.Add(ResultFromRequestType(batch[k], true));
}
}
}
});

results.AddRange(parallelResults);
}
}
return results;
Expand All @@ -199,6 +199,11 @@ public IPagedQuery<Entity> CreateFetchXmlQuery(string fetchXml)
return new DataverseFetchXmlPagedQuery(this, fetchXml);
}

public IPagedQuery<Entity> CreateFetchXmlReducingQuery(string fetchXml, string? countByAttribute = null)
{
return new DataverseFetchXmlReducingQuery(this, fetchXml, countByAttribute);
}

protected static OrganizationRequest? CreateOrganizationRequest(DataOperation<Entity> change, IEnumerable<string> dataOperationFlags)
{
OrganizationRequest request;
Expand Down Expand Up @@ -240,4 +245,32 @@ public IPagedQuery<Entity> CreateFetchXmlQuery(string fetchXml)

return request;
}

private Task<OrganizationResponse> TryExecuteRequestAsync(OrganizationRequest request, CancellationToken token = default)
=> TryExecuteRequestAsync<OrganizationResponse>(request, token);

private async Task<T> TryExecuteRequestAsync<T>(OrganizationRequest request, CancellationToken token = default)
where T : OrganizationResponse
{
var retryCount = 0;
Exception? lastException = null;
do
{
try
{
return (T)await CrmServiceClient.ExecuteAsync(request, token);
}
catch (FaultException<OrganizationServiceFault>) { throw; }
catch (Exception ex)
{
if (lastException is null || !ex.Message.Equals(lastException.Message, StringComparison.OrdinalIgnoreCase))
{
logger.LogError(ex, ex.Message);
}
lastException = ex;
}
} while (retryCount++ < MaxRetries);

throw lastException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using Root16.Sprout.DataSources.Dataverse;

namespace Root16.Sprout.DependencyInjection;
namespace Root16.Sprout.DataSources.Dataverse;

public class DataverseDataSourceFactory : IDataverseDataSourceFactory
public class DataverseDataSourceFactory(IServiceProvider serviceProvider) : IDataverseDataSourceFactory
{
private readonly IServiceProvider serviceProvider;

public DataverseDataSourceFactory(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}
private readonly IServiceProvider serviceProvider = serviceProvider;

public DataverseDataSource CreateDataSource(string connectionStringName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@

namespace Root16.Sprout.DataSources.Dataverse;

public class DataverseFetchXmlPagedQuery : IPagedQuery<Entity>
public class DataverseFetchXmlPagedQuery(DataverseDataSource dataSource, string fetchXml) : IPagedQuery<Entity>
{
private readonly DataverseDataSource dataSource;
private readonly string fetchXml;

public DataverseFetchXmlPagedQuery(DataverseDataSource dataSource, string fetchXml)
{
this.dataSource = dataSource;
this.fetchXml = fetchXml;
}
private readonly DataverseDataSource dataSource = dataSource;
private readonly string fetchXml = fetchXml;

private static string AddPaging(string fetchXml, int page, int pageSize, string? pagingCookie)
{
Expand All @@ -36,7 +30,7 @@ private static void AddPaging(XDocument fetchDoc, int page, int pageSize, string

public async Task<PagedQueryResult<Entity>> GetNextPageAsync(int pageNumber, int pageSize, object? bookmark)
{
var results = await dataSource.CrmServiceClient.RetrieveMultipleAsync(new FetchExpression(AddPaging(fetchXml, ++pageNumber, pageSize, (string?)bookmark)));
var results = await dataSource.CrmServiceClient.RetrieveMultipleWithRetryAsync(new FetchExpression(AddPaging(fetchXml, ++pageNumber, pageSize, (string?)bookmark)));

return new PagedQueryResult<Entity>
(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
using Microsoft.PowerPlatform.Dataverse.Client.Extensions;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Metadata;
using Microsoft.Xrm.Sdk.Query;
using System.Xml.Linq;

namespace Root16.Sprout.DataSources.Dataverse;

public class DataverseFetchXmlReducingQuery(DataverseDataSource dataSource, string fetchXml, string? countByAttribute = null) : IPagedQuery<Entity>
{
private readonly DataverseDataSource dataSource = dataSource;
private readonly string fetchXml = fetchXml;
private readonly string? countByAttribute = countByAttribute;
private static string AddPaging(string fetchXml, int? page, int? pageSize, string? pagingCookie)
{
var fetchDoc = XDocument.Parse(fetchXml);
AddPaging(fetchDoc, page, pageSize, pagingCookie);
return fetchDoc.ToString(SaveOptions.DisableFormatting);
}

private static void AddPaging(XDocument fetchDoc, int? page, int? pageSize, string? pagingCookie)
{
if (page is not null)
fetchDoc.Root?.SetAttributeValue("page", page);
if (pageSize is not null)
fetchDoc.Root?.SetAttributeValue("count", pageSize);
if (pagingCookie is not null)
{
fetchDoc.Root?.SetAttributeValue("paging-cookie", pagingCookie);
}
}

// results should be 'different' everytime cause reducing
public async Task<PagedQueryResult<Entity>> GetNextPageAsync(int pageNumber, int pageSize, object? bookmark)
{
var results = await dataSource.CrmServiceClient.RetrieveMultipleWithRetryAsync(new FetchExpression(fetchXml));

return new PagedQueryResult<Entity>
(
[.. results.Entities.Take(pageSize)],
results.MoreRecords || results.Entities.Count > pageSize,
results.PagingCookie
);
}

public Task<int?> GetTotalRecordCountAsync()
{
var fetchDoc = XDocument.Parse(fetchXml);
if (fetchDoc.Root is null)
{
return Task.FromResult<int?>(null);
}

if ((bool?)fetchDoc.Root?.Attribute("aggregate") == true)
{
return Task.FromResult<int?>(null);
}

var entityElem = fetchDoc.Root?.Element("entity");
if (entityElem is null)
{
return Task.FromResult<int?>(null);
}

var attributeElements = fetchDoc.Root?.Descendants().Where(e => e.Name == "attribute").ToArray();
if (attributeElements is null)
{
return Task.FromResult<int?>(null);
}

foreach (var attr in attributeElements)
{
attr.Remove();
}

string? primaryAttribute = this.countByAttribute;
if (string.IsNullOrWhiteSpace(primaryAttribute))
{
var entityMetadata = dataSource.CrmServiceClient.GetEntityMetadata(entityElem.Attribute("name")?.Value, EntityFilters.Entity);
primaryAttribute = entityMetadata.PrimaryIdAttribute;
}
entityElem.Add(new XElement("attribute", new XAttribute("name", primaryAttribute)));

int page = 1;

bool moreRecords;
string? pagingCookie = null;
int pageSize = 5000;
int totalCount = 0;

do
{
AddPaging(fetchDoc, page, pageSize, pagingCookie);
var results = dataSource.CrmServiceClient.RetrieveMultiple(new FetchExpression { Query = fetchDoc.ToString(SaveOptions.DisableFormatting) });
totalCount += results.Entities.Count;
moreRecords = results.MoreRecords;
pagingCookie = results.PagingCookie;
page++;
}
while (moreRecords);

return Task.FromResult<int?>(totalCount);
}
}
Loading