{{error}}
{{(quickSearchResults.length>10)?'10+':(quickSearchResults.length)}} {{(quickSearchResults.length==1)?'result':'results'}}
{{result.title}} {{result.timeStamp | mysql2ymd }}
Sorry, no such article has been written yet.
How I renamed cryptically named EPUB files from the Gutenberg library to their (meaningful) titles
1 file attached: epub_rename.py
epub_rename.py
import os.path
import pathlib
import zipfile

# Folder scanned by the script entry point for files whose names start with `pg`.
ROOT=pathlib.Path(r"G:\My Drive\_ Cărți\Gen2\Ivan Sergeevich Turgenev")

def process_path(path: pathlib.Path):
    """Rename an EPUB file after the title stored in its package metadata.

    ``META-INF/container.xml`` inside the archive is read to find the package
    document (the value of the first ``full-path=`` attribute); the text of
    the first ``title>`` element of that document becomes the new file name.
    The characters ``/``, ``;`` and ``:`` are replaced by ``-`` and double
    quotes are dropped.  The file is renamed in place, i.e. it stays in the
    folder it is already in.

    Args:
        path: Location of the ``.epub`` archive to rename.

    Raises:
        KeyError: If ``META-INF/container.xml`` or the package document it
            points to is not present in the archive.
        IndexError: If the ``full-path=`` attribute or the ``title`` element
            cannot be found.
    """
    print(path)
    # The context manager closes the archive even when parsing fails, and
    # guarantees it is closed before the file is renamed.
    with zipfile.ZipFile(path) as archive:
        container = archive.read('META-INF/container.xml').decode('UTF-8')
        content_file = container.split('full-path=')[1].split('"')[1]
        package = archive.read(content_file).decode('UTF-8')
    title = package.split('title>')[1].split('<')[0]
    # One pass over the title instead of four chained replace() calls.
    title = title.translate(str.maketrans({'/': '-', '"': '', ';': '-', ':': '-'}))
    print(path)
    print(title)
    path.rename(path.with_name(title + '.epub'))
    print()



if __name__ == '__main__':
    # Show whether the source folder is reachable before touching anything.
    print(os.path.exists(ROOT))
    # Only entries whose names start with `pg` are renamed; everything else is left alone.
    candidates = (name for name in os.listdir(ROOT) if name.startswith('pg'))
    for candidate in candidates:
        process_path(ROOT.joinpath(candidate))
How to create a huge empty file in Windows without occupying space on disk
1 file attached: create_huge_file.cmd
create_huge_file.cmd
rem Create (or truncate) my_10_tb_file as an empty file.
type NUL > my_10_tb_file
rem Mark the file as sparse.
fsutil sparse setflag my_10_tb_file
rem Declare the range starting at offset 0 with a length of 10000000000000 bytes as sparse.
fsutil sparse setrange my_10_tb_file 0 10000000000000
rem Move the end of file to 10000000000000 bytes (10 TB, decimal).
fsutil file seteof my_10_tb_file 10000000000000
Python wrapper for PGP encryption
1 file attached: pgp.py
pgp.py
import os
import subprocess, re
import tempfile
from typing import Union
from common.subprocess import SubprocessResult


class PgpWrapper:
    """Run the configured PGP executable against a private, temporary home directory."""

    def __init__(self):
        # Throw-away folder handed to every invocation through `--homedir`.
        self.work_folder: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory()
        # NOTE(review): `env` is neither defined nor imported in the visible part
        # of this module; presumably an application-wide environment object that
        # exposes `config.get_item(section, key)` -- confirm where it comes from.
        self.env = env

    def _run(self, vargs: list[str]) -> SubprocessResult:
        """Invoke the PGP executable with `--homedir <work folder>` followed by *vargs*.

        Args:
            vargs: Extra command-line arguments appended after the home directory.

        Returns:
            Whatever `common.subprocess.run` returns for the invocation.
        """
        # The module-level `from common.subprocess import SubprocessResult` does
        # not bind the name `common`; import the module here so the call resolves.
        import common.subprocess

        return common.subprocess.run(self.env.config.get_item('pgp', 'executable-path'),
                                     ['--homedir', self.work_folder.name, *vargs])


