Skip to content

Commit e5939ab

Browse files
committed
Try to integrate libraries
1 parent e897629 commit e5939ab

File tree

5 files changed

+132
-14
lines changed

5 files changed

+132
-14
lines changed

.scalafmt.conf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
version=2.3.2
2+
project.git = true
3+
project.excludeFilters = [
4+
scalafmt-benchmarks/src/resources,
5+
sbt-test
6+
bin/issue
7+
]
8+
align = none
9+
# Disabled in default since this operation is potentially
10+
# dangerous if you define your own stripMargin with different
11+
# semantics from the stdlib stripMargin.
12+
assumeStandardLibraryStripMargin = true
13+
onTestFailure = "To fix this, run ./scalafmt from the project root directory"

build.gradle

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ plugins {
22
id 'scala'
33
id 'com.github.jruby-gradle.base' version '1.6.0'
44
id 'com.github.johnrengelman.shadow' version '5.2.0'
5+
id 'cz.alenkacz.gradle.scalafmt' version '1.10.0'
56
}
67
import com.github.jrubygradle.JRubyExec
78

@@ -11,17 +12,24 @@ configurations {
1112

1213
repositories {
1314
mavenCentral()
15+
mavenLocal()
1416
jcenter()
1517
}
1618

1719
// Relocate Guava packages since it's incompatible with Guava's version from Embulk
1820
shadowJar {
1921
classifier 'shadow'
20-
exclude 'org/embulk/plugin/**'
2122

2223
dependencies {
23-
include 'com.google.guava:guava'
24-
include 'com.google.cloud:google-cloud-firestore'
24+
include dependency('com.google.guava:guava')
25+
26+
include dependency('com.google.cloud:google-cloud-firestore')
27+
include dependency('com.google.firebase:firebase-admin')
28+
include dependency('com.syucream:firesql')
29+
30+
include dependency('io.grpc:grpc-netty-shaded')
31+
include dependency('io.grpc:grpc-core')
32+
include dependency('io.grpc:grpc-api')
2533
}
2634

2735
relocate 'com.google.common', 'relocated.com.google.common'
@@ -36,7 +44,9 @@ dependencies {
3644
compile 'org.embulk:embulk-core:0.9.12'
3745
provided 'org.embulk:embulk-core:0.9.12'
3846

39-
compile 'com.google.cloud:google-cloud-firestore:1.28.0'
47+
// Should be shadowed
48+
compile 'com.google.firebase:firebase-admin:6.11.0'
49+
compile 'com.syucream:firesql:0.0.1'
4050
}
4151

4252
task classpath(type: Copy, dependsOn: ['jar', 'shadowJar']) {
@@ -45,8 +55,8 @@ task classpath(type: Copy, dependsOn: ['jar', 'shadowJar']) {
4555
from (configurations.runtime
4656
- configurations.provided
4757
+ configurations.shadow
48-
+ files(shadowJar.archiveFile)
49-
+ files(jar.archiveFile))
58+
- files(shadowJar.getIncludedDependencies())
59+
+ files(shadowJar.archiveFile))
5060

5161
into 'classpath'
5262
}

examples/firestore2stdout.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ in:
22
type: firestore
33
project_id: "syucream-firebase-dev"
44
json_keyfile: credential.json
5-
collection: users
5+
sql: "SELECT name FROM users"
66

77
out:
88
type: stdout
Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,99 @@
11
package org.embulk.input.firestore
22

3+
import java.io.FileInputStream
34
import java.util
45

6+
import com.google.auth.oauth2.GoogleCredentials
7+
import com.google.cloud.firestore.Firestore
8+
import com.google.firebase.cloud.FirestoreClient
9+
import com.google.firebase.{FirebaseApp, FirebaseOptions}
10+
import com.syucream.firesql.FireSQL
511
import org.embulk.config.{ConfigDiff, ConfigSource, TaskReport, TaskSource}
12+
import org.embulk.spi.`type`.Types
13+
import org.embulk.spi.PageBuilder
14+
import org.embulk.spi.json.JsonParser
615
import org.embulk.spi.{Exec, InputPlugin, PageOutput, Schema}
716

17+
import scala.util.Try
18+
819
case class FirestoreInputPlugin() extends InputPlugin {
20+
private val TASK_COUNT = 1
21+
22+
private val jsonParser = new JsonParser()
23+
24+
override def transaction(
25+
config: ConfigSource,
26+
control: InputPlugin.Control
27+
): ConfigDiff = {
28+
val task = config.loadConfig(classOf[PluginTask])
29+
30+
// TODO runtime validations
931

10-
override def transaction(config: ConfigSource, control: InputPlugin.Control): ConfigDiff = {
11-
null
32+
val schema = Schema
33+
.builder()
34+
.add(task.getJsonColumnName, Types.JSON)
35+
.build()
36+
37+
resume(task.dump(), schema, TASK_COUNT, control)
1238
}
1339

14-
override def resume(taskSource: TaskSource, schema: Schema, taskCount: Int, control: InputPlugin.Control): ConfigDiff = {
15-
null
40+
override def resume(
41+
taskSource: TaskSource,
42+
schema: Schema,
43+
taskCount: Int,
44+
control: InputPlugin.Control
45+
): ConfigDiff = {
46+
// XXX unimplemented
47+
control.run(taskSource, schema, taskCount)
48+
Exec.newConfigDiff()
1649
}
1750

18-
override def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: util.List[TaskReport]): Unit = {
51+
override def cleanup(
52+
taskSource: TaskSource,
53+
schema: Schema,
54+
taskCount: Int,
55+
successTaskReports: util.List[TaskReport]
56+
): Unit = {
57+
// nothing to do
1958
}
2059

21-
override def run(taskSource: TaskSource, schema: Schema, taskIndex: Int, output: PageOutput): TaskReport = {
22-
null
60+
override def run(
61+
taskSource: TaskSource,
62+
schema: Schema,
63+
taskIndex: Int,
64+
output: PageOutput
65+
): TaskReport = {
66+
val task = taskSource.loadTask(classOf[PluginTask])
67+
val allocator = task.getBufferAllocator
68+
val pageBuilder = new PageBuilder(allocator, schema, output)
69+
70+
val firestore = createFirestore(task.getJsonKeyfile).get
71+
val firesql = new FireSQL(firestore)
72+
val query = firesql.query(task.getSql)
73+
74+
val col = pageBuilder.getSchema.getColumn(0)
75+
76+
query.get.getDocuments.forEach { d =>
77+
pageBuilder.setJson(col, jsonParser.parse(d.toString))
78+
pageBuilder.addRecord()
79+
}
80+
pageBuilder.finish()
81+
82+
Exec.newTaskReport()
2383
}
2484

2585
override def guess(config: ConfigSource): ConfigDiff =
2686
Exec.newConfigDiff()
87+
88+
private def createFirestore(pathToCredJson: String): Try[Firestore] = {
89+
Try {
90+
val serviceAccount = new FileInputStream(pathToCredJson)
91+
val credentials = GoogleCredentials.fromStream(serviceAccount)
92+
val options =
93+
new FirebaseOptions.Builder().setCredentials(credentials).build
94+
FirebaseApp.initializeApp(options)
95+
96+
FirestoreClient.getFirestore
97+
}
98+
}
2799
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.embulk.input.firestore
2+
3+
import org.embulk.config.{Config, ConfigDefault, ConfigInject, Task}
4+
import org.embulk.spi.BufferAllocator
5+
6+
trait PluginTask extends Task {
7+
8+
@Config("project_id")
9+
def getProjectId: String
10+
11+
@Config("json_keyfile")
12+
def getJsonKeyfile: String
13+
14+
@Config("json_column_name")
15+
@ConfigDefault("\"record\"")
16+
def getJsonColumnName: String
17+
18+
@Config("sql")
19+
def getSql: String
20+
21+
@ConfigInject
22+
def getBufferAllocator: BufferAllocator
23+
}

0 commit comments

Comments
 (0)