This repository was archived by the owner on May 12, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathPostgresProcess.java
376 lines (344 loc) · 16.4 KB
/
PostgresProcess.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package ru.yandex.qatools.embed.postgresql;
import de.flapdoodle.embed.process.config.IRuntimeConfig;
import de.flapdoodle.embed.process.config.io.ProcessOutput;
import de.flapdoodle.embed.process.config.store.IDownloadConfig;
import de.flapdoodle.embed.process.distribution.Distribution;
import de.flapdoodle.embed.process.extract.IExtractedFileSet;
import de.flapdoodle.embed.process.io.Slf4jLevel;
import de.flapdoodle.embed.process.io.Slf4jStreamProcessor;
import de.flapdoodle.embed.process.io.directories.IDirectory;
import de.flapdoodle.embed.process.io.progress.Slf4jProgressListener;
import de.flapdoodle.embed.process.runtime.Executable;
import de.flapdoodle.embed.process.runtime.ProcessControl;
import de.flapdoodle.embed.process.store.IArtifactStore;
import de.flapdoodle.embed.process.store.IMutableArtifactStore;
import de.flapdoodle.embed.process.store.PostgresArtifactStore;
import de.flapdoodle.embed.process.store.PostgresArtifactStoreBuilder;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import ru.yandex.qatools.embed.postgresql.config.IMutableDownloadConfig;
import ru.yandex.qatools.embed.postgresql.config.PostgresConfig;
import ru.yandex.qatools.embed.postgresql.config.PostgresDownloadConfigBuilder;
import ru.yandex.qatools.embed.postgresql.config.RuntimeConfigBuilder;
import ru.yandex.qatools.embed.postgresql.ext.LogWatchStreamProcessor;
import ru.yandex.qatools.embed.postgresql.ext.SubdirTempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static de.flapdoodle.embed.process.io.file.Files.forceDelete;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static java.nio.file.Files.lines;
import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.slf4j.LoggerFactory.getLogger;
import static ru.yandex.qatools.embed.postgresql.Command.CreateDb;
import static ru.yandex.qatools.embed.postgresql.Command.InitDb;
import static ru.yandex.qatools.embed.postgresql.Command.PgDump;
import static ru.yandex.qatools.embed.postgresql.Command.PgRestore;
import static ru.yandex.qatools.embed.postgresql.Command.Psql;
import static ru.yandex.qatools.embed.postgresql.PostgresStarter.getCommand;
import static ru.yandex.qatools.embed.postgresql.config.AbstractPostgresConfig.Storage;
/**
 * Embedded postgres server process.
 * <p>
 * Besides building the server command line, this class runs the auxiliary postgres
 * command line tools (initdb, createdb, psql, pg_dump, pg_restore, pg_ctl) through
 * {@link #runCmd(PostgresConfig, IRuntimeConfig, Command, String, Set, String...)}.
 */
public class PostgresProcess extends AbstractPGProcess<PostgresExecutable, PostgresProcess> {
    /** Number of additional createdb attempts made after the first one fails to connect. */
    private static final int MAX_CREATEDB_TRIALS = 3;
    /** Time in milliseconds passed to the log watcher while waiting for a command's result. */
    private static final int DEFAULT_CMD_TIMEOUT = 2000;
    private static final Logger LOGGER = getLogger(PostgresProcess.class);
    private final IRuntimeConfig runtimeConfig;
    // Deliberately no explicit "= false" initializers on the two flags below: instance
    // initializers run after the superclass constructor returns and would overwrite a value
    // assigned by a hook invoked from it.
    // NOTE(review): assumes the superclass constructor may call onAfterProcessStart - confirm.
    private volatile boolean processReady;
    private volatile boolean stopped;

    public PostgresProcess(Distribution distribution, PostgresConfig config,
                           IRuntimeConfig runtimeConfig, PostgresExecutable executable) throws IOException {
        super(distribution, config, runtimeConfig, executable);
        this.runtimeConfig = runtimeConfig;
    }

