street_find.py
· 17 KiB · Python
原始檔案
import argparse
import bisect
import csv
import unicodedata
from dataclasses import dataclass
# Translation table applied by normalize() before Unicode decomposition:
# it folds "ł"/"Ł" to plain "l"/"L" explicitly, since stripping combining
# marks after NFKD does not remove the stroke from these letters.
_EXTRA_MAP = str.maketrans({"ł": "l", "Ł": "L"})
@dataclass(frozen=True)
class City:
    """Immutable record of one locality row loaded from the SIMC file.

    Every field holds the stripped text of the CSV column named in its
    comment, except ``woj_name`` which is looked up from the TERC data.
    """

    # SIMC "SYM" column; used as the dictionary key for localities.
    sym: str
    # SIMC "NAZWA" column (locality name).
    name: str
    # SIMC "WOJ" column (voivodeship code).
    woj_code: str
    # Voivodeship name resolved from TERC by ``woj_code``.
    woj_name: str
    # SIMC "POW" column.
    pow_code: str
    # SIMC "GMI" column.
    gmi_code: str
    # SIMC "RODZ_GMI" column.
    rodz_gmi: str
    # SIMC "RM" column; consulted when grouping localities.
    rm: str
    # SIMC "MZ" column; consulted when grouping localities.
    mz: str
    # SIMC "SYMPOD" column; when it differs from ``sym`` it is used as the
    # identifier of the parent locality during grouping.
    sympod: str
@dataclass(frozen=True)
class CityGroup:
    """Immutable bundle of ``City`` records presented to the user as one place.

    The scalar fields are copied from one representative member so the group
    can be printed without inspecting ``members``.
    """

    # Display name of the group (name of the representative member).
    label: str
    # Voivodeship code of the representative member.
    woj_code: str
    # Voivodeship name of the representative member.
    woj_name: str
    # County ("POW") code of the representative member.
    pow_code: str
    # All localities merged into this group.
    members: tuple[City, ...]
def normalize(text: str) -> str:
    """Return *text* folded into a lowercase, diacritic-free search key.

    The text is first passed through ``_EXTRA_MAP``, then lowercased and
    NFKD-decomposed; finally every combining mark is dropped so that, for
    example, "Łódź" and "lodz" produce the same key.
    """
    folded = text.translate(_EXTRA_MAP).lower()
    decomposed = unicodedata.normalize("NFKD", folded)
    # combining() returns 0 for characters that are not combining marks.
    kept = [char for char in decomposed if unicodedata.combining(char) == 0]
    return "".join(kept)
def load_terc(path: str, encoding: str = "utf-8-sig") -> tuple[dict[str, str], dict[tuple[str, str, str, str], str]]:
    """Load voivodeship names and administrative unit names from a TERC CSV.

    The file is read as a ``;``-delimited CSV with a header row containing
    at least the WOJ, POW, GMI, RODZ and NAZWA columns.

    Returns:
        A pair ``(woj_names, terc_units)`` where ``woj_names`` maps a WOJ
        code to its name (rows with empty POW and GMI) and ``terc_units``
        maps ``(WOJ, POW, GMI, RODZ)`` to the unit name (rows where all four
        codes are present).

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if a required column is missing from the header.
    """
    voivodeships: dict[str, str] = {}
    units: dict[tuple[str, str, str, str], str] = {}
    columns = ("WOJ", "POW", "GMI", "RODZ", "NAZWA")

    with open(path, encoding=encoding, newline="") as handle:
        for record in csv.DictReader(handle, delimiter=";"):
            woj, pow_code, gmi_code, rodz_code, name = (record[column].strip() for column in columns)
            # Rows without a voivodeship code or a name carry nothing usable.
            if not (woj and name):
                continue
            if not pow_code and not gmi_code:
                voivodeships[woj] = name
            if pow_code and gmi_code and rodz_code:
                units[(woj, pow_code, gmi_code, rodz_code)] = name

    return voivodeships, units
def load_simc(path: str, woj_names: dict[str, str], encoding: str = "utf-8-sig") -> dict[str, City]:
    """Load localities from a SIMC CSV and attach voivodeship names.

    Args:
        path: location of the ``;``-delimited SIMC file with a header row.
        woj_names: mapping of WOJ code to voivodeship name; codes missing
            from it are reported as "nieznane".
        encoding: text encoding used to open the file.

    Returns:
        Mapping of SYM to ``City``. A later row with the same SYM replaces
        an earlier one.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if a required column is missing from the header.
    """
    # City attribute -> SIMC column, for the fields copied over verbatim.
    copied_columns = {
        "name": "NAZWA",
        "pow_code": "POW",
        "gmi_code": "GMI",
        "rodz_gmi": "RODZ_GMI",
        "rm": "RM",
        "mz": "MZ",
        "sympod": "SYMPOD",
    }
    cities: dict[str, City] = {}

    with open(path, encoding=encoding, newline="") as handle:
        for record in csv.DictReader(handle, delimiter=";"):
            sym = record["SYM"].strip()
            woj_code = record["WOJ"].strip()
            copied = {attr: record[column].strip() for attr, column in copied_columns.items()}
            cities[sym] = City(
                sym=sym,
                woj_code=woj_code,
                woj_name=woj_names.get(woj_code, "nieznane"),
                **copied,
            )

    return cities
def load_ulic(
    path: str,
    terc_units: dict[tuple[str, str, str, str], str],
    encoding: str = "utf-8-sig",
) -> dict[str, list[dict[str, str]]]:
    """Load streets from a ULIC CSV and group them by locality SYM.

    Args:
        path: location of the ``;``-delimited ULIC file with a header row.
        terc_units: mapping of ``(WOJ, POW, GMI, RODZ_GMI)`` to a unit name;
            used to fill ``terc_unit_name`` (empty string when unknown).
        encoding: text encoding used to open the file.

    Returns:
        Mapping of SYM to the list of street dicts in file order. Each dict
        has the keys woj, pow, gmi, rodz_gmi, sym, sym_ul, full, cecha,
        nazwa_1, nazwa_2, stan_na and terc_unit_name.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if a required column is missing from the header.
    """
    grouped: dict[str, list[dict[str, str]]] = {}

    with open(path, encoding=encoding, newline="") as handle:
        for record in csv.DictReader(handle, delimiter=";"):
            sym = record["SYM"].strip()
            cecha = record["CECHA"].strip()
            nazwa_1 = record["NAZWA_1"].strip()
            nazwa_2 = record["NAZWA_2"].strip()
            woj = record["WOJ"].strip()
            pow_code = record["POW"].strip()
            gmi = record["GMI"].strip()
            rodz_gmi = record["RODZ_GMI"].strip()

            # NOTE(review): the display name is built as CECHA NAZWA_1 NAZWA_2;
            # confirm against the TERYT description of NAZWA_1/NAZWA_2 that
            # this is the intended word order.
            full = " ".join(filter(None, (cecha, nazwa_1, nazwa_2)))

            entry = {
                "woj": woj,
                "pow": pow_code,
                "gmi": gmi,
                "rodz_gmi": rodz_gmi,
                "sym": sym,
                "sym_ul": record["SYM_UL"].strip(),
                "full": full,
                "cecha": cecha,
                "nazwa_1": nazwa_1,
                "nazwa_2": nazwa_2,
                "stan_na": record["STAN_NA"].strip(),
                "terc_unit_name": terc_units.get((woj, pow_code, gmi, rodz_gmi), ""),
            }

            bucket = grouped.get(sym)
            if bucket is None:
                bucket = grouped[sym] = []
            bucket.append(entry)

    return grouped
