Skip to content

Commit bdaa933

Browse files
committed
[bq] introduce BigQuery Storage Pending Write API (JSON)
Signed-off-by: Dgray16 <[email protected]>
1 parent 77fd0f0 commit bdaa933

15 files changed

+839
-83
lines changed

spring-batch-bigquery/README.adoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implement
44

55
`ItemWriter` support:
66

7-
[cols="h,1,1"]
7+
[cols="h,1,1, 1"]
88
|===
9-
| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Commited)]
9+
| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Committed)] | https://cloud.google.com/bigquery/docs/write-api#pending_type[Write API (Pending)]
1010

11-
|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported
12-
|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported |
11+
|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported
12+
|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | |
1313
|===
1414

1515
`ItemReader` support:

spring-batch-bigquery/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@
153153
<plugin>
154154
<groupId>org.codehaus.mojo</groupId>
155155
<artifactId>flatten-maven-plugin</artifactId>
156-
<version>1.7.0</version>
156+
<version>1.7.1</version>
157157
<executions>
158158
<execution>
159159
<id>flatten</id>

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,12 @@ public class BigQueryItemWriterException extends ItemWriterException {
3535
public BigQueryItemWriterException(String message, Throwable cause) {
3636
super(message, cause);
3737
}
38+
39+
/**
40+
* Create a new {@link BigQueryItemWriterException} based on a message.
41+
* @param message the message for this {@link Exception}
42+
*/
43+
public BigQueryItemWriterException(String message) {
44+
super(message);
45+
}
3846
}
Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* @see <a href="https://cloud.google.com/bigquery/docs/write-api#committed_type">Committed type storage write API</a>
5151
* @since 0.2.0
5252
*/
53-
public class BigQueryWriteApiJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
53+
public class BigQueryWriteApiCommitedJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
5454