    /**
     * Non-silent variant of
     * {@link #runCmd(boolean, PostgresConfig, IRuntimeConfig, Command, String, Set, String...)}.
     */
    private static String runCmd(PostgresConfig config, IRuntimeConfig parentRuntimeCfg, Command cmd, String successOutput,
                                 Set<String> failOutput, String... args) {
        return runCmd(false, config, parentRuntimeCfg, cmd, successOutput, failOutput, args);
    }

    /**
     * Prepares and starts the given command, waits for it and returns the output captured
     * by the log watcher.
     *
     * @param silent           when {@code true}, problems are not logged
     * @param config           base configuration; a copy with {@code args} applied is used
     * @param parentRuntimeCfg runtime configuration providing the artifact store and the
     *                         command line post processor
     * @param cmd              command to run
     * @param successOutput    output marker handed to the log watcher as success indicator
     * @param failOutput       output markers handed to the log watcher as failure indicators
     * @param args             command arguments
     * @return the captured output, or {@code null} if the command failed with an
     * {@link IOException} or the calling thread was interrupted
     */
    private static String runCmd(boolean silent,
                                 PostgresConfig config, IRuntimeConfig parentRuntimeCfg, Command cmd, String successOutput,
                                 Set<String> failOutput, String... args) {
        try {
            final LogWatchStreamProcessor logWatch = new LogWatchStreamProcessor(successOutput,
                    failOutput, new Slf4jStreamProcessor(LOGGER, Slf4jLevel.TRACE));
            final IArtifactStore artifactStore = artifactStoreFor(cmd, parentRuntimeCfg.getArtifactStore());
            final IRuntimeConfig runtimeCfg = new RuntimeConfigBuilder().defaults(cmd)
                    .daemonProcess(false)
                    .processOutput(new ProcessOutput(logWatch, logWatch, logWatch))
                    .artifactStore(artifactStore)
                    .commandLinePostProcessor(parentRuntimeCfg.getCommandLinePostProcessor()).build();
            final PostgresConfig postgresConfig = new PostgresConfig(config).withArgs(args);
            if (Command.InitDb == cmd) {
                postgresConfig.withAdditionalInitDbParams(config.getAdditionalInitDbParams());
            }
            final Executable<?, ? extends AbstractPGProcess> exec = getCommand(cmd, runtimeCfg)
                    .prepare(postgresConfig);
            AbstractPGProcess proc = exec.start();
            logWatch.waitForResult(DEFAULT_CMD_TIMEOUT);
            if (!logWatch.isInitWithSuccess() && !silent) {
                LOGGER.warn("Possibly failed to run {}:\n{}", cmd.commandName(), logWatch.getOutput());
            }
            proc.waitFor();
            return logWatch.getOutput();
        } catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            if (!silent) {
                LOGGER.warn("Failed to run command {}", cmd.commandName(), e);
            }
        }
        return null;
    }

    /**
     * Returns an artifact store whose download configuration resolves packages for {@code cmd}.
     * <p>
     * The configured store and its download configuration are re-used (and mutated) when they
     * are mutable; otherwise default instances are built and a warning is logged.
     */
    private static IArtifactStore artifactStoreFor(Command cmd, IArtifactStore configuredStore) throws IOException {
        // Only a PostgresArtifactStore exposes its download config. Any other store yields
        // null here and therefore takes the "fall back to default" branch below instead of
        // failing with a ClassCastException.
        IDownloadConfig downloadCfg = configuredStore instanceof PostgresArtifactStore
                ? ((PostgresArtifactStore) configuredStore).getDownloadConfig()
                : null;
        if (downloadCfg instanceof IMutableDownloadConfig) {
            IDirectory tempDir = SubdirTempDir.defaultInstance();
            if (downloadCfg.getPackageResolver() instanceof PackagePaths) {
                // Re-use the temp dir of the resolver that is already configured.
                tempDir = ((PackagePaths) downloadCfg.getPackageResolver()).getTempDir();
            }
            ((IMutableDownloadConfig) downloadCfg).setPackageResolver(new PackagePaths(cmd, tempDir));
        } else {
            LOGGER.warn("Could not use the configured download configuration for '" + cmd.commandName() +
                    "', falling back to default!");
            downloadCfg = new PostgresDownloadConfigBuilder().defaultsForCommand(cmd)
                    .progressListener(new Slf4jProgressListener(LOGGER)).build();
        }
        if (configuredStore instanceof IMutableArtifactStore) {
            ((IMutableArtifactStore) configuredStore).setDownloadConfig(downloadCfg);
            return configuredStore;
        }
        LOGGER.warn("Could not use the configured artifact store for '" + cmd.commandName() +
                "', falling back to default!");
        return new PostgresArtifactStoreBuilder().defaults(cmd).download(downloadCfg).build();
    }

    /**
     * Runs {@code pg_ctl stop} silently.
     *
     * @return {@code true} if the command produced no (or {@code null}) output
     */
    // NOTE(review): treating empty output as success looks surprising given the
    // "server stopped" success marker - confirm against LogWatchStreamProcessor.getOutput().
    private static boolean shutdownPostgres(PostgresConfig config, IRuntimeConfig runtimeConfig) {
        try {
            return isEmpty(runCmd(true, config, runtimeConfig, Command.PgCtl, "server stopped", emptySet(), "stop"));
        } catch (Exception e) {
            LOGGER.trace("Failed to stop postgres by pg_ctl!", e);
        }
        return false;
    }

    /** Sleeps for {@code millis}; returns {@code true} if the sleep was cut short by an interrupt. */
    private static boolean sleepInterrupted(long millis) {
        try {
            sleep(millis);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    /**
     * Stops the server, escalating from pg_ctl to the inherited term/kill helpers, then
     * removes temporary files. The escalation is attempted at most once per instance.
     */
    @Override
    protected synchronized void stopInternal() {
        if (!stopped && isProcessRunning()) {
            stopped = true;
            LOGGER.info("trying to stop postgresql");
            // waitUntilProcessHasStopped returns true while the process is STILL running.
            if (!sendStopToPostgresqlInstance() && !sendTermToProcess() && waitUntilProcessHasStopped(2000)) {
                LOGGER.warn("could not stop postgresql with pg_ctl/SIGTERM, trying to kill it...");
                if (!sendKillToProcess() && !tryKillToProcess() && waitUntilProcessHasStopped(3000)) {
                    LOGGER.warn("could not kill postgresql within 4s!");
                }
            }
        }
        if (waitUntilProcessHasStopped(5000)) {
            LOGGER.error("Postgres has not been stopped within 10s! Something's wrong!");
        }
        deleteTempFiles();
    }

    /**
     * Polls every 50 ms until the process is no longer running or the timeout elapses.
     * An interrupt does not abort the wait; the interrupt status is restored before returning.
     *
     * @param timeoutMillis maximum time to wait in milliseconds
     * @return {@code true} if the process is still running after the wait
     */
    private boolean waitUntilProcessHasStopped(int timeoutMillis) {
        final long started = currentTimeMillis();
        boolean interrupted = false;
        try {
            while (currentTimeMillis() - started < timeoutMillis && isProcessRunning()) {
                try {
                    sleep(50);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.warn("Failed to wait with timeout until the process has been killed", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return isProcessRunning();
    }

    /**
     * Asks the server to stop via pg_ctl and removes the artifact store's temp directory
     * when that directory was generated.
     *
     * @return the result of the pg_ctl stop attempt
     */
    protected final boolean sendStopToPostgresqlInstance() {
        final boolean result = shutdownPostgres(getConfig(), runtimeConfig);
        final IArtifactStore artifactStore = runtimeConfig.getArtifactStore();
        if (artifactStore instanceof PostgresArtifactStore) {
            final IDirectory tempDir = ((PostgresArtifactStore) artifactStore).getTempDir();
            if (tempDir != null && tempDir.asFile() != null && tempDir.isGenerated()) {
                LOGGER.info("Cleaning up after the embedded process (removing {})...", tempDir.asFile().getAbsolutePath());
                forceDelete(tempDir.asFile());
            }
        }
        return result;
    }

    /** Runs initdb unless the database directory already contains files. */
    @Override
    protected void onBeforeProcess(IRuntimeConfig runtimeConfig)
            throws IOException {
        super.onBeforeProcess(runtimeConfig);
        PostgresConfig config = getConfig();
        // listFiles() yields null for a missing or unreadable directory; query it once so the
        // null check and the length check look at the same snapshot.
        final File[] existingFiles = config.storage().dbDir().listFiles();
        if (existingFiles != null && existingFiles.length > 0) {
            return;
        }
        runCmd(config, runtimeConfig, InitDb, "Success", singleton("[initdb error]"));
    }

    /**
     * Builds the command line for the {@code postgres} or {@code pg_ctl} executable.
     *
     * @throws RuntimeException if the configured command name is neither of the two
     */
    @Override
    protected List<String> getCommandLine(Distribution distribution, PostgresConfig config, IExtractedFileSet exe)
            throws IOException {
        List<String> ret = new ArrayList<>();
        switch (config.supportConfig().getName()) {
            case "postgres": //NOSONAR
                ret.addAll(asList(exe.executable().getAbsolutePath(),
                        "-p", String.valueOf(config.net().port()),
                        "-h", config.net().host(),
                        "-D", config.storage().dbDir().getAbsolutePath()
                ));
                break;
            case "pg_ctl": //NOSONAR
                ret.addAll(asList(exe.executable().getAbsolutePath(),
                        "-o",
                        String.format("-p %s -h %s", config.net().port(), config.net().host()),
                        "-D", config.storage().dbDir().getAbsolutePath(),
                        "-w",
                        "start"
                ));
                break;
            default:
                throw new RuntimeException("Failed to launch Postgres: Unknown command " +
                        config.supportConfig().getName() + "!");
        }
        return ret;
    }

    /** Deletes the database directory if it is set and flagged as a temporary directory. */
    protected void deleteTempFiles() {
        final Storage storage = getConfig().storage();
        if (storage.dbDir() == null || !storage.isTmpDir()) {
            return;
        }
        if (!forceDelete(storage.dbDir())) {
            LOGGER.warn("Could not delete temp db dir: {}", storage.dbDir());
        }
    }

    /**
     * Waits for {@code postmaster.pid} to appear, records the process id and creates the
     * configured database, retrying while createdb reports that it could not connect.
     * Interrupts received while sleeping do not abort the procedure; the interrupt status
     * is restored before the method returns.
     */
    @Override
    protected final void onAfterProcessStart(ProcessControl process,
                                             IRuntimeConfig runtimeConfig) throws IOException {
        final Storage storage = getConfig().storage();
        final Path pidFilePath = Paths.get(storage.dbDir().getAbsolutePath(), "postmaster.pid");
        final File postmasterPidFile = new File(pidFilePath.toAbsolutePath().toString());
        boolean interrupted = false;
        try {
            int timeout = TIMEOUT;
            while (!postmasterPidFile.exists() && ((timeout = timeout - 100) > 0)) {
                interrupted |= sleepInterrupted(100);
            }
            int pid = -1;
            // Files.lines keeps the file open until the stream is closed.
            try (java.util.stream.Stream<String> pidLines = lines(pidFilePath)) {
                pid = pidLines.findFirst().map(String::trim).map(Integer::valueOf)
                        .orElseThrow(() -> new IllegalStateException("Pid file is empty"));
            } catch (Exception e) {
                LOGGER.error("Failed to read PID file ({})", e.getMessage(), e);
            }
            if (pid != -1) {
                setProcessId(pid);
            } else {
                // fallback, try to read pid file. will throw IOException if that fails
                setProcessId(getPidFromFile(pidFile()));
            }
            // One initial attempt plus up to MAX_CREATEDB_TRIALS retries.
            int trial = 0;
            do {
                String output = runCmd(getConfig(),
                        runtimeConfig,
                        CreateDb,
                        "",
                        new HashSet<>(singleton("database creation failed")),
                        storage.dbName());
                if (isEmpty(output) || !output.contains("could not connect to database")) {
                    this.processReady = true;
                    break;
                }
                LOGGER.warn("Could not create database first time ({} of {} trials)", trial, MAX_CREATEDB_TRIALS);
                interrupted |= sleepInterrupted(100);
            } while (trial++ < MAX_CREATEDB_TRIALS);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns the {@code -U/-d/-h/-p} arguments shared by the client tools. */
    private String[] connectionArgs() {
        final PostgresConfig config = getConfig();
        return new String[]{
                "-U", config.credentials().username(),
                "-d", config.storage().dbName(),
                "-h", config.net().host(),
                "-p", String.valueOf(config.net().port())};
    }

    /** Wraps a single failure marker into the mutable set handed to the log watcher. */
    private static Set<String> failureMarkers(String marker) {
        return new HashSet<>(singletonList(marker));
    }

    /** Runs pg_dump writing to {@code file}, with {@code extraArgs} appended to the command line. */
    private void dumpToFile(File file, String... extraArgs) {
        String[] args = ArrayUtils.addAll(connectionArgs(), "-f", file.getAbsolutePath());
        args = ArrayUtils.addAll(args, extraArgs);
        runCmd(getConfig(), runtimeConfig, PgDump, "",
                failureMarkers("export from " + getConfig().storage().dbName() + " failed"), args);
    }

    /**
     * Import into database from file
     *
     * @param file The file to import into database
     */
    public void importFromFile(File file) {
        importFromFileWithArgs(file);
    }

    /**
     * Import into database from file with additional args, using psql.
     * Nothing is executed if the file does not exist.
     *
     * @param file    The file to import into database
     * @param cliArgs additional arguments for psql (be sure to separate args from their values)
     */
    public void importFromFileWithArgs(File file, String... cliArgs) {
        if (!file.exists()) {
            LOGGER.warn("Skipping import, file does not exist: {}", file.getAbsolutePath());
            return;
        }
        String[] args = ArrayUtils.addAll(connectionArgs(), "-f", file.getAbsolutePath());
        if (cliArgs != null && cliArgs.length != 0) {
            args = ArrayUtils.addAll(args, cliArgs);
        }
        runCmd(getConfig(), runtimeConfig, Psql, "",
                failureMarkers("import into " + getConfig().storage().dbName() + " failed"), args);
    }

    /**
     * Restore a dump file into the database with additional args, using pg_restore.
     * Nothing is executed if the file does not exist.
     *
     * @param file    The dump file to restore
     * @param cliArgs additional arguments for pg_restore (be sure to separate args from their values)
     */
    public void restoreFromFile(File file, String... cliArgs) {
        if (!file.exists()) {
            LOGGER.warn("Skipping restore, file does not exist: {}", file.getAbsolutePath());
            return;
        }
        String[] args = ArrayUtils.addAll(connectionArgs(), file.getAbsolutePath());
        if (cliArgs != null && cliArgs.length != 0) {
            args = ArrayUtils.addAll(args, cliArgs);
        }
        runCmd(getConfig(), runtimeConfig, PgRestore, "",
                failureMarkers("restore into " + getConfig().storage().dbName() + " failed"), args);
    }

    /**
     * Export the database to a file using pg_dump.
     *
     * @param file The file to write the dump to
     */
    public void exportToFile(File file) {
        dumpToFile(file);
    }

    /**
     * Export the database to a file using pg_dump with the {@code -s} flag.
     *
     * @param file The file to write the dump to
     */
    public void exportSchemeToFile(File file) {
        dumpToFile(file, "-s");
    }

    /**
     * Export the database to a file using pg_dump with the {@code -a} flag.
     *
     * @param file The file to write the dump to
     */
    public void exportDataToFile(File file) {
        dumpToFile(file, "-a");
    }

    /**
     * @return {@code true} once the createdb step after process start has completed without
     * reporting a connection failure
     */
    public boolean isProcessReady() {
        return processReady;
    }

    /** Intentionally empty. */
    @Override
    protected void cleanupInternal() {
    }
}