python-osm/map.py
2025-09-03 14:15:39 +09:00

254 lines
11 KiB
Python

from argparse import ArgumentParser
from collections import namedtuple
import json
from pathlib import Path
import xml.etree.ElementTree as ET
import cv2
import numpy as np
from PIL import Image, ImageDraw
Bounds = namedtuple('Bounds', ['maxlat', 'maxlon', 'minlat', 'minlon'])
BoundsTypes = [float, float, float, float]
Member = namedtuple('Member', ['type', 'ref', 'role'])
MemberTypes = [str, int, str]
Node = namedtuple('Node', ['id', 'lat', 'lon', 'tags', 'uid', 'version', 'visible'])
NodeTypes = [int, float, float, dict, int, int, bool]
Relation = namedtuple('Relation', ['id', 'members', 'tags', 'uid', 'version', 'visible'])
RelationTypes = [int, list, dict, int, int, bool]
Way = namedtuple('Way', ['id', 'nodes', 'tags', 'uid', 'version', 'visible'])
WayTypes = [int, list, dict, int, int, bool]
def create_bounds(xml) -> Bounds:
    """Build a ``Bounds`` record from the attributes of ``xml``.

    Each field named in ``Bounds._fields`` is read with ``xml.get`` and passed
    through the converter at the same position in ``BoundsTypes``.
    """
    converted = []
    for convert, field in zip(BoundsTypes, Bounds._fields, strict=True):
        raw_value = xml.get(field)
        converted.append(convert(raw_value))
    return Bounds(*converted)
def create_node(xml) -> Node:
    """Build a ``Node`` record from ``xml``.

    The ``tags`` field is a dict collected from the ``k``/``v`` attributes of
    every ``tag`` element found by ``xml.iter('tag')``; every other field is
    read from the attribute of the same name and converted with the matching
    entry of ``NodeTypes``.
    """
    tag_map = {}
    for tag_element in xml.iter('tag'):
        tag_map[tag_element.get('k')] = tag_element.get('v')

    values = []
    for convert, field in zip(NodeTypes, Node._fields, strict=True):
        if field == 'tags':
            values.append(tag_map)
        else:
            values.append(convert(xml.get(field)))
    return Node(*values)
def create_way(xml) -> Way:
    """Build a ``Way`` record from ``xml``.

    ``nodes`` is the list of integer ``ref`` attributes of the ``nd`` elements,
    ``tags`` is the ``k`` -> ``v`` mapping of the ``tag`` elements, and the
    remaining fields come from same-named attributes converted via
    ``WayTypes``.
    """
    # Fields that are assembled from child elements instead of attributes.
    from_children = {
        'nodes': [int(nd.get('ref')) for nd in xml.iter('nd')],
        'tags': {tag.get('k'): tag.get('v') for tag in xml.iter('tag')},
    }
    values = [
        from_children[field] if field in from_children else convert(xml.get(field))
        for convert, field in zip(WayTypes, Way._fields, strict=True)
    ]
    return Way(*values)
def create_member(xml) -> Member:
    """Build a ``Member`` record from the attributes of ``xml``.

    Each field of ``Member`` is read with ``xml.get`` and converted with the
    entry of ``MemberTypes`` at the same position.
    """
    values = []
    for convert, field in zip(MemberTypes, Member._fields, strict=True):
        values.append(convert(xml.get(field)))
    return Member(*values)
def create_relation(xml) -> Relation:
    """Build a ``Relation`` record from ``xml``.

    ``members`` holds one ``Member`` per ``member`` element, ``tags`` is the
    ``k`` -> ``v`` mapping of the ``tag`` elements, and all other fields are
    same-named attributes converted via ``RelationTypes``.
    """
    member_list = [create_member(element) for element in xml.iter('member')]
    tag_map = {element.get('k'): element.get('v') for element in xml.iter('tag')}

    values = []
    for convert, field in zip(RelationTypes, Relation._fields, strict=True):
        if field == 'tags':
            values.append(tag_map)
        elif field == 'members':
            values.append(member_list)
        else:
            values.append(convert(xml.get(field)))
    return Relation(*values)
def parse_xml(xml_path: Path) -> tuple[Bounds, dict[int, Node], dict[int, Way], dict[int, Relation]]:
    """Parse the XML file at ``xml_path`` into bounds, nodes, ways and relations.

    Returns a 4-tuple ``(bounds, nodes, ways, relations)`` where the last three
    are dicts keyed by element id.

    Raises:
        RuntimeError: if the root has no ``bounds`` child, or if it has a
            direct child whose tag is not one of ``node``, ``way``,
            ``relation`` or ``bounds``.
    """
    root = ET.parse(xml_path).getroot()

    bounds_element = root.find('bounds')
    if bounds_element is None:
        raise RuntimeError('Cannot find bounds')
    bounds = create_bounds(bounds_element)

    nodes, relations, ways = {}, {}, {}
    for element in root:
        tag = element.tag
        if tag == 'node':
            parsed_node = create_node(element)
            nodes[parsed_node.id] = parsed_node
        elif tag == 'way':
            parsed_way = create_way(element)
            ways[parsed_way.id] = parsed_way
        elif tag == 'relation':
            parsed_relation = create_relation(element)
            relations[parsed_relation.id] = parsed_relation
        elif tag != 'bounds':
            # 'bounds' was already consumed above; anything else is unknown.
            raise RuntimeError(f'Unexpected tag : {element.tag}')
    return bounds, nodes, ways, relations