def format_street(
    street: dict[str, str],
    city_name_basic: str = "",
    city_name_precise: str = "",
) -> str:
    """Render one street dict as a single ``key=value | key=value`` line.

    Args:
        street: street dict as produced by the ULIC loader, optionally
            enriched with ``city_name_basic`` / ``city_name_precise``.
        city_name_basic: locality name to show; falls back to the value
            stored in *street*.
        city_name_precise: more specific name to show; falls back to the
            ``city_name_precise`` and then ``terc_unit_name`` entries.

    Returns:
        The formatted line. ``miejscowosc_precyzyjna`` is included only when
        a precise name exists and differs (after normalization) from the
        basic name.
    """
    basic = city_name_basic or street.get("city_name_basic", "")
    precise = (
        city_name_precise
        or street.get("city_name_precise", "")
        or street.get("terc_unit_name", "")
    )
    # A precise name that merely repeats the basic one adds no information.
    repeats_basic = bool(precise and basic) and normalize(precise) == normalize(basic)
    show_precise = bool(precise) and not repeats_basic

    leading_keys = (
        "full",
        "sym_ul",
        "cecha",
        "nazwa_1",
        "nazwa_2",
        "woj",
        "pow",
        "gmi",
        "rodz_gmi",
        "sym",
    )
    fields = [f"{key}={street[key]}" for key in leading_keys]
    fields.append(f"miejscowosc={basic}")
    if show_precise:
        fields.append(f"miejscowosc_precyzyjna={precise}")
    fields.append(f"stan_na={street['stan_na']}")
    return " | ".join(fields)
def build_city_index(cities: dict[str, City]) -> tuple[list[tuple[str, City]], list[str]]:
    """Build a sorted prefix index for locality lookup.

    Each locality is indexed under the normalized form of every word of its
    name. Multi-word names are additionally indexed under the words joined
    without separators ("nowysacz") and under the full name with single
    spaces ("nowy sacz"), so that a query typed with spaces can match too.

    Args:
        cities: mapping of SYM to ``City``.

    Returns:
        A pair ``(index, keys)``: ``index`` is a list of ``(key, city)``
        sorted by key, and ``keys`` is the parallel list of keys, suitable
        for ``bisect``.
    """
    index: list[tuple[str, City]] = []
    for city in cities.values():
        words = city.name.split()
        for word in words:
            index.append((normalize(word), city))
        if len(words) > 1:
            index.append((normalize(city.name.replace(" ", "")), city))
            # Without this key no entry contains a space, so a query such as
            # "nowy sacz" could never be a prefix of anything in the index.
            index.append((normalize(" ".join(words)), city))
    # Sort on the key only: City objects define no ordering.
    index.sort(key=lambda entry: entry[0])
    keys = [key for key, _ in index]
    return index, keys
def search_cities(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int = 10) -> list[City]:
    """Return up to *limit* distinct localities whose index key starts with *query*.

    Args:
        query: user-supplied text; it is normalized before matching.
        city_index: sorted ``(key, city)`` list from the city index builder.
        city_keys: keys of *city_index*, in the same order.
        limit: maximum number of localities to return.

    Returns:
        Distinct localities (by ``sym``) in index order. Empty when the
        normalized query is shorter than two characters or *limit* is not
        positive.
    """
    query_norm = normalize(query)
    if len(query_norm) < 2 or limit <= 0:
        return []

    # All keys sharing the prefix form one contiguous run starting here.
    start = bisect.bisect_left(city_keys, query_norm)
    seen: dict[str, City] = {}
    for position in range(start, len(city_keys)):
        # Checking the prefix directly avoids a sentinel upper bound (which
        # would miss keys continuing with characters above the sentinel)
        # and avoids copying the whole matching range.
        if not city_keys[position].startswith(query_norm):
            break
        city = city_index[position][1]
        if city.sym in seen:
            continue
        seen[city.sym] = city
        if len(seen) >= limit:
            break
    return list(seen.values())
def build_street_index(streets: list[dict[str, str]]) -> tuple[list[tuple[str, dict[str, str]]], list[str]]:
    """Build a sorted prefix index for street lookup within one locality.

    Each street is indexed under the normalized value of its non-empty
    ``cecha``, ``nazwa_1`` and ``nazwa_2`` fields, plus the direct
    concatenation ``nazwa_1 + nazwa_2`` when both are present.

    Returns:
        A pair ``(index, keys)``: ``index`` is a list of ``(key, street)``
        sorted by key, and ``keys`` is the parallel list of keys.
    """
    # NOTE(review): fields are indexed whole, so a query matches only the
    # beginning of a field, not the beginning of a later word inside it.
    entries: list[tuple[str, dict[str, str]]] = []
    for street in streets:
        first, second = street["nazwa_1"], street["nazwa_2"]
        for field in (street["cecha"], first, second):
            if not field:
                continue
            entries.append((normalize(field), street))
        if first and second:
            entries.append((normalize(first + second), street))

    # Sort on the key only: dicts cannot be compared with "<".
    entries.sort(key=lambda entry: entry[0])
    return entries, [key for key, _ in entries]
