Pure Python script to process NLCD for the USA (does not require using QGIS Python Console)

From FlightGear wiki
Jump to navigation Jump to search

Pure Python script to process NLCD for the USA (does not require using QGIS Python Console)

This has been tested in Linux, it may or may not need modification to run under Windows OS.

USAGE: python3 gen-scenery.py --part 89-86_29 --canopy 41

#!/usr/bin/env python3
import os
import subprocess
import argparse
import tempfile
from pathlib import Path
from osgeo import gdal

# ----------------- CONFIG -----------------

BASE_PATH = Path("/media/wayne/TOSHIBA-EXT/Scenery/ws3.0")
YEAR = "2023"
STATE = "Alabama"
# The following two parameters are now arguments, see ARGUMENT PARSING below
# PART = "89-86_29"
# CANOPY = 41

# upsampling / smoothing
PERCENTAGE = 11.0
SIZE = 21

# GRASS binary
GRASS_BIN = "grass"

# ----------------- ARGUMENT PARSING -----------------

# USAGE: python3 gen-scenery.py --part 89-86_29 --canopy 41

#NLCD
# Choose the dominent canopy for the area being processed
# Areas not covered in the Tree Canopy layer will convert retain their original class
#41 = DeciduousForest
#42 = EvergreenForest
#43 = MixedForest

#FG equivalent 
#22 = AgroForest
#23 = DeciduousBroadCover
#24 = EvergreenForest
#25 = MixedForest

parser = argparse.ArgumentParser()
parser.add_argument("--part", required=True, help="Tile part ID, e.g. 89-86_29")
parser.add_argument("--canopy", required=True, type=int, help="Tree canopy FG class")
args = parser.parse_args()

PART = args.part
CANOPY = args.canopy

# ----------------- HELPERS -----------------

def run(cmd, check=True, env=None):
    print(">>", " ".join(str(c) for c in cmd))
    subprocess.run(cmd, check=check, env=env)

def gdal_calc(args):
    fixed = []
    for a in args:
        if a.startswith("--") and "=" in a:
            key, val = a.split("=", 1)
            fixed.append(key)
            fixed.append(val)
        else:
            fixed.append(a)

    cmd = ["gdal_calc.py"] + fixed + ["--overwrite"]
    run(cmd)

def gdalwarp(input_path, output_path, t_srs="EPSG:4326",
             resampling="near", dtype="Byte", extra=None):
    cmd = [
        "gdalwarp",
        "-t_srs", t_srs,
        "-r", resampling,
        "-ot", dtype,
        "-overwrite",
        input_path,
        output_path
    ]
    if extra:
        cmd.extend(extra)
    run(cmd)

def gdal_translate(input_path, output_path,
                   nodata=0, dtype="Byte", compress="LZW"):
    cmd = [
        "gdal_translate",
        "-a_nodata", str(nodata),
        "-ot", dtype,
        "-co", f"COMPRESS={compress}",
        input_path,
        output_path
    ]
    run(cmd)

# ----------------- GRASS SESSION -----------------

# Create a temporary GRASS location with PERMANENT mapset
TMPDIR = tempfile.mkdtemp(prefix="grass_job_")
GRASS_LOCATION = os.path.join(TMPDIR, "location")

# Create the location
subprocess.run([
    GRASS_BIN,
    "-c", "EPSG:4326",
    GRASS_LOCATION,
    "--exec", "echo", "Location created"
], check=True)

# The active mapset is PERMANENT
GRASS_MAPSET = os.path.join(GRASS_LOCATION, "PERMANENT")

def grass_exec(args):
    cmd = [GRASS_BIN, GRASS_MAPSET, "--exec"] + args
    print(">>", " ".join(cmd))
    subprocess.run(cmd, check=True)

def grass_exec(args):
    cmd = [GRASS_BIN, GRASS_MAPSET, "--exec"] + args
    print(">>", " ".join(cmd))
    subprocess.run(cmd, check=True)

def grass_r_neighbors(input_raster, output_raster,
                      method="median", size=7):
    in_name = "in_rast"
    out_name = "out_rast"

    grass_exec([
        "r.in.gdal",
        f"input={input_raster}",
        f"output={in_name}",
        "--overwrite"
    ])

    grass_exec([
        "g.region",
        f"raster={in_name}"
    ])

    grass_exec([
        "r.neighbors",
        f"input={in_name}",
        f"output={out_name}",
        f"method={method}",
        f"size={size}",
        "--overwrite"
    ])

    grass_exec([
        "r.out.gdal",
        "-f",
        f"input={out_name}",
        f"output={output_raster}",
        "format=GTiff",
        "type=Byte",
        "--overwrite"
    ])

def p(*parts):
    return str(BASE_PATH.joinpath(STATE, *parts))

# ----------------- STEPS -----------------

def step1_tree_canopy_reclass():
    src = p("data", "source", f"NLCD_{YEAR}_Tree_Canopy_{STATE}{PART}.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Trees-Combined.tiff")

    expr = f"((A > 0) & (A < 255)) * {CANOPY} + (A <= 0) * A"
    gdal_calc([
        "-A", src,
        "--outfile", dst,
        "--calc", expr,
        "--type", "Byte"
    ])
    print("Step 1: Trees-Combined")

def step2_warp_tree_canopy_4326():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Trees-Combined.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Tree_Canopy_4326.tiff")
    gdalwarp(src, dst)
    print("Step 2: Tree_Canopy_4326")

