Skip to content

API Reference

The supported public contract is now subpackage-first:

  • subway_access.models
  • subway_access.io
  • subway_access.analysis
  • subway_access.factors
  • subway_access.helpers
  • subway_access.export
  • subway_access.pipeline
  • subway_access.reporting (optional; requires [factor-factory] or [tearsheets])
  • subway_access.temporal
  • subway_access.cli

The root subway_access namespace is intentionally minimal and only exposes __version__.

The advanced network layer lives alongside the baseline analysis surfaces under subway_access.analysis, while graph cache helpers live under subway_access.io and subway_access.pipeline.

Root

subway_access

Minimal root namespace for the subway-access package.

Models

subway_access.models

Public typed models for subway-access.

AccessibilityLabel module-attribute

AccessibilityLabel = Literal[
    "accessible",
    "partially_accessible",
    "not_accessible",
    "unknown",
]

EquipmentType module-attribute

EquipmentType = Literal[
    "elevator",
    "escalator",
    "station",
    "platform",
    "unknown",
]

OutageStatus module-attribute

OutageStatus = Literal[
    "active", "resolved", "scheduled", "unknown"
]

AccessibilityQuery dataclass

High-level filter for a borough or district analysis pass.

Source code in src/subway_access/models/_common.py
25
26
27
28
29
30
@dataclass(frozen=True, slots=True)
class AccessibilityQuery:
    """High-level filter for a borough or district analysis pass."""

    # Both fields are plain strings; this class performs no validation.
    geography: str  # kind of geography to filter on — presumably "borough" or "district"; confirm with callers
    value: str  # value to match for that geography — accepted formats are not visible here

geography instance-attribute

geography: str

value instance-attribute

value: str

Entrance dataclass

A street-level subway entrance or exit from MTA Open Data (entrances / exits layer).

Source code in src/subway_access/models/_entrance.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@dataclass(frozen=True, slots=True)
class Entrance:
    """A street-level subway entrance or exit from MTA Open Data (entrances / exits layer)."""

    # Required fields: no defaults, so positional construction must follow
    # this exact order.
    entrance_id: str
    station_id: str  # key compared by EntranceDataset.for_station_id
    latitude: float
    longitude: float
    stop_name: str
    constituent_station_name: str
    complex_id: str | None  # grouping key in EntranceDataset.count_by_complex_id; falsy values are skipped there
    gtfs_stop_id: str | None  # grouping key in EntranceDataset.count_by_gtfs_stop_id; falsy values are skipped there
    borough_code: str
    entrance_type: str
    entry_allowed: bool
    exit_allowed: bool
    # Optional fields: may be omitted when constructing.
    division: str | None = None
    line: str | None = None
    daytime_routes: tuple[str, ...] = ()  # immutable tuple, so sharing the default between instances is safe
    source: str = ""  # presumably identifies the originating dataset — confirm

entrance_id instance-attribute

entrance_id: str

station_id instance-attribute

station_id: str

latitude instance-attribute

latitude: float

longitude instance-attribute

longitude: float

stop_name instance-attribute

stop_name: str

constituent_station_name instance-attribute

constituent_station_name: str

complex_id instance-attribute

complex_id: str | None

gtfs_stop_id instance-attribute

gtfs_stop_id: str | None

borough_code instance-attribute

borough_code: str

entrance_type instance-attribute

entrance_type: str

entry_allowed instance-attribute

entry_allowed: bool

exit_allowed instance-attribute

exit_allowed: bool

division class-attribute instance-attribute

division: str | None = None

line class-attribute instance-attribute

line: str | None = None

daytime_routes class-attribute instance-attribute

daytime_routes: tuple[str, ...] = ()

source class-attribute instance-attribute

source: str = ''

EntranceDataset dataclass

Loaded entrance / exit point rows for a study area.

Source code in src/subway_access/models/_entrance.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass(frozen=True, slots=True)
class EntranceDataset:
    """Loaded entrance / exit point rows for a study area."""

    entrances: tuple[Entrance, ...]

    def count_by_gtfs_stop_id(self) -> dict[str, int]:
        """Count entrances per GTFS parent stop id."""

        counts: dict[str, int] = defaultdict(int)
        for entrance in self.entrances:
            if entrance.gtfs_stop_id:
                counts[entrance.gtfs_stop_id] += 1
        return dict(counts)

    def count_by_complex_id(self) -> dict[str, int]:
        """Count entrances per station complex id."""

        counts: dict[str, int] = defaultdict(int)
        for entrance in self.entrances:
            if entrance.complex_id:
                counts[entrance.complex_id] += 1
        return dict(counts)

    def for_station_id(self, station_id: str) -> tuple[Entrance, ...]:
        """Return entrances whose MTA station id matches."""

        return tuple(e for e in self.entrances if e.station_id == station_id)

entrances instance-attribute

entrances: tuple[Entrance, ...]

count_by_gtfs_stop_id

count_by_gtfs_stop_id() -> dict[str, int]

Count entrances per GTFS parent stop id.

Source code in src/subway_access/models/_entrance.py
37
38
39
40
41
42
43
44
def count_by_gtfs_stop_id(self) -> dict[str, int]:
    """Tally entrances per GTFS parent stop id.

    Entrances whose ``gtfs_stop_id`` is falsy (``None`` or empty) are
    ignored. Keys appear in first-seen order.
    """

    tally: dict[str, int] = {}
    for item in self.entrances:
        stop_id = item.gtfs_stop_id
        if not stop_id:
            continue
        tally[stop_id] = tally.get(stop_id, 0) + 1
    return tally

count_by_complex_id

count_by_complex_id() -> dict[str, int]

Count entrances per station complex id.

Source code in src/subway_access/models/_entrance.py
46
47
48
49
50
51
52
53
def count_by_complex_id(self) -> dict[str, int]:
    """Tally entrances per station complex id.

    Entrances whose ``complex_id`` is falsy (``None`` or empty) are
    ignored. Keys appear in first-seen order.
    """

    tally: dict[str, int] = {}
    for item in self.entrances:
        complex_id = item.complex_id
        if not complex_id:
            continue
        tally[complex_id] = tally.get(complex_id, 0) + 1
    return tally

for_station_id

for_station_id(station_id: str) -> tuple[Entrance, ...]

Return entrances whose MTA station id matches.

Source code in src/subway_access/models/_entrance.py
55
56
57
58
def for_station_id(self, station_id: str) -> tuple[Entrance, ...]:
    """Select the entrances whose MTA station id equals *station_id*.

    The result keeps dataset order and is empty when nothing matches.
    """

    return tuple(
        filter(lambda item: item.station_id == station_id, self.entrances)
    )

GtfsLocation dataclass

One row from GTFS locations.txt (GTFS-Pathways extension).

Source code in src/subway_access/models/_entrance.py
78
79
80
81
82
83
84
85
86
@dataclass(frozen=True, slots=True)
class GtfsLocation:
    """One row from GTFS ``locations.txt`` (GTFS-Pathways extension)."""

    location_id: str  # the only required field
    location_type: str = ""
    parent_station: str | None = None
    # Coordinates are annotated as strings, not floats — presumably the raw
    # text values from the feed; confirm against the loader before doing math.
    latitude: str | None = None
    longitude: str | None = None

location_id instance-attribute

location_id: str

location_type class-attribute instance-attribute

location_type: str = ''

parent_station class-attribute instance-attribute

parent_station: str | None = None

latitude class-attribute instance-attribute

latitude: str | None = None

longitude class-attribute instance-attribute

longitude: str | None = None

GtfsPathway dataclass

One row from GTFS pathways.txt (GTFS-Pathways extension).

Source code in src/subway_access/models/_entrance.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass(frozen=True, slots=True)
class GtfsPathway:
    """One row from GTFS ``pathways.txt`` (GTFS-Pathways extension)."""

    # Required identifiers.
    pathway_id: str
    from_stop_id: str
    to_stop_id: str
    # Every remaining column is annotated as a string (or None), including the
    # numeric-looking ones — presumably raw feed text; confirm with the loader.
    pathway_mode: str = ""
    is_bidirectional: str = ""
    length: str | None = None
    traversal_time: str | None = None
    stair_count: str | None = None
    max_slope: str | None = None
    min_width: str | None = None
    signposted_as: str | None = None

pathway_id instance-attribute

pathway_id: str

from_stop_id instance-attribute

from_stop_id: str

to_stop_id instance-attribute

to_stop_id: str

pathway_mode class-attribute instance-attribute

pathway_mode: str = ''

is_bidirectional class-attribute instance-attribute

is_bidirectional: str = ''

length class-attribute instance-attribute

length: str | None = None

traversal_time class-attribute instance-attribute

traversal_time: str | None = None

stair_count class-attribute instance-attribute

stair_count: str | None = None

max_slope class-attribute instance-attribute

max_slope: str | None = None

min_width class-attribute instance-attribute

min_width: str | None = None

signposted_as class-attribute instance-attribute

signposted_as: str | None = None

GtfsPathwaysSnapshot dataclass

Optional pathways + locations parsed from a static GTFS zip.

Source code in src/subway_access/models/_entrance.py
89
90
91
92
93
94
@dataclass(frozen=True, slots=True)
class GtfsPathwaysSnapshot:
    """Optional pathways + locations parsed from a static GTFS zip."""

    # Both collections are immutable tuples and both are required; pass an
    # empty tuple when a table has no rows.
    pathways: tuple[GtfsPathway, ...]
    locations: tuple[GtfsLocation, ...]

pathways instance-attribute

pathways: tuple[GtfsPathway, ...]

locations instance-attribute

locations: tuple[GtfsLocation, ...]

GapAnalysis dataclass

Ranked tract accessibility gap results.

Source code in src/subway_access/models/_gap.py
29
30
31
32
33
@dataclass(frozen=True, slots=True)
class GapAnalysis:
    """Ranked tract accessibility gap results."""

    # This class stores the tuple as given and does no sorting itself —
    # presumably the producer supplies records already ranked; confirm.
    records: tuple[GapRecord, ...]

records instance-attribute

records: tuple[GapRecord, ...]

GapRecord dataclass

Final tract-level accessibility gap output.

Source code in src/subway_access/models/_gap.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@dataclass(frozen=True, slots=True)
class GapRecord:
    """Final tract-level accessibility gap output."""

    # All fields are required (no defaults), in this positional order.
    tract_id: str
    tract_name: str
    borough: str
    # NOTE(review): value range of the rates (0-1 vs 0-100) is not established here — confirm.
    disability_rate: float
    senior_rate: float
    poverty_rate: float
    total_population: int
    need_score: float
    has_accessible_station: bool
    accessible_station_count: int
    # The three "nearest" fields are optional — presumably None when no
    # accessible station was found for the tract; confirm with the producer.
    nearest_accessible_station_id: str | None
    nearest_accessible_station_name: str | None
    nearest_accessible_distance_meters: float | None
    gap_score: float
    gap_label: str

tract_id instance-attribute

tract_id: str

tract_name instance-attribute

tract_name: str

borough instance-attribute

borough: str

disability_rate instance-attribute

disability_rate: float

senior_rate instance-attribute

senior_rate: float

poverty_rate instance-attribute

poverty_rate: float

total_population instance-attribute

total_population: int

need_score instance-attribute

need_score: float

has_accessible_station instance-attribute

has_accessible_station: bool

accessible_station_count instance-attribute

accessible_station_count: int

nearest_accessible_station_id instance-attribute

nearest_accessible_station_id: str | None

nearest_accessible_station_name instance-attribute

nearest_accessible_station_name: str | None

nearest_accessible_distance_meters instance-attribute

nearest_accessible_distance_meters: float | None

gap_score instance-attribute

gap_score: float

gap_label instance-attribute

gap_label: str

CatchmentDataset dataclass

Generated catchment geometries.

Source code in src/subway_access/models/_metric.py
44
45
46
47
48
49
50
51
@dataclass(frozen=True, slots=True)
class CatchmentDataset:
    """Generated catchment geometries."""

    features: tuple[CatchmentFeature, ...]

    def radius_by_station_id(self) -> dict[str, float]:
        return {feature.station_id: feature.radius_meters for feature in self.features}

features instance-attribute

features: tuple[CatchmentFeature, ...]

radius_by_station_id

radius_by_station_id() -> dict[str, float]

Return a mapping from each feature's station id to its radius in meters.

Source code in src/subway_access/models/_metric.py
50
51
def radius_by_station_id(self) -> dict[str, float]:
    """Map each feature's ``station_id`` to its ``radius_meters``.

    If several features share a station id, the last one wins.
    """
    radii: dict[str, float] = {}
    for item in self.features:
        radii[item.station_id] = item.radius_meters
    return radii

CatchmentFeature dataclass

Map-friendly catchment geometry for a station.

Source code in src/subway_access/models/_metric.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@dataclass(frozen=True, slots=True)
class CatchmentFeature:
    """Map-friendly catchment geometry for a station."""

    # All fields are required (no defaults), in this positional order.
    station_id: str  # key used by CatchmentDataset.radius_by_station_id
    station_name: str
    borough: str
    ada_status: AccessibilityLabel
    center_latitude: float
    center_longitude: float
    radius_meters: float  # value returned by CatchmentDataset.radius_by_station_id
    minutes: int
    method: str
    # Sequence of coordinate pairs. NOTE(review): axis order (lat, lon) vs
    # (lon, lat) is not established in this class — confirm with the generator.
    polygon: tuple[tuple[float, float], ...]

station_id instance-attribute

station_id: str

station_name instance-attribute

station_name: str

borough instance-attribute

borough: str

ada_status instance-attribute

ada_status: AccessibilityLabel

center_latitude instance-attribute

center_latitude: float

center_longitude instance-attribute

center_longitude: float

radius_meters instance-attribute

radius_meters: float

minutes instance-attribute

minutes: int

method instance-attribute

method: str

polygon instance-attribute

polygon: tuple[tuple[float, float], ...]

CatchmentRequest dataclass

Request parameters for station catchment generation.

Source code in src/subway_access/models/_metric.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@dataclass(frozen=True, slots=True)
class CatchmentRequest:
    """Request parameters for station catchment generation."""

    minutes: int
    mode: str = "walk"

    def __post_init__(self) -> None:
        if self.minutes <= 0:
            message = "Catchment minutes must be greater than zero."
            raise ValueError(message)
        if self.mode != "walk":
            message = "Only walk catchments are implemented in subway-access."
            raise ValueError(message)

minutes instance-attribute

minutes: int

mode class-attribute instance-attribute

mode: str = 'walk'

StationMetricDataset dataclass

Export-ready station metrics.

Source code in src/subway_access/models/_metric.py
80
81
82
83
84
@dataclass(frozen=True, slots=True)
class StationMetricDataset:
    """Export-ready station metrics."""

    # Immutable tuple of per-station rows; the class adds no behavior of its own.
    records: tuple[StationMetricRecord, ...]

records instance-attribute

records: tuple[StationMetricRecord, ...]

StationMetricRecord dataclass

Station-level metrics derived from coverage and reliability outputs.

Source code in src/subway_access/models/_metric.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@dataclass(frozen=True, slots=True)
class StationMetricRecord:
    """Station-level metrics derived from coverage and reliability outputs."""

    # Required fields (no defaults), in this positional order.
    station_id: str
    station_name: str
    borough: str
    latitude: float
    longitude: float
    ada_status: AccessibilityLabel
    catchment_minutes: int
    catchment_radius_meters: float
    covered_tract_count: int
    covered_population: int
    nearby_gap_tract_count: int
    nearby_gap_population: int
    mean_need_score: float
    # The reliability/outage trio is optional — presumably None when no outage
    # data is available for the station; confirm with the producer.
    reliability_score: float | None
    reliability_label: str | None
    outage_minutes: int | None
    network_connection_count: int
    # Optional fields with defaults.
    daytime_routes: tuple[str, ...] = ()
    structure: str | None = None
    analysis_method: str = "euclidean"  # default label; other accepted values are not visible here

station_id instance-attribute

station_id: str

station_name instance-attribute

station_name: str

borough instance-attribute

borough: str

latitude instance-attribute

latitude: float

longitude instance-attribute

longitude: float

ada_status instance-attribute

ada_status: AccessibilityLabel

catchment_minutes instance-attribute

catchment_minutes: int

catchment_radius_meters instance-attribute

catchment_radius_meters: float

covered_tract_count instance-attribute

covered_tract_count: int

covered_population instance-attribute

covered_population: int

nearby_gap_tract_count instance-attribute

nearby_gap_tract_count: int

nearby_gap_population instance-attribute

nearby_gap_population: int

mean_need_score instance-attribute

mean_need_score: float

reliability_score instance-attribute

reliability_score: float | None

reliability_label instance-attribute

reliability_label: str | None

outage_minutes instance-attribute

outage_minutes: int | None

network_connection_count instance-attribute

network_connection_count: int

daytime_routes class-attribute instance-attribute

daytime_routes: tuple[str, ...] = ()

structure class-attribute instance-attribute

structure: str | None = None

analysis_method class-attribute instance-attribute

analysis_method: str = 'euclidean'

AccessibilityComparisonDataset dataclass

Collection of tract-level Euclidean vs network comparison results.

Source code in src/subway_access/models/_network.py
84
85
86
87
88
@dataclass(frozen=True, slots=True)
class AccessibilityComparisonDataset:
    """Collection of tract-level Euclidean vs network comparison results."""

    # Immutable tuple of per-tract rows; the class adds no behavior of its own.
    records: tuple[AccessibilityComparisonRecord, ...]

records instance-attribute

records: tuple[AccessibilityComparisonRecord, ...]

AccessibilityComparisonRecord dataclass

Per-tract comparison between Euclidean and network accessibility models.

Source code in src/subway_access/models/_network.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@dataclass(frozen=True, slots=True)
class AccessibilityComparisonRecord:
    """Per-tract comparison between Euclidean and network accessibility models."""

    # All fields are required (no defaults). Each measure comes as a
    # euclidean_* / network_* pair so the two models can be read side by side.
    tract_id: str
    tract_name: str
    borough: str
    need_score: float
    euclidean_has_access: bool
    network_has_access: bool
    euclidean_station_count: int
    network_station_count: int
    # Optional pairs — presumably None when the corresponding model found no
    # station for the tract; confirm with the producer.
    euclidean_station_id: str | None
    network_station_id: str | None
    euclidean_travel_minutes: float | None
    network_travel_minutes: float | None
    euclidean_path_meters: float | None
    network_path_meters: float | None
    coverage_change_label: str  # label vocabulary is not visible here

tract_id instance-attribute

tract_id: str

tract_name instance-attribute

tract_name: str

borough instance-attribute

borough: str

need_score instance-attribute

need_score: float

euclidean_has_access instance-attribute

euclidean_has_access: bool

network_has_access instance-attribute

network_has_access: bool

euclidean_station_count instance-attribute

euclidean_station_count: int

network_station_count instance-attribute

network_station_count: int

euclidean_station_id instance-attribute

euclidean_station_id: str | None

network_station_id instance-attribute

network_station_id: str | None

euclidean_travel_minutes instance-attribute

euclidean_travel_minutes: float | None

network_travel_minutes instance-attribute

network_travel_minutes: float | None

euclidean_path_meters instance-attribute

euclidean_path_meters: float | None

network_path_meters instance-attribute

network_path_meters: float | None

coverage_change_label instance-attribute

coverage_change_label: str

NetworkGraphSnapshot dataclass

Cached OSM walking graph metadata for one study area.

Source code in src/subway_access/models/_network.py
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass(frozen=True, slots=True)
class NetworkGraphSnapshot:
    """Cached OSM walking graph metadata for one study area."""

    # All fields are required (no defaults), in this positional order.
    query: AccessibilityQuery  # study-area filter the snapshot belongs to
    graph_path: Path  # path of the cached graph — file format is not visible here
    metadata_path: Path
    refreshed_at: datetime  # NOTE(review): timezone awareness is not enforced here — confirm
    network_type: str
    node_count: int
    edge_count: int
    source_url: str
    buffer_meters: int

query instance-attribute

query: AccessibilityQuery

graph_path instance-attribute

graph_path: Path

metadata_path instance-attribute

metadata_path: Path

refreshed_at instance-attribute

refreshed_at: datetime

network_type instance-attribute

network_type: str

node_count instance-attribute

node_count: int

edge_count instance-attribute

edge_count: int

source_url instance-attribute

source_url: str

buffer_meters instance-attribute

buffer_meters: int

PedestrianConnection dataclass

A simplified pedestrian connection between two stations or nodes.

Source code in src/subway_access/models/_network.py
16
17
18
19
20
21
22
23
24
25
26
27
28
@dataclass(frozen=True, slots=True)
class PedestrianConnection:
    """A simplified pedestrian connection between two stations or nodes."""

    # Required fields.
    connection_id: str
    from_station_id: str
    to_station_id: str
    walk_minutes: float
    distance_meters: float
    # Optional fields.
    geometry: tuple[tuple[float, float], ...] = ()  # coordinate pairs; axis order not established here — confirm
    # Endpoint kinds: PedestrianNetworkDataset.connection_count_by_station only
    # counts an endpoint when its kind equals "station".
    from_kind: str = "station"
    to_kind: str = "station"
    travel_mode: str = "walk"

connection_id instance-attribute

connection_id: str

from_station_id instance-attribute

from_station_id: str

to_station_id instance-attribute

to_station_id: str

walk_minutes instance-attribute

walk_minutes: float

distance_meters instance-attribute

distance_meters: float

geometry class-attribute instance-attribute

geometry: tuple[tuple[float, float], ...] = ()

from_kind class-attribute instance-attribute

from_kind: str = 'station'

to_kind class-attribute instance-attribute

to_kind: str = 'station'

travel_mode class-attribute instance-attribute

travel_mode: str = 'walk'

PedestrianNetworkDataset dataclass

Loaded pedestrian connections used for richer examples and metrics.

Source code in src/subway_access/models/_network.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass(frozen=True, slots=True)
class PedestrianNetworkDataset:
    """Loaded pedestrian connections used for richer examples and metrics."""

    connections: tuple[PedestrianConnection, ...]
    source: str = ""

    def connection_count_by_station(self) -> dict[str, int]:
        counts: dict[str, int] = defaultdict(int)
        for connection in self.connections:
            if connection.from_kind == "station":
                counts[connection.from_station_id] += 1
            if connection.to_kind == "station":
                counts[connection.to_station_id] += 1
        return dict(counts)

connections instance-attribute

connections: tuple[PedestrianConnection, ...]

source class-attribute instance-attribute

source: str = ''

connection_count_by_station

connection_count_by_station() -> dict[str, int]

Count, per station id, how many connection endpoints of kind "station" reference it.

Source code in src/subway_access/models/_network.py
38
39
40
41
42
43
44
45
def connection_count_by_station(self) -> dict[str, int]:
    """Count connection endpoints per station id.

    Each connection contributes up to two counts, one for its "from" side
    and one for its "to" side. A side is counted only when its kind is
    exactly ``"station"``.
    """
    tally: dict[str, int] = {}
    for link in self.connections:
        endpoints = (
            (link.from_kind, link.from_station_id),
            (link.to_kind, link.to_station_id),
        )
        for kind, station_id in endpoints:
            if kind == "station":
                tally[station_id] = tally.get(station_id, 0) + 1
    return tally

OutageDataset dataclass

Loaded outage events.

Source code in src/subway_access/models/_outage.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@dataclass(frozen=True, slots=True)
class OutageDataset:
    """Loaded outage events."""

    records: tuple[OutageRecord, ...]

    def outage_minutes_by_station(
        self, window: TimeWindow, *, as_of: datetime | None = None
    ) -> dict[str, int]:
        minutes_by_station: dict[str, int] = defaultdict(int)
        for record in self.records:
            minutes_by_station[record.station_id] += record.overlap_minutes(
                window,
                as_of=as_of,
            )
        return dict(minutes_by_station)

    def outage_count_by_station(
        self, window: TimeWindow, *, as_of: datetime | None = None
    ) -> dict[str, int]:
        counts: dict[str, int] = defaultdict(int)
        for record in self.records:
            if record.overlap_minutes(window, as_of=as_of) > 0:
                counts[record.station_id] += 1
        return dict(counts)

    def recommended_as_of(self) -> datetime | None:
        """Return the latest timestamp represented in the dataset."""

        timestamps = [
            _coerce_utc(timestamp)
            for record in self.records
            for timestamp in (record.started_at, record.ended_at)
            if timestamp is not None
        ]
        return max(timestamps) if timestamps else None

    def outage_total_by_station(
        self,
        window: TimeWindow,
        *,
        as_of: datetime | None = None,
    ) -> dict[str, int]:
        totals: dict[str, int] = defaultdict(int)
        for record in self.records:
            if record.overlap_minutes(window, as_of=as_of) > 0 and record.total_outages:
                totals[record.station_id] += record.total_outages
        return dict(totals)

    def scheduled_outage_total_by_station(
        self,
        window: TimeWindow,
        *,
        as_of: datetime | None = None,
    ) -> dict[str, int]:
        totals: dict[str, int] = defaultdict(int)
        for record in self.records:
            if (
                record.overlap_minutes(window, as_of=as_of) > 0
                and record.scheduled_outages
            ):
                totals[record.station_id] += record.scheduled_outages
        return dict(totals)

    def unscheduled_outage_total_by_station(
        self,
        window: TimeWindow,
        *,
        as_of: datetime | None = None,
    ) -> dict[str, int]:
        totals: dict[str, int] = defaultdict(int)
        for record in self.records:
            if (
                record.overlap_minutes(window, as_of=as_of) > 0
                and record.unscheduled_outages
            ):
                totals[record.station_id] += record.unscheduled_outages
        return dict(totals)

    def mean_availability_ratio_by_station(
        self,
        window: TimeWindow,
        *,
        as_of: datetime | None = None,
    ) -> dict[str, float]:
        sums: dict[str, float] = defaultdict(float)
        counts: dict[str, int] = defaultdict(int)
        for record in self.records:
            if (
                record.overlap_minutes(window, as_of=as_of) > 0
                and record.availability_ratio is not None
            ):
                sums[record.station_id] += record.availability_ratio
                counts[record.station_id] += 1
        return {
            station_id: sums[station_id] / counts[station_id]
            for station_id in sums
            if counts[station_id] > 0
        }

records instance-attribute

records: tuple[OutageRecord, ...]

outage_minutes_by_station

outage_minutes_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]

Sum each record's overlap minutes with the window, keyed by station id.

Source code in src/subway_access/models/_outage.py
102
103
104
105
106
107
108
109
110
111
def outage_minutes_by_station(
    self, window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]:
    """Sum overlap minutes per station.

    Every record contributes, so a station whose records all fall outside
    the window still appears, with a total of ``0``.
    """
    tally: dict[str, int] = {}
    for record in self.records:
        station = record.station_id
        minutes = record.overlap_minutes(window, as_of=as_of)
        tally[station] = tally.get(station, 0) + minutes
    return tally

outage_count_by_station

outage_count_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]

Count, per station id, the records whose overlap with the window is greater than zero minutes.

Source code in src/subway_access/models/_outage.py
113
114
115
116
117
118
119
120
def outage_count_by_station(
    self, window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]:
    """Count, per station, the records with a positive overlap with the window."""
    tally: dict[str, int] = {}
    for record in self.records:
        if record.overlap_minutes(window, as_of=as_of) > 0:
            station = record.station_id
            tally[station] = tally.get(station, 0) + 1
    return tally

recommended_as_of

recommended_as_of() -> datetime | None

Return the latest timestamp represented in the dataset.

Source code in src/subway_access/models/_outage.py
122
123
124
125
126
127
128
129
130
131
def recommended_as_of(self) -> datetime | None:
    """Return the latest start/end timestamp in the dataset, or ``None`` if empty.

    Each non-``None`` timestamp is passed through ``_coerce_utc`` before
    comparison.
    """

    coerced = (
        _coerce_utc(stamp)
        for record in self.records
        for stamp in (record.started_at, record.ended_at)
        if stamp is not None
    )
    return max(coerced, default=None)

outage_total_by_station

outage_total_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]

Sum total_outages per station id over the records that overlap the window.

Source code in src/subway_access/models/_outage.py
133
134
135
136
137
138
139
140
141
142
143
def outage_total_by_station(
    self,
    window: TimeWindow,
    *,
    as_of: datetime | None = None,
) -> dict[str, int]:
    """Sum ``total_outages`` per station over records that overlap the window.

    Records whose ``total_outages`` is falsy (``None`` or ``0``) are skipped.
    """
    tally: dict[str, int] = {}
    for record in self.records:
        overlaps = record.overlap_minutes(window, as_of=as_of) > 0
        if overlaps and record.total_outages:
            station = record.station_id
            tally[station] = tally.get(station, 0) + record.total_outages
    return tally

scheduled_outage_total_by_station

scheduled_outage_total_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]

Sum scheduled_outages per station id over the records that overlap the window.

Source code in src/subway_access/models/_outage.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def scheduled_outage_total_by_station(
    self,
    window: TimeWindow,
    *,
    as_of: datetime | None = None,
) -> dict[str, int]:
    """Sum ``scheduled_outages`` per station over records that overlap the window.

    Records whose ``scheduled_outages`` is falsy (``None`` or ``0``) are skipped.
    """
    tally: dict[str, int] = {}
    for record in self.records:
        overlaps = record.overlap_minutes(window, as_of=as_of) > 0
        if overlaps and record.scheduled_outages:
            station = record.station_id
            tally[station] = tally.get(station, 0) + record.scheduled_outages
    return tally

unscheduled_outage_total_by_station

unscheduled_outage_total_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
) -> dict[str, int]

Sum unscheduled_outages per station id over the records that overlap the window.

Source code in src/subway_access/models/_outage.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def unscheduled_outage_total_by_station(
    self,
    window: TimeWindow,
    *,
    as_of: datetime | None = None,
) -> dict[str, int]:
    """Sum ``unscheduled_outages`` per station over records that overlap the window.

    Records whose ``unscheduled_outages`` is falsy (``None`` or ``0``) are skipped.
    """
    tally: dict[str, int] = {}
    for record in self.records:
        overlaps = record.overlap_minutes(window, as_of=as_of) > 0
        if overlaps and record.unscheduled_outages:
            station = record.station_id
            tally[station] = tally.get(station, 0) + record.unscheduled_outages
    return tally

mean_availability_ratio_by_station

mean_availability_ratio_by_station(
    window: TimeWindow, *, as_of: datetime | None = None
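) -> dict[str, float]

Average availability_ratio per station id over the records that overlap the window and have a ratio.

Source code in src/subway_access/models/_outage.py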
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def mean_availability_ratio_by_station(
    self,
    window: TimeWindow,
    *,
    as_of: datetime | None = None,
) -> dict[str, float]:
    """Average ``availability_ratio`` per station over overlapping records.

    Only records with a positive overlap and a non-``None`` ratio take
    part; stations with no such record are absent from the result.
    """
    # station_id -> (running sum of ratios, number of ratios seen)
    running: dict[str, tuple[float, int]] = {}
    for record in self.records:
        if not record.overlap_minutes(window, as_of=as_of) > 0:
            continue
        if record.availability_ratio is None:
            continue
        total, seen = running.get(record.station_id, (0.0, 0))
        running[record.station_id] = (
            total + record.availability_ratio,
            seen + 1,
        )
    return {station: total / seen for station, (total, seen) in running.items()}

OutageRecord dataclass

Single elevator or escalator outage event.

Source code in src/subway_access/models/_outage.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@dataclass(frozen=True, slots=True)
class OutageRecord:
    """One elevator or escalator outage event."""

    station_id: str
    equipment_id: str
    equipment_type: EquipmentType
    status: OutageStatus
    started_at: datetime
    ended_at: datetime | None
    description: str = ""
    source: str = ""
    station_complex_id: str | None = None
    total_outages: int | None = None
    scheduled_outages: int | None = None
    unscheduled_outages: int | None = None
    availability_ratio: float | None = None
    outage_minutes_override: int | None = None

    def overlap_minutes(
        self, window: TimeWindow, *, as_of: datetime | None = None
    ) -> int:
        """Return the outage minutes that fall inside a rolling window.

        The window ends at ``as_of`` (falling back to ``ended_at``, then
        ``started_at``) and reaches back ``window.days`` days. An event with
        no ``ended_at`` is treated as running until the window end.

        When ``outage_minutes_override`` is set, the result is that override
        scaled by the fraction of the event's duration that lies inside the
        window, truncated to an int.
        """

        anchor = as_of or self.ended_at or self.started_at
        window_end = _coerce_utc(anchor)
        event_start = _coerce_utc(self.started_at)
        event_end = _coerce_utc(self.ended_at or window_end)
        window_start = window_end - timedelta(days=window.days)

        # Clip the event to the window; an empty or inverted span means no overlap.
        clipped_start = max(event_start, window_start)
        clipped_end = min(event_end, window_end)
        if clipped_end <= clipped_start:
            return 0

        clipped_minutes = int((clipped_end - clipped_start).total_seconds() // 60)
        if self.outage_minutes_override is None:
            return clipped_minutes

        # Floor the event duration at one minute so the ratio never divides by zero.
        event_minutes = max(
            int((event_end - event_start).total_seconds() // 60),
            1,
        )
        return int(self.outage_minutes_override * (clipped_minutes / event_minutes))

station_id instance-attribute

station_id: str

equipment_id instance-attribute

equipment_id: str

equipment_type instance-attribute

equipment_type: EquipmentType

status instance-attribute

status: OutageStatus

started_at instance-attribute

started_at: datetime

ended_at instance-attribute

ended_at: datetime | None

description class-attribute instance-attribute

description: str = ''

source class-attribute instance-attribute

source: str = ''

station_complex_id class-attribute instance-attribute

station_complex_id: str | None = None

total_outages class-attribute instance-attribute

total_outages: int | None = None

scheduled_outages class-attribute instance-attribute

scheduled_outages: int | None = None

unscheduled_outages class-attribute instance-attribute

unscheduled_outages: int | None = None

availability_ratio class-attribute instance-attribute

availability_ratio: float | None = None

outage_minutes_override class-attribute instance-attribute

outage_minutes_override: int | None = None

overlap_minutes

overlap_minutes(
    window: TimeWindow, *, as_of: datetime | None = None
) -> int

Return outage minutes that fall inside the supplied rolling window.

Source code in src/subway_access/models/_outage.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def overlap_minutes(
    self, window: TimeWindow, *, as_of: datetime | None = None
) -> int:
    """Return outage minutes that fall inside the supplied rolling window.

    The window ends at ``as_of`` when given, otherwise at ``ended_at``,
    otherwise at ``started_at``, and reaches back ``window.days`` days.
    An outage without ``ended_at`` is treated as lasting until the window
    end. When ``outage_minutes_override`` is set, that figure is pro-rated
    by the share of the event's whole-minute duration that lies inside the
    window and truncated to an ``int``.
    """

    window_end = _coerce_utc(as_of or self.ended_at or self.started_at)
    event_start = _coerce_utc(self.started_at)
    event_end = _coerce_utc(self.ended_at or window_end)
    window_start = window_end - timedelta(days=window.days)

    # Clip the event to the window; an empty or inverted clip means no overlap.
    clipped_start = max(event_start, window_start)
    clipped_end = min(event_end, window_end)
    if clipped_end <= clipped_start:
        return 0

    minutes_in_window = int((clipped_end - clipped_start).total_seconds() // 60)
    override = self.outage_minutes_override
    if override is None:
        return minutes_in_window

    # Floor the event length at one minute so the division below is safe.
    event_minutes = max(
        int((event_end - event_start).total_seconds() // 60),
        1,
    )
    return int(override * (minutes_in_window / event_minutes))

ReliabilityDataset dataclass

Rolling reliability results for stations.

Source code in src/subway_access/models/_outage.py
197
198
199
200
201
202
203
204
@dataclass(frozen=True, slots=True)
class ReliabilityDataset:
    """Rolling reliability results for stations."""

    records: tuple[ReliabilityRecord, ...]

    def as_mapping(self) -> dict[str, ReliabilityRecord]:
        return {record.station_id: record for record in self.records}

records instance-attribute

records: tuple[ReliabilityRecord, ...]

as_mapping

as_mapping() -> dict[str, ReliabilityRecord]
Source code in src/subway_access/models/_outage.py
203
204
def as_mapping(self) -> dict[str, ReliabilityRecord]:
    """Index the records by ``station_id``; a later duplicate id wins."""
    return {entry.station_id: entry for entry in self.records}

ReliabilityRecord dataclass

Rolling reliability summary for a station.

Source code in src/subway_access/models/_outage.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@dataclass(frozen=True, slots=True)
class ReliabilityRecord:
    """Rolling reliability summary for a station.

    Immutable (frozen, slotted) record. The first nine fields are required;
    the three outage totals default to ``0`` and ``mean_availability_ratio``
    defaults to ``None``.
    """

    # Required fields (positional order is part of the constructor contract).
    station_id: str
    station_name: str | None
    borough: str | None
    ada_status: AccessibilityLabel
    outage_count: int
    outage_minutes: int
    # NOTE(review): presumably the length of the rolling window this summary
    # was computed over — confirm against the code that builds these records.
    window_days: int
    reliability_score: float
    reliability_label: str
    # Optional fields with defaults.
    total_outages: int = 0
    scheduled_outages: int = 0
    unscheduled_outages: int = 0
    mean_availability_ratio: float | None = None

station_id instance-attribute

station_id: str

station_name instance-attribute

station_name: str | None

borough instance-attribute

borough: str | None

ada_status instance-attribute

ada_status: AccessibilityLabel

outage_count instance-attribute

outage_count: int

outage_minutes instance-attribute

outage_minutes: int

window_days instance-attribute

window_days: int

reliability_score instance-attribute

reliability_score: float

reliability_label instance-attribute

reliability_label: str

total_outages class-attribute instance-attribute

total_outages: int = 0

scheduled_outages class-attribute instance-attribute

scheduled_outages: int = 0

unscheduled_outages class-attribute instance-attribute

unscheduled_outages: int = 0

mean_availability_ratio class-attribute instance-attribute

mean_availability_ratio: float | None = None

TimeWindow dataclass

Rolling time window used for reliability calculations.

Source code in src/subway_access/models/_outage.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@dataclass(frozen=True, slots=True)
class TimeWindow:
    """Rolling time window used for reliability calculations."""

    days: int

    def __post_init__(self) -> None:
        if self.days <= 0:
            message = "TimeWindow.days must be greater than zero."
            raise ValueError(message)

    @property
    def total_minutes(self) -> int:
        """Return the total minutes in the window."""

        return self.days * 24 * 60

days instance-attribute

days: int

total_minutes property

total_minutes: int

Return the total minutes in the window.

AccessibilitySummaryDataset dataclass

Grouped rollup summaries for accessibility results.

Source code in src/subway_access/models/_snapshot.py
74
75
76
77
78
@dataclass(frozen=True, slots=True)
class AccessibilitySummaryDataset:
    """Grouped rollup summaries for accessibility results.

    Immutable container holding a tuple of ``AccessibilitySummaryRecord``
    entries.
    """

    # One summary row per entry; stored as a tuple.
    records: tuple[AccessibilitySummaryRecord, ...]

records instance-attribute

records: tuple[AccessibilitySummaryRecord, ...]

AccessibilitySummaryRecord dataclass

Rollup summary for accessibility metrics at a group level.

Source code in src/subway_access/models/_snapshot.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@dataclass(frozen=True, slots=True)
class AccessibilitySummaryRecord:
    """Rollup summary for accessibility metrics at a group level.

    Immutable record; every field is required. Only
    ``mean_nearest_travel_minutes`` may be ``None``.
    """

    # NOTE(review): presumably the grouping dimension and the value of that
    # dimension this row summarises — confirm against the rollup code.
    group_by: str
    group_value: str
    # Tract counts.
    tract_count: int
    covered_tract_count: int
    uncovered_tract_count: int
    # Population counts.
    total_population: int
    covered_population: int
    uncovered_population: int
    # Aggregate metrics.
    mean_need_score: float
    mean_nearest_travel_minutes: float | None
    coverage_rate: float

group_by instance-attribute

group_by: str

group_value instance-attribute

group_value: str

tract_count instance-attribute

tract_count: int

covered_tract_count instance-attribute

covered_tract_count: int

uncovered_tract_count instance-attribute

uncovered_tract_count: int

total_population instance-attribute

total_population: int

covered_population instance-attribute

covered_population: int

uncovered_population instance-attribute

uncovered_population: int

mean_need_score instance-attribute

mean_need_score: float

mean_nearest_travel_minutes instance-attribute

mean_nearest_travel_minutes: float | None

coverage_rate instance-attribute

coverage_rate: float

DataSourceMetadata dataclass

Metadata describing one cached or fetched public dataset.

Source code in src/subway_access/models/_snapshot.py
28
29
30
31
32
33
34
35
36
37
@dataclass(frozen=True, slots=True)
class DataSourceMetadata:
    """Metadata describing one cached or fetched public dataset.

    Immutable record. All fields are required except ``notes``, which
    defaults to an empty string.
    """

    name: str
    source_url: str
    cache_path: Path
    refreshed_at: datetime
    record_count: int
    # Optional free-text annotation; empty when not supplied.
    notes: str = ""

name instance-attribute

name: str

source_url instance-attribute

source_url: str

cache_path instance-attribute

cache_path: Path

refreshed_at instance-attribute

refreshed_at: datetime

record_count instance-attribute

record_count: int

notes class-attribute instance-attribute

notes: str = ''

ExportTarget dataclass

Destination metadata for export commands.

Source code in src/subway_access/models/_snapshot.py
20
21
22
23
24
25
@dataclass(frozen=True, slots=True)
class ExportTarget:
    """Destination metadata for export commands.

    Immutable pair of an output format name and the path to write to.
    """

    # NOTE(review): the set of accepted format strings is not visible here —
    # check the export commands for the supported values.
    format: str
    output_path: Path

format instance-attribute

format: str

output_path instance-attribute

output_path: Path

StudyAreaSnapshot dataclass

In-memory snapshot of one real-data accessibility study area.

Source code in src/subway_access/models/_snapshot.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass(frozen=True, slots=True)
class StudyAreaSnapshot:
    """In-memory snapshot of one real-data accessibility study area.

    Immutable bundle of the datasets loaded for one query. The first seven
    fields are required; the remaining four are optional and default to
    ``None``.
    """

    # Required datasets and provenance.
    query: AccessibilityQuery
    stations: StationDataset
    accessibility: AccessibilityDataset
    demographics: DemographicDataset
    outages: OutageDataset
    metadata: tuple[DataSourceMetadata, ...]
    entrances: EntranceDataset
    # Optional extras; None when not provided.
    gtfs_pathways: GtfsPathwaysSnapshot | None = None
    pedestrian_network: PedestrianNetworkDataset | None = None
    generated_at: datetime | None = None
    cache_dir: Path | None = None

query instance-attribute

query: AccessibilityQuery

stations instance-attribute

stations: StationDataset

accessibility instance-attribute

accessibility: AccessibilityDataset

demographics instance-attribute

demographics: DemographicDataset

outages instance-attribute

outages: OutageDataset

metadata instance-attribute

metadata: tuple[DataSourceMetadata, ...]

entrances instance-attribute

entrances: EntranceDataset

gtfs_pathways class-attribute instance-attribute

gtfs_pathways: GtfsPathwaysSnapshot | None = None

pedestrian_network class-attribute instance-attribute

pedestrian_network: PedestrianNetworkDataset | None = None

generated_at class-attribute instance-attribute

generated_at: datetime | None = None

cache_dir class-attribute instance-attribute

cache_dir: Path | None = None

AccessibilityDataset dataclass

Loaded ADA status rows.

Source code in src/subway_access/models/_station.py
 98
 99
100
101
102
103
104
105
@dataclass(frozen=True, slots=True)
class AccessibilityDataset:
    """Loaded ADA status rows."""

    statuses: tuple[AccessibilityStatus, ...]

    def as_mapping(self) -> dict[str, AccessibilityStatus]:
        return {status.station_id: status for status in self.statuses}

statuses instance-attribute

statuses: tuple[AccessibilityStatus, ...]

as_mapping

as_mapping() -> dict[str, AccessibilityStatus]
Source code in src/subway_access/models/_station.py
104
105
def as_mapping(self) -> dict[str, AccessibilityStatus]:
    """Index the status rows by ``station_id``; a later duplicate id wins."""
    return {entry.station_id: entry for entry in self.statuses}

AccessibilityStatus dataclass

ADA accessibility status keyed by station identifier.

Source code in src/subway_access/models/_station.py
46
47
48
49
50
51
52
53
@dataclass(frozen=True, slots=True)
class AccessibilityStatus:
    """ADA accessibility status keyed by station identifier.

    Immutable record. ``station_id`` and ``ada_status`` are required;
    ``notes`` and ``source`` default to empty strings.
    """

    station_id: str
    ada_status: AccessibilityLabel
    # Optional free-text fields; empty when not supplied.
    notes: str = ""
    source: str = ""

station_id instance-attribute

station_id: str

ada_status instance-attribute

ada_status: AccessibilityLabel

notes class-attribute instance-attribute

notes: str = ''

source class-attribute instance-attribute

source: str = ''

Station dataclass

Station record used by the current accessibility workflow.

Source code in src/subway_access/models/_station.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclass(frozen=True, slots=True)
class Station:
    """Station record used by the current accessibility workflow.

    Immutable record. ``station_id``, ``name``, ``borough``, ``latitude``
    and ``longitude`` are required; every other field has a default, and
    ``ada_status`` starts as ``"unknown"``.
    """

    # Required identity and location fields.
    station_id: str
    name: str
    borough: str
    latitude: float
    longitude: float
    # Optional descriptive fields; None, empty tuple or empty string when
    # not supplied.
    complex_id: str | None = None
    gtfs_stop_id: str | None = None
    daytime_routes: tuple[str, ...] = ()
    division: str | None = None
    line: str | None = None
    structure: str | None = None
    north_direction_label: str | None = None
    south_direction_label: str | None = None
    accessibility_notes: str = ""
    source: str = ""
    # Read by the two properties below.
    ada_status: AccessibilityLabel = "unknown"

    @property
    def is_accessible(self) -> bool:
        """Return whether the station counts as accessible in the current model."""

        # Only the exact "accessible" label counts; "partially_accessible"
        # does not.
        return self.ada_status == "accessible"

    @property
    def is_partially_accessible(self) -> bool:
        """Return whether the station has partial accessibility coverage."""

        return self.ada_status == "partially_accessible"

station_id instance-attribute

station_id: str

name instance-attribute

name: str

borough instance-attribute

borough: str

latitude instance-attribute

latitude: float

longitude instance-attribute

longitude: float

complex_id class-attribute instance-attribute

complex_id: str | None = None

gtfs_stop_id class-attribute instance-attribute

gtfs_stop_id: str | None = None

daytime_routes class-attribute instance-attribute

daytime_routes: tuple[str, ...] = ()

division class-attribute instance-attribute

division: str | None = None

line class-attribute instance-attribute

line: str | None = None

structure class-attribute instance-attribute

structure: str | None = None

north_direction_label class-attribute instance-attribute

north_direction_label: str | None = None

south_direction_label class-attribute instance-attribute

south_direction_label: str | None = None

accessibility_notes class-attribute instance-attribute

accessibility_notes: str = ''

source class-attribute instance-attribute

source: str = ''

ada_status class-attribute instance-attribute

ada_status: AccessibilityLabel = 'unknown'

is_accessible property

is_accessible: bool

Return whether the station counts as accessible in the current model.

is_partially_accessible property

is_partially_accessible: bool

Return whether the station has partial accessibility coverage.

StationDataset dataclass

Loaded station rows for an analysis run.

Source code in src/subway_access/models/_station.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass(frozen=True, slots=True)
class StationDataset:
    """Loaded station rows for an analysis run."""

    stations: tuple[Station, ...]

    def as_mapping(self) -> dict[str, Station]:
        return {station.station_id: station for station in self.stations}

    @property
    def accessible_stations(self) -> tuple[Station, ...]:
        return tuple(station for station in self.stations if station.is_accessible)

    def with_accessibility(self, accessibility: AccessibilityDataset) -> StationDataset:
        """Merge ADA status into the station dataset."""

        status_by_station = accessibility.as_mapping()
        station_ids = {station.station_id for station in self.stations}
        unknown_ids = sorted(set(status_by_station).difference(station_ids))
        if unknown_ids:
            joined_ids = ", ".join(unknown_ids)
            message = (
                f"Accessibility data references unknown station IDs: {joined_ids}."
            )
            raise ValueError(message)

        merged = tuple(
            replace(
                station,
                ada_status=status_by_station.get(
                    station.station_id,
                    AccessibilityStatus(
                        station_id=station.station_id,
                        ada_status="unknown",
                    ),
                ).ada_status,
            )
            for station in self.stations
        )
        return StationDataset(stations=merged)

stations instance-attribute

stations: tuple[Station, ...]

accessible_stations property

accessible_stations: tuple[Station, ...]

as_mapping

as_mapping() -> dict[str, Station]
Source code in src/subway_access/models/_station.py
62
63
def as_mapping(self) -> dict[str, Station]:
    """Index the stations by ``station_id``; a later duplicate id wins."""
    return {entry.station_id: entry for entry in self.stations}

with_accessibility

with_accessibility(
    accessibility: AccessibilityDataset,
) -> StationDataset

Merge ADA status into the station dataset.

Source code in src/subway_access/models/_station.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def with_accessibility(self, accessibility: AccessibilityDataset) -> StationDataset:
    """Merge ADA status into the station dataset.

    Returns a new ``StationDataset`` in which every station carries the
    ``ada_status`` found for its id in ``accessibility``; stations with no
    matching row get ``"unknown"``.

    Raises:
        ValueError: if ``accessibility`` mentions a station id that is not
            present in this dataset.
    """

    status_by_station = accessibility.as_mapping()
    known_ids = {station.station_id for station in self.stations}
    stray_ids = sorted(set(status_by_station).difference(known_ids))
    if stray_ids:
        joined_ids = ", ".join(stray_ids)
        raise ValueError(
            f"Accessibility data references unknown station IDs: {joined_ids}."
        )

    relabelled = []
    for station in self.stations:
        status = status_by_station.get(station.station_id)
        label = "unknown" if status is None else status.ada_status
        relabelled.append(replace(station, ada_status=label))
    return StationDataset(stations=tuple(relabelled))

AccessibilityScoreDataset dataclass

Joined tract accessibility scoring results.

Source code in src/subway_access/models/_tract.py
55
56
57
58
59
@dataclass(frozen=True, slots=True)
class AccessibilityScoreDataset:
    """Joined tract accessibility scoring results.

    Immutable container holding a tuple of ``TractAccessibilityRecord``
    entries.
    """

    # One scored tract per entry; stored as a tuple.
    records: tuple[TractAccessibilityRecord, ...]

records instance-attribute

records: tuple[TractAccessibilityRecord, ...]

DemographicDataset dataclass

Loaded tract-level demographic rows.

Source code in src/subway_access/models/_tract.py
48
49
50
51
52
@dataclass(frozen=True, slots=True)
class DemographicDataset:
    """Loaded tract-level demographic rows.

    Immutable container holding a tuple of ``TractDemographics`` entries.
    """

    # One tract per entry; stored as a tuple.
    tracts: tuple[TractDemographics, ...]

tracts instance-attribute

tracts: tuple[TractDemographics, ...]

TractAccessibilityRecord dataclass

Joined tract-level accessibility score for the current workflow.

Source code in src/subway_access/models/_tract.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass(frozen=True, slots=True)
class TractAccessibilityRecord:
    """Joined tract-level accessibility score for the current workflow.

    Immutable record. The first nine fields have the same names and types
    as those of ``TractDemographics``; the remaining fields hold the scoring
    result. The last three fields are optional, and ``analysis_method``
    defaults to ``"euclidean"``.
    """

    # Tract identity and demographics.
    tract_id: str
    tract_name: str
    borough: str
    centroid_latitude: float
    centroid_longitude: float
    disability_rate: float
    senior_rate: float
    poverty_rate: float
    total_population: int
    # Scoring and coverage results.
    need_score: float
    has_accessible_station: bool
    accessible_station_count: int
    covering_station_ids: tuple[str, ...]
    # Nearest accessible station; these may be None.
    # NOTE(review): presumably None when no accessible station was found —
    # confirm against the scoring code.
    nearest_accessible_station_id: str | None
    nearest_accessible_station_name: str | None
    nearest_accessible_distance_meters: float | None
    # Optional fields with defaults.
    nearest_accessible_path_meters: float | None = None
    nearest_accessible_travel_minutes: float | None = None
    analysis_method: str = "euclidean"

tract_id instance-attribute

tract_id: str

tract_name instance-attribute

tract_name: str

borough instance-attribute

borough: str

centroid_latitude instance-attribute

centroid_latitude: float

centroid_longitude instance-attribute

centroid_longitude: float

disability_rate instance-attribute

disability_rate: float

senior_rate instance-attribute

senior_rate: float

poverty_rate instance-attribute

poverty_rate: float

total_population instance-attribute

total_population: int

need_score instance-attribute

need_score: float

has_accessible_station instance-attribute

has_accessible_station: bool

accessible_station_count instance-attribute

accessible_station_count: int

covering_station_ids instance-attribute

covering_station_ids: tuple[str, ...]

nearest_accessible_station_id instance-attribute

nearest_accessible_station_id: str | None

nearest_accessible_station_name instance-attribute

nearest_accessible_station_name: str | None

nearest_accessible_distance_meters instance-attribute

nearest_accessible_distance_meters: float | None

nearest_accessible_path_meters class-attribute instance-attribute

nearest_accessible_path_meters: float | None = None

nearest_accessible_travel_minutes class-attribute instance-attribute

nearest_accessible_travel_minutes: float | None = None

analysis_method class-attribute instance-attribute

analysis_method: str = 'euclidean'

TractDemographics dataclass

Demographic summary for a tract centroid.

Source code in src/subway_access/models/_tract.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@dataclass(frozen=True, slots=True)
class TractDemographics:
    """Demographic summary for a tract centroid.

    Immutable record; every field is required.
    """

    # Tract identity.
    tract_id: str
    tract_name: str
    borough: str
    # Centroid coordinates.
    centroid_latitude: float
    centroid_longitude: float
    # NOTE(review): the rates look like fractions rather than percentages —
    # confirm the scale with the loader that fills them.
    disability_rate: float
    senior_rate: float
    poverty_rate: float
    total_population: int

tract_id instance-attribute

tract_id: str

tract_name instance-attribute

tract_name: str

borough instance-attribute

borough: str

centroid_latitude instance-attribute

centroid_latitude: float

centroid_longitude instance-attribute

centroid_longitude: float

disability_rate instance-attribute

disability_rate: float

senior_rate instance-attribute

senior_rate: float

poverty_rate instance-attribute

poverty_rate: float

total_population instance-attribute

total_population: int

IO

subway_access.io

Public loader entry points for subway-access.

ACS_5YEAR_YEAR module-attribute

ACS_5YEAR_YEAR = 2023

MTA_ELEVATOR_AVAILABILITY_API_URL module-attribute

MTA_ELEVATOR_AVAILABILITY_API_URL = (
    "https://data.ny.gov/resource/rc78-7x78.json"
)

MTA_EQUIPMENT_ASSET_API_URL module-attribute

MTA_EQUIPMENT_ASSET_API_URL = (
    "https://data.ny.gov/resource/94fv-bak7.json"
)

MTA_GTFS_STATIC_URL module-attribute

MTA_GTFS_STATIC_URL = (
    "https://rrgtfsfeeds.s3.amazonaws.com/gtfs_subway.zip"
)

MTA_SUBWAY_ENTRANCES_API_URL module-attribute

MTA_SUBWAY_ENTRANCES_API_URL = (
    "https://data.ny.gov/resource/i9wp-a4ja.json"
)

MTA_SUBWAY_STATIONS_API_URL module-attribute

MTA_SUBWAY_STATIONS_API_URL = (
    "https://data.ny.gov/resource/39hk-dx4f.json"
)

OSM_SOURCE_URL module-attribute

OSM_SOURCE_URL = 'https://www.openstreetmap.org'

fetch_nyc_acs_tract_estimates

fetch_nyc_acs_tract_estimates(
    *, tract_geoids: tuple[str, ...] | None = None
) -> dict[str, dict[str, object]]

Fetch ACS tract-level population, senior, disability, and poverty data.

Source code in src/subway_access/io/_acs.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def fetch_nyc_acs_tract_estimates(
    *,
    tract_geoids: tuple[str, ...] | None = None,
) -> dict[str, dict[str, object]]:
    """Fetch ACS tract-level population, senior, disability, and poverty data.

    Args:
        tract_geoids: Optional tract GEOIDs to restrict the result to. When
            given, only the counties named by characters ``[2:5]`` of those
            GEOIDs are queried; otherwise every code in ``NYC_COUNTY_CODES``
            is queried.

    Returns:
        A mapping from tract GEOID (state + county + tract codes) to a dict
        with the keys ``tract_id``, ``tract_name``, ``total_population``,
        ``senior_rate``, ``poverty_rate`` and ``disability_rate``. Tracts
        that have no row in the subject table are left out.
    """

    requested_geoids = set(tract_geoids or ())
    county_codes = (
        sorted({geoid[2:5] for geoid in requested_geoids})
        if requested_geoids
        else list(NYC_COUNTY_CODES)
    )
    estimates: dict[str, dict[str, object]] = {}

    for county_code in county_codes:
        count_header, count_rows = _read_census_rows(_counts_url(county_code))
        subject_header, subject_rows = _read_census_rows(_subject_url(county_code))

        # Resolve the key column positions once per county instead of
        # re-scanning the header three times for every subject row.
        # NOTE(review): assumes _read_census_rows returns a sized sequence of
        # rows (e.g. a list) — confirm.
        subject_lookup: dict[str, object] = {}
        if subject_rows:
            state_index = subject_header.index("state")
            county_index = subject_header.index("county")
            tract_index = subject_header.index("tract")
            subject_lookup = {
                f"{row[state_index]}{row[county_index]}{row[tract_index]}": row
                for row in subject_rows
            }

        for row in count_rows:
            row_map = {name: row[index] for index, name in enumerate(count_header)}
            geoid = f"{row_map['state']}{row_map['county']}{row_map['tract']}"
            if requested_geoids and geoid not in requested_geoids:
                continue
            subject_row = subject_lookup.get(geoid)
            if subject_row is None:
                # No subject-table row for this tract: skip it.
                continue
            subject_map = {
                name: subject_row[index] for index, name in enumerate(subject_header)
            }
            total_population = _as_int(row_map["B01003_001E"])
            senior_population = sum(
                _as_int(row_map[name]) for name in _SENIOR_VARIABLES
            )
            estimates[geoid] = {
                "tract_id": geoid,
                "tract_name": row_map["NAME"],
                "total_population": total_population,
                # Avoid dividing by zero for unpopulated tracts.
                "senior_rate": 0.0
                if total_population == 0
                else senior_population / total_population,
                "poverty_rate": _as_percent(subject_map["S1701_C02_001E"]),
                "disability_rate": _as_percent(subject_map["S1810_C02_001E"]),
            }

    return estimates

cache_timestamp

cache_timestamp() -> str

Return an ISO-8601 UTC timestamp for cache metadata.

Source code in src/subway_access/io/_cache.py
21
22
23
24
def cache_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string for cache metadata."""

    now_utc = datetime.now(timezone.utc)
    return now_utc.isoformat()

ensure_directory

ensure_directory(path: Path) -> Path

Create a directory if needed and return it.

Source code in src/subway_access/io/_cache.py
14
15
16
17
18
def ensure_directory(path: Path) -> Path:
    """Make sure ``path`` exists as a directory and hand it back.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """

    path.mkdir(exist_ok=True, parents=True)
    return path

write_csv_rows

write_csv_rows(
    path: Path, rows: list[dict[str, Any]]
) -> Path

Write a tabular CSV snapshot.

Source code in src/subway_access/io/_cache.py
27
28
29
30
31
32
33
34
35
36
37
def write_csv_rows(path: Path, rows: list[dict[str, Any]]) -> Path:
    """Write a tabular CSV snapshot.

    The header is the union of the keys of all rows, in first-seen order, so
    a row that carries a key missing from the first row no longer makes the
    writer fail part-way through the file. Rows lacking a column get an
    empty cell. Parent directories are created when needed.

    Args:
        path: Destination file; overwritten if it exists.
        rows: Row dictionaries to write. An empty list yields a file that
            contains only an empty header line.

    Returns:
        ``path``, unchanged.
    """

    path.parent.mkdir(parents=True, exist_ok=True)
    # dict.fromkeys de-duplicates while keeping first-seen column order.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return path

write_json

write_json(path: Path, payload: object) -> Path

Write JSON with stable formatting.

Source code in src/subway_access/io/_cache.py
40
41
42
43
44
45
def write_json(path: Path, payload: object) -> Path:
    """Serialise ``payload`` as two-space-indented JSON and write it to ``path``.

    Parent directories are created when needed, the text ends with a single
    newline, and the file is encoded as UTF-8. Returns ``path``.
    """

    serialized = json.dumps(payload, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized + "\n", encoding="utf-8")
    return path

load_accessibility_status

load_accessibility_status(
    source: str | Path,
) -> AccessibilityDataset

Load cached station accessibility status keyed by station identifier.

Source code in src/subway_access/io/_core.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def load_accessibility_status(source: str | Path) -> AccessibilityDataset:
    """Load cached station accessibility status keyed by station identifier.

    Reads CSV rows from ``source`` (the ``station_id`` and ``ada_status``
    columns are required), checks that station ids are unique, and builds
    one ``AccessibilityStatus`` per row. The note text comes from the
    ``notes`` column, falling back to ``ada_direction_notes`` and then to
    an empty string.
    """

    rows, source_name = _load_csv_rows(
        source,
        required_columns=("station_id", "ada_status"),
    )
    _ensure_unique_ids(rows, id_field="station_id", source_name=source_name)

    parsed: list[AccessibilityStatus] = []
    for row in rows:
        station_id = row["station_id"].strip()
        label = _parse_accessibility_label(
            row["ada_status"],
            source_name=source_name,
            station_id=station_id,
        )
        raw_notes = row.get("notes") or row.get("ada_direction_notes") or ""
        parsed.append(
            AccessibilityStatus(
                station_id=station_id,
                ada_status=label,
                notes=raw_notes.strip(),
                source=source_name,
            )
        )
    return AccessibilityDataset(statuses=tuple(parsed))

load_census_data

load_census_data(source: str | Path) -> DemographicDataset

Load cached tract-level demographic variables used in need scoring.

Source code in src/subway_access/io/_core.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def load_census_data(source: str | Path) -> DemographicDataset:
    """Load cached tract-level demographic variables used in need scoring.

    ``source`` must resolve to a GeoJSON ``FeatureCollection`` whose features
    carry ``Point`` geometry (the tract centroid) and a ``properties`` object
    with ``tract_id``, ``disability_rate``, ``senior_rate``, ``poverty_rate``
    and ``population``. ``tract_name`` falls back to the tract id, ``borough``
    to an empty string, and the ``centroid_latitude`` / ``centroid_longitude``
    properties fall back to the point coordinates.

    Raises:
        TypeError: If the payload is not an object with a ``features`` list, a
            feature or its ``properties`` is not an object, the geometry is not
            a ``Point``, or the coordinates are not exactly two numbers.
        ValueError: If a feature has no ``tract_id`` or a ``tract_id`` occurs
            more than once.
        KeyError: If a required demographic property is absent.
    """

    text, source_name = _read_text(source)
    raw_payload = json.loads(text)
    # A top-level list or scalar has no ``.get``; treat it as "no features" so
    # the caller sees the TypeError below rather than an AttributeError.
    features = raw_payload.get("features") if isinstance(raw_payload, dict) else None
    if not isinstance(features, list):
        message = (
            f"{source_name} must contain a GeoJSON FeatureCollection with features."
        )
        raise TypeError(message)

    tracts: list[TractDemographics] = []
    seen_ids: set[str] = set()
    duplicates: set[str] = set()
    for feature in features:
        if not isinstance(feature, dict):
            message = f"{source_name} contains a non-object feature entry."
            raise TypeError(message)
        properties = feature.get("properties")
        if not isinstance(properties, dict):
            message = f"{source_name} contains a feature without object properties."
            raise TypeError(message)

        tract_id = str(properties.get("tract_id", "")).strip()
        if not tract_id:
            message = f"{source_name} contains a feature without a tract_id."
            raise ValueError(message)
        # Duplicates are collected and reported together after the loop.
        if tract_id in seen_ids:
            duplicates.add(tract_id)
        seen_ids.add(tract_id)

        geometry = feature.get("geometry")
        if not isinstance(geometry, dict) or geometry.get("type") != "Point":
            message = (
                f"{source_name} feature {tract_id} must use Point geometry "
                "for centroid-based joins."
            )
            raise TypeError(message)
        coordinates = geometry.get("coordinates")
        if (
            not isinstance(coordinates, list)
            or len(coordinates) != 2
            # bool is an int subclass; JSON true/false is not a coordinate.
            or not all(
                isinstance(value, int | float) and not isinstance(value, bool)
                for value in coordinates
            )
        ):
            message = (
                f"{source_name} feature {tract_id} must contain two numeric point "
                "coordinates."
            )
            raise TypeError(message)

        tracts.append(
            TractDemographics(
                tract_id=tract_id,
                tract_name=str(properties.get("tract_name", "")).strip() or tract_id,
                borough=str(properties.get("borough", "")).strip(),
                # GeoJSON positions are ordered (longitude, latitude).
                centroid_latitude=_parse_float(
                    properties.get("centroid_latitude", coordinates[1]),
                    field_name="centroid_latitude",
                    source_name=source_name,
                    row_id=tract_id,
                ),
                centroid_longitude=_parse_float(
                    properties.get("centroid_longitude", coordinates[0]),
                    field_name="centroid_longitude",
                    source_name=source_name,
                    row_id=tract_id,
                ),
                disability_rate=_parse_float(
                    properties["disability_rate"],
                    field_name="disability_rate",
                    source_name=source_name,
                    row_id=tract_id,
                ),
                senior_rate=_parse_float(
                    properties["senior_rate"],
                    field_name="senior_rate",
                    source_name=source_name,
                    row_id=tract_id,
                ),
                poverty_rate=_parse_float(
                    properties["poverty_rate"],
                    field_name="poverty_rate",
                    source_name=source_name,
                    row_id=tract_id,
                ),
                total_population=_parse_int(
                    properties["population"],
                    field_name="population",
                    source_name=source_name,
                    row_id=tract_id,
                ),
            )
        )

    if duplicates:
        joined = ", ".join(sorted(duplicates))
        message = f"{source_name} contains duplicate tract_id values: {joined}."
        raise ValueError(message)

    return DemographicDataset(tracts=tuple(tracts))

load_gtfs

load_gtfs(source: str | Path) -> StationDataset

Load a cached real-data station snapshot CSV.

Source code in src/subway_access/io/_core.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def load_gtfs(source: str | Path) -> StationDataset:
    """Read a cached station snapshot CSV into a ``StationDataset``.

    ``station_id``, ``name``, ``borough``, ``latitude`` and ``longitude`` are
    required columns. The remaining descriptive columns are optional; blank or
    missing values become ``None`` (``accessibility_notes`` becomes ``""``).
    """

    rows, source_name = _load_csv_rows(
        source,
        required_columns=("station_id", "name", "borough", "latitude", "longitude"),
    )
    _ensure_unique_ids(rows, id_field="station_id", source_name=source_name)

    def optional_text(row, column):
        # Blank, whitespace-only and missing cells all collapse to None.
        return (row.get(column) or "").strip() or None

    def coordinate(row, column):
        return _parse_float(
            row[column],
            field_name=column,
            source_name=source_name,
            row_id=row["station_id"],
        )

    stations: list[Station] = []
    for row in rows:
        stations.append(
            Station(
                station_id=row["station_id"].strip(),
                name=row["name"].strip(),
                borough=row["borough"].strip(),
                latitude=coordinate(row, "latitude"),
                longitude=coordinate(row, "longitude"),
                complex_id=optional_text(row, "complex_id"),
                gtfs_stop_id=optional_text(row, "gtfs_stop_id"),
                daytime_routes=_parse_routes(row.get("daytime_routes")),
                division=optional_text(row, "division"),
                line=optional_text(row, "line"),
                structure=optional_text(row, "structure"),
                north_direction_label=optional_text(row, "north_direction_label"),
                south_direction_label=optional_text(row, "south_direction_label"),
                accessibility_notes=(row.get("accessibility_notes") or "").strip(),
                source=source_name,
            )
        )
    return StationDataset(stations=tuple(stations))

load_outages

load_outages(source: str | Path) -> OutageDataset

Load cached elevator availability or outage history from CSV or JSON.

Source code in src/subway_access/io/_core.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def load_outages(source: str | Path) -> OutageDataset:
    """Read cached outage or availability records from a CSV or JSON source.

    URLs are always treated as JSON; local sources are dispatched on their file
    suffix, with ``.csv`` read as CSV and anything else parsed as JSON. A JSON
    object is unwrapped through its ``outages``, ``results`` or ``data`` key
    (first non-empty one wins); non-object entries in the record list are
    skipped.

    Raises:
        TypeError: If the JSON payload does not yield a list of records.
    """

    # Remote sources have no meaningful suffix, so they take the JSON branch.
    suffix = ".json" if _is_url(source) else _resolve_source(source).suffix.lower()

    if suffix == ".csv":
        rows, source_name = _load_csv_rows(
            source,
            required_columns=(
                "station_id",
                "equipment_id",
                "equipment_type",
                "status",
                "started_at",
                "ended_at",
            ),
        )
        csv_records = [
            _normalize_outage_record(row, source_name=source_name, record_index=index)
            for index, row in enumerate(rows, start=1)
        ]
        return OutageDataset(records=tuple(csv_records))

    text, source_name = _read_text(source)
    payload = json.loads(text)
    if isinstance(payload, dict):
        raw_records = (
            payload.get("outages")
            or payload.get("results")
            or payload.get("data")
            or []
        )
    else:
        raw_records = payload
    if not isinstance(raw_records, list):
        message = f"{source_name} must contain a list of outage records."
        raise TypeError(message)

    json_records: list[Any] = []
    for index, record in enumerate(raw_records, start=1):
        # The index still counts skipped entries, so it matches the position
        # of the record in the source list.
        if not isinstance(record, dict):
            continue
        json_records.append(
            _normalize_outage_record(
                record, source_name=source_name, record_index=index
            )
        )
    return OutageDataset(records=tuple(json_records))

load_pedestrian_network

load_pedestrian_network(
    source: str | Path,
) -> PedestrianNetworkDataset

Load a cached pedestrian connection graph from CSV or GeoJSON.

Source code in src/subway_access/io/_core.py
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
def load_pedestrian_network(source: str | Path) -> PedestrianNetworkDataset:
    """Read a cached pedestrian connection graph from a CSV or GeoJSON source.

    URLs are always parsed as GeoJSON; local sources ending in ``.csv`` are read
    as CSV and everything else as GeoJSON. GeoJSON features that are not
    objects, or that lack object ``properties`` / ``geometry``, are skipped.

    Raises:
        TypeError: If the GeoJSON payload has no ``features`` list, or a
            feature's geometry is not a ``LineString`` with a coordinate list.
    """

    if _is_url(source):
        suffix = ".json"
    else:
        suffix = _resolve_source(source).suffix.lower()

    if suffix == ".csv":
        rows, source_name = _load_csv_rows(
            source,
            required_columns=(
                "from_station_id",
                "to_station_id",
                "walk_minutes",
                "distance_meters",
            ),
        )
        from_csv = [
            _normalize_network_record(row, source_name=source_name, record_index=index)
            for index, row in enumerate(rows, start=1)
        ]
        return PedestrianNetworkDataset(connections=tuple(from_csv), source=source_name)

    text, source_name = _read_text(source)
    payload = json.loads(text)
    features = None
    if isinstance(payload, dict):
        features = payload.get("features")
    if not isinstance(features, list):
        message = f"{source_name} must contain a GeoJSON FeatureCollection."
        raise TypeError(message)

    from_geojson: list[PedestrianConnection] = []
    for index, feature in enumerate(features, start=1):
        if not isinstance(feature, dict):
            continue
        properties = feature.get("properties")
        geometry = feature.get("geometry")
        if not (isinstance(properties, dict) and isinstance(geometry, dict)):
            continue
        coordinates = geometry.get("coordinates")
        if geometry.get("type") != "LineString" or not isinstance(coordinates, list):
            message = f"{source_name} pedestrian connection {index} must use LineString geometry."
            raise TypeError(message)
        # Keep only well-formed positions; extra ordinates beyond the first
        # two are dropped.
        polyline = tuple(
            [
                (float(position[0]), float(position[1]))
                for position in coordinates
                if isinstance(position, list) and len(position) >= 2
            ]
        )
        connection = _normalize_network_record(
            properties,
            source_name=source_name,
            record_index=index,
            geometry=polyline,
        )
        from_geojson.append(connection)
    return PedestrianNetworkDataset(connections=tuple(from_geojson), source=source_name)

load_entrances

load_entrances(source: str | Path) -> EntranceDataset

Load a cached entrances.geojson FeatureCollection into an EntranceDataset.

Source code in src/subway_access/io/_entrances.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def load_entrances(source: str | Path) -> EntranceDataset:
    """Load a cached ``entrances.geojson`` FeatureCollection into an ``EntranceDataset``.

    Entries are skipped (not reported) when they are not GeoJSON ``Feature``
    objects, their geometry is not a ``Point``, their ``properties`` is not an
    object, or they carry fewer than two coordinates. A missing or non-list
    ``features`` member yields an empty dataset.

    Raises:
        ValueError: If the document is not a JSON object whose ``type`` is
            ``FeatureCollection``.
    """

    path = Path(source).expanduser().resolve()
    payload = json.loads(path.read_text(encoding="utf-8"))
    # A top-level list/scalar/null has no ``.get``; report it with the same
    # ValueError as any other non-FeatureCollection document.
    if not isinstance(payload, dict) or payload.get("type") != "FeatureCollection":
        message = f"{path} must be a GeoJSON FeatureCollection."
        raise ValueError(message)

    # ``"features": null`` (or any non-list value) is treated like an absent key.
    features = payload.get("features")
    if not isinstance(features, list):
        features = []

    entrances: list[Entrance] = []
    for feature in features:
        if not isinstance(feature, dict) or feature.get("type") != "Feature":
            continue
        geometry = feature.get("geometry")
        properties = feature.get("properties")
        if not isinstance(geometry, dict) or geometry.get("type") != "Point":
            continue
        if not isinstance(properties, dict):
            continue
        coords = geometry.get("coordinates")
        if not isinstance(coords, list) or len(coords) < 2:
            continue
        # GeoJSON positions are ordered (longitude, latitude).
        lon, lat = float(coords[0]), float(coords[1])
        complex_id = properties.get("complex_id")
        gtfs_stop_id = properties.get("gtfs_stop_id")
        entrances.append(
            Entrance(
                entrance_id=str(properties.get("entrance_id") or "").strip(),
                station_id=str(properties.get("station_id") or "").strip(),
                latitude=lat,
                longitude=lon,
                stop_name=str(properties.get("stop_name") or "").strip(),
                constituent_station_name=str(
                    properties.get("constituent_station_name") or ""
                ).strip(),
                complex_id=str(complex_id).strip() if complex_id else None,
                gtfs_stop_id=str(gtfs_stop_id).strip() if gtfs_stop_id else None,
                borough_code=str(properties.get("borough") or "").strip(),
                entrance_type=str(properties.get("entrance_type") or "").strip(),
                entry_allowed=bool(properties.get("entry_allowed")),
                exit_allowed=bool(properties.get("exit_allowed")),
                division=_optional_str(properties.get("division")),
                line=_optional_str(properties.get("line")),
                daytime_routes=_parse_daytime_routes(properties.get("daytime_routes")),
                source=str(properties.get("source") or "").strip(),
            )
        )
    return EntranceDataset(entrances=tuple(entrances))

load_gtfs_pathways_snapshot

load_gtfs_pathways_snapshot(
    source: str | Path,
) -> GtfsPathwaysSnapshot

Load gtfs-pathways.json written by the pipeline.

Source code in src/subway_access/io/_gtfs_static.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def load_gtfs_pathways_snapshot(source: str | Path) -> GtfsPathwaysSnapshot:
    """Rebuild a ``GtfsPathwaysSnapshot`` from a ``gtfs-pathways.json`` file.

    The document's ``pathways`` and ``locations`` members default to empty
    lists when absent; entries that are not JSON objects are ignored. Required
    string fields default to ``""`` and optional fields are passed through
    ``_optional_cell``.
    """

    snapshot_file = Path(source).expanduser().resolve()
    document = json.loads(snapshot_file.read_text(encoding="utf-8"))
    pathway_entries = document.get("pathways", [])
    location_entries = document.get("locations", [])

    def text_of(entry, key):
        # Missing and falsy values both become the empty string.
        return str(entry.get(key) or "")

    pathways = [
        GtfsPathway(
            pathway_id=text_of(entry, "pathway_id"),
            from_stop_id=text_of(entry, "from_stop_id"),
            to_stop_id=text_of(entry, "to_stop_id"),
            pathway_mode=text_of(entry, "pathway_mode"),
            is_bidirectional=text_of(entry, "is_bidirectional"),
            length=_optional_cell(entry.get("length")),
            traversal_time=_optional_cell(entry.get("traversal_time")),
            stair_count=_optional_cell(entry.get("stair_count")),
            max_slope=_optional_cell(entry.get("max_slope")),
            min_width=_optional_cell(entry.get("min_width")),
            signposted_as=_optional_cell(entry.get("signposted_as")),
        )
        for entry in pathway_entries
        if isinstance(entry, dict)
    ]

    locations = []
    for entry in location_entries:
        if not isinstance(entry, dict):
            continue
        parent = entry.get("parent_station")
        if parent:
            parent_station = str(parent).strip()
        else:
            parent_station = None
        locations.append(
            GtfsLocation(
                location_id=text_of(entry, "location_id"),
                location_type=text_of(entry, "location_type"),
                parent_station=parent_station,
                latitude=_optional_cell(entry.get("latitude")),
                longitude=_optional_cell(entry.get("longitude")),
            )
        )

    return GtfsPathwaysSnapshot(
        pathways=tuple(pathways),
        locations=tuple(locations),
    )

parse_gtfs_pathways_zip

parse_gtfs_pathways_zip(
    zip_path: Path,
) -> GtfsPathwaysSnapshot | None

Parse GTFS-Pathways files from a static archive if present.

Returns None when neither pathways.txt nor locations.txt exists in the zip (current MTA gtfs_subway.zip has neither).

Source code in src/subway_access/io/_gtfs_static.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def parse_gtfs_pathways_zip(zip_path: Path) -> GtfsPathwaysSnapshot | None:
    """Parse GTFS-Pathways files from a static archive if present.

    Returns ``None`` when the archive does not exist, or when neither
    ``pathways.txt`` nor ``locations.txt`` exists in the zip
    (current MTA ``gtfs_subway.zip`` has neither). When only one of the two
    files is present, the other side of the snapshot is an empty tuple.
    """

    path = zip_path.expanduser().resolve()
    if not path.exists():
        return None

    with zipfile.ZipFile(path) as zf:
        names = set(zf.namelist())
        has_pathways = "pathways.txt" in names
        has_locations = "locations.txt" in names
        if not has_pathways and not has_locations:
            return None

        pathways: tuple[GtfsPathway, ...] = ()
        locations: tuple[GtfsLocation, ...] = ()

        # GTFS allows a UTF-8 byte-order mark; "utf-8-sig" strips it so it does
        # not end up glued to the first header name, and is a no-op otherwise.
        if has_pathways:
            text = zf.read("pathways.txt").decode("utf-8-sig")
            pathways = _parse_pathways_txt(text)
        if has_locations:
            text = zf.read("locations.txt").decode("utf-8-sig")
            locations = _parse_locations_txt(text)

    return GtfsPathwaysSnapshot(pathways=pathways, locations=locations)

build_entrance_snapshot_rows

build_entrance_snapshot_rows(
    entrance_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]

Normalize raw entrance API rows into cacheable property dicts (with latitude/longitude).

Join keys: station_id (MTA station id), complex_id, gtfs_stop_id (parent stop).

Source code in src/subway_access/io/_mta.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def build_entrance_snapshot_rows(
    entrance_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Normalize raw entrance API rows into cacheable property dicts (with latitude/longitude).

    Join keys: ``station_id`` (MTA station id), ``complex_id``, ``gtfs_stop_id`` (parent stop).

    Rows whose ``entrance_latitude`` / ``entrance_longitude`` are missing or not
    parseable as floats are skipped. When a row carries a ``Point``
    ``entrance_georeference`` with usable coordinates, those coordinates
    replace the column values; a malformed georeference is ignored and the
    column values are kept.
    """

    normalized: list[dict[str, Any]] = []
    for row in entrance_rows:
        lat = row.get("entrance_latitude")
        lon = row.get("entrance_longitude")
        if lat is None or lon is None:
            continue
        try:
            flat = float(str(lat).strip())
            flon = float(str(lon).strip())
        except (TypeError, ValueError):
            continue
        geo = row.get("entrance_georeference")
        if isinstance(geo, dict) and geo.get("type") == "Point":
            coords = geo.get("coordinates")
            if isinstance(coords, list) and len(coords) >= 2:
                # GeoJSON positions are ordered (longitude, latitude). Convert
                # both before assigning so a bad georeference cannot abort the
                # batch or leave a half-updated coordinate pair.
                try:
                    geo_lon, geo_lat = float(coords[0]), float(coords[1])
                except (TypeError, ValueError):
                    pass
                else:
                    flon, flat = geo_lon, geo_lat
        # The stable id is derived from the final coordinates.
        row_for_id = {
            **row,
            "entrance_latitude": str(flat),
            "entrance_longitude": str(flon),
        }
        normalized.append(
            {
                "entrance_id": _entrance_stable_id(row_for_id),
                "station_id": str(row.get("station_id") or "").strip(),
                "complex_id": str(row.get("complex_id") or "").strip() or None,
                "gtfs_stop_id": str(row.get("gtfs_stop_id") or "").strip() or None,
                "stop_name": str(row.get("stop_name") or "").strip(),
                "constituent_station_name": str(
                    row.get("constituent_station_name") or ""
                ).strip(),
                "borough": str(row.get("borough") or "").strip(),
                "division": str(row.get("division") or "").strip() or None,
                "line": str(row.get("line") or "").strip() or None,
                "daytime_routes": str(row.get("daytime_routes") or "").strip(),
                "entrance_type": str(row.get("entrance_type") or "").strip(),
                "entry_allowed": _parse_yes_no(row.get("entry_allowed")),
                "exit_allowed": _parse_yes_no(row.get("exit_allowed")),
                "latitude": flat,
                "longitude": flon,
                "source": "mta_subway_entrances",
            }
        )
    return normalized

build_outage_snapshot_rows

build_outage_snapshot_rows(
    availability_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]

Normalize public availability history rows into outage-style snapshot rows.

Source code in src/subway_access/io/_mta.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def build_outage_snapshot_rows(
    availability_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Convert monthly availability rows into outage-style snapshot rows.

    Each usable input row becomes one ``"resolved"`` record that spans the
    whole calendar month named by its ``month`` value. Rows missing a station
    id (``station_mrn``, else ``station_complex_mrn``), an ``equipment_code``
    or a ``month`` are skipped. Naive month timestamps are assumed to be UTC.
    """

    snapshot_rows: list[dict[str, Any]] = []
    for source_row in availability_rows:
        station_id = str(
            source_row.get("station_mrn") or source_row.get("station_complex_mrn") or ""
        ).strip()
        equipment_id = str(source_row.get("equipment_code") or "").strip()
        month_text = str(source_row.get("month") or "").strip()
        if not (station_id and equipment_id and month_text):
            continue

        # Rewrite a trailing "Z" as an explicit offset before parsing.
        period_start = datetime.fromisoformat(month_text.replace("Z", "+00:00"))
        if period_start.tzinfo is None:
            period_start = period_start.replace(tzinfo=timezone.utc)
        _, days_in_month = calendar.monthrange(period_start.year, period_start.month)
        period_end = period_start.replace(
            day=days_in_month, hour=23, minute=59, second=59
        )

        total_hours = float(str(source_row.get("_24_hour_total_hours") or 0))
        available_hours = float(str(source_row.get("_24_hour_hours_available") or 0))
        raw_ratio = source_row.get("_24_hour_availability")
        if raw_ratio is None:
            availability_ratio = 0.0
        else:
            availability_ratio = float(str(raw_ratio))
        # Unavailable hours expressed in whole minutes, never negative.
        unavailable_minutes = round((total_hours - available_hours) * 60)
        outage_minutes_override = max(unavailable_minutes, 0)

        station_label = source_row.get("station_name", station_id)
        record: dict[str, Any] = {
            "station_id": station_id,
            "station_complex_id": str(
                source_row.get("station_complex_mrn") or ""
            ).strip(),
            "equipment_id": equipment_id,
            "equipment_type": str(source_row.get("equipment_type") or "")
            .strip()
            .lower(),
            "status": "resolved",
            "started_at": period_start.isoformat(),
            "ended_at": period_end.isoformat(),
            "description": f"Monthly availability snapshot for {station_label}",
            "availability_ratio": availability_ratio,
            "outage_minutes_override": outage_minutes_override,
            "total_outages": source_row.get("total_outages"),
            "scheduled_outages": source_row.get("scheduled_outages"),
            "unscheduled_outages": source_row.get("unscheduled_outages"),
        }
        snapshot_rows.append(record)
    return snapshot_rows

build_station_snapshot_rows

build_station_snapshot_rows(
    station_catalog_rows: list[dict[str, Any]],
) -> tuple[list[dict[str, str]], list[dict[str, str]]]

Normalize raw station catalog rows into cacheable CSV snapshots.

Source code in src/subway_access/io/_mta.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def build_station_snapshot_rows(
    station_catalog_rows: list[dict[str, Any]],
) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
    """Split raw station catalog rows into station and accessibility snapshots.

    Returns ``(station_rows, accessibility_rows)``, both in input order. Rows
    without a ``station_id`` are dropped, and only the first row for each
    ``station_id`` is kept.
    """

    station_rows: list[dict[str, str]] = []
    accessibility_rows: list[dict[str, str]] = []
    emitted_ids: set[str] = set()

    for catalog_row in station_catalog_rows:

        def cell(key, _row=catalog_row):
            # Missing/falsy values become "", everything else is trimmed text.
            return str(_row.get(key) or "").strip()

        station_id = cell("station_id")
        if not station_id:
            continue
        if station_id in emitted_ids:
            continue
        emitted_ids.add(station_id)

        station_record = {
            "station_id": station_id,
            "name": cell("stop_name"),
            "borough": _full_borough_name(str(catalog_row.get("borough") or "")),
            # Coordinates are copied as text without trimming.
            "latitude": str(catalog_row.get("gtfs_latitude") or ""),
            "longitude": str(catalog_row.get("gtfs_longitude") or ""),
            "complex_id": cell("complex_id"),
            "gtfs_stop_id": cell("gtfs_stop_id"),
            "daytime_routes": cell("daytime_routes"),
            "division": cell("division"),
            "line": cell("line"),
            "structure": cell("structure"),
            "north_direction_label": cell("north_direction_label"),
            "south_direction_label": cell("south_direction_label"),
            "accessibility_notes": cell("ada_direction_notes"),
        }
        station_rows.append(station_record)

        accessibility_record = {
            "station_id": station_id,
            "ada_status": _map_ada_status(str(catalog_row.get("ada") or "")),
            "notes": cell("ada_direction_notes"),
        }
        accessibility_rows.append(accessibility_record)

    return station_rows, accessibility_rows

fetch_mta_availability_history

fetch_mta_availability_history(
    *,
    station_complex_ids: tuple[str, ...] | None = None,
    start_month: date | None = None,
    limit: int = 50000,
) -> list[dict[str, Any]]

Fetch public monthly elevator and escalator availability history.

Source code in src/subway_access/io/_mta.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def fetch_mta_availability_history(
    *,
    station_complex_ids: tuple[str, ...] | None = None,
    start_month: date | None = None,
    limit: int = 50000,
) -> list[dict[str, Any]]:
    """Fetch public monthly elevator and escalator availability history.

    Args:
        station_complex_ids: Optional complex ids to restrict the query to;
            duplicates are removed and the ids are sorted before querying.
        start_month: Optional lower bound (inclusive) on the ``month`` column.
            Only the calendar date is used, so a ``datetime`` is also accepted.
        limit: Maximum number of rows requested.

    Returns:
        The decoded JSON response, ordered by ``month`` descending.
    """

    where_clauses: list[str] = []
    if station_complex_ids:
        # SoQL escapes a single quote inside a text literal by doubling it;
        # without this an id containing "'" would break out of the literal.
        joined = ", ".join(
            "'" + str(value).replace("'", "''") + "'"
            for value in sorted(set(station_complex_ids))
        )
        where_clauses.append(f"station_complex_mrn in ({joined})")
    if start_month is not None:
        # Build the date part explicitly: ``datetime.isoformat()`` already
        # includes a time component, which would be duplicated below.
        start_text = (
            f"{start_month.year:04d}-{start_month.month:02d}-{start_month.day:02d}"
        )
        where_clauses.append(f"month >= '{start_text}T00:00:00'")
    where = " and ".join(where_clauses) if where_clauses else None
    url = _build_socrata_url(
        MTA_ELEVATOR_AVAILABILITY_API_URL,
        where=where,
        order="month desc",
        limit=limit,
    )
    return _read_json(url)

fetch_mta_equipment_assets

fetch_mta_equipment_assets(
    *,
    station_complex_ids: tuple[str, ...] | None = None,
    limit: int = 5000,
) -> list[dict[str, Any]]

Fetch elevator and escalator asset inventory rows.

Source code in src/subway_access/io/_mta.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def fetch_mta_equipment_assets(
    *,
    station_complex_ids: tuple[str, ...] | None = None,
    limit: int = 5000,
) -> list[dict[str, Any]]:
    """Fetch elevator and escalator asset inventory rows.

    Args:
        station_complex_ids: Optional complex ids to restrict the query to;
            duplicates are removed and the ids are sorted before querying.
        limit: Maximum number of rows requested.

    Returns:
        The decoded JSON response, ordered by ``equipment_code``.
    """

    where = None
    if station_complex_ids:
        # SoQL escapes a single quote inside a text literal by doubling it;
        # without this an id containing "'" would break out of the literal.
        joined = ", ".join(
            "'" + str(value).replace("'", "''") + "'"
            for value in sorted(set(station_complex_ids))
        )
        where = f"station_complex_mrn in ({joined})"
    url = _build_socrata_url(
        MTA_EQUIPMENT_ASSET_API_URL,
        where=where,
        order="equipment_code",
        limit=limit,
    )
    return _read_json(url)

fetch_mta_gtfs_archive

fetch_mta_gtfs_archive(
    target_path: Path,
    *,
    source_url: str = MTA_GTFS_STATIC_URL,
    refresh: bool = False,
) -> Path

Download the official subway GTFS archive into the cache.

Source code in src/subway_access/io/_mta.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def fetch_mta_gtfs_archive(
    target_path: Path,
    *,
    source_url: str = MTA_GTFS_STATIC_URL,
    refresh: bool = False,
) -> Path:
    """Download the official subway GTFS archive into the cache.

    An existing ``target_path`` is reused unless ``refresh`` is true. The
    download is written to a sibling ``<name>.part`` file and renamed into
    place only after it has been written completely, so an interrupted
    download never leaves a truncated archive that later calls would treat as
    a valid cache entry (and a failed refresh keeps the previous archive).

    Returns:
        ``target_path``.
    """

    if target_path.exists() and not refresh:
        return target_path
    target_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = target_path.with_name(f"{target_path.name}.part")
    try:
        with urlopen(source_url, timeout=60.0) as response:
            partial_path.write_bytes(response.read())
        partial_path.replace(target_path)
    finally:
        # No-op after a successful rename; removes leftovers after a failure.
        partial_path.unlink(missing_ok=True)
    return target_path

fetch_mta_station_catalog

fetch_mta_station_catalog(
    *, limit: int = 2000
) -> list[dict[str, Any]]

Fetch the public MTA subway station catalog from Open NY.

Source code in src/subway_access/io/_mta.py
151
152
153
154
155
156
157
158
159
def fetch_mta_station_catalog(*, limit: int = 2000) -> list[dict[str, Any]]:
    """Request up to ``limit`` station catalog rows, ordered by ``station_id``."""

    catalog_url = _build_socrata_url(
        MTA_SUBWAY_STATIONS_API_URL, limit=limit, order="station_id"
    )
    catalog_rows = _read_json(catalog_url)
    return catalog_rows

fetch_mta_subway_entrances

fetch_mta_subway_entrances(
    *, limit: int = 5000
) -> list[dict[str, Any]]

Fetch MTA subway entrance and exit points (NYC Transit) from Open NY.

Source code in src/subway_access/io/_mta.py
162
163
164
165
166
167
168
169
170
def fetch_mta_subway_entrances(*, limit: int = 5000) -> list[dict[str, Any]]:
    """Request up to ``limit`` subway entrance/exit rows, ordered by ``station_id``."""

    entrances_url = _build_socrata_url(
        MTA_SUBWAY_ENTRANCES_API_URL, limit=limit, order="station_id"
    )
    entrance_rows = _read_json(entrances_url)
    return entrance_rows

fetch_walk_graph

fetch_walk_graph(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    network_type: str = "walk",
    buffer_meters: int = 0,
) -> NetworkGraphSnapshot

Fetch and cache an OSM walking graph for the selected study area.

Source code in src/subway_access/io/_osm.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def fetch_walk_graph(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    network_type: str = "walk",
    buffer_meters: int = 0,
) -> NetworkGraphSnapshot:
    """Return snapshot metadata for the study area's OSM walking graph.

    When both the cached graph and its metadata file exist and ``refresh`` is
    false, the cached snapshot is returned. Otherwise the graph is built for
    the polygon selected by ``query`` (with ``buffer_meters`` passed to the
    polygon selection), saved as GraphML, and its metadata is written as JSON.
    """

    osmnx, _ = _require_osm_stack()
    cache_root = ensure_directory(Path(cache_dir).expanduser().resolve())
    graph_path, metadata_path = _graph_paths(cache_root)

    cache_is_complete = graph_path.exists() and metadata_path.exists()
    if cache_is_complete and not refresh:
        _, cached_snapshot = load_cached_walk_graph(cache_root)
        return cached_snapshot

    study_area = _selected_polygon(query, buffer_meters=buffer_meters)
    walk_graph = osmnx.graph_from_polygon(
        study_area, network_type=network_type, simplify=True
    )
    # The graph is saved before the metadata; a cache with a graph but no
    # metadata fails the check above and is rebuilt on the next call.
    osmnx.save_graphml(walk_graph, graph_path)

    fresh_snapshot = NetworkGraphSnapshot(
        query=query,
        graph_path=graph_path,
        metadata_path=metadata_path,
        refreshed_at=datetime.fromisoformat(cache_timestamp()),
        network_type=network_type,
        node_count=walk_graph.number_of_nodes(),
        edge_count=walk_graph.number_of_edges(),
        source_url=OSM_SOURCE_URL,
        buffer_meters=buffer_meters,
    )
    write_json(metadata_path, _snapshot_payload(fresh_snapshot))
    return fresh_snapshot

load_cached_walk_graph

load_cached_walk_graph(
    cache_dir: str | Path,
) -> tuple[Any, NetworkGraphSnapshot]

Load a previously cached OSM walking graph and its metadata.

Source code in src/subway_access/io/_osm.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def load_cached_walk_graph(
    cache_dir: str | Path,
) -> tuple[Any, NetworkGraphSnapshot]:
    """Read the cached OSM walking graph and rebuild its snapshot metadata.

    Args:
        cache_dir: Directory previously populated by ``fetch_walk_graph``;
            ``~`` is expanded and the path is resolved before use.

    Returns:
        A ``(graph, snapshot)`` pair: the graph loaded from GraphML and the
        ``NetworkGraphSnapshot`` reconstructed from the metadata JSON.

    Raises:
        FileNotFoundError: If the graph file or the metadata file is absent.
    """

    ox, _ = _require_osm_stack()
    cache_root = Path(cache_dir).expanduser().resolve()
    graph_path, metadata_path = _graph_paths(cache_root)
    if not (graph_path.exists() and metadata_path.exists()):
        raise FileNotFoundError(
            f"Missing cached walk graph in {cache_root}. Run fetch_walk_graph() first."
        )

    metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
    graph = ox.load_graphml(graph_path)

    # Rebuild the typed query first, then coerce the scalar fields that were
    # serialised into the metadata file.
    cached_query = AccessibilityQuery(
        geography=metadata["query"]["geography"],
        value=metadata["query"]["value"],
    )
    snapshot = NetworkGraphSnapshot(
        query=cached_query,
        graph_path=graph_path,
        metadata_path=metadata_path,
        refreshed_at=datetime.fromisoformat(metadata["refreshed_at"]),
        network_type=str(metadata["network_type"]),
        node_count=int(metadata["node_count"]),
        edge_count=int(metadata["edge_count"]),
        source_url=str(metadata["source_url"]),
        buffer_meters=int(metadata["buffer_meters"]),
    )
    return graph, snapshot

Analysis

subway_access.analysis

Public analysis helpers for subway-access.

analyze_gaps

analyze_gaps(
    scored_data: AccessibilityScoreDataset,
) -> GapAnalysis

Identify tracts with high need and weak accessible station coverage.

Classifies each tract as "covered" or "gap" based on whether it has at least one accessible station in its catchment. Gap score is 0.0 for covered tracts, or the need score for uncovered tracts.

Parameters:

Name Type Description Default
scored_data AccessibilityScoreDataset

Tract accessibility scores from score_accessibility.

required

Returns:

Type Description
GapAnalysis

A GapAnalysis with records sorted by gap score descending.

Example

>>> gaps = analyze_gaps(scores)
>>> gaps.records[0].gap_label
'gap'

Source code in src/subway_access/analysis/_core.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def analyze_gaps(scored_data: AccessibilityScoreDataset) -> GapAnalysis:
    """Flag tracts that lack an accessible station and rank them by need.

    A tract whose score record reports an accessible station is labelled
    ``"covered"`` with a gap score of 0.0; every other tract is labelled
    ``"gap"`` and inherits its need score as the gap score.

    Args:
        scored_data: Tract accessibility scores from ``score_accessibility``.

    Returns:
        A ``GapAnalysis`` whose records are ordered by gap score descending,
        with ties broken by ascending tract id.

    Example:
        >>> gaps = analyze_gaps(scores)
        >>> gaps.records[0].gap_label
        'gap'
    """

    gap_records = []
    for tract in scored_data.records:
        if tract.has_accessible_station:
            score, label = 0.0, "covered"
        else:
            score, label = tract.need_score, "gap"
        gap_records.append(
            GapRecord(
                tract_id=tract.tract_id,
                tract_name=tract.tract_name,
                borough=tract.borough,
                disability_rate=tract.disability_rate,
                senior_rate=tract.senior_rate,
                poverty_rate=tract.poverty_rate,
                total_population=tract.total_population,
                need_score=tract.need_score,
                has_accessible_station=tract.has_accessible_station,
                accessible_station_count=tract.accessible_station_count,
                nearest_accessible_station_id=tract.nearest_accessible_station_id,
                nearest_accessible_station_name=tract.nearest_accessible_station_name,
                nearest_accessible_distance_meters=tract.nearest_accessible_distance_meters,
                gap_score=score,
                gap_label=label,
            )
        )

    # Negating the score gives a descending order while tract_id stays ascending.
    gap_records.sort(key=lambda item: (-item.gap_score, item.tract_id))
    return GapAnalysis(records=tuple(gap_records))

build_station_metrics

build_station_metrics(
    station_data: StationDataset,
    catchments: CatchmentDataset,
    scored_data: AccessibilityScoreDataset,
    *,
    reliability: ReliabilityDataset | None = None,
    pedestrian_network: PedestrianNetworkDataset
    | None = None,
    analysis_method: str = "euclidean",
) -> StationMetricDataset

Aggregate station-level metrics for reporting and export.

Combines coverage counts, population served, need scores, and optional reliability data into one record per station.

Parameters:

Name Type Description Default
station_data StationDataset

Loaded station rows.

required
catchments CatchmentDataset

Generated catchment geometries.

required
scored_data AccessibilityScoreDataset

Tract accessibility scores.

required
reliability ReliabilityDataset | None

Optional reliability dataset for score/label inclusion.

None
pedestrian_network PedestrianNetworkDataset | None

Optional pedestrian network for connection counts.

None
analysis_method str

Label for the analysis method used.

'euclidean'

Returns:

Type Description
StationMetricDataset

A StationMetricDataset with one record per station.

Example

>>> metrics = build_station_metrics(
...     stations, catchments, scores, reliability=reliability
... )

Source code in src/subway_access/analysis/_core.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def build_station_metrics(
    station_data: StationDataset,
    catchments: CatchmentDataset,
    scored_data: AccessibilityScoreDataset,
    *,
    reliability: ReliabilityDataset | None = None,
    pedestrian_network: PedestrianNetworkDataset | None = None,
    analysis_method: str = "euclidean",
) -> StationMetricDataset:
    """Aggregate station-level metrics for reporting and export.

    Combines coverage counts, population served, need scores, and
    optional reliability data into one record per station.

    Tract records are indexed by station once up front, so the work grows
    with the number of tracts plus the number of stations rather than
    with their product.

    Args:
        station_data: Loaded station rows.
        catchments: Generated catchment geometries.
        scored_data: Tract accessibility scores.
        reliability: Optional reliability dataset for score/label inclusion.
        pedestrian_network: Optional pedestrian network for connection counts.
        analysis_method: Label for the analysis method used.

    Returns:
        A ``StationMetricDataset`` with one record per station.

    Raises:
        ValueError: If a station has no matching catchment feature.

    Example:
        >>> metrics = build_station_metrics(
        ...     stations, catchments, scores, reliability=reliability
        ... )
    """

    reliability_by_station = {} if reliability is None else reliability.as_mapping()
    network_counts = (
        {}
        if pedestrian_network is None
        else pedestrian_network.connection_count_by_station()
    )
    catchment_by_station = {
        feature.station_id: feature for feature in catchments.features
    }

    # Single pass over the tracts: group them by every station that covers
    # them, and group uncovered tracts by their nearest accessible station.
    covered_by_station: dict[str, list] = {}
    gaps_by_station: dict[str | None, list] = {}
    for record in scored_data.records:
        # dict.fromkeys de-duplicates while keeping order, so a tract is
        # counted once per station even if an id were listed twice.
        for covering_id in dict.fromkeys(record.covering_station_ids):
            covered_by_station.setdefault(covering_id, []).append(record)
        if not record.has_accessible_station:
            gaps_by_station.setdefault(
                record.nearest_accessible_station_id, []
            ).append(record)

    records: list[StationMetricRecord] = []
    for station in station_data.stations:
        # Fail fast on inconsistent inputs before doing any aggregation.
        catchment = catchment_by_station.get(station.station_id)
        if catchment is None:
            message = f"Missing catchment for station {station.station_id}."
            raise ValueError(message)

        covered = covered_by_station.get(station.station_id, [])
        nearby_gap_records = gaps_by_station.get(station.station_id, [])
        mean_need_score = (
            fmean([record.need_score for record in covered]) if covered else 0.0
        )
        reliability_record = reliability_by_station.get(station.station_id)

        records.append(
            StationMetricRecord(
                station_id=station.station_id,
                station_name=station.name,
                borough=station.borough,
                latitude=station.latitude,
                longitude=station.longitude,
                ada_status=station.ada_status,
                catchment_minutes=catchment.minutes,
                catchment_radius_meters=catchment.radius_meters,
                covered_tract_count=len(covered),
                covered_population=sum(record.total_population for record in covered),
                nearby_gap_tract_count=len(nearby_gap_records),
                nearby_gap_population=sum(
                    record.total_population for record in nearby_gap_records
                ),
                mean_need_score=mean_need_score,
                reliability_score=None
                if reliability_record is None
                else reliability_record.reliability_score,
                reliability_label=None
                if reliability_record is None
                else reliability_record.reliability_label,
                outage_minutes=None
                if reliability_record is None
                else reliability_record.outage_minutes,
                network_connection_count=network_counts.get(station.station_id, 0),
                daytime_routes=station.daytime_routes,
                structure=station.structure,
                analysis_method=analysis_method,
            )
        )

    return StationMetricDataset(records=tuple(records))

compute_reliability

compute_reliability(
    accessibility_data: StationDataset
    | AccessibilityDataset,
    outage_data: OutageDataset,
    window: TimeWindow,
) -> ReliabilityDataset

Compute a rolling station reliability score from outage history.

For each station, calculates the fraction of the time window that was outage-free and assigns a reliability label.

Parameters:

Name Type Description Default
accessibility_data StationDataset | AccessibilityDataset

Station or accessibility dataset with ADA status.

required
outage_data OutageDataset

Loaded outage events.

required
window TimeWindow

Rolling time window (e.g. 30 days, 365 days).

required

Returns:

Type Description
ReliabilityDataset

A ReliabilityDataset with one record per station.

Example

>>> reliability = compute_reliability(
...     stations, outages, models.TimeWindow(days=30)
... )

Source code in src/subway_access/analysis/_core.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def compute_reliability(
    accessibility_data: StationDataset | AccessibilityDataset,
    outage_data: OutageDataset,
    window: TimeWindow,
) -> ReliabilityDataset:
    """Score each station's reliability over a rolling outage window.

    The score is the share of the window's minutes that were free of
    outages, with outage minutes capped at the window length. Stations
    whose ADA status is not ``"accessible"`` always score 0.0.

    Args:
        accessibility_data: Station or accessibility dataset with ADA status.
        outage_data: Loaded outage events.
        window: Rolling time window (e.g. 30 days, 365 days).

    Returns:
        A ``ReliabilityDataset`` with one record per station id, in sorted
        id order. A station is included if it appears in the station
        metadata or in any of the outage minute/count/total mappings.

    Example:
        >>> reliability = compute_reliability(
        ...     stations, outages, models.TimeWindow(days=30)
        ... )
    """

    statuses, names, boroughs = _station_metadata(accessibility_data)

    # Every outage aggregate is evaluated against the same reference instant.
    as_of = outage_data.recommended_as_of()
    minutes_by_station = outage_data.outage_minutes_by_station(window, as_of=as_of)
    counts_by_station = outage_data.outage_count_by_station(window, as_of=as_of)
    totals_by_station = outage_data.outage_total_by_station(window, as_of=as_of)
    scheduled_by_station = outage_data.scheduled_outage_total_by_station(
        window, as_of=as_of
    )
    unscheduled_by_station = outage_data.unscheduled_outage_total_by_station(
        window, as_of=as_of
    )
    ratios_by_station = outage_data.mean_availability_ratio_by_station(
        window, as_of=as_of
    )

    # Availability ratios are looked up per station but do not add station ids.
    known_ids = set(statuses).union(
        minutes_by_station,
        counts_by_station,
        totals_by_station,
        scheduled_by_station,
        unscheduled_by_station,
    )
    window_minutes = window.total_minutes

    def _record_for(station_id):
        """Build the reliability record for a single station id."""
        ada_status = statuses.get(station_id, "unknown")
        capped_minutes = min(minutes_by_station.get(station_id, 0), window_minutes)
        if ada_status == "accessible":
            score = max(window_minutes - capped_minutes, 0) / window_minutes
        else:
            score = 0.0
        return ReliabilityRecord(
            station_id=station_id,
            station_name=names.get(station_id),
            borough=boroughs.get(station_id),
            ada_status=ada_status,
            outage_count=counts_by_station.get(station_id, 0),
            outage_minutes=capped_minutes,
            window_days=window.days,
            reliability_score=score,
            reliability_label=_reliability_label(score=score, ada_status=ada_status),
            total_outages=totals_by_station.get(station_id, 0),
            scheduled_outages=scheduled_by_station.get(station_id, 0),
            unscheduled_outages=unscheduled_by_station.get(station_id, 0),
            mean_availability_ratio=ratios_by_station.get(station_id),
        )

    return ReliabilityDataset(
        records=tuple(_record_for(station_id) for station_id in sorted(known_ids))
    )

generate_catchments

generate_catchments(
    station_data: StationDataset, request: CatchmentRequest
) -> CatchmentDataset

Generate first-pass Euclidean catchments for a walk threshold.

Parameters:

Name Type Description Default
station_data StationDataset

Loaded station rows for the study area.

required
request CatchmentRequest

Catchment parameters (walk minutes, mode).

required

Returns:

Type Description
CatchmentDataset

A CatchmentDataset with one circular polygon per station.

Example

>>> catchments = generate_catchments(
...     stations, models.CatchmentRequest(minutes=10)
... )

Source code in src/subway_access/analysis/_core.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def generate_catchments(
    station_data: StationDataset,
    request: CatchmentRequest,
) -> CatchmentDataset:
    """Build straight-line (Euclidean) walk catchments around every station.

    All stations share one radius, derived from ``request.minutes`` via
    ``walk_radius_meters``.

    Args:
        station_data: Loaded station rows for the study area.
        request: Catchment parameters; only ``minutes`` is read here.

    Returns:
        A ``CatchmentDataset`` with one circle polygon feature per station,
        in the same order as ``station_data.stations``.

    Example:
        >>> catchments = generate_catchments(
        ...     stations, models.CatchmentRequest(minutes=10)
        ... )
    """

    radius_meters = walk_radius_meters(request.minutes)

    features = []
    for station in station_data.stations:
        circle = build_circle_polygon(
            latitude=station.latitude,
            longitude=station.longitude,
            radius_meters=radius_meters,
        )
        features.append(
            CatchmentFeature(
                station_id=station.station_id,
                station_name=station.name,
                borough=station.borough,
                ada_status=station.ada_status,
                center_latitude=station.latitude,
                center_longitude=station.longitude,
                radius_meters=radius_meters,
                minutes=request.minutes,
                method="euclidean-buffer-v0.2",
                polygon=circle,
            )
        )
    return CatchmentDataset(features=tuple(features))

score_accessibility

score_accessibility(
    station_data: StationDataset,
    catchments: CatchmentDataset,
    demographics: DemographicDataset,
) -> AccessibilityScoreDataset

Score tract accessibility using station, catchment, and demographic inputs.

For each tract, tests whether its centroid falls within any accessible station's catchment radius and computes a composite need score.

Parameters:

Name Type Description Default
station_data StationDataset

Loaded station rows with ADA status.

required
catchments CatchmentDataset

Generated catchment geometries.

required
demographics DemographicDataset

Tract-level demographic data.

required

Returns:

Type Description
AccessibilityScoreDataset

An AccessibilityScoreDataset with one record per tract.

Example

>>> scores = score_accessibility(stations, catchments, demographics)
>>> scores.records[0].has_accessible_station
True

Source code in src/subway_access/analysis/_core.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def score_accessibility(
    station_data: StationDataset,
    catchments: CatchmentDataset,
    demographics: DemographicDataset,
) -> AccessibilityScoreDataset:
    """Score every tract from station, catchment, and demographic inputs.

    The scoring itself is delegated to ``score_tracts_via_factors`` in the
    factors compatibility layer; this function only wraps the resulting
    records in a dataset.

    Args:
        station_data: Loaded station rows with ADA status.
        catchments: Generated catchment geometries.
        demographics: Tract-level demographic data.

    Returns:
        An ``AccessibilityScoreDataset`` holding the scored tract records.

    Example:
        >>> scores = score_accessibility(stations, catchments, demographics)
        >>> scores.records[0].has_accessible_station
        True
    """

    # Imported at call time rather than at module import time.
    from ..factors._compat import score_tracts_via_factors

    return AccessibilityScoreDataset(
        records=score_tracts_via_factors(station_data, catchments, demographics)
    )

entrances_per_complex_id

entrances_per_complex_id(
    dataset: EntranceDataset,
) -> dict[str, int]

Return entrance counts keyed by station complex id.

Source code in src/subway_access/analysis/_entrances.py
17
18
19
20
def entrances_per_complex_id(dataset: EntranceDataset) -> dict[str, int]:
    """Count entrances for each station complex id.

    Args:
        dataset: Entrance dataset to tally.

    Returns:
        The mapping produced by ``dataset.count_by_complex_id()``.
    """

    counts = dataset.count_by_complex_id()
    return counts

entrances_per_gtfs_stop_id

entrances_per_gtfs_stop_id(
    dataset: EntranceDataset,
) -> dict[str, int]

Return entrance counts keyed by GTFS parent stop id.

Source code in src/subway_access/analysis/_entrances.py
11
12
13
14
def entrances_per_gtfs_stop_id(dataset: EntranceDataset) -> dict[str, int]:
    """Count entrances for each GTFS parent stop id.

    Args:
        dataset: Entrance dataset to tally.

    Returns:
        The mapping produced by ``dataset.count_by_gtfs_stop_id()``.
    """

    counts = dataset.count_by_gtfs_stop_id()
    return counts

pathways_and_locations_counts

pathways_and_locations_counts(
    snapshot: GtfsPathwaysSnapshot | None,
) -> tuple[int, int]

Return (pathway row count, location row count) for optional GTFS-Pathways data.

Source code in src/subway_access/analysis/_entrances.py
23
24
25
26
27
28
29
30
def pathways_and_locations_counts(
    snapshot: GtfsPathwaysSnapshot | None,
) -> tuple[int, int]:
    """Report how many pathway and location rows a GTFS-Pathways snapshot holds.

    Args:
        snapshot: Optional GTFS-Pathways snapshot; ``None`` means the data
            was not supplied.

    Returns:
        ``(pathway row count, location row count)``, or ``(0, 0)`` when
        ``snapshot`` is ``None``.
    """

    if snapshot is not None:
        pathway_rows = len(snapshot.pathways)
        location_rows = len(snapshot.locations)
        return (pathway_rows, location_rows)
    return (0, 0)

compare_accessibility_models

compare_accessibility_models(
    euclidean_scores: AccessibilityScoreDataset,
    network_scores: AccessibilityScoreDataset,
) -> AccessibilityComparisonDataset

Compare tract accessibility results between Euclidean and network models.

Source code in src/subway_access/analysis/_network.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def compare_accessibility_models(
    euclidean_scores: AccessibilityScoreDataset,
    network_scores: AccessibilityScoreDataset,
) -> AccessibilityComparisonDataset:
    """Compare tract accessibility results between Euclidean and network models.

    Only tracts scored by both models are compared, because every
    comparison record needs a value from each side. A tract present in
    just one dataset is left out of the result instead of raising a bare
    ``KeyError`` part-way through.

    Args:
        euclidean_scores: Tract scores from the Euclidean catchment model.
        network_scores: Tract scores from the walking-network model.

    Returns:
        An ``AccessibilityComparisonDataset`` with one record per shared
        tract, ordered by tract id. Tract name, borough, and need score are
        taken from the Euclidean record.
    """

    euclidean_by_tract = {
        record.tract_id: record for record in euclidean_scores.records
    }
    network_by_tract = {record.tract_id: record for record in network_scores.records}

    # Intersection, not union: both lookups below must succeed for each id.
    tract_ids = sorted(euclidean_by_tract.keys() & network_by_tract.keys())

    records = []
    for tract_id in tract_ids:
        euclidean = euclidean_by_tract[tract_id]
        network = network_by_tract[tract_id]
        records.append(
            AccessibilityComparisonRecord(
                tract_id=tract_id,
                tract_name=euclidean.tract_name,
                borough=euclidean.borough,
                need_score=euclidean.need_score,
                euclidean_has_access=euclidean.has_accessible_station,
                network_has_access=network.has_accessible_station,
                euclidean_station_count=euclidean.accessible_station_count,
                network_station_count=network.accessible_station_count,
                euclidean_station_id=euclidean.nearest_accessible_station_id,
                network_station_id=network.nearest_accessible_station_id,
                euclidean_travel_minutes=euclidean.nearest_accessible_travel_minutes,
                network_travel_minutes=network.nearest_accessible_travel_minutes,
                euclidean_path_meters=euclidean.nearest_accessible_path_meters,
                network_path_meters=network.nearest_accessible_path_meters,
                coverage_change_label=_coverage_change_label(
                    euclidean_has_access=euclidean.has_accessible_station,
                    network_has_access=network.has_accessible_station,
                ),
            )
        )
    return AccessibilityComparisonDataset(records=tuple(records))

generate_network_isochrones

generate_network_isochrones(
    station_data: StationDataset,
    graph: Any,
    request: CatchmentRequest,
) -> CatchmentDataset

Generate walking-network catchments from a cached OSM graph.

Source code in src/subway_access/analysis/_network.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def generate_network_isochrones(
    station_data: StationDataset,
    graph: Any,
    request: CatchmentRequest,
) -> CatchmentDataset:
    """Build walking-network catchments for every station from an OSM graph.

    For each station, the nodes reachable within the walk budget (measured
    along the ``"length"`` edge attribute) are collected and wrapped in a
    convex hull. When fewer than three nodes are reachable, or the hull is
    not a polygon, a circle of the same radius around the station is used
    instead.

    Args:
        station_data: Loaded station rows for the study area.
        graph: Walking graph, e.g. from ``load_cached_walk_graph``.
        request: Catchment parameters; only ``minutes`` is read here.

    Returns:
        A ``CatchmentDataset`` with one feature per station.
    """

    nx, _ = _require_network_stack()
    radius_meters = request.minutes * _METERS_PER_MINUTE
    station_nodes = _station_nodes(graph, station_data)

    features = []
    for station in station_data.stations:
        origin = station_nodes[station.station_id]
        reachable = nx.ego_graph(
            graph,
            origin,
            radius=radius_meters,
            distance="length",
        )
        points = _node_coordinates(graph, list(reachable.nodes))

        outline = None
        if len(points) >= 3:
            hull = MultiPoint(points).convex_hull
            if hull.geom_type == "Polygon":
                outline = tuple(
                    (float(lon), float(lat)) for lon, lat in hull.exterior.coords
                )
        if outline is None:
            # Too few reachable nodes, or a degenerate hull: use a circle.
            outline = build_circle_polygon(
                latitude=station.latitude,
                longitude=station.longitude,
                radius_meters=radius_meters,
            )

        features.append(
            CatchmentFeature(
                station_id=station.station_id,
                station_name=station.name,
                borough=station.borough,
                ada_status=station.ada_status,
                center_latitude=station.latitude,
                center_longitude=station.longitude,
                radius_meters=radius_meters,
                minutes=request.minutes,
                method="walk-network-convex-hull-v0.3",
                polygon=outline,
            )
        )
    return CatchmentDataset(features=tuple(features))

score_accessibility_network

score_accessibility_network(
    station_data: StationDataset,
    graph: Any,
    demographics: DemographicDataset,
    request: CatchmentRequest,
) -> AccessibilityScoreDataset

Score tract accessibility using walking-network travel rather than circles.

Source code in src/subway_access/analysis/_network.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def score_accessibility_network(
    station_data: StationDataset,
    graph: Any,
    demographics: DemographicDataset,
    request: CatchmentRequest,
) -> AccessibilityScoreDataset:
    """Score tract accessibility using walking-network travel rather than circles.

    Each tract centroid is snapped to its nearest graph node. One
    single-source Dijkstra run from that node (weighted by edge
    ``"length"``) yields the walking distance to every reachable node,
    and each accessible station is then looked up in that result. This
    replaces a separate shortest-path search per (tract, station) pair.

    A station counts as covering a tract when its path length is within
    ``request.minutes * _METERS_PER_MINUTE``. The nearest reachable
    station is recorded even when it lies beyond that threshold. The need
    score is the mean of the tract's disability, senior, and poverty rates.

    Args:
        station_data: Loaded station rows; only accessible stations are used.
        graph: Walking graph, e.g. from ``load_cached_walk_graph``.
        demographics: Tract-level demographic data.
        request: Catchment parameters; only ``minutes`` is read here.

    Returns:
        An ``AccessibilityScoreDataset`` with one record per tract.
    """

    nx, ox = _require_network_stack()
    threshold_meters = request.minutes * _METERS_PER_MINUTE
    accessible_stations = station_data.accessible_stations
    station_nodes = _station_nodes(graph, StationDataset(accessible_stations))

    records: list[TractAccessibilityRecord] = []
    for tract in demographics.tracts:
        tract_node = int(
            ox.distance.nearest_nodes(
                graph,
                X=tract.centroid_longitude,
                Y=tract.centroid_latitude,
            )
        )
        # Distances from this tract to every reachable node, computed once.
        # No cutoff is applied because the nearest station may lie beyond
        # the coverage threshold and is still reported.
        distance_by_node = nx.single_source_dijkstra_path_length(
            graph,
            tract_node,
            weight="length",
        )

        covering_station_ids: list[str] = []
        nearest_station_id: str | None = None
        nearest_station_name: str | None = None
        nearest_distance: float | None = None
        nearest_minutes: float | None = None

        for station in accessible_stations:
            try:
                path_meters = float(
                    distance_by_node[station_nodes[station.station_id]]
                )
            except KeyError:
                # Station has no mapped node, or that node is unreachable
                # from this tract.
                continue
            if nearest_distance is None or path_meters < nearest_distance:
                nearest_distance = path_meters
                nearest_minutes = path_meters / _METERS_PER_MINUTE
                nearest_station_id = station.station_id
                nearest_station_name = station.name
            if path_meters <= threshold_meters:
                covering_station_ids.append(station.station_id)

        need_score = fmean(
            (tract.disability_rate, tract.senior_rate, tract.poverty_rate)
        )
        records.append(
            TractAccessibilityRecord(
                tract_id=tract.tract_id,
                tract_name=tract.tract_name,
                borough=tract.borough,
                centroid_latitude=tract.centroid_latitude,
                centroid_longitude=tract.centroid_longitude,
                disability_rate=tract.disability_rate,
                senior_rate=tract.senior_rate,
                poverty_rate=tract.poverty_rate,
                total_population=tract.total_population,
                need_score=need_score,
                has_accessible_station=bool(covering_station_ids),
                accessible_station_count=len(covering_station_ids),
                covering_station_ids=tuple(covering_station_ids),
                nearest_accessible_station_id=nearest_station_id,
                nearest_accessible_station_name=nearest_station_name,
                nearest_accessible_distance_meters=nearest_distance,
                nearest_accessible_path_meters=nearest_distance,
                nearest_accessible_travel_minutes=nearest_minutes,
                analysis_method="network-isochrone",
            )
        )

    return AccessibilityScoreDataset(records=tuple(records))

summarize_accessibility_by_group

summarize_accessibility_by_group(
    scored_data: AccessibilityScoreDataset,
    *,
    group_by: str = "borough",
) -> AccessibilitySummaryDataset

Summarize tract accessibility results by a record attribute.

Groups scored tracts by the given attribute and computes aggregate coverage rates, population counts, need scores, and travel times.

Parameters:

Name Type Description Default
scored_data AccessibilityScoreDataset

Tract accessibility scores from score_accessibility.

required
group_by str

Attribute name to group by (default "borough").

'borough'

Returns:

Type Description
AccessibilitySummaryDataset

An AccessibilitySummaryDataset with one record per group.

Example

>>> summary = summarize_accessibility_by_group(scores, group_by="borough")
>>> summary.records[0].coverage_rate
0.72

Source code in src/subway_access/analysis/_summaries.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def summarize_accessibility_by_group(
    scored_data: AccessibilityScoreDataset,
    *,
    group_by: str = "borough",
) -> AccessibilitySummaryDataset:
    """Roll tract accessibility scores up to one summary row per group.

    Tracts are bucketed by the string form of the ``group_by`` attribute.
    Each bucket reports tract and population counts split by coverage,
    the mean need score, the mean travel time to the nearest accessible
    station (over tracts that have one recorded), and the coverage rate.

    Args:
        scored_data: Tract accessibility scores from ``score_accessibility``.
        group_by: Attribute name to group by (default ``"borough"``).

    Returns:
        An ``AccessibilitySummaryDataset`` with one record per group,
        ordered by group value.

    Example:
        >>> summary = summarize_accessibility_by_group(scores, group_by="borough")
        >>> summary.records[0].coverage_rate
        0.72
    """

    buckets = defaultdict(list)
    for tract in scored_data.records:
        bucket_key = str(getattr(tract, group_by))
        buckets[bucket_key].append(tract)

    summaries = []
    for group_value in sorted(buckets):
        members = buckets[group_value]

        # Split the group into covered and uncovered tracts in one pass.
        covered, uncovered = [], []
        for tract in members:
            if tract.has_accessible_station:
                covered.append(tract)
            else:
                uncovered.append(tract)

        covered_population = sum(tract.total_population for tract in covered)
        total_population = sum(tract.total_population for tract in members)
        travel_minutes = [
            tract.nearest_accessible_travel_minutes
            for tract in members
            if tract.nearest_accessible_travel_minutes is not None
        ]
        mean_travel = fmean(travel_minutes) if travel_minutes else None
        coverage_rate = len(covered) / len(members) if members else 0.0

        summaries.append(
            AccessibilitySummaryRecord(
                group_by=group_by,
                group_value=group_value,
                tract_count=len(members),
                covered_tract_count=len(covered),
                uncovered_tract_count=len(uncovered),
                total_population=total_population,
                covered_population=covered_population,
                uncovered_population=total_population - covered_population,
                mean_need_score=fmean(tract.need_score for tract in members),
                mean_nearest_travel_minutes=mean_travel,
                coverage_rate=coverage_rate,
            )
        )
    return AccessibilitySummaryDataset(records=tuple(summaries))

Factors

subway_access.factors

Composable factor pipeline for accessibility analysis.

Factor

Bases: ABC

Base class for a single computed column in the accessibility pipeline.

Subclass this and implement compute to create custom factors. Each factor produces one value per tract when the pipeline runs.

Attributes:

Name Type Description
name str

Column name for this factor in pipeline output.

dtype Literal['float', 'str', 'bool', 'int']

Data type of the computed value.

Example

>>> class MyFactor(Factor):
...     name = "my_metric"
...     dtype = "float"
...     def compute(self, context: FactorContext) -> float:
...         return context.tract.disability_rate * 2

Source code in src/subway_access/factors/_base.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Factor(ABC):
    """Base class for a single computed column in the accessibility pipeline.

    Subclass this and implement ``compute`` to create custom factors.
    Each factor produces one value per tract when the pipeline runs.
    Because ``compute`` is abstract, ``Factor`` itself cannot be
    instantiated; only subclasses that override ``compute`` can.

    Attributes:
        name: Column name for this factor in pipeline output.  It is only
            declared here (no value), so each subclass must assign it.
        dtype: Data type of the computed value; one of ``"float"``,
            ``"str"``, ``"bool"`` or ``"int"``.  Also only declared here
            and expected to be assigned by the subclass.

    Example:
        >>> class MyFactor(Factor):
        ...     name = "my_metric"
        ...     dtype = "float"
        ...     def compute(self, context: FactorContext) -> float:
        ...         return context.tract.disability_rate * 2
    """

    name: str
    dtype: Literal["float", "str", "bool", "int"]

    @abstractmethod
    def compute(self, context: FactorContext) -> float | str | bool | int:
        """Compute this factor's value for a single tract.

        Abstract: every concrete factor must override this method.

        Args:
            context: Row-level context with tract demographics, stations,
                catchments, and optional external data.

        Returns:
            The computed value for this tract.
        """

name instance-attribute

name: str

dtype instance-attribute

dtype: Literal['float', 'str', 'bool', 'int']

compute abstractmethod

compute(context: FactorContext) -> float | str | bool | int

Compute this factor's value for a single tract.

Parameters:

Name Type Description Default
context FactorContext

Row-level context with tract demographics, stations, catchments, and optional external data.

required

Returns:

Type Description
float | str | bool | int

The computed value for this tract.

Source code in src/subway_access/factors/_base.py
62
63
64
65
66
67
68
69
70
71
72
@abstractmethod
def compute(self, context: FactorContext) -> float | str | bool | int:
    """Compute this factor's value for a single tract.

    Abstract: every concrete factor must override this method.

    Args:
        context: Row-level context with tract demographics, stations,
            catchments, and optional external data.

    Returns:
        The computed value for this tract, as a ``float``, ``str``,
        ``bool`` or ``int``.
    """

FactorContext dataclass

Row-level context passed to each Factor during pipeline execution.

Parameters:

Name Type Description Default
tract TractDemographics

Demographic summary for the tract being evaluated.

required
stations StationDataset

All stations in the study area.

required
catchments CatchmentDataset

Generated catchment geometries for all stations.

required
extras dict[str, Any] | None

Extensible slot for external data (housing costs, etc.).

None
Example

>>> ctx = FactorContext(tract=tract, stations=stations, catchments=catchments)
>>> factor.compute(ctx)
0.42

Source code in src/subway_access/factors/_base.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@dataclass(frozen=True, slots=True)
class FactorContext:
    """Row-level context passed to each Factor during pipeline execution.

    The dataclass is frozen, so its fields cannot be rebound after
    construction.

    Args:
        tract: Demographic summary for the tract being evaluated.
        stations: All stations in the study area.
        catchments: Generated catchment geometries for all stations.
        extras: Extensible slot for external data (housing costs, etc.).
            Defaults to ``None`` when no external data is supplied.

    Example:
        >>> ctx = FactorContext(tract=tract, stations=stations, catchments=catchments)
        >>> factor.compute(ctx)
        0.42
    """

    tract: TractDemographics
    stations: StationDataset
    catchments: CatchmentDataset
    # Optional; the only field with a default value.
    extras: dict[str, Any] | None = None

tract instance-attribute

tract: TractDemographics

stations instance-attribute

stations: StationDataset

catchments instance-attribute

catchments: CatchmentDataset

extras class-attribute instance-attribute

extras: dict[str, Any] | None = None

Pipeline

Composable factor pipeline -- add factors, run across a dataset.

Pipelines are immutable: add returns a new Pipeline instance.

Parameters:

Name Type Description Default
factors tuple[Factor, ...]

Initial tuple of factors.

()
Example

>>> from subway_access.factors import Pipeline, NeedScoreFactor, CoverageFactor
>>> pipe = Pipeline().add(NeedScoreFactor()).add(CoverageFactor())
>>> result = pipe.run(contexts)

Source code in src/subway_access/factors/_base.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class Pipeline:
    """Composable factor pipeline -- add factors, run across a dataset.

    Pipelines are immutable: ``add`` returns a new Pipeline instance.

    Args:
        factors: Initial tuple of factors.

    Example:
        >>> from subway_access.factors import Pipeline, NeedScoreFactor, CoverageFactor
        >>> pipe = Pipeline().add(NeedScoreFactor()).add(CoverageFactor())
        >>> result = pipe.run(contexts)
    """

    __slots__ = ("_factors",)

    def __init__(self, factors: tuple[Factor, ...] = ()) -> None:
        self._factors = factors

    @property
    def factors(self) -> tuple[Factor, ...]:
        """Return the factors registered in this pipeline."""

        return self._factors

    def add(self, factor: Factor) -> Pipeline:
        """Return a new Pipeline with the given factor appended.

        Args:
            factor: The factor to add.

        Returns:
            A new Pipeline instance containing all previous factors plus this one.
        """

        return Pipeline((*self._factors, factor))

    def run(self, contexts: Iterable[FactorContext]) -> PipelineResult:
        """Execute all factors across the provided contexts.

        Args:
            contexts: One FactorContext per tract to evaluate.

        Returns:
            A PipelineResult with one column per factor and one row per tract.

        Raises:
            ValueError: If two registered factors share the same ``name``.
                Columns are keyed by factor name, so a shared name would put
                two values per tract into a single column and misalign it
                with ``tract_ids``.
        """

        names = [factor.name for factor in self._factors]
        duplicates = sorted({name for name in names if names.count(name) > 1})
        if duplicates:
            message = f"Duplicate factor names in pipeline: {duplicates}"
            raise ValueError(message)

        tract_ids: list[str] = []
        columns: dict[str, list[Any]] = {name: [] for name in names}

        # Single pass over the contexts; no intermediate list is needed.
        for ctx in contexts:
            tract_ids.append(ctx.tract.tract_id)
            for factor in self._factors:
                columns[factor.name].append(factor.compute(ctx))

        return PipelineResult(
            columns={name: tuple(values) for name, values in columns.items()},
            tract_ids=tuple(tract_ids),
        )

factors property

factors: tuple[Factor, ...]

Return the factors registered in this pipeline.

add

add(factor: Factor) -> Pipeline

Return a new Pipeline with the given factor appended.

Parameters:

Name Type Description Default
factor Factor

The factor to add.

required

Returns:

Type Description
Pipeline

A new Pipeline instance containing all previous factors plus this one.

Source code in src/subway_access/factors/_base.py
155
156
157
158
159
160
161
162
163
164
165
def add(self, factor: Factor) -> Pipeline:
    """Build a new pipeline that ends with ``factor``.

    The pipeline this is called on is left untouched.

    Args:
        factor: Factor to place after the ones already registered.

    Returns:
        A fresh Pipeline holding the existing factors followed by ``factor``.
    """

    extended = (*self._factors, factor)
    return Pipeline(extended)

run

run(contexts: Iterable[FactorContext]) -> PipelineResult

Execute all factors across the provided contexts.

Parameters:

Name Type Description Default
contexts Iterable[FactorContext]

One FactorContext per tract to evaluate.

required

Returns:

Type Description
PipelineResult

A PipelineResult with one column per factor and one row per tract.

Source code in src/subway_access/factors/_base.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def run(self, contexts: Iterable[FactorContext]) -> PipelineResult:
    """Execute all factors across the provided contexts.

    Args:
        contexts: One FactorContext per tract to evaluate.

    Returns:
        A PipelineResult with one column per factor and one row per tract.

    Raises:
        ValueError: If two registered factors share the same ``name``.
            Columns are keyed by factor name, so a shared name would put
            two values per tract into a single column and misalign it
            with ``tract_ids``.
    """

    names = [factor.name for factor in self._factors]
    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        message = f"Duplicate factor names in pipeline: {duplicates}"
        raise ValueError(message)

    tract_ids: list[str] = []
    columns: dict[str, list[Any]] = {name: [] for name in names}

    # Single pass over the contexts; no intermediate list is needed.
    for ctx in contexts:
        tract_ids.append(ctx.tract.tract_id)
        for factor in self._factors:
            columns[factor.name].append(factor.compute(ctx))

    return PipelineResult(
        columns={name: tuple(values) for name, values in columns.items()},
        tract_ids=tuple(tract_ids),
    )

PipelineResult dataclass

Output of a Pipeline run -- one column per factor, one row per tract.

Parameters:

Name Type Description Default
columns dict[str, tuple[Any, ...]]

Mapping of factor name to tuple of computed values.

required
tract_ids tuple[str, ...]

Tuple of tract identifiers, one per row.

required
Example

>>> result = pipeline.run(contexts)
>>> result.columns["need_score"]
(0.12, 0.34, 0.56)

Source code in src/subway_access/factors/_base.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@dataclass(frozen=True, slots=True)
class PipelineResult:
    """Output of a Pipeline run -- one column per factor, one row per tract.

    Args:
        columns: Mapping of factor name to tuple of computed values.
        tract_ids: Tuple of tract identifiers, one per row.

    Example:
        >>> result = pipeline.run(contexts)
        >>> result.columns["need_score"]
        (0.12, 0.34, 0.56)
    """

    columns: dict[str, tuple[Any, ...]]
    tract_ids: tuple[str, ...]

    def to_records(self) -> tuple[dict[str, Any], ...]:
        """Convert to a tuple of row dicts.

        Returns:
            One dict per tract with tract_id plus all factor columns.
        """

        records: list[dict[str, Any]] = []
        for i, tract_id in enumerate(self.tract_ids):
            row: dict[str, Any] = {"tract_id": tract_id}
            for col_name, values in self.columns.items():
                row[col_name] = values[i]
            records.append(row)
        return tuple(records)

    def to_dataframe(self) -> Any:
        """Convert to a pandas DataFrame.

        Returns:
            A DataFrame with tract_id index and one column per factor.

        Raises:
            ImportError: If pandas is not installed.
        """

        try:
            import pandas as pd
        except ImportError as exc:
            message = (
                "pandas is required for to_dataframe(). "
                "Install it with: pip install subway-access[panel]"
            )
            raise ImportError(message) from exc

        data = {"tract_id": self.tract_ids, **self.columns}
        return pd.DataFrame(data).set_index("tract_id")

columns instance-attribute

columns: dict[str, tuple[Any, ...]]

tract_ids instance-attribute

tract_ids: tuple[str, ...]

to_records

to_records() -> tuple[dict[str, Any], ...]

Convert to a tuple of row dicts.

Returns:

Type Description
tuple[dict[str, Any], ...]

One dict per tract with tract_id plus all factor columns.

Source code in src/subway_access/factors/_base.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def to_records(self) -> tuple[dict[str, Any], ...]:
    """Pivot the columns into one dict per tract.

    Returns:
        A tuple of row dicts, each starting with ``tract_id`` followed by
        every factor column in ``columns`` order.
    """

    return tuple(
        {
            "tract_id": tract_id,
            **{name: series[position] for name, series in self.columns.items()},
        }
        for position, tract_id in enumerate(self.tract_ids)
    )

to_dataframe

to_dataframe() -> Any

Convert to a pandas DataFrame.

Returns:

Type Description
Any

A DataFrame with tract_id index and one column per factor.

Raises:

Type Description
ImportError

If pandas is not installed.

Source code in src/subway_access/factors/_base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def to_dataframe(self) -> Any:
    """Build a pandas DataFrame from the result.

    pandas is imported lazily, so it is only required when this method is
    actually called.

    Returns:
        A DataFrame indexed by ``tract_id`` with one column per factor.

    Raises:
        ImportError: If pandas is not installed.
    """

    try:
        import pandas as pd
    except ImportError as exc:
        raise ImportError(
            "pandas is required for to_dataframe(). "
            "Install it with: pip install subway-access[panel]"
        ) from exc

    frame = pd.DataFrame({"tract_id": self.tract_ids, **self.columns})
    return frame.set_index("tract_id")

CoverageFactor

Bases: Factor

Whether a tract is covered by at least one accessible station's catchment.

Example

>>> CoverageFactor().compute(ctx)
True

Source code in src/subway_access/factors/_builtin.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class CoverageFactor(Factor):
    """Flag tracts whose centroid falls inside an accessible station's catchment.

    A tract is covered when the haversine distance from its centroid to an
    accessible station does not exceed that station's catchment radius.
    Stations absent from the radius map are given a radius of 0.0.

    Example:
        >>> CoverageFactor().compute(ctx)
        True
    """

    name = "has_accessible_station"
    dtype = "bool"

    def compute(self, context: FactorContext) -> bool:
        """Return True as soon as one accessible station covers the tract."""
        radii = context.catchments.radius_by_station_id()
        return any(
            haversine_distance_meters(
                latitude_a=context.tract.centroid_latitude,
                longitude_a=context.tract.centroid_longitude,
                latitude_b=candidate.latitude,
                longitude_b=candidate.longitude,
            )
            <= radii.get(candidate.station_id, 0.0)
            for candidate in context.stations.accessible_stations
        )

name class-attribute instance-attribute

name = 'has_accessible_station'

dtype class-attribute instance-attribute

dtype = 'bool'

compute

compute(context: FactorContext) -> bool
Source code in src/subway_access/factors/_builtin.py
82
83
84
85
86
87
88
89
90
91
92
93
def compute(self, context: FactorContext) -> bool:
    """Return True as soon as one accessible station covers the tract.

    A station covers the tract when the haversine distance from the tract
    centroid is no larger than the station's catchment radius; stations
    absent from the radius map are given a radius of 0.0.
    """
    radii = context.catchments.radius_by_station_id()
    return any(
        haversine_distance_meters(
            latitude_a=context.tract.centroid_latitude,
            longitude_a=context.tract.centroid_longitude,
            latitude_b=candidate.latitude,
            longitude_b=candidate.longitude,
        )
        <= radii.get(candidate.station_id, 0.0)
        for candidate in context.stations.accessible_stations
    )

GapScoreFactor

Bases: Factor

Gap score: 0.0 if covered, need_score otherwise.

Depends on coverage and need score computations. Uses a NeedScoreFactor and CoverageFactor internally.

Parameters:

Name Type Description Default
need_weights dict[str, float] | None

Optional weights forwarded to the internal NeedScoreFactor.

None
Example

>>> GapScoreFactor().compute(ctx)
0.0

Source code in src/subway_access/factors/_builtin.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class GapScoreFactor(Factor):
    """Need score that only counts for tracts without accessible coverage.

    Delegates to an internal ``CoverageFactor`` and ``NeedScoreFactor``:
    covered tracts score 0.0, uncovered tracts score their need score.

    Args:
        need_weights: Optional weights forwarded to the internal NeedScoreFactor.

    Example:
        >>> GapScoreFactor().compute(ctx)
        0.0
    """

    name = "gap_score"
    dtype = "float"

    def __init__(self, need_weights: dict[str, float] | None = None) -> None:
        self._need = NeedScoreFactor(weights=need_weights)
        self._coverage = CoverageFactor()

    def compute(self, context: FactorContext) -> float:
        """Return 0.0 for a covered tract, otherwise the tract's need score."""
        is_covered = self._coverage.compute(context)
        # The need score is only evaluated when the tract is uncovered.
        return 0.0 if is_covered else self._need.compute(context)

name class-attribute instance-attribute

name = 'gap_score'

dtype class-attribute instance-attribute

dtype = 'float'

compute

compute(context: FactorContext) -> float
Source code in src/subway_access/factors/_builtin.py
117
118
119
120
def compute(self, context: FactorContext) -> float:
    """Return 0.0 for a covered tract, otherwise the tract's need score."""
    is_covered = self._coverage.compute(context)
    # The need score is only evaluated when the tract is uncovered.
    return 0.0 if is_covered else self._need.compute(context)

NearestStationDistanceFactor

Bases: Factor

Haversine distance in meters to the nearest accessible station.

Returns -1.0 if no accessible stations exist.

Example

>>> NearestStationDistanceFactor().compute(ctx)
423.7

Source code in src/subway_access/factors/_builtin.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class NearestStationDistanceFactor(Factor):
    """Shortest haversine distance, in meters, to an accessible station.

    The sentinel -1.0 is returned when there are no accessible stations.

    Example:
        >>> NearestStationDistanceFactor().compute(ctx)
        423.7
    """

    name = "nearest_accessible_distance_meters"
    dtype = "float"

    def compute(self, context: FactorContext) -> float:
        """Return the smallest centroid-to-station distance, or -1.0 if none."""
        distances = (
            haversine_distance_meters(
                latitude_a=context.tract.centroid_latitude,
                longitude_a=context.tract.centroid_longitude,
                latitude_b=candidate.latitude,
                longitude_b=candidate.longitude,
            )
            for candidate in context.stations.accessible_stations
        )
        # ``default`` supplies the sentinel when no station is accessible.
        return min(distances, default=-1.0)

name class-attribute instance-attribute

name = 'nearest_accessible_distance_meters'

dtype class-attribute instance-attribute

dtype = 'float'

compute

compute(context: FactorContext) -> float
Source code in src/subway_access/factors/_builtin.py
136
137
138
139
140
141
142
143
144
145
146
147
def compute(self, context: FactorContext) -> float:
    """Return the smallest centroid-to-station distance in meters.

    Only accessible stations are considered; -1.0 is returned when there
    are none.
    """
    distances = (
        haversine_distance_meters(
            latitude_a=context.tract.centroid_latitude,
            longitude_a=context.tract.centroid_longitude,
            latitude_b=candidate.latitude,
            longitude_b=candidate.longitude,
        )
        for candidate in context.stations.accessible_stations
    )
    # ``default`` supplies the sentinel when no station is accessible.
    return min(distances, default=-1.0)

NearestStationTravelMinutesFactor

Bases: Factor

Estimated walking time in minutes to the nearest accessible station.

Uses a fixed walking speed of 80 m/min. Returns -1.0 if no accessible stations exist.

Example

>>> NearestStationTravelMinutesFactor().compute(ctx)
5.3

Source code in src/subway_access/factors/_builtin.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class NearestStationTravelMinutesFactor(Factor):
    """Walking time, in minutes, to the nearest accessible station.

    The distance from ``NearestStationDistanceFactor`` is divided by the
    module constant ``_METERS_PER_MINUTE`` (a fixed walking speed, documented
    as 80 m/min).  Returns -1.0 if no accessible stations exist.

    Example:
        >>> NearestStationTravelMinutesFactor().compute(ctx)
        5.3
    """

    name = "nearest_accessible_travel_minutes"
    dtype = "float"

    def compute(self, context: FactorContext) -> float:
        """Convert the nearest-station distance to minutes, keeping the -1.0 sentinel."""
        meters = NearestStationDistanceFactor().compute(context)
        # A negative distance is the "no accessible station" sentinel.
        return -1.0 if meters < 0 else meters / _METERS_PER_MINUTE

name class-attribute instance-attribute

name = 'nearest_accessible_travel_minutes'

dtype class-attribute instance-attribute

dtype = 'float'

compute

compute(context: FactorContext) -> float
Source code in src/subway_access/factors/_builtin.py
164
165
166
167
168
def compute(self, context: FactorContext) -> float:
    """Convert the nearest-station distance to minutes, keeping the -1.0 sentinel."""
    meters = NearestStationDistanceFactor().compute(context)
    # A negative distance is the "no accessible station" sentinel.
    return -1.0 if meters < 0 else meters / _METERS_PER_MINUTE

NeedScoreFactor

Bases: Factor

Composite need score from demographic vulnerability indicators.

By default, computes the unweighted mean of disability_rate, senior_rate, and poverty_rate. Pass weights to override.

Parameters:

Name Type Description Default
weights dict[str, float] | None

Optional mapping of indicator name to weight. Keys must be a subset of {"disability", "senior", "poverty"}. Weights are normalized to sum to 1.0.

None
Example

>>> factor = NeedScoreFactor()
>>> factor.compute(ctx)
0.15
>>> weighted = NeedScoreFactor(weights={"disability": 0.5, "senior": 0.3, "poverty": 0.2})

Source code in src/subway_access/factors/_builtin.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class NeedScoreFactor(Factor):
    """Composite need score from demographic vulnerability indicators.

    By default, computes the unweighted mean of disability_rate,
    senior_rate, and poverty_rate.  Pass ``weights`` to override.

    Args:
        weights: Optional mapping of indicator name to weight.
            Keys must be a subset of ``{"disability", "senior", "poverty"}``.
            Weights are normalized to sum to 1.0.

    Raises:
        ValueError: If ``weights`` contains an unknown key, or if the
            weights sum to zero (including an empty mapping) and therefore
            cannot be normalized.

    Example:
        >>> factor = NeedScoreFactor()
        >>> factor.compute(ctx)
        0.15
        >>> weighted = NeedScoreFactor(weights={"disability": 0.5, "senior": 0.3, "poverty": 0.2})
    """

    name = "need_score"
    dtype = "float"

    _VALID_KEYS = frozenset({"disability", "senior", "poverty"})

    def __init__(
        self,
        weights: dict[str, float] | None = None,
    ) -> None:
        if weights is not None:
            invalid = set(weights) - self._VALID_KEYS
            if invalid:
                message = f"Invalid weight keys: {sorted(invalid)}"
                raise ValueError(message)
            total = sum(weights.values())
            # A zero total cannot be normalized; fail with a clear error
            # instead of a bare ZeroDivisionError (or, for an empty mapping,
            # a score that is silently always 0.0).
            if total == 0:
                message = "Weights must sum to a non-zero value"
                raise ValueError(message)
            self._weights: dict[str, float] | None = {
                k: v / total for k, v in weights.items()
            }
        else:
            self._weights = None

    def compute(self, context: FactorContext) -> float:
        """Return the tract's need score.

        Without weights this is the plain mean of the three rates; with
        weights it is the sum of each weighted indicator's rate times its
        normalized weight.
        """
        tract = context.tract
        if self._weights is None:
            return fmean((tract.disability_rate, tract.senior_rate, tract.poverty_rate))
        rates = {
            "disability": tract.disability_rate,
            "senior": tract.senior_rate,
            "poverty": tract.poverty_rate,
        }
        result = 0.0
        for key, weight in self._weights.items():
            result += rates[key] * weight
        return result

name class-attribute instance-attribute

name = 'need_score'

dtype class-attribute instance-attribute

dtype = 'float'

compute

compute(context: FactorContext) -> float
Source code in src/subway_access/factors/_builtin.py
56
57
58
59
60
61
62
63
64
65
66
67
68
def compute(self, context: FactorContext) -> float:
    """Return the tract's need score.

    Without weights this is the plain mean of the disability, senior and
    poverty rates; with weights it is the sum of each weighted indicator's
    rate times its stored weight.
    """
    demographics = context.tract
    weights = self._weights
    if weights is None:
        return fmean(
            (
                demographics.disability_rate,
                demographics.senior_rate,
                demographics.poverty_rate,
            )
        )

    rate_by_indicator = {
        "disability": demographics.disability_rate,
        "senior": demographics.senior_rate,
        "poverty": demographics.poverty_rate,
    }
    # Accumulate in weight-mapping order, starting from 0.0.
    score = 0.0
    for indicator in weights:
        score += rate_by_indicator[indicator] * weights[indicator]
    return score

ReliabilityWeightedCoverageFactor

Bases: Factor

Coverage weighted by the best-covering station's reliability score.

Returns the highest reliability score among accessible stations whose catchment covers this tract, or 0.0 if the tract is uncovered.

Parameters:

Name Type Description Default
reliability_scores dict[str, float]

Mapping of station_id to reliability score (0-1).

required
Example

>>> scores = {"station_1": 0.99, "station_2": 0.85}
>>> ReliabilityWeightedCoverageFactor(scores).compute(ctx)
0.99

Source code in src/subway_access/factors/_builtin.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class ReliabilityWeightedCoverageFactor(Factor):
    """Best reliability score among the accessible stations covering a tract.

    A station covers the tract when the haversine distance from the tract
    centroid is within the station's catchment radius.  The result is the
    largest score of any covering station and never drops below 0.0, which
    is also the value for an uncovered tract.  Covering stations without an
    entry in ``reliability_scores`` contribute 0.0.

    Args:
        reliability_scores: Mapping of station_id to reliability score (0-1).

    Example:
        >>> scores = {"station_1": 0.99, "station_2": 0.85}
        >>> ReliabilityWeightedCoverageFactor(scores).compute(ctx)
        0.99
    """

    name = "reliability_weighted_coverage"
    dtype = "float"

    def __init__(self, reliability_scores: dict[str, float]) -> None:
        self._scores = reliability_scores

    def compute(self, context: FactorContext) -> float:
        """Return the highest score of a covering station, floored at 0.0."""
        radii = context.catchments.radius_by_station_id()
        covering_scores = [
            self._scores.get(candidate.station_id, 0.0)
            for candidate in context.stations.accessible_stations
            if haversine_distance_meters(
                latitude_a=context.tract.centroid_latitude,
                longitude_a=context.tract.centroid_longitude,
                latitude_b=candidate.latitude,
                longitude_b=candidate.longitude,
            )
            <= radii.get(candidate.station_id, 0.0)
        ]
        # Seeding with 0.0 keeps the floor and handles the uncovered case.
        return max([0.0, *covering_scores])

name class-attribute instance-attribute

name = 'reliability_weighted_coverage'

dtype class-attribute instance-attribute

dtype = 'float'

compute

compute(context: FactorContext) -> float
Source code in src/subway_access/factors/_builtin.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def compute(self, context: FactorContext) -> float:
    """Return the highest reliability score of a covering station.

    The result is floored at 0.0, which is also returned when no accessible
    station covers the tract.  Covering stations without a stored score
    contribute 0.0.
    """
    radii = context.catchments.radius_by_station_id()
    covering_scores = [
        self._scores.get(candidate.station_id, 0.0)
        for candidate in context.stations.accessible_stations
        if haversine_distance_meters(
            latitude_a=context.tract.centroid_latitude,
            longitude_a=context.tract.centroid_longitude,
            latitude_b=candidate.latitude,
            longitude_b=candidate.longitude,
        )
        <= radii.get(candidate.station_id, 0.0)
    ]
    # Seeding with 0.0 keeps the floor and handles the uncovered case.
    return max([0.0, *covering_scores])

StationCountFactor

Bases: Factor

Number of accessible stations whose catchment covers this tract.

Example

>>> StationCountFactor().compute(ctx)
2

Source code in src/subway_access/factors/_builtin.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class StationCountFactor(Factor):
    """Count the accessible stations whose catchment reaches the tract centroid.

    A station is counted when the haversine distance from the tract centroid
    is within its catchment radius; stations absent from the radius map are
    given a radius of 0.0.

    Example:
        >>> StationCountFactor().compute(ctx)
        2
    """

    name = "accessible_station_count"
    dtype = "int"

    def compute(self, context: FactorContext) -> int:
        """Return how many accessible stations cover the tract."""
        radii = context.catchments.radius_by_station_id()
        return sum(
            1
            for candidate in context.stations.accessible_stations
            if haversine_distance_meters(
                latitude_a=context.tract.centroid_latitude,
                longitude_a=context.tract.centroid_longitude,
                latitude_b=candidate.latitude,
                longitude_b=candidate.longitude,
            )
            <= radii.get(candidate.station_id, 0.0)
        )

name class-attribute instance-attribute

name = 'accessible_station_count'

dtype class-attribute instance-attribute

dtype = 'int'

compute

compute(context: FactorContext) -> int
Source code in src/subway_access/factors/_builtin.py
182
183
184
185
186
187
188
189
190
191
192
193
194
def compute(self, context: FactorContext) -> int:
    """Return how many accessible stations cover the tract.

    A station is counted when the haversine distance from the tract centroid
    is within its catchment radius; stations absent from the radius map are
    given a radius of 0.0.
    """
    radii = context.catchments.radius_by_station_id()
    return sum(
        1
        for candidate in context.stations.accessible_stations
        if haversine_distance_meters(
            latitude_a=context.tract.centroid_latitude,
            longitude_a=context.tract.centroid_longitude,
            latitude_b=candidate.latitude,
            longitude_b=candidate.longitude,
        )
        <= radii.get(candidate.station_id, 0.0)
    )

Helpers

subway_access.helpers

Reusable helpers extracted from example scaffolding.

ALL_BOROUGHS module-attribute

ALL_BOROUGHS: tuple[str, ...] = (
    "Manhattan",
    "Brooklyn",
    "Queens",
    "Bronx",
    "Staten Island",
)

dataclass_fieldnames

dataclass_fieldnames(cls: type) -> tuple[str, ...]

Return field names for a frozen dataclass type.

Parameters:

Name Type Description Default
cls type

A dataclass type.

required

Returns:

Type Description
tuple[str, ...]

Tuple of field name strings in declaration order.

Example

>>> from subway_access.models import GapRecord
>>> dataclass_fieldnames(GapRecord)
('tract_id', 'tract_name', 'borough', ...)

Source code in src/subway_access/helpers/_export.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def dataclass_fieldnames(cls: type) -> tuple[str, ...]:
    """List the field names declared on a dataclass.

    Args:
        cls: A dataclass type.

    Returns:
        The field names as a tuple, in the order ``dataclasses.fields``
        reports them.

    Example:
        >>> from subway_access.models import GapRecord
        >>> dataclass_fieldnames(GapRecord)
        ('tract_id', 'tract_name', 'borough', ...)
    """

    declared = fields(cls)
    names = [spec.name for spec in declared]
    return tuple(names)

export_records_csv

export_records_csv(
    records: tuple[Any, ...],
    path: Path,
    *,
    fieldnames: tuple[str, ...] | None = None,
    formatters: dict[str, str] | None = None,
) -> Path

Export a tuple of frozen dataclass records to CSV.

Automatically extracts field names from the dataclass type if fieldnames is not provided.

Parameters:

Name Type Description Default
records tuple[Any, ...]

Tuple of frozen dataclass instances.

required
path Path

Output file path.

required
fieldnames tuple[str, ...] | None

Column names to include. Defaults to all fields.

None
formatters dict[str, str] | None

Optional mapping of field name to format string (e.g. {"disability_rate": ".4f"}).

None

Returns:

Type Description
Path

The resolved output path.

Example

>>> export_records_csv(gap_analysis.records, Path("gaps.csv"))
PosixPath('gaps.csv')

Source code in src/subway_access/helpers/_export.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def export_records_csv(
    records: tuple[Any, ...],
    path: Path,
    *,
    fieldnames: tuple[str, ...] | None = None,
    formatters: dict[str, str] | None = None,
) -> Path:
    """Export a tuple of frozen dataclass records to CSV.

    Automatically extracts field names from the dataclass type if
    ``fieldnames`` is not provided.

    Args:
        records: Tuple of frozen dataclass instances.
        path: Output file path.
        fieldnames: Column names to include. Defaults to all fields.
        formatters: Optional mapping of field name to format string
            (e.g. ``{"disability_rate": ".4f"}``).

    Returns:
        The resolved output path.

    Example:
        >>> export_records_csv(gap_analysis.records, Path("gaps.csv"))
        PosixPath('gaps.csv')
    """

    if not records:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text("", encoding="utf-8")
        return path

    names = fieldnames or dataclass_fieldnames(type(records[0]))
    fmt = formatters or {}

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(names))
        writer.writeheader()
        for record in records:
            row_data = asdict(record)
            row: dict[str, Any] = {}
            for name in names:
                value = row_data.get(name)
                if value is None:
                    row[name] = ""
                elif name in fmt:
                    row[name] = format(value, fmt[name])
                else:
                    row[name] = value
            writer.writerow(row)

    return path

borough_cache_dir

borough_cache_dir(
    cache_root: str | Path, borough: str
) -> Path

Return the canonical cache directory for a borough snapshot.

Parameters:

Name Type Description Default
cache_root str | Path

Parent directory for all borough caches.

required
borough str

Borough name (e.g. "Manhattan").

required

Returns:

Type Description
Path

Path to the borough-specific cache directory.

Example

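>>> borough_cache_dir("cache", "Manhattan")
PosixPath('/abs/path/to/cache/manhattan')

The returned path is absolute: cache_root is user-expanded and resolved before the lower-cased, hyphenated borough name is appended.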

Source code in src/subway_access/helpers/_iteration.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def borough_cache_dir(cache_root: str | Path, borough: str) -> Path:
    """Return the canonical cache directory for a borough snapshot.

    Args:
        cache_root: Parent directory for all borough caches.
        borough: Borough name (e.g. ``"Manhattan"``).

    Returns:
        Path to the borough-specific cache directory.

    Example:
        >>> borough_cache_dir("cache", "Manhattan")
        PosixPath('cache/manhattan')
    """

    return Path(cache_root).expanduser().resolve() / borough.lower().replace(" ", "-")

fetch_borough_snapshots

fetch_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
    refresh: bool = False,
    availability_months: int = 12,
    include_gtfs_archive: bool = True,
) -> dict[str, StudyAreaSnapshot]

Fetch and cache study-area snapshots for multiple boroughs.

Downloads GTFS archive only for the first borough to avoid redundant fetches of the same static feed.

Parameters:

Name Type Description Default
boroughs tuple[str, ...] | None

Borough names to fetch. Defaults to all five boroughs.

None
cache_root str | Path

Parent directory for all borough caches.

required
refresh bool

Force re-download of all data sources.

False
availability_months int

Months of availability history to fetch.

12
include_gtfs_archive bool

Whether to download the GTFS static archive.

True

Returns:

Type Description
dict[str, StudyAreaSnapshot]

Mapping of borough name to loaded StudyAreaSnapshot.

Example

>>> snapshots = fetch_borough_snapshots(
...     ("Manhattan", "Brooklyn"),
...     cache_root="cache",
... )
>>> len(snapshots["Manhattan"].stations.stations)
151

Source code in src/subway_access/helpers/_iteration.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def fetch_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
    refresh: bool = False,
    availability_months: int = 12,
    include_gtfs_archive: bool = True,
) -> dict[str, StudyAreaSnapshot]:
    """Fetch (or reuse) one study-area snapshot per borough.

    Each borough is fetched through ``fetch_study_area_snapshot`` into its
    own directory under ``cache_root`` (see ``borough_cache_dir``). The
    GTFS archive is requested only for the first borough in the sequence,
    so the same static feed is not fetched once per borough.

    Args:
        boroughs: Borough names to fetch. ``None`` or an empty tuple
            falls back to ``ALL_BOROUGHS``.
        cache_root: Parent directory for all borough caches; created if
            it does not exist.
        refresh: Forwarded to ``fetch_study_area_snapshot`` for every
            borough.
        availability_months: Months of availability history to fetch.
        include_gtfs_archive: Whether the first borough's fetch should
            include the GTFS static archive. Later boroughs never do.

    Returns:
        Mapping of borough name to loaded ``StudyAreaSnapshot``, in the
        order the boroughs were processed.

    Example:
        >>> snapshots = fetch_borough_snapshots(
        ...     ("Manhattan", "Brooklyn"),
        ...     cache_root="cache",
        ... )
        >>> len(snapshots["Manhattan"].stations.stations)
        151
    """

    selected = boroughs or ALL_BOROUGHS
    base_dir = Path(cache_root).expanduser().resolve()
    base_dir.mkdir(parents=True, exist_ok=True)

    return {
        name: fetch_study_area_snapshot(
            AccessibilityQuery(geography="borough", value=name),
            cache_dir=borough_cache_dir(base_dir, name),
            refresh=refresh,
            availability_months=availability_months,
            # Only position 0 may pull the GTFS archive.
            include_gtfs_archive=include_gtfs_archive and position == 0,
        )
        for position, name in enumerate(selected)
    }

iter_borough_snapshots

iter_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
) -> Iterator[tuple[str, StudyAreaSnapshot]]

Iterate over cached borough snapshots lazily.

Parameters:

Name Type Description Default
boroughs tuple[str, ...] | None

Borough names to iterate. Defaults to all five boroughs.

None
cache_root str | Path

Parent directory for all borough caches.

required

Yields:

Type Description
tuple[str, StudyAreaSnapshot]

Tuples of (borough_name, snapshot).

Example

>>> for borough, snap in iter_borough_snapshots(cache_root="cache"):
...     print(borough, len(snap.stations.stations))

Source code in src/subway_access/helpers/_iteration.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def iter_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
) -> Iterator[tuple[str, StudyAreaSnapshot]]:
    """Lazily walk the cached snapshots of several boroughs.

    This is a generator: each borough's snapshot is loaded with
    ``load_cached_snapshot`` only when the consumer advances to it.

    Args:
        boroughs: Borough names to iterate. ``None`` or an empty tuple
            falls back to ``ALL_BOROUGHS``.
        cache_root: Parent directory for all borough caches.

    Yields:
        Tuples of ``(borough_name, snapshot)``.

    Example:
        >>> for borough, snap in iter_borough_snapshots(cache_root="cache"):
        ...     print(borough, len(snap.stations.stations))
    """

    selected = boroughs or ALL_BOROUGHS
    base_dir = Path(cache_root).expanduser().resolve()
    for name in selected:
        snapshot_dir = borough_cache_dir(base_dir, name)
        yield name, load_cached_snapshot(snapshot_dir)

load_borough_snapshots

load_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
) -> dict[str, StudyAreaSnapshot]

Load previously cached snapshots for multiple boroughs.

Parameters:

Name Type Description Default
boroughs tuple[str, ...] | None

Borough names to load. Defaults to all five boroughs.

None
cache_root str | Path

Parent directory for all borough caches.

required

Returns:

Type Description
dict[str, StudyAreaSnapshot]

Mapping of borough name to loaded StudyAreaSnapshot.

Raises:

Type Description
FileNotFoundError

If any borough's cache is missing.

Example

>>> snapshots = load_borough_snapshots(
...     ("Manhattan",), cache_root="cache"
... )

Source code in src/subway_access/helpers/_iteration.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def load_borough_snapshots(
    boroughs: tuple[str, ...] | None = None,
    *,
    cache_root: str | Path,
) -> dict[str, StudyAreaSnapshot]:
    """Eagerly load the cached snapshots of several boroughs.

    Args:
        boroughs: Borough names to load. ``None`` or an empty tuple falls
            back to ``ALL_BOROUGHS``.
        cache_root: Parent directory for all borough caches.

    Returns:
        Mapping of borough name to loaded ``StudyAreaSnapshot``.

    Raises:
        FileNotFoundError: If any borough's cache is missing (propagated
            from ``load_cached_snapshot``; documented contract of that
            loader).

    Example:
        >>> snapshots = load_borough_snapshots(
        ...     ("Manhattan",), cache_root="cache"
        ... )
    """

    selected = boroughs or ALL_BOROUGHS
    base_dir = Path(cache_root).expanduser().resolve()

    loaded: dict[str, StudyAreaSnapshot] = {}
    for name in selected:
        loaded[name] = load_cached_snapshot(borough_cache_dir(base_dir, name))
    return loaded

write_markdown_report

write_markdown_report(path: Path, content: str) -> Path

Write a markdown report file.

Parameters:

Name Type Description Default
path Path

Output file path.

required
content str

Markdown content to write.

required

Returns:

Type Description
Path

The output path, returned exactly as it was passed in (it is not resolved).

Source code in src/subway_access/helpers/_metadata.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def write_markdown_report(
    path: Path,
    content: str,
) -> Path:
    """Write markdown text to a file, creating parent directories.

    Args:
        path: Output file path. Missing parent directories are created.
        content: Markdown content to write (UTF-8 encoded).

    Returns:
        ``path``, unchanged, so calls can be chained.
    """

    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(content)
    return path

write_metadata_json

write_metadata_json(
    path: Path,
    *,
    title: str,
    generated_at: datetime | None = None,
    extra: dict[str, Any] | None = None,
) -> Path

Write a timestamped metadata JSON file.

Parameters:

Name Type Description Default
path Path

Output file path.

required
title str

Human-readable title for the metadata record.

required
generated_at datetime | None

Timestamp. Defaults to now (UTC).

None
extra dict[str, Any] | None

Additional key-value pairs to include.

None

Returns:

Type Description
Path

The output path, returned exactly as it was passed in (it is not resolved).

Example

>>> write_metadata_json(
...     Path("metadata.json"),
...     title="Manhattan gap analysis",
...     extra={"tract_count": 200},
... )

Source code in src/subway_access/helpers/_metadata.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def write_metadata_json(
    path: Path,
    *,
    title: str,
    generated_at: datetime | None = None,
    extra: dict[str, Any] | None = None,
) -> Path:
    """Write a timestamped metadata JSON file.

    Args:
        path: Output file path.
        title: Human-readable title for the metadata record.
        generated_at: Timestamp. Defaults to now (UTC).
        extra: Additional key-value pairs to include.

    Returns:
        The resolved output path.

    Example:
        >>> write_metadata_json(
        ...     Path("metadata.json"),
        ...     title="Manhattan gap analysis",
        ...     extra={"tract_count": 200},
        ... )
    """

    payload: dict[str, Any] = {
        "title": title,
        "generated_at": (generated_at or datetime.now(tz=timezone.utc)).isoformat(),
    }
    if extra:
        payload.update(extra)

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"{json.dumps(payload, indent=2)}\n", encoding="utf-8")
    return path

Export

subway_access.export

Public exporters for subway-access.

export_catchments_geojson

export_catchments_geojson(
    data: CatchmentDataset, target: ExportTarget
) -> Path

Export station catchments to GeoJSON for mapping workflows.

Parameters:

Name Type Description Default
data CatchmentDataset

Generated catchment geometries.

required
target ExportTarget

Export target with format "geojson" and output path.

required

Returns:

Type Description
Path

The resolved output path.

Source code in src/subway_access/export/_core.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def export_catchments_geojson(data: CatchmentDataset, target: ExportTarget) -> Path:
    """Write station catchment polygons as a GeoJSON FeatureCollection.

    Each entry of ``data.features`` becomes one ``Feature`` with a
    ``Polygon`` geometry made of a single ring (``feature.polygon``) and
    the station attributes as properties. ``radius_meters`` is rounded to
    three decimals.

    Args:
        data: Generated catchment geometries.
        target: Export target; validated against the ``"geojson"`` format
            by ``_validate_target_format``, which also supplies the output
            path.

    Returns:
        The output path produced by ``_validate_target_format``.
    """

    destination = _validate_target_format(target, expected_formats=("geojson",))

    collected = []
    for item in data.features:
        attributes = {
            "station_id": item.station_id,
            "station_name": item.station_name,
            "borough": item.borough,
            "ada_status": item.ada_status,
            "radius_meters": round(item.radius_meters, 3),
            "minutes": item.minutes,
            "method": item.method,
        }
        # GeoJSON polygons are a list of rings; only the outer ring is emitted.
        outer_ring = [list(vertex) for vertex in item.polygon]
        collected.append(
            {
                "type": "Feature",
                "properties": attributes,
                "geometry": {"type": "Polygon", "coordinates": [outer_ring]},
            }
        )

    document = {"type": "FeatureCollection", "features": collected}
    serialized = json.dumps(document, indent=2)
    destination.write_text(f"{serialized}\n", encoding="utf-8")
    return destination

export_gap_table

export_gap_table(
    data: GapAnalysis, target: ExportTarget
) -> Path

Export tract-level accessibility gap tables.

Parameters:

Name Type Description Default
data GapAnalysis

Gap analysis results from analyze_gaps.

required
target ExportTarget

Export target with format "csv" and output path.

required

Returns:

Type Description
Path

The resolved output path.

Source code in src/subway_access/export/_core.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def export_gap_table(data: GapAnalysis, target: ExportTarget) -> Path:
    """Write the tract-level accessibility gap table as CSV.

    One row is written per entry of ``data.records``, preceded by a header
    row. Rates and scores are formatted with four decimals, the nearest
    accessible distance with two; missing station id/name/distance become
    empty cells, and ``has_accessible_station`` is written as
    ``true``/``false``.

    Args:
        data: Gap analysis results from ``analyze_gaps``.
        target: Export target; validated against the ``"csv"`` format by
            ``_validate_target_format``, which also supplies the output
            path.

    Returns:
        The output path produced by ``_validate_target_format``.
    """

    destination = _validate_target_format(target, expected_formats=("csv",))
    columns = [
        "tract_id",
        "tract_name",
        "borough",
        "disability_rate",
        "senior_rate",
        "poverty_rate",
        "total_population",
        "need_score",
        "has_accessible_station",
        "accessible_station_count",
        "nearest_accessible_station_id",
        "nearest_accessible_station_name",
        "nearest_accessible_distance_meters",
        "gap_score",
        "gap_label",
    ]

    with destination.open("w", newline="", encoding="utf-8") as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        table.writerows(
            {
                "tract_id": rec.tract_id,
                "tract_name": rec.tract_name,
                "borough": rec.borough,
                "disability_rate": format(rec.disability_rate, ".4f"),
                "senior_rate": format(rec.senior_rate, ".4f"),
                "poverty_rate": format(rec.poverty_rate, ".4f"),
                "total_population": rec.total_population,
                "need_score": format(rec.need_score, ".4f"),
                "has_accessible_station": str(rec.has_accessible_station).lower(),
                "accessible_station_count": rec.accessible_station_count,
                "nearest_accessible_station_id": (
                    rec.nearest_accessible_station_id or ""
                ),
                "nearest_accessible_station_name": (
                    rec.nearest_accessible_station_name or ""
                ),
                "nearest_accessible_distance_meters": (
                    format(rec.nearest_accessible_distance_meters, ".2f")
                    if rec.nearest_accessible_distance_meters is not None
                    else ""
                ),
                "gap_score": format(rec.gap_score, ".4f"),
                "gap_label": rec.gap_label,
            }
            for rec in data.records
        )
    return destination

export_station_metrics

export_station_metrics(
    data: StationMetricDataset, target: ExportTarget
) -> Path

Export station-level accessibility and reliability metrics.

Parameters:

Name Type Description Default
data StationMetricDataset

Station metric dataset from build_station_metrics.

required
target ExportTarget

Export target with format "csv" or "geojson".

required

Returns:

Type Description
Path

The resolved output path.

Source code in src/subway_access/export/_core.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def export_station_metrics(data: StationMetricDataset, target: ExportTarget) -> Path:
    """Write station-level metrics as CSV or as a GeoJSON point layer.

    Every record is first flattened with ``_station_metric_row``. For CSV
    the columns are the keys of the first row, or a fixed default column
    list when there are no rows (so an empty export still has a header).
    Any other validated format is written as a GeoJSON FeatureCollection
    whose features use ``station_id`` as the id, ``[longitude, latitude]``
    as the point, and all remaining row keys as properties.

    Args:
        data: Station metric dataset from ``build_station_metrics``.
        target: Export target; validated against ``"csv"`` / ``"geojson"``
            by ``_validate_target_format``, which also supplies the output
            path.

    Returns:
        The output path produced by ``_validate_target_format``.
    """

    destination = _validate_target_format(target, expected_formats=("csv", "geojson"))
    flat_rows = [_station_metric_row(entry) for entry in data.records]

    if target.format.lower() != "csv":
        coordinate_keys = {"latitude", "longitude"}
        features = []
        for flat in flat_rows:
            attributes = {
                key: value for key, value in flat.items() if key not in coordinate_keys
            }
            features.append(
                {
                    "type": "Feature",
                    "id": flat["station_id"],
                    "properties": attributes,
                    "geometry": {
                        "type": "Point",
                        # GeoJSON positions are ordered longitude, latitude.
                        "coordinates": [flat["longitude"], flat["latitude"]],
                    },
                }
            )
        document = {"type": "FeatureCollection", "features": features}
        destination.write_text(f"{json.dumps(document, indent=2)}\n", encoding="utf-8")
        return destination

    if flat_rows:
        columns = list(flat_rows[0])
    else:
        # Fallback header used when there is no row to take the keys from.
        columns = [
            "station_id",
            "station_name",
            "borough",
            "latitude",
            "longitude",
            "ada_status",
            "catchment_minutes",
            "catchment_radius_meters",
            "covered_tract_count",
            "covered_population",
            "nearby_gap_tract_count",
            "nearby_gap_population",
            "mean_need_score",
            "reliability_score",
            "reliability_label",
            "outage_minutes",
            "network_connection_count",
            "daytime_routes",
            "structure",
            "analysis_method",
        ]

    with destination.open("w", newline="", encoding="utf-8") as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        for flat in flat_rows:
            table.writerow(flat)
    return destination

Pipeline

subway_access.pipeline

High-level real-data pipeline helpers for subway-access.

fetch_study_area_snapshot

fetch_study_area_snapshot(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    availability_months: int = 12,
    include_gtfs_archive: bool = True,
) -> StudyAreaSnapshot

Fetch, cache, and load a real-data study-area snapshot.

Source code in src/subway_access/pipeline/_fetch.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def fetch_study_area_snapshot(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    availability_months: int = 12,
    include_gtfs_archive: bool = True,
) -> StudyAreaSnapshot:
    """Fetch, cache, and load a real-data study-area snapshot.

    If ``refresh`` is false and the stations, accessibility, tracts,
    outages and metadata files already exist under ``cache_dir``, nothing
    is fetched and the cached snapshot is loaded directly. Otherwise every
    source is fetched again, written into the cache directory, a metadata
    file describing the sources is written last, and the snapshot is then
    loaded back from the cache via ``load_cached_snapshot``.

    Args:
        query: Study-area selector used to pick the boundary geometry and
            the tract features.
        cache_dir: Directory that holds the snapshot files; user-expanded
            and resolved before use.
        refresh: When true, skip the cache-hit shortcut and refetch. Also
            forwarded to ``fetch_mta_gtfs_archive``.
        availability_months: Passed to ``_first_day_months_ago`` to derive
            the first month of availability history requested.
        include_gtfs_archive: When true, fetch the GTFS archive and, if
            the archive file exists afterwards, parse its pathways data.

    Returns:
        The snapshot as loaded by ``load_cached_snapshot(cache_root)``.
    """

    cache_root = ensure_directory(Path(cache_dir).expanduser().resolve())
    paths = _snapshot_paths(cache_root)
    # Cache hit: only these five files are checked.
    # NOTE(review): entrances / GTFS pathways files and the value of
    # include_gtfs_archive are not part of this check, so a cache created
    # without the GTFS archive is reused as-is — confirm this is intended.
    if (
        not refresh
        and paths["stations"].exists()
        and paths["accessibility"].exists()
        and paths["tracts"].exists()
        and paths["outages"].exists()
        and paths["metadata"].exists()
    ):
        return load_cached_snapshot(cache_root)

    # One timestamp string is shared by every source record and the
    # metadata payload written at the end.
    refreshed_at = cache_timestamp()
    study_area_shape, boundary_geojson = _selected_boundary_geometry(query)
    write_json(paths["boundary"], boundary_geojson)

    # Stations: the full catalog is cached, then narrowed to the study area.
    station_catalog_rows = fetch_mta_station_catalog()
    write_json(paths["station_catalog"], station_catalog_rows)
    selected_station_rows = _filter_station_rows(
        station_catalog_rows,
        study_area_shape=study_area_shape,
    )
    station_rows, accessibility_rows = build_station_snapshot_rows(
        selected_station_rows
    )
    write_csv_rows(
        paths["stations"],
        [dict(row) for row in station_rows],
    )
    write_csv_rows(
        paths["accessibility"],
        [dict(row) for row in accessibility_rows],
    )

    # Distinct, sorted complex ids of the selected stations; rows with a
    # missing/falsy complex_id are skipped. Used to scope the equipment
    # and availability requests below.
    station_complex_ids = tuple(
        sorted(
            {str(row["complex_id"]) for row in station_rows if row.get("complex_id")}
        )
    )
    asset_rows = fetch_mta_equipment_assets(station_complex_ids=station_complex_ids)
    write_json(paths["assets"], asset_rows)

    # Availability history; the outage snapshot is derived from these rows.
    start_month = _first_day_months_ago(availability_months)
    availability_rows = fetch_mta_availability_history(
        station_complex_ids=station_complex_ids,
        start_month=start_month,
    )
    write_json(paths["availability"], availability_rows)
    write_json(paths["outages"], build_outage_snapshot_rows(availability_rows))

    # Entrances: filtered to the study area, normalized, stored as GeoJSON.
    entrance_catalog_rows = fetch_mta_subway_entrances()
    selected_entrance_rows = _filter_entrance_rows(
        entrance_catalog_rows,
        study_area_shape=study_area_shape,
    )
    normalized_entrances = build_entrance_snapshot_rows(selected_entrance_rows)
    write_json(paths["entrances"], entrances_to_geojson(normalized_entrances))

    # Optional GTFS archive. pathways_snapshot stays None when the archive
    # is not requested, is absent after the fetch, or yields no pathways
    # data; in that case no gtfs_pathways file and no metadata entry for it
    # are written.
    pathways_snapshot = None
    if include_gtfs_archive:
        fetch_mta_gtfs_archive(paths["gtfs_archive"], refresh=refresh)
        if paths["gtfs_archive"].exists():
            pathways_snapshot = parse_gtfs_pathways_zip(paths["gtfs_archive"])
            if pathways_snapshot is not None:
                write_json(
                    paths["gtfs_pathways"],
                    gtfs_pathways_snapshot_to_json(pathways_snapshot),
                )

    # Tract demographics: ACS estimates are fetched for the selected tract
    # geoids and combined with the tract features into a FeatureCollection.
    tract_features = _selected_tract_features(query, study_area_shape=study_area_shape)
    tract_geoids = tuple(feature.geography_value for feature in tract_features)
    acs_estimates = fetch_nyc_acs_tract_estimates(tract_geoids=tract_geoids)
    tract_payload = {
        "type": "FeatureCollection",
        "features": _build_demographic_snapshot_features(tract_features, acs_estimates),
    }
    write_json(paths["tracts"], tract_payload)

    # Provenance records, one per data source written above.
    metadata_items: list[DataSourceMetadata] = [
        DataSourceMetadata(
            name="mta_station_catalog",
            source_url=MTA_SUBWAY_STATIONS_API_URL,
            cache_path=paths["station_catalog"],
            refreshed_at=datetime.fromisoformat(refreshed_at),
            # Counts the filtered rows, although cache_path holds the full catalog.
            record_count=len(selected_station_rows),
            notes="Filtered in memory against the selected study area boundary.",
        ),
        DataSourceMetadata(
            name="mta_equipment_assets",
            source_url=MTA_EQUIPMENT_ASSET_API_URL,
            cache_path=paths["assets"],
            refreshed_at=datetime.fromisoformat(refreshed_at),
            record_count=len(asset_rows),
        ),
        DataSourceMetadata(
            name="mta_availability_history",
            source_url=MTA_ELEVATOR_AVAILABILITY_API_URL,
            cache_path=paths["availability"],
            refreshed_at=datetime.fromisoformat(refreshed_at),
            record_count=len(availability_rows),
            notes=f"Monthly availability rows since {start_month.isoformat()}.",
        ),
        DataSourceMetadata(
            name="mta_subway_entrances",
            source_url=MTA_SUBWAY_ENTRANCES_API_URL,
            cache_path=paths["entrances"],
            refreshed_at=datetime.fromisoformat(refreshed_at),
            record_count=len(normalized_entrances),
            notes="Filtered in memory against the selected study area boundary.",
        ),
        DataSourceMetadata(
            name="acs_tract_demographics",
            source_url=f"{ACS_COUNTS_API_URL} and {ACS_SUBJECT_API_URL}",
            cache_path=paths["tracts"],
            refreshed_at=datetime.fromisoformat(refreshed_at),
            record_count=len(tract_payload["features"]),
            notes=f"ACS 5-year {ACS_5YEAR_YEAR} estimates merged to NYC tract centroids.",
        ),
    ]
    if pathways_snapshot is not None:
        metadata_items.append(
            DataSourceMetadata(
                name="gtfs_pathways_static",
                source_url=MTA_GTFS_STATIC_URL,
                cache_path=paths["gtfs_pathways"],
                refreshed_at=datetime.fromisoformat(refreshed_at),
                # Pathways and locations are counted together.
                record_count=len(pathways_snapshot.pathways)
                + len(pathways_snapshot.locations),
                notes="Parsed from GTFS zip when pathways.txt or locations.txt exists.",
            ),
        )
    metadata = tuple(metadata_items)
    # The metadata file is written last; its existence is one of the
    # conditions of the cache-hit check at the top of this function.
    metadata_payload = {
        "generated_at": refreshed_at,
        "query": asdict(query),
        "sources": [
            {
                "name": item.name,
                "source_url": item.source_url,
                "cache_path": str(item.cache_path),
                "refreshed_at": item.refreshed_at.isoformat(),
                "record_count": item.record_count,
                "notes": item.notes,
            }
            for item in metadata
        ],
        "include_gtfs_archive": include_gtfs_archive,
        "gtfs_archive_path": str(paths["gtfs_archive"])
        if include_gtfs_archive
        else None,
        "gtfs_archive_url": MTA_GTFS_STATIC_URL if include_gtfs_archive else None,
    }
    write_json(paths["metadata"], metadata_payload)
    # Load back from disk so the result matches a later cache hit.
    return load_cached_snapshot(cache_root)

load_cached_snapshot

load_cached_snapshot(
    cache_dir: str | Path,
) -> StudyAreaSnapshot

Load a previously fetched real-data study-area snapshot.

Source code in src/subway_access/pipeline/_load.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def load_cached_snapshot(cache_dir: str | Path) -> StudyAreaSnapshot:
    """Rebuild a ``StudyAreaSnapshot`` from files in a cache directory.

    ``_require_cached_snapshot`` is called first to check the cache.
    Entrances and GTFS pathways are optional: a missing entrances file
    yields an empty ``EntranceDataset`` and a missing pathways file leaves
    ``gtfs_pathways`` as ``None``. ``generated_at`` is parsed from the
    ``"generated_at"`` key of the metadata JSON file.

    Args:
        cache_dir: Directory previously populated by
            ``fetch_study_area_snapshot``; user-expanded and resolved.

    Returns:
        The loaded snapshot, with ``cache_dir`` set to the resolved path.
    """

    resolved_dir = Path(cache_dir).expanduser().resolve()
    files = _snapshot_paths(resolved_dir)
    _require_cached_snapshot(files)

    query, metadata = _load_metadata(files["metadata"])
    accessibility = load_accessibility_status(files["accessibility"])
    stations = load_gtfs(files["stations"]).with_accessibility(accessibility)
    demographics = load_census_data(files["tracts"])
    outages = load_outages(files["outages"])

    entrances = (
        load_entrances(files["entrances"])
        if files["entrances"].exists()
        else EntranceDataset(entrances=())
    )
    gtfs_pathways = (
        load_gtfs_pathways_snapshot(files["gtfs_pathways"])
        if files["gtfs_pathways"].exists()
        else None
    )

    # The metadata file is read a second time here for its timestamp.
    raw_metadata = json.loads(files["metadata"].read_text(encoding="utf-8"))
    generated_at = datetime.fromisoformat(raw_metadata["generated_at"])

    return StudyAreaSnapshot(
        query=query,
        stations=stations,
        accessibility=accessibility,
        demographics=demographics,
        outages=outages,
        metadata=metadata,
        entrances=entrances,
        gtfs_pathways=gtfs_pathways,
        generated_at=generated_at,
        cache_dir=resolved_dir,
    )

fetch_walk_graph

fetch_walk_graph(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    buffer_meters: int = 0,
) -> NetworkGraphSnapshot

Fetch and cache an OSM walking graph for a study area.

Source code in src/subway_access/pipeline/_walk_graph.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def fetch_walk_graph(
    query: AccessibilityQuery,
    *,
    cache_dir: str | Path,
    refresh: bool = False,
    buffer_meters: int = 0,
) -> NetworkGraphSnapshot:
    """Fetch and cache an OSM walking graph for a study area.

    Pipeline-level entry point that forwards every argument unchanged to
    ``io_fetch_walk_graph`` and returns its result.

    Args:
        query: Study-area selector.
        cache_dir: Directory used for the graph cache.
        refresh: Forwarded as-is to the I/O layer.
        buffer_meters: Forwarded as-is to the I/O layer.

    Returns:
        The ``NetworkGraphSnapshot`` produced by ``io_fetch_walk_graph``.
    """

    forwarded = {
        "cache_dir": cache_dir,
        "refresh": refresh,
        "buffer_meters": buffer_meters,
    }
    return io_fetch_walk_graph(query, **forwarded)

load_cached_walk_graph

load_cached_walk_graph(
    cache_dir: str | Path,
) -> tuple[Any, NetworkGraphSnapshot]

Load a cached OSM walking graph and its typed metadata.

Source code in src/subway_access/pipeline/_walk_graph.py
40
41
42
43
44
45
def load_cached_walk_graph(
    cache_dir: str | Path,
) -> tuple[Any, NetworkGraphSnapshot]:
    """Load a cached OSM walking graph and its typed metadata.

    Delegates to ``io_load_cached_walk_graph`` with ``cache_dir`` and returns
    its result untouched.
    """

    graph_and_snapshot = io_load_cached_walk_graph(cache_dir)
    return graph_and_snapshot

Reporting

subway_access.reporting

Optional reporting helpers: jellycell tearsheets from factor-factory engine results.

This module is an optional surface. All dependencies on factor_factory and jellycell are deferred to call time, so importing subway_access.reporting succeeds even when those extras are not installed. Invoking any of the helpers below without the relevant extras raises an ImportError that points at the right pip install command.

Install via::

pip install "subway-access[factor-factory,tearsheets]"

EngineKind module-attribute

EngineKind = Literal[
    "did",
    "rdd",
    "scm",
    "spatial",
    "event_study",
    "mediation",
]

Factor-factory engine-family identifiers supported by the bridge.

The family name is used as the artifact filename stem: an EngineKind of "did" writes to <artifacts_dir>/did_results.json, matching the key the shipped jellycell findings template reads from.

emit_findings_tearsheet

emit_findings_tearsheet(
    project_dir: Path,
    *,
    overwrite: bool = True,
    template_overrides: dict[str, Any] | None = None,
) -> Path

Render the FINDINGS.md tearsheet for a project directory.

Thin wrapper around factor_factory.jellycell.tearsheets.findings(...). The target project directory is expected to follow the factor-factory convention:

  • <project_dir>/artifacts/<family>_results.json (one per engine family fit, written via :func:write_engine_results_json)
  • <project_dir>/artifacts/figures/ (optional, picked up by the template if present)
  • <project_dir>/manuscripts/FINDINGS.md (the rendered output)

The freeze-marker splicing behavior of factor-factory is preserved: when overwrite=True, any text in an existing FINDINGS.md that sits below a <!-- tearsheet:freeze --> line is kept verbatim; text above the marker is regenerated from the template.

Parameters

project_dir Absolute or relative path to the project directory. overwrite When True (the default), regenerate the tearsheet using freeze-marker splicing. When False, raise FileExistsError if the target already exists. template_overrides Optional mapping overlaid on top of the default template context. Keys depend on the shipped findings.md.j2 template — consult factor-factory documentation for the supported fields.

Returns

Path The absolute path of the rendered tearsheet.

Raises

ImportError If factor-factory (which bundles jellycell bindings) is not installed.

Source code in src/subway_access/reporting/_jellycell_bridge.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def emit_findings_tearsheet(
    project_dir: Path,
    *,
    overwrite: bool = True,
    template_overrides: dict[str, Any] | None = None,
) -> Path:
    """Render the ``FINDINGS.md`` tearsheet for a project directory.

    Delegates to ``factor_factory.jellycell.tearsheets.findings(...)`` after
    resolving ``project_dir`` to an absolute path. The project directory is
    expected to follow the factor-factory layout:

    - ``<project_dir>/artifacts/<family>_results.json`` (one per engine family
      fit, written via :func:`write_engine_results_json`)
    - ``<project_dir>/artifacts/figures/`` (optional, picked up by the
      template if present)
    - ``<project_dir>/manuscripts/FINDINGS.md`` (the rendered output)

    factor-factory's freeze-marker splicing is left intact: with
    ``overwrite=True``, text in an existing ``FINDINGS.md`` *below* a
    ``<!-- tearsheet:freeze -->`` line is kept verbatim, while text above the
    marker is regenerated from the template.

    Parameters
    ----------
    project_dir
        Absolute or relative path to the project directory.
    overwrite
        When ``True`` (the default), regenerate the tearsheet using
        freeze-marker splicing. When ``False``, raise ``FileExistsError`` if
        the target already exists.
    template_overrides
        Optional mapping overlaid on top of the default template context.
        Keys depend on the shipped ``findings.md.j2`` template — consult
        factor-factory documentation for the supported fields.

    Returns
    -------
    Path
        The absolute path of the rendered tearsheet.

    Raises
    ------
    ImportError
        If ``factor-factory`` (which bundles jellycell bindings) is not
        installed.
    """
    # Both guards run purely for their import-validation side effect.
    # jellycell is a direct runtime dependency of factor-factory's tearsheet
    # renderer, so checking it here surfaces the subway-access-specific
    # install hint before the deferred import below can fail.
    require_factor_factory()
    require_jellycell()

    from factor_factory.jellycell.tearsheets import (  # pylint: disable=import-error
        findings as _findings_renderer,
    )

    absolute_project = str(Path(project_dir).resolve())
    tearsheet_location = _findings_renderer(
        project=absolute_project,
        overwrite=overwrite,
        template_overrides=template_overrides,
    )
    return Path(tearsheet_location)

render_findings_from_dict

render_findings_from_dict(
    results: Mapping[str, Mapping[str, Any]],
    *,
    out_path: Path,
    project: str,
    template_overrides: Mapping[str, Any] | None = None,
) -> Path

Render a FINDINGS.md tearsheet directly from in-memory results.

Thin wrapper over jellycell.tearsheets.findings (new in jellycell v1.4.0). Complementary to :func:emit_findings_tearsheet: that helper scans a factor-factory project directory for artifacts/<family>_results.json files and renders manuscripts/FINDINGS.md with freeze-marker splicing. This one takes a plain Python dict and writes to any path — useful when you already have engine fits in memory (notebook, CI smoke, blog-post assembly) and don't want the project-directory dance.

Parameters

results Mapping of method_name -> {field: value}. One ## <method_name> heading + two-column metric table is emitted per top-level key. Nested dicts flatten with dotted keys ({"cs": {"att": 0.2}} → cs.att row). The canonical way to produce this shape from factor-factory results is:

    results_dict = {r.method: r.to_dict() for r in did_results}

out_path Target markdown path. Parent directories are created if needed. project Project name rendered in the manuscript header (e.g. "subway-access / accessibility-change"). Does not have to be a filesystem path — this is just a label. template_overrides Optional header-field overrides forwarded to jellycell.tearsheets.findings. Supported keys include author, author_url, month_year, version, project.

Returns

Path The resolved path of the rendered tearsheet.

Raises

ImportError If jellycell is not installed (points at [tearsheets]) or if the installed jellycell is older than v1.4.0.

Source code in src/subway_access/reporting/_jellycell_bridge.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def render_findings_from_dict(
    results: Mapping[str, Mapping[str, Any]],
    *,
    out_path: Path,
    project: str,
    template_overrides: Mapping[str, Any] | None = None,
) -> Path:
    """Render a ``FINDINGS.md`` tearsheet directly from in-memory results.

    Thin wrapper over
    [``jellycell.tearsheets.findings``](https://github.com/random-walks/jellycell/releases/tag/v1.4.0)
    (new in jellycell v1.4.0). It complements
    :func:`emit_findings_tearsheet`, which scans a factor-factory project
    directory for ``artifacts/<family>_results.json`` files and renders
    ``manuscripts/FINDINGS.md`` with freeze-marker splicing. This helper
    instead takes a plain Python mapping and writes to any path — handy when
    engine fits are already in memory (notebook, CI smoke, blog-post
    assembly) and the project-directory layout is unnecessary.

    Parameters
    ----------
    results
        Mapping of ``method_name -> {field: value}``. One ``## <method_name>``
        heading + two-column metric table is emitted per top-level key. Nested
        dicts flatten with dotted keys (``{"cs": {"att": 0.2}}`` →  ``cs.att``
        row). The canonical way to produce this shape from factor-factory
        results is::

            results_dict = {r.method: r.to_dict() for r in did_results}

    out_path
        Target markdown path. Parent directories are created if needed.
    project
        Project name rendered in the manuscript header (e.g.
        ``"subway-access / accessibility-change"``). Does not have to be a
        filesystem path — this is just a label.
    template_overrides
        Optional header-field overrides forwarded to
        ``jellycell.tearsheets.findings``. Supported keys include ``author``,
        ``author_url``, ``month_year``, ``version``, ``project``. An empty
        mapping is forwarded as ``None``.

    Returns
    -------
    Path
        The path reported by ``jellycell.tearsheets.findings`` for the
        rendered tearsheet, wrapped in :class:`~pathlib.Path`.

    Raises
    ------
    ImportError
        If ``jellycell`` is not installed (points at ``[tearsheets]``) or if
        the installed ``jellycell`` is older than v1.4.0.
    """
    require_jellycell()
    try:
        from jellycell.tearsheets import (  # pylint: disable=import-error
            findings as _findings_renderer,
        )
    except ImportError as exc:  # pragma: no cover - exercised in lazy-import tests
        raise ImportError(_JELLYCELL_14_HINT) from exc

    # Plain-dict copies are handed over so jellycell never sees the caller's
    # own (possibly read-only) mapping objects.
    call_arguments = {
        "results": dict(results),
        "out_path": str(out_path),
        "project": project,
        "template_overrides": (
            dict(template_overrides) if template_overrides else None
        ),
    }
    return Path(_findings_renderer(**call_arguments))

require_factor_factory

require_factor_factory() -> ModuleType

Return the factor_factory top-level module or raise a crisp error.

Returns

ModuleType The imported factor_factory module.

Raises

ImportError If factor-factory is not installed. The message includes the exact pip install command to recover.

Source code in src/subway_access/reporting/_jellycell_bridge.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def require_factor_factory() -> ModuleType:
    """Import ``factor_factory`` on demand and return the top-level module.

    Returns
    -------
    ModuleType
        The imported ``factor_factory`` module.

    Raises
    ------
    ImportError
        If ``factor-factory`` is not installed. The original import failure
        is chained as the cause, and the message includes the exact
        ``pip install`` command to recover.
    """
    try:
        import factor_factory as _module
    except ImportError as exc:  # pragma: no cover - exercised in lazy-import tests
        raise ImportError(_FACTOR_FACTORY_HINT) from exc
    else:
        # Two ignore codes on purpose: ``no-any-return`` triggers when
        # factor-factory is absent (the mypy override types the import as
        # ``Any``), while ``unused-ignore`` keeps mypy quiet when the package
        # is present and resolves cleanly. CI stays green either way.
        return _module  # type: ignore[no-any-return, unused-ignore]

require_jellycell

require_jellycell() -> ModuleType

Return the jellycell top-level module or raise a crisp error.

Returns

ModuleType The imported jellycell module.

Raises

ImportError If jellycell is not installed. The message includes the exact pip install command to recover.

Source code in src/subway_access/reporting/_jellycell_bridge.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def require_jellycell() -> ModuleType:
    """Import ``jellycell`` on demand and return the top-level module.

    Returns
    -------
    ModuleType
        The imported ``jellycell`` module.

    Raises
    ------
    ImportError
        If ``jellycell`` is not installed. The original import failure is
        chained as the cause, and the message includes the exact
        ``pip install`` command to recover.
    """
    try:
        import jellycell as _module
    except ImportError as exc:  # pragma: no cover - exercised in lazy-import tests
        raise ImportError(_JELLYCELL_HINT) from exc
    else:
        return _module  # type: ignore[no-any-return, unused-ignore]

write_engine_results_json

write_engine_results_json(
    results: Iterable[Any] | Any,
    *,
    artifacts_dir: Path,
    family: EngineKind | str,
) -> Path

Serialize one or more factor-factory engine results to <family>_results.json.

The shipped jellycell findings.md.j2 template reads from a structured JSON file at <project_dir>/artifacts/<family>_results.json with the shape {"results": [<to_dict>...]}. This helper accepts either a single result dataclass (with a .to_dict() method), an iterable of them, or a pre-built Results wrapper (with a .to_records() method, as returned by factor_factory.engines.<family>.estimate(...)).

Parameters

results One of:

- A ``*Results`` wrapper exposing ``.to_records()`` (e.g.
  ``DidResults``, ``RddResults``, ``ScmResults``, ``SpatialResults``).
- A single ``*Result`` frozen dataclass exposing ``.to_dict()``.
- An iterable of such dataclasses.

artifacts_dir Directory where the JSON file is written. Created if it does not already exist. family Engine-family name used as the filename stem. Supported values are listed in :data:EngineKind; any string is accepted to support downstream engines not yet enumerated here.

Returns

Path The absolute path of the written JSON file.

Source code in src/subway_access/reporting/_jellycell_bridge.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def write_engine_results_json(
    results: Iterable[Any] | Any,
    *,
    artifacts_dir: Path,
    family: EngineKind | str,
) -> Path:
    """Serialize one or more factor-factory engine results to ``<family>_results.json``.

    The shipped jellycell ``findings.md.j2`` template reads from a structured
    JSON file at ``<project_dir>/artifacts/<family>_results.json`` with the
    shape ``{"results": [<to_dict>...]}``. This helper accepts either a
    single result dataclass (with a ``.to_dict()`` method), a single
    already-serialized mapping, an iterable of either, or a pre-built
    ``Results`` wrapper (with a ``.to_records()`` method, as returned by
    ``factor_factory.engines.<family>.estimate(...)``).

    Parameters
    ----------
    results
        One of:

        - A ``*Results`` wrapper exposing ``.to_records()`` (e.g.
          ``DidResults``, ``RddResults``, ``ScmResults``, ``SpatialResults``).
        - A single ``*Result`` frozen dataclass exposing ``.to_dict()``.
        - A single mapping, written as one record.
        - An iterable of result dataclasses and/or mappings.

    artifacts_dir
        Directory where the JSON file is written. Created if it does not
        already exist. A relative path is anchored to the current working
        directory.
    family
        Engine-family name used as the filename stem. Supported values are
        listed in :data:`EngineKind`; any string is accepted to support
        downstream engines not yet enumerated here.

    Returns
    -------
    Path
        The absolute path of the written JSON file.
    """
    from collections.abc import Mapping  # local: only needed for the dispatch below

    # ``absolute()`` rather than ``resolve()``: the documented contract is an
    # absolute return value, but symlinked directories should stay spelled
    # the way the caller passed them.
    target_dir = Path(artifacts_dir).absolute()
    target_dir.mkdir(parents=True, exist_ok=True)

    # Lists and tuples are always treated as collections of results, even if
    # a subclass happens to expose ``to_records`` / ``to_dict``.
    is_plain_sequence = isinstance(results, (list, tuple))

    records: list[dict[str, Any]]
    if not is_plain_sequence and hasattr(results, "to_records"):
        records = list(results.to_records())
    elif not is_plain_sequence and hasattr(results, "to_dict"):
        records = [results.to_dict()]
    elif isinstance(results, Mapping):
        # A bare mapping is one record. Iterating it would walk its keys and
        # fail inside ``dict(key)`` with an unhelpful ValueError.
        records = [dict(results)]
    else:
        records = [
            item.to_dict() if hasattr(item, "to_dict") else dict(item)
            for item in results
        ]

    output_path = target_dir / f"{family}_results.json"
    # Trailing newline keeps `end-of-file-fixer` (pre-commit) happy when the
    # JSON is committed alongside a jellycell-backed example.
    output_path.write_text(
        json.dumps({"results": records}, indent=2, default=str, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    return output_path

Temporal

subway_access.temporal

Temporal panel infrastructure for accessibility-over-time analysis.

AVAILABLE_VINTAGE_YEARS module-attribute

AVAILABLE_VINTAGE_YEARS: tuple[int, ...] = (
    2017,
    2018,
    2019,
    2020,
    2021,
    2022,
    2023,
)

PanelDataset dataclass

Geographic panel: unit x time observations.

Parameters:

Name Type Description Default
observations tuple[PanelObservation, ...]

All panel rows.

required
unit_type str

Geographic unit type ("nta" or "tract").

required
periods tuple[str, ...]

Ordered tuple of period labels.

required
Example

>>> panel.periods
('2017', '2018', '2019', '2020', '2021', '2022', '2023')
>>> len(panel.observations)
1400

Source code in src/subway_access/temporal/_models.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@dataclass(frozen=True, slots=True)
class PanelDataset:
    """Geographic panel: unit x time observations.

    Args:
        observations: All panel rows.
        unit_type: Geographic unit type (``"nta"`` or ``"tract"``).
        periods: Ordered tuple of period labels.

    Example:
        >>> panel.periods
        ('2017', '2018', '2019', '2020', '2021', '2022', '2023')
        >>> len(panel.observations)
        1400
    """

    observations: tuple[PanelObservation, ...]
    unit_type: str
    periods: tuple[str, ...]

    def treatment_group(self) -> PanelDataset:
        """Return only observations in units that were ever treated."""

        treated_units = {
            obs.unit_id for obs in self.observations if obs.treatment_year is not None
        }
        return PanelDataset(
            observations=tuple(
                obs for obs in self.observations if obs.unit_id in treated_units
            ),
            unit_type=self.unit_type,
            periods=self.periods,
        )

    def control_group(self) -> PanelDataset:
        """Return only observations in units that were never treated."""

        treated_units = {
            obs.unit_id for obs in self.observations if obs.treatment_year is not None
        }
        return PanelDataset(
            observations=tuple(
                obs for obs in self.observations if obs.unit_id not in treated_units
            ),
            unit_type=self.unit_type,
            periods=self.periods,
        )

    def to_dataframe(self) -> Any:
        """Convert to a pandas DataFrame with (unit_id, period) index.

        Returns:
            A pandas DataFrame.

        Raises:
            ImportError: If pandas is not installed.
        """

        try:
            import pandas as pd
        except ImportError as exc:
            message = (
                "pandas is required for to_dataframe(). "
                "Install it with: pip install subway-access[panel]"
            )
            raise ImportError(message) from exc

        rows: list[dict[str, Any]] = []
        for obs in self.observations:
            row: dict[str, Any] = {
                "unit_id": obs.unit_id,
                "period": obs.period,
                "has_accessible_station": obs.has_accessible_station,
                "treatment_year": obs.treatment_year,
                "disability_rate": obs.disability_rate,
                "senior_rate": obs.senior_rate,
                "poverty_rate": obs.poverty_rate,
                "total_population": obs.total_population,
                "accessible_station_count": obs.accessible_station_count,
                "nearest_accessible_distance_m": obs.nearest_accessible_distance_m,
                "need_score": obs.need_score,
                **(obs.covariates or {}),
            }
            rows.append(row)
        df = pd.DataFrame(rows)
        return df.set_index(["unit_id", "period"])

    @property
    def unit_ids(self) -> tuple[str, ...]:
        """Return sorted unique unit identifiers."""

        return tuple(sorted({obs.unit_id for obs in self.observations}))

observations instance-attribute

observations: tuple[PanelObservation, ...]

unit_type instance-attribute

unit_type: str

periods instance-attribute

periods: tuple[str, ...]

unit_ids property

unit_ids: tuple[str, ...]

Return sorted unique unit identifiers.

treatment_group

treatment_group() -> PanelDataset

Return only observations in units that were ever treated.

Source code in src/subway_access/temporal/_models.py
75
76
77
78
79
80
81
82
83
84
85
86
87
def treatment_group(self) -> PanelDataset:
    """Return a panel restricted to units that were ever treated.

    A unit counts as treated when any of its observations carries a
    non-None ``treatment_year``; every observation of such a unit is kept.
    """

    ever_treated = set()
    for row in self.observations:
        if row.treatment_year is not None:
            ever_treated.add(row.unit_id)

    kept_rows = tuple(row for row in self.observations if row.unit_id in ever_treated)
    return PanelDataset(
        observations=kept_rows,
        unit_type=self.unit_type,
        periods=self.periods,
    )

control_group

control_group() -> PanelDataset

Return only observations in units that were never treated.

Source code in src/subway_access/temporal/_models.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def control_group(self) -> PanelDataset:
    """Return a panel restricted to units that were never treated.

    A unit is excluded as soon as any of its observations carries a
    non-None ``treatment_year``; all rows of the remaining units are kept.
    """

    ever_treated = set()
    for row in self.observations:
        if row.treatment_year is not None:
            ever_treated.add(row.unit_id)

    kept_rows = tuple(
        row for row in self.observations if row.unit_id not in ever_treated
    )
    return PanelDataset(
        observations=kept_rows,
        unit_type=self.unit_type,
        periods=self.periods,
    )

to_dataframe

to_dataframe() -> Any

Convert to a pandas DataFrame with (unit_id, period) index.

Returns:

Type Description
Any

A pandas DataFrame.

Raises:

Type Description
ImportError

If pandas is not installed.

Source code in src/subway_access/temporal/_models.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def to_dataframe(self) -> Any:
    """Convert to a pandas DataFrame with (unit_id, period) index.

    Each observation becomes one row. Keys of ``obs.covariates`` are added
    as extra columns after the core fields; a covariate that reuses a core
    field name replaces that field's value in the row. An empty panel
    produces an empty frame that still carries the core columns and the
    ``(unit_id, period)`` index.

    Returns:
        A pandas DataFrame.

    Raises:
        ImportError: If pandas is not installed.
    """

    try:
        import pandas as pd
    except ImportError as exc:
        message = (
            "pandas is required for to_dataframe(). "
            "Install it with: pip install subway-access[panel]"
        )
        raise ImportError(message) from exc

    core_columns = (
        "unit_id",
        "period",
        "has_accessible_station",
        "treatment_year",
        "disability_rate",
        "senior_rate",
        "poverty_rate",
        "total_population",
        "accessible_station_count",
        "nearest_accessible_distance_m",
        "need_score",
    )
    rows: list[dict[str, Any]] = [
        {
            **{name: getattr(obs, name) for name in core_columns},
            **(obs.covariates or {}),
        }
        for obs in self.observations
    ]
    if rows:
        df = pd.DataFrame(rows)
    else:
        # With no rows pandas infers no columns, and ``set_index`` would
        # raise KeyError; declare the core columns explicitly instead.
        df = pd.DataFrame(columns=list(core_columns))
    return df.set_index(["unit_id", "period"])

PanelObservation dataclass

Single (unit x period) observation in the geographic panel.

Parameters:

Name Type Description Default
unit_id str

NTA code or census tract GEOID.

required
period str

ACS vintage year, e.g. "2019".

required
has_accessible_station bool

Treatment indicator -- True if this unit gained at least one accessible station by this period.

required
treatment_year int | None

Year the first accessible station opened in this unit's catchment, or None if never treated.

required
disability_rate float

ACS 5-year estimate for this vintage.

required
senior_rate float

ACS 5-year estimate for this vintage.

required
poverty_rate float

ACS 5-year estimate for this vintage.

required
total_population int

ACS 5-year estimate for this vintage.

required
accessible_station_count int

Number of accessible stations in catchment.

required
nearest_accessible_distance_m float | None

Haversine distance to nearest accessible station, or None if none exist.

required
need_score float

Composite factor pipeline output.

required
covariates dict[str, float] | None

Extensible dict for additional time-varying covariates (median_rent, employment rate, etc.).

None
Example

obs = PanelObservation( ... unit_id="36061000100", period="2020", ... has_accessible_station=True, treatment_year=2019, ... disability_rate=0.08, senior_rate=0.12, poverty_rate=0.15, ... total_population=4500, accessible_station_count=2, ... nearest_accessible_distance_m=320.0, need_score=0.117, ... )

Source code in src/subway_access/temporal/_models.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass(frozen=True, slots=True)
class PanelObservation:
    """Single (unit x period) observation in the geographic panel.

    Declared as a frozen, slotted dataclass, so fields cannot be reassigned
    after construction. Every field except ``covariates`` is required.

    Args:
        unit_id: NTA code or census tract GEOID.
        period: ACS vintage year, e.g. ``"2019"``.
        has_accessible_station: Treatment indicator -- True if this unit
            gained at least one accessible station by this period.
        treatment_year: Year the first accessible station opened in
            this unit's catchment, or None if never treated.
        disability_rate: ACS 5-year estimate for this vintage.
        senior_rate: ACS 5-year estimate for this vintage.
        poverty_rate: ACS 5-year estimate for this vintage.
        total_population: ACS 5-year estimate for this vintage.
        accessible_station_count: Number of accessible stations in catchment.
        nearest_accessible_distance_m: Haversine distance to nearest
            accessible station, or None if none exist.
        need_score: Composite factor pipeline output.
        covariates: Extensible dict for additional time-varying covariates
            (median_rent, employment rate, etc.). Defaults to None.

    Example:
        >>> obs = PanelObservation(
        ...     unit_id="36061000100", period="2020",
        ...     has_accessible_station=True, treatment_year=2019,
        ...     disability_rate=0.08, senior_rate=0.12, poverty_rate=0.15,
        ...     total_population=4500, accessible_station_count=2,
        ...     nearest_accessible_distance_m=320.0, need_score=0.117,
        ... )
    """

    # Identity of the row: which unit, in which period.
    unit_id: str
    period: str
    # Treatment status; ``treatment_year`` is None for never-treated units.
    has_accessible_station: bool
    treatment_year: int | None
    # Demographic estimates for this vintage.
    disability_rate: float
    senior_rate: float
    poverty_rate: float
    total_population: int
    # Accessibility measures; the distance is None when no accessible
    # station exists.
    accessible_station_count: int
    nearest_accessible_distance_m: float | None
    need_score: float
    # Optional extras; the only field with a default.
    covariates: dict[str, float] | None = None

unit_id instance-attribute

unit_id: str

period instance-attribute

period: str

has_accessible_station instance-attribute

has_accessible_station: bool

treatment_year instance-attribute

treatment_year: int | None

disability_rate instance-attribute

disability_rate: float

senior_rate instance-attribute

senior_rate: float

poverty_rate instance-attribute

poverty_rate: float

total_population instance-attribute

total_population: int

accessible_station_count instance-attribute

accessible_station_count: int

nearest_accessible_distance_m instance-attribute

nearest_accessible_distance_m: float | None

need_score instance-attribute

need_score: float

covariates class-attribute instance-attribute

covariates: dict[str, float] | None = None

StationUpgradeRecord dataclass

Record of a station's ADA upgrade timeline.

Parameters:

Name Type Description Default
station_id str

MTA station identifier.

required
station_name str

Human-readable station name.

required
borough str

NYC borough.

required
latitude float

Station latitude.

required
longitude float

Station longitude.

required
upgrade_year int | None

Year the station became ADA-accessible, or None.

required
upgrade_source str

Data source for the upgrade date.

''
Source code in src/subway_access/temporal/_models.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@dataclass(frozen=True, slots=True)
class StationUpgradeRecord:
    """Record of a station's ADA upgrade timeline.

    Declared as a frozen, slotted dataclass, so fields cannot be reassigned
    after construction.

    Args:
        station_id: MTA station identifier.
        station_name: Human-readable station name.
        borough: NYC borough.
        latitude: Station latitude.
        longitude: Station longitude.
        upgrade_year: Year the station became ADA-accessible, or None.
        upgrade_source: Data source for the upgrade date. Defaults to the
            empty string.
    """

    # Station identity and location.
    station_id: str
    station_name: str
    borough: str
    latitude: float
    longitude: float
    # None when no upgrade year is recorded for the station.
    upgrade_year: int | None
    # Provenance of ``upgrade_year``; the only field with a default.
    upgrade_source: str = ""

station_id instance-attribute

station_id: str

station_name instance-attribute

station_name: str

borough instance-attribute

borough: str

latitude instance-attribute

latitude: float

longitude instance-attribute

longitude: float

upgrade_year instance-attribute

upgrade_year: int | None

upgrade_source class-attribute instance-attribute

upgrade_source: str = ''

UpgradeTimeline dataclass

Collection of station ADA upgrade records.

Parameters:

Name Type Description Default
records tuple[StationUpgradeRecord, ...]

All station upgrade records.

required
Example

timeline.stations_upgraded_by(2020) ('S1', 'S2', 'S3')

Source code in src/subway_access/temporal/_models.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@dataclass(frozen=True, slots=True)
class UpgradeTimeline:
    """Collection of station ADA upgrade records.

    Args:
        records: All station upgrade records.

    Example:
        >>> timeline.stations_upgraded_by(2020)
        ('S1', 'S2', 'S3')
    """

    records: tuple[StationUpgradeRecord, ...]

    def stations_upgraded_by(self, year: int) -> tuple[str, ...]:
        """Return station IDs upgraded on or before the given year."""

        return tuple(
            r.station_id
            for r in self.records
            if r.upgrade_year is not None and r.upgrade_year <= year
        )

    def upgrade_year_for(self, station_id: str) -> int | None:
        """Return the upgrade year for a specific station, or None."""

        for r in self.records:
            if r.station_id == station_id:
                return r.upgrade_year
        return None

records instance-attribute

records: tuple[StationUpgradeRecord, ...]

stations_upgraded_by

stations_upgraded_by(year: int) -> tuple[str, ...]

Return station IDs upgraded on or before the given year.

Source code in src/subway_access/temporal/_models.py
186
187
188
189
190
191
192
193
def stations_upgraded_by(self, year: int) -> tuple[str, ...]:
    """Return station IDs upgraded on or before the given year.

    IDs come back in record order; records without an upgrade year are
    skipped.
    """

    upgraded_ids: list[str] = []
    for record in self.records:
        if record.upgrade_year is not None and record.upgrade_year <= year:
            upgraded_ids.append(record.station_id)
    return tuple(upgraded_ids)

upgrade_year_for

upgrade_year_for(station_id: str) -> int | None

Return the upgrade year for a specific station, or None.

Source code in src/subway_access/temporal/_models.py
195
196
197
198
199
200
201
def upgrade_year_for(self, station_id: str) -> int | None:
    """Return the upgrade year for a specific station, or None."""

    for r in self.records:
        if r.station_id == station_id:
            return r.upgrade_year
    return None

fetch_acs_tract_estimates_for_year

fetch_acs_tract_estimates_for_year(
    year: int,
    *,
    tract_geoids: tuple[str, ...] | None = None,
) -> dict[str, dict[str, object]]

Fetch ACS tract-level estimates for a specific vintage year.

Parameters:

Name Type Description Default
year int

ACS 5-year vintage year (e.g. 2020).

required
tract_geoids tuple[str, ...] | None

Optional tuple of tract GEOIDs to filter to. If None, fetches all NYC tracts.

None

Returns:

Type Description
dict[str, dict[str, object]]

Mapping of tract GEOID to estimate dict with keys: tract_id, tract_name, total_population, senior_rate, poverty_rate, disability_rate.

Example

>>> estimates = fetch_acs_tract_estimates_for_year(2020)
>>> len(estimates)
2168

Source code in src/subway_access/temporal/_acs_vintage.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def fetch_acs_tract_estimates_for_year(
    year: int,
    *,
    tract_geoids: tuple[str, ...] | None = None,
) -> dict[str, dict[str, object]]:
    """Fetch ACS tract-level estimates for a specific vintage year.

    For each county, a counts table and a subject table are read through
    ``_read_census_rows`` and joined on the tract GEOID
    (``state + county + tract``). Tracts present in the counts table but
    missing from the subject table are omitted from the result.

    Args:
        year: ACS 5-year vintage year (e.g. 2020).
        tract_geoids: Optional tuple of tract GEOIDs to filter to. Only the
            counties named by ``geoid[2:5]`` of the requested GEOIDs are
            queried. If None (or an empty tuple), fetches all NYC tracts.

    Returns:
        Mapping of tract GEOID to estimate dict with keys: tract_id,
        tract_name, total_population, senior_rate, poverty_rate,
        disability_rate. ``senior_rate`` is ``0.0`` for tracts whose total
        population is zero.

    Example:
        >>> estimates = fetch_acs_tract_estimates_for_year(2020)
        >>> len(estimates)
        2168
    """

    requested_geoids = set(tract_geoids or ())
    county_codes = (
        sorted({geoid[2:5] for geoid in requested_geoids})
        if requested_geoids
        else list(NYC_COUNTY_CODES)
    )
    estimates: dict[str, dict[str, object]] = {}

    for county_code in county_codes:
        count_header, count_rows = _read_census_rows(
            _counts_url(county_code, year=year)
        )
        subject_header, subject_rows = _read_census_rows(
            _subject_url(county_code, year=year)
        )

        # Resolve the GEOID column positions once per county instead of
        # re-scanning the header three times for every subject row. The lookup
        # is skipped when there are no rows so an empty response behaves as
        # before (no header lookup is attempted).
        geoid_columns = (
            tuple(
                subject_header.index(name) for name in ("state", "county", "tract")
            )
            if subject_rows
            else ()
        )
        subject_lookup = {
            "".join(str(row[column]) for column in geoid_columns): row
            for row in subject_rows
        }

        for row in count_rows:
            row_map = {name: row[index] for index, name in enumerate(count_header)}
            geoid = f"{row_map['state']}{row_map['county']}{row_map['tract']}"
            if requested_geoids and geoid not in requested_geoids:
                continue
            subject_row = subject_lookup.get(geoid)
            if subject_row is None:
                # No subject-table data for this tract: skip it entirely.
                continue
            subject_map = {
                name: subject_row[index] for index, name in enumerate(subject_header)
            }
            total_population = _as_int(row_map["B01003_001E"])
            senior_population = sum(
                _as_int(row_map[name]) for name in _SENIOR_VARIABLES
            )
            estimates[geoid] = {
                "tract_id": geoid,
                "tract_name": row_map["NAME"],
                "total_population": total_population,
                # Guard against division by zero for unpopulated tracts.
                "senior_rate": 0.0
                if total_population == 0
                else senior_population / total_population,
                "poverty_rate": _as_percent(subject_map["S1701_C02_001E"]),
                "disability_rate": _as_percent(subject_map["S1810_C02_001E"]),
            }

    return estimates

fetch_multi_vintage_estimates

fetch_multi_vintage_estimates(
    years: tuple[int, ...] | None = None,
    *,
    tract_geoids: tuple[str, ...] | None = None,
) -> dict[int, dict[str, dict[str, object]]]

Fetch ACS estimates for multiple vintage years.

Parameters:

Name Type Description Default
years tuple[int, ...] | None

Tuple of vintage years to fetch. Defaults to all available.

None
tract_geoids tuple[str, ...] | None

Optional tuple of tract GEOIDs to filter to.

None

Returns:

Type Description
dict[int, dict[str, dict[str, object]]]

Nested mapping: year -> tract_geoid -> estimate dict.

Example

>>> multi = fetch_multi_vintage_estimates(years=(2019, 2020, 2021))
>>> sorted(multi.keys())
[2019, 2020, 2021]

Source code in src/subway_access/temporal/_acs_vintage.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def fetch_multi_vintage_estimates(
    years: tuple[int, ...] | None = None,
    *,
    tract_geoids: tuple[str, ...] | None = None,
) -> dict[int, dict[str, dict[str, object]]]:
    """Fetch ACS tract estimates for several vintage years.

    Args:
        years: Vintage years to fetch. When None or empty, every year in
            ``AVAILABLE_VINTAGE_YEARS`` is fetched.
        tract_geoids: Optional tuple of tract GEOIDs to filter to; forwarded
            unchanged to ``fetch_acs_tract_estimates_for_year``.

    Returns:
        Nested mapping: year -> tract_geoid -> estimate dict.

    Example:
        >>> multi = fetch_multi_vintage_estimates(years=(2019, 2020, 2021))
        >>> sorted(multi.keys())
        [2019, 2020, 2021]
    """

    selected_years = years if years else AVAILABLE_VINTAGE_YEARS
    return {
        vintage: fetch_acs_tract_estimates_for_year(
            vintage, tract_geoids=tract_geoids
        )
        for vintage in selected_years
    }

build_panel_dataset

build_panel_dataset(
    vintage_estimates: dict[
        int, dict[str, dict[str, object]]
    ],
    station_locations: dict[str, tuple[float, float]],
    upgrade_timeline: UpgradeTimeline,
    *,
    catchment_radius_meters: float = 800.0,
    unit_type: str = "tract",
    extra_covariates: dict[int, dict[str, dict[str, float]]]
    | None = None,
) -> PanelDataset

Construct a geographic panel dataset from multi-vintage ACS estimates.

Joins demographic estimates across vintage years with station accessibility status (derived from the upgrade timeline) to produce a panel suitable for difference-in-differences or spatial panel regression.

Parameters:

Name Type Description Default
vintage_estimates dict[int, dict[str, dict[str, object]]]

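Nested mapping year -> tract_geoid -> estimate dict. Each estimate dict is read for the keys disability_rate, senior_rate, poverty_rate and total_population (missing values fall back to 0), plus centroid_latitude and centroid_longitude. When either centroid value is missing, no station coverage is computed for that observation.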

required
station_locations dict[str, tuple[float, float]]

Mapping of station_id -> (latitude, longitude) for all stations in the study area.

required
upgrade_timeline UpgradeTimeline

Station ADA upgrade records with year information.

required
catchment_radius_meters float

Maximum distance in meters for a tract to be considered "covered" by a station. Defaults to 800m (~10 min walk).

800.0
unit_type str

Geographic unit type label ("tract" or "nta").

'tract'
extra_covariates dict[int, dict[str, dict[str, float]]] | None

Optional nested mapping year -> tract_geoid -> dict of additional time-varying covariates (e.g. median_rent).

None

Returns:

Type Description
PanelDataset

A PanelDataset with one observation per (unit, period) pair.

Example

>>> panel = build_panel_dataset(
...     vintage_estimates=multi_vintage,
...     station_locations=stations,
...     upgrade_timeline=timeline,
... )
>>> len(panel.observations)
1400

Source code in src/subway_access/temporal/_panel.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def build_panel_dataset(
    vintage_estimates: dict[int, dict[str, dict[str, object]]],
    station_locations: dict[str, tuple[float, float]],
    upgrade_timeline: UpgradeTimeline,
    *,
    catchment_radius_meters: float = 800.0,
    unit_type: str = "tract",
    extra_covariates: dict[int, dict[str, dict[str, float]]] | None = None,
) -> PanelDataset:
    """Construct a geographic panel dataset from multi-vintage ACS estimates.

    Joins demographic estimates across vintage years with station accessibility
    status (derived from the upgrade timeline) to produce a panel suitable for
    difference-in-differences or spatial panel regression.

    Args:
        vintage_estimates: Nested mapping year -> tract_geoid -> estimate dict.
            The keys disability_rate, senior_rate, poverty_rate and
            total_population are read from each estimate dict (missing values
            fall back to 0). The keys centroid_latitude and
            centroid_longitude are also read; when either is missing, no
            station coverage is computed for that observation.
        station_locations: Mapping of station_id -> (latitude, longitude) for
            all stations in the study area.
        upgrade_timeline: Station ADA upgrade records with year information.
        catchment_radius_meters: Maximum distance in meters for a tract to
            be considered "covered" by a station. Defaults to 800m (~10 min walk).
        unit_type: Geographic unit type label (``"tract"`` or ``"nta"``).
        extra_covariates: Optional nested mapping year -> tract_geoid -> dict
            of additional time-varying covariates (e.g. median_rent).

    Returns:
        A PanelDataset with one observation per (unit, period) pair for which
        an estimate exists. Observations are ordered by tract id, then year.

    Example:
        >>> panel = build_panel_dataset(
        ...     vintage_estimates=multi_vintage,
        ...     station_locations=stations,
        ...     upgrade_timeline=timeline,
        ... )
        >>> len(panel.observations)
        1400
    """

    years = sorted(vintage_estimates)
    periods = tuple(str(year) for year in years)
    extras = extra_covariates or {}
    observations: list[PanelObservation] = []

    # Pre-compute which station_ids are accessible by each year.
    accessible_by_year: dict[int, set[str]] = {
        year: set(upgrade_timeline.stations_upgraded_by(year)) for year in years
    }

    # Collect all tract_ids across all vintages.
    all_tract_ids: set[str] = set()
    for year_estimates in vintage_estimates.values():
        all_tract_ids.update(year_estimates)

    for tract_id in sorted(all_tract_ids):
        # Earliest year in which this tract is covered by an accessible station.
        # NOTE(review): observations emitted before that year carry
        # treatment_year=None, because the value is only known once the first
        # covered year has been reached — confirm this is what consumers expect.
        first_treatment_year: int | None = None

        for year in years:
            estimate = vintage_estimates[year].get(tract_id)
            if estimate is None:
                continue

            tract_lat = _safe_float(estimate.get("centroid_latitude"))
            tract_lon = _safe_float(estimate.get("centroid_longitude"))
            accessible_stations_in_year = accessible_by_year[year]

            covering_count = 0
            nearest_distance: float | None = None

            if tract_lat is not None and tract_lon is not None:
                for station_id, (slat, slon) in station_locations.items():
                    # Only accessible stations contribute to coverage, so skip
                    # the distance computation for every other station.
                    if station_id not in accessible_stations_in_year:
                        continue
                    distance = haversine_distance_meters(
                        latitude_a=tract_lat,
                        longitude_a=tract_lon,
                        latitude_b=slat,
                        longitude_b=slon,
                    )
                    if distance <= catchment_radius_meters:
                        covering_count += 1
                    # The nearest accessible station is tracked even when it
                    # lies outside the catchment radius.
                    if nearest_distance is None or distance < nearest_distance:
                        nearest_distance = distance

            has_accessible = covering_count > 0
            if has_accessible and first_treatment_year is None:
                first_treatment_year = year

            disability_rate = _safe_float(estimate.get("disability_rate")) or 0.0
            senior_rate = _safe_float(estimate.get("senior_rate")) or 0.0
            poverty_rate = _safe_float(estimate.get("poverty_rate")) or 0.0
            raw_pop = estimate.get("total_population")
            total_population = int(str(raw_pop)) if raw_pop is not None else 0

            year_extras = extras.get(year, {}).get(tract_id, {})

            observations.append(
                PanelObservation(
                    unit_id=tract_id,
                    period=str(year),
                    has_accessible_station=has_accessible,
                    treatment_year=first_treatment_year,
                    disability_rate=disability_rate,
                    senior_rate=senior_rate,
                    poverty_rate=poverty_rate,
                    total_population=total_population,
                    accessible_station_count=covering_count,
                    nearest_accessible_distance_m=nearest_distance,
                    # Unweighted mean of the three demographic rates.
                    need_score=fmean((disability_rate, senior_rate, poverty_rate)),
                    covariates=year_extras,
                )
            )

    return PanelDataset(
        observations=tuple(observations),
        unit_type=unit_type,
        periods=periods,
    )

build_distance_weights

build_distance_weights(
    unit_centroids: dict[str, tuple[float, float]],
    *,
    threshold_meters: float = 2000.0,
    row_standardize: bool = True,
) -> dict[str, dict[str, float]]

Build a distance-based spatial weights matrix.

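Creates a weights matrix where units within the distance threshold are neighbors, weighted by inverse distance. The raw weights are symmetric; with row_standardize=True each row is rescaled independently, so the returned matrix is generally no longer symmetric. Pairs at exactly zero distance are not treated as neighbors.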

Parameters:

Name Type Description Default
unit_centroids dict[str, tuple[float, float]]

Mapping of unit_id -> (latitude, longitude).

required
threshold_meters float

Maximum distance for two units to be neighbors.

2000.0
row_standardize bool

If True, normalize each row to sum to 1.0.

True

Returns:

Type Description
dict[str, dict[str, float]]

Nested dict: unit_id -> neighbor_unit_id -> weight. Units with no neighbors have an empty inner dict.

Example

>>> centroids = {"T1": (40.75, -73.99), "T2": (40.751, -73.991)}
>>> weights = build_distance_weights(centroids, threshold_meters=500)
>>> len(weights)
2

Source code in src/subway_access/temporal/_spatial_weights.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def build_distance_weights(
    unit_centroids: dict[str, tuple[float, float]],
    *,
    threshold_meters: float = 2000.0,
    row_standardize: bool = True,
) -> dict[str, dict[str, float]]:
    """Build an inverse-distance spatial weights matrix.

    Two units are neighbors when the haversine distance between their
    centroids is greater than zero and at most ``threshold_meters``; the raw
    weight of the pair is ``1 / distance`` in both directions. Pairs at exactly
    zero distance are not linked.

    Args:
        unit_centroids: Mapping of unit_id -> (latitude, longitude).
        threshold_meters: Maximum distance for two units to be neighbors.
        row_standardize: If True, normalize each non-empty row to sum to 1.0.
            Rows are rescaled independently, so the standardized matrix is
            generally not symmetric.

    Returns:
        Nested dict: unit_id -> neighbor_unit_id -> weight.
        Units with no neighbors have an empty inner dict.

    Example:
        >>> centroids = {"T1": (40.75, -73.99), "T2": (40.751, -73.991)}
        >>> weights = build_distance_weights(centroids, threshold_meters=500)
        >>> len(weights)
        2
    """

    ordered_ids = sorted(unit_centroids)
    weights: dict[str, dict[str, float]] = {unit_id: {} for unit_id in ordered_ids}

    # Visit each unordered pair once and mirror the weight into both rows.
    for position, first_id in enumerate(ordered_ids):
        first_lat, first_lon = unit_centroids[first_id]
        for second_id in ordered_ids[position + 1 :]:
            second_lat, second_lon = unit_centroids[second_id]
            separation = haversine_distance_meters(
                latitude_a=first_lat,
                longitude_a=first_lon,
                latitude_b=second_lat,
                longitude_b=second_lon,
            )
            if not (0 < separation <= threshold_meters):
                continue
            inverse_distance = 1.0 / separation
            weights[first_id][second_id] = inverse_distance
            weights[second_id][first_id] = inverse_distance

    if not row_standardize:
        return weights

    for unit_id in ordered_ids:
        row = weights[unit_id]
        total = sum(row.values())
        if total > 0:
            weights[unit_id] = {
                neighbor: value / total for neighbor, value in row.items()
            }

    return weights

weights_to_pysal

weights_to_pysal(
    weights: dict[str, dict[str, float]],
) -> Any

Convert a weights dict to a PySAL W object.

Parameters:

Name Type Description Default
weights dict[str, dict[str, float]]

Nested dict from build_distance_weights.

required

Returns:

Type Description
Any

A libpysal.weights.W instance.

Raises:

Type Description
ImportError

If libpysal is not installed.

Example

>>> w = weights_to_pysal(weights)
>>> w.n
200

Source code in src/subway_access/temporal/_spatial_weights.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def weights_to_pysal(
    weights: dict[str, dict[str, float]],
) -> Any:
    """Convert a nested weights dict into a PySAL ``W`` object.

    ``libpysal`` is imported lazily so the rest of the module works without
    the optional dependency installed.

    Args:
        weights: Nested dict from ``build_distance_weights``.

    Returns:
        A ``libpysal.weights.W`` instance.

    Raises:
        ImportError: If libpysal is not installed.

    Example:
        >>> w = weights_to_pysal(weights)
        >>> w.n
        200
    """

    try:
        from libpysal.weights import W
    except ImportError as exc:
        message = (
            "libpysal is required for spatial weights. "
            "Install with: pip install subway-access[spatial-models]"
        )
        raise ImportError(message) from exc

    # W takes two parallel mappings: neighbor ids and their weights, per unit.
    neighbors: dict[str, list[str]] = {}
    weight_values: dict[str, list[float]] = {}
    for unit_id, row in weights.items():
        neighbors[unit_id] = list(row)
        weight_values[unit_id] = list(row.values())
    return W(neighbors, weight_values)

build_upgrade_timeline

build_upgrade_timeline(
    station_data: StationDataset,
    *,
    known_upgrades: dict[str, int] | None = None,
    known_upgrade_sources: dict[str, str] | None = None,
    source: str = "mta_ada_status",
) -> UpgradeTimeline

Build an upgrade timeline from station dataset and known upgrade years.

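Stations without a known upgrade year are recorded with upgrade_year=None; the station's current ADA status is not consulted when building the timeline. Note that UpgradeTimeline.stations_upgraded_by skips records whose upgrade_year is None, so such stations are never reported as upgraded for any year.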

Parameters:

Name Type Description Default
station_data StationDataset

Current station dataset with ADA status.

required
known_upgrades dict[str, int] | None

Optional mapping of station_id -> upgrade year. Overrides or supplements the built-in database.

None
known_upgrade_sources dict[str, str] | None

Optional mapping of station_id -> per-station provenance tag. When a station id appears here, the tag is used as the record's upgrade_source — letting callers distinguish e.g. "press_release_sourced" stations from "hash_fallback" stations in the same timeline. Stations without an explicit entry fall back to the source kwarg.

None
source str

Default label used for the data source when no per-station tag is supplied via known_upgrade_sources.

'mta_ada_status'

Returns:

Type Description
UpgradeTimeline

An UpgradeTimeline with one record per station.

Example

>>> timeline = build_upgrade_timeline(stations)
>>> timeline.stations_upgraded_by(2020)
('S1', 'S2')

Source code in src/subway_access/temporal/_upgrade_timeline.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def build_upgrade_timeline(
    station_data: StationDataset,
    *,
    known_upgrades: dict[str, int] | None = None,
    known_upgrade_sources: dict[str, str] | None = None,
    source: str = "mta_ada_status",
) -> UpgradeTimeline:
    """Build an upgrade timeline from station dataset and known upgrade years.

    Every station in ``station_data`` yields exactly one record. A station
    whose id has no entry in the merged upgrade-year table is recorded with
    ``upgrade_year=None``; the station's current ADA status is not consulted.

    NOTE(review): ``UpgradeTimeline.stations_upgraded_by`` skips records whose
    ``upgrade_year`` is ``None``, so such stations are never reported as
    upgraded for any year. If stations without a known year are meant to be
    treated as always-accessible, that has to be handled by the caller.

    Args:
        station_data: Current station dataset.
        known_upgrades: Optional mapping of station_id -> upgrade year.
            Overrides or supplements the built-in database.
        known_upgrade_sources: Optional mapping of station_id -> per-station
            provenance tag. When a station id appears here, the tag is used
            as the record's ``upgrade_source`` — letting callers distinguish
            e.g. ``"press_release_sourced"`` stations from
            ``"hash_fallback"`` stations in the same timeline. Stations
            without an explicit entry fall back to the ``source`` kwarg.
        source: Default label used for the data source when no per-station
            tag is supplied via ``known_upgrade_sources``.

    Returns:
        An UpgradeTimeline with one record per station, in dataset order.

    Example:
        >>> timeline = build_upgrade_timeline(stations)
        >>> timeline.stations_upgraded_by(2020)
        ('S1', 'S2')
    """

    # Copy the built-in table so caller overrides never mutate module state.
    upgrades = dict(_KNOWN_UPGRADE_YEARS)
    if known_upgrades:
        upgrades.update(known_upgrades)

    per_station_sources = known_upgrade_sources or {}

    records = tuple(
        StationUpgradeRecord(
            station_id=station.station_id,
            station_name=station.name,
            borough=station.borough,
            latitude=station.latitude,
            longitude=station.longitude,
            upgrade_year=upgrades.get(station.station_id),
            upgrade_source=per_station_sources.get(station.station_id, source),
        )
        for station in station_data.stations
    )

    return UpgradeTimeline(records=records)

load_known_upgrades

load_known_upgrades(csv_path: Path) -> dict[str, int]

Read a seeds CSV and return {station_id: upgrade_year} for filled rows.

Rows where upgrade_year is empty or non-numeric are silently skipped.

Source code in src/subway_access/temporal/_upgrade_timeline.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def load_known_upgrades(csv_path: Path) -> dict[str, int]:
    """Read a seeds CSV and return ``{station_id: upgrade_year}`` for filled rows.

    Rows where ``upgrade_year`` or ``station_id`` is missing, empty, or (for
    the year) non-numeric are silently skipped. Surrounding whitespace in both
    columns is ignored. A leading UTF-8 byte-order mark is tolerated.

    Args:
        csv_path: Path to a CSV file with ``station_id`` and ``upgrade_year``
            columns.

    Returns:
        Mapping of station id to integer upgrade year. When a station id
        occurs more than once, the last parseable row wins.
    """
    known: dict[str, int] = {}
    # "utf-8-sig" also reads BOM-less files; plain "utf-8" would leave the BOM
    # glued to the first header name, so every row would be skipped.
    with csv_path.open(newline="", encoding="utf-8-sig") as fh:
        for row in csv.DictReader(fh):
            # DictReader fills the missing fields of a short row with None, so
            # a default passed to .get() is not enough to guarantee a string.
            year_str = (row.get("upgrade_year") or "").strip()
            station_id = (row.get("station_id") or "").strip()
            if not (year_str and station_id):
                continue
            try:
                known[station_id] = int(year_str)
            except ValueError:
                # Non-numeric year: documented best-effort skip.
                continue
    return known

load_known_upgrades_from_dir

load_known_upgrades_from_dir(
    directory: Path,
) -> dict[str, int]

Scan directory for per-borough CSVs and merge all filled upgrade years.

_all_boroughs.csv is excluded to avoid double-counting.

Source code in src/subway_access/temporal/_upgrade_timeline.py
103
104
105
106
107
108
109
110
111
112
113
def load_known_upgrades_from_dir(directory: Path) -> dict[str, int]:
    """Merge the filled upgrade years from every per-borough CSV in *directory*.

    Files are processed in sorted path order, so on duplicate station ids the
    later file wins. Any CSV whose name starts with an underscore (such as
    ``_all_boroughs.csv``) is skipped to avoid double-counting.
    """
    merged: dict[str, int] = {}
    borough_files = (
        candidate
        for candidate in sorted(directory.glob("*.csv"))
        if not candidate.name.startswith("_")
    )
    for borough_file in borough_files:
        merged.update(load_known_upgrades(borough_file))
    return merged

CLI

subway_access.cli

Public CLI entry points for subway-access.

main

main(argv: Sequence[str] | None = None) -> int

Entry point for the installed CLI.

Source code in src/subway_access/cli/_main.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def main(argv: Sequence[str] | None = None) -> int:
    """Entry point for the installed CLI."""

    parser = _build_parser()
    command_line = list(argv) if argv is not None else None
    args = parser.parse_args(command_line)

    try:
        if args.command == "fetch-snapshot":
            return run_fetch_snapshot(
                args.cache_dir,
                geography=args.geography,
                value=args.value,
                availability_months=args.availability_months,
                refresh=args.refresh,
                skip_gtfs_archive=args.skip_gtfs_archive,
            )
        if args.command == "analyze-snapshot":
            return run_analyze_snapshot(
                args.cache_dir,
                args.output_dir,
                minutes=args.minutes,
                reliability_window_days=args.reliability_window_days,
            )
        if args.command == "demo":
            return run_demo(
                args.output_dir,
                minutes=args.minutes,
                reliability_window_days=args.reliability_window_days,
            )
        message = f"Unsupported command: {args.command}"
        raise RuntimeError(message)
    except ValueError as exc:
        sys.stderr.write(f"{parser.prog}: error: {exc}\n")
        raise SystemExit(2) from exc

run_analyze_snapshot

run_analyze_snapshot(
    cache_dir: Path,
    output_dir: Path,
    *,
    minutes: int,
    reliability_window_days: int,
) -> int

Analyze a cached snapshot and export real-data outputs.

Source code in src/subway_access/cli/_main.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def run_analyze_snapshot(
    cache_dir: Path,
    output_dir: Path,
    *,
    minutes: int,
    reliability_window_days: int,
) -> int:
    """Analyze a cached snapshot and export real-data outputs.

    Writes ``catchments.geojson``, ``accessibility-gaps.csv`` and
    ``station-metrics.csv`` into ``output_dir`` (created if needed) and prints
    a summary of the generated paths to stdout.

    Args:
        cache_dir: Directory holding the cached snapshot to load.
        output_dir: Directory the export files are written to.
        minutes: Catchment size in minutes; must be greater than zero.
        reliability_window_days: Window length, in days, passed to the
            reliability computation.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: If ``minutes`` is not greater than zero.
    """

    # Validate cheap arguments before touching the cache so that bad input
    # fails fast instead of after a potentially expensive load.
    if minutes <= 0:
        message = "Catchment minutes must be greater than zero."
        raise ValueError(message)

    snapshot = load_cached_snapshot(cache_dir)

    catchments = generate_catchments(
        snapshot.stations, CatchmentRequest(minutes=minutes)
    )
    scores = score_accessibility(snapshot.stations, catchments, snapshot.demographics)
    gaps = analyze_gaps(scores)
    reliability = compute_reliability(
        snapshot.stations,
        snapshot.outages,
        TimeWindow(days=reliability_window_days),
    )
    station_metrics = build_station_metrics(
        snapshot.stations,
        catchments,
        scores,
        reliability=reliability,
    )

    output_dir.mkdir(parents=True, exist_ok=True)
    catchments_path = output_dir / "catchments.geojson"
    gaps_path = output_dir / "accessibility-gaps.csv"
    station_metrics_path = output_dir / "station-metrics.csv"

    export_catchments_geojson(
        catchments,
        ExportTarget(format="geojson", output_path=catchments_path),
    )
    export_gap_table(gaps, ExportTarget(format="csv", output_path=gaps_path))
    export_station_metrics(
        station_metrics,
        ExportTarget(format="csv", output_path=station_metrics_path),
    )

    sys.stdout.write("Generated subway-access snapshot outputs:\n")
    sys.stdout.write(
        f"- Study area: {snapshot.query.geography}={snapshot.query.value}\n"
    )
    sys.stdout.write(f"- Catchment GeoJSON: {catchments_path}\n")
    sys.stdout.write(f"- Accessibility gap CSV: {gaps_path}\n")
    sys.stdout.write(f"- Station metrics CSV: {station_metrics_path}\n")
    return 0

run_fetch_snapshot

run_fetch_snapshot(
    cache_dir: Path,
    *,
    geography: str,
    value: str,
    availability_months: int,
    refresh: bool,
    skip_gtfs_archive: bool,
) -> int

Fetch and cache a real-data snapshot for one study area.

Source code in src/subway_access/cli/_main.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def run_fetch_snapshot(
    cache_dir: Path,
    *,
    geography: str,
    value: str,
    availability_months: int,
    refresh: bool,
    skip_gtfs_archive: bool,
) -> int:
    """Fetch and cache a real-data snapshot for one study area.

    Builds an ``AccessibilityQuery`` from ``geography``/``value``, fetches the
    snapshot into ``cache_dir`` and prints a short summary (study area, cache
    directory, station/tract/availability row counts) to stdout.

    Returns:
        ``0`` on success.
    """

    study_query = AccessibilityQuery(geography=geography, value=value)
    snapshot = fetch_study_area_snapshot(
        study_query,
        cache_dir=cache_dir,
        refresh=refresh,
        availability_months=availability_months,
        # The CLI flag is phrased negatively; the fetch API positively.
        include_gtfs_archive=not skip_gtfs_archive,
    )

    emit = sys.stdout.write
    fetched_query = snapshot.query
    station_count = len(snapshot.stations.stations)
    tract_count = len(snapshot.demographics.tracts)
    availability_count = len(snapshot.outages.records)

    emit("Fetched subway-access real-data snapshot:\n")
    emit(f"- Study area: {fetched_query.geography}={fetched_query.value}\n")
    emit(f"- Cache directory: {cache_dir}\n")
    emit(f"- Stations: {station_count}\n")
    emit(f"- Tracts: {tract_count}\n")
    emit(f"- Availability rows: {availability_count}\n")
    return 0