Dallas Police Active Calls

The city of Dallas provides a real-time snapshot of active police calls on Dallas Open Data. The published data set shows the block, location, nature of call, and other details for current active calls, and it is updated every two minutes. These are calls where the unit assigned to the call has arrived and is currently working the call. Calls for service that are not releasable due to privacy laws are not included.

The primary objective of this project is to capture these events in real time and display them on a map of the city of Dallas. The map should update automatically as new calls arrive and older calls fall off. Server-Sent Events will deliver these updates to the front end.

Instead of geographic coordinates, the source data set provides an optional block field and a location, which may be a cross street or an actual address. To visualize these calls on a map, latitude and longitude are determined by forward geocoding the location information. The project will:

  • Capture real-time active calls from the Dallas Open Data API (a minimal fetch sketch follows this list).
  • Track changes between successive API responses, integrating new, updated, and removed records into a local dataset.
  • Enrich call data by forward geocoding location information.
  • Persist updates to Amazon S3 for historical reference.
  • Publish these updates via an API endpoint using Server-Sent Events (SSE).
  • Use Terraform to manage and provision project infrastructure.
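
To make the capture step concrete, here is a minimal sketch of the download, assuming Dallas Open Data exposes the dataset through the standard Socrata SODA endpoint pattern; the dataset ID in the URL is a placeholder, not the real one.

```python
"""Minimal sketch of the snapshot download step.

Assumes the Socrata SODA endpoint pattern; the dataset ID is a placeholder.
"""
import json
import urllib.request

# Placeholder dataset ID -- substitute the real Dallas Police Active Calls ID.
DATASET_URL = "https://www.dallasopendata.com/resource/XXXX-XXXX.json"


def fetch_active_calls() -> list[dict]:
    """Download the current snapshot of active calls as a list of records."""
    with urllib.request.urlopen(DATASET_URL, timeout=30) as resp:
        return json.load(resp)


if __name__ == "__main__":
    calls = fetch_active_calls()
    print(f"Fetched {len(calls)} active calls")
```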

The Data Pipeline

The data pipeline is an event-driven workflow triggered whenever a new JSON file is downloaded from Dallas Open Data.

The workflow is divided into parallel branches. One branch identifies additions, updates, and deletions by comparing the current Open Data API response with the previous response. Another branch attempts to forward geocode each call's location information. Calls and geographic data are cached in DynamoDB, and DynamoDB Streams push updates to an API and persist them in S3 for further analysis. Call and address events are synthesized on the front end.
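
A minimal sketch of the change-tracking branch, assuming each record carries a stable unique key (the incident_number field name below is an assumption about the source schema):

```python
"""Sketch of the change-tracking branch: diff two consecutive snapshots."""


def diff_snapshots(previous: list[dict], current: list[dict],
                   key: str = "incident_number"):
    """Return (added, updated, removed) records between two snapshots."""
    prev = {r[key]: r for r in previous}
    curr = {r[key]: r for r in current}
    added = [curr[k] for k in curr.keys() - prev.keys()]
    removed = [prev[k] for k in prev.keys() - curr.keys()]
    updated = [curr[k] for k in curr.keys() & prev.keys()
               if curr[k] != prev[k]]
    return added, updated, removed
```

Keying both snapshots by incident number turns addition, removal, and update detection into cheap set operations.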

The data pipeline workflow:

  1. On an EventBridge schedule, a Lambda function downloads the current snapshot of active calls from Dallas Open Data and saves the raw JSON to an S3 bucket.
  2. Saving the file emits an s3:ObjectCreated:Put event.
  3. The S3 event is published to an SNS topic.
  4. The SNS topic fans the event out to parallel processes that track call changes and forward geocode location data (a geocoding sketch follows this list).
  5. Location information and active-call data are written to separate DynamoDB tables.
  6. DynamoDB Streams deliver updates to a Lambda function.
  7. That Lambda function persists the changes to S3 and publishes address data and call updates to the API as Server-Sent Events.
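
The project does not name a geocoder, so the sketch below uses OpenStreetMap's Nominatim purely as an illustration; how the query is assembled from the block and location fields is likewise an assumption.

```python
"""Sketch of the forward-geocoding branch (Nominatim is an assumption)."""
import json
import urllib.parse
import urllib.request

NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"


def geocode(block: str | None, location: str,
            city: str = "Dallas, TX") -> tuple[float, float] | None:
    """Return (latitude, longitude) for a call's location, or None if no match."""
    query = " ".join(filter(None, [block, location, city]))
    params = urllib.parse.urlencode({"q": query, "format": "json", "limit": 1})
    req = urllib.request.Request(f"{NOMINATIM_URL}?{params}",
                                 headers={"User-Agent": "dpd-active-calls-demo"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        results = json.load(resp)
    if not results:
        return None
    return float(results[0]["lat"]), float(results[0]["lon"])
```

Caching each result in the address_cache table means a repeated block/location pair never triggers a second lookup.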

The API

The API prototype was built with Python's FastAPI framework and runs in a Docker container hosted on Fly.io. Its endpoints let clients fetch the most recent active calls with enriched location details and stream real-time updates. When a new client connects, it first downloads all current calls, then subscribes to /get-events/ to receive streaming updates for calls and address information. A condensed sketch of the prototype follows the list below.

  1. Event Queue for Real-Time Updates:
    • An event queue manages incoming events.
    • The event_publisher function continuously yields events from the queue, which are streamed to connected clients via the /get-events/ endpoint using the SSE protocol.
  2. Event Publishing:
    • The /events/ POST endpoint receives JSON payloads from DynamoDB Streams and puts them into the event queue for processing.
    • Clients subscribe to updates by connecting to the /get-events/ endpoint, which streams events using the StreamingResponse class.
  3. Current Calls Retrieval:
    • The /current-calls/ endpoint retrieves current active calls from two DynamoDB tables: dpd_active_calls for call data and address_cache for associated addresses.
    • The endpoint returns the latest update for each call along with newly processed address data.
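
The following condensed sketch ties these pieces together. The endpoint paths, the event_publisher generator, and the table names come from the description above; the handler bodies are assumptions about one plausible implementation, not the deployed code.

```python
"""Condensed sketch of the API prototype (handler bodies are assumptions)."""
import asyncio
import json

import boto3
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()
event_queue: asyncio.Queue = asyncio.Queue()
dynamodb = boto3.resource("dynamodb")


@app.post("/events/")
async def receive_event(request: Request):
    """Accept a JSON payload from the DynamoDB stream handler and enqueue it."""
    await event_queue.put(await request.json())
    return {"status": "queued"}


async def event_publisher():
    """Continuously yield queued events, formatted as SSE messages."""
    while True:
        event = await event_queue.get()
        yield f"data: {json.dumps(event)}\n\n"


@app.get("/get-events/")
async def get_events():
    """Stream call and address updates to the client over SSE."""
    return StreamingResponse(event_publisher(), media_type="text/event-stream")


@app.get("/current-calls/")
def current_calls():
    """Return current calls and cached addresses (full table scans, for brevity)."""
    calls = dynamodb.Table("dpd_active_calls").scan().get("Items", [])
    addresses = dynamodb.Table("address_cache").scan().get("Items", [])
    return {"calls": calls, "addresses": addresses}
```

One caveat to the shared queue: each event is consumed by a single subscriber, so a multi-client deployment would need to fan events out to a per-connection queue.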