Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,41 @@
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public abstract class ClusterClientBase
{
public abstract class ClusterClientBase
protected readonly ReplicaStats ReplicaStats;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicaStats Получился локальным и хранит историю только для текущего экземпляра ClustrClient, если бы в будущем мы захотели в разных ситуациях делать запросы через разные стратегии, то у них бы была различная история производительности реплик и не оптимальная маршрутизация)

Это не ошибка, просто хотел подсветить один неучтенный сценарий)


protected ClusterClientBase(string[] replicaAddresses)
{
protected string[] ReplicaAddresses { get; set; }
ReplicaAddresses = replicaAddresses;
ReplicaStats = new ReplicaStats(replicaAddresses);
}

protected ClusterClientBase(string[] replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
}
protected string[] ReplicaAddresses { get; set; }
protected abstract ILog Log { get; }

public abstract Task<string> ProcessRequestAsync(string query, TimeSpan timeout);
protected abstract ILog Log { get; }
public abstract Task<string> ProcessRequestAsync(string query, TimeSpan timeout);

protected static HttpWebRequest CreateRequest(string uriStr)
{
var request = WebRequest.CreateHttp(Uri.EscapeUriString(uriStr));
request.Proxy = null;
request.KeepAlive = true;
request.ServicePoint.UseNagleAlgorithm = false;
request.ServicePoint.ConnectionLimit = 100500;
return request;
}
protected static HttpWebRequest CreateRequest(string uriStr)
{
var request = WebRequest.CreateHttp(Uri.EscapeUriString(uriStr));
request.Proxy = null;
request.KeepAlive = true;
request.ServicePoint.UseNagleAlgorithm = false;
request.ServicePoint.ConnectionLimit = 100500;
return request;
}

protected async Task<string> ProcessRequestAsync(WebRequest request)
protected async Task<string> ProcessRequestAsync(WebRequest request)
{
var timer = Stopwatch.StartNew();
using (var response = await request.GetResponseAsync())
{
var timer = Stopwatch.StartNew();
using (var response = await request.GetResponseAsync())
{
var result = await new StreamReader(response.GetResponseStream(), Encoding.UTF8).ReadToEndAsync();
Log.InfoFormat("Response from {0} received in {1} ms", request.RequestUri, timer.ElapsedMilliseconds);
return result;
}
var result = await new StreamReader(response.GetResponseStream(), Encoding.UTF8).ReadToEndAsync();
Log.InfoFormat("Response from {0} received in {1} ms", request.RequestUri, timer.ElapsedMilliseconds);
return result;
}
}
}
49 changes: 37 additions & 12 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Fclp.Internals.Extensions;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class ParallelClusterClient : ClusterClientBase
{
public class ParallelClusterClient : ClusterClientBase
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var webRequests = ReplicaAddresses
.Select(replicaAddress => CreateRequest(replicaAddress + "?query=" + query))
.ToList();

webRequests.ForEach(r => Log.InfoFormat($"Processing {r.RequestUri}"));

var timeoutTask = Task.Delay(timeout);

var tasks = webRequests
.Select(ProcessRequestAsync)
.Append(timeoutTask)
.ToList();

while (tasks.Count > 1)
{
var completedTask = await Task.WhenAny(tasks);

if (completedTask == timeoutTask)
throw new TimeoutException();

tasks.Remove(completedTask);

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
if (completedTask.Status == TaskStatus.RanToCompletion)
return await (Task<string>)completedTask;
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
throw new TimeoutException();
}
}
}
39 changes: 19 additions & 20 deletions homework 2/ClusterClient/Clients/RandomClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,33 @@
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class RandomClusterClient : ClusterClientBase
{
public class RandomClusterClient : ClusterClientBase
private readonly Random random = new();

public RandomClusterClient(string[] replicaAddresses)
: base(replicaAddresses)
{
private readonly Random random = new Random();
}

public RandomClusterClient(string[] replicaAddresses)
: base(replicaAddresses)
{
}
protected override ILog Log => LogManager.GetLogger(typeof(RandomClusterClient));

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var uri = ReplicaAddresses[random.Next(ReplicaAddresses.Length)];
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var uri = ReplicaAddresses[random.Next(ReplicaAddresses.Length)];

var webRequest = CreateRequest(uri + "?query=" + query);

Log.InfoFormat($"Processing {webRequest.RequestUri}");
var webRequest = CreateRequest(uri + "?query=" + query);

var resultTask = ProcessRequestAsync(webRequest);
Log.InfoFormat($"Processing {webRequest.RequestUri}");

await Task.WhenAny(resultTask, Task.Delay(timeout));
if (!resultTask.IsCompleted)
throw new TimeoutException();
var resultTask = ProcessRequestAsync(webRequest);

return resultTask.Result;
}
await Task.WhenAny(resultTask, Task.Delay(timeout));
if (!resultTask.IsCompleted)
throw new TimeoutException();