class PgpEncrypt(PgpWrapper):
    """Encrypt messages for the owner of one imported public key."""

    def __init__(self, public_key_path: str):
        """Import the public key and remember its id and quoted owner string.

        Args:
            public_key_path: Path of the public key file passed to `--import`.

        Raises:
            IndexError: If the import output contains no line starting with
                `gpg: key ` or that line carries no quoted owner.
        """
        # The base initializer takes no arguments; the original passed an
        # undefined `env`, which could not work with the visible base class.
        super().__init__()
        res = self._run(['--import', public_key_path]).assertCode0()
        # The import report is expected on the error stream; splitlines() copes
        # with both `\r\n` and `\n` line endings.
        import_log = str(res.err, 'UTF-8')
        key_mail_row: str = [it for it in import_log.splitlines() if it.startswith('gpg: key ')][0]
        # First run of upper-case letters/digits on that line is taken as the key id.
        self.key_id: str = re.sub('[^A-Z0-9]+', ' ', key_mail_row).strip().split(' ')[0]
        # Text between the first pair of double quotes on that line.
        self.recipient_email: str = key_mail_row.split('"')[1]

    def encrypt(self, message: Union[bytes, str]) -> str:
        """Encrypt *message* for the imported key and return the ASCII-armored text.

        Args:
            message: Payload to encrypt; text is encoded as UTF-8 first.

        Returns:
            The content of the armored output file written by the executable.
        """
        if isinstance(message, str):
            message = message.encode('UTF-8')
        # NamedTemporaryFile gives a real path to hand to the executable and
        # accepts `delete=False` on every platform (TemporaryFile does not).
        with tempfile.NamedTemporaryFile(delete=False) as plain_file:
            plain_file.write(message)
        try:
            # NOTE(review): `Ids` is neither defined nor imported in the visible
            # part of this module -- confirm where `next_subid` comes from.
            output_name = os.path.join(self.work_folder.name, Ids.next_subid())
            self._run(['--openpgp', '-e', '--always-trust', '-a', '-r', self.key_id, '-o', output_name,
                       plain_file.name]).assertCode0()
        finally:
            # Never leave the plaintext behind, whether or not encryption worked.
            os.remove(plain_file.name)
        with open(output_name) as armored:
            return armored.read()
