PostgreSQL remote query in Java when a direct or SSL connection is not working
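The trick: instead of connecting to PostgreSQL directly (or over an SSH tunnel), the Java class below uploads the query batch as a JSON file together with a small psycopg2 script, runs the script on the remote host over SSH (optionally inside a Docker container via docker exec), and downloads the JSON result. The request file is a list of query/parameter objects, and the response is one list of rows per query, with dates and intervals carried as tagged objects. A hypothetical request and the shape of its .out result (values are illustrative):

request:
[
  {"query": "SELECT id, created_at FROM users WHERE id = %s", "parameters": [42]}
]

result:
[
  [
    {"id": 42, "created_at": {"$datetime.datetime": 1571900000.0}}
  ]
]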
SshPostgresql.java
package eu.sorescu.connector.ssh.postgresql;
import eu.sorescu.connector.ssh.SshClient;
import eu.sorescu.rt.*;
import eu.sorescu.util.DataTable;
import eu.sorescu.util.PasswordCredentials;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurperClassic;
import groovy.lang.Tuple2;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
public class SshPostgresql {
private final SshClient client;
public final String host;
private final int port;
private final PasswordCredentials credentials;
private final String db;
private final String dockerId;
public SshPostgresql(SshClient client, String dockerId, PasswordCredentials credentials, String db) {
this.client = client;
this.host = "localhost";
this.port = 5432;
this.credentials = credentials;
this.db = db;
this.dockerId = dockerId;
}
public SshPostgresql(SshClient client, String host, int port, PasswordCredentials credentials, String db) {
this.client = client;
this.host = host;
this.port = port;
this.credentials = credentials;
this.db = db;
this.dockerId = null;
}
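/**
 * Runs the given (query, parameters) pairs on the remote host: the batch is written to /tmp
 * as JSON, the bundled postgresql-batch.py is uploaded next to it and executed with psycopg2
 * (inside the Docker container when dockerId is set), and the JSON result file is downloaded
 * and converted into DataTables. Queries use psycopg2-style %s placeholders.
 */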
public List<DataTable<Object>> batchExecute(List<Tuple2<String, List<Object>>> queries) {
List<Map<String, Object>> request = new ArrayList<>();
queries.forEach(query -> {
Map<String, Object> queryRequest = new HashMap<>();
queryRequest.put("query", query.getFirst());
queryRequest.put("parameters", query.getSecond().toArray());
request.add(queryRequest);
});
String id = "d" + Ids.nextId();
client.upload("/tmp/" + id + ".json", JsonOutput.prettyPrint(JsonOutput.toJson(request)).getBytes());
client.upload("/tmp/" + id + ".py", new Resource(this.getClass(), "postgresql-batch.py").asByteArray());
client.evalQuiet("chmod 777 /tmp/" + id + ".*");
String prefix = "";
if (dockerId != null) {
prefix = "docker exec " + dockerId + " ";
client.evalQuiet("docker cp /tmp/" + id + ".json " + dockerId + ":/tmp/" + id + ".json");
client.evalQuiet("docker cp /tmp/" + id + ".py " + dockerId + ":/tmp/" + id + ".py");
client.evalQuiet("docker exec " + dockerId + " chmod 777 /tmp/" + id + ".*");
}
String obj = client.evalQuiet(prefix + "python /tmp/" + id + ".py /tmp/" + id + ".json " + host + " " + port + " " + credentials.user + " " + credentials.password + " " + db).toString();
if (dockerId != null) {
client.evalQuiet("docker cp " + dockerId + ":/tmp/" + id + ".json.out /tmp/" + id + ".json.out");
}
byte[] output = Functionals.tryGet(() -> client.download("/tmp/" + id + ".json.out"), it -> null);
client.evalQuiet("rm /tmp/" + id + ".*");
if (obj.trim().length() > 0) throw new RtException("e1910280946", obj);
List<List<Map<String, Object>>> res = Functionals.tryGet(() -> (List<List<Map<String, Object>>>) (new JsonSlurperClassic().parseText(new String(output))), it -> {
throw new RtException("e1910280903", obj);
});
List<DataTable<Object>> finalResult = new ArrayList<>();
System.out.println("-- " + client);
for (List<Map<String, Object>> rrr : res) {
List<Map<String, Object>> result = new ArrayList<>();
for (Map<String, Object> r : rrr) {
Map<String, Object> row = new LinkedHashMap<>();
r.forEach((key, value) -> row.put(key, json2java(value)));
result.add(row);
}
finalResult.add(DataTable.of(result));
System.out.println(Psv.toString(result));
}
return finalResult;
}
public DataTable<Object> select(String sql, Object... args) {
return batchExecute(Collections.singletonList(new Tuple2<>(sql, Arrays.asList(args)))).get(0);
}
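// Runs a SELECT through psql by wrapping it in COPY (SELECT row_to_json(t) FROM (...) t) TO STDOUT,
// so each result row arrives as one JSON document per output line.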
public List<Map<String, Object>> SELECT(String sql) {
String id = "d" + Ids.nextId();
sql = sql.trim();
if (sql.endsWith(";")) sql = sql.substring(0, sql.length() - 1);
String augmentedSql = "COPY (SELECT row_to_json(t) FROM (" + sql + ") t) TO STDOUT;";
client.upload("/tmp/" + id + ".sql", augmentedSql.getBytes());
client.evalQuiet("chmod 777 /tmp/" + id + ".*");
String prefix = "";
if (dockerId != null) {
prefix = "docker exec " + dockerId + " ";
client.evalQuiet("docker cp /tmp/" + id + ".json " + dockerId + ":/tmp/" + id + ".sql");
client.evalQuiet("docker exec " + dockerId + " chmod 777 /tmp/" + id + ".*");
}
String output = client.eval(prefix + "psql postgresql://" + credentials.user + ":" + credentials.password + "@" + host + ":" + port + "/" + db + " -f /tmp/" + id + ".sql").toString();
client.eval("rm /tmp/" + id + ".*");
JsonSlurperClassic slurper = new JsonSlurperClassic();
List<Map<String, Object>> result = new ArrayList<>();
System.out.println(output);
// COPY ... TO STDOUT doubles backslashes in the text format, so un-escape them before parsing the JSON lines
output = output.replaceAll("\\\\\\\\", "\\\\");
Arrays.stream(output.split("\\v+")).map(String::trim).filter(it -> it.length() != 0).map(slurper::parseText).forEach(it -> result.add((Map<String, Object>) it));
return result;
}
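// Runs an arbitrary SQL statement through psql -f; the output is only printed, not parsed.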
public void EXECUTE(String sql) {
String id = "d" + Ids.nextId();
sql = sql.trim();
if (sql.endsWith(";")) sql = sql.substring(0, sql.length() - 1);
String augmentedSql = sql; // unlike SELECT, the statement is not wrapped in COPY ... TO STDOUT
client.upload("/tmp/" + id + ".sql", augmentedSql.getBytes());
client.evalQuiet("chmod 777 /tmp/" + id + ".*");
String prefix = "";
if (dockerId != null) {
prefix = "docker exec " + dockerId + " ";
client.evalQuiet("docker cp /tmp/" + id + ".json " + dockerId + ":/tmp/" + id + ".sql");
client.evalQuiet("docker exec " + dockerId + " chmod 777 /tmp/" + id + ".*");
}
String output = client.eval(prefix + "psql postgresql: client.eval("rm /tmp/" + id + ".*");
System.out.println(output);
}
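// Maps the tagged JSON values produced by postgresql-batch.py back to Java types:
// {"$datetime.datetime": seconds} -> Instant, {"$datetime.timedelta": seconds} -> Duration.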
private static Object json2java(Object json) {
if (json == null) return json;
if (json instanceof Number) return json;
if (json instanceof CharSequence) return json;
if (json instanceof Character) return json;
if (json instanceof Boolean) return json;
if (json instanceof List) return json;
if (json instanceof Map) {
if (((Map) json).containsKey("$datetime.datetime"))
return Instant.ofEpochSecond(((Number) ((Map) json).get("$datetime.datetime")).longValue());
if (((Map) json).containsKey("$datetime.timedelta"))
return Duration.ofSeconds(((Number) ((Map) json).get("$datetime.timedelta")).longValue());
throw new RtException("e1910221254", "Unrecognized type: `" + ((Map) json).keySet() + "`.");
}
throw new RtException("e1910141445", "JSON deserialization for `" + json.getClass() + "` not implemented.");
}
}
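A minimal usage sketch, assuming an already-connected SshClient (how that client is built depends on your SshClient implementation) and living next to the class above with the same imports. Table, column, and credential values are illustrative; the %s placeholders are psycopg2-style because the queries are ultimately executed by the Python script.

static void example(SshClient ssh) {
// host/port are as seen from the SSH host, since psycopg2 runs there
SshPostgresql pg = new SshPostgresql(ssh, "localhost", 5432, PasswordCredentials.of("postgres", "secret"), "appdb");
// one parameterized query
DataTable<Object> user = pg.select("SELECT id, name FROM users WHERE id = %s", 42);
// several queries in a single SSH round trip
List<Tuple2<String, List<Object>>> batch = new ArrayList<>();
batch.add(new Tuple2<>("SELECT count(*) AS n FROM users", Collections.emptyList()));
batch.add(new Tuple2<>("SELECT * FROM users WHERE name = %s", Collections.singletonList("alice")));
List<DataTable<Object>> results = pg.batchExecute(batch);
}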
postgresql-batch.py
import psycopg2, sys, json, datetime
# deserialize/serialize mirror json2java on the Java side: timestamps travel as
# {'$datetime.datetime': seconds since the epoch}, intervals as {'$datetime.timedelta': seconds}
def deserialize(v):
    if isinstance(v, dict):
        if '$datetime.datetime' in v.keys():
            return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=v['$datetime.datetime'])
        elif '$datetime.timedelta' in v.keys():
            return datetime.timedelta(seconds=v['$datetime.timedelta'])
        else:
            raise Exception('Type not understood: ' + v.__str__())
    return v
def serialize(v):
    if isinstance(v, datetime.datetime):
        epochZero = datetime.datetime(1970, 1, 1, tzinfo=v.tzinfo)
        return {'$datetime.datetime': (v - epochZero).total_seconds()}
    if isinstance(v, datetime.timedelta):
        return {'$datetime.timedelta': v.total_seconds()}
    return v
# one connection (and one transaction) for the whole batch; commit at the end so writes persist
connection = psycopg2.connect(host=sys.argv[2], port=sys.argv[3], user=sys.argv[4], password=sys.argv[5], database=sys.argv[6])
batchResult = []
for request in json.load(open(sys.argv[1], 'r')):
    cursor = connection.cursor()
    cursor.execute(request['query'], tuple([deserialize(v) for v in request['parameters']]))
    try:
        rows = cursor.fetchall()
        columns = cursor.description
        rs = [({columns[idx].name: serialize(row[idx]) for idx in range(len(columns))}) for row in rows]
        batchResult.append(rs)
    except psycopg2.ProgrammingError:
        # statements that produce no result set (INSERT, UPDATE, DDL) end up here
        batchResult.append([])
connection.commit()
json.dump(batchResult, open(sys.argv[1] + '.out', 'w'))
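For troubleshooting, the script can also be run by hand (placeholder values shown); it reads the request file given as the first argument and writes the results next to it with a .out suffix, which is the file SshPostgresql downloads:

python postgresql-batch.py /tmp/d1.json localhost 5432 postgres secret appdb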