Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d511900
Add cluster capacity policy model
ashleyvansp Jun 21, 2025
f38ef03
Add unit tests
ashleyvansp Jun 21, 2025
f9d4433
First pass at writing capacity policy
ashleyvansp Jun 21, 2025
407147e
Ignore nulls when serializing
ashleyvansp Jun 21, 2025
aa036af
Add clusters.yml test files for different capacity policy cases
ashleyvansp Jun 23, 2025
3d7709d
remove unused test files
ashleyvansp Jun 23, 2025
e67a422
Merge branch 'main' into ashleyvansp/capacityPolicy
ashleyvansp Jun 23, 2025
ff73111
Separate generic library functionality from app-specific implementation
ashleyvansp Jun 23, 2025
e0d5028
remove whitespace
ashleyvansp Jun 23, 2025
238625f
fix cluster model
ashleyvansp Jun 24, 2025
cb269f4
new cluster orchestrator
ashleyvansp Jun 25, 2025
9c11c41
fix some errors
ashleyvansp Jun 25, 2025
e0cfb77
fix build
ashleyvansp Jun 25, 2025
e67e3fc
unit test data
ashleyvansp Jun 25, 2025
15ac317
rename
ashleyvansp Jun 25, 2025
3e29698
cluster changes
ashleyvansp Jun 25, 2025
e6ba13f
add ;
ashleyvansp Jun 25, 2025
1d11d5b
small fix
ashleyvansp Jun 25, 2025
5dc3cd2
fix property
ashleyvansp Jun 25, 2025
b24d5a1
build succeeds
ashleyvansp Jun 25, 2025
28e23ce
move YamlDatabaseParserTests
ashleyvansp Jun 25, 2025
5bb8be2
make model comparable and add first unit test
ashleyvansp Jun 25, 2025
5626175
undo schema handler change
ashleyvansp Jun 25, 2025
82e59c9
genericize the capacity policy diff
ashleyvansp Jun 26, 2025
0093e1f
Finish test for generate changes
ashleyvansp Jun 30, 2025
f48e441
revert serialization change
ashleyvansp Jun 30, 2025
0585bc7
Potential fix for code scanning alert no. 36: Call to System.IO.Path.…
ashleyvansp Jun 30, 2025
e1fe5b5
Orchestrator tests
ashleyvansp Jun 30, 2025
59986f1
yaml cluster handler
ashleyvansp Jun 30, 2025
be9c073
yaml cluster handler and test
ashleyvansp Jun 30, 2025
48fd778
fix test path
ashleyvansp Jun 30, 2025
2af167b
handler changes
ashleyvansp Jul 1, 2025
60b6965
add GenerateChangesFromFileAsync
ashleyvansp Jul 1, 2025
b5d6d28
add diagnostics to script
ashleyvansp Jul 2, 2025
b7c400b
remove property changes
ashleyvansp Jul 2, 2025
c0f5562
remove redundant property
ashleyvansp Jul 2, 2025
d2ae9a4
tidy up
ashleyvansp Jul 2, 2025
3207d8b
Potential fix for code scanning alert no. 103: Call to System.IO.Path…
ashleyvansp Jul 2, 2025
467b0df
Merge branch 'main' into ashleyvansp/capacityPolicy
ashleyvansp Jul 2, 2025
f1fa409
yaml cluster handler factory interface
ashleyvansp Jul 2, 2025
76e3a83
Merge branch 'ashleyvansp/capacityPolicy' of https://github.com/githu…
ashleyvansp Jul 2, 2025
b46769c
don't swallow exceptions
ashleyvansp Jul 2, 2025
734b1d1
update readme
ashleyvansp Jul 2, 2025
7c57660
small readme change
ashleyvansp Jul 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 320 additions & 0 deletions KustoSchemaTools.Tests/ClusterCapacityPolicyTests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
connections:
- name: comprehensive-test
url: comprehensive.eastus
capacityPolicy:
ingestionCapacity:
clusterMaximumConcurrentOperations: 512
coreUtilizationCoefficient: 0.75
extentsMergeCapacity:
minimumConcurrentOperationsPerNode: 1
maximumConcurrentOperationsPerNode: 3
extentsPurgeRebuildCapacity:
maximumConcurrentOperationsPerNode: 1
exportCapacity:
clusterMaximumConcurrentOperations: 100
coreUtilizationCoefficient: 0.25
extentsPartitionCapacity:
clusterMinimumConcurrentOperations: 1
clusterMaximumConcurrentOperations: 32
materializedViewsCapacity:
clusterMaximumConcurrentOperations: 1
extentsRebuildCapacity:
clusterMaximumConcurrentOperations: 50
maximumConcurrentOperationsPerNode: 5
storedQueryResultsCapacity:
maximumConcurrentOperationsPerDbAdmin: 250
coreUtilizationCoefficient: 0.75
streamingIngestionPostProcessingCapacity:
maximumConcurrentOperationsPerNode: 4
purgeStorageArtifactsCleanupCapacity:
maximumConcurrentOperationsPerCluster: 2
periodicStorageArtifactsCleanupCapacity:
maximumConcurrentOperationsPerCluster: 2
queryAccelerationCapacity:
clusterMaximumConcurrentOperations: 100
coreUtilizationCoefficient: 0.5
graphSnapshotsCapacity:
clusterMaximumConcurrentOperations: 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
connections:
- name: partial-test
url: partial.eastus
capacityPolicy:
ingestionCapacity:
clusterMaximumConcurrentOperations: 256
exportCapacity:
coreUtilizationCoefficient: 0.5
materializedViewsCapacity:
clusterMaximumConcurrentOperations: 2
queryAccelerationCapacity:
clusterMaximumConcurrentOperations: 75
coreUtilizationCoefficient: 0.6
1 change: 1 addition & 0 deletions KustoSchemaTools/Helpers/Serialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
{
Formatting = Formatting.Indented,
ContractResolver = new PascalCaseContractResolver(),
NullValueHandling = NullValueHandling.Ignore,
};

