Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class ParallelClusterClient : ClusterClientBase
public class ParallelClusterClient(string[] replicaAddresses)
: ClusterClientBase(replicaAddresses)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
}
var tasks = ReplicaAddresses
.Select(address =>
{
var request = CreateRequest(address + "?query=" + query);
Log.InfoFormat($"Processing {request.RequestUri}");
return ProcessRequestAsync(request);
})
.ToList();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var timeoutTask = Task.Delay(timeout);

while (tasks.Count > 0)
{
var completed = await Task.WhenAny(tasks.Append(timeoutTask));

if (completed == timeoutTask)
throw new TimeoutException();

var finishedTask = (Task<string>)completed;
tasks.Remove(finishedTask);

if (finishedTask.Status == TaskStatus.RanToCompletion)
return finishedTask.Result;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
protected override ILog Log =>
LogManager.GetLogger(typeof(ParallelClusterClient));
}
}
}
19 changes: 19 additions & 0 deletions homework 2/ClusterClient/Clients/ReplicaStatisticsEntry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Linq;

namespace ClusterClient.Clients;

public class ReplicaStatisticsEntry(string address, int maxHistory = 10)
{
public string Address { get; } = address;
private readonly Queue<long> lastTimes = new();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue не потокобезопасный, лучше либо добавить блокировки (асинхронные), хотя бы через семафор, либо использовать потокобезопасный ConcurrentQueue

Если "в реальной ситуации" кто-то попробует делать несколько разных запросов к одной реплике через один клиент, то можем нарваться в лучшем случае на неопределенное поведение гонки. Поэтому лучше потенциальные места такие обкладывать LockFree структурами или блокировками)


public void RecordResponseTime(long ms)
{
lastTimes.Enqueue(ms);
if (lastTimes.Count > maxHistory)
lastTimes.Dequeue();
}

public double AverageTime => lastTimes.Any() ? lastTimes.Average() : double.MaxValue;
}
48 changes: 44 additions & 4 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,61 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
public class RoundRobinClusterClient: ClusterClientBase
{
private readonly Dictionary<string, ReplicaStatisticsEntry> replicaStats;

public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
replicaStats = replicaAddresses.ToDictionary(
addr => addr,
addr => new ReplicaStatisticsEntry(addr)
);
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var start = Stopwatch.StartNew();

var orderedReplicas = replicaStats.Values
.OrderBy(x => x.AverageTime)
.Select(x => x.Address)
.ToArray();
Comment on lines +27 to +30
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вижу сортировку по времени ответа, но не вижу ниже сохранения статистики. Получается этот клиент по факту не реализует обновление статистики и сортировка тут "бессмысленна"


for (var i = 0; i < orderedReplicas.Length; i++)
{
var remainingTime = timeout - start.Elapsed;

if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException();

var remainingReplicas = orderedReplicas.Length - i;
var replicaTimeout = TimeSpan.FromMilliseconds(
remainingTime.TotalMilliseconds / remainingReplicas);

var uri = orderedReplicas[i] + "?query=" + query;
var webRequest = CreateRequest(uri);

Log.InfoFormat($"Processing {webRequest.RequestUri}, timeout {replicaTimeout}");

var requestTask = ProcessRequestAsync(webRequest);
var delayTask = Task.Delay(replicaTimeout);

var completed = await Task.WhenAny(requestTask, delayTask);

if (completed != requestTask) continue;
if (!requestTask.IsFaulted)
return requestTask.Result;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
Expand Down
90 changes: 85 additions & 5 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,103 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
{
private readonly Dictionary<string, ReplicaStatisticsEntry> replicaStats;

public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
replicaStats = replicaAddresses.ToDictionary(
addr => addr,
addr => new ReplicaStatisticsEntry(addr)
);
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var deadline = DateTime.UtcNow + timeout;
var runningTasks = new List<(Task<string> task, string address, Stopwatch timer)>();

var orderedReplicas = replicaStats.Values
.OrderBy(x => x.AverageTime)
.Select(x => x.Address)
.ToArray();

var addressesLeft = orderedReplicas.Length;

foreach (var replica in orderedReplicas)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero)
throw new TimeoutException();

var replicaTimeout = remaining / addressesLeft;

var request = CreateRequest($"{replica}?query={query}");
Log.InfoFormat($"Processing {request.RequestUri}");

var timer = Stopwatch.StartNew();
var task = ProcessRequestAsync(request);
runningTasks.Add((task, replica, timer));

var finished = await WaitOneAsync(runningTasks, replicaTimeout);
if (finished != null)
{
var (resultTask, resultReplica, resultTimer) = finished.Value;
resultTimer.Stop();
replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds);
return await resultTask;
}

addressesLeft--;
}

while (runningTasks.Count > 0)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero)
throw new TimeoutException();

var finished = await WaitOneAsync(runningTasks, remaining);
if (finished == null) continue;

var (resultTask, resultReplica, resultTimer) = finished.Value;
resultTimer.Stop();
replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Записывается только статистика по успешным запросам, но если вдруг одна реплика ответит "очень хорошо", а потом будет тупить, то она не окажется в конце списка реплик, как того бы хотелось.

Записывать статистику нужно всегда, просто в случае проблем (ошибки/таймаута) записывать время * на штраф, чтобы гарантированно ее перемещать ниже стабильно работающих реплик

return await resultTask;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
private async Task<(Task<string>, string, Stopwatch)?> WaitOneAsync(
List<(Task<string> task, string address, Stopwatch timer)> runningTasks,
TimeSpan timeout)
{
if (timeout <= TimeSpan.Zero)
return null;

var delayTask = Task.Delay(timeout);
var tasksList = runningTasks.Select(x => x.task).Append(delayTask).ToList();

var completed = await Task.WhenAny(tasksList);

if (completed == delayTask)
return null;

var finishedTask = (Task<string>)completed;
var result = runningTasks.First(x => x.task == finishedTask);
runningTasks.Remove(result);

return finishedTask.IsCompletedSuccessfully ? (finishedTask, result.address, result.timer) : null;
}
}
}
}