Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(uri => CreateRequest(uri + "?query=" + query))
.Select(ProcessRequestAsync)
.ToList();

var delayTask = Task.Delay(timeout);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему delayTask бы создать до всех ProcessRequestAsync. Иначе выходит, что запросы уже какое-то время как отправлены

while (tasks.Count != 0)
{
var processTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask);
await Task.WhenAny(processTask, delayTask);
if (delayTask.IsCompleted)
{
throw new TimeoutException();
}
var completedTask = await (Task<Task<string>>)processTask;
tasks.Remove(completedTask);

try
{
return await completedTask;
}
catch (Exception)
{
Log.Error("Task failed");
}
}
return null;
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
Expand Down
44 changes: 42 additions & 2 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -12,10 +14,48 @@ public class RoundRobinClusterClient : ClusterClientBase
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

private static readonly ConcurrentDictionary<string, long> ReplicaStats = new();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var sortedReplicas = ReplicaAddresses
.OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0))
.ToList();
var sw = Stopwatch.StartNew();

for (int i = 0; i < sortedReplicas.Count; i++)
{
var remainingReplicas = sortedReplicas.Count - i;
var timeLeft = timeout - sw.Elapsed;
var currentReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / remainingReplicas);
var uri = sortedReplicas[i];
var request = CreateRequest(uri + "?query=" + query);
var requestTimer = Stopwatch.StartNew();
var task = ProcessRequestAsync(request);
var delayTask = Task.Delay(currentReplicaTimeout);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeLeft и currentReplicaTimeout могут оказаться отрицательными. Task.Delay может или выбросить исключение, или, с маленьким шансом, превратиться в никогда не заканчивающуюся таску

var completedTask = await Task.WhenAny(task, delayTask);

if (completedTask == task)
{
try
{
var result = await task;
UpdateStats(uri, requestTimer.ElapsedMilliseconds);
return result;
}
catch (Exception)
{
UpdateStats(uri, (long)timeout.TotalMilliseconds);
}
}
}
throw new TimeoutException($"Request {query} timed out");
}

private void UpdateStats(string uri, long time)
{
ReplicaStats.AddOrUpdate(uri, time, (key, oldVal) => (oldVal + time) / 2);
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
Expand Down
80 changes: 77 additions & 3 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -12,12 +14,84 @@ public class SmartClusterClient : ClusterClientBase
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

private static readonly ConcurrentDictionary<string, long> ReplicaStats = new();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var sortedReplicas = ReplicaAddresses
.OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0))
.ToList();

var tasks = new List<Task<ResponseData>>();
var perReplicaTimeout = TimeSpan.FromMilliseconds(timeout.TotalMilliseconds / ReplicaAddresses.Length);
var sw = Stopwatch.StartNew();

for (int i = 0; i < sortedReplicas.Count; i++)
{
var request = CreateRequest(sortedReplicas[i] + "?query=" + query);
tasks.Add(ProcessAndMeasureAsync(sortedReplicas[i], request));

while (true)
{
var timePassed = sw.Elapsed;
var timeToNextLaunch = perReplicaTimeout * (i + 1) - timePassed;

if (i == ReplicaAddresses.Length - 1)
timeToNextLaunch = timeout - timePassed;

var delayTask = Task.Delay(timeToNextLaunch);
var completedTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask);

if (completedTask != delayTask)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если реплики будут тормозить, то соответствующие таски так и останутся в tasks, и статистика для них не обновится

{
var finishedTask = await (Task<Task<ResponseData>>)completedTask;
try
{
var response = await finishedTask;
UpdateStats(response.Uri, response.ElapsedMs);
return response.Content;
}
catch (Exception)
{
UpdateStats(sortedReplicas[i], (long)timeout.TotalMilliseconds);
tasks.Remove(finishedTask);
if (tasks.Count == 0 || i < ReplicaAddresses.Length - 1)
{
goto next;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему не просто break?

}
}
}
else
{
break;
}
}

next: ;
}

throw new TimeoutException();
}

private async Task<ResponseData> ProcessAndMeasureAsync(string uri, System.Net.WebRequest request)
{
var timer = Stopwatch.StartNew();
var content = await ProcessRequestAsync(request);
return new ResponseData { Uri = uri, Content = content, ElapsedMs = timer.ElapsedMilliseconds };
}
private void UpdateStats(string uri, long elapsedMs)
{
ReplicaStats.AddOrUpdate(uri, elapsedMs, (key, oldVal) => (oldVal + elapsedMs) / 2);
}

private class ResponseData
{
public string Uri { get; set; }
public string Content { get; set; }
public long ElapsedMs { get; set; }
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
}
}