from argparse import ArgumentParser from collections import namedtuple import json import math from pathlib import Path import xml.etree.ElementTree as ET import cv2 import numpy as np from PIL import Image, ImageDraw Bounds = namedtuple('Bounds', ['maxlat', 'maxlon', 'minlat', 'minlon']) BoundsTypes = [float, float, float, float] Member = namedtuple('Member', ['type', 'ref', 'role']) MemberTypes = [str, int, str] Node = namedtuple('Node', ['id', 'lat', 'lon', 'tags', 'uid', 'version', 'visible']) NodeTypes = [int, float, float, dict, int, int, bool] Relation = namedtuple('Relation', ['id', 'members', 'tags', 'uid', 'version', 'visible']) RelationTypes = [int, list, dict, int, int, bool] Way = namedtuple('Way', ['id', 'nodes', 'tags', 'uid', 'version', 'visible']) WayTypes = [int, list, dict, int, int, bool] def create_bounds(xml) -> Bounds: return Bounds(*(key_type(xml.get(key)) for key_type, key in zip(BoundsTypes, Bounds._fields, strict=True))) def create_node(xml) -> Node: tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} return Node(*(tags if key == 'tags' else key_type(xml.get(key)) for key_type, key in zip(NodeTypes, Node._fields, strict=True))) def create_way(xml) -> Way: nodes = [int(nd.get('ref')) for nd in xml.iter('nd')] tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} params = [] for key_type, key in zip(WayTypes, Way._fields, strict=True): match key: case 'tags': params.append(tags) case 'nodes': params.append(nodes) case _: params.append(key_type(xml.get(key))) return Way(*params) def create_member(xml) -> Member: return Member(*(key_type(xml.get(key)) for key_type, key in zip(MemberTypes, Member._fields, strict=True))) def create_relation(xml) -> Relation: members = [create_member(member) for member in xml.iter('member')] tags = {tag.get('k'): tag.get('v') for tag in xml.iter('tag')} params = [] for key_type, key in zip(RelationTypes, Relation._fields, strict=True): match key: case 'tags': params.append(tags) case 'members': params.append(members) case _: params.append(key_type(xml.get(key))) return Relation(*params) def parse_xml(xml_path: Path) -> tuple[Bounds, dict[int, Node], dict[int, Way], dict[int, Relation]]: root = ET.parse(xml_path).getroot() bounds = root.find('bounds') if bounds is None: raise RuntimeError('Cannot find bounds') bounds = create_bounds(bounds) nodes = {} relations = {} ways = {} for element in root: match element.tag: case 'node': node = create_node(element) nodes[node.id] = node case 'way': way = create_way(element) ways[way.id] = way case 'relation': relation = create_relation(element) relations[relation.id] = relation case 'bounds': pass case _: raise RuntimeError(f'Unexpected tag : {element.tag}') return bounds, nodes, ways, relations def main(): parser = ArgumentParser() parser.add_argument('osm', type=Path, help='Path to OSM file to parse') parser.add_argument('--convert', action='store_true', default=False, help='Convert data into JSON') parser.add_argument('--map-width', type=int, default=1200, help='Target width of output map in pixels') arguments = parser.parse_args() osm_path: Path = arguments.osm convert_data: bool = arguments.convert map_target_width: int = arguments.map_width del arguments bounds, nodes, ways, relations = parse_xml(osm_path) if convert_data: with (osm_path.parent / f'{osm_path.stem}.json').open(mode='w', encoding='utf-8') as json_file: json_file.write('{\n') json_file.write( f'"bounds:" {json.dumps(dict(zip(bounds._fields, bounds, strict=True)), ensure_ascii=False)},\n') json_file.write('"nodes": [\n') previous_entry = '' for node in nodes.values(): if previous_entry: json_file.write(f' {previous_entry},\n') previous_entry = json.dumps( {k: v for k, v in zip(node._fields, node, strict=True) if not isinstance(v, dict) or v}, ensure_ascii=False) if previous_entry: json_file.write(f' {previous_entry}\n') json_file.write('],\n"ways": [\n') previous_entry = '' for way in ways.values(): if previous_entry: json_file.write(f' {previous_entry},\n') previous_entry = json.dumps( {k: v for k, v in zip(way._fields, way, strict=True) if not isinstance(v, dict) or v}, ensure_ascii=False) if previous_entry: json_file.write(f' {previous_entry}\n') json_file.write('],\n"relations": [\n') previous_entry = '' for relation in relations.values(): if previous_entry: json_file.write(f' {previous_entry},\n') previous_entry = json.dumps( {k: (v if k != 'members' else [dict(zip(m._fields, m, strict=True)) for m in v]) for k, v in zip(relation._fields, relation, strict=True) if not isinstance(v, dict) or v}, ensure_ascii=False) if previous_entry: json_file.write(f' {previous_entry}\n') json_file.write('}') return zoom_level: int = 0 zoom_scale: float = 0. def get_map_raw_coords(lon: float, lat: float) -> tuple[int, int]: nonlocal zoom_scale return ( int(zoom_scale * math.radians(lon)), int(zoom_scale * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))) ) # def get_gps_raw_coords(x: float, y: float) -> tuple[float, float]: # return ( # math.degrees(x / zoom_scale), # math.degrees(2 * math.atan(math.exp(y / zoom_scale)) - math.pi / 2.) # ) zoom_level = math.ceil(-math.log2(((bounds.maxlon - bounds.minlon) * 2 * math.pi) / map_target_width)) zoom_scale = (2**zoom_level * map_target_width) / (2 * math.pi) min_x, min_y = get_map_raw_coords(bounds.minlon, bounds.minlat) max_x, max_y = get_map_raw_coords(bounds.maxlon, bounds.maxlat) map_width = max_x - min_x map_height = max_y - min_y assert map_width > 0 and 0 < map_height < map_width * 10 def get_map_coords(lon: float, lat: float) -> tuple[int, int]: nonlocal min_x, max_y x, y = get_map_raw_coords(lon, lat) return (x - min_x, max_y - y) background_color = (235, 235, 235) image = Image.new('RGB', (map_width, map_height), background_color) draw = ImageDraw.Draw(image, 'RGB') def draw_way_polygon(way: Way, color: tuple[int, ...] | str, outline: tuple[int, ...] | str | None = None, width: int = 1): nonlocal bounds, draw, map_height, map_width, nodes coords = [] for node_id in way.nodes: node = nodes[node_id] new_coord = get_map_coords(node.lon, node.lat) # print(f'{new_coord=}') if not coords or new_coord != coords[-1]: coords.append(new_coord) if len(coords) > 2: # noqa: PLR2004 draw.polygon(coords, fill=color, outline=outline, width=width) def draw_way_line(way: Way, width: int, color: tuple[int, ...] | str, joint: str | None = None): nonlocal bounds, draw, map_height, map_width, nodes coords = [] for node_id in way.nodes: node = nodes[node_id] new_coord = get_map_coords(node.lon, node.lat) # print(f'{new_coord=}') if not coords or new_coord != coords[-1]: coords.append(new_coord) if len(coords) > 1: draw.line(coords, width=width, fill=color, joint=joint) for relation in relations.values(): if 'natural' in relation.tags and relation.tags['natural'] == 'wood': outer_ways: list[Way] = [] inner_ways: list[Way] = [] for member in relation.members: member: Member if member.type != 'way' or member.ref not in ways: continue if member.role == 'outer': outer_ways.append(ways[member.ref]) elif member.role == 'inner': inner_ways.append(ways[member.ref]) for way in outer_ways: draw_way_polygon(way, (140, 200, 120)) for way in inner_ways: draw_way_polygon(way, background_color) bridge_outline = (80, 80, 80) for way in ways.values(): if 'amenity' in way.tags: match way.tags['amenity']: case 'school' | 'kindergarten' | 'library' | 'research_institute' | 'university': draw_way_polygon(way, (250, 250, 220)) case ('bicycle_parking' | 'bus_station' | 'fuel' | 'parking' | 'parking_entrance' | 'parking_space' | 'taxi'): draw_way_polygon(way, (220, 220, 220), outline=(180, 180, 180)) case _: draw_way_polygon(way, (240, 200, 200)) for way in ways.values(): if 'highway' in way.tags: match way.tags['highway']: case 'motorway' | 'motorway_link': draw_way_line(way, 25, (220, 160, 160), joint='curve') case 'trunk' | 'trunk_link': draw_way_line(way, 21, bridge_outline if 'bridge' in way.tags else (230, 180, 130), joint='curve') draw_way_line(way, 19, (240, 200, 170), joint='curve') case 'primary' | 'primary_link': draw_way_line(way, 19, bridge_outline if 'bridge' in way.tags else (230, 200, 110), joint='curve') draw_way_line(way, 17, (245, 225, 150), joint='curve') case 'secondary' | 'secondary_link': draw_way_line(way, 17, bridge_outline if 'bridge' in way.tags else (200, 200, 140), joint='curve') draw_way_line(way, 15, (220, 220, 180), joint='curve') case 'tertiary' | 'tertiraty_link': draw_way_line(way, 15, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve') draw_way_line(way, 13, (245, 245, 245), joint='curve') case 'residential' | 'unclassified' | 'road': draw_way_line(way, 13, bridge_outline if 'bridge' in way.tags else (160, 160, 160), joint='curve') draw_way_line(way, 11, (250, 250, 250), joint='curve') elif 'waterway' in way.tags and way.tags['waterway'] in {'river', 'steam', 'tidal_channel', 'flowline'}: draw_way_line(way, 21, (180, 200, 240), joint='curve') elif 'building' in way.tags: draw_way_polygon(way, (190, 180, 160), outline=(120, 120, 120)) image = cv2.cvtColor(np.asarray(image), cv2.COLOR_BGR2RGB) cv2.namedWindow('map', cv2.WINDOW_KEEPRATIO) while cv2.waitKey(50) & 0xff != ord('q'): cv2.imshow('map', image) cv2.destroyAllWindows() if __name__ == '__main__': main()