This repository was archived by the owner on Jul 22, 2024. It is now read-only.
forked from slunyakin-zz/parquet-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathDataSet.cs
221 lines (183 loc) · 6.21 KB
/
DataSet.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Parquet.File;
namespace Parquet.Data
{
/// <summary>
/// Represents a data set
/// </summary>
public partial class DataSet
{
// Schema of this dataset; assigned once by the constructor.
private readonly Schema _schema;

// Values of every column, keyed by column path.
private readonly Dictionary<string, IList> _columns;

// Backing instance for the public Metadata property.
private readonly DataSetMetadata _metadata = new DataSetMetadata();

// Number of rows currently tracked; updated as rows are added, removed or merged.
private int _rowCount;

// Thrift file metadata attached to this dataset.
internal Thrift.FileMetaData Thrift { get; set; }
/// <summary>
/// Creates an empty <see cref="DataSet"/> that uses the supplied schema.
/// </summary>
/// <param name="schema">Schema of the dataset; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="schema"/> is null.</exception>
public DataSet(Schema schema)
{
    if (schema is null)
    {
        throw new ArgumentNullException(nameof(schema));
    }

    _schema = schema;
    _columns = new Dictionary<string, IList>();
}
/// <summary>
/// Creates an empty <see cref="DataSet"/> whose schema is built from a sequence of fields.
/// </summary>
/// <param name="schema">Fields that make up the schema.</param>
public DataSet(IEnumerable<Field> schema)
    : this(new Schema(schema))
{
}
/// <summary>
/// Creates an empty <see cref="DataSet"/> whose schema is built from the listed fields.
/// </summary>
/// <param name="schema">Fields that make up the schema.</param>
public DataSet(params Field[] schema)
    : this(new Schema(schema))
{
}
/// <summary>
/// Schema this dataset was created with.
/// </summary>
public Schema Schema
{
    get { return _schema; }
}

/// <summary>
/// Public metadata of this dataset.
/// </summary>
public DataSetMetadata Metadata
{
    get { return _metadata; }
}

/// <summary>
/// Total number of rows in the source file this dataset was read from.
/// </summary>
public long TotalRowCount { get; }
// Creates a dataset on top of already populated columns.
//   schema        - schema of the dataset
//   pathToValues  - column values keyed by column path; used directly, not copied
//   totalRowCount - value exposed through TotalRowCount
//   createdBy     - value stored in Metadata.CreatedBy
internal DataSet(
    Schema schema,
    Dictionary<string, IList> pathToValues,
    long totalRowCount,
    string createdBy)
    : this(schema)
{
    _columns = pathToValues;

    // the row count is the length of the shortest column, or zero when there are no columns
    if (_columns.Count == 0)
    {
        _rowCount = 0;
    }
    else
    {
        _rowCount = pathToValues.Min(column => column.Value.Count);
    }

    TotalRowCount = totalRowCount;
    _metadata.CreatedBy = createdBy;
}
/// <summary>
/// Slices rows and returns the list of values in a particular column.
/// </summary>
/// <param name="schemaElement">Schema element identifying the column; its path is used for the lookup.</param>
/// <param name="offset">Index of the first value to return.</param>
/// <param name="count">Maximum number of values to return, or -1 for everything from <paramref name="offset"/> onwards.</param>
/// <returns>
/// Column values, or null when the dataset holds no values for the column.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="schemaElement"/> is null.</exception>
public IList GetColumn(DataField schemaElement, int offset = 0, int count = -1)
{
    // fail with a descriptive argument exception instead of a NullReferenceException
    if (schemaElement is null) throw new ArgumentNullException(nameof(schemaElement));

    return GetColumn(schemaElement.Path, offset, count);
}
// Returns the values stored under the given column path, optionally sliced.
//   path   - column path used as the key into the column dictionary
//   offset - index of the first value to return
//   count  - maximum number of values to return, or -1 for everything from offset onwards
// Returns null when there is no column with that path. When the whole column is
// requested (offset 0, count -1) the internal list itself is returned, not a copy.
internal IList GetColumn(string path, int offset, int count)
{
    if (!_columns.TryGetValue(path, out IList values))
    {
        return null;
    }

    //optimise for performance by not instantiating another list if you want all the column values
    if (offset == 0 && count == -1) return values;

    // the page has the same runtime type as the column list, so that type needs a parameterless constructor
    IList page = (IList)Activator.CreateInstance(values.GetType());

    // compute the exclusive end index in 64-bit: "offset + count" in 32-bit wraps around
    // for large counts (e.g. int.MaxValue) and would silently produce an empty page
    int max = (count == -1)
        ? values.Count
        : (int)Math.Min((long)offset + count, values.Count);

    for (int i = offset; i < max; i++)
    {
        page.Add(values[i]);
    }

    return page;
}
/// <summary>
/// Returns all values of a column as a strongly typed collection.
/// </summary>
/// <typeparam name="T">Element type of the column</typeparam>
/// <param name="schemaElement">Schema element identifying the column</param>
/// <returns>The column values cast to a list of <typeparamref name="T"/></returns>
public IReadOnlyCollection<T> GetColumn<T>(DataField schemaElement)
{
    IList untyped = GetColumn(schemaElement);
    var typed = (List<T>)untyped;
    return typed;
}
/// <summary>
/// Appends one row built from the given values.
/// </summary>
/// <param name="values">Values of the new row.</param>
public void Add(params object[] values)
{
    var row = new Row(values);
    AddRow(row);
}
#region [ Row Manipulation ]
// Builds the row at the given index from the column values.
// Throws IndexOutOfRangeException (via ValidateIndex) when the index is outside [0; row count).
private Row CreateRow(int index)
{
    ValidateIndex(index);

    Row extracted = RowExtractor.Extract(_schema.Fields, index, _columns);
    return extracted;
}
// Removes the value at the given index from every column and decrements the row count.
// Throws IndexOutOfRangeException (via ValidateIndex) when the index is outside [0; row count).
private void RemoveRow(int index)
{
    ValidateIndex(index);

    foreach (IList columnValues in _columns.Values)
    {
        columnValues.RemoveAt(index);
    }

    _rowCount--;
}
// Hands the row to RowAppender together with the columns and schema fields, then increments the row count.
private void AddRow(Row row)
{
    RowAppender.Append(_columns, _schema.Fields, row);
    ++_rowCount;
}
// Ensures the row index lies within [0; row count); throws IndexOutOfRangeException otherwise.
private void ValidateIndex(int index)
{
    bool inRange = index >= 0 && index < _rowCount;
    if (inRange)
    {
        return;
    }

    throw new IndexOutOfRangeException($"row index {index} is not within allowed range [0; {_rowCount})");
}
#endregion
#region [ Data Helpers ]
/// <summary>
/// Merges dataset into this dataset by appending the source column values to the matching columns.
/// </summary>
/// <param name="source">DataSet to take the data from</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="source"/> is this very instance, or its schema is not equal to the schema of this dataset.
/// </exception>
public void Merge(DataSet source)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (source == this) throw new ArgumentException("cannot merge into itself", nameof(source));

