From 51b9edbc4a74ca935efc0431652c1305982690fd Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 2 Sep 2016 17:57:58 +0100 Subject: [PATCH] github: fix the webhook server Signed-off-by: Thomas Gazagnaire --- bridge/github/src/datakit_github_api.ml | 20 +- bridge/github/src/datakit_github_webhook.ml | 313 +++++++++++-------- bridge/github/src/datakit_github_webhook.mli | 2 +- 3 files changed, 189 insertions(+), 146 deletions(-) diff --git a/bridge/github/src/datakit_github_api.ml b/bridge/github/src/datakit_github_api.ml index 4ba7e6cbe..e6b4b64a9 100644 --- a/bridge/github/src/datakit_github_api.ml +++ b/bridge/github/src/datakit_github_api.ml @@ -103,13 +103,9 @@ module Event = struct include Event - let of_gh e = - let repo = match String.cut ~sep:"/" e.event_repo.repo_name with - | None -> failwith (e.event_repo.repo_name ^ " is not a valid repo name") - | Some (user, repo) -> { Repo.user; repo } - in + let of_gh_constr repo e = let other str = Other (repo, str) in - match e.event_payload with + match e with | `Status s -> Status (Status.of_event repo s) | `PullRequest pr -> PR (PR.of_event repo pr) | `Push p -> Ref (Ref.of_event repo p) @@ -130,9 +126,17 @@ module Event = struct | `PullRequestReviewComment _ -> other "pull-request-review-comment" | `CommitComment _ -> other "commit-comment" + + let of_gh e = + let repo = match String.cut ~sep:"/" e.event_repo.repo_name with + | None -> failwith (e.event_repo.repo_name ^ " is not a valid repo name") + | Some (user, repo) -> { Repo.user; repo } + in + of_gh_constr repo e.event_payload + end -let event = Event.of_gh +let event_constr = Event.of_gh_constr open Rresult open Lwt.Infix @@ -246,5 +250,5 @@ let events token r = module Webhook = struct include Datakit_github_webhook - let events t = List.map event (events t) + let events t = List.map (fun (r, e) -> event_constr r e) (events t) end diff --git a/bridge/github/src/datakit_github_webhook.ml b/bridge/github/src/datakit_github_webhook.ml index 7283b80f2..15be09416 100644 --- a/bridge/github/src/datakit_github_webhook.ml +++ b/bridge/github/src/datakit_github_webhook.ml @@ -9,6 +9,7 @@ open Cohttp open Github_t open Datakit_github open Lwt.Infix +open Astring let some x = Some x @@ -36,20 +37,22 @@ module HTTP = struct routes : Re.t; handler: response option handler; } - type service_search = { service : service; continue : unit -> service_search } type t = { port : int; mutable services: service list; - mutable dispatch: service_search handler; + mutable dispatch: service handler; } - let not_found_service = { + let service_not_found s = { name = "Default404"; routes = Re.any; handler = Lwt.(fun _id req _body -> + let routes = List.map (fun s -> s.name) s in let body = - Fmt.strf "404: Resource '%s' not found\n" (Uri.path (Request.uri req)) + Fmt.strf "404: Resource '%s' not found\nExisting services:\n%a" + (Uri.path (Request.uri req)) + Fmt.(list ~sep:(unit "\n") string) routes in Cohttp_lwt_unix.Server.respond_string ~status:`Not_found ~body () >|= some @@ -59,44 +62,44 @@ module HTTP = struct let make_dispatch services = let routes = List.map (fun s -> Re.compile s.routes, s) services in fun _id req _body -> - let rec cascade = function - | [] -> let rec fix = - {service=not_found_service; continue=fun () -> fix} in fix - | (rt,service)::rest -> - if Re.execp rt (Uri.path (Request.uri req)) - then {service; continue=fun () -> cascade rest} - else cascade rest + Log.debug (fun l -> + l "dispatch %a" Fmt.(Dump.list string) + (List.map (fun (_, x) -> x.name) routes)); + let rec dispatch = function + | [] -> service_not_found services + | (rt, s) :: rest -> + let path = Uri.path (Request.uri req) in + let m = Re.execp rt path in + if m then s else dispatch rest in - Lwt.return (cascade routes) + Lwt.return (dispatch routes) let create port = { port; services=[]; dispatch = make_dispatch [] } - let with_service server service = - let services = service::server.services in - let dispatch = make_dispatch services in - { server with services; dispatch } + let add_service t service = + t.services <- service :: t.services; + t.dispatch <- make_dispatch t.services let listen server = let port = server.port in - let rec callback kfn conn_id req body = - kfn conn_id req body >>= fun {service; continue} -> + let callback t conn_id req body = + t.dispatch conn_id req body >>= fun service -> let pathquery = Uri.path_and_query (Request.uri req) in Log.debug (fun l -> l "%s for %s dispatched to %s" (Code.string_of_method (Request.meth req)) pathquery service.name); service.handler conn_id req body >>= function | None -> - Log.debug (fun l -> - l "%s refused to service %s: continuing" service.name pathquery); - callback (fun _ _ _ -> Lwt.return (continue ())) conn_id req body + Log.err (fun l -> l "%s refused to service %s" service.name pathquery); + (* FIXME: should be a better error *) + Lwt.fail_with "listen" | Some resp -> Lwt.return resp in let conn_closed (_, conn_id) = Log.debug (fun l -> l "conn %s closed" (Connection.to_string conn_id)) in let config = - Cohttp_lwt_unix.Server.make ~callback:(callback server.dispatch) - ~conn_closed () + Cohttp_lwt_unix.Server.make ~callback:(callback server) ~conn_closed () in Cohttp_lwt_unix.Server.create ~mode:(`TCP (`Port port)) config @@ -114,11 +117,11 @@ module Webhook = struct url : Uri.t; secret : Cstruct.t; repo : Repo.t; - status : status; - update_event: t Lwt_condition.t; - last_event : Time.t; + update_event: unit Lwt_condition.t; token : Github.Token.t; handler : HTTP.response option HTTP.handler; + mutable status : status; + mutable last_event: Time.t; } let secret_prefix = "datakit" @@ -136,19 +139,18 @@ module Webhook = struct let verify_event ~secret req body = match Header.get (Request.headers req) "x-hub-signature" with + | None -> Lwt.return_none | Some sign -> - let hmac_label = Re_str.string_before sign 5 in - let hmac_hex = `Hex (Re_str.string_after sign 5) in - if hmac_label <> "sha1=" then Lwt.return_false - else ( + match String.cut ~sep:"=" sign with + | Some ("sha1", hex) -> Cohttp_lwt_body.to_string body >|= fun body -> - hmac_hex = (hmac ~secret body) - ) - | None -> Lwt.return_false + let `Hex hmac = hmac ~secret body in + if String.equal hex hmac then Some body else None + | _ -> Lwt.return_none let new_secret prefix = let `Hex s = Hex.of_cstruct (Nocrypto.Rng.generate 20) in - prefix ^ ":" ^ s + Cstruct.of_string (prefix ^ ":" ^ s) let default_events = [ `Create; `Delete; `Push; (* ref updates *) @@ -156,13 +158,12 @@ module Webhook = struct `PullRequest; (* PR updates *) ] - let new_hook ?(events=default_events) url = - let secret = new_secret secret_prefix in + let new_hook ?(events=default_events) url secret = let new_hook_config = `Web { web_hook_config_url = Uri.to_string url; web_hook_config_content_type = "json"; web_hook_config_insecure_ssl = false; (* FIXME: review *) - web_hook_config_secret = Some secret; + web_hook_config_secret = Some (Cstruct.to_string secret); } in { new_hook_name = "web"; @@ -176,80 +177,69 @@ module Webhook = struct Log.debug (fun l -> l "ignoring hook config for %s" h.hook_name); None - let endpoint_of_hook ~token ~hook (repo, handler) = + let handler t id req body = + verify_event ~secret:t.secret req body >>= function + | None -> + t.status <- Unauthorized; + Log.err (fun l -> + l "FAILURE: Hook registration of %s for %a" + (Uri.to_string t.url) Repo.pp t.repo); + Lwt_condition.broadcast t.update_event (); + verification_failure + | Some body -> + t.last_event <- Time.now (); + t.status <- Connected; + Lwt_condition.broadcast t.update_event (); + Lwt.async (fun () -> + t.handler id req (`String body) >|= function + | None -> () + | Some _ -> ()); + Log.info (fun l -> + l "SUCCESS: Hook registration of %s for %a" + (Uri.to_string t.url) Repo.pp t.repo); + Cohttp_lwt_unix.Server.respond_string ~status:`No_content ~body:"" () + >|= some + + let of_hook ~token (repo, handler) hook secret = match web_hook_config hook with | None -> assert false - | Some w -> - let secret = match w.web_hook_config_secret with - | None -> Cstruct.of_string "" - | Some secret -> Cstruct.of_string secret - in { + | Some w -> { id = hook.hook_id; url = Uri.of_string w.web_hook_config_url; status = Indicated; update_event = Lwt_condition.create (); last_event = Time.min; - token; secret; repo; - handler = (fun conn_id req body -> - verify_event ~secret req body >>= function - | true -> handler conn_id req body - | false -> verification_failure - ); + handler; token; secret; repo; } - let test_endpoint ~token e = + let test ~token t = let open Github.Monad in let f = - Github.Hook.test ~token ~user:e.repo.Repo.user ~repo:e.repo.Repo.repo - ~id:e.id () - |> map Github.Response.value + Github.Hook.test ~token ~user:t.repo.Repo.user ~repo:t.repo.Repo.repo + ~id:t.id () + |> map ignore in run f - let register registry endpoint = - let rec handler _id req body = - verify_event ~secret:endpoint.secret req body >>= fun verified -> - if verified then begin - let last_event = Time.now () in - let endpoint = { endpoint with last_event; status=Connected } in - Hashtbl.replace registry (Uri.path endpoint.url) endpoint; - Lwt_condition.broadcast endpoint.update_event endpoint; - Log.info (fun l -> - l "SUCCESS: Hook registration of %s for %a" - (Uri.to_string endpoint.url) Repo.pp endpoint.repo); - Cohttp_lwt_unix.Server.respond_string ~status:`No_content ~body:"" () - >|= some - end - else begin - let endpoint = { endpoint with handler; status=Unauthorized } in - Log.err (fun l -> - l "FAILURE: Hook registration of %s for %a" - (Uri.to_string endpoint.url) Repo.pp endpoint.repo); - Hashtbl.replace registry (Uri.path endpoint.url) endpoint; - Lwt_condition.broadcast endpoint.update_event endpoint; - verification_failure - end + let register registry t = Hashtbl.replace registry (Uri.path t.url) t + + let check_connectivity t timeout_s = + let timeout () = + Lwt_unix.sleep timeout_s >>= fun () -> + t.status <- Timeout; + Lwt_condition.broadcast t.update_event (); + Lwt.return () in - Hashtbl.replace registry (Uri.path endpoint.url) {endpoint with handler} - - let check_connectivity registry endpoint timeout_s = - Lwt.pick [ - Lwt_unix.sleep timeout_s - >>= begin fun () -> - let endpoint = {endpoint with status=Timeout} in - Hashtbl.replace registry (Uri.path endpoint.url) endpoint; - Lwt_condition.broadcast endpoint.update_event endpoint; - Lwt.return () - end; - let rec wait endpoint = - Lwt_condition.wait endpoint.update_event >>= fun endpoint -> - if endpoint.status=Connected then Lwt.return () - else wait endpoint - in - wait endpoint - ] + let rec wait () = + Log.debug (fun l -> l "wait %Ld" t.id); + Lwt_condition.wait t.update_event >>= fun () -> + Log.debug (fun l -> l "after-wait %Ld" t.id); + if t.status = Connected then Lwt.return_unit + else wait () + in + Lwt.pick [ timeout (); wait () ] - let connect ~token registry url ({Repo.user; repo}, _ as x) = + let connect ~token registry url ({Repo.user; repo}, _ as r) = let points_to_us h = match web_hook_config h with | None -> false @@ -262,24 +252,29 @@ module Webhook = struct >>= fun hooks -> List.fold_left (fun m h -> m >>= fun () -> - if points_to_us h then - Github.Hook.delete ~token ~user ~repo ~id:h.hook_id () + if not (points_to_us h) then return () else + let id = h.hook_id in + Log.info (fun l -> l "Github.Hook.delete %s/%s/%Ld" user repo id); + Github.Hook.delete ~token ~user ~repo ~id () |> map Github.Response.value - else return () ) (return ()) hooks >>= fun () -> - let hook = new_hook url in + let secret = new_secret secret_prefix in + let hook = new_hook url secret in + Log.info (fun l -> + l "Github.Hook.create %s/%s (%s)" user repo @@ Uri.to_string url); Github.Hook.create ~token ~user ~repo ~hook () >>~ fun hook -> - endpoint_of_hook ~token ~hook x + of_hook ~token r hook secret |> return in - Github.Monad.run create >>= fun endpoint -> - register registry {endpoint with status=Pending}; + Github.Monad.run create >>= fun t -> + t.status <- Pending; + register registry t; Lwt.join [ - check_connectivity registry endpoint 10.; - test_endpoint ~token endpoint; + check_connectivity t 10.; + test ~token t; ] >|= fun () -> - Hashtbl.find registry (Uri.path url) + t end @@ -292,7 +287,7 @@ type s = { type t = { s: s; - mutable events: Github_t.event list; + mutable events: (Repo.t * Github_t.event_constr) list; http: HTTP.t; cond: unit Lwt_condition.t; } @@ -302,30 +297,91 @@ let empty token uri = let repos = Repo.Set.empty in { uri; registry; token; repos } -let notify_query = "notify" -let path_seg = Re.(rep1 (compl [char '/'])) let github_error_str = Fmt.strf "GitHub connection for %a failed:" Repo.pp -let notify_re = - Re.(seq [str "github/"; group path_seg; char '/'; group path_seg]) -let notification_handler t repo _id _req body = +let safe_parse f x = + try Some (f x) + with Ag_oj_run.Error e -> + Log.err (fun l -> l "parsing error: %s\n%s" e x); + None + +let event_type req = + let parse s = safe_parse Github_j.event_type_of_string ("\"" ^ s ^ "\"") in + match Header.get (Request.headers req) "x-github-event" with + | Some s -> parse s + | None -> None + +let parse_event t b: Github_t.event_constr option = + let ( >|= ) x f = match safe_parse x b with + | None -> None + | Some x -> Some (f x) + in + match t with + | `Push -> Github_j.push_event_of_string >|= fun x -> `Push x + | `Status -> Github_j.status_event_of_string >|= fun x -> `Status x + | `Delete -> Github_j.delete_event_of_string >|= fun x -> `Delete x + | `Create -> Github_j.create_event_of_string >|= fun x -> `Create x + | `PullRequest -> + Github_j.pull_request_event_of_string >|= fun x -> `PullRequest x + | _ -> None + +let pp_event ppf = function + | `Push _ -> Fmt.string ppf "push" + | `Status _ -> Fmt.string ppf "status" + | `Delete _ -> Fmt.string ppf "delete" + | `Create _ -> Fmt.string ppf "create" + | `PullRequest _ -> Fmt.string ppf "pull-request" + | _ -> Fmt.string ppf "unknown" + +let notification_handler t repo _id req body = Cohttp_lwt_body.to_string body >>= fun body -> - t.events <- Github_j.event_of_string body :: t.events; - Lwt_condition.signal t.cond (); - let body = Fmt.strf "Got event for %a\n" Repo.pp repo in - Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body () - >|= some + let e = match event_type req with + | None -> None + | Some typ -> parse_event typ body + in + match e with + | Some e -> + Log.info (fun l -> + l "received webhook event for %a: %a" Repo.pp repo pp_event e); + t.events <- (repo, e) :: t.events; + Lwt_condition.signal t.cond (); + let body = Fmt.strf "Got event for %a\n" Repo.pp repo in + Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body () + >|= some + | None -> + Log.info (fun l -> l "received unknown webhook event"); + Lwt.return_none + +let service registry uri service_fn = + let root = Uri.path uri in + let routes = Re.str root in + let handler conn_id req body = + let uri = Request.uri req in + try + let endpoint = Hashtbl.find registry (Uri.path uri) in + Webhook.handler endpoint conn_id req body + with Not_found -> + Lwt.return_none + in + service_fn ~routes ~handler + +let (++) x y = Uri.resolve "" x (Uri.of_string y) let watch t repo = if Repo.Set.mem repo t.s.repos then ( Log.debug (fun l -> l "Alreday watching %a" Repo.Set.pp t.s.repos); Lwt.return_unit ) else - let url = Uri.resolve "" t.s.uri (Uri.of_string ("?"^notify_query)) in + let uri = t.s.uri ++ Fmt.to_to_string Repo.pp repo in Log.info (fun l -> - l "Connecting GitHub to callback %s\n%!" (Uri.to_string url)); + l "Connecting GitHub to callback %s\n%!" (Uri.to_string uri)); + let service = + let msg = Fmt.strf "GitHub listener for %a" Repo.pp repo in + service t.s.registry uri (HTTP.service msg) + in let err = github_error_str repo in - Webhook.connect ~token:t.s.token t.s.registry url + HTTP.add_service t.http service; + Webhook.connect ~token:t.s.token t.s.registry uri (repo, notification_handler t repo) >|= fun endpoint -> match endpoint.Webhook.status with | Webhook.Indicated -> Log.err (fun l -> l "%s wedged prerequest" err) @@ -336,27 +392,10 @@ let watch t repo = Log.info (fun l -> l "%a connected" Repo.pp repo); t.s.repos <- Repo.Set.add repo t.s.repos -let service { uri; registry; _ } service_fn = - let root = Uri.path uri in - let routes = Re.(seq [str root; notify_re]) in - let handler conn_id req body = - let uri = Request.uri req in - if Uri.query uri <> [notify_query,[]] then Lwt.return_none - else - try - let endpoint = Hashtbl.find registry (Uri.path uri) in - endpoint.Webhook.handler conn_id req body - with Not_found -> - Lwt.return_none - in - service_fn ~routes ~handler - let create token uri = let port = match Uri.port uri with None -> 80 | Some p -> p in let http = HTTP.create port in - let s = empty token uri in - let service = service s (HTTP.service "GitHub listener") in - let http = HTTP.with_service http service in + let s = empty token uri in let cond = Lwt_condition.create () in { s; http; events = []; cond } diff --git a/bridge/github/src/datakit_github_webhook.mli b/bridge/github/src/datakit_github_webhook.mli index 6370b5cf9..604420883 100644 --- a/bridge/github/src/datakit_github_webhook.mli +++ b/bridge/github/src/datakit_github_webhook.mli @@ -7,6 +7,6 @@ val create: Github.Token.t -> Uri.t -> t val run: t -> unit Lwt.t val repos: t -> Repo.Set.t val watch: t -> Repo.t -> unit Lwt.t -val events: t -> Github_t.event list +val events: t -> (Repo.t * Github_t.event_constr) list val clear: t -> unit val wait: t -> unit Lwt.t