forked from slunyakin-zz/parquet-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: ParquetWriter.cs
135 lines (111 loc) · 4.21 KB
/
ParquetWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Parquet.Data;
using Parquet.File;
using Parquet.File.Streams;
namespace Parquet
{
/// <summary>
/// Implements Apache Parquet format writer
/// </summary>
public class ParquetWriter : ParquetActor, IDisposable
{
   private ThriftFooter _footer;
   private readonly Schema _schema;
   private readonly ParquetOptions _formatOptions;
   private bool _dataWritten;
   private bool _disposed;
   private readonly List<ParquetRowGroupWriter> _openedWriters = new List<ParquetRowGroupWriter>();

   /// <summary>
   /// Type of compression to use, defaults to <see cref="CompressionMethod.Snappy"/>
   /// </summary>
   public CompressionMethod CompressionMethod { get; set; } = CompressionMethod.Snappy;

   /// <summary>
   /// Creates an instance of parquet writer on top of a stream
   /// </summary>
   /// <param name="schema">Schema of the data to write; must be compatible with the existing file schema when appending.</param>
   /// <param name="output">Writeable, seekable stream</param>
   /// <param name="formatOptions">Additional options; defaults are used when null.</param>
   /// <param name="append">When true, row groups are appended to an existing parquet file in <paramref name="output"/>.</param>
   /// <exception cref="ArgumentNullException">Schema or output is null.</exception>
   /// <exception cref="ArgumentException">Output stream is not writeable</exception>
   public ParquetWriter(Schema schema, Stream output, ParquetOptions formatOptions = null, bool append = false)
      : base(new GapStream(ValidateOutput(output)))
   {
      _schema = schema ?? throw new ArgumentNullException(nameof(schema));
      _formatOptions = formatOptions ?? new ParquetOptions();

      PrepareFile(append);
   }

   // Validates the output stream before it is wrapped in a GapStream inside the
   // base-constructor initializer, so that argument errors surface as the documented
   // ArgumentNullException/ArgumentException rather than failing inside GapStream.
   private static Stream ValidateOutput(Stream output)
   {
      if (output == null) throw new ArgumentNullException(nameof(output));
      if (!output.CanWrite) throw new ArgumentException("stream is not writeable", nameof(output));
      return output;
   }

   /// <summary>
   /// Creates a new row group and a writer for it.
   /// </summary>
   public ParquetRowGroupWriter CreateRowGroup()
   {
      _dataWritten = true;

      // keep a reference so Dispose can sum up the row counts of all groups written
      var writer = new ParquetRowGroupWriter(_schema, Stream, ThriftStream, _footer, CompressionMethod, _formatOptions);
      _openedWriters.Add(writer);
      return writer;
   }

   // Initialises the footer and stream position for either a fresh file or an append.
   private void PrepareFile(bool append)
   {
      if (append)
      {
         if (!Stream.CanSeek) throw new IOException("destination stream must be seekable for append operations.");

         ValidateFile();

         // reuse the existing file's metadata and position the stream so new row
         // groups overwrite the old footer (it is rewritten on Dispose)
         Thrift.FileMetaData fileMeta = ReadMetadata();
         _footer = new ThriftFooter(fileMeta);

         ValidateSchemasCompatible(_footer, _schema);

         GoBeforeFooter();
      }
      else
      {
         if (_footer == null)
         {
            _footer = new ThriftFooter(_schema, 0 /* todo: don't forget to set the total row count at the end!!! */);

            //file starts with magic
            WriteMagic();
         }
         else
         {
            // NOTE(review): _footer is only assigned here and PrepareFile is only called
            // once from the constructor, so this branch appears unreachable — kept for safety.
            ValidateSchemasCompatible(_footer, _schema);

            _footer.Add(0 /* todo: don't forget to set the total row count at the end!!! */);
         }
      }
   }

   // Throws if the schema passed by the caller differs from the one already stored in the footer.
   private void ValidateSchemasCompatible(ThriftFooter footer, Schema schema)
   {
      Schema existingSchema = footer.CreateModelSchema(_formatOptions);

      if (!schema.Equals(existingSchema))
      {
         string reason = schema.GetNotEqualsMessage(existingSchema, "appending", "existing");
         throw new ParquetException($"passed schema does not match existing file schema, reason: {reason}");
      }
   }

   // Writes the parquet magic bytes ("PAR1") at the current stream position.
   private void WriteMagic()
   {
      Stream.Write(MagicBytes, 0, MagicBytes.Length);
   }

   /// <summary>
   /// Disposes the writer and writes the file footer.
   /// </summary>
   public void Dispose()
   {
      // Dispose must be safe to call multiple times (IDisposable contract);
      // only the first call finalizes the file, otherwise the footer, length
      // and trailing magic would be appended again and corrupt the file.
      if (_disposed) return;
      _disposed = true;

      if (_dataWritten)
      {
         //update row count (on append add row count to existing metadata)
         _footer.Add(_openedWriters.Sum(w => w.RowCount ?? 0));
      }

      //finalize file
      long size = _footer.Write(ThriftStream);

      //metadata size
      Writer.Write((int)size);   //4 bytes

      //end magic
      WriteMagic();              //4 bytes

      Writer.Flush();
      Stream.Flush();
   }
}
}