-
Notifications
You must be signed in to change notification settings - Fork 54
Русинов Матвей, ДЗ №2 #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
451e9bb
2e6f5f1
45cf194
bcd4a57
d73e326
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,47 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) | ||
| { | ||
| public class ParallelClusterClient : ClusterClientBase | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| var cts = new CancellationTokenSource(timeout); | ||
|
|
||
| var tasks = ReplicaAddresses | ||
| .Select(address => CallReplicaAsync(address, query, cts.Token)) | ||
| .ToList(); | ||
|
|
||
| while (tasks.Count > 0) | ||
| { | ||
| var completed = await Task.WhenAny(tasks); | ||
|
|
||
| if (completed.IsCompletedSuccessfully) | ||
| { | ||
| await cts.CancelAsync(); | ||
| return completed.Result; | ||
| } | ||
|
|
||
| tasks.Remove(completed); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| throw new TimeoutException("No replica responded within the timeout"); | ||
| } | ||
|
|
||
| private async Task<string> CallReplicaAsync(string address, string query, CancellationToken token) | ||
| { | ||
| var uri = $"{address}?query={query}"; | ||
| var request = CreateRequest(uri); | ||
|
|
||
| await using (token.Register(() => request.Abort())) | ||
| { | ||
| throw new NotImplementedException(); | ||
| return await ProcessRequestAsync(request); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| using System.Collections.Concurrent; | ||
| using System.Linq; | ||
| using System.Threading; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class ReplicaPerformanceTracker | ||
| { | ||
| private readonly ConcurrentDictionary<string, ReplicaStats> stats = new(); | ||
|
|
||
| public string[] OrderByFastest(string[] replicas) | ||
| { | ||
| return replicas | ||
| .Select((r, i) => new { Replica = r, Index = i }) | ||
| .OrderBy(x => | ||
| stats.TryGetValue(x.Replica, out var s) && s.HasSamples | ||
| ? s.AverageMs | ||
| : long.MaxValue) | ||
| .ThenBy(x => x.Index) | ||
| .Select(x => x.Replica) | ||
| .ToArray(); | ||
| } | ||
|
|
||
| public void ReportResult(string replica, long elapsedMs) | ||
| { | ||
| var stat = stats.GetOrAdd(replica, _ => new ReplicaStats()); | ||
| stat.AddSample(elapsedMs); | ||
| } | ||
|
|
||
| private class ReplicaStats | ||
| { | ||
| private long avgMs; | ||
| private long count; | ||
|
|
||
| public bool HasSamples => Volatile.Read(ref count) > 0; | ||
|
|
||
| public long AverageMs | ||
| { | ||
| get | ||
| { | ||
| var avg = Volatile.Read(ref avgMs); | ||
| return avg == 0 ? long.MaxValue : avg; | ||
| } | ||
| } | ||
|
|
||
| public void AddSample(long sampleMs) | ||
| { | ||
| var newCount = Interlocked.Increment(ref count); | ||
|
|
||
| long oldAvg, newAvg; | ||
| do | ||
| { | ||
| oldAvg = Volatile.Read(ref avgMs); | ||
| newAvg = (oldAvg * (newCount - 1) + sampleMs) / newCount; | ||
| } while (Interlocked.CompareExchange(ref avgMs, newAvg, oldAvg) != oldAvg); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,62 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Diagnostics; | ||
| using System.Net; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) | ||
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| private readonly ReplicaPerformanceTracker tracker = new(); | ||
|
|
||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
| var stopwatch = Stopwatch.StartNew(); | ||
|
|
||
| var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses); | ||
| var remainingReplicas = orderedReplicas.Length; | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| foreach (var replica in orderedReplicas) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var remainingTime = timeout - stopwatch.Elapsed; | ||
| if (remainingTime <= TimeSpan.Zero) | ||
| throw new TimeoutException("No replica responded within overall timeout"); | ||
|
|
||
| var timeoutPerRequest = remainingTime.Divide(remainingReplicas); | ||
| remainingReplicas--; | ||
|
|
||
| var webRequest = CreateRequest(replica + "?query=" + query); | ||
| Log.InfoFormat($"Processing {webRequest.RequestUri}"); | ||
|
|
||
| var sw = Stopwatch.StartNew(); | ||
| var requestTask = ProcessRequestAsync(webRequest); | ||
|
|
||
| var completed = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); | ||
|
|
||
| if (completed != requestTask) | ||
| { | ||
| sw.Stop(); | ||
| tracker.ReportResult(replica, sw.ElapsedMilliseconds); | ||
| continue; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| var result = await requestTask; | ||
| sw.Stop(); | ||
| tracker.ReportResult(replica, sw.ElapsedMilliseconds); | ||
| return result; | ||
| } | ||
| catch (WebException) | ||
| { | ||
| sw.Stop(); | ||
| tracker.ReportResult(replica, sw.ElapsedMilliseconds); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тут так же не правильно записывать время таймера в чистом виде. Если у нас произошла ошибка запроса (к примеру сетевая ошибка или реплика ответила 500кой), то запрос фактически выполнится "очень быстро". И мы запишем сюда слишком малое время и эта реплика ошибочно станет в самый верх списка, хотя она должна быть в самом низу. Тут нужно добавлять статическое штрафное большое время хотя бы, но никак не реальное время таймера |
||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
| throw new TimeoutException("No replica responded"); | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,103 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Net; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) | ||
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| private readonly ReplicaPerformanceTracker tracker = new(); | ||
|
|
||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| var stopwatch = Stopwatch.StartNew(); | ||
|
|
||
| var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses); | ||
| var remainingReplicas = orderedReplicas.Length; | ||
|
|
||
| var inFlight = new List<(string replica, Task<string> task, Stopwatch sw)>(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Лучше создавать класс или структуру в которую использовать как контейнер для этих значений. Более удобно получается и читается лучше |
||
|
|
||
| foreach (var replica in orderedReplicas) | ||
| { | ||
| } | ||
| var remainingTime = timeout - stopwatch.Elapsed; | ||
| if (remainingTime <= TimeSpan.Zero) | ||
| break; | ||
|
|
||
| var timeoutPerRequest = remainingTime.Divide(remainingReplicas); | ||
| remainingReplicas--; | ||
|
|
||
| var request = CreateRequest(replica + "?query=" + query); | ||
| Log.InfoFormat($"Processing {request.RequestUri}"); | ||
|
|
||
| var sw = Stopwatch.StartNew(); | ||
| var task = ProcessRequestAsync(request); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| inFlight.Add((replica, task, sw)); | ||
|
|
||
| var delayTask = Task.Delay(timeoutPerRequest); | ||
| var anyResponseTask = Task.WhenAny(inFlight.Select(x => x.task)); | ||
|
|
||
| var completed = await Task.WhenAny(anyResponseTask, delayTask); | ||
|
|
||
| if (completed != anyResponseTask) continue; | ||
| { | ||
| await anyResponseTask; | ||
|
|
||
| var winner = inFlight.First(x => x.task.IsCompleted); | ||
|
|
||
| try | ||
| { | ||
| var result = await winner.task; | ||
| winner.sw.Stop(); | ||
| tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); | ||
| return result; | ||
| } | ||
| catch (WebException) | ||
| { | ||
| winner.sw.Stop(); | ||
| tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); | ||
| inFlight.Remove(winner); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| while (inFlight.Count > 0) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var remainingTime = timeout - stopwatch.Elapsed; | ||
| if (remainingTime <= TimeSpan.Zero) | ||
| break; | ||
|
|
||
| var completed = await Task.WhenAny( | ||
| Task.WhenAny(inFlight.Select(x => x.task)), | ||
| Task.Delay(remainingTime) | ||
| ); | ||
|
|
||
| if (completed is Task<string>) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тут можно чуть упростить условие в цикле, к примеру написать Так лучше читается код, более линейно без лишних блоков) |
||
| { | ||
| var winner = inFlight.First(x => x.task.IsCompleted); | ||
|
|
||
| try | ||
| { | ||
| var result = await winner.task; | ||
| winner.sw.Stop(); | ||
| tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); | ||
| return result; | ||
| } | ||
| catch (WebException) | ||
| { | ||
| winner.sw.Stop(); | ||
| tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); | ||
| inFlight.Remove(winner); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| throw new TimeoutException("No replica responded within timeout"); | ||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Неправильно записывать чистое время здесь т.к. мы по факту запишем время
timeoutPerRequest, а не реальное время за которое ответила реплика. Тут нужно хотя бы домножать это на какой-то штраф, чтобы эта реплика "опустилась" в списке в самый конецЛибо т.к. мы не знаем, точное время завершения запроса, лучше не записывать такую статистику в принципе. Это будет даже более правильное поведение, чем записывать ошибочную статистику 🤔