-
Notifications
You must be signed in to change notification settings - Fork 54
Балашов А. Д. homework 2 #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
|
|
@@ -12,10 +14,48 @@ public class RoundRobinClusterClient : ClusterClientBase | |
| public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
|
|
||
| private static readonly ConcurrentDictionary<string, long> ReplicaStats = new(); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var sortedReplicas = ReplicaAddresses | ||
| .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) | ||
| .ToList(); | ||
| var sw = Stopwatch.StartNew(); | ||
|
|
||
| for (int i = 0; i < sortedReplicas.Count; i++) | ||
| { | ||
| var remainingReplicas = sortedReplicas.Count - i; | ||
| var timeLeft = timeout - sw.Elapsed; | ||
| var currentReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / remainingReplicas); | ||
| var uri = sortedReplicas[i]; | ||
| var request = CreateRequest(uri + "?query=" + query); | ||
| var requestTimer = Stopwatch.StartNew(); | ||
| var task = ProcessRequestAsync(request); | ||
| var delayTask = Task.Delay(currentReplicaTimeout); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. timeLeft и currentReplicaTimeout могут оказаться отрицательными. Task.Delay может или выбросить исключение, или, с маленьким шансом, превратиться в никогда не заканчивающуюся таску |
||
| var completedTask = await Task.WhenAny(task, delayTask); | ||
|
|
||
| if (completedTask == task) | ||
| { | ||
| try | ||
| { | ||
| var result = await task; | ||
| UpdateStats(uri, requestTimer.ElapsedMilliseconds); | ||
| return result; | ||
| } | ||
| catch (Exception) | ||
| { | ||
| UpdateStats(uri, (long)timeout.TotalMilliseconds); | ||
| } | ||
| } | ||
| } | ||
| throw new TimeoutException($"Request {query} timed out"); | ||
| } | ||
|
|
||
| private void UpdateStats(string uri, long time) | ||
| { | ||
| ReplicaStats.AddOrUpdate(uri, time, (key, oldVal) => (oldVal + time) / 2); | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Linq; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
|
|
@@ -12,12 +14,84 @@ public class SmartClusterClient : ClusterClientBase | |
| public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) | ||
| { | ||
| } | ||
|
|
||
| private static readonly ConcurrentDictionary<string, long> ReplicaStats = new(); | ||
|
|
||
| public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout) | ||
| { | ||
| throw new NotImplementedException(); | ||
| var sortedReplicas = ReplicaAddresses | ||
| .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) | ||
| .ToList(); | ||
|
|
||
| var tasks = new List<Task<ResponseData>>(); | ||
| var perReplicaTimeout = TimeSpan.FromMilliseconds(timeout.TotalMilliseconds / ReplicaAddresses.Length); | ||
| var sw = Stopwatch.StartNew(); | ||
|
|
||
| for (int i = 0; i < sortedReplicas.Count; i++) | ||
| { | ||
| var request = CreateRequest(sortedReplicas[i] + "?query=" + query); | ||
| tasks.Add(ProcessAndMeasureAsync(sortedReplicas[i], request)); | ||
|
|
||
| while (true) | ||
| { | ||
| var timePassed = sw.Elapsed; | ||
| var timeToNextLaunch = perReplicaTimeout * (i + 1) - timePassed; | ||
|
|
||
| if (i == ReplicaAddresses.Length - 1) | ||
| timeToNextLaunch = timeout - timePassed; | ||
|
|
||
| var delayTask = Task.Delay(timeToNextLaunch); | ||
| var completedTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask); | ||
|
|
||
| if (completedTask != delayTask) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Если реплики будут тормозить, то соответствующие таски так и останутся в tasks, и статистика для них не обновится |
||
| { | ||
| var finishedTask = await (Task<Task<ResponseData>>)completedTask; | ||
| try | ||
| { | ||
| var response = await finishedTask; | ||
| UpdateStats(response.Uri, response.ElapsedMs); | ||
| return response.Content; | ||
| } | ||
| catch (Exception) | ||
| { | ||
| UpdateStats(sortedReplicas[i], (long)timeout.TotalMilliseconds); | ||
| tasks.Remove(finishedTask); | ||
| if (tasks.Count == 0 || i < ReplicaAddresses.Length - 1) | ||
| { | ||
| goto next; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Почему не просто break? |
||
| } | ||
| } | ||
| } | ||
| else | ||
| { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| next: ; | ||
| } | ||
|
|
||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| private async Task<ResponseData> ProcessAndMeasureAsync(string uri, System.Net.WebRequest request) | ||
| { | ||
| var timer = Stopwatch.StartNew(); | ||
| var content = await ProcessRequestAsync(request); | ||
| return new ResponseData { Uri = uri, Content = content, ElapsedMs = timer.ElapsedMilliseconds }; | ||
| } | ||
| private void UpdateStats(string uri, long elapsedMs) | ||
| { | ||
| ReplicaStats.AddOrUpdate(uri, elapsedMs, (key, oldVal) => (oldVal + elapsedMs) / 2); | ||
| } | ||
|
|
||
| private class ResponseData | ||
| { | ||
| public string Uri { get; set; } | ||
| public string Content { get; set; } | ||
| public long ElapsedMs { get; set; } | ||
| } | ||
|
|
||
| protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
По-хорошему delayTask бы создать до всех ProcessRequestAsync. Иначе выходит, что запросы уже какое-то время как отправлены