public static JsonSerializer JsonSerializer { get; } = new JsonSerializer
Expand Down Expand Up @@ -59,19 +60,19 @@

public static T Merge<T>(this T baseObject, T mergeObject)
{
var o1 = JObject.FromObject(baseObject, CloneJsonSerializer);

Check warning on line 63 in KustoSchemaTools/Helpers/Serialization.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'o' in 'JObject JObject.FromObject(object o, JsonSerializer jsonSerializer)'.
var o2 = JObject.FromObject(mergeObject, CloneJsonSerializer);

Check warning on line 64 in KustoSchemaTools/Helpers/Serialization.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'o' in 'JObject JObject.FromObject(object o, JsonSerializer jsonSerializer)'.
o1.Merge(o2, new JsonMergeSettings
{
MergeArrayHandling = MergeArrayHandling.Replace
});

return o1.ToObject<T>(CloneJsonSerializer);

Check warning on line 70 in KustoSchemaTools/Helpers/Serialization.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference return.
}

public static T Clone<T>(this T baseObject)
{
return JsonConvert.DeserializeObject<T>(JsonConvert.SerializeObject(baseObject));

Check warning on line 75 in KustoSchemaTools/Helpers/Serialization.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference return.
}

public static int RowLength(this string source)
Expand Down
57 changes: 43 additions & 14 deletions KustoSchemaTools/KustoSchemaHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
using KustoSchemaTools.Model;
using KustoSchemaTools.Parser;
using KustoSchemaTools.Parser.KustoLoader;
using KustoSchemaTools.Parser.KustoWriter;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Data;
using System.Diagnostics.Metrics;
using System.Text;
using System.Threading.Channels;

namespace KustoSchemaTools
{
Expand Down Expand Up @@ -43,13 +41,13 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
{
Log.LogInformation($"Generating diff markdown for {Path.Combine(path, databaseName)} => {cluster}/{escapedDbName}");

var dbHandler = KustoDatabaseHandlerFactory.Create(cluster.Url, escapedDbName);
var dbHandler = CreateDatabaseHandlerWithCapacityPolicy(cluster.Url, escapedDbName, clusters.CapacityPolicy);
var kustoDb = await dbHandler.LoadAsync();
var changes = DatabaseChanges.GenerateChanges(kustoDb, yamlDb, escapedDbName, Log);

var comments = changes.Select(itm => itm.Comment).Where(itm => itm != null).ToList();


isValid &= changes.All(itm => itm.Scripts.All(itm => itm.IsValid != false)) && comments.All(itm => itm.FailsRollout == false);

sb.AppendLine($"# {cluster.Name}/{escapedDbName} ({cluster.Url})");
Expand All @@ -74,15 +72,15 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
}

var scriptSb = new StringBuilder();
foreach(var script in changes.SelectMany(itm => itm.Scripts).Where(itm => itm.IsValid == true).OrderBy(itm => itm.Order))
foreach (var script in changes.SelectMany(itm => itm.Scripts).Where(itm => itm.IsValid == true).OrderBy(itm => itm.Order))
{
scriptSb.AppendLine(script.Text);
}

Log.LogInformation($"Following scripts will be applied:\n{scriptSb}");
}

