module hunt.trace.HttpSender;

import hunt.trace.Span;
import hunt.trace.Constrants;

import hunt.http.client;
import hunt.logging;

import std.array;
import std.concurrency : initOnce;
import std.parallelism;

/**
 * Reports spans to Zipkin, using its
 * <a href="https://zipkin.io/zipkin-api/#/">POST</a> endpoint.
 *
 * Usage:
 * ---
 * auto sender = new HttpSender("http://127.0.0.1:9411/api/v2/spans");
 * sender.sendSpans(span);
 * ---
 *
 * Implementation notes:
 *
 * Spans are serialized to a JSON array on the calling thread; the HTTP POST
 * itself is queued on `std.parallelism.taskPool` and therefore runs on a
 * worker thread. Delivery is fire-and-forget: failures are logged, never
 * propagated to the caller.
 *
 * The endpoint is read by the worker without synchronization, so configure it
 * before the first call to `sendSpans`.
 */
class HttpSender {

    private string _endpoint;
    HttpClient client;

    /// Creates a sender with no endpoint; set one with `endpoint` before sending.
    this() {
        client = new HttpClient();
    }

    /**
     * Creates a sender that posts to the given URL.
     *
     * Params:
     *   endpoint = URL the span payload is POSTed to.
     */
    this(string endpoint) {
        _endpoint = endpoint;
        client = new HttpClient();
    }

    /**
     * Sets the URL the span payload is POSTed to.
     *
     * Params:
     *   value = the new endpoint URL.
     */
    void endpoint(string value) {
        _endpoint = value;
    }

    /**
     * Serializes the given spans into one JSON array and schedules it for
     * delivery on the task pool. Does nothing when no spans are passed.
     *
     * Params:
     *   spans = spans to report; each contributes its `toString()` output as
     *           one array element.
     */
    void sendSpans(Span[] spans...) {
        if (spans.length == 0) {
            return;
        }

        // Build the payload with an appender instead of repeated `~=` on an
        // immutable string, which may reallocate on every iteration.
        auto buffer = appender!string();
        buffer.put('[');
        foreach (i, s; spans) {
            if (i > 0) {
                buffer.put(',');
            }
            buffer.put(s.toString());
        }
        buffer.put(']');

        // The variadic slice must not outlive this call, so only the finished
        // string is handed to the worker.
        auto sendTask = task(&doSend, buffer.data);
        taskPool.put(sendTask);
    }

    /**
     * Performs the blocking HTTP POST of an already serialized payload.
     *
     * Runs on a task-pool worker whose result is never collected, so every
     * failure is handled (logged) here; otherwise it would be lost.
     *
     * Params:
     *   content = JSON text sent as the request body.
     */
    private void doSend(string content) {

        version(HUNT_HTTP_DEBUG) {
            tracef("endpoint: %s", _endpoint);
            trace(content);
        } else version(HUNT_DEBUG) {
            tracef("Span(s) are sending to the endpoint: %s", _endpoint);
        }

        // Runtime check rather than assert: asserts are compiled out in
        // release builds, and an AssertError raised on a worker whose task is
        // never forced would go unnoticed anyway.
        if (_endpoint.empty()) {
            warning("no endpoint configured, span(s) dropped");
            return;
        }

        try {
            HttpBody b = HttpBody.create(MimeType.APPLICATION_JSON_VALUE, content);
            Request request = new RequestBuilder()
                .enableTracing(false)
                .url(_endpoint)
                .post(b)
                .build();

            Response response = client.newCall(request).execute();

            if (response !is null) {
                version(HUNT_HTTP_DEBUG) warningf("status code: %d", response.getStatus());
                // Not guarded by HUNT_HTTP_DEBUG: the body is traced in every build.
                if (response.haveBody())
                    trace(response.getBody().asString());
            } else {
                warning("no response");
            }
        } catch (Exception e) {
            // An exception thrown inside a pooled task is only rethrown when
            // the task is forced, which never happens here; log it instead.
            warningf("failed to send span(s) to %s: %s", _endpoint, e.msg);
        }
    }
}


/**
 * Returns the shared `HttpSender` instance, creating it on first use.
 *
 * The instance is built with the no-argument constructor, so its endpoint
 * must be set through `endpoint` before spans can be delivered.
 */
HttpSender httpSender() {
    __gshared HttpSender inst;
    return initOnce!inst(new HttpSender());
}