-
Notifications
You must be signed in to change notification settings - Fork 54
Зубков Артём 2ДЗ #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Зубков Артём 2ДЗ #58
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Net.Http; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
@@ -9,13 +10,77 @@ namespace ClusterClient.Clients | |
| { | ||
| public class RoundRobinClusterClient : ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, Queue<double>> stats = new (); | ||
| private readonly object lockObj = new (); | ||
|
|
||
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var address in replicaAddresses) | ||
| stats[address] = new Queue<double>(); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var orderedReplicas = ReplicaAddresses | ||
| .OrderBy(GetAverage) | ||
| .ToArray(); | ||
|
|
||
| var partTimeout = timeout / orderedReplicas.Length; | ||
| var globalTimeout = Task.Delay(timeout); | ||
|
|
||
| foreach (var replicaAddress in orderedReplicas) | ||
| { | ||
| var startTime = DateTime.UtcNow; | ||
| var timeoutTask = Task.Delay(partTimeout); | ||
| var webRequest = CreateRequest(replicaAddress + "?query=" + query); | ||
| Log.InfoFormat($"Processing {webRequest.RequestUri}"); | ||
| var replicaTask = ProcessRequestAsync(webRequest); | ||
| var resultTask = await Task.WhenAny(replicaTask, timeoutTask); | ||
|
|
||
| if (resultTask == timeoutTask) | ||
| continue; | ||
|
|
||
| if(resultTask.IsFaulted) | ||
| continue; | ||
|
|
||
| var result = await replicaTask; | ||
| UpdateStatistics(replicaAddress, DateTime.UtcNow - startTime); | ||
| return result; | ||
| } | ||
|
|
||
| var lastReplica = orderedReplicas.Last(); | ||
| var webRequestLast = CreateRequest(lastReplica + "?query=" + query); | ||
| Log.InfoFormat($"Processing {webRequestLast.RequestUri}"); | ||
|
|
||
| var startLast = DateTime.UtcNow; | ||
| var replicaLastTask = ProcessRequestAsync(webRequestLast); | ||
|
|
||
| var resultLastTask = await Task.WhenAny(replicaLastTask, globalTimeout); | ||
|
|
||
| if (resultLastTask == globalTimeout) | ||
| throw new TimeoutException(); | ||
|
|
||
| var resultFinal = await replicaLastTask; | ||
| UpdateStatistics(lastReplica, DateTime.UtcNow - startLast); | ||
| return resultFinal; | ||
|
Comment on lines
+51
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. К последней реплике делаешь запрос дважды, хотя предполагается, что ко всем будет сделано по одному. |
||
| } | ||
|
|
||
| private double GetAverage(string replica) | ||
| { | ||
| lock (lockObj) | ||
| { | ||
| var queue = stats[replica]; | ||
| return queue.Count == 0 ? double.MaxValue : queue.Average(); | ||
| } | ||
| } | ||
|
|
||
| private void UpdateStatistics(string replica, TimeSpan time) | ||
| { | ||
| lock (lockObj) | ||
| { | ||
| var queue = stats[replica]; | ||
| queue.Enqueue(time.TotalMilliseconds); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Свежие данные можно сделать более значимыми, чем старые (удалять слишком старые или умножать их на понижающий коэффициент) |
||
| } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,118 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Globalization; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using log4net; | ||
|
|
||
| namespace ClusterClient.Clients | ||
| { | ||
| public class SmartClusterClient : ClusterClientBase | ||
| { | ||
| private readonly Dictionary<string, Queue<double>> stats = new Dictionary<string, Queue<double>>(); | ||
| private readonly object lockObj = new object(); | ||
|
|
||
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| foreach (var address in replicaAddresses) | ||
| stats[address] = new Queue<double>(); | ||
| } | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| { | ||
| var orderedReplicas = ReplicaAddresses | ||
| .OrderBy(GetAverage) | ||
| .ToArray(); | ||
| var partOfTimeout = timeout / orderedReplicas.Length; | ||
| var tasksAtWork = new List<(Task<string> task, string replica, DateTime startTime)>(); | ||
| var timeoutGlobal = Task.Delay(timeout) | ||
| .ContinueWith<string>(_ => throw new TimeoutException()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ContinueWith ни на что не влияет: исключение внутри него, конечно, возникнет, но т.к. ты не делаешь await timeoutGlobal, оно останется завёрнутым в таску и дальше не вылетит. |
||
| var tasks = new List<Task<string>> { timeoutGlobal }; | ||
|
|
||
| foreach (var replicaAddress in orderedReplicas) | ||
| { | ||
| var webRequest = CreateRequest(replicaAddress + "?query=" + query); | ||
| Log.InfoFormat($"Processing {webRequest.RequestUri}"); | ||
| var startTime = DateTime.UtcNow; | ||
| var replicaTask = ProcessRequestAsync(webRequest); | ||
|
|
||
| tasksAtWork.Add((replicaTask, replicaAddress, startTime)); | ||
| tasks.Add(replicaTask); | ||
|
|
||
| var timeoutDelay = Task.Delay(partOfTimeout).ContinueWith<string>(_ => throw new TimeoutException()); | ||
|
|
||
| tasks.Add(timeoutDelay); | ||
|
|
||
| var taskInCycle = await Task.WhenAny(tasks); | ||
|
|
||
| tasks.Remove(timeoutDelay); | ||
|
|
||
| if (taskInCycle == timeoutGlobal) | ||
| throw new TimeoutException(); | ||
|
|
||
| if (taskInCycle == timeoutDelay) | ||
| continue; | ||
|
|
||
| try | ||
| { | ||
| var result = await taskInCycle; | ||
| var info = tasksAtWork.First(t => t.task == taskInCycle); | ||
| UpdateStatistics(info.replica, DateTime.UtcNow - info.startTime); | ||
|
|
||
| return result; | ||
| } catch | ||
| { | ||
| tasks.Remove(taskInCycle); | ||
| tasksAtWork.RemoveAll(t => t.task == taskInCycle); | ||
| } | ||
| } | ||
| while (tasksAtWork.Count > 1) | ||
| { | ||
| var completedTask = await Task.WhenAny(tasks); | ||
|
|
||
| if (completedTask == timeoutGlobal) | ||
| throw new TimeoutException(); | ||
|
Comment on lines
+76
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Статистика для тормозящих реплик останется необновлённой |
||
|
|
||
| try | ||
| { | ||
| var result = await completedTask; | ||
|
|
||
| var info = tasksAtWork.First(t => t.task == completedTask); | ||
| UpdateStatistics(info.replica, DateTime.UtcNow - info.startTime); | ||
|
|
||
| return result; | ||
| } | ||
| catch | ||
| { | ||
| tasks.Remove(completedTask); | ||
| tasks.RemoveAll(t => t == completedTask); | ||
| } | ||
| } | ||
| } | ||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
|
|
||
| private double GetAverage(string replica) | ||
| { | ||
| lock (lockObj) | ||
| { | ||
| var queue = stats[replica]; | ||
| return queue.Count == 0 ? double.MaxValue : queue.Average(); | ||
| } | ||
| } | ||
|
|
||
| private void UpdateStatistics(string replica, TimeSpan time) | ||
| { | ||
| lock (lockObj) | ||
| { | ||
| var queue = stats[replica]; | ||
| queue.Enqueue(time.TotalMilliseconds); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uris.First() - первая таска по порядку, а не по готовности. Нет никаких причин считать, что именно она упала с ошибкой, она могла и не успеть ещё отработать