-
Notifications
You must be signed in to change notification settings - Fork 54
Головнев Максим ДЗ2 #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,44 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class ParallelClusterClient : ClusterClientBase | ||
| public class ParallelClusterClient(string[] replicaAddresses) | ||
| : ClusterClientBase(replicaAddresses) | ||
| { | ||
| public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| } | ||
| var tasks = ReplicaAddresses | ||
| .Select(address => | ||
| { | ||
| var request = CreateRequest(address + "?query=" + query); | ||
| Log.InfoFormat($"Processing {request.RequestUri}"); | ||
| return ProcessRequestAsync(request); | ||
| }) | ||
| .ToList(); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var timeoutTask = Task.Delay(timeout); | ||
|
|
||
| while (tasks.Count > 0) | ||
| { | ||
| var completed = await Task.WhenAny(tasks.Append(timeoutTask)); | ||
|
|
||
| if (completed == timeoutTask) | ||
| throw new TimeoutException(); | ||
|
|
||
| var finishedTask = (Task<string>)completed; | ||
| tasks.Remove(finishedTask); | ||
|
|
||
| if (finishedTask.Status == TaskStatus.RanToCompletion) | ||
| return finishedTask.Result; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| protected override ILog Log => | ||
| LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
|
|
||
| namespace ClusterClient.Clients; | ||
|
|
||
| public class ReplicaStatisticsEntry(string address, int maxHistory = 10) | ||
| { | ||
| public string Address { get; } = address; | ||
| private readonly Queue<long> lastTimes = new(); | ||
|
|
||
| public void RecordResponseTime(long ms) | ||
| { | ||
| lastTimes.Enqueue(ms); | ||
| if (lastTimes.Count > maxHistory) | ||
| lastTimes.Dequeue(); | ||
| } | ||
|
|
||
| public double AverageTime => lastTimes.Any() ? lastTimes.Average() : double.MaxValue; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,21 +1,61 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| public class RoundRobinClusterClient: ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, ReplicaStatisticsEntry> replicaStats; | ||
|
|
||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| replicaStats = replicaAddresses.ToDictionary( | ||
| addr => addr, | ||
| addr => new ReplicaStatisticsEntry(addr) | ||
| ); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var start = Stopwatch.StartNew(); | ||
|
|
||
| var orderedReplicas = replicaStats.Values | ||
| .OrderBy(x => x.AverageTime) | ||
| .Select(x => x.Address) | ||
| .ToArray(); | ||
|
Comment on lines
+27
to
+30
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Вижу сортировку по времени ответа, но не вижу ниже сохранения статистики. Получается этот клиент по факту не реализует обновление статистики и сортировка тут "бессмысленна" |
||
|
|
||
| for (var i = 0; i < orderedReplicas.Length; i++) | ||
| { | ||
| var remainingTime = timeout - start.Elapsed; | ||
|
|
||
| if (remainingTime <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var remainingReplicas = orderedReplicas.Length - i; | ||
| var replicaTimeout = TimeSpan.FromMilliseconds( | ||
| remainingTime.TotalMilliseconds / remainingReplicas); | ||
|
|
||
| var uri = orderedReplicas[i] + "?query=" + query; | ||
| var webRequest = CreateRequest(uri); | ||
|
|
||
| Log.InfoFormat($"Processing {webRequest.RequestUri}, timeout {replicaTimeout}"); | ||
|
|
||
| var requestTask = ProcessRequestAsync(webRequest); | ||
| var delayTask = Task.Delay(replicaTimeout); | ||
|
|
||
| var completed = await Task.WhenAny(requestTask, delayTask); | ||
|
|
||
| if (completed != requestTask) continue; | ||
| if (!requestTask.IsFaulted) | ||
| return requestTask.Result; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,103 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, ReplicaStatisticsEntry> replicaStats; | ||
|
|
||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| replicaStats = replicaAddresses.ToDictionary( | ||
| addr => addr, | ||
| addr => new ReplicaStatisticsEntry(addr) | ||
| ); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
|
|
||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var deadline = DateTime.UtcNow + timeout; | ||
| var runningTasks = new List<(Task<string> task, string address, Stopwatch timer)>(); | ||
|
|
||
| var orderedReplicas = replicaStats.Values | ||
| .OrderBy(x => x.AverageTime) | ||
| .Select(x => x.Address) | ||
| .ToArray(); | ||
|
|
||
| var addressesLeft = orderedReplicas.Length; | ||
|
|
||
| foreach (var replica in orderedReplicas) | ||
| { | ||
| var remaining = deadline - DateTime.UtcNow; | ||
| if (remaining <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var replicaTimeout = remaining / addressesLeft; | ||
|
|
||
| var request = CreateRequest($"{replica}?query={query}"); | ||
| Log.InfoFormat($"Processing {request.RequestUri}"); | ||
|
|
||
| var timer = Stopwatch.StartNew(); | ||
| var task = ProcessRequestAsync(request); | ||
| runningTasks.Add((task, replica, timer)); | ||
|
|
||
| var finished = await WaitOneAsync(runningTasks, replicaTimeout); | ||
| if (finished != null) | ||
| { | ||
| var (resultTask, resultReplica, resultTimer) = finished.Value; | ||
| resultTimer.Stop(); | ||
| replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds); | ||
| return await resultTask; | ||
| } | ||
|
|
||
| addressesLeft--; | ||
| } | ||
|
|
||
| while (runningTasks.Count > 0) | ||
| { | ||
| var remaining = deadline - DateTime.UtcNow; | ||
| if (remaining <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var finished = await WaitOneAsync(runningTasks, remaining); | ||
| if (finished == null) continue; | ||
|
|
||
| var (resultTask, resultReplica, resultTimer) = finished.Value; | ||
| resultTimer.Stop(); | ||
| replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Записывается только статистика по успешным запросам, но если вдруг одна реплика ответит "очень хорошо", а потом будет тупить, то она не окажется в конце списка реплик, как того бы хотелось. Записывать статистику нужно всегда, просто в случае проблем (ошибки/таймаута) записывать время * на штраф, чтобы гарантированно ее перемещать ниже стабильно работающих реплик |
||
| return await resultTask; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| private async Task<(Task<string>, string, Stopwatch)?> WaitOneAsync( | ||
| List<(Task<string> task, string address, Stopwatch timer)> runningTasks, | ||
| TimeSpan timeout) | ||
| { | ||
| if (timeout <= TimeSpan.Zero) | ||
| return null; | ||
|
|
||
| var delayTask = Task.Delay(timeout); | ||
| var tasksList = runningTasks.Select(x => x.task).Append(delayTask).ToList(); | ||
|
|
||
| var completed = await Task.WhenAny(tasksList); | ||
|
|
||
| if (completed == delayTask) | ||
| return null; | ||
|
|
||
| var finishedTask = (Task<string>)completed; | ||
| var result = runningTasks.First(x => x.task == finishedTask); | ||
| runningTasks.Remove(result); | ||
|
|
||
| return finishedTask.IsCompletedSuccessfully ? (finishedTask, result.address, result.timer) : null; | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queueне потокобезопасный, лучше либо добавить блокировки (асинхронные), хотя бы через семафор, либо использовать потокобезопасныйConcurrentQueueЕсли "в реальной ситуации" кто-то попробует делать несколько разных запросов к одной реплике через один клиент, то можем нарваться в лучшем случае на неопределенное поведение гонки. Поэтому лучше потенциальные места такие обкладывать LockFree структурами или блокировками)