Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,31 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var requests = ReplicaAddresses
.Select(uri => CreateRequest(uri + "?query=" + query))
.Select(ProcessRequestAsync)
.ToList();

var delay = Task.Delay(timeout);

while (requests.Count > 0)
{
var completedTask = await Task.WhenAny(requests.Concat([delay]));

if (completedTask == delay)
throw new TimeoutException();

requests.Remove((Task<string>)completedTask);

if (completedTask.IsCompletedSuccessfully)
return await (Task<string>)completedTask;
}

return null;
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
}
}
39 changes: 35 additions & 4 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,54 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
{
private readonly ConcurrentDictionary<string, long> _replicaTimings = new();

public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
foreach (var replicaAddress in replicaAddresses)
{
_replicaTimings.TryAdd(replicaAddress, 0);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var stopwatch = Stopwatch.StartNew();
var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue));
var remainingReplicas = ReplicaAddresses.Length;

foreach (var replicaAddress in sortedReplicas)
{
var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--;

var request = CreateRequest(replicaAddress + "?query=" + query);
var requestTask = ProcessRequestAsync(request);
var delay = Task.Delay(timeoutPerReplica);

var completedTask = await Task.WhenAny(requestTask, delay);
_replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Некорректное поведение в случае если реплика не ответила и мы вышли по таймауту или если произошла ошибка при отправке запроса или у самой реплики.

В случае с таймаутом, мы запишем некорректное время т.к. мы не знаем реального времени когда завершится запрос. В таком случае лучше либо не записывать такую статистику (она не достоверная), либо записывать время выполнение но умножать его на какой-то штраф (хотя бы x2/x3)

В случае если произошла ошибка, то Task.WhenAny может моментально вернуть нашу таску и мы запишем время выполнения некорректно, что потом повлияет на сортировку и эта реплика станет первой (а не в конце списка). В этом случае надо выставлять достаточно большое штрафное время (выше нормального времени обработки)


if (completedTask == delay)
continue;

if (!completedTask.IsCompletedSuccessfully)
continue;

return await requestTask;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
}
}
56 changes: 53 additions & 3 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -9,15 +11,63 @@ namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
{
private readonly ConcurrentDictionary<string, long> _replicaTimings = new();

public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
foreach (var replicaAddress in replicaAddresses)
{
_replicaTimings.TryAdd(replicaAddress, 0);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var stopwatch = Stopwatch.StartNew();
var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue));
var remainingReplicas = ReplicaAddresses.Length;
var pendingRequests = new List<Task<string>>();

foreach (var replicaAddress in sortedReplicas)
{
var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--;

pendingRequests.Add(ProcessRequestAsync(CreateRequest(replicaAddress + "?query=" + query)));

var delay = Task.Delay(timeoutPerReplica);
var completedTask = await Task.WhenAny(pendingRequests.Append(delay)) as Task<string>;
_replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут сразу несколько ошибок:

  1. Такая же как описывал выше в RoundRobingClusterClient, в случае таймаута мы запишем время некорректное (= timeoutPerReplica), а в случае ошибки, мы запишим некорректно маленькое время, что поднимет реплику в сортировке на самый верх, хотя должна была быть отправлена в самый низ списка реплик
  2. Запись статистики привязана к текущему значению foreach replicaAddress, НО в completedTask может оказаться любой запрос запущенный ранее (т.е. когда foreach уже на последней реплике, в completedTask может оказаться завершенный запрос для первой или второй реплики). Это собственно неправильно запишет статистику и соответственно неправильно отсортирует наши реплики


if (completedTask == null)
continue;

pendingRequests.Remove(completedTask);

if (completedTask.IsCompletedSuccessfully)
return await completedTask;
}

while (pendingRequests.Count > 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А в этом цикле мы в итоге статистику вообще не обновляем. Соответственно данные по скорости работы реплик будут некорректные и не полные

{
var remainingTime = timeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
break;

var completedTask =
await Task.WhenAny(pendingRequests.Append(Task.Delay(remainingTime))) as Task<string>;

if (completedTask == null)
break;

pendingRequests.Remove(completedTask);

if (completedTask.IsCompletedSuccessfully)
return await completedTask;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
}
}