Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,47 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class ParallelClusterClient : ClusterClientBase
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
var cts = new CancellationTokenSource(timeout);

var tasks = ReplicaAddresses
.Select(address => CallReplicaAsync(address, query, cts.Token))
.ToList();

while (tasks.Count > 0)
{
var completed = await Task.WhenAny(tasks);

if (completed.IsCompletedSuccessfully)
{
await cts.CancelAsync();
return completed.Result;
}

tasks.Remove(completed);
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
throw new TimeoutException("No replica responded within the timeout");
}

private async Task<string> CallReplicaAsync(string address, string query, CancellationToken token)
{
var uri = $"{address}?query={query}";
var request = CreateRequest(uri);

await using (token.Register(() => request.Abort()))
{
throw new NotImplementedException();
return await ProcessRequestAsync(request);
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
59 changes: 59 additions & 0 deletions homework 2/ClusterClient/Clients/ReplicaPerformanceTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace ClusterClient.Clients
{
public class ReplicaPerformanceTracker
{
private readonly ConcurrentDictionary<string, ReplicaStats> stats = new();

public string[] OrderByFastest(string[] replicas)
{
return replicas
.Select((r, i) => new { Replica = r, Index = i })
.OrderBy(x =>
stats.TryGetValue(x.Replica, out var s) && s.HasSamples
? s.AverageMs
: long.MaxValue)
.ThenBy(x => x.Index)
.Select(x => x.Replica)
.ToArray();
}

public void ReportResult(string replica, long elapsedMs)
{
var stat = stats.GetOrAdd(replica, _ => new ReplicaStats());
stat.AddSample(elapsedMs);
}

private class ReplicaStats
{
private long avgMs;
private long count;

public bool HasSamples => Volatile.Read(ref count) > 0;

public long AverageMs
{
get
{
var avg = Volatile.Read(ref avgMs);
return avg == 0 ? long.MaxValue : avg;
}
}

public void AddSample(long sampleMs)
{
var newCount = Interlocked.Increment(ref count);

long oldAvg, newAvg;
do
{
oldAvg = Volatile.Read(ref avgMs);
newAvg = (oldAvg * (newCount - 1) + sampleMs) / newCount;
} while (Interlocked.CompareExchange(ref avgMs, newAvg, oldAvg) != oldAvg);
}
}
}
}
63 changes: 51 additions & 12 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,62 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class RoundRobinClusterClient : ClusterClientBase
private readonly ReplicaPerformanceTracker tracker = new();

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
var stopwatch = Stopwatch.StartNew();

var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses);
var remainingReplicas = orderedReplicas.Length;

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
foreach (var replica in orderedReplicas)
{
throw new NotImplementedException();
var remainingTime = timeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException("No replica responded within overall timeout");

var timeoutPerRequest = remainingTime.Divide(remainingReplicas);
remainingReplicas--;

var webRequest = CreateRequest(replica + "?query=" + query);
Log.InfoFormat($"Processing {webRequest.RequestUri}");

var sw = Stopwatch.StartNew();
var requestTask = ProcessRequestAsync(webRequest);

var completed = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest));

if (completed != requestTask)
{
sw.Stop();
tracker.ReportResult(replica, sw.ElapsedMilliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Неправильно записывать чистое время здесь т.к. мы по факту запишем время timeoutPerRequest, а не реальное время за которое ответила реплика. Тут нужно хотя бы домножать это на какой-то штраф, чтобы эта реплика "опустилась" в списке в самый конец

Либо т.к. мы не знаем, точное время завершения запроса, лучше не записывать такую статистику в принципе. Это будет даже более правильное поведение, чем записывать ошибочную статистику 🤔

continue;
}

try
{
var result = await requestTask;
sw.Stop();
tracker.ReportResult(replica, sw.ElapsedMilliseconds);
return result;
}
catch (WebException)
{
sw.Stop();
tracker.ReportResult(replica, sw.ElapsedMilliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут так же не правильно записывать время таймера в чистом виде. Если у нас произошла ошибка запроса (к примеру сетевая ошибка или реплика ответила 500кой), то запрос фактически выполнится "очень быстро". И мы запишем сюда слишком малое время и эта реплика ошибочно станет в самый верх списка, хотя она должна быть в самом низу.

Тут нужно добавлять статическое штрафное большое время хотя бы, но никак не реальное время таймера

}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
throw new TimeoutException("No replica responded");
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
98 changes: 89 additions & 9 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,103 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Net;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class SmartClusterClient : ClusterClientBase
private readonly ReplicaPerformanceTracker tracker = new();

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
var stopwatch = Stopwatch.StartNew();

var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses);
var remainingReplicas = orderedReplicas.Length;

var inFlight = new List<(string replica, Task<string> task, Stopwatch sw)>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше создавать класс или структуру в которую использовать как контейнер для этих значений. Более удобно получается и читается лучше


foreach (var replica in orderedReplicas)
{
}
var remainingTime = timeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
break;

var timeoutPerRequest = remainingTime.Divide(remainingReplicas);
remainingReplicas--;

var request = CreateRequest(replica + "?query=" + query);
Log.InfoFormat($"Processing {request.RequestUri}");

var sw = Stopwatch.StartNew();
var task = ProcessRequestAsync(request);

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
inFlight.Add((replica, task, sw));

var delayTask = Task.Delay(timeoutPerRequest);
var anyResponseTask = Task.WhenAny(inFlight.Select(x => x.task));

var completed = await Task.WhenAny(anyResponseTask, delayTask);

if (completed != anyResponseTask) continue;
{
await anyResponseTask;

var winner = inFlight.First(x => x.task.IsCompleted);

try
{
var result = await winner.task;
winner.sw.Stop();
tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds);
return result;
}
catch (WebException)
{
winner.sw.Stop();
tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds);
inFlight.Remove(winner);
}
}
}

while (inFlight.Count > 0)
{
throw new NotImplementedException();
var remainingTime = timeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
break;

var completed = await Task.WhenAny(
Task.WhenAny(inFlight.Select(x => x.task)),
Task.Delay(remainingTime)
);

if (completed is Task<string>)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут можно чуть упростить условие в цикле, к примеру написать if (completed is not Task<string>) continue;

Так лучше читается код, более линейно без лишних блоков)

{
var winner = inFlight.First(x => x.task.IsCompleted);

try
{
var result = await winner.task;
winner.sw.Stop();
tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds);
return result;
}
catch (WebException)
{
winner.sw.Stop();
tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds);
inFlight.Remove(winner);
}
}
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
throw new TimeoutException("No replica responded within timeout");
}
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
Loading