foreach(var follower in yamlDb.Followers)
foreach (var follower in yamlDb.Followers)
{

Log.LogInformation($"Generating diff markdown for {Path.Combine(path, databaseName)} => {follower.Key}/{follower.Value.DatabaseName}");
Expand All @@ -100,7 +98,7 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
foreach (var change in changes)
{
sb.AppendLine(change.Markdown);
sb.AppendLine();
sb.AppendLine();
}
}
return (sb.ToString(), isValid);
Expand All @@ -112,12 +110,12 @@ public async Task Import(string path, string databaseName, bool includeColumns)
var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFile);

var escapedDbName = databaseName.BracketIfIdentifier();
var dbHandler = KustoDatabaseHandlerFactory.Create(clusters.Connections[0].Url, escapedDbName);
var dbHandler = CreateDatabaseHandlerWithCapacityPolicy(clusters.Connections[0].Url, escapedDbName, clusters.CapacityPolicy);

var db = await dbHandler.LoadAsync();
if (includeColumns == false)
{
foreach(var table in db.Tables.Values)
foreach (var table in db.Tables.Values)
{
table.Columns = new Dictionary<string, string>();
}
Expand All @@ -128,7 +126,7 @@ public async Task Import(string path, string databaseName, bool includeColumns)
}


public async Task<ConcurrentDictionary<string,Exception>> Apply(string path, string databaseName)
public async Task<ConcurrentDictionary<string, Exception>> Apply(string path, string databaseName)
{
var clustersFile = File.ReadAllText(Path.Combine(path, "clusters.yml"));
var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFile);
Expand All @@ -137,16 +135,18 @@ public async Task<ConcurrentDictionary<string,Exception>> Apply(string path, str
var yamlHandler = YamlDatabaseHandlerFactory.Create(path, databaseName);
var yamlDb = await yamlHandler.LoadAsync();

var results = new ConcurrentDictionary<string,Exception>();
var results = new ConcurrentDictionary<string, Exception>();

await Parallel.ForEachAsync(clusters.Connections, async (cluster, token) =>
{
try
{
Log.LogInformation($"Generating and applying script for {Path.Combine(path, databaseName)} => {cluster}/{escapedDbName}");
var dbHandler = KustoDatabaseHandlerFactory.Create(cluster.Url, escapedDbName);

// Create the database handler with capacity policy writer if needed
var dbHandler = CreateDatabaseHandlerWithCapacityPolicy(cluster.Url, escapedDbName, clusters.CapacityPolicy);
await dbHandler.WriteAsync(yamlDb);
results.TryAdd(cluster.Url, null);
results.TryAdd(cluster.Url, null!);
}
catch (Exception ex)
{
Expand All @@ -156,5 +156,34 @@ await Parallel.ForEachAsync(clusters.Connections, async (cluster, token) =>

return results;
}

private IDatabaseHandler<T> CreateDatabaseHandlerWithCapacityPolicy(string clusterUrl, string databaseName, ClusterCapacityPolicy? capacityPolicy)
{
// Create a logger - in a real application this would come from DI
var loggerFactory = LoggerFactory.Create(builder => { });
Comment thread
ashleyvansp marked this conversation as resolved.
Outdated
var logger = loggerFactory.CreateLogger<KustoDatabaseHandler<T>>();

var factory = new KustoDatabaseHandlerFactory<T>(logger)
.WithReader<KustoDatabasePrincipalLoader>()
.WithReader<KustoDatabaseRetentionAndCacheLoader>()
.WithReader<KustoTableBulkLoader>()
.WithReader<KustoFunctionBulkLoader>()
.WithReader<KustoMaterializedViewBulkLoader>()
.WithReader<KustoExternalTableBulkLoader>()
.WithReader<KustoContinuousExportBulkLoader>()
.WithReader<KustoEntityGroupBulkLoader>()
.WithReader<KustoPartitioningPolicyLoader>()
.WithReader<ClusterCapacityPolicyLoader>()
.WithReader<DatabaseCleanup>()
.WithWriter<DefaultDatabaseWriter>();

// Add capacity policy writer if policy is defined
if (capacityPolicy != null)
{
factory.WithPlugin(new ClusterCapacityPolicyWriter(capacityPolicy));
}

return factory.Create(clusterUrl, databaseName);
}
}
}
101 changes: 101 additions & 0 deletions KustoSchemaTools/Model/ClusterCapacityPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using KustoSchemaTools.Changes;
Comment thread
ashleyvansp marked this conversation as resolved.
using KustoSchemaTools.Helpers;
using Newtonsoft.Json;

namespace KustoSchemaTools.Model
{
public class ClusterCapacityPolicy
{
public IngestionCapacity? IngestionCapacity { get; set; }
public ExtentsMergeCapacity? ExtentsMergeCapacity { get; set; }
public ExtentsPurgeRebuildCapacity? ExtentsPurgeRebuildCapacity { get; set; }
public ExportCapacity? ExportCapacity { get; set; }
public ExtentsPartitionCapacity? ExtentsPartitionCapacity { get; set; }
public MaterializedViewsCapacity? MaterializedViewsCapacity { get; set; }
public StoredQueryResultsCapacity? StoredQueryResultsCapacity { get; set; }
public StreamingIngestionPostProcessingCapacity? StreamingIngestionPostProcessingCapacity { get; set; }
public PurgeStorageArtifactsCleanupCapacity? PurgeStorageArtifactsCleanupCapacity { get; set; }
public PeriodicStorageArtifactsCleanupCapacity? PeriodicStorageArtifactsCleanupCapacity { get; set; }
public QueryAccelerationCapacity? QueryAccelerationCapacity { get; set; }
public GraphSnapshotsCapacity? GraphSnapshotsCapacity { get; set; }

public DatabaseScriptContainer CreateScript()
{
var policy = JsonConvert.SerializeObject(this, Serialization.JsonPascalCase);
return new DatabaseScriptContainer("ClusterCapacityPolicy", 10, $".alter-merge cluster policy capacity ```{policy}```");
}
}

public class IngestionCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
public double? CoreUtilizationCoefficient { get; set; }
}

public class ExtentsMergeCapacity
{
public int? MinimumConcurrentOperationsPerNode { get; set; }
public int? MaximumConcurrentOperationsPerNode { get; set; }
}

public class ExtentsPurgeRebuildCapacity
{
public int? MaximumConcurrentOperationsPerNode { get; set; }
}

public class ExportCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
public double? CoreUtilizationCoefficient { get; set; }
}

public class ExtentsPartitionCapacity
{
public int? ClusterMinimumConcurrentOperations { get; set; }
public int? ClusterMaximumConcurrentOperations { get; set; }
}

public class MaterializedViewsCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
public ExtentsRebuildCapacity? ExtentsRebuildCapacity { get; set; }
}