5555
/**
5656
* Logger that can be reused
@@ -112,7 +112,10 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
112112
throw new BigQueryItemWriterException("Error on write happened", e);
113113
} finally {
114114
if (StringUtils.hasText(streamName)) {
115-
bigQueryWriteClient.finalizeWriteStream(streamName);
115+
long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
116+
if (chunk.size() != rowCount) {
117+
logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
118+
}
116119
}
117120

118121
if (!writeFailed && logger.isDebugEnabled()) {
@@ -164,7 +167,7 @@ public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
164167
* {@link ApiFutureCallback} that will be called in case of successful or failed response.
165168
*
166169
* @param apiFutureCallback a callback
167-
* @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor)
170+
* @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
168171
*/
169172
public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
170173
this.apiFutureCallback = apiFutureCallback;
@@ -174,7 +177,7 @@ public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> api
174177
* An {@link Executor} that will be calling a {@link ApiFutureCallback}.
175178
*
176179
* @param executor an executor
177-
* @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
180+
* @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
178181
*/
179182
public void setExecutor(final Executor executor) {
180183
this.executor = executor;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.writer.writeapi.json;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
22+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
23+
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
24+
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
25+
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
26+
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
27+
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
28+
import com.google.cloud.bigquery.storage.v1.TableName;
29+
import com.google.cloud.bigquery.storage.v1.WriteStream;
30+
import org.apache.commons.logging.Log;
31+
import org.apache.commons.logging.LogFactory;
32+
import org.json.JSONArray;
33+
import org.json.JSONObject;
34+
import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
35+
import org.springframework.batch.item.Chunk;
36+
import org.springframework.batch.item.ItemWriter;
37+
import org.springframework.batch.item.json.JsonObjectMarshaller;
38+
import org.springframework.beans.factory.InitializingBean;
39+
import org.springframework.util.Assert;
40+
import org.springframework.util.StringUtils;
41+
42+
import java.util.List;
43+
import java.util.concurrent.Executor;
44+
import java.util.concurrent.atomic.AtomicLong;
45+
46+
/**
47+
* JSON writer for BigQuery using Storage Write API.
48+
*
49+
* @param <T> your DTO type
50+
* @author Volodymyr Perebykivskyi
51+
* @see <a href="https://en.wikipedia.org/wiki/JSON">JSON</a>
52+
* @see <a href="https://cloud.google.com/bigquery/docs/write-api#pending_type">Pending type storage write API</a>
53+
* @since 0.2.0
54+
*/
55+
public class BigQueryWriteApiPendingJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
56+
57+
/**
58+
* Logger that can be reused
59+
*/
60+
private final Log logger = LogFactory.getLog(getClass());
61+
62+
private final AtomicLong bigQueryWriteCounter = new AtomicLong();
63+
64+
private BigQueryWriteClient bigQueryWriteClient;
65+
private TableName tableName;
66+
private JsonObjectMarshaller<T> marshaller;
67+
private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
68+
private Executor executor;
69+
70+
private boolean writeFailed;
71+
72+
@Override
73+
public void write(final Chunk<? extends T> chunk) throws Exception {
74+
if (!chunk.isEmpty()) {
75+
final List<? extends T> items = chunk.getItems();
76+
String streamName = null;
77+
78+
try {
79+
WriteStream writeStreamToCreate = WriteStream.newBuilder()
80+
.setType(WriteStream.Type.PENDING)
81+
.build();
82+
83+
CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
84+
.setParent(tableName.toString())
85+
.setWriteStream(writeStreamToCreate)
86+
.build();
87+
88+
WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
89+
streamName = writeStream.getName();
90+
91+
if (logger.isDebugEnabled()) {
92+
logger.debug("Created a stream=" + streamName);
93+
}
94+
95+
try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) {
96+
if (logger.isDebugEnabled()) {
97+
logger.debug(String.format("Mapping %d elements", items.size()));
98+
}
99+
final JSONArray array = new JSONArray();
100+
items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);
101+
102+
if (logger.isDebugEnabled()) {
103+
logger.debug("Writing data to BigQuery");
104+
}
105+
final ApiFuture<AppendRowsResponse> future = writer.append(array);
106+
107+
if (apiFutureCallback != null) {
108+
ApiFutures.addCallback(future, apiFutureCallback, executor);
109+
}
110+
}
111+
} catch (Exception e) {
112+
writeFailed = true;
113+
logger.error("BigQuery error", e);
114+
throw new BigQueryItemWriterException("Error on write happened", e);
115+
} finally {
116+
if (StringUtils.hasText(streamName)) {
117+
long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
118+
if (chunk.size() != rowCount) {
119+
logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
120+
}
121+
122+
BatchCommitWriteStreamsRequest batchRequest = BatchCommitWriteStreamsRequest.newBuilder()
123+
.setParent(tableName.toString())
124+
.addWriteStreams(streamName)
125+
.build();
126+
BatchCommitWriteStreamsResponse batchResponse = bigQueryWriteClient.batchCommitWriteStreams(batchRequest);
127+
128+
if (!batchResponse.hasCommitTime()) {
129+
writeFailed = true;
130+
logger.error("BigQuery error=" + batchResponse.getStreamErrorsList());
131+
}
132+
}
133+
134+
if (!writeFailed && logger.isDebugEnabled()) {
135+
logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
136+
}
137+
}
138+
139+
if (writeFailed) {
140+
throw new BigQueryItemWriterException("Error on write happened");
141+
}
142+
}
143+
}
144+
145+
@Override
146+
public void afterPropertiesSet() throws Exception {
147+
Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
148+
Assert.notNull(this.tableName, "Table name must be provided");
149+
Assert.notNull(this.marshaller, "Marshaller must be provided");
150+
151+
if (this.apiFutureCallback != null) {
152+
Assert.notNull(this.executor, "Executor must be provided");
153+
}
154+
}
155+
156+
/**
157+
* GRPC client that wraps communication with BigQuery.
158+
*
159+
* @param bigQueryWriteClient a client
160+
*/
161+
public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
162+
this.bigQueryWriteClient = bigQueryWriteClient;
163+
}
164+
165+
/**
166+
* A full path to the BigQuery table.
167+
*
168+
* @param tableName a name
169+
*/
170+
public void setTableName(final TableName tableName) {
171+
this.tableName = tableName;
172+
}
173+
174+
/**
175+
* Converter that transforms a single row into a {@link String}.
176+
*
177+
* @param marshaller your JSON mapper
178+
*/
179+
public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
180+
this.marshaller = marshaller;
181+
}
182+
183+
/**
184+
* {@link ApiFutureCallback} that will be called in case of successful of failed response.
185+
*
186+
* @param apiFutureCallback a callback
187+
* @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
188+
*/
189+
public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
190+
this.apiFutureCallback = apiFutureCallback;
191+
}
192+
193+
/**
194+
* An {@link Executor} that will be calling a {@link ApiFutureCallback}.
195+
*
196+
* @param executor an executor
197+
* @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
198+
*/
199+
public void setExecutor(final Executor executor) {
200+
this.executor = executor;
201+
}
202+
}

0 commit comments

Comments
 (0)