# python-osm/map.py
#
# 279 lines
# 11 KiB
# Python

from argparse import ArgumentParser
from collections import namedtuple
import json
import math
from pathlib import Path
import xml.etree.ElementTree as ET
import cv2
import numpy as np
from PIL import Image, ImageDraw
Bounds = namedtuple('Bounds', ['maxlat', 'maxlon', 'minlat', 'minlon'])
BoundsTypes = [float, float, float, float]
Member = namedtuple('Member', ['type', 'ref', 'role'])
MemberTypes = [str, int, str]
Node = namedtuple('Node', ['id', 'lat', 'lon', 'tags', 'uid', 'version', 'visible'])
NodeTypes = [int, float, float, dict, int, int, bool]
Relation = namedtuple('Relation', ['id', 'members', 'tags', 'uid', 'version', 'visible'])
RelationTypes = [int, list, dict, int, int, bool]
Way = namedtuple('Way', ['id', 'nodes', 'tags', 'uid', 'version', 'visible'])
WayTypes = [int, list, dict, int, int, bool]
def create_bounds(xml) -> Bounds:
    """Build a ``Bounds`` record from the attributes of *xml*.

    Every field of ``Bounds`` is read with ``xml.get`` under the field's own
    name and converted by the callable at the same position in ``BoundsTypes``.
    """
    converted = []
    for convert, field in zip(BoundsTypes, Bounds._fields, strict=True):
        raw_value = xml.get(field)
        converted.append(convert(raw_value))
    return Bounds(*converted)
def create_node(xml) -> Node:
    """Build a ``Node`` record from *xml*.

    The ``tags`` field is a dict collected from the ``k``/``v`` attributes of
    every ``tag`` element found under *xml*; all other fields come from the
    attributes of *xml* itself, converted through ``NodeTypes``.
    """
    tag_map = {}
    for tag_xml in xml.iter('tag'):
        tag_map[tag_xml.get('k')] = tag_xml.get('v')

    values = []
    for convert, field in zip(NodeTypes, Node._fields, strict=True):
        if field == 'tags':
            values.append(tag_map)
        else:
            values.append(convert(xml.get(field)))
    return Node(*values)
def create_way(xml) -> Way:
    """Build a ``Way`` record from *xml*.

    ``nodes`` holds the integer ``ref`` of every ``nd`` element under *xml* and
    ``tags`` the ``k``/``v`` pairs of every ``tag`` element; the remaining
    fields are attributes of *xml* converted through ``WayTypes``.
    """
    # Fields assembled from child elements rather than from attributes.
    from_children = {
        'nodes': [int(nd_xml.get('ref')) for nd_xml in xml.iter('nd')],
        'tags': {tag_xml.get('k'): tag_xml.get('v') for tag_xml in xml.iter('tag')},
    }
    return Way(*(
        from_children[field] if field in from_children else convert(xml.get(field))
        for convert, field in zip(WayTypes, Way._fields, strict=True)
    ))
def create_member(xml) -> Member:
    """Build a ``Member`` record from the attributes of *xml*.

    Each field is read with ``xml.get`` and converted by the callable at the
    same position in ``MemberTypes``.
    """
    converted = []
    for convert, field in zip(MemberTypes, Member._fields, strict=True):
        raw_value = xml.get(field)
        converted.append(convert(raw_value))
    return Member(*converted)
def create_relation(xml) -> Relation:
    """Build a ``Relation`` record from *xml*.

    ``members`` is a list of ``Member`` records, one per ``member`` element
    under *xml*, and ``tags`` the ``k``/``v`` pairs of every ``tag`` element;
    the remaining fields are attributes of *xml* converted through
    ``RelationTypes``.
    """
    # Fields assembled from child elements rather than from attributes.
    from_children = {
        'members': [create_member(member_xml) for member_xml in xml.iter('member')],
        'tags': {tag_xml.get('k'): tag_xml.get('v') for tag_xml in xml.iter('tag')},
    }
    return Relation(*(
        from_children[field] if field in from_children else convert(xml.get(field))
        for convert, field in zip(RelationTypes, Relation._fields, strict=True)
    ))
