-
Notifications
You must be signed in to change notification settings - Fork 54
Копытов Михаил, ДЗ 2 #63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,48 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class ParallelClusterClient : ClusterClientBase | ||
| public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) | ||
| { | ||
| public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| } | ||
| var requestTasks = ReplicaAddresses | ||
| .Select(baseUri => ProcessRequestAsync(CreateRequest(baseUri + "?query=" + query))) | ||
| .ToList(); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var timeoutTask = Task.Delay(timeout); | ||
|
|
||
| while (requestTasks.Count > 0) | ||
| { | ||
| var completedTask = await Task.WhenAny(requestTasks.Concat([timeoutTask]).ToArray()); | ||
|
|
||
| if (timeoutTask.IsCompleted) | ||
| { | ||
| Log.Error("Request timed out"); | ||
| throw new TimeoutException("Request timed out"); | ||
| } | ||
|
|
||
| requestTasks.Remove((Task<string>)completedTask); | ||
|
|
||
| try | ||
| { | ||
| var result = await (Task<string>)completedTask; | ||
| if (string.IsNullOrEmpty(result)) continue; | ||
| Log.Info("Request completed"); | ||
| return result; | ||
| } | ||
| catch (Exception e) | ||
| { | ||
| Log.Debug($"Request failed: {e.Message}"); | ||
| } | ||
| } | ||
|
|
||
| throw new TimeoutException("All replicas were timed out"); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,68 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, double> _replicaStatus = new(); | ||
| private const double PenaltyTimeInMs = 100; | ||
|
|
||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var address in replicaAddresses) | ||
| { | ||
| _replicaStatus[address] = double.MaxValue; | ||
| } | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var timer = Stopwatch.StartNew(); | ||
| var replicasLeft = ReplicaAddresses.Length; | ||
|
|
||
| var sortedReplicas = ReplicaAddresses | ||
| .OrderBy(address => _replicaStatus.GetValueOrDefault(address, double.MaxValue)) | ||
| .ToList(); | ||
|
|
||
| foreach (var replicaAddress in sortedReplicas) | ||
| { | ||
| var timeLeft = timeout - timer.Elapsed; | ||
|
|
||
| if (timeLeft <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var replicaTimeout = TimeSpan.FromTicks(timeLeft.Ticks / replicasLeft); | ||
| replicasLeft--; | ||
|
|
||
| var request = CreateRequest($"{replicaAddress}?query={Uri.EscapeDataString(query)}"); | ||
| var requestTask = ProcessRequestAsync(request); | ||
| var timeoutTask = Task.Delay(replicaTimeout); | ||
|
|
||
| var completedTask = await Task.WhenAny(requestTask, timeoutTask); | ||
|
|
||
| if (completedTask == timeoutTask) | ||
| { | ||
| _replicaStatus[replicaAddress] = replicaTimeout.TotalMilliseconds; | ||
| continue; | ||
| } | ||
|
|
||
| if (requestTask.Status == TaskStatus.RanToCompletion) | ||
| { | ||
| _replicaStatus[replicaAddress] = replicaTimeout.TotalMilliseconds; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Не верно записывается время ответа реплики. В результат записывается время таймаута, а не фактическое время выполнения запроса (его можно было бы измерить через создание В итоге даже при успешном выполнении, реплике записывается время его таймаута и при больших таймаутах хорошая реплика будет помечена как медленная |
||
| return await requestTask; | ||
| } | ||
|
|
||
| _replicaStatus[replicaAddress] = PenaltyTimeInMs; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,111 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, double> _replicaStatus = new(); | ||
| private const double PenaltyTimeInMs = 100; | ||
| private const int TaskCheckIntervalInMs = 10; | ||
|
|
||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var address in replicaAddresses) | ||
| { | ||
| _replicaStatus[address] = double.MaxValue; | ||
| } | ||
| } | ||
|
|
||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| var timer = Stopwatch.StartNew(); | ||
| var replicasLeft = ReplicaAddresses.Length; | ||
| var pendingTasks = new List<(Task<string> Task, string Address)>(); | ||
|
|
||
| var sortedReplicas = ReplicaAddresses | ||
| .OrderBy(address => _replicaStatus.GetValueOrDefault(address, double.MaxValue)) | ||
| .ToList(); | ||
|
|
||
| foreach (var replicaAddress in sortedReplicas) | ||
| { | ||
| var timeLeft = timeout - timer.Elapsed; | ||
| if (timeLeft <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var replicaTimeout = TimeSpan.FromTicks(timeLeft.Ticks / replicasLeft); | ||
| replicasLeft--; | ||
|
|
||
| var request = CreateRequest($"{replicaAddress}?query={Uri.EscapeDataString(query)}"); | ||
| var requestTask = ProcessRequestAsync(request); | ||
| pendingTasks.Add((requestTask, replicaAddress)); | ||
|
|
||
| var timeoutTask = Task.Delay(replicaTimeout); | ||
| var completedTask = await Task.WhenAny( | ||
| pendingTasks.Select(pt => pt.Task) | ||
| .Append(timeoutTask).ToArray()); | ||
|
|
||
| if (completedTask == timeoutTask) | ||
| continue; | ||
|
Comment on lines
+52
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. При таймауте не обновляется время у реплики, в итоге она будет считаться всегда быстрой. Тут бы записывать в статистику время + какой-то штраф |
||
|
|
||
|
|
||
| var finishedItem = ExtractCompletedTask(pendingTasks, completedTask); | ||
|
|
||
|
|
||
| if (finishedItem.Task.Status == TaskStatus.RanToCompletion) | ||
| { | ||
| _replicaStatus[finishedItem.Address] = timer.ElapsedMilliseconds; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тут записывается неверная статистика таймера для реплики. Мы записываем в статистику время прошедшее в целом с начала вызова нашего клиента и туда включено не фактическое время выполнения запроса к конкретной реплики, а все время затраченное на инициализацию, сортировку, цикл, создание запросов ко всем репликам и все ожидания. В итоге первая реплика получит меньшее время чем к примеру вторая реплика, хотя вторая реплика может отвечать быстрее первой. Необъективная статистика получается |
||
| return finishedItem.Task.Result; | ||
| } | ||
|
|
||
| _replicaStatus[finishedItem.Address] = PenaltyTimeInMs; | ||
| } | ||
|
|
||
| while (pendingTasks.Count != 0) | ||
| { | ||
|
Comment on lines
+68
to
+69
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. В While дубль логики, что и в For. Дублирвоание логики всегда плохо и в особенности если где-то ошибка, то ее надо не забыть исправлять в нескольких местах. Для улучшения ситуации, можно было бы это вынести в отдельный метод |
||
| var timeLeft = timeout - timer.Elapsed; | ||
| if (timeLeft <= TimeSpan.Zero) | ||
| throw new TimeoutException(); | ||
|
|
||
| var checkTask = Task.Delay(TaskCheckIntervalInMs); | ||
| var doneTask = await Task.WhenAny( | ||
| pendingTasks | ||
| .Select(pt => pt.Task) | ||
| .Append(checkTask) | ||
| .ToList()); | ||
|
|
||
|
|
||
| if (doneTask == checkTask) | ||
| continue; | ||
|
|
||
| var finishedItem = ExtractCompletedTask(pendingTasks, doneTask); | ||
|
|
||
| if (finishedItem.Task.Status == TaskStatus.RanToCompletion) | ||
| { | ||
| _replicaStatus[finishedItem.Address] = timer.ElapsedMilliseconds; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Так как дубль логики, то та же ошибка что и выше, мы записываем необъективную оценку времени выполнения в статистику |
||
| return finishedItem.Task.Result; | ||
| } | ||
|
|
||
| _replicaStatus[finishedItem.Address] = PenaltyTimeInMs; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| private static (Task<string> Task, string Address) ExtractCompletedTask( | ||
| List<(Task<string> Task, string Address)> pendingTasks, Task completedTask) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var finishedIndex = pendingTasks.FindIndex(pt => pt.Task == completedTask); | ||
| if (finishedIndex == -1) throw new InvalidOperationException(); | ||
| var finishedItem = pendingTasks[finishedIndex]; | ||
| pendingTasks.RemoveAt(finishedIndex); | ||
| return finishedItem; | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Эта запись статистики становится немного бессмысленной, когда ниже то же время выставляется в статистику. Для исправления ситуации здесь необходимо было бы увеличивать время на какой-то штраф, чтобы гарантировано эту реплику понизить для последующих запросов