<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">import os
import gzip
import subprocess
import shutil
import boto3
from mm_stats.config import (
    POSTGRES_HOST,
    POSTGRES_PORT,
    POSTGRES_USER,
    PGPASSWORD,
    PGDATABASE_HOT_TM,
    PGDATABASE,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    AWS_BUCKET_NAME,
    AWS_PREFIX,
)
from mm_stats.preparation.sql_templates import (
    copy_projects_table,
    copy_tasks_table,
    copy_task_history_table,
    copy_project_info_table,
    copy_users_table,
    copy_campaigns_table,
    copy_organisations_table,
    copy_impact_areas_table,
)
from mm_stats.auth import PostgresDb
from mm_stats.definitions import logger, TEMP_DATA_PATH
import psycopg2
from psycopg2 import sql
from psycopg2 import errors
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


def download_db_dump(outfile="hot-tm.sql.gz", dump_filename=None):
    """Download HOT Tasking Manager DB dump from AWS S3 bucket.

    Args:
        outfile: File name (placed inside TEMP_DATA_PATH) to write the dump to.
        dump_filename: Optional explicit S3 object key. When omitted, the most
            recently modified object under AWS_PREFIX is downloaded.

    Raises:
        FileNotFoundError: If no object exists under AWS_PREFIX and no
            explicit dump_filename was given.
    """
    outfile = os.path.join(
        TEMP_DATA_PATH, outfile
    )
    # open s3 client session
    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    if dump_filename:
        # No listing needed when the caller names the object explicitly.
        bucket_object = dump_filename
        logger.info(f"using specified tm dump: {bucket_object}")
    else:
        # list_objects_v2 returns keys in lexicographic order, which is not
        # guaranteed to match upload time — pick the newest by LastModified.
        response = s3.list_objects_v2(Bucket=AWS_BUCKET_NAME, Prefix=AWS_PREFIX)
        files = response.get("Contents")
        if not files:
            raise FileNotFoundError(
                f"no objects in bucket {AWS_BUCKET_NAME} under prefix {AWS_PREFIX}"
            )
        logger.info("connected to AWS bucket.")
        bucket_object = max(files, key=lambda obj: obj["LastModified"])["Key"]
        logger.info(f"got latest tm dump: {bucket_object}")

    # download file
    with open(outfile, "wb") as f:
        s3.download_fileobj(AWS_BUCKET_NAME, bucket_object, f)
    logger.info(f"downloaded file: {outfile}")