def main():
parser = ArgumentParser()
parser.add_argument('osm', type=Path, help='Path to OSM file to parse')
parser.add_argument('--convert', action='store_true', default=False, help='Convert data into JSON')
parser.add_argument('--map-height', type=int, default=720, help='Height of output map in pixels')
arguments = parser.parse_args()
osm_path: Path = arguments.osm
convert_data: bool = arguments.convert
map_height: int = arguments.map_height
del arguments
bounds, nodes, ways, relations = parse_xml(osm_path)
print(f'{bounds} {len(nodes)=} {len(ways)=} {len(relations)=}')
if convert_data:
with (osm_path.parent / f'{osm_path.stem}.json').open(mode='w', encoding='utf-8') as json_file:
json_file.write('{\n')
json_file.write(
f'"bounds:" {json.dumps(dict(zip(bounds._fields, bounds, strict=True)), ensure_ascii=False)},\n')
json_file.write('"nodes": [\n')
previous_entry = ''
for node in nodes.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: v for k, v in zip(node._fields, node, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('],\n"ways": [\n')
previous_entry = ''
for way in ways.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: v for k, v in zip(way._fields, way, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('],\n"relations": [\n')
previous_entry = ''
for relation in relations.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: (v if k != 'members' else [dict(zip(m._fields, m, strict=True)) for m in v])
for k, v in zip(relation._fields, relation, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('}')
return
lat_range = bounds.maxlat - bounds.minlat
lon_range = bounds.maxlon - bounds.minlon
map_ratio = lon_range / lat_range
map_width = int(map_height * map_ratio)
lat_scale = map_height / lat_range
lon_scale = map_width / lon_range
background_color = (235, 235, 235)
image = Image.new('RGB', (map_width, map_height), background_color)
draw = ImageDraw.Draw(image, 'RGB')
def draw_way_polygon(way: Way, color: tuple[int, ...] | str, outline: tuple[int, ...] | str | None = None,
width: int = 1):
nonlocal bounds, draw, lat_scale, lon_scale, map_height, map_width, nodes
coords = []
for node_id in way.nodes:
node = nodes[node_id]
new_coord = (
min(max((node.lon - bounds.minlon) * lon_scale, 0), map_width),
min(max(map_height - ((node.lat - bounds.minlat) * lat_scale), 0), map_height))
if not coords or new_coord != coords[-1]:
coords.append(new_coord)
draw.polygon(coords, fill=color, outline=outline, width=width)
def draw_way_line(way: Way, width: int, color: tuple[int, ...] | str, joint: str | None = None):
nonlocal bounds, draw, lat_scale, lon_scale, map_height, map_width, nodes
coords = []
for node_id in way.nodes:
node = nodes[node_id]
new_coord = (
min(max((node.lon - bounds.minlon) * lon_scale, 0), map_width),
min(max(map_height - ((node.lat - bounds.minlat) * lat_scale), 0), map_height))
if not coords or new_coord != coords[-1]:
coords.append(new_coord)
draw.line(coords, width=width, fill=color, joint=joint)
for relation in relations.values():
if 'natural' in relation.tags and relation.tags['natural'] == 'wood':
outer_ways: list[Way] = []
inner_ways: list[Way] = []
for member in relation.members:
member: Member
if member.type != 'way' or member.ref not in ways:
continue
if member.role == 'outer':
outer_ways.append(ways[member.ref])
elif member.role == 'inner':
inner_ways.append(ways[member.ref])
for way in outer_ways:
draw_way_polygon(way, (140, 200, 120))
for way in inner_ways:
draw_way_polygon(way, background_color)
bridge_outline = (80, 80, 80)
for way in ways.values():
if 'amenity' in way.tags:
match way.tags['amenity']:
case 'school' | 'kindergarten' | 'library' | 'research_institute' | 'university':
draw_way_polygon(way, (250, 250, 220))
case ('bicycle_parking' | 'bus_station' | 'fuel' | 'parking' | 'parking_entrance' | 'parking_space'
| 'taxi'):
draw_way_polygon(way, (220, 220, 220), outline=(180, 180, 180))
case _:
draw_way_polygon(way, (240, 200, 200))
for way in ways.values():
if 'highway' in way.tags:
match way.tags['highway']:
case 'motorway' | 'motorway_link':
draw_way_line(way, 15, (220, 160, 160), joint='curve')
case 'trunk' | 'trunk_link':
draw_way_line(way, 13, bridge_outline if 'bridge' in way.tags else (230, 180, 130), joint='curve')
draw_way_line(way, 11, (240, 200, 170), joint='curve')
case 'primary' | 'primary_link':
draw_way_line(way, 11, bridge_outline if 'bridge' in way.tags else (230, 200, 110), joint='curve')
draw_way_line(way, 9, (245, 225, 150), joint='curve')
case 'secondary' | 'secondary_link':
draw_way_line(way, 11, bridge_outline if 'bridge' in way.tags else (200, 200, 140), joint='curve')
draw_way_line(way, 9, (220, 220, 180), joint='curve')
case 'tertiary' | 'tertiraty_link':
draw_way_line(way, 9, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve')
draw_way_line(way, 7, (245, 245, 245), joint='curve')
case 'residential' | 'unclassified' | 'road':
draw_way_line(way, 7, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve')
draw_way_line(way, 5, (250, 250, 250), joint='curve')
elif 'waterway' in way.tags and way.tags['waterway'] in {'river', 'steam', 'tidal_channel', 'flowline'}:
draw_way_line(way, 11, (180, 200, 240), joint='curve')
elif 'building' in way.tags:
draw_way_polygon(way, (190, 180, 160), outline=(120, 120, 120))
image = cv2.cvtColor(np.asarray(image), cv2.COLOR_BGR2RGB)
print(f'{image.shape=}')
cv2.namedWindow('map', cv2.WINDOW_KEEPRATIO)
while cv2.waitKey(50) & 0xff != ord('q'):
cv2.imshow('map', image)
cv2.destroyAllWindows()
if __name__ == '__main__':
main()