protected override ILog Log => LogManager.GetLogger(typeof(RandomClusterClient));
return resultTask.Result;
}
}
51 changes: 40 additions & 11 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,52 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class RoundRobinClusterClient : ClusterClientBase
{
public class RoundRobinClusterClient : ClusterClientBase
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var badRequestCounter = 0;
var sw = new Stopwatch();
var orderedReplicaAddresses = ReplicaAddresses
.OrderBy(r => ReplicaStats.Stats[r])
.ToList();

foreach (var address in orderedReplicaAddresses)
{
throw new NotImplementedException();
var webRequest = CreateRequest(address + "?query=" + query);

Log.InfoFormat($"Processing {webRequest.RequestUri}");

var task = ProcessRequestAsync(webRequest);
var requestTimeout = timeout / (ReplicaAddresses.Length - badRequestCounter);
var timeoutTask = Task.Delay(requestTimeout);

sw.Restart();
var completedTask = await Task.WhenAny(task, timeoutTask);
sw.Stop();

timeout -= sw.Elapsed;

if (completedTask != timeoutTask && completedTask.Status == TaskStatus.RanToCompletion)
{
ReplicaStats.Update(address, sw.Elapsed.Milliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sw.Elapsed.Milliseconds возвращает не общее прошедшее время в миллисекундах, а количество миллисекунд текущего интервала (текущей секунды) и в целом может возвращать только значения от -999 до 999.

Чтобы получить общее (реальное) количество прошедшего времени в миллисекундах, нужно вызывать либо sw.ElapsedMilliseconds либо sw.Elapsed.TotalMicroseconds

return await (Task<string>)completedTask;
}

badRequestCounter++;
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
throw new TimeoutException();
}
}
}
60 changes: 50 additions & 10 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,63 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class SmartClusterClient : ClusterClientBase
{
public class SmartClusterClient : ClusterClientBase
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var badRequestCounter = 0;
var sw = new Stopwatch();
var tasks = new List<Task<string>>();
var orderedReplicaAddresses = ReplicaAddresses
.OrderBy(r => ReplicaStats.Stats[r])
.ToList();

foreach (var address in orderedReplicaAddresses)
{
throw new NotImplementedException();
var webRequest = CreateRequest(address + "?query=" + query);

Log.InfoFormat($"Processing {webRequest.RequestUri}");

var task = ProcessRequestAsync(webRequest);
var requestTimeout = timeout / (ReplicaAddresses.Length - badRequestCounter);
var timeoutTask = Task.Delay(requestTimeout);
tasks.Add(task);

sw.Restart();
var anyTask = Task.WhenAny(tasks);
var completedTask = await Task.WhenAny(anyTask, timeoutTask);
sw.Stop();

timeout -= sw.Elapsed;

if (completedTask != timeoutTask)
{
var result = await anyTask;
if (result.IsFaulted)
tasks.Remove(result);

if (result.Status == TaskStatus.RanToCompletion)
{
ReplicaStats.Update(address, sw.Elapsed.Milliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sw.Elapsed.Milliseconds возвращает не общее прошедшее время в миллисекундах, а количество миллисекунд текущего интервала (текущей секунды) и в целом может возвращать только значения от -999 до 999.

Чтобы получить общее (реальное) количество прошедшего времени в миллисекундах, нужно вызывать либо sw.ElapsedMilliseconds либо sw.Elapsed.TotalMicroseconds

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так же некорректно обновляется статистика. У нас в result может оказаться как таска запроса к текущему адресу (address в foreach), так и таска принадлежащая предыдущему адресу.

В итоге может быть ситуация, когда мы записываем статистику о времени выполнения для адреса address2, но вернулась таска с завершившимся запросом к address1. В итоге мы некорректно наполняем статистику

return await result;
}
}

badRequestCounter++;
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
throw new TimeoutException();
}
}
}
54 changes: 27 additions & 27 deletions homework 2/ClusterClient/ClusterClient.csproj
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<OutputType>Exe</OutputType>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
</PropertyGroup>
<ItemGroup>
<Content Include="ServerAddresses.txt">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<PackageReference Include="log4net">
<Version>2.0.8</Version>
</PackageReference>
<PackageReference Include="Pingo.FluentCommandLineParser">
<Version>1.0.25</Version>
</PackageReference>
<PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0" />
<PackageReference Include="System.Text.Encoding.CodePages" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<None Update="log4net.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<OutputType>Exe</OutputType>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
</PropertyGroup>
<ItemGroup>
<Content Include="ServerAddresses.txt">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<PackageReference Include="log4net">
<Version>2.0.8</Version>
</PackageReference>
<PackageReference Include="Pingo.FluentCommandLineParser">
<Version>1.0.25</Version>
</PackageReference>
<PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0"/>
<PackageReference Include="System.Text.Encoding.CodePages" Version="5.0.0"/>
</ItemGroup>
<ItemGroup>
<None Update="log4net.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Loading