def search_streets(
    query: str,
    street_index: list[tuple[str, dict[str, str]]],
    street_keys: list[str],
    limit: int = 20,
) -> list[dict[str, str]]:
    """Return up to *limit* distinct streets whose index key starts with *query*.

    Args:
        query: user-supplied text; it is normalized before matching.
        street_index: sorted ``(key, street)`` list from the street index
            builder.
        street_keys: keys of *street_index*, in the same order.
        limit: maximum number of streets to return.

    Returns:
        Distinct streets (by ``sym_ul``) in index order. Empty when the
        normalized query is shorter than two characters or *limit* is not
        positive.
    """
    query_norm = normalize(query)
    if len(query_norm) < 2 or limit <= 0:
        return []

    # All keys sharing the prefix form one contiguous run starting here.
    start = bisect.bisect_left(street_keys, query_norm)
    seen: dict[str, dict[str, str]] = {}
    for position in range(start, len(street_keys)):
        # Checking the prefix directly avoids a sentinel upper bound (which
        # would miss keys continuing with characters above the sentinel)
        # and avoids copying the whole matching range.
        if not street_keys[position].startswith(query_norm):
            break
        street = street_index[position][1]
        sym_ul = street["sym_ul"]
        if sym_ul in seen:
            continue
        seen[sym_ul] = street
        if len(seen) >= limit:
            break
    return list(seen.values())
def city_group_key(city: City) -> tuple[str, ...]:
    """Return the key under which *city* is grouped with related localities.

    The intent is to merge administrative parts of a single city:

    * a locality whose ``sympod`` points at another SYM is keyed by that
      parent SYM;
    * otherwise a locality with ``rm`` of "95" or "98" and ``mz`` of "1" is
      keyed by its normalized name, voivodeship code and county code;
    * anything else is keyed by its own SYM and therefore stays alone.
    """
    parent_sym = city.sympod
    if parent_sym and parent_sym != city.sym:
        return ("parent", parent_sym)

    # NOTE(review): presumably RM 95/98 mark city districts/delegations --
    # confirm against the TERYT dictionary of locality types.
    is_city_part = city.mz == "1" and city.rm in {"95", "98"}
    if not is_city_part:
        return ("self", city.sym)

    return ("rm95_98", normalize(city.name), city.woj_code, city.pow_code)
def group_cities(cities: list[City]) -> list[CityGroup]:
    """Merge localities that share a grouping key into ``CityGroup`` objects.

    Groups are returned in the order their key was first encountered.
    Members of each group are sorted by SYM, and the member with the
    smallest SYM supplies the group's label and voivodeship/county fields.
    """
    buckets: dict[tuple[str, ...], list[City]] = {}
    for city in cities:
        key = city_group_key(city)
        if key in buckets:
            buckets[key].append(city)
        else:
            buckets[key] = [city]

    result: list[CityGroup] = []
    for bucket in buckets.values():
        ordered = tuple(sorted(bucket, key=lambda member: member.sym))
        representative = ordered[0]
        group = CityGroup(
            label=representative.name,
            woj_code=representative.woj_code,
            woj_name=representative.woj_name,
            pow_code=representative.pow_code,
            members=ordered,
        )
        result.append(group)
    return result
def merge_group_streets(
    group: CityGroup,
    streets_by_sym: dict[str, list[dict[str, str]]],
    include_precise_city_name: bool = False,
) -> list[dict[str, str]]:
    """Collect the streets of every member of *group*, de-duplicated by ``sym_ul``.

    Args:
        group: the localities whose streets are merged, in member order.
        streets_by_sym: mapping of locality SYM to its street dicts.
        include_precise_city_name: when true, each returned street is a copy
            carrying ``city_name_basic`` (the member's name) and, if the
            street's ``terc_unit_name`` differs from it after normalization,
            ``city_name_precise``.

    Returns:
        Streets in encounter order; only the first street seen for a given
        ``sym_ul`` is kept. Without enrichment the original dicts are
        returned, not copies.
    """
    collected: list[dict[str, str]] = []
    used_codes: set[str] = set()

    for member in group.members:
        for street in streets_by_sym.get(member.sym, []):
            code = street["sym_ul"]
            if code in used_codes:
                continue
            used_codes.add(code)

            if not include_precise_city_name:
                collected.append(street)
                continue

            precise = street.get("terc_unit_name") or member.name
            annotated = dict(street, city_name_basic=member.name)
            if normalize(precise) != normalize(member.name):
                annotated["city_name_precise"] = precise
            collected.append(annotated)

    return collected
def city_match_rank(query_norm: str, label: str) -> tuple[int, str]:
    """Score how well *label* matches an already-normalized query.

    Returns:
        ``(level, normalized_label)`` where a lower level is a better match:
        0 exact, 1 label starts with the query, 2 some word starts with the
        query, 3 query occurs anywhere, 4 no match. The normalized label is
        included so callers can use it as a tie-breaker.
    """
    label_norm = normalize(label)

    if label_norm == query_norm:
        return 0, label_norm
    if label_norm.startswith(query_norm):
        return 1, label_norm
    for word in label_norm.split():
        if word.startswith(query_norm):
            return 2, label_norm
    if query_norm in label_norm:
        return 3, label_norm
    return 4, label_norm
def print_stats(woj_names: dict[str, str], cities: dict[str, City], streets_by_sym: dict[str, list[dict[str, str]]]) -> None:
    """Print the number of loaded voivodeships, localities and streets."""
    total_streets = sum(map(len, streets_by_sym.values()))
    summary = (
        f"Wojewodztwa (TERC): {len(woj_names)}",
        f"Miejscowosci (SIMC): {len(cities)}",
        f"Miejscowosci z ulicami (ULIC): {len(streets_by_sym)}",
        f"Lacznie ulic (ULIC): {total_streets}",
    )
    for line in summary:
        print(line)
def run_search_city(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int) -> int:
    """Handle the locality search command: print matches for *query*.

    Returns:
        Process exit code: 0 when at least one locality was printed, 1 when
        nothing matched.
    """
    matches = search_cities(query, city_index, city_keys, limit=limit)
    if len(matches) == 0:
        print("Brak wynikow.")
        return 1

    for match in matches:
        line = (
            f"{match.name} | sym={match.sym} | woj={match.woj_code} ({match.woj_name}) "
            f"| pow={match.pow_code} | gmi={match.gmi_code}"
        )
        print(line)
    return 0