Selenium Chrome Driver autodownloading
2 files attached: SeleniumChromeDriverConfigurator.java usage.java
SeleniumChromeDriverConfigurator.java usage.java
// Configure the Chrome driver either without arguments (presumably resolving
// the latest driver version -- confirm against SeleniumChromeDriverConfigurator)
// or pinned to an explicit version string.
if(fetchLatest){
SeleniumChromeDriverConfigurator.configure();
}else{
// Explicit driver version requested by the caller.
SeleniumChromeDriverConfigurator.configure("87.0.4280.141");
Ruby list of hashes to PSV text
1 file attached: table_to_psv.rb
table_to_psv.rb
# Render a list of hashes as pipe-separated, column-aligned text lines.
#
# table  - Array of Hash rows; keys may be Strings or Symbols.
# header - Optional Array of column names in output order. When nil, the
#          columns are all row keys in first-seen order.
#
# Returns an Array of Strings: the header line followed by one line per row.
# A cell that is missing from a row is rendered as blanks.
def table_to_psv(table, header = nil)
  header = table.flat_map(&:keys) if header.nil?
  # Stringify before de-duplicating so that :a and "a" share a single column.
  header = header.map(&:to_s).uniq
  rows = table.map { |row|
    cells = row.map { |key, value| [key.to_s, value] }.to_h
    header.map { |name| [name, cells[name].to_s] }.to_h
  }
  # A column is as wide as its widest cell, header included. Seeding the list
  # with the header name keeps `max` from returning nil when there are no rows.
  column_widths = header.map { |name|
    [name, ([name] + rows.map { |row| row[name] }).map(&:length).max]
  }.to_h

  ([header.map { |name| [name, name] }.to_h] + rows).map do |row|
    '|' + column_widths.map { |key, length| row[key].ljust(length, ' ') }.join('|') + '|'
  end
end
Simple code that converts a list of maps to pipe-separated text
Simple code that converts a list of maps to pipe-separated text
1 file attached: Psv.groovy
Psv.groovy
/**
 * Renders a list of maps as pipe-separated, column-aligned text.
 *
 * The columns are all keys of all maps in first-seen order; a value missing
 * from a map (or a value whose string form is empty) is rendered as blanks.
 * Lines are joined with CR LF. An empty list yields an empty string.
 *
 * @param data rows to render, one map per row
 * @return the header line followed by one line per row
 */
static String toString(List<Map<String, Object>> data) {
        List<String> header = data*.keySet().flatten().unique()*.toString()
        // Iterate the rows themselves: the index range `0..data.size() - 1`
        // turns into the reverse range `0..-1` when the list is empty.
        List<List<String>> rows = data.collect { row ->
            header.collect { key -> (row[key] as String) ?: '' }
        }
        // Transpose to columns, pad every cell to the column's widest cell, transpose back.
        List<List<String>> cells = ([header] + rows)
                .transpose().collect { column -> column*.padRight(column*.length().max()) }.transpose()
        return cells.collect { row -> "|" + row.join("|") + "|" }.join("\r\n")
    }
Postgres connection for Python with optional SSH tunnel
print(Postgresql('postgresql://postgres:postgres@host:5432/db_name','ssh://root:root_password@host:22').select('select %s + %s;', 1, 4))
print(Postgresql('postgresql://postgres:postgres@host:5432/db_name').select('select %s + %s;', 1, 4))
1 file attached: postgresql.py
postgresql.py
import urllib
from typing import Union
from urllib.parse import urlparse

import psycopg2 as psycopg2

from sorescu.ssh import SshTunnel


class Postgresql:
    """Small psycopg2 wrapper configured from a URL, optionally reached through an SSH tunnel.

    The connection is switched to autocommit mode right after it is opened.
    """

    def __init__(self, postgres_url: str, ssh_url: Union[str, None] = None):
        """Open the connection described by *postgres_url*.

        Args:
            postgres_url: ``postgresql://user:password@host:port/database``; an
                optional ``sslmode`` query parameter is forwarded to psycopg2.
            ssh_url: When given, an `SshTunnel` to this URL is opened towards the
                database port and the connection goes through its local end.

        Raises:
            ValueError: If *postgres_url* does not start with ``postgresql://``.
        """
        if not postgres_url.startswith('postgresql://'):
            raise ValueError(f"Postgresql URL not supported: `{postgres_url}`")
        parsed = urlparse(postgres_url)
        host, port = parsed.hostname, parsed.port
        # Keep the tunnel referenced for as long as this object lives instead
        # of dropping it when the initializer returns.
        self._tunnel = None
        if ssh_url is not None:
            self._tunnel = SshTunnel.SshTunnel(ssh_url, parsed.port)
            host, port = self._tunnel.local_bind_host, self._tunnel.local_bind_port
        sslmode = urllib.parse.parse_qs(parsed.query).get('sslmode', [None])[0]
        self._connection = psycopg2.connect(host=host, port=port,
                                            user=parsed.username, password=parsed.password,
                                            database=parsed.path[1:], sslmode=sslmode)
        self._connection.autocommit = True

    def exec(self, query: str, *vargs: object):
        """Print and execute *query* with *vargs* as parameters; return the open cursor.

        The caller owns the returned cursor and should close it.
        """
        cursor = self._connection.cursor()
        print(query)
        try:
            cursor.execute(query, vargs)
        except Exception:
            # Do not leak the cursor when the statement fails.
            cursor.close()
            raise
        return cursor

    def select(self, query: str, *vargs: object):
        """Run *query* and return all rows as a list of ``{column name: value}`` dicts."""
        cursor = self.exec(query, *vargs)
        try:
            rows = cursor.fetchall()
            names = [column.name for column in cursor.description]
        finally:
            cursor.close()
        return [dict(zip(names, row)) for row in rows]

    def append(self, table_name: str, data: dict):
        """Insert one row into *table_name*, taking column names and values from *data*.

        Column names are double-quoted (embedded double quotes are doubled);
        values are passed as query parameters.  *table_name* is interpolated
        verbatim, so it must come from trusted code.
        """
        key_code = ','.join('"' + key.replace('"', '""') + '"' for key in data)
        placeholders = ','.join('%s' for _ in data)
        query = f"INSERT INTO {table_name} ({key_code})VALUES ({placeholders}); commit;"
        self.exec(query, *data.values()).close()


if __name__ == '__main__':
    # Demo 1: reach the database through an SSH tunnel.
    tunnelled = Postgresql('postgresql://postgres:postgres@host:5432/db_name','ssh://root:root_password@user:22')
    print(tunnelled.select('select %s + %s;', 1, 4))
    # Demo 2: connect to the database directly.
    direct = Postgresql('postgresql://postgres:postgres@localhost:5432/db_name')
    print(direct.select('select %s + %s;', 1, 4))
Python SSH Tunnel Forwarder
Postgres forwarder: SshTunnel.SshTunnel("ssh://user:pass@host:port", 5432)
1 file attached: SshTunnel.py
SshTunnel.py
import functools
import urllib

from sshtunnel import SSHTunnelForwarder
from urllib.parse import urlparse
import socket
from contextlib import closing


def find_free_port():
    """Return a TCP port number that was free at the moment of the call.

    A throw-away socket is bound to port 0 so that the operating system picks
    the port; the socket is closed again before returning, so another process
    may in principle grab the port before the caller uses it.

    Returns:
        The port number chosen for the probe socket.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as probe:
        # Socket options have to be set before bind() to apply to that bind.
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind(('', 0))
        return probe.getsockname()[1]


class SshTunnel(SSHTunnelForwarder):
    """SSH port forwarder configured from an ``ssh://user:password@host:port`` URL.

    The tunnel is started by the initializer; it forwards a free local port
    (see ``local_bind_port``) to ``localhost:<remote_port>`` as seen from the
    SSH host.
    """

    # Not used by the code visible in this class.
    _cache = dict()

    # NOTE(review): every call receives a new `self`, so this cache can never
    # answer a call from memory; it does keep a reference to each instance it
    # has seen. Callers that do not store the tunnel themselves may rely on
    # that to keep it alive -- confirm before removing the decorator.
    @functools.lru_cache
    def __init__(self, url: str, remote_port: int):
        """Open and start the tunnel.

        Args:
            url: SSH endpoint; user name and password are percent-decoded and
                may be omitted, the port defaults to 22.
            remote_port: Port on the SSH host side that the tunnel forwards to.
        """
        url = urlparse(url)
        local_port = find_free_port()
        # Credentials are optional in a URL; unquote(None) would raise TypeError.
        username = urllib.parse.unquote(url.username) if url.username is not None else None
        password = urllib.parse.unquote(url.password) if url.password is not None else None
        ssh_port = url.port if url.port is not None else 22
        super().__init__((url.hostname, ssh_port),
                         ssh_username=username,
                         ssh_password=password,
                         remote_bind_address=('localhost', remote_port),
                         local_bind_address=('localhost', local_port)
                         )
        self.start()
        # NOTE(review): reaches into a private attribute of the base class;
        # presumably so that closing does not wait for open connections -- confirm.
        self._server_list[0].block_on_close = False
Python minimal HTTP server
1 file attached: server.py
server.py
#!/usr/bin/python
import functools
import http.server
import json
import os
import re
import sys
import traceback
import urllib
from typing import Union
import urllib.parse

class HttpRequest:
    """Request-side view of one HTTP call: the resolved resource plus merged arguments.

    The URL path is matched segment by segment against the folders and files
    below the HTTP root; a segment also matches when a file named
    ``<segment>.py`` exists.  Segments left over once nothing matches any more
    are exposed, keyed by position, in ``SEGMENTS``.
    """

    # NOTE(review): `Config` is neither defined nor imported in the visible part
    # of this module -- confirm where it comes from.
    __ROOT_FOLDER = os.path.join(Config().root_folder(), 'http')

    def __init__(self, handler: http.server.SimpleHTTPRequestHandler):
        """Parse the handler's request path and resolve it below the HTTP root."""
        self.__handler = handler
        self._url = urllib.parse.urlparse(self.__handler.path)

        segments = self._url.path[1:].split('/')
        root = os.path.abspath(self.__ROOT_FOLDER)

        self.file_path = self.__ROOT_FOLDER
        while segments:
            candidate = os.path.join(self.file_path, segments[0])
            # The path is supplied by the client: never follow it (for example
            # through `..`) to a location outside the HTTP root.
            if not self._is_below(root, candidate):
                break
            if not os.path.exists(candidate) and not os.path.exists(f"{candidate}.py"):
                break
            self.file_path = candidate
            segments = segments[1:]
        # Unmatched trailing segments become positional arguments 0, 1, 2, ...
        self.SEGMENTS = dict(enumerate(segments))

    @staticmethod
    def _is_below(root: str, candidate: str) -> bool:
        """Tell whether *candidate*, once normalized, is *root* or lies inside it."""
        try:
            return os.path.commonpath([root, os.path.abspath(candidate)]) == root
        except ValueError:
            # Raised for paths that cannot share a prefix (e.g. different drives).
            return False

    @functools.cached_property
    def file(self):
        """Path of the file to serve or execute.

        Resolution order: the resolved path itself when it is a file, then
        ``<path>.py``, then ``index.html`` inside the path when it is a folder.

        Raises:
            FileNotFoundError: If none of the candidates exists.
        """
        result = self.file_path
        if os.path.isfile(result):
            return result
        if not os.path.exists(result) and os.path.exists(result + '.py'):
            return result + '.py'
        if os.path.isdir(result):
            result = os.path.join(result, 'index.html')
        if os.path.exists(result):
            return result
        raise FileNotFoundError(f"HTTP resource `{self.__handler.path}` not found: {result}")

    @functools.cached_property
    def extension(self):
        """Extension of `file` without the leading dot."""
        return os.path.splitext(self.file)[1][1:]

    @property
    def GET(self) -> dict:
        """Query-string parameters; only the first value of each name is kept."""
        qs = urllib.parse.parse_qs(self._url.query)
        return {key: values[0] for key, values in qs.items()}

    @functools.cached_property
    def text(self):
        """Raw request body (bytes) for JSON requests, ``''`` for anything else.

        The body is read from the connection once and then remembered.
        """
        headers = self.__handler.headers
        content_type = headers['Content-Type']
        if content_type is not None and content_type.startswith('application/json'):
            # A missing Content-Length is treated as an empty body.
            return self.__handler.rfile.read(int(headers['Content-Length'] or 0))
        return ''

    @functools.cached_property
    def POST(self):
        """Decoded JSON body, or an empty dict when there is no body to decode."""
        body = self.text
        if not body:
            return dict()
        return json.loads(body)

    @functools.cached_property
    def ARGS(self) -> dict:
        """Positional segments, query parameters and JSON body merged, later sources winning."""
        result = dict()
        result.update(self.SEGMENTS)
        result.update(self.GET)
        # A JSON body may legitimately be a list or scalar; only objects are merged.
        if isinstance(self.POST, dict):
            result.update(self.POST)
        return result


class HttpResponse:
    """Collects the status code, headers and body of the reply to one request."""

    def __init__(self, handler: http.server.SimpleHTTPRequestHandler):
        self.headers = {}
        self.body = bytearray()
        self.code = 200
        self.__handler = handler

    def write(self, data: Union[str, bytes, BaseException]):
        """Append *data* to the response body.

        Text is encoded as UTF-8 and bytes are appended unchanged.  An
        exception is rendered as its message followed by its traceback frames,
        and the same report is echoed to standard error.

        Raises:
            TypeError: If *data* is none of the supported types.
        """
        if isinstance(data, bytes):
            self.body += data
            return
        if isinstance(data, str):
            self.body += bytes(data, 'utf-8')
            return
        if not isinstance(data, BaseException):
            raise TypeError(f"Type not supported: {data.__class__.__qualname__}")

        frames = '\r\n\t'.join(traceback.format_tb(data.__traceback__))
        report = (data.__str__(), '\r\n\t', frames)
        # Close any element that may still be open so the report is visible on the page.
        self.write('</pre></style></script></textarea><pre color="red">\r\n')
        for piece in report:
            self.write(piece)
        for piece in report:
            sys.stderr.write(piece)


class WebHandler(http.server.SimpleHTTPRequestHandler):
    """Request handler that serves static files and executes ``.py`` resources.

    Every HTTP method funnels into `do_request`, which resolves the resource
    through `HttpRequest`, fills an `HttpResponse` and sends it.
    """

    def __init__(self, *args, **kwargs):
        # The base initializer already processes the request, so the attribute
        # has to exist before it runs (and must not be reset afterwards).
        self._METHOD = None
        super().__init__(*args, **kwargs)

    @functools.cached_property
    def _REQUEST(self):
        """The `HttpRequest` for this handler, created on first access."""
        return HttpRequest(self)

    @functools.cached_property
    def _RESPONSE(self):
        """The `HttpResponse` for this handler, created on first access."""
        return HttpResponse(self)

    def do_request(self, method):
        """Handle one request of the given HTTP *method*.

        Status codes: 404 when the resource cannot be resolved, 422 when
        reading or executing it raises, otherwise whatever the response holds
        (200 unless a script changed it).
        """
        print(f"do_request {method}")
        self._METHOD = method
        try:
            resource = self._REQUEST.file
        except FileNotFoundError:
            # Answer 404 and go straight to sending; falling through to the
            # execution step would raise the same error again and report 422.
            self._write_not_found_page()
        else:
            try:
                if self._REQUEST.extension == 'py':
                    with open(resource, "r") as script:
                        source = script.read()
                    # NOTE(security): the script runs inside this method with full
                    # access to `self` and `method`; only trusted files may live
                    # below the HTTP root.
                    exec(compile(source, resource, 'exec'))
                else:
                    with open(resource, "rb") as static_file:
                        self._RESPONSE.body += static_file.read()
            except BaseException as exception:
                print(exception)
                self._RESPONSE.code = 422
                self._RESPONSE.write(exception)
        self._send_response()

    def _write_not_found_page(self):
        """Fill the response with a 404 page, listing the folder content when there is one."""
        import html  # only needed for this page

        folder = self._REQUEST.file_path
        is_folder = os.path.isdir(folder)
        print(f"File not found: {folder}")
        response = self._RESPONSE
        response.headers['Content-Type'] = 'text/html'
        response.code = 404
        response.write("<style>*{font-family:sans-serif}</style>")
        # The path and the file names are not under our control: escape them.
        response.write(f"<h1>{html.escape(self.path)}</h1>")
        response.write("<a href='..'>Go up one level</a><br/>")
        if is_folder:
            response.write("<strong>This folder contains no index.html</strong><br/>")
        else:
            response.write("<strong>This resource does not exist.</strong><br/>")
        response.write("<br/>")
        if is_folder:
            for segment in os.listdir(folder):
                if os.path.isdir(os.path.join(folder, segment)):
                    # Trailing separator marks sub-folders in the listing.
                    segment = os.path.join(segment, '')
                link = html.escape(segment)
                response.write(f"<a href='{link}'>{link}</a><br/><br/>")

    def _send_response(self):
        """Send status line, headers and body of the accumulated response."""
        response = self._RESPONSE
        self.send_response(response.code)
        try:
            body_text = str(response.body, 'utf-8')
        except UnicodeDecodeError:
            # Binary payloads are sent untouched.
            pass
        else:
            # Expand the `<%fa:NAME%>` shorthand into an icon span.
            body_text = re.sub(pattern=r"<%fa\:([^%]*)%>", repl='<span class="fas fa-\\1"></span>', string=body_text)
            response.body = bytes(body_text, 'utf-8')
        response.headers['Content-length'] = len(response.body)
        for name, value in response.headers.items():
            print(name + ": " + str(value))
            self.send_header(name, str(value))
        self.end_headers()
        self.wfile.write(response.body)

    def do_GET(self):
        self.do_request('GET')

    def do_PUT(self):
        self.do_request('PUT')

    def do_DELETE(self):
        self.do_request('DELETE')

    def do_POST(self):
        self.do_request('POST')


class HttpServer:
    """Threading HTTP server that answers every request through `WebHandler`."""

    def __init__(self, port: int = 3187):
        """Create the server socket on *port* with an empty host string."""
        server_class = http.server.ThreadingHTTPServer
        self._server = server_class
        self._httpd = server_class(("", port), WebHandler)

    def start(self):
        """Announce the start on standard output, then serve requests until interrupted."""
        print("HttpServer.start()")
        httpd = self._httpd
        httpd.serve_forever()


if __name__ == "__main__":
    # Run a server on the default port when the module is started as a script.
    server = HttpServer()
    server.start()
PostgreSQL remote query from Java when a direct or SSL connection is not working
2 files attached: SshPostgresql.java postgresql-batch.py
SshPostgresql.java postgresql-batch.py
import psycopg2, sys, json, datetime


def deserialize(v):
    """Turn a JSON-decoded query parameter back into a Python value.

    A dict tagged ``$datetime.datetime`` or ``$datetime.timedelta`` carries a
    number of seconds, mirroring what serialize() emits: the former is counted
    from 1970-01-01 and becomes a naive datetime, the latter becomes a
    timedelta.  Any value that is not a dict is returned unchanged.

    Raises:
        Exception: If *v* is a dict without one of the known tags.
    """
    if isinstance(v, dict):
        if '$datetime.datetime' in v:
            # datetime.datetime() takes year/month/day, not seconds: rebuild
            # the value by adding the offset to the epoch instead.
            return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=v['$datetime.datetime'])
        elif '$datetime.timedelta' in v:
            # The first positional argument of timedelta is days; the payload is seconds.
            return datetime.timedelta(seconds=v['$datetime.timedelta'])
        else:
            raise Exception('Type not understood: '+v.__str__())
    return v
