Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(addr => CreateRequest($"{addr}?query={query}"))
.Select(ProcessRequestAsync)
.ToList();

var timeoutTask = Task.Delay(timeout);

var tasksToWait = new List<Task>(tasks);
tasksToWait.Add(timeoutTask);

while (tasksToWait.Count > 1)
{
var winner = await Task.WhenAny(tasksToWait);
if (winner == timeoutTask) break;

var completedTask = (Task<string>)winner;
tasksToWait.Remove(completedTask);
try
{
return await completedTask;
}
catch
{
Log.Warn("Failed to process replica.");
}
}
throw new TimeoutException("Not a single replica was successfully answered within the allotted time.");
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
Expand Down
63 changes: 61 additions & 2 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,11 +16,67 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var sortedReplicas = ReplicaAddresses
.OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0)
.ToArray();

var timer = Stopwatch.StartNew();

for (var i = 0; i < sortedReplicas.Length; i++)
{
var timeRemain = timeout - timer.Elapsed;
if (timeRemain <= TimeSpan.Zero)
break;

var currentReplica = sortedReplicas[i];
var replicaTimeout = timeRemain / (sortedReplicas.Length - i);
var uri = $"{currentReplica}?query={query}";
var request = CreateRequest(uri);
var completedTask = ProcessAndMeasureAsync(currentReplica, request);

await Task.WhenAny(completedTask, Task.Delay(replicaTimeout));

if (!completedTask.IsCompleted)
continue;

try
{
return await completedTask;
}
catch
{
Log.Warn("Failed to process replica.");
}
}
throw new TimeoutException("Not a single replica was successfully answered within the allotted time.");
}

private async Task<string> ProcessAndMeasureAsync(string replicaAddress, WebRequest request)
{
var timer = Stopwatch.StartNew();
try
{
var result = await base.ProcessRequestAsync(request);
timer.Stop();

ReplicaTimes.AddOrUpdate(
replicaAddress,
timer.ElapsedMilliseconds,
(key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2)
);

return result;
}
catch
{
ReplicaTimes[replicaAddress] = double.MaxValue;
throw;
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
private readonly ConcurrentDictionary<string, double> ReplicaTimes = new ConcurrentDictionary<string, double>();
}
}
75 changes: 72 additions & 3 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,11 +16,77 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
}
var sortedReplicas = ReplicaAddresses
.OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0)
.ToArray();

var tasks = new List<Task<string>>();
var timer = Stopwatch.StartNew();

for (var i = 0; i < sortedReplicas.Length; i++)
{
var timeRemain = timeout - timer.Elapsed;
if (timeRemain <= TimeSpan.Zero)
break;

var currentReplica = sortedReplicas[i];
var replicaTimeout = timeRemain / (sortedReplicas.Length - i);
var uri = $"{sortedReplicas[i]}?query={query}";
var request = CreateRequest(uri);
tasks.Add(ProcessAndMeasureAsync(currentReplica, request));

var timeoutTask = Task.Delay(replicaTimeout);

while (tasks.Count > 0)
{
var tasksToWait = new List<Task>(tasks);
tasksToWait.Add(timeoutTask);

var winner = await Task.WhenAny(tasksToWait);
if (winner == timeoutTask) break;

var completedTask = (Task<string>)winner;
tasks.Remove(completedTask);
try
{
return await completedTask;
}
catch
{
Log.Warn("Failed to process replica.");
if (tasks.Count == 0) break;
}
}
}
throw new TimeoutException("Not a single replica was successfully answered within the allotted time.");
}

private async Task<string> ProcessAndMeasureAsync(string replicaAddress, WebRequest request)
{
var timer = Stopwatch.StartNew();
try
{
var result = await base.ProcessRequestAsync(request);
timer.Stop();

ReplicaTimes.AddOrUpdate(
replicaAddress,
timer.ElapsedMilliseconds,
(key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2)
);

return result;
}
catch
{
ReplicaTimes[replicaAddress] = double.MaxValue;
throw;
}
}

private readonly ConcurrentDictionary<string, double> ReplicaTimes = new ConcurrentDictionary<string, double>();
protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
}