-
Notifications
You must be signed in to change notification settings - Fork 54
Шубин Игорь #60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Шубин Игорь #60
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,48 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Fclp.Internals.Extensions; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class ParallelClusterClient : ClusterClientBase | ||
| { | ||
| public class ParallelClusterClient : ClusterClientBase | ||
| public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
|
|
||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| var webRequests = ReplicaAddresses | ||
| .Select(replicaAddress => CreateRequest(replicaAddress + "?query=" + query)) | ||
| .ToList(); | ||
|
|
||
| webRequests.ForEach(r => Log.InfoFormat($"Processing {r.RequestUri}")); | ||
|
|
||
| var timeoutTask = Task.Delay(timeout); | ||
|
|
||
| var tasks = webRequests | ||
| .Select(ProcessRequestAsync) | ||
| .Append(timeoutTask) | ||
| .ToList(); | ||
|
|
||
| while (tasks.Count > 1) | ||
| { | ||
| var completedTask = await Task.WhenAny(tasks); | ||
|
|
||
| if (completedTask == timeoutTask) | ||
| throw new TimeoutException(); | ||
|
|
||
| tasks.Remove(completedTask); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| if (completedTask.Status == TaskStatus.RanToCompletion) | ||
| return await (Task<string>)completedTask; | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| throw new TimeoutException(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,52 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| var badRequestCounter = 0; | ||
| var sw = new Stopwatch(); | ||
| var orderedReplicaAddresses = ReplicaAddresses | ||
| .OrderBy(r => ReplicaStats.Stats[r]) | ||
| .ToList(); | ||
|
|
||
| foreach (var address in orderedReplicaAddresses) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var webRequest = CreateRequest(address + "?query=" + query); | ||
|
|
||
| Log.InfoFormat($"Processing {webRequest.RequestUri}"); | ||
|
|
||
| var task = ProcessRequestAsync(webRequest); | ||
| var requestTimeout = timeout / (ReplicaAddresses.Length - badRequestCounter); | ||
| var timeoutTask = Task.Delay(requestTimeout); | ||
|
|
||
| sw.Restart(); | ||
| var completedTask = await Task.WhenAny(task, timeoutTask); | ||
| sw.Stop(); | ||
|
|
||
| timeout -= sw.Elapsed; | ||
|
|
||
| if (completedTask != timeoutTask && completedTask.Status == TaskStatus.RanToCompletion) | ||
| { | ||
| ReplicaStats.Update(address, sw.Elapsed.Milliseconds); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Чтобы получить общее (реальное) количество прошедшего времени в миллисекундах, нужно вызывать либо |
||
| return await (Task<string>)completedTask; | ||
| } | ||
|
|
||
| badRequestCounter++; | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
| throw new TimeoutException(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,63 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class SmartClusterClient : ClusterClientBase | ||
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| var badRequestCounter = 0; | ||
| var sw = new Stopwatch(); | ||
| var tasks = new List<Task<string>>(); | ||
| var orderedReplicaAddresses = ReplicaAddresses | ||
| .OrderBy(r => ReplicaStats.Stats[r]) | ||
| .ToList(); | ||
|
|
||
| foreach (var address in orderedReplicaAddresses) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var webRequest = CreateRequest(address + "?query=" + query); | ||
|
|
||
| Log.InfoFormat($"Processing {webRequest.RequestUri}"); | ||
|
|
||
| var task = ProcessRequestAsync(webRequest); | ||
| var requestTimeout = timeout / (ReplicaAddresses.Length - badRequestCounter); | ||
| var timeoutTask = Task.Delay(requestTimeout); | ||
| tasks.Add(task); | ||
|
|
||
| sw.Restart(); | ||
| var anyTask = Task.WhenAny(tasks); | ||
| var completedTask = await Task.WhenAny(anyTask, timeoutTask); | ||
| sw.Stop(); | ||
|
|
||
| timeout -= sw.Elapsed; | ||
|
|
||
| if (completedTask != timeoutTask) | ||
| { | ||
| var result = await anyTask; | ||
| if (result.IsFaulted) | ||
| tasks.Remove(result); | ||
|
|
||
| if (result.Status == TaskStatus.RanToCompletion) | ||
| { | ||
| ReplicaStats.Update(address, sw.Elapsed.Milliseconds); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Чтобы получить общее (реальное) количество прошедшего времени в миллисекундах, нужно вызывать либо
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Так же некорректно обновляется статистика. У нас в result может оказаться как таска запроса к текущему адресу ( В итоге может быть ситуация, когда мы записываем статистику о времени выполнения для адреса |
||
| return await result; | ||
| } | ||
| } | ||
|
|
||
| badRequestCounter++; | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| throw new TimeoutException(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,29 +1,29 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
| <PropertyGroup> | ||
| <TargetFramework>net8.0</TargetFramework> | ||
| <OutputType>Exe</OutputType> | ||
| <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> | ||
| <RestorePackages>true</RestorePackages> | ||
| <GenerateAssemblyInfo>false</GenerateAssemblyInfo> | ||
| </PropertyGroup> | ||
| <ItemGroup> | ||
| <Content Include="ServerAddresses.txt"> | ||
| <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | ||
| </Content> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <PackageReference Include="log4net"> | ||
| <Version>2.0.8</Version> | ||
| </PackageReference> | ||
| <PackageReference Include="Pingo.FluentCommandLineParser"> | ||
| <Version>1.0.25</Version> | ||
| </PackageReference> | ||
| <PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0" /> | ||
| <PackageReference Include="System.Text.Encoding.CodePages" Version="5.0.0" /> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <None Update="log4net.config"> | ||
| <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | ||
| </None> | ||
| </ItemGroup> | ||
| <PropertyGroup> | ||
| <TargetFramework>net8.0</TargetFramework> | ||
| <OutputType>Exe</OutputType> | ||
| <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> | ||
| <RestorePackages>true</RestorePackages> | ||
| <GenerateAssemblyInfo>false</GenerateAssemblyInfo> | ||
| </PropertyGroup> | ||
| <ItemGroup> | ||
| <Content Include="ServerAddresses.txt"> | ||
| <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | ||
| </Content> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <PackageReference Include="log4net"> | ||
| <Version>2.0.8</Version> | ||
| </PackageReference> | ||
| <PackageReference Include="Pingo.FluentCommandLineParser"> | ||
| <Version>1.0.25</Version> | ||
| </PackageReference> | ||
| <PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0"/> | ||
| <PackageReference Include="System.Text.Encoding.CodePages" Version="5.0.0"/> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <None Update="log4net.config"> | ||
| <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> | ||
| </None> | ||
| </ItemGroup> | ||
| </Project> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ReplicaStatsПолучился локальным и хранит историю только для текущего экземпляра ClustrClient, если бы в будущем мы захотели в разных ситуациях делать запросы через разные стратегии, то у них бы была различная история производительности реплик и не оптимальная маршрутизация)Это не ошибка, просто хотел подсветить один неучтенный сценарий)