Skip to content

Commit 4a96aed

Browse files
committed
Basic implementation of Parquet data source
1 parent 423f513 commit 4a96aed

5 files changed

Lines changed: 207 additions & 11 deletions

File tree

src/DatabaseBenchmark/DataSources/DataSourceFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using DatabaseBenchmark.DataSources.Csv;
55
using DatabaseBenchmark.DataSources.Database;
66
using DatabaseBenchmark.DataSources.Generator;
7+
using DatabaseBenchmark.DataSources.Parquet;
78
using DatabaseBenchmark.DataSources.Interfaces;
89

910
namespace DatabaseBenchmark.DataSources
@@ -19,6 +20,7 @@ public DataSourceFactory(IDatabase currentDatabase, IDatabaseFactory databaseFac
1920
_factories = new()
2021
{
2122
["Csv"] = filePath => new CsvDataSource(filePath, optionsProvider),
23+
["Parquet"] = filePath => new ParquetDataSource(filePath),
2224
["Database"] = filePath => new DatabaseDataSource(filePath, databaseFactory),
2325
["Generator"] = filePath => new GeneratorDataSource(filePath, this, currentDatabase)
2426
};
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using DatabaseBenchmark.Core.Interfaces;
2+
using DatabaseBenchmark.DataSources.Interfaces;
3+
using Parquet;
4+
using Parquet.Data;
5+
6+
namespace DatabaseBenchmark.DataSources.Parquet
{
    /// <summary>
    /// Data source that streams rows out of a Parquet file, loading one row group
    /// into memory at a time and exposing values row-by-row through <see cref="Read"/>
    /// and <see cref="GetValue"/>.
    /// </summary>
    public sealed class ParquetDataSource : IDataSource
    {
        private readonly string _filePath;

        private Stream _fileStream;
        private ParquetReader _reader;
        private DataColumn[] _columns;
        // Name -> column lookup for the current row group; rebuilt on every
        // LoadNextRowGroup so GetValue is O(1) instead of a linear scan per cell.
        private Dictionary<string, DataColumn> _columnsByName;
        private int _currentRowGroupIndex = -1;
        private int _currentGroupRowIndex = -1;
        private int _rowsInCurrentGroup = 0;

        /// <summary>
        /// Creates a data source for the given Parquet file. The file is not
        /// opened until the first call to <see cref="Read"/>.
        /// </summary>
        public ParquetDataSource(string filePath)
        {
            _filePath = Path.GetFullPath(filePath);
        }

        public void Dispose()
        {
            _reader?.Dispose();
            _fileStream?.Dispose();
        }

        /// <summary>
        /// Returns the value of the named column for the current row.
        /// </summary>
        /// <exception cref="InvalidOperationException">Thrown if called before a successful <see cref="Read"/>.</exception>
        /// <exception cref="ArgumentException">Thrown if the column does not exist in the file.</exception>
        public object GetValue(string name)
        {
            if (_columns == null)
            {
                throw new InvalidOperationException("No data has been read yet. Call Read() first.");
            }

            if (!_columnsByName.TryGetValue(name, out var column))
            {
                throw new ArgumentException($"Column '{name}' not found in Parquet file.");
            }

            return column.Data.GetValue(_currentGroupRowIndex);
        }

        /// <summary>
        /// Advances to the next row, loading further row groups as needed.
        /// Returns false when the file is exhausted.
        /// </summary>
        public bool Read()
        {
            if (_reader == null)
            {
                Open();
            }

            _currentGroupRowIndex++;

            // A loop (not a single check) so that empty row groups are skipped;
            // a single `if` would report a phantom row for a 0-row group and
            // GetValue would then index past the end of the column data.
            while (_currentGroupRowIndex >= _rowsInCurrentGroup)
            {
                if (!LoadNextRowGroup())
                {
                    return false;
                }
            }

            return true;
        }

        private void Open()
        {
            _fileStream = File.OpenRead(_filePath);
            // IDataSource is a synchronous interface, so block on the async
            // factory; the stream is tracked in a field and cleaned up by
            // Dispose even if reader creation fails on a later call path.
            _reader = ParquetReader.CreateAsync(_fileStream).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Loads the next row group's columns into memory. Returns false when
        /// there are no more row groups.
        /// </summary>
        private bool LoadNextRowGroup()
        {
            _currentRowGroupIndex++;

            if (_currentRowGroupIndex >= _reader.RowGroupCount)
            {
                return false;
            }

            using (var groupReader = _reader.OpenRowGroupReader(_currentRowGroupIndex))
            {
                var columns = new List<DataColumn>();
                foreach (var field in _reader.Schema.GetDataFields())
                {
                    var column = groupReader.ReadColumnAsync(field).GetAwaiter().GetResult();
                    columns.Add(column);
                }

                _columns = columns.ToArray();
                _columnsByName = _columns.ToDictionary(c => c.Field.Name);
                // All columns in a row group have the same length; use the first.
                _rowsInCurrentGroup = _columns.Length > 0 ? _columns[0].Data.Length : 0;
            }

            _currentGroupRowIndex = 0;
            return true;
        }
    }
}

src/DatabaseBenchmark/DatabaseBenchmark.csproj

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,24 @@
1818
</PropertyGroup>
1919

2020
<ItemGroup>
21-
<PackageReference Include="AWSSDK.DynamoDBv2" Version="4.0.9.1" />
21+
<PackageReference Include="AWSSDK.DynamoDBv2" Version="4.0.14" />
2222
<PackageReference Include="Azure.Search.Documents" Version="11.7.0" />
23-
<PackageReference Include="BloomFilter.NetCore" Version="2.5.3" />
23+
<PackageReference Include="BloomFilter.NetCore" Version="3.0.0" />
2424
<PackageReference Include="Bogus" Version="35.6.5" />
2525
<PackageReference Include="CsvHelper" Version="33.1.0" />
26+
<PackageReference Include="Parquet.Net" Version="5.5.0" />
2627
<PackageReference Include="Fare" Version="2.2.1" />
27-
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.54.0" />
28-
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.2" />
28+
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.57.0" />
29+
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.4" />
2930
<PackageReference Include="MonetDB" Version="2.1.2" />
30-
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
31-
<PackageReference Include="MySqlConnector" Version="2.4.0" />
32-
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="9.2.2" />
33-
<PackageReference Include="Npgsql" Version="9.0.4" />
34-
<PackageReference Include="Octonica.ClickHouseClient" Version="3.1.3" />
35-
<PackageReference Include="Oracle.ManagedDataAccess.Core" Version="23.26.0" />
31+
<PackageReference Include="MongoDB.Driver" Version="3.6.0" />
32+
<PackageReference Include="MySqlConnector" Version="2.5.0" />
33+
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="9.3.0" />
34+
<PackageReference Include="Npgsql" Version="10.0.1" />
35+
<PackageReference Include="Octonica.ClickHouseClient" Version="3.1.8" />
36+
<PackageReference Include="Oracle.ManagedDataAccess.Core" Version="23.26.100" />
3637
<PackageReference Include="SimpleInjector" Version="5.5.0" />
37-
<PackageReference Include="Snowflake.Data" Version="5.0.0" />
38+
<PackageReference Include="Snowflake.Data" Version="5.4.0" />
3839
</ItemGroup>
3940

4041
</Project>
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using DatabaseBenchmark.DataSources.Parquet;
2+
using Parquet;
3+
using Parquet.Data;
4+
using Parquet.Schema;
5+
using System;
6+
using System.IO;
7+
using System.Threading.Tasks;
8+
using Xunit;
9+
10+
namespace DatabaseBenchmark.Tests.DataSources
{
    /// <summary>
    /// Tests for <see cref="ParquetDataSource"/>, verifying sequential reads
    /// across multiple row groups and end-of-file detection.
    /// </summary>
    public class ParquetDataSourceTests : IDisposable
    {
        private readonly string _testFilePath = "test.parquet";

        public void Dispose()
        {
            // Remove the temporary Parquet file after each test run.
            if (File.Exists(_testFilePath))
            {
                File.Delete(_testFilePath);
            }
        }

        [Fact]
        public async Task ReadValues()
        {
            await CreateTestFile(_testFilePath);

            using var dataSource = new ParquetDataSource(_testFilePath);

            // Row 1 from row group 1
            Assert.True(dataSource.Read());
            Assert.Equal(1, dataSource.GetValue("ArchiveId"));
            Assert.Equal("One", dataSource.GetValue("Name"));
            Assert.Equal(10.1, dataSource.GetValue("Price"));

            // Row 2 from row group 1
            Assert.True(dataSource.Read());
            Assert.Equal(2, dataSource.GetValue("ArchiveId"));
            Assert.Equal("Two", dataSource.GetValue("Name"));
            Assert.Equal(20.2, dataSource.GetValue("Price"));

            // Row 1 from row group 2 (same data pattern)
            Assert.True(dataSource.Read());
            Assert.Equal(1, dataSource.GetValue("ArchiveId"));
            Assert.Equal("One", dataSource.GetValue("Name"));
            Assert.Equal(10.1, dataSource.GetValue("Price"));

            // Row 2 from row group 2
            Assert.True(dataSource.Read());
            Assert.Equal(2, dataSource.GetValue("ArchiveId"));
            Assert.Equal("Two", dataSource.GetValue("Name"));
            Assert.Equal(20.2, dataSource.GetValue("Price"));

            // No more rows
            Assert.False(dataSource.Read());
        }

        /// <summary>
        /// Writes a two-row-group Parquet file with identical data in each group.
        /// </summary>
        private static async Task CreateTestFile(string filePath)
        {
            var schema = new ParquetSchema(
                new DataField<int>("ArchiveId"),
                new DataField<string>("Name"),
                new DataField<double>("Price"));

            var column1 = new DataColumn(schema.DataFields[0], new int[] { 1, 2 });
            var column2 = new DataColumn(schema.DataFields[1], new string[] { "One", "Two" });
            var column3 = new DataColumn(schema.DataFields[2], new double[] { 10.1, 20.2 });

            // File.Create truncates any stale file left over from a crashed run;
            // File.OpenWrite would overwrite in place without truncating, leaving
            // trailing bytes that corrupt the Parquet footer.
            using var stream = File.Create(filePath);
            using var writer = await ParquetWriter.CreateAsync(schema, stream);

            // Write first row group
            using (var groupWriter = writer.CreateRowGroup())
            {
                await groupWriter.WriteColumnAsync(column1);
                await groupWriter.WriteColumnAsync(column2);
                await groupWriter.WriteColumnAsync(column3);
            }

            // Write second row group with same data
            using (var groupWriter = writer.CreateRowGroup())
            {
                await groupWriter.WriteColumnAsync(column1);
                await groupWriter.WriteColumnAsync(column2);
                await groupWriter.WriteColumnAsync(column3);
            }
        }
    }
}

tests/DatabaseBenchmark.Tests/DatabaseBenchmark.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
3232
<PrivateAssets>all</PrivateAssets>
3333
</PackageReference>
34+
<PackageReference Include="Parquet.Net" Version="5.5.0" />
3435
</ItemGroup>
3536

3637
<ItemGroup>

0 commit comments

Comments
 (0)