common module¶
The common module contains common functions and classes used by the other modules.
compute_d8_direction(dem, nodata_value=nan)
¶
Compute D8 flow direction from a DEM.
Source code in road_flood_risk_map/common.py
def compute_d8_direction(dem, nodata_value=np.nan):
"""Compute D8 flow direction from a DEM."""
# Placeholder for D8 direction computation logic
# This should return a numpy array of the same shape as dem
# with values indicating the flow direction.
direction_code = {
32: (-1, -1), # 0: North-West
64: (-1, 0), # 1: North
128: (-1, 1), # 2: North-East
1: (0, 1), # 3: East
2: (1, 1), # 4: South-East
4: (0, 1), # 5: South
8: (1, -1), # 6: South-West
16: (0, -1), # 7: West
}
d8_direction = np.zeros_like(dem, dtype=np.int32)
rows, cols = dem.shape
for i in range(0, rows):
for j in range(0, cols):
if dem[i, j] == nodata_value: # Assuming -9999 is the nodata value
d8_direction[i, j] = nodata_value
else:
# Compute the direction based on the surrounding cells
maximum_drop = -np.inf
elev = dem[i, j]
direction = -1
for k, (di, dj) in direction_code.items():
ni, nj = i + di, j + dj
if 0 <= ni < rows and 0 <= nj < cols:
# maximum_drop = change_in_z-value / distance * 100
change_in_z = elev - dem[ni, nj]
distance = np.sqrt(di**2 + dj**2)
drop = (change_in_z / distance * 100) if distance != 0 else 0
if drop > maximum_drop:
maximum_drop = drop
direction = k
d8_direction[i, j] = direction
return d8_direction
fill_depression_epsilon(dem, nodata_value=-9999)
¶
Fill depressions in a DEM using an epsilon-based approach.
Source code in road_flood_risk_map/common.py
def fill_depression_epsilon(dem, nodata_value=-9999):
"""
Fill depressions in a DEM using an epsilon-based approach.
"""
import heapq
import numpy as np
import queue
filled_dem = dem.copy()
rows, cols = dem.shape
open_pq = []
pits = queue.Queue()
closed_set = np.zeros((rows, cols), dtype=bool)
neighbors = [
(-1, 0), # North
(1, 0), # South
(0, -1), # West
(0, 1), # East
(-1, -1), # Northwest
(-1, 1), # Northeast
(1, -1), # Southwest
(1, 1), # Southeast
]
# Initialize the priority queue with border cells
for i in range(rows):
heapq.heappush(open_pq, (dem[i, 0], (i, 0)))
heapq.heappush(open_pq, (dem[i, cols - 1], (i, cols - 1)))
closed_set[i, 0] = True
closed_set[i, cols - 1] = True
for j in range(1, cols - 1):
heapq.heappush(open_pq, (dem[0, j], (0, j)))
heapq.heappush(open_pq, (dem[rows - 1, j], (rows - 1, j)))
closed_set[0, j] = True
closed_set[rows - 1, j] = True
pit_top = None
false_pit_cells = 0
while open_pq or not pits.empty():
current = None
if open_pq and (open_pq[0][0] == pit_top):
current = heapq.heappop(open_pq)
pit_top = None
elif not pits.empty():
current = pits.get()
if pit_top is None:
pit_top = dem[current[1][0], current[1][1]]
else:
current = heapq.heappop(open_pq)
pit_top = None
# Process neighbors of current cell
for dx, dy in neighbors:
nx, ny = current[1][0] + dx, current[1][1] + dy
if 0 <= nx < rows and 0 <= ny < cols and not closed_set[nx, ny]:
neighbor_value = dem[nx, ny]
closed_set[nx, ny] = True
if neighbor_value == nodata_value or np.isnan(neighbor_value):
pits.put((neighbor_value, (nx, ny)))
elif neighbor_value <= np.nextafter(current[0], np.float64("inf")):
next_after_value = np.nextafter(current[0], np.float64("inf"))
if pit_top is not None and (
pit_top < neighbor_value and next_after_value >= neighbor_value
):
false_pit_cells += 1
filled_dem[nx, ny] = next_after_value
pits.put((neighbor_value, (nx, ny)))
else:
# Otherwise, add to open set
heapq.heappush(open_pq, (neighbor_value, (nx, ny)))
return filled_dem
fill_depressions(dem)
¶
Fill depressions in a DEM using a simple algorithm.
Source code in road_flood_risk_map/common.py
def fill_depressions(dem):
"""
Fill depressions in a DEM using a simple algorithm.
"""
import heapq
import numpy as np
import queue
filled_dem = dem.copy()
rows, cols = dem.shape
open_pq = []
pits = queue.Queue()
closed_set = np.zeros((rows, cols), dtype=bool)
neighbors = [
(-1, 0), # North
(1, 0), # South
(0, -1), # West
(0, 1), # East
(-1, -1), # Northwest
(-1, 1), # Northeast
(1, -1), # Southwest
(1, 1), # Southeast
]
# Initialize the priority queue with border cells
for i in range(rows):
heapq.heappush(open_pq, (dem[i, 0], (i, 0)))
heapq.heappush(open_pq, (dem[i, cols - 1], (i, cols - 1)))
closed_set[i, 0] = True
closed_set[i, cols - 1] = True
for j in range(1, cols - 1):
heapq.heappush(open_pq, (dem[0, j], (0, j)))
heapq.heappush(open_pq, (dem[rows - 1, j], (rows - 1, j)))
closed_set[0, j] = True
closed_set[rows - 1, j] = True
while open_pq or not pits.empty():
current = None
if not pits.empty():
current = pits.get()
else:
current = heapq.heappop(open_pq)
# Process neighbors of current cell
for dx, dy in neighbors:
nx, ny = current[1][0] + dx, current[1][1] + dy
if 0 <= nx < rows and 0 <= ny < cols and not closed_set[nx, ny]:
neighbor_value = dem[nx, ny]
closed_set[nx, ny] = True
if neighbor_value <= current[0]:
# If neighbor is lower, add to pits
filled_dem[nx, ny] = current[0]
pits.put((neighbor_value, (nx, ny)))
else:
# Otherwise, add to open set
heapq.heappush(open_pq, (neighbor_value, (nx, ny)))
return filled_dem
fill_depressions_flow_dirs(dem)
¶
Fill depressions in a DEM using a simple algorithm.
Source code in road_flood_risk_map/common.py
def fill_depressions_flow_dirs(dem):
"""
Fill depressions in a DEM using a simple algorithm.
"""
import heapq
import numpy as np
import queue
def get_opposite_direction(x, y):
"""Get the opposite direction for a given D8 direction."""
return [-x, -y]
rows, cols = dem.shape
open_pq = []
closed_set = np.zeros((rows, cols), dtype=bool)
flow_dirs = np.zeros_like(dem, dtype=np.int32)
direction_code = {
(-1, 0): 64, # 1: North
(0, 1): 1, # 3: East
(0, 1): 4, # 5: South
(0, -1): 16, # 7: West
(-1, -1): 32, # 0: North-West
(-1, 1): 128, # 2: North-East
(1, 1): 2, # 4: South-East
(1, -1): 8, # 6: South-West
}
# Initialize the priority queue with border cells
for i in range(rows):
heapq.heappush(open_pq, (dem[i, 0], (i, 0)))
heapq.heappush(open_pq, (dem[i, cols - 1], (i, cols - 1)))
closed_set[i, 0] = True
if np.isnan(dem[i, 0]):
flow_dirs[i, 0] = None
else:
flow_dirs[i, 0] = 1
closed_set[i, cols - 1] = True
if np.isnan(dem[i, cols - 1]):
flow_dirs[i, cols - 1] = None
else:
flow_dirs[i, cols - 1] = 16
for j in range(1, cols - 1):
heapq.heappush(open_pq, (dem[0, j], (0, j)))
heapq.heappush(open_pq, (dem[rows - 1, j], (rows - 1, j)))
closed_set[0, j] = True
if np.isnan(dem[0, j]):
flow_dirs[0, j] = None
else:
flow_dirs[0, j] = 64
closed_set[rows - 1, j] = True
if np.isnan(dem[rows - 1, j]):
flow_dirs[rows - 1, j] = None
else:
flow_dirs[rows - 1, j] = 4
while open_pq:
current = heapq.heappop(open_pq)
# Process neighbors of current cell
for (dx, dy), direction in direction_code.items():
nx, ny = current[1][0] + dx, current[1][1] + dy
if 0 <= nx < rows and 0 <= ny < cols and not closed_set[nx, ny]:
neighbor_value = dem[nx, ny]
if np.isnan(neighbor_value):
flow_dirs[nx, ny] = None
else:
ox, oy = get_opposite_direction(nx, ny)
flow_dirs[nx, ny] = direction[(ox, oy)]
closed_set[nx, ny] = True
heapq.heappush(open_pq, (neighbor_value, (nx, ny)))
return flow_dirs
fix_raster_metadata(file_name, output_name)
¶
Converts the compress method string of the Raster file to uppercase as supported by WhiteBoxTools.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_name |
str |
Raster file to modify |
required |
output_name |
str |
Output raster file name without extension |
required |
Returns:
Type | Description |
---|---|
path (str) |
Full path of the output raster file |
Source code in road_flood_risk_map/common.py
def fix_raster_metadata(file_name: str, output_name: str):
"""
Converts the compress method string of the Raster file to uppercase as supported by WhiteBoxTools.
Args:
file_name (str): Raster file to modify
output_name (str): Output raster file name without extension
Returns:
path (str): Full path of the output raster file
"""
import rasterio
from rasterio.transform import Affine
import os
with rasterio.open(file_name) as src:
profile = src.profile.copy()
profile.update(compress=profile["compress"].upper())
profile.update(nodata=-9999)
bounds = src.bounds
# Fix bounds
shift = 0
if bounds.left < -180:
shift = 360
elif bounds.right > 180:
shift = -360
if shift != 0:
new_transform = Affine.translation(shift, 0) * src.transform
profile.update(transform=new_transform)
print(f"Fixed bounds by shifting {shift} degrees.")
with rasterio.open(f"{output_name}.tif", "w", **profile) as dst:
dst.write(src.read())
os.remove(file_name)
return f"{os.path.join(os.getcwd(),output_name)}.tif"