-
Notifications
You must be signed in to change notification settings - Fork 54
Шептихин Вячеслав, ДЗ №2 #69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,54 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| { | ||
| private readonly ConcurrentDictionary<string, long> _replicaTimings = new(); | ||
|
|
||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var replicaAddress in replicaAddresses) | ||
| { | ||
| _replicaTimings.TryAdd(replicaAddress, 0); | ||
| } | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var stopwatch = Stopwatch.StartNew(); | ||
| var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); | ||
| var remainingReplicas = ReplicaAddresses.Length; | ||
|
|
||
| foreach (var replicaAddress in sortedReplicas) | ||
| { | ||
| var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--; | ||
|
|
||
| var request = CreateRequest(replicaAddress + "?query=" + query); | ||
| var requestTask = ProcessRequestAsync(request); | ||
| var delay = Task.Delay(timeoutPerReplica); | ||
|
|
||
| var completedTask = await Task.WhenAny(requestTask, delay); | ||
| _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; | ||
|
|
||
| if (completedTask == delay) | ||
| continue; | ||
|
|
||
| if (!completedTask.IsCompletedSuccessfully) | ||
| continue; | ||
|
|
||
| return await requestTask; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
|
|
@@ -9,15 +11,63 @@ namespace ClusterClient.Clients | |
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| { | ||
| private readonly ConcurrentDictionary<string, long> _replicaTimings = new(); | ||
|
|
||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var replicaAddress in replicaAddresses) | ||
| { | ||
| _replicaTimings.TryAdd(replicaAddress, 0); | ||
| } | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var stopwatch = Stopwatch.StartNew(); | ||
| var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); | ||
| var remainingReplicas = ReplicaAddresses.Length; | ||
| var pendingRequests = new List<Task<string>>(); | ||
|
|
||
| foreach (var replicaAddress in sortedReplicas) | ||
| { | ||
| var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--; | ||
|
|
||
| pendingRequests.Add(ProcessRequestAsync(CreateRequest(replicaAddress + "?query=" + query))); | ||
|
|
||
| var delay = Task.Delay(timeoutPerReplica); | ||
| var completedTask = await Task.WhenAny(pendingRequests.Append(delay)) as Task<string>; | ||
| _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тут сразу несколько ошибок:
|
||
|
|
||
| if (completedTask == null) | ||
| continue; | ||
|
|
||
| pendingRequests.Remove(completedTask); | ||
|
|
||
| if (completedTask.IsCompletedSuccessfully) | ||
| return await completedTask; | ||
| } | ||
|
|
||
| while (pendingRequests.Count > 0) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. А в этом цикле мы в итоге статистику вообще не обновляем. Соответственно данные по скорости работы реплик будут некорректные и не полные |
||
| { | ||
| var remainingTime = timeout - stopwatch.Elapsed; | ||
| if (remainingTime <= TimeSpan.Zero) | ||
| break; | ||
|
|
||
| var completedTask = | ||
| await Task.WhenAny(pendingRequests.Append(Task.Delay(remainingTime))) as Task<string>; | ||
|
|
||
| if (completedTask == null) | ||
| break; | ||
|
|
||
| pendingRequests.Remove(completedTask); | ||
|
|
||
| if (completedTask.IsCompletedSuccessfully) | ||
| return await completedTask; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Некорректное поведение в случае если реплика не ответила и мы вышли по таймауту или если произошла ошибка при отправке запроса или у самой реплики.
В случае с таймаутом, мы запишем некорректное время т.к. мы не знаем реального времени когда завершится запрос. В таком случае лучше либо не записывать такую статистику (она не достоверная), либо записывать время выполнение но умножать его на какой-то штраф (хотя бы x2/x3)
В случае если произошла ошибка, то
Task.WhenAnyможет моментально вернуть нашу таску и мы запишем время выполнения некорректно, что потом повлияет на сортировку и эта реплика станет первой (а не в конце списка). В этом случае надо выставлять достаточно большое штрафное время (выше нормального времени обработки)