def serialize(v):
    """Make a result value JSON-friendly.

    A datetime becomes ``{'$datetime.datetime': seconds}``, counted from
    1970-01-01 in the value's own tzinfo; a timedelta becomes
    ``{'$datetime.timedelta': seconds}``.  Every other value passes through.
    """
    if isinstance(v, datetime.timedelta):
        return {'$datetime.timedelta': v.total_seconds()}
    if not isinstance(v, datetime.datetime):
        return v
    # Same tzinfo on both sides so that naive and aware values can be subtracted.
    origin = datetime.datetime(1970, 1, 1, tzinfo=v.tzinfo)
    elapsed = v - origin
    return {'$datetime.datetime': elapsed.total_seconds()}


# Usage: <script> REQUESTS_JSON HOST PORT USER PASSWORD DATABASE
# REQUESTS_JSON holds a list of {"query": ..., "parameters": [...]} objects;
# the results are written next to it as REQUESTS_JSON + '.out'.
batchResult = []  # one list of row dicts per request, in request order
with open(sys.argv[1], 'r') as request_file:
    requests = json.load(request_file)
for request in requests:
    # Each request gets its own connection, as before; it is now closed when
    # the request is done instead of being left open until the process exits.
    connection = psycopg2.connect(host=sys.argv[2], port=sys.argv[3], user=sys.argv[4], password=sys.argv[5], database=sys.argv[6])
    try:
        cursor = connection.cursor()
        cursor.execute(request['query'], tuple([deserialize(v) for v in request['parameters']]))
        try:
            rows = cursor.fetchall()
        except psycopg2.ProgrammingError:
            # Statement without a result set: report an empty row list.
            batchResult.append([])
        else:
            names = [column.name for column in cursor.description]
            batchResult.append([{name: serialize(value) for name, value in zip(names, row)} for row in rows])
    finally:
        # NOTE(review): nothing is committed, so statements that modify data
        # are discarded on close -- unchanged from the original; confirm intent.
        connection.close()
with open(sys.argv[1] + '.out', 'w') as result_file:
    json.dump(batchResult, result_file)