public class ExtentsRebuildCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
public int? MaximumConcurrentOperationsPerNode { get; set; }
}

public class StoredQueryResultsCapacity
{
public int? MaximumConcurrentOperationsPerDbAdmin { get; set; }
public double? CoreUtilizationCoefficient { get; set; }
}

public class StreamingIngestionPostProcessingCapacity
{
public int? MaximumConcurrentOperationsPerNode { get; set; }
}

public class PurgeStorageArtifactsCleanupCapacity
{
public int? MaximumConcurrentOperationsPerCluster { get; set; }
}

public class PeriodicStorageArtifactsCleanupCapacity
{
public int? MaximumConcurrentOperationsPerCluster { get; set; }
}

public class QueryAccelerationCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
public double? CoreUtilizationCoefficient { get; set; }
}

public class GraphSnapshotsCapacity
{
public int? ClusterMaximumConcurrentOperations { get; set; }
}
}
1 change: 1 addition & 0 deletions KustoSchemaTools/Model/Clusters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class Clusters
{
public List<Cluster> Connections { get; set; } = new List<Cluster>();
public List<DatabaseScript> Scripts { get; set; } = new List<DatabaseScript>();
public ClusterCapacityPolicy? CapacityPolicy { get; set; }
}

}
37 changes: 35 additions & 2 deletions KustoSchemaTools/Parser/KustoDatabaseHandlerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using KustoSchemaTools.Model;
using KustoSchemaTools.Parser.KustoLoader;
using KustoSchemaTools.Parser.KustoWriter;
using KustoSchemaTools.Plugins;
using Microsoft.Extensions.Logging;