def run_list_streets(
    city_sym: str,
    streets_by_sym: dict[str, list[dict[str, str]]],
    cities_by_sym: dict[str, City],
    limit: int,
) -> int:
    """Handle the street listing command: print the streets of one locality.

    Args:
        city_sym: SYM of the locality to list.
        streets_by_sym: mapping of SYM to street dicts.
        cities_by_sym: mapping of SYM to ``City``; used only to resolve the
            locality name (empty when the SYM is unknown).
        limit: number of streets to print before summarizing the rest.

    Returns:
        Process exit code: 0 when streets were found, 1 otherwise.
    """
    streets = streets_by_sym.get(city_sym, [])
    if not streets:
        print(f"Brak ulic dla SYM={city_sym}.")
        return 1

    try:
        city_name = cities_by_sym[city_sym].name
    except KeyError:
        city_name = ""

    for street in streets[:limit]:
        precise = street.get("terc_unit_name") or city_name
        line = format_street(street, city_name_basic=city_name, city_name_precise=precise)
        print(line)

    remaining = len(streets) - limit
    if remaining > 0:
        print(f"... i jeszcze {remaining} ulic")
    return 0
def run_interactive(
    city_index: list[tuple[str, City]],
    city_keys: list[str],
    streets_by_sym: dict[str, list[dict[str, str]]],
    cities_by_sym: dict[str, City],
    city_limit: int,
    street_limit: int,
) -> int:
    """Run the interactive loop: pick a locality, then search its streets.

    The outer loop asks for a locality name, shows ranked groups of matching
    localities and lets the user pick one by number. The inner loop then
    answers street queries within the chosen group until an empty line
    returns to locality selection.

    Args:
        city_index: sorted ``(key, city)`` index of localities.
        city_keys: keys of *city_index*, in the same order.
        streets_by_sym: mapping of locality SYM to its street dicts.
        cities_by_sym: mapping of SYM to ``City``. Currently unused; kept so
            the call signature stays stable.
        city_limit: maximum number of locality groups offered for selection.
        street_limit: maximum number of streets printed per street query.

    Returns:
        Always 0; the function returns when the user enters an empty
        locality name or sends EOF / Ctrl-C at any prompt.
    """
    print("Tryb interaktywny.")
    print("Wpisz nazwe miejscowosci (min. 2 znaki), pusta linia = wyjscie.\n")
    while True:
        try:
            city_query = input("miasto >>> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nKoniec.")
            return 0

        if not city_query:
            print("Koniec.")
            return 0

        query_norm = normalize(city_query)
        if len(query_norm) < 2:
            print("Podaj co najmniej 2 znaki.\n")
            continue

        # Over-fetch so that grouping and ranking still have enough
        # candidates left after localities are merged.
        city_matches = search_cities(city_query, city_index, city_keys, limit=city_limit * 5)
        grouped_cities = group_cities(city_matches)

        ranked_groups: list[tuple[CityGroup, int, tuple[int, str]]] = []
        for group in grouped_cities:
            streets_count = len(merge_group_streets(group, streets_by_sym))
            rank_key = city_match_rank(query_norm, group.label)
            ranked_groups.append((group, streets_count, rank_key))

        # Best match level first, then more streets, then alphabetical label.
        ranked_groups.sort(key=lambda item: (item[2][0], -item[1], item[2][1]))
        ranked_groups = ranked_groups[:city_limit]

        if not ranked_groups:
            print("Brak wynikow.\n")
            continue

        for idx, (group, streets_count, _) in enumerate(ranked_groups):
            members_suffix = ""
            if len(group.members) > 1:
                members_suffix = f" | scalone_sym={len(group.members)}"
            print(
                f"[{idx}] {group.label} | woj={group.woj_code} ({group.woj_name}) | pow={group.pow_code} "
                f"| ulice={streets_count}{members_suffix}"
            )

        selected: CityGroup | None = None
        while selected is None:
            try:
                choice = input("wybierz numer miasta (Enter = anuluj): ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\nKoniec.")
                return 0

            if not choice:
                print()
                break

            # isdecimal() (unlike isdigit()) rejects characters such as "²"
            # that int() cannot parse; ValueError is still caught because
            # int() may refuse extremely long digit strings.
            picked = -1
            if choice.isdecimal():
                try:
                    picked = int(choice)
                except ValueError:
                    picked = -1
            if not 0 <= picked < len(ranked_groups):
                print("Niepoprawny wybor.")
                continue
            selected = ranked_groups[picked][0]

        if selected is None:
            # Selection was cancelled with an empty line.
            continue

        city_streets = merge_group_streets(selected, streets_by_sym, include_precise_city_name=True)
        merged_syms = ",".join(city.sym for city in selected.members)
        print(f"\nWybrane miasto: {selected.label}")
        print(f"SYM w grupie: {merged_syms}")
        print(f"Ulic w miejscowosci: {len(city_streets)}")

        if not city_streets:
            print("Brak ulic dla tej miejscowosci.\n")
            continue

        street_index, street_keys = build_street_index(city_streets)
        print("Wpisz fragment nazwy ulicy (min. 2 znaki), pusta linia = wybor innego miasta.\n")

        while True:
            try:
                street_query = input("ulica >>> ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\nKoniec.")
                return 0

            if not street_query:
                print()
                break

            if len(normalize(street_query)) < 2:
                print("Podaj co najmniej 2 znaki.\n")
                continue

            found = search_streets(street_query, street_index, street_keys, limit=street_limit)
            if not found:
                print("Brak ulic.\n")
                continue

            for street in found:
                print(f"- {format_street(street)}")
            print()
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its four sub-commands.

    Global options select the TERC/SIMC/ULIC file paths. Exactly one
    sub-command is required: ``stats``, ``miasto`` (locality search),
    ``ulice`` (list streets of a SYM) or ``interaktywne`` (interactive mode).
    The chosen sub-command name is stored in ``command``.
    """
    root = argparse.ArgumentParser(
        description="CLI do ladowania TERC/SIMC/ULIC oraz wyszukiwania miejscowosci i ulic"
    )
    root.add_argument("--terc", default="TERC.csv", help="Sciezka do TERC.csv")
    root.add_argument("--simc", default="SIMC.csv", help="Sciezka do SIMC.csv")
    root.add_argument("--ulic", default="ULIC.csv", help="Sciezka do ULIC.csv")

    commands = root.add_subparsers(dest="command", required=True)

    # stats: takes no arguments of its own.
    commands.add_parser("stats", help="Pokaz statystyki zaladowanych danych")

    # miasto: prefix search over locality names.
    miasto = commands.add_parser("miasto", help="Szukaj miejscowosci po fragmencie nazwy")
    miasto.add_argument("query", help="Fraza do wyszukania")
    miasto.add_argument("--limit", type=int, default=10, help="Maksymalna liczba wynikow")

    # ulice: list streets of one locality.
    ulice = commands.add_parser("ulice", help="Pokaz ulice dla SYM miejscowosci")
    ulice.add_argument("sym", help="Kod SYM miejscowosci")
    ulice.add_argument("--limit", type=int, default=30, help="Maksymalna liczba ulic")

    # interaktywne: locality -> street dialogue.
    interactive = commands.add_parser("interaktywne", help="Tryb interaktywny: miasto -> ulice")
    interactive.add_argument("--city-limit", type=int, default=10, help="Maksymalna liczba miast do wyboru")
    interactive.add_argument("--street-limit", type=int, default=20, help="Maksymalna liczba wynikow ulic")

    return root
def main() -> int:
    """Parse the command line, load the three data files and run the command.

    All data files are loaded and the locality index is built before the
    sub-command is dispatched, regardless of which sub-command was chosen.

    Returns:
        The exit code of the executed sub-command; 1 (after printing help)
        if the command name is not recognized.
    """
    parser = build_parser()
    options = parser.parse_args()

    woj_names, terc_units = load_terc(options.terc)
    cities = load_simc(options.simc, woj_names)
    streets_by_sym = load_ulic(options.ulic, terc_units)
    city_index, city_keys = build_city_index(cities)

    command = options.command
    if command == "stats":
        print_stats(woj_names, cities, streets_by_sym)
        return 0
    elif command == "miasto":
        return run_search_city(options.query, city_index, city_keys, limit=options.limit)
    elif command == "ulice":
        return run_list_streets(options.sym, streets_by_sym, cities, limit=options.limit)
    elif command == "interaktywne":
        return run_interactive(
            city_index,
            city_keys,
            streets_by_sym,
            cities,
            city_limit=options.city_limit,
            street_limit=options.street_limit,
        )
    else:
        # The parser requires a known sub-command, so this is a safety net.
        parser.print_help()
        return 1
# Script entry point: run main() and use its return value as the exit code.
if __name__ == "__main__":
    raise SystemExit(main())
| 1 | import argparse |
| 2 | import bisect |
| 3 | import csv |
| 4 | import unicodedata |
| 5 | from dataclasses import dataclass |
| 6 | |
| 7 | _EXTRA_MAP = str.maketrans({"ł": "l", "Ł": "L"}) |
| 8 | |
| 9 | |
| 10 | @dataclass(frozen=True) |
| 11 | class City: |
| 12 | sym: str |
| 13 | name: str |
| 14 | woj_code: str |
| 15 | woj_name: str |
| 16 | pow_code: str |
| 17 | gmi_code: str |
| 18 | rodz_gmi: str |
| 19 | rm: str |
| 20 | mz: str |
| 21 | sympod: str |
| 22 | |
| 23 | |
| 24 | @dataclass(frozen=True) |
| 25 | class CityGroup: |
| 26 | label: str |
| 27 | woj_code: str |
| 28 | woj_name: str |
| 29 | pow_code: str |
| 30 | members: tuple[City, ...] |
| 31 | |
| 32 | |
| 33 | def normalize(text: str) -> str: |
| 34 | text = text.translate(_EXTRA_MAP) |
| 35 | text = unicodedata.normalize("NFKD", text.lower()) |
| 36 | return "".join(c for c in text if not unicodedata.combining(c)) |
| 37 | |
| 38 | |
| 39 | def load_terc(path: str, encoding: str = "utf-8-sig") -> tuple[dict[str, str], dict[tuple[str, str, str, str], str]]: |
| 40 | """Laduje mape kodow wojewodztw i jednostek TERC.""" |
| 41 | woj_names: dict[str, str] = {} |
| 42 | terc_units: dict[tuple[str, str, str, str], str] = {} |
| 43 | with open(path, encoding=encoding, newline="") as f: |
| 44 | reader = csv.DictReader(f, delimiter=";") |
| 45 | for row in reader: |
| 46 | woj = row["WOJ"].strip() |
| 47 | pow_code = row["POW"].strip() |
| 48 | gmi_code = row["GMI"].strip() |
| 49 | rodz_code = row["RODZ"].strip() |
| 50 | name = row["NAZWA"].strip() |
| 51 | if woj and not pow_code and not gmi_code and name: |
| 52 | woj_names[woj] = name |
| 53 | if woj and pow_code and gmi_code and rodz_code and name: |
| 54 | terc_units[(woj, pow_code, gmi_code, rodz_code)] = name |
| 55 | return woj_names, terc_units |
| 56 | |
| 57 | |
| 58 | def load_simc(path: str, woj_names: dict[str, str], encoding: str = "utf-8-sig") -> dict[str, City]: |
| 59 | """Laduje miejscowosci z SIMC i dokleja nazwe wojewodztwa z TERC.""" |
| 60 | cities: dict[str, City] = {} |
| 61 | with open(path, encoding=encoding, newline="") as f: |
| 62 | reader = csv.DictReader(f, delimiter=";") |
| 63 | for row in reader: |
| 64 | sym = row["SYM"].strip() |
| 65 | woj_code = row["WOJ"].strip() |
| 66 | cities[sym] = City( |
| 67 | sym=sym, |
| 68 | name=row["NAZWA"].strip(), |
| 69 | woj_code=woj_code, |
| 70 | woj_name=woj_names.get(woj_code, "nieznane"), |
| 71 | pow_code=row["POW"].strip(), |
| 72 | gmi_code=row["GMI"].strip(), |
| 73 | rodz_gmi=row["RODZ_GMI"].strip(), |
| 74 | rm=row["RM"].strip(), |
| 75 | mz=row["MZ"].strip(), |
| 76 | sympod=row["SYMPOD"].strip(), |
| 77 | ) |
| 78 | return cities |
| 79 | |
| 80 | |
| 81 | def load_ulic( |
| 82 | path: str, |
| 83 | terc_units: dict[tuple[str, str, str, str], str], |
| 84 | encoding: str = "utf-8-sig", |
| 85 | ) -> dict[str, list[dict[str, str]]]: |
| 86 | """Laduje ulice z ULIC i grupuje po SYM miejscowosci.""" |
| 87 | streets_by_sym: dict[str, list[dict[str, str]]] = {} |
| 88 | with open(path, encoding=encoding, newline="") as f: |
| 89 | reader = csv.DictReader(f, delimiter=";") |
| 90 | for row in reader: |
| 91 | sym = row["SYM"].strip() |
| 92 | cecha = row["CECHA"].strip() |
| 93 | nazwa_1 = row["NAZWA_1"].strip() |
| 94 | nazwa_2 = row["NAZWA_2"].strip() |
| 95 | full = " ".join(part for part in [cecha, nazwa_1, nazwa_2] if part).strip() |
| 96 | street = { |
| 97 | "woj": row["WOJ"].strip(), |
| 98 | "pow": row["POW"].strip(), |
| 99 | "gmi": row["GMI"].strip(), |
| 100 | "rodz_gmi": row["RODZ_GMI"].strip(), |
| 101 | "sym": sym, |
| 102 | "sym_ul": row["SYM_UL"].strip(), |
| 103 | "full": full, |
| 104 | "cecha": cecha, |
| 105 | "nazwa_1": nazwa_1, |
| 106 | "nazwa_2": nazwa_2, |
| 107 | "stan_na": row["STAN_NA"].strip(), |
| 108 | "terc_unit_name": terc_units.get( |
| 109 | (row["WOJ"].strip(), row["POW"].strip(), row["GMI"].strip(), row["RODZ_GMI"].strip()), |
| 110 | "", |
| 111 | ), |
| 112 | } |
| 113 | streets_by_sym.setdefault(sym, []).append(street) |
| 114 | return streets_by_sym |
| 115 | |
| 116 | |
| 117 | def format_street( |
| 118 | street: dict[str, str], |
| 119 | city_name_basic: str = "", |
| 120 | city_name_precise: str = "", |
| 121 | ) -> str: |
| 122 | basic_name = city_name_basic or street.get("city_name_basic", "") |
| 123 | precise_name = city_name_precise or street.get("city_name_precise", "") or street.get("terc_unit_name", "") |
| 124 | |
| 125 | if precise_name and basic_name and normalize(precise_name) == normalize(basic_name): |
| 126 | precise_name = "" |
| 127 | |
| 128 | parts = [ |
| 129 | f"full={street['full']}", |
| 130 | f"sym_ul={street['sym_ul']}", |
| 131 | f"cecha={street['cecha']}", |
| 132 | f"nazwa_1={street['nazwa_1']}", |
| 133 | f"nazwa_2={street['nazwa_2']}", |
| 134 | f"woj={street['woj']}", |
| 135 | f"pow={street['pow']}", |
| 136 | f"gmi={street['gmi']}", |
| 137 | f"rodz_gmi={street['rodz_gmi']}", |
| 138 | f"sym={street['sym']}", |
| 139 | f"miejscowosc={basic_name}", |
| 140 | ] |
| 141 | if precise_name: |
| 142 | parts.append(f"miejscowosc_precyzyjna={precise_name}") |
| 143 | parts.append(f"stan_na={street['stan_na']}") |
| 144 | return " | ".join(parts) |
| 145 | |
| 146 | |
| 147 | def build_city_index(cities: dict[str, City]) -> tuple[list[tuple[str, City]], list[str]]: |
| 148 | """Buduje indeks prefiksowy do wyszukiwania miejscowosci.""" |
| 149 | index: list[tuple[str, City]] = [] |
| 150 | for city in cities.values(): |
| 151 | words = city.name.split() |
| 152 | for word in words: |
| 153 | index.append((normalize(word), city)) |
| 154 | if len(words) > 1: |
| 155 | index.append((normalize(city.name.replace(" ", "")), city)) |
| 156 | index.sort(key=lambda x: x[0]) |
| 157 | keys = [item[0] for item in index] |
| 158 | return index, keys |
| 159 | |
| 160 | |
| 161 | def search_cities(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int = 10) -> list[City]: |
| 162 | query_norm = normalize(query) |
| 163 | if len(query_norm) < 2: |
| 164 | return [] |
| 165 | lo = bisect.bisect_left(city_keys, query_norm) |
| 166 | hi = bisect.bisect_left(city_keys, query_norm + "\uffff") |
| 167 | seen: dict[str, City] = {} |
| 168 | for _, city in city_index[lo:hi]: |
| 169 | if city.sym not in seen: |
| 170 | seen[city.sym] = city |
| 171 | if len(seen) >= limit: |
| 172 | break |
| 173 | return list(seen.values()) |
| 174 | |
| 175 | |
| 176 | def build_street_index(streets: list[dict[str, str]]) -> tuple[list[tuple[str, dict[str, str]]], list[str]]: |
| 177 | """Buduje indeks prefiksowy do wyszukiwania ulic w jednej miejscowosci.""" |
| 178 | index: list[tuple[str, dict[str, str]]] = [] |
| 179 | for street in streets: |
| 180 | words = [street["cecha"], street["nazwa_1"], street["nazwa_2"]] |
| 181 | for word in words: |
| 182 | if word: |
| 183 | index.append((normalize(word), street)) |
| 184 | if street["nazwa_1"] and street["nazwa_2"]: |
| 185 | index.append((normalize(street["nazwa_1"] + street["nazwa_2"]), street)) |
| 186 | index.sort(key=lambda x: x[0]) |
| 187 | keys = [item[0] for item in index] |
| 188 | return index, keys |
| 189 | |
| 190 | |
| 191 | def search_streets( |
| 192 | query: str, |
| 193 | street_index: list[tuple[str, dict[str, str]]], |
| 194 | street_keys: list[str], |
| 195 | limit: int = 20, |
| 196 | ) -> list[dict[str, str]]: |
| 197 | query_norm = normalize(query) |
| 198 | if len(query_norm) < 2: |
| 199 | return [] |
| 200 | lo = bisect.bisect_left(street_keys, query_norm) |
| 201 | hi = bisect.bisect_left(street_keys, query_norm + "\uffff") |
| 202 | seen: dict[str, dict[str, str]] = {} |
| 203 | for _, street in street_index[lo:hi]: |
| 204 | key = street["sym_ul"] |
| 205 | if key not in seen: |
| 206 | seen[key] = street |
| 207 | if len(seen) >= limit: |
| 208 | break |
| 209 | return list(seen.values()) |
| 210 | |
| 211 | |
| 212 | def city_group_key(city: City) -> tuple[str, ...]: |
| 213 | """Buduje klucz grupowania tak, by laczyc administracyjne czesci jednego miasta.""" |
| 214 | if city.sympod and city.sympod != city.sym: |
| 215 | return ("parent", city.sympod) |
| 216 | |
| 217 | if city.rm in {"95", "98"} and city.mz == "1": |
| 218 | return ("rm95_98", normalize(city.name), city.woj_code, city.pow_code) |
| 219 | |
| 220 | return ("self", city.sym) |
| 221 | |
| 222 | |
| 223 | def group_cities(cities: list[City]) -> list[CityGroup]: |
| 224 | grouped: dict[tuple[str, ...], list[City]] = {} |
| 225 | for city in cities: |
| 226 | grouped.setdefault(city_group_key(city), []).append(city) |
| 227 | |
| 228 | groups: list[CityGroup] = [] |
| 229 | for members in grouped.values(): |
| 230 | sorted_members = sorted(members, key=lambda c: c.sym) |
| 231 | first = sorted_members[0] |
| 232 | groups.append( |
| 233 | CityGroup( |
| 234 | label=first.name, |
| 235 | woj_code=first.woj_code, |
| 236 | woj_name=first.woj_name, |
| 237 | pow_code=first.pow_code, |
| 238 | members=tuple(sorted_members), |
| 239 | ) |
| 240 | ) |
| 241 | return groups |
| 242 | |
| 243 | |
| 244 | def merge_group_streets( |
| 245 | group: CityGroup, |
| 246 | streets_by_sym: dict[str, list[dict[str, str]]], |
| 247 | include_precise_city_name: bool = False, |
| 248 | ) -> list[dict[str, str]]: |
| 249 | merged: list[dict[str, str]] = [] |
| 250 | seen_sym_ul: set[str] = set() |
| 251 | for city in group.members: |
| 252 | for street in streets_by_sym.get(city.sym, []): |
| 253 | sym_ul = street["sym_ul"] |
| 254 | if sym_ul not in seen_sym_ul: |
| 255 | seen_sym_ul.add(sym_ul) |
| 256 | if include_precise_city_name: |
| 257 | precise_name = street.get("terc_unit_name") or city.name |
| 258 | enriched = {**street, "city_name_basic": city.name} |
| 259 | if normalize(precise_name) != normalize(city.name): |
| 260 | enriched["city_name_precise"] = precise_name |
| 261 | merged.append(enriched) |
| 262 | else: |
| 263 | merged.append(street) |
| 264 | return merged |
| 265 | |
| 266 | |
| 267 | def city_match_rank(query_norm: str, label: str) -> tuple[int, str]: |
| 268 | label_norm = normalize(label) |
| 269 | words = label_norm.split() |
| 270 | |
| 271 | if label_norm == query_norm: |
| 272 | match_level = 0 |
| 273 | elif label_norm.startswith(query_norm): |
| 274 | match_level = 1 |
| 275 | elif any(word.startswith(query_norm) for word in words): |
| 276 | match_level = 2 |
| 277 | elif query_norm in label_norm: |
| 278 | match_level = 3 |
| 279 | else: |
| 280 | match_level = 4 |
| 281 | |
| 282 | return match_level, label_norm |
| 283 | |
| 284 | |
| 285 | def print_stats(woj_names: dict[str, str], cities: dict[str, City], streets_by_sym: dict[str, list[dict[str, str]]]) -> None: |
| 286 | streets_count = sum(len(items) for items in streets_by_sym.values()) |
| 287 | print(f"Wojewodztwa (TERC): {len(woj_names)}") |
| 288 | print(f"Miejscowosci (SIMC): {len(cities)}") |
| 289 | print(f"Miejscowosci z ulicami (ULIC): {len(streets_by_sym)}") |
| 290 | print(f"Lacznie ulic (ULIC): {streets_count}") |
| 291 | |
| 292 | |
| 293 | def run_search_city(query: str, city_index: list[tuple[str, City]], city_keys: list[str], limit: int) -> int: |
| 294 | results = search_cities(query, city_index, city_keys, limit=limit) |
| 295 | if not results: |
| 296 | print("Brak wynikow.") |
| 297 | return 1 |
| 298 | for city in results: |
| 299 | print( |
| 300 | f"{city.name} | sym={city.sym} | woj={city.woj_code} ({city.woj_name}) " |
| 301 | f"| pow={city.pow_code} | gmi={city.gmi_code}" |
| 302 | ) |
| 303 | return 0 |
| 304 | |
| 305 | |
| 306 | def run_list_streets( |
| 307 | city_sym: str, |
| 308 | streets_by_sym: dict[str, list[dict[str, str]]], |
| 309 | cities_by_sym: dict[str, City], |
| 310 | limit: int, |
| 311 | ) -> int: |
| 312 | streets = streets_by_sym.get(city_sym, []) |
| 313 | if not streets: |
| 314 | print(f"Brak ulic dla SYM={city_sym}.") |
| 315 | return 1 |
| 316 | city_name = cities_by_sym.get(city_sym).name if city_sym in cities_by_sym else "" |
| 317 | for street in streets[:limit]: |
| 318 | precise_name = street.get("terc_unit_name") or city_name |
| 319 | print(format_street(street, city_name_basic=city_name, city_name_precise=precise_name)) |
| 320 | if len(streets) > limit: |
| 321 | print(f"... i jeszcze {len(streets) - limit} ulic") |
| 322 | return 0 |
| 323 | |
| 324 | |
| 325 | def run_interactive( |
| 326 | city_index: list[tuple[str, City]], |
| 327 | city_keys: list[str], |
| 328 | streets_by_sym: dict[str, list[dict[str, str]]], |
| 329 | cities_by_sym: dict[str, City], |
| 330 | city_limit: int, |
| 331 | street_limit: int, |
| 332 | ) -> int: |
| 333 | print("Tryb interaktywny.") |
| 334 | print("Wpisz nazwe miejscowosci (min. 2 znaki), pusta linia = wyjscie.\n") |
| 335 | while True: |
| 336 | try: |
| 337 | city_query = input("miasto >>> ").strip() |
| 338 | except (EOFError, KeyboardInterrupt): |
| 339 | print("\nKoniec.") |
| 340 | return 0 |
| 341 | |
| 342 | if not city_query: |
| 343 | print("Koniec.") |
| 344 | return 0 |
| 345 | |
| 346 | if len(normalize(city_query)) < 2: |
| 347 | print("Podaj co najmniej 2 znaki.\n") |
| 348 | continue |
| 349 | |
| 350 | city_matches = search_cities(city_query, city_index, city_keys, limit=city_limit * 5) |
| 351 | grouped_cities = group_cities(city_matches) |
| 352 | query_norm = normalize(city_query) |
| 353 | |
| 354 | ranked_groups: list[tuple[CityGroup, int, tuple[int, str]]] = [] |
| 355 | for group in grouped_cities: |
| 356 | streets_count = len(merge_group_streets(group, streets_by_sym)) |
| 357 | rank_key = city_match_rank(query_norm, group.label) |
| 358 | ranked_groups.append((group, streets_count, rank_key)) |
| 359 | |
| 360 | ranked_groups.sort(key=lambda item: (item[2][0], -item[1], item[2][1])) |
| 361 | ranked_groups = ranked_groups[:city_limit] |
| 362 | |
| 363 | if not ranked_groups: |
| 364 | print("Brak wynikow.\n") |
| 365 | continue |
| 366 | |
| 367 | for idx, (group, streets_count, _) in enumerate(ranked_groups): |
| 368 | members_suffix = "" |
| 369 | if len(group.members) > 1: |
| 370 | members_suffix = f" | scalone_sym={len(group.members)}" |
| 371 | print( |
| 372 | f"[{idx}] {group.label} | woj={group.woj_code} ({group.woj_name}) | pow={group.pow_code} " |
| 373 | f"| ulice={streets_count}{members_suffix}" |
| 374 | ) |
| 375 | |
| 376 | selected: CityGroup | None = None |
| 377 | while selected is None: |
| 378 | try: |
| 379 | choice = input("wybierz numer miasta (Enter = anuluj): ").strip() |
| 380 | except (EOFError, KeyboardInterrupt): |
| 381 | print("\nKoniec.") |
| 382 | return 0 |
| 383 | |
| 384 | if not choice: |
| 385 | print() |
| 386 | break |
| 387 | if not choice.isdigit() or int(choice) >= len(ranked_groups): |
| 388 | print("Niepoprawny wybor.") |
| 389 | continue |
| 390 | selected = ranked_groups[int(choice)][0] |
| 391 | |
| 392 | if selected is None: |
| 393 | continue |
| 394 | |
| 395 | city_streets = merge_group_streets(selected, streets_by_sym, include_precise_city_name=True) |
| 396 | merged_syms = ",".join(city.sym for city in selected.members) |
| 397 | print(f"\nWybrane miasto: {selected.label}") |
| 398 | print(f"SYM w grupie: {merged_syms}") |
| 399 | print(f"Ulic w miejscowosci: {len(city_streets)}") |
| 400 | |
| 401 | if not city_streets: |
| 402 | print("Brak ulic dla tej miejscowosci.\n") |
| 403 | continue |
| 404 | |
| 405 | street_index, street_keys = build_street_index(city_streets) |
| 406 | print("Wpisz fragment nazwy ulicy (min. 2 znaki), pusta linia = wybor innego miasta.\n") |
| 407 | |
| 408 | while True: |
| 409 | try: |
| 410 | street_query = input("ulica >>> ").strip() |
| 411 | except (EOFError, KeyboardInterrupt): |
| 412 | print("\nKoniec.") |
| 413 | return 0 |
| 414 | |
| 415 | if not street_query: |
| 416 | print() |
| 417 | break |
| 418 | |
| 419 | if len(normalize(street_query)) < 2: |
| 420 | print("Podaj co najmniej 2 znaki.\n") |
| 421 | continue |
| 422 | |
| 423 | found = search_streets(street_query, street_index, street_keys, limit=street_limit) |
| 424 | if not found: |
| 425 | print("Brak ulic.\n") |
| 426 | continue |
| 427 | |
| 428 | for street in found: |
| 429 | print(f"- {format_street(street)}") |
| 430 | print() |
| 431 | |
| 432 | |
| 433 | def build_parser() -> argparse.ArgumentParser: |
| 434 | parser = argparse.ArgumentParser( |
| 435 | description="CLI do ladowania TERC/SIMC/ULIC oraz wyszukiwania miejscowosci i ulic" |
| 436 | ) |
| 437 | parser.add_argument("--terc", default="TERC.csv", help="Sciezka do TERC.csv") |
| 438 | parser.add_argument("--simc", default="SIMC.csv", help="Sciezka do SIMC.csv") |
| 439 | parser.add_argument("--ulic", default="ULIC.csv", help="Sciezka do ULIC.csv") |
| 440 | |
| 441 | subparsers = parser.add_subparsers(dest="command", required=True) |
| 442 | |
| 443 | subparsers.add_parser("stats", help="Pokaz statystyki zaladowanych danych") |
| 444 | |
| 445 | city_parser = subparsers.add_parser("miasto", help="Szukaj miejscowosci po fragmencie nazwy") |
| 446 | city_parser.add_argument("query", help="Fraza do wyszukania") |
| 447 | city_parser.add_argument("--limit", type=int, default=10, help="Maksymalna liczba wynikow") |
| 448 | |
| 449 | streets_parser = subparsers.add_parser("ulice", help="Pokaz ulice dla SYM miejscowosci") |
| 450 | streets_parser.add_argument("sym", help="Kod SYM miejscowosci") |
| 451 | streets_parser.add_argument("--limit", type=int, default=30, help="Maksymalna liczba ulic") |
| 452 | |
| 453 | interactive_parser = subparsers.add_parser("interaktywne", help="Tryb interaktywny: miasto -> ulice") |
| 454 | interactive_parser.add_argument("--city-limit", type=int, default=10, help="Maksymalna liczba miast do wyboru") |
| 455 | interactive_parser.add_argument("--street-limit", type=int, default=20, help="Maksymalna liczba wynikow ulic") |
| 456 | |
| 457 | return parser |
| 458 | |
| 459 | |
| 460 | def main() -> int: |
| 461 | parser = build_parser() |
| 462 | args = parser.parse_args() |
| 463 | |
| 464 | woj_names, terc_units = load_terc(args.terc) |
| 465 | cities = load_simc(args.simc, woj_names) |
| 466 | streets_by_sym = load_ulic(args.ulic, terc_units) |
| 467 | city_index, city_keys = build_city_index(cities) |
| 468 | |
| 469 | if args.command == "stats": |
| 470 | print_stats(woj_names, cities, streets_by_sym) |
| 471 | return 0 |
| 472 | if args.command == "miasto": |
| 473 | return run_search_city(args.query, city_index, city_keys, limit=args.limit) |
| 474 | if args.command == "ulice": |
| 475 | return run_list_streets(args.sym, streets_by_sym, cities, limit=args.limit) |
| 476 | if args.command == "interaktywne": |
| 477 | return run_interactive( |
| 478 | city_index, |
| 479 | city_keys, |
| 480 | streets_by_sym, |
| 481 | cities, |
| 482 | city_limit=args.city_limit, |
| 483 | street_limit=args.street_limit, |
| 484 | ) |
| 485 | |
| 486 | parser.print_help() |
| 487 | return 1 |
| 488 | |
| 489 | |
| 490 | if __name__ == "__main__": |
| 491 | raise SystemExit(main()) |
| 492 |