def unzip_db_dump(infile="hot-tm.sql.gz"):
    """Unzip HOT Tasking Manager DB dump file.

    Decompresses TEMP_DATA_PATH/<infile> into TEMP_DATA_PATH/hot-tm.sql.
    """
    src = os.path.join(TEMP_DATA_PATH, infile)
    dst = os.path.join(TEMP_DATA_PATH, "hot-tm.sql")
    # Stream-decompress so the whole dump is never held in memory at once.
    with gzip.open(src, "rb") as f_in, open(dst, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    logger.info(f"unzipped file: {src}")


def create_hot_tm_db():
    """Create the hot_tm database, tolerating it already existing."""
    con = psycopg2.connect(
        dbname=PGDATABASE,
        user=POSTGRES_USER,
        host=POSTGRES_HOST,
        port=POSTGRES_PORT,
        password=PGPASSWORD,
    )
    # CREATE DATABASE cannot run inside a transaction block, so the
    # connection must be switched to autocommit first.
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    try:
        # Use the psycopg2.sql module instead of string concatenation
        # in order to avoid sql injection attacks.
        with con.cursor() as cur:
            cur.execute(
                sql.SQL("CREATE DATABASE {};").format(sql.Identifier(PGDATABASE_HOT_TM))
            )
        logger.info(f"created database: {PGDATABASE_HOT_TM}")
    except errors.DuplicateDatabase:
        logger.info(f"database already exists: {PGDATABASE_HOT_TM}")
    finally:
        # Close on every path — the original leaked the connection when
        # execute raised anything other than DuplicateDatabase.
        con.close()


def drop_hot_tm_db():
    """Drop the hot_tm database via the ``dropdb`` command-line tool.

    Returns:
        bytes: stdout of dropdb (possibly empty), or None when the
        subprocess could not be started at all.
    """
    try:
        # Argument list with shell=False — no shell-injection surface.
        process = subprocess.Popen(
            [
                "dropdb",
                "-h",
                POSTGRES_HOST,
                "-p",
                str(POSTGRES_PORT),  # tolerate an int port from config
                "-U",
                POSTGRES_USER,
                PGDATABASE_HOT_TM,
            ],
            stdout=subprocess.PIPE,
        )
        output = process.communicate()[0]
        if int(process.returncode) != 0:
            logger.error("Command failed. Return code : {}".format(process.returncode))
        else:
            # Only claim success when dropdb actually succeeded — the
            # original logged "dropped database" unconditionally.
            logger.info(f"dropped database: {PGDATABASE_HOT_TM}")
        return output
    except Exception as e:
        logger.error("Issue with the db deletion : {}".format(e))


def restore_db_dump(path="hot-tm.sql"):
    """Restore HOT Tasking Manager DB dump from file into hot_tm database.

    Args:
        path: Dump file name inside TEMP_DATA_PATH to feed to ``psql -f``.

    Returns:
        bytes: stdout of psql (possibly empty), or None when the subprocess
        could not be started at all.
    """
    path = os.path.join(
        TEMP_DATA_PATH, path
    )
    try:
        # Argument list with shell=False — no shell-injection surface.
        process = subprocess.Popen(
            [
                "psql",
                "-h",
                POSTGRES_HOST,
                "-p",
                str(POSTGRES_PORT),  # tolerate an int port from config
                "-U",
                POSTGRES_USER,
                "-d",
                PGDATABASE_HOT_TM,
                "-f",
                path,
            ],
            stdout=subprocess.PIPE,
        )
        output = process.communicate()[0]
        if int(process.returncode) != 0:
            logger.error("Command failed. Return code : {}".format(process.returncode))
        else:
            # Only claim success when psql actually succeeded — the
            # original logged "restored database" unconditionally.
            logger.info(f"restored database: {PGDATABASE_HOT_TM} from file: {path}")
        return output
    except Exception as e:
        logger.error("Issue with the db restore : {}".format(e))


def run_copy_projects_table():
    """Copy projects table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_projects_table.load_sql()
    db.query(query)
    logger.info("copied projects to mm_stats db.")


def run_copy_tasks_table():
    """Copy tasks table from hot_tm db to mm_stats db.

    During this step ST_MakeValid on the task geometry is performed.
    For each task the geom_mollweide and centroid_mollweide are calculated.
    Indices are created for project_id, tasks_id and geometries.
    In total it takes around 10 minutes to complete all steps.
    """
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_tasks_table.load_sql()
    db.query(query)
    logger.info("copied tasks to mm_stats db.")


def run_copy_task_history_table():
    """Copy task_history table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_task_history_table.load_sql()
    db.query(query)
    logger.info("copied task history to mm_stats db.")


def run_copy_project_info_table():
    """Copy project_info table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_project_info_table.load_sql()
    db.query(query)
    logger.info("copied project info to mm_stats db.")


def run_copy_users_table():
    """Copy users table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_users_table.load_sql()
    db.query(query)
    logger.info("copied users to mm_stats db.")


def run_copy_campaigns_table():
    """Copy campaigns table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_campaigns_table.load_sql()
    db.query(query)
    logger.info("copied campaigns to mm_stats db.")


def run_copy_organisations_table():
    """Copy organisations table from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_organisations_table.load_sql()
    db.query(query)
    logger.info("copied organisations to mm_stats db.")


def run_copy_impact_areas_table():
    """Copy information in impact areas from hot_tm db to mm_stats db."""
    db = PostgresDb()
    # Named `query`, not `sql`, to avoid shadowing the psycopg2 `sql`
    # module imported at file level.
    query = "set schema 'data_preparation';"
    query += copy_impact_areas_table.load_sql()
    db.query(query)
    logger.info("copied impact areas to mm_stats db.")
</pre></body></html>