    if (!Schema.Equals(source.Schema))
    {
        string reason = Schema.GetNotEqualsMessage(source.Schema, "current", "source");
        throw new ArgumentException($"{nameof(DataSet)} schema does not match existing file schema, reason: {reason}", nameof(source));
    }

    foreach (KeyValuePair<string, IList> kvp in source._columns)
    {
        if (!_columns.TryGetValue(kvp.Key, out IList dest))
        {
            // The public constructors start with an empty column dictionary, so this dataset may
            // not hold a list for the column yet; indexing directly would throw KeyNotFoundException.
            // Create a list of the same runtime type as the source column instead.
            dest = (IList)Activator.CreateInstance(kvp.Value.GetType());
            _columns[kvp.Key] = dest;
        }

        dest.AddOneByOne(kvp.Value);
    }

    _rowCount += source.RowCount;
}
#endregion
/// <summary>
/// Displays up to the first 10 DataSet rows, one per line, each prefixed with its row index.
/// </summary>
/// <returns>Multi-line text starting with a "first N rows:" header.</returns>
public override string ToString()
{
    var sb = new StringBuilder();

    int count = Math.Min(Count, 10);
    sb.AppendFormat("first {0} rows:", count);
    sb.AppendLine();

    for (int i = 0; i < count; i++)
    {
        Row row = CreateRow(i);

        // the "{0,5}" placeholder needs the row index as its argument; without it
        // AppendFormat throws FormatException on the first row
        sb.AppendFormat("{0,5}: ", i);
        sb.AppendLine(row.ToString());
    }

    return sb.ToString();
}
}
}