def step3_warp_land_cover_4326():
    src = p("data", "source", f"NLCD_{YEAR}_Land_Cover_{STATE}{PART}.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Land_Cover_4326.tiff")
    gdalwarp(src, dst)
    print("Step 3: Land_Cover_4326")

def step4_combine_tree_land_4326():
    a = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Tree_Canopy_4326.tiff")
    b = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Land_Cover_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Tree_Land_Combined_4326.tiff")

    expr = (
        "(((B > 0) & (B < 255)) & (B != 41) & (B != 42) & (B != 43) & (A > 0)) * A + "
        "((B == 41) | (B == 42) | (B == 43) | (A <= 0)) * B"
    )
    gdal_calc([
        "-A", a,
        "-B", b,
        "--outfile", dst,
        "--calc", expr,
        "--type", "Byte"
    ])
    print("Step 4: Tree_Land_Combined_4326")

def step5_replace_urban_clutter_with_grass():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Tree_Land_Combined_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Grass-Only_4326.tiff")

    expr = (
        "(A == 0) * 44 + "
        "(A == 11) * 41 + "
        "(A == 12) * 34 + "
        "(A == 21) * 26 + "
        "(A == 22) * 26 + "
        "(A == 23) * 26 + "
        "(A == 24) * 26 + "
        "(A == 31) * 27 + "
        "(A == 41) * 23 + "
        "(A == 42) * 24 + "
        "(A == 43) * 25 + "
        "(A == 51) * 30 + "
        "(A == 52) * 29 + "
        "(A == 71) * 26 + "
        "(A == 72) * 32 + "
        "(A == 73) * 31 + "
        "(A == 74) * 31 + "
        "(A == 75) * 32 + "
        "(A == 81) * 18 + "
        "(A == 82) * 19 + "
        "(A == 90) * 25 + "
        "(A == 95) * 35"
    )
    gdal_calc([
        "-A", src,
        "--outfile", dst,
        "--calc", expr,
        "--type", "Byte"
    ])
    print("Step 5: Grass-Only_4326")

def step6_reclass_urban():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Tree_Land_Combined_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Reclassed-Urban_4326.tiff")

    expr = (
        "(A == 21)*10 + "
        "(A == 22)*1 + "
        "(A == 23)*2 + "
        "(A == 24)*3"
    )
    gdal_calc([
        "-A", src,
        "--outfile", dst,
        "--calc", expr,
        "--type", "Byte"
    ])
    print("Step 6: Reclassed-Urban_4326")

def step7_remove_clutter_roads_from_urban():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Reclassed-Urban_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Urban-Only_4326.tiff")

    grass_r_neighbors(src, dst, method="median", size=7)
    print("Step 7: Urban-Only_4326")

def step8_combine_grass_and_clean_urban():
    a = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Urban-Only_4326.tiff")
    b = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Grass-Only_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Combined-Clean_4326.tiff")

    expr = "((A > 0) & (B > 0)) * A + ((A <= 0) | (B <= 0)) * B"
    gdal_calc([
        "-A", a,
        "-B", b,
        "--outfile", dst,
        "--calc", expr,
        "--NoDataValue", "0",
        "--type", "Byte"
    ])
    print("Step 8: Combined-Clean_4326")

def step9_upsample_to_hd():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Combined-Clean_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Combined-Clean-HD_4326.tiff")

    ds = gdal.Open(src)
    if ds is None:
        raise RuntimeError(f"Cannot open {src}")
    gt = ds.GetGeoTransform()
    original_xRes = gt[1]
    original_yRes = abs(gt[5])

    new_xRes = original_xRes / PERCENTAGE
    new_yRes = original_yRes / PERCENTAGE

    gdal.Warp(
        dst,
        src,
        xRes=new_xRes,
        yRes=new_yRes,
        outputType=gdal.GDT_Byte
    )
    print("Step 9: Combined-Clean-HD_4326")

def step10_smooth_all_features():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Combined-Clean-HD_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Smoothed-HD_4326.tiff")

    grass_r_neighbors(src, dst, method="median", size=SIZE)
    print("Step 10: Smoothed-HD_4326")

def step11_convert_to_8bit_compressed():
    src = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Smoothed-HD_4326.tiff")
    dst = p("data", f"NLCD_{YEAR}_{STATE}{PART}_Smoothed-HD-Compressed_4326.tiff")

    gdal_translate(src, dst)
    print("Step 11: Smoothed-HD-Compressed_4326")

# ----------------- MAIN -----------------

def main():
    step1_tree_canopy_reclass()
    step2_warp_tree_canopy_4326()
    step3_warp_land_cover_4326()
    step4_combine_tree_land_4326()
    step5_replace_urban_clutter_with_grass()
    step6_reclass_urban()
    step7_remove_clutter_roads_from_urban()
    step8_combine_grass_and_clean_urban()
    step9_upsample_to_hd()
    step10_smooth_all_features()
    step11_convert_to_8bit_compressed()

if __name__ == "__main__":
    main()

Batch script for multiple processes

#!/bin/bash

python3 gen-scenery.py --part 89-86_29 --canopy 41 &
python3 gen-scenery.py --part 89-86_30 --canopy 41 &
python3 gen-scenery.py --part 89-86_31 --canopy 41 &
python3 gen-scenery.py --part 89-86_32 --canopy 41 &
python3 gen-scenery.py --part 89-86_33 --canopy 41 &
python3 gen-scenery.py --part 89-86_34 --canopy 41 &

wait
echo "All scenery jobs completed."