BigQuery Storage API: Avro

Previously we had an introduction to the BigQuery Storage API and read data using the Arrow format. In this tutorial we shall read data using the Avro format.

What applied in the previous tutorial applies here too.

We shall create a BigQuery Storage client, create a ReadSession using the Avro format, and iterate over the data on each stream.

Let’s get started by importing the dependencies, including the Avro library.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>libraries-bom</artifactId>
            <version>20.5.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquerystorage</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.9.2</version>
    </dependency>
</dependencies>

Our next step is to create an Avro data reader for our rows, which have the schema col1:string, col2:int. In our case we shall just print the data through System.out.

package com.gkatzioura.bigquery.storage.api.avro;
 
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
 
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
 
import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.protobuf.ByteString;
 
public class AvroReader {
 
    private final GenericDatumReader<GenericRecord> datumReader;
 
    public AvroReader(AvroSchema avroSchema) {
        // The session returns the Avro schema as a JSON string
        Schema schema = new Schema.Parser().parse(avroSchema.getSchema());
        this.datumReader = new GenericDatumReader<>(schema);
    }
 
    public void processRows(ByteString avroRows) throws IOException {
        try (InputStream inputStream = new ByteArrayInputStream(avroRows.toByteArray())) {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 
            while (!decoder.isEnd()) {
                GenericRecord item = datumReader.read(null, decoder);
 
                System.out.println(item.get("col1")+","+item.get("col2"));
            }
        }
    }
 
}
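
To make it clearer what the decoder loop above is doing, here is a minimal sketch of the Avro binary encoding for a record with one string field and one integer field, written with the JDK only. In the Avro specification, int and long values are zig-zag encoded variable-length integers, a string is its UTF-8 byte length followed by the bytes, and a record is simply its fields written back to back in schema order. The class `AvroBinarySketch` and its methods are hypothetical names for illustration and are not part of the Avro library. The sketch also assumes that both columns are REQUIRED; a nullable column would typically be a union and carry an extra branch index before the value, which is not handled here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Avro or the BigQuery client
class AvroBinarySketch {

    // Writes a value as a zig-zag encoded variable-length integer (Avro int/long)
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // 7 payload bits plus a continuation bit
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // Writes a string as its UTF-8 byte length followed by the bytes
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Reads a zig-zag encoded variable-length integer
    static long readLong(InputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) {
                throw new EOFException("Unexpected end of Avro data");
            }
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Reads a length-prefixed UTF-8 string
    static String readString(InputStream in) throws IOException {
        byte[] utf8 = new byte[(int) readLong(in)];
        new DataInputStream(in).readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Decodes back-to-back (col1:string, col2:int) records until the buffer is exhausted
    static List<String> decodeRows(byte[] serializedRows) throws IOException {
        List<String> rows = new ArrayList<>();
        ByteArrayInputStream in = new ByteArrayInputStream(serializedRows);
        while (in.available() > 0) {
            String col1 = readString(in);
            long col2 = readLong(in);
            rows.add(col1 + "," + col2);
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, "alpha");
        writeLong(out, 42);
        writeString(out, "beta");
        writeLong(out, -7);
        decodeRows(out.toByteArray()).forEach(System.out::println);
    }
}
```

Since the rows carry no field names or type tags, the bytes cannot be interpreted without the schema. This is why the AvroReader needs the schema from the session before it can decode anything.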

Then on to our main class, which contains all the BigQuery logic needed.

package com.gkatzioura.bigquery.storage.api.avro;
 
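// Guava's Preconditions is used here because Arrow is not among this tutorial's dependencies;
// Guava is expected to come in transitively with the BigQuery Storage client
import com.google.common.base.Preconditions;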
 
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
 
public class AvroMain {
 
    public static void main(String[] args) throws Exception {
 
        String projectId = System.getenv("PROJECT_ID");
 
        try (BigQueryReadClient client = BigQueryReadClient.create()) {
            String parent = String.format("projects/%s", projectId);
 
            String srcTable =
                    String.format(
                            "projects/%s/datasets/%s/tables/%s",
                            projectId, System.getenv("DATASET"), System.getenv("TABLE"));
 
            ReadSession.Builder sessionBuilder =
                    ReadSession.newBuilder()
                               .setTable(srcTable)
                               .setDataFormat(DataFormat.AVRO);
 
 
            CreateReadSessionRequest.Builder builder =
                    CreateReadSessionRequest.newBuilder()
                                            .setParent(parent)
                                            .setReadSession(sessionBuilder)
                                            .setMaxStreamCount(1);
            ReadSession session = client.createReadSession(builder.build());
 
            Preconditions.checkState(session.getStreamsCount() > 0);
 
            String streamName = session.getStreams(0).getName();
 
            ReadRowsRequest readRowsRequest =
                    ReadRowsRequest.newBuilder().setReadStream(streamName).build();
 
            ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
 
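            // Parse the session's Avro schema once and reuse the reader for every response
            AvroReader avroReader = new AvroReader(session.getAvroSchema());

            for (ReadRowsResponse response : stream) {
                avroReader.processRows(response.getAvroRows().getSerializedBinaryRows());
            }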
        }
    }
 
}
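
The main class reads PROJECT_ID, DATASET and TABLE from the environment. If one of them is unset, System.getenv returns null and String.format quietly produces a path such as projects/null/datasets/..., which only fails later when the session is created. A minimal sketch of failing early instead follows; `TablePaths`, `required` and `tablePath` are hypothetical helper names, and the environment is passed in as a Map so that the logic can be exercised without touching the real environment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the BigQuery client library
class TablePaths {

    // Returns the variable's value or fails with a message naming the missing variable
    static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    // Builds the table resource name in the same format the main class uses
    static String tablePath(Map<String, String> env) {
        return String.format(
                "projects/%s/datasets/%s/tables/%s",
                required(env, "PROJECT_ID"), required(env, "DATASET"), required(env, "TABLE"));
    }

    public static void main(String[] args) {
        // Sample values for illustration only
        Map<String, String> env = new HashMap<>();
        env.put("PROJECT_ID", "my-project");
        env.put("DATASET", "my_dataset");
        env.put("TABLE", "my_table");
        System.out.println(tablePath(env));
    }
}
```

In the main class this could be called as `TablePaths.tablePath(System.getenv())`, since System.getenv() without arguments returns the environment as a Map.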

A BigQuery read client is created. Then we create a read session request with a maximum number of streams, specifying Avro as the data format.
The response contains the initiated session, the Avro schema and the streams that we shall use to retrieve the data.
For each stream there has to be a ReadRowsRequest in order to fetch the data; here we asked for a single stream, so one request is enough.
Then we pass the serialized rows of each response to our Avro decoder.
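
If the session were created with a higher maximum stream count, each returned stream would need its own request. The fan-out itself can be sketched with the JDK alone. In the sketch below, `StreamFanOut` and `readAll` are hypothetical names, and the `readStream` function is a stand-in for the real work: building a ReadRowsRequest for the given stream name, iterating its responses and returning the number of rows processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToLongFunction;

// Hypothetical illustration class, not part of the BigQuery client library
class StreamFanOut {

    // Runs readStream once per stream name, in parallel, and sums the reported row counts
    static long readAll(List<String> streamNames, ToLongFunction<String> readStream)
            throws InterruptedException, ExecutionException {
        if (streamNames.isEmpty()) {
            return 0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(streamNames.size());
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (String name : streamNames) {
                // One task per stream, mirroring one ReadRowsRequest per stream
                Callable<Long> task = () -> readStream.applyAsLong(name);
                pending.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> result : pending) {
                total += result.get(); // rethrows a task failure as ExecutionException
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in reader: pretends every stream yields 10 rows
        long rows = readAll(Arrays.asList("stream-0", "stream-1", "stream-2"), name -> 10L);
        System.out.println("rows read: " + rows);
    }
}
```

With the real client, the stream names would come from the session's stream list. Creating one AvroReader per task is the cautious choice, since this sketch does not assume the reader is safe to share between threads.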

That’s it! We have now read data from the BigQuery Storage API using Avro, after having used Arrow in the previous tutorial.

Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: BigQuery Storage API: Avro

Opinions expressed by Java Code Geeks contributors are their own.

Emmanouil Gkatziouras

He is a versatile software engineer with experience in a wide variety of applications/services. He is enthusiastic about new projects, embracing new technologies, and getting to know people in the field of software.