Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -40,5 +42,74 @@ protected async Task<string> ProcessRequestAsync(WebRequest request)
return result;
}
}

protected async Task<Task<string>> WaitForReplicaAndLogAsync(
List<Task<string>> requests,
TimeSpan timeout,
bool throwIfTimeout = false)
{
ArgumentNullException.ThrowIfNull(requests);

Log.Debug($"Waiting for replica response. Pending replicas: {requests.Count}");
var completedRequest = await WaitWithTimeout(requests, timeout);

if (completedRequest == null)
{
Log.Debug("Global timeout reached. No replica responded in time");
return throwIfTimeout
? throw new TimeoutException()
: null;
}

requests.Remove(completedRequest);
Log.Debug($"Replica task completed. Remaining replicas: {requests.Count}");

if (completedRequest.IsCompletedSuccessfully)
{
Log.Info("Replica responded successfully");
return completedRequest;
}

Log.Error("Replica failed", completedRequest.Exception?.InnerException);
return completedRequest;
}

protected async Task<string> HandleWhenAnyResultAsync(Task<string> requestTask, Task completedTask, string replicaUri)
{
if (completedTask == requestTask)
{
if (!requestTask.IsCompletedSuccessfully)
{
Log.Error($"Replica {replicaUri} failed", requestTask.Exception?.InnerException);
return null;
}

Log.Info($"Replica {replicaUri} responded successfully");
return await requestTask;
}

Log.Debug($"Replica {replicaUri} timed out");
return null;
}

protected async Task<string> ProcessReplicaAsync(string uri, string query)
{
var request = CreateRequest($"{uri}?query={query}");
Log.Info($"Processing {request.RequestUri}");
return await ProcessRequestAsync(request);
}

private async Task<Task<T>> WaitWithTimeout<T>(IEnumerable<Task<T>> tasks, TimeSpan timeout)
{
var delay = Task.Delay(timeout);
var completed = await Task.WhenAny(tasks.Append(delay));

if (completed == delay)
{
return null;
}

return (Task<T>)completed;
}
}
}
23 changes: 18 additions & 5 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

Expand All @@ -13,11 +11,26 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var requests = ReplicaAddresses
.Select(uri => ProcessReplicaAsync(uri, query))
.ToList();

while (requests.Count > 0)
{
var completedRequest = await WaitForReplicaAndLogAsync(requests, timeout, true);

if (completedRequest is { IsCompletedSuccessfully: true })
{
return await completedRequest;
}
}

Log.Error("All replicas failed");
throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
}
}
40 changes: 37 additions & 3 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,55 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
{
private readonly ConcurrentDictionary<string, long> _replicaTimings = new();

public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
foreach (var replicaAddress in replicaAddresses)
{
_replicaTimings.TryAdd(replicaAddress, 0);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var stopwatch = Stopwatch.StartNew();
var replicasLeft = ReplicaAddresses.Length;

var sortedReplicas = ReplicaAddresses
.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue));

foreach (var replicaUri in sortedReplicas)
{
var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--;

Log.Debug($"Sending request to replica {replicaUri}. Timeout per replica: " +
$"{timeoutPerReplica.TotalMilliseconds} ms");

var requestTask = ProcessReplicaAsync(replicaUri, query);
var delay = Task.Delay(timeoutPerReplica);

var completedTask = await Task.WhenAny(requestTask, delay);
_replicaTimings[replicaUri] = stopwatch.ElapsedMilliseconds;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут сразу два типа ошибки:

  1. Фиксируется не время затраченное на текущий запрос, а общее прошедшее время. Т.е. даже для самой быстрой реплики, запрос к которой выполнялся (условно 15мс), время будет записано не 15мс, а все время выполнения от начала (т.е. суммирование как минимум всех timeoutPerReplica).
    Поэтому третья реплика будь она самой быстрой "на диком западе" не окажется первой в списке при повторной отправке запрос через этот клиент
  2. Даже если был бы локальный таймер под текущий запрос, при таймауте мы не должны записывать посто время таймаута, а хотя бы штрафное время для понижения реплики в списке. При ошибке так же, время выполнения будет минимальным и реплика незаслужено запишется "как самая быстрая). В случае ошибок должно быть записано высокое штрафное время с гарантированным понижением в списке при сортировке


var result = await HandleWhenAnyResultAsync(requestTask, completedTask, replicaUri);
if (result != null)
{
return result;
}
}

Log.Debug("All replicas failed or timed out");
throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
Expand Down
71 changes: 67 additions & 4 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,86 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
{
private readonly ConcurrentDictionary<string, long> _replicaTimings = new();

public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
foreach (var replicaAddress in replicaAddresses)
{
_replicaTimings.TryAdd(replicaAddress, 0);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var stopwatch = Stopwatch.StartNew();
var replicasLeft = ReplicaAddresses.Length;

var sortedReplicas = ReplicaAddresses
.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue))
.ToList();

Log.Debug($"Replica order: {string.Join(", ", sortedReplicas)}");

var pendingRequests = new List<Task<string>>();

foreach (var replicaAddress in sortedReplicas)
{
var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--;

Log.Debug($"Starting request to {replicaAddress}. Timeout per replica: " +
$"{timeoutPerReplica.TotalMilliseconds} ms");

pendingRequests.Add(ProcessReplicaAsync(replicaAddress, query));
var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, timeoutPerReplica);
_replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А здесь уже количество ошибок выросло до трех:

  1. Использование общего таймера, а не индивидуального под каждый запрос — некорректная статистика которая по факту больше вредит, чем помогает
  2. Записывание времени всегда без учета, что произошел таймаут (не знаем реальное время выполнения) и не учитывается ошибка запроса, что может записать некорректное малое значение, что поднимет реплику в списке реплик
  3. Привязка обновления статистики к значению replicaAddress из foreach. У нас в completedTask может вернуться как таймаут, так и Задачи с запросами с предыдущих итераций. В итоге в completedTask может вернуться запрос к replicaAddress1, а у нас в foreach уже replicaAddress3, и мы запишем статистику времени выполнения replicaAddress1 в ячейку для replicaAddress3


if (completedTask is not { IsCompletedSuccessfully: true })
{
continue;
}

Log.Info($"Replica responded successfully in {stopwatch.ElapsedMilliseconds} ms");
return await completedTask;
}

Log.Debug("Initial replica attempts finished. Waiting for remaining pending requests");

while (pendingRequests.Count > 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А в цикле ожидания завершения задач мы в принципе не запишем никакую статистику. В итоге статистика у нас не имеет ничего общего с реальностью

{
var remainingTime = timeout - stopwatch.Elapsed;

if (remainingTime <= TimeSpan.Zero)
{
Log.Debug("Global timeout reached while waiting for pending replicas");
break;
}

Log.Debug($"Waiting for pending replicas. Remaining: {pendingRequests.Count}");
var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, remainingTime);

if (completedTask is not { IsCompletedSuccessfully: true })
{
continue;
}

Log.Info($"Pending replica responded successfully in {stopwatch.ElapsedMilliseconds} ms");
return await completedTask;
}

Log.Error("All replicas failed or timed out");
throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
}
}