Expand All @@ -11,8 +13,8 @@ public KustoDatabaseHandlerFactory(ILogger<KustoDatabaseHandler<T>> logger)
Logger = logger;
}

public List<IKustoBulkEntitiesLoader> Reader { get; } = new ();
public List<IDBEntityWriter> Writer { get; } = new ();
public List<IKustoBulkEntitiesLoader> Reader { get; } = new();
public List<IDBEntityWriter> Writer { get; } = new();

public KustoDatabaseHandlerFactory<T> WithPlugin(IKustoBulkEntitiesLoader plugin)
{
Expand Down Expand Up @@ -43,5 +45,36 @@ public IDatabaseHandler<T> Create(string cluster, string database)
{
return new KustoDatabaseHandler<T>(cluster, database, Logger, Reader, Writer);
}

public static IDatabaseHandler<T> CreateDefault(string cluster, string database)
{
// Create a logger - in a real application this would come from DI
var loggerFactory = LoggerFactory.Create(builder => { });
var logger = loggerFactory.CreateLogger<KustoDatabaseHandler<T>>();

return new KustoDatabaseHandlerFactory<T>(logger)
.WithReader<KustoDatabasePrincipalLoader>()
Comment thread
ashleyvansp marked this conversation as resolved.
Outdated
.WithReader<KustoDatabaseRetentionAndCacheLoader>()
.WithReader<KustoTableBulkLoader>()
.WithReader<KustoFunctionBulkLoader>()
.WithReader<KustoMaterializedViewBulkLoader>()
.WithReader<KustoExternalTableBulkLoader>()
.WithReader<KustoContinuousExportBulkLoader>()
.WithReader<KustoEntityGroupBulkLoader>()
.WithReader<KustoPartitioningPolicyLoader>()
.WithReader<ClusterCapacityPolicyLoader>()
.WithReader<DatabaseCleanup>()
.WithWriter<DefaultDatabaseWriter>()
.Create(cluster, database);
}
}

// Add static factory for the non-generic version too
public static class KustoDatabaseHandlerFactory
{
public static IDatabaseHandler<Database> Create(string cluster, string database)
{
return KustoDatabaseHandlerFactory<Database>.CreateDefault(cluster, database);
}
}
}
36 changes: 36 additions & 0 deletions KustoSchemaTools/Parser/KustoLoader/ClusterCapacityPolicyLoader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Kusto.Data.Common;
using KustoSchemaTools.Model;
using KustoSchemaTools.Plugins;
using Newtonsoft.Json;

namespace KustoSchemaTools.Parser.KustoLoader
{
public class ClusterCapacityPolicyLoader : IKustoBulkEntitiesLoader
{
const string query = @"
.show cluster policy capacity
| project CapacityPolicy = todynamic(Policy)
";

public async Task Load(Database database, string databaseName, KustoClient client)
{
try
{
var response = await client.Client.ExecuteQueryAsync("", query, new ClientRequestProperties());
var capacityPolicyData = response.ToScalar<string>();

if (!string.IsNullOrEmpty(capacityPolicyData))
{
var capacityPolicy = JsonConvert.DeserializeObject<ClusterCapacityPolicy>(capacityPolicyData);
Comment thread Fixed
// Store in database for comparison if needed
// For now, we don't need to store it as we're only applying changes
}
}
catch (Exception)
Comment thread
ashleyvansp marked this conversation as resolved.
Outdated
{
// Capacity policy might not exist or access might be denied
// This is not critical for the loading process
}
}
}
}
Loading
Loading