def parse_xml(xml_path: Path) -> tuple[Bounds, dict[int, Node], dict[int, Way], dict[int, Relation]]:
    """Parse the OSM XML file at *xml_path*.

    Returns:
        ``(bounds, nodes, ways, relations)`` where the last three are dicts
        keyed by record id.

    Raises:
        RuntimeError: if the root has no ``bounds`` child, or if it has a
            direct child that is not ``node``, ``way``, ``relation`` or
            ``bounds``.
    """
    root = ET.parse(xml_path).getroot()

    bounds_xml = root.find('bounds')
    if bounds_xml is None:
        raise RuntimeError('Cannot find bounds')
    bounds = create_bounds(bounds_xml)

    nodes, relations, ways = {}, {}, {}
    # Tag name -> (record factory, dict that collects the records by id).
    handlers = {
        'node': (create_node, nodes),
        'way': (create_way, ways),
        'relation': (create_relation, relations),
    }
    for element in root:
        tag_name = element.tag
        if tag_name == 'bounds':
            # Already handled above.
            continue
        if tag_name not in handlers:
            raise RuntimeError(f'Unexpected tag : {tag_name}')
        factory, store = handlers[tag_name]
        record = factory(element)
        store[record.id] = record
    return bounds, nodes, ways, relations
def main():
parser = ArgumentParser()
parser.add_argument('osm', type=Path, help='Path to OSM file to parse')
parser.add_argument('--convert', action='store_true', default=False, help='Convert data into JSON')
parser.add_argument('--map-width', type=int, default=1200, help='Target width of output map in pixels')
arguments = parser.parse_args()
osm_path: Path = arguments.osm
convert_data: bool = arguments.convert
map_target_width: int = arguments.map_width
del arguments
bounds, nodes, ways, relations = parse_xml(osm_path)
if convert_data:
with (osm_path.parent / f'{osm_path.stem}.json').open(mode='w', encoding='utf-8') as json_file:
json_file.write('{\n')
json_file.write(
f'"bounds:" {json.dumps(dict(zip(bounds._fields, bounds, strict=True)), ensure_ascii=False)},\n')
json_file.write('"nodes": [\n')
previous_entry = ''
for node in nodes.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: v for k, v in zip(node._fields, node, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('],\n"ways": [\n')
previous_entry = ''
for way in ways.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: v for k, v in zip(way._fields, way, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('],\n"relations": [\n')
previous_entry = ''
for relation in relations.values():
if previous_entry:
json_file.write(f' {previous_entry},\n')
previous_entry = json.dumps(
{k: (v if k != 'members' else [dict(zip(m._fields, m, strict=True)) for m in v])
for k, v in zip(relation._fields, relation, strict=True) if not isinstance(v, dict) or v},
ensure_ascii=False)
if previous_entry:
json_file.write(f' {previous_entry}\n')
json_file.write('}')
return
zoom_level: int = 0
zoom_scale: float = 0.
def get_map_raw_coords(lon: float, lat: float) -> tuple[int, int]:
return (
int(zoom_scale * math.radians(lon)),
int(zoom_scale * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2)))
)
# def get_gps_raw_coords(x: float, y: float) -> tuple[float, float]:
# return (
# math.degrees(x / zoom_scale),
# math.degrees(2 * math.atan(math.exp(y / zoom_scale)) - math.pi / 2.)
# )
map_width = 0
map_height = 0
min_x = 0
max_y = 0
for zoom_level in range(50):
zoom_scale = 2**zoom_level / 2 * math.pi
min_x, min_y = get_map_raw_coords(bounds.minlon, bounds.minlat)
max_x, max_y = get_map_raw_coords(bounds.maxlon, bounds.maxlat)
map_width = max_x - min_x
if map_width >= map_target_width:
map_height = abs(max_y - min_y)
break
assert map_width > 0 and 0 < map_height < map_width * 3
def get_map_coords(lon: float, lat: float) -> tuple[int, int]:
nonlocal min_x, max_y
x, y = get_map_raw_coords(lon, lat)
return (x - min_x, max_y - y)
background_color = (235, 235, 235)
image = Image.new('RGB', (map_width, map_height), background_color)
draw = ImageDraw.Draw(image, 'RGB')
def draw_way_polygon(way: Way, color: tuple[int, ...] | str, outline: tuple[int, ...] | str | None = None,
width: int = 1):
nonlocal bounds, draw, map_height, map_width, nodes
coords = []
for node_id in way.nodes:
node = nodes[node_id]
new_coord = get_map_coords(node.lon, node.lat)
# print(f'{new_coord=}')
if len(coords) < 2 or new_coord != coords[-1]:
coords.append(new_coord)
draw.polygon(coords, fill=color, outline=outline, width=width)
def draw_way_line(way: Way, width: int, color: tuple[int, ...] | str, joint: str | None = None):
nonlocal bounds, draw, map_height, map_width, nodes
coords = []
for node_id in way.nodes:
node = nodes[node_id]
new_coord = get_map_coords(node.lon, node.lat)
# print(f'{new_coord=}')
if len(coords) < 2 or new_coord != coords[-1]:
coords.append(new_coord)
draw.line(coords, width=width, fill=color, joint=joint)
for relation in relations.values():
if 'natural' in relation.tags and relation.tags['natural'] == 'wood':
outer_ways: list[Way] = []
inner_ways: list[Way] = []
for member in relation.members:
member: Member
if member.type != 'way' or member.ref not in ways:
continue
if member.role == 'outer':
outer_ways.append(ways[member.ref])
elif member.role == 'inner':
inner_ways.append(ways[member.ref])
for way in outer_ways:
draw_way_polygon(way, (140, 200, 120))
for way in inner_ways:
draw_way_polygon(way, background_color)
bridge_outline = (80, 80, 80)
for way in ways.values():
if 'amenity' in way.tags:
match way.tags['amenity']:
case 'school' | 'kindergarten' | 'library' | 'research_institute' | 'university':
draw_way_polygon(way, (250, 250, 220))
case ('bicycle_parking' | 'bus_station' | 'fuel' | 'parking' | 'parking_entrance' | 'parking_space'
| 'taxi'):
draw_way_polygon(way, (220, 220, 220), outline=(180, 180, 180))
case _:
draw_way_polygon(way, (240, 200, 200))
for way in ways.values():
if 'highway' in way.tags:
match way.tags['highway']:
case 'motorway' | 'motorway_link':
draw_way_line(way, 25, (220, 160, 160), joint='curve')
case 'trunk' | 'trunk_link':
draw_way_line(way, 21, bridge_outline if 'bridge' in way.tags else (230, 180, 130), joint='curve')
draw_way_line(way, 19, (240, 200, 170), joint='curve')
case 'primary' | 'primary_link':
draw_way_line(way, 19, bridge_outline if 'bridge' in way.tags else (230, 200, 110), joint='curve')
draw_way_line(way, 17, (245, 225, 150), joint='curve')
case 'secondary' | 'secondary_link':
draw_way_line(way, 17, bridge_outline if 'bridge' in way.tags else (200, 200, 140), joint='curve')
draw_way_line(way, 15, (220, 220, 180), joint='curve')
case 'tertiary' | 'tertiraty_link':
draw_way_line(way, 15, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve')
draw_way_line(way, 13, (245, 245, 245), joint='curve')
case 'residential' | 'unclassified' | 'road':
draw_way_line(way, 13, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve')
draw_way_line(way, 11, (250, 250, 250), joint='curve')
elif 'waterway' in way.tags and way.tags['waterway'] in {'river', 'steam', 'tidal_channel', 'flowline'}:
draw_way_line(way, 21, (180, 200, 240), joint='curve')
elif 'building' in way.tags:
draw_way_polygon(way, (190, 180, 160), outline=(120, 120, 120))
image = cv2.cvtColor(np.asarray(image), cv2.COLOR_BGR2RGB)
cv2.namedWindow('map', cv2.WINDOW_KEEPRATIO)
while cv2.waitKey(50) & 0xff != ord('q'):
cv2.imshow('map', image)
cv2.destroyAllWindows()
# Run the command-line tool only when executed as a script, not on import.
if __name__ == '__main__':
    main()