在java开发中我们经常会涉及到http 请求接口,一般有几种方式:
- java自带的 HttpURLConnection
- okHttpClient
- apache http client
一般我们使用apache http client会比较多点,在代码中会进行如下调用方式:
private static class HttpClientPool {private static CloseableHttpClient httpClient;private static PoolingHttpClientConnectionManager HTTP_CLIENT_POOL = new PoolingHttpClientConnectionManager();static {//连接池最大连接数:300HTTP_CLIENT_POOL.setMaxTotal(300);// 每个路由的最大连接数HTTP_CLIENT_POOL.setDefaultMaxPerRoute(50);if (httpClient == null) {httpClient = HttpClients.custom().setConnectionManager(HTTP_CLIENT_POOL).setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(10000).setSocketTimeout(10000).build()).setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(10000).build()).build();}Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {httpClient.close();HTTP_CLIENT_POOL.close();} catch (Exception e) {LoggerFactory.getLogger(com.ipinyou.mip.base.utils.HttpClientHolder.class).error("关闭httpClient 连接池失败!", e);}}));}public static CloseableHttpClient getHttpClient() {return httpClient;}}private static HttpResponseValue httpCallWithHttpClient(String url, HttpMethodEnum method, Map<String, String> formData, String jsonData, Map<String, String> headers) {HttpResponseValue responseValue = new HttpResponseValue();if (url == null || StringUtils.isBlank(url)) {log.info("url is empty,return ");return responseValue;}long start = System.currentTimeMillis();HttpPost post = null;HttpGet get = null;try {// 设置请求超时时间为 10 分钟RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS) // 连接超时时间.setSocketTimeout(HTTP_CONNECT_TIMEOUT_MS) // 数据传输超时时间.build();if (method == HttpMethodEnum.POST) {post = new HttpPost(url);post.setConfig(requestConfig);if (headers != null && !headers.isEmpty()) {for (Map.Entry<String, String> kv : headers.entrySet()) {post.addHeader(kv.getKey(), kv.getValue());}}if (formData != null && !formData.isEmpty()) {List<NameValuePair> nvps = new ArrayList<>();formData.forEach((k, v) -> nvps.add(new BasicNameValuePair(k, v)));post.setEntity(new UrlEncodedFormEntity(nvps, DEFAULT_UTF8_ENCODING));} else if (StringUtils.isNotBlank(jsonData)) {StringEntity entity = new StringEntity(jsonData, ContentType.APPLICATION_JSON);entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, CONTENT_TYPE_APPLICATION_JSON));post.setEntity(entity);}}if (method == HttpMethodEnum.GET) {get = new HttpGet();get.setConfig(requestConfig);}CloseableHttpClient client = HttpClientPool.getHttpClient();//entity编码指定urf-8CloseableHttpResponse response = null;if (method == HttpMethodEnum.GET) {response = client.execute(get);} else {response = client.execute(post);}HttpEntity entity = response.getEntity();if (entity != null) {responseValue.content = EntityUtils.toString(entity, DEFAULT_UTF8_ENCODING);}response.close();responseValue.code = 200;} catch (Exception e) {log.info("httpCallWithHttpClient failed: {} ", e);} finally {log.info("httpCallWithHttpClient url=[{}],cost={} ms,", url, (System.currentTimeMillis() - start));}return responseValue;}
今天我们来研究下,apache http client的源码。
HttpClient
是其内部一个抽象接口:
这里我们以InternalHttpClient
为例来研究(CloseableHttpClient是一个抽象类):
public CloseableHttpResponse execute(final HttpHost target,final HttpRequest request,final HttpContext context) throws IOException, ClientProtocolException {return doExecute(target, request, context);}protected CloseableHttpResponse doExecute(final HttpHost target,final HttpRequest request,final HttpContext context) throws IOException, ClientProtocolException {Args.notNull(request, "HTTP request");HttpExecutionAware execAware = null;if (request instanceof HttpExecutionAware) {execAware = (HttpExecutionAware) request;}try {final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);final HttpClientContext localcontext = HttpClientContext.adapt(context != null ? context : new BasicHttpContext());RequestConfig config = null;if (request instanceof Configurable) {config = ((Configurable) request).getConfig();}if (config == null) {final HttpParams params = request.getParams();if (params instanceof HttpParamsNames) {if (!((HttpParamsNames) params).getNames().isEmpty()) {config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);}} else {config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);}}if (config != null) {localcontext.setRequestConfig(config);}setupContext(localcontext);final HttpRoute route = determineRoute(target, wrapper, localcontext);return this.execChain.execute(route, wrapper, localcontext, execAware);} catch (final HttpException httpException) {throw new ClientProtocolException(httpException);}}
实际执行发送请求是在InternalHttpClient
中,发送时会根据determineRoute
来确定本次请求的路径,请求中的HttpPost、HttpGet都是实现了HttpUriRequest
接口,在请求前,会通过HttpUriRequest
获取到请求的地址信息,将其封装到HttpHost
中,主要包含如下信息:
public final class HttpHost implements java.lang.Cloneable, java.io.Serializable {public static final java.lang.String DEFAULT_SCHEME_NAME = "http";protected final java.lang.String hostname;protected final java.lang.String lcHostname;protected final int port;protected final java.lang.String schemeName;protected final java.net.InetAddress address;@Overridepublic boolean equals(final Object obj) {if (this == obj) {return true;}if (obj instanceof HttpHost) {final HttpHost that = (HttpHost) obj;return this.lcHostname.equals(that.lcHostname)&& this.port == that.port&& this.schemeName.equals(that.schemeName)&& (this.address==null ? that.address== null : this.address.equals(that.address));} else {return false;}}@Overridepublic int hashCode() {int hash = LangUtils.HASH_SEED;hash = LangUtils.hashCode(hash, this.lcHostname);hash = LangUtils.hashCode(hash, this.port);hash = LangUtils.hashCode(hash, this.schemeName);if (address!=null) {hash = LangUtils.hashCode(hash, address);}return hash;}}
可以看到,主要包含了请求地址host,端口,协议(http、https),尤其需要注意其重写了equals
和hashCode
方法,可以看到,判断两个HttpHost
是否一样,主要是看协议(http、https)、地址、端口号
然后根据通过routePlanner.determineRoute
将HttpHost
和一些其他信息封装到HttpRoute
,表示一个请求的路由信息:
public final class HttpRoute implements RouteInfo, Cloneable {
private final HttpHost targetHost;private final InetAddress localAddress;private final List<HttpHost> proxyChain;private final TunnelType tunnelled;private final LayerType layered;private final boolean secure;@Overridepublic final boolean equals(final Object obj) {if (this == obj) {return true;}if (obj instanceof HttpRoute) {final HttpRoute that = (HttpRoute) obj;return// Do the cheapest tests first(this.secure == that.secure) &&(this.tunnelled == that.tunnelled) &&(this.layered == that.layered) &&LangUtils.equals(this.targetHost, that.targetHost) &&LangUtils.equals(this.localAddress, that.localAddress) &&LangUtils.equals(this.proxyChain, that.proxyChain);} else {return false;}}@Overridepublic final int hashCode() {int hash = LangUtils.HASH_SEED;hash = LangUtils.hashCode(hash, this.targetHost);hash = LangUtils.hashCode(hash, this.localAddress);if (this.proxyChain != null) {for (final HttpHost element : this.proxyChain) {hash = LangUtils.hashCode(hash, element);}}hash = LangUtils.hashCode(hash, this.secure);hash = LangUtils.hashCode(hash, this.tunnelled);hash = LangUtils.hashCode(hash, this.layered);return hash;}
}
需要注意的是,HttpRoute
重写了equals
和hashCode
方法,也就是说,一般常规情况下,两个HttpRoute
是否相等,主要就是协议、地址、端口号,也就是说,我们经常设置的连接池的setDefaultMaxPerRoute
这里设置的是协议、地址、端口号 为分类
通过HttpRoute
,apache http client将会找到服务端并建立连接。
加下来通过ClientExecChain
进行请求的发送:
public interface ClientExecChain {CloseableHttpResponse execute(HttpRoute route,HttpRequestWrapper request,HttpClientContext clientContext,HttpExecutionAware execAware) throws IOException, HttpException;}
其主要实现类为MainClientExec
:
public CloseableHttpResponse execute(final HttpRoute route,final HttpRequestWrapper request,final HttpClientContext context,final HttpExecutionAware execAware) throws IOException, HttpException {Args.notNull(route, "HTTP route");Args.notNull(request, "HTTP request");Args.notNull(context, "HTTP context");AuthState targetAuthState = context.getTargetAuthState();if (targetAuthState == null) {targetAuthState = new AuthState();context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);}AuthState proxyAuthState = context.getProxyAuthState();if (proxyAuthState == null) {proxyAuthState = new AuthState();context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);}if (request instanceof HttpEntityEnclosingRequest) {RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);}Object userToken = context.getUserToken();final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);if (execAware != null) {if (execAware.isAborted()) {connRequest.cancel();throw new RequestAbortedException("Request aborted");} else {execAware.setCancellable(connRequest);}}final RequestConfig config = context.getRequestConfig();final HttpClientConnection managedConn;try {final int timeout = config.getConnectionRequestTimeout();managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);} catch(final InterruptedException interrupted) {Thread.currentThread().interrupt();throw new RequestAbortedException("Request aborted", interrupted);} catch(final ExecutionException ex) {Throwable cause = ex.getCause();if (cause == null) {cause = ex;}throw new RequestAbortedException("Request execution failed", cause);}context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);if (config.isStaleConnectionCheckEnabled()) {// validate connectionif (managedConn.isOpen()) {this.log.debug("Stale connection check");if (managedConn.isStale()) {this.log.debug("Stale connection detected");managedConn.close();}}}final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);try {if (execAware != null) {execAware.setCancellable(connHolder);}HttpResponse response;for (int execCount = 1;; execCount++) {if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {throw new NonRepeatableRequestException("Cannot retry request " +"with a non-repeatable request entity.");}if (execAware != null && execAware.isAborted()) {throw new RequestAbortedException("Request aborted");}if (!managedConn.isOpen()) {this.log.debug("Opening connection " + route);try {establishRoute(proxyAuthState, managedConn, route, request, context);} catch (final TunnelRefusedException ex) {if (this.log.isDebugEnabled()) {this.log.debug(ex.getMessage());}response = ex.getResponse();break;}}final int timeout = config.getSocketTimeout();if (timeout >= 0) {managedConn.setSocketTimeout(timeout);}if (execAware != null && execAware.isAborted()) {throw new RequestAbortedException("Request aborted");}if (this.log.isDebugEnabled()) {this.log.debug("Executing request " + request.getRequestLine());}if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {if (this.log.isDebugEnabled()) {this.log.debug("Target auth state: " + targetAuthState.getState());}this.authenticator.generateAuthResponse(request, targetAuthState, context);}if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {if (this.log.isDebugEnabled()) {this.log.debug("Proxy auth state: " + proxyAuthState.getState());}this.authenticator.generateAuthResponse(request, proxyAuthState, context);}response = requestExecutor.execute(request, managedConn, context);// The connection is in or can be brought to a re-usable state.if (reuseStrategy.keepAlive(response, context)) {// Set the idle duration of this connectionfinal long duration = keepAliveStrategy.getKeepAliveDuration(response, context);if (this.log.isDebugEnabled()) {final String s;if (duration > 0) {s = "for " + duration + " " + TimeUnit.MILLISECONDS;} else {s = "indefinitely";}this.log.debug("Connection can be kept alive " + s);}connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);connHolder.markReusable();} else {connHolder.markNonReusable();}if (needAuthentication(targetAuthState, proxyAuthState, route, response, context)) {// Make sure the response body is fully consumed, if presentfinal HttpEntity entity = response.getEntity();if (connHolder.isReusable()) {EntityUtils.consume(entity);} else {managedConn.close();if (proxyAuthState.getState() == AuthProtocolState.SUCCESS&& proxyAuthState.getAuthScheme() != null&& proxyAuthState.getAuthScheme().isConnectionBased()) {this.log.debug("Resetting proxy auth state");proxyAuthState.reset();}if (targetAuthState.getState() == AuthProtocolState.SUCCESS&& targetAuthState.getAuthScheme() != null&& targetAuthState.getAuthScheme().isConnectionBased()) {this.log.debug("Resetting target auth state");targetAuthState.reset();}}// discard previous auth headersfinal HttpRequest original = request.getOriginal();if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {request.removeHeaders(AUTH.WWW_AUTH_RESP);}if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {request.removeHeaders(AUTH.PROXY_AUTH_RESP);}} else {break;}}if (userToken == null) {userToken = userTokenHandler.getUserToken(context);context.setAttribute(HttpClientContext.USER_TOKEN, userToken);}if (userToken != null) {connHolder.setState(userToken);}// check for entity, release connection if possiblefinal HttpEntity entity = response.getEntity();if (entity == null || !entity.isStreaming()) {// connection not needed and (assumed to be) in re-usable stateconnHolder.releaseConnection();return new HttpResponseProxy(response, null);} else {return new HttpResponseProxy(response, connHolder);}} catch (final ConnectionShutdownException ex) {final InterruptedIOException ioex = new InterruptedIOException("Connection has been shut down");ioex.initCause(ex);throw ioex;} catch (final HttpException ex) {connHolder.abortConnection();throw ex;} catch (final IOException ex) {connHolder.abortConnection();throw ex;} catch (final RuntimeException ex) {connHolder.abortConnection();throw ex;}}
实际发送的这个方法比较长,我们分几段看。
第一步是根据上述的HttpRoute
和服务端建立TCP连接
通过connManager.requestConnection(route, userToken);
建立连接,在HttpClientBuilder
中,设置connManager
为PoolingHttpClientConnectionManager
public ConnectionRequest requestConnection(final HttpRoute route,final Object state) {Args.notNull(route, "HTTP route");if (this.log.isDebugEnabled()) {this.log.debug("Connection request: " + format(route, state) + formatStats(route));}final Future<CPoolEntry> future = this.pool.lease(route, state, null);return new ConnectionRequest() {@Overridepublic boolean cancel() {return future.cancel(true);}@Overridepublic HttpClientConnection get(final long timeout,final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {return leaseConnection(future, timeout, tunit);}};}
上面这一连串下来,起始就是要通过中连接池拿连接,建立连接的实际调用在其父类AbstractConnPool
中:
private E getPoolEntryBlocking(final T route, final Object state,final long timeout, final TimeUnit tunit,final Future<E> future) throws IOException, InterruptedException, TimeoutException {Date deadline = null;if (timeout > 0) {deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));}this.lock.lock();try {final RouteSpecificPool<T, C, E> pool = getPool(route);E entry;for (;;) {Asserts.check(!this.isShutDown, "Connection pool shut down");for (;;) {entry = pool.getFree(state);if (entry == null) {break;}if (entry.isExpired(System.currentTimeMillis())) {entry.close();}if (entry.isClosed()) {this.available.remove(entry);pool.free(entry, false);} else {break;}}if (entry != null) {this.available.remove(entry);this.leased.add(entry);onReuse(entry);return entry;}// New connection is neededfinal int maxPerRoute = getMax(route);// Shrink the pool prior to allocating a new connectionfinal int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);if (excess > 0) {for (int i = 0; i < excess; i++) {final E lastUsed = pool.getLastUsed();if (lastUsed == null) {break;}lastUsed.close();this.available.remove(lastUsed);pool.remove(lastUsed);}}if (pool.getAllocatedCount() < maxPerRoute) {final int totalUsed = this.leased.size();final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);if (freeCapacity > 0) {final int totalAvailable = this.available.size();if (totalAvailable > freeCapacity - 1) {if (!this.available.isEmpty()) {final E lastUsed = this.available.removeLast();lastUsed.close();final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());otherpool.remove(lastUsed);}}final C conn = this.connFactory.create(route);entry = pool.add(conn);this.leased.add(entry);return entry;}}boolean success = false;try {if (future.isCancelled()) {throw new InterruptedException("Operation interrupted");}pool.queue(future);this.pending.add(future);if (deadline != null) {success = this.condition.awaitUntil(deadline);} else {this.condition.await();success = true;}if (future.isCancelled()) {throw new InterruptedException("Operation interrupted");}} finally {pool.unqueue(future);this.pending.remove(future);}if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {break;}}throw new TimeoutException("Timeout waiting for connection");} finally {this.lock.unlock();}}
这里面当连接不够时,如果没超过maxPerRoute
和maxTotal
就会在创建一个链接,最终通过ManagedHttpClientConnectionFactory
创建:
public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;CharsetDecoder chardecoder = null;CharsetEncoder charencoder = null;final Charset charset = cconfig.getCharset();final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;if (charset != null) {chardecoder = charset.newDecoder();chardecoder.onMalformedInput(malformedInputAction);chardecoder.onUnmappableCharacter(unmappableInputAction);charencoder = charset.newEncoder();charencoder.onMalformedInput(malformedInputAction);charencoder.onUnmappableCharacter(unmappableInputAction);}final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());return new LoggingManagedHttpClientConnection(id,log,headerlog,wirelog,cconfig.getBufferSize(),cconfig.getFragmentSizeHint(),chardecoder,charencoder,cconfig.getMessageConstraints(),incomingContentStrategy,outgoingContentStrategy,requestWriterFactory,responseParserFactory);}
到这里我们获取到了一个HttpClientConnection
,但是这个时候并没有建立真正的连接。加下来通过:
this.connManager.connect(managedConn,route,timeout > 0 ? timeout : 0,context);
public void connect(final HttpClientConnection managedConn,final HttpRoute route,final int connectTimeout,final HttpContext context) throws IOException {Args.notNull(managedConn, "Managed Connection");Args.notNull(route, "HTTP route");final ManagedHttpClientConnection conn;synchronized (managedConn) {final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);conn = entry.getConnection();}final HttpHost host;if (route.getProxyHost() != null) {host = route.getProxyHost();} else {host = route.getTargetHost();}final InetSocketAddress localAddress = route.getLocalSocketAddress();SocketConfig socketConfig = this.configData.getSocketConfig(host);if (socketConfig == null) {socketConfig = this.configData.getDefaultSocketConfig();}if (socketConfig == null) {socketConfig = SocketConfig.DEFAULT;}this.connectionOperator.connect(conn, host, localAddress, connectTimeout, socketConfig, context);} public void connect(final ManagedHttpClientConnection conn,final HttpHost host,final InetSocketAddress localAddress,final int connectTimeout,final SocketConfig socketConfig,final HttpContext context) throws IOException {final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());if (sf == null) {throw new UnsupportedSchemeException(host.getSchemeName() +" protocol is not supported");}final InetAddress[] addresses = host.getAddress() != null ?new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());final int port = this.schemePortResolver.resolve(host);for (int i = 0; i < addresses.length; i++) {final InetAddress address = addresses[i];final boolean last = i == addresses.length - 1;Socket sock = sf.createSocket(context);sock.setSoTimeout(socketConfig.getSoTimeout());sock.setReuseAddress(socketConfig.isSoReuseAddress());sock.setTcpNoDelay(socketConfig.isTcpNoDelay());sock.setKeepAlive(socketConfig.isSoKeepAlive());if (socketConfig.getRcvBufSize() > 0) {sock.setReceiveBufferSize(socketConfig.getRcvBufSize());}if (socketConfig.getSndBufSize() > 0) {sock.setSendBufferSize(socketConfig.getSndBufSize());}final int linger = socketConfig.getSoLinger();if (linger >= 0) {sock.setSoLinger(true, linger);}conn.bind(sock);final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);if (this.log.isDebugEnabled()) {this.log.debug("Connecting to " + remoteAddress);}try {sock = sf.connectSocket(connectTimeout, sock, host, remoteAddress, localAddress, context);conn.bind(sock);if (this.log.isDebugEnabled()) {this.log.debug("Connection established " + conn);}return;} catch (final SocketTimeoutException ex) {if (last) {throw new ConnectTimeoutException(ex, host, addresses);}} catch (final ConnectException ex) {if (last) {final String msg = ex.getMessage();if ("Connection timed out".equals(msg)) {throw new ConnectTimeoutException(ex, host, addresses);} else {throw new HttpHostConnectException(ex, host, addresses);}}} catch (final NoRouteToHostException ex) {if (last) {throw ex;}}if (this.log.isDebugEnabled()) {this.log.debug("Connect to " + remoteAddress + " timed out. " +"Connection will be retried using another IP address");}}}
建立真正的连接。以http
协议为例,这里通过PlainConnectionSocketFactory
创建一个普通的Socket
,然后绑定到到连接上:
public Socket createSocket(final HttpContext context) throws IOException {return new Socket();}@Overridepublic Socket connectSocket(final int connectTimeout,final Socket socket,final HttpHost host,final InetSocketAddress remoteAddress,final InetSocketAddress localAddress,final HttpContext context) throws IOException {final Socket sock = socket != null ? socket : createSocket(context);if (localAddress != null) {sock.bind(localAddress);}try {sock.connect(remoteAddress, connectTimeout);} catch (final IOException ex) {try {sock.close();} catch (final IOException ignore) {}throw ex;}return sock;}
到这一步,连接就真正建立起来了。
第二步发送请求信息
请求的发送,是通过HttpRequestExecutor
进行发送的:
protected HttpResponse doSendRequest(final HttpRequest request,final HttpClientConnection conn,final HttpContext context) throws IOException, HttpException {Args.notNull(request, "HTTP request");Args.notNull(conn, "Client connection");Args.notNull(context, "HTTP context");HttpResponse response = null;context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);conn.sendRequestHeader(request);if (request instanceof HttpEntityEnclosingRequest) {// Check for expect-continue handshake. We have to flush the// headers and wait for an 100-continue response to handle it.// If we get a different response, we must not send the entity.boolean sendentity = true;final ProtocolVersion ver =request.getRequestLine().getProtocolVersion();if (((HttpEntityEnclosingRequest) request).expectContinue() &&!ver.lessEquals(HttpVersion.HTTP_1_0)) {conn.flush();// As suggested by RFC 2616 section 8.2.3, we don't wait for a// 100-continue response forever. On timeout, send the entity.if (conn.isResponseAvailable(this.waitForContinue)) {response = conn.receiveResponseHeader();if (canResponseHaveBody(request, response)) {conn.receiveResponseEntity(response);}final int status = response.getStatusLine().getStatusCode();if (status < 200) {if (status != HttpStatus.SC_CONTINUE) {throw new ProtocolException("Unexpected response: " + response.getStatusLine());}// discard 100-continueresponse = null;} else {sendentity = false;}}}if (sendentity) {conn.sendRequestEntity((HttpEntityEnclosingRequest) request);}}conn.flush();context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);return response;}
首先是写入请求的header:
public void write(final T message) throws IOException, HttpException {Args.notNull(message, "HTTP message");writeHeadLine(message);for (final HeaderIterator it = message.headerIterator(); it.hasNext(); ) {final Header header = it.nextHeader();this.sessionBuffer.writeLine(lineFormatter.formatHeader(this.lineBuf, header));}this.lineBuf.clear();this.sessionBuffer.writeLine(this.lineBuf);}protected void doFormatHeader(final CharArrayBuffer buffer,final Header header) {final String name = header.getName();final String value = header.getValue();int len = name.length() + 2;if (value != null) {len += value.length();}buffer.ensureCapacity(len);buffer.append(name);buffer.append(": ");if (value != null) {buffer.append(value);}}
可以看到,http请求中,header每个key-value分行写入,并且按照Key: Value
的格式。
写入完header之后,接下来就会写入请求体:
public void sendRequestEntity(final HttpEntityEnclosingRequest request)throws HttpException, IOException {Args.notNull(request, "HTTP request");assertOpen();if (request.getEntity() == null) {return;}this.entityserializer.serialize(this.outbuffer,request,request.getEntity());}
public void serialize(final SessionOutputBuffer outbuffer,final HttpMessage message,final HttpEntity entity) throws HttpException, IOException {Args.notNull(outbuffer, "Session output buffer");Args.notNull(message, "HTTP message");Args.notNull(entity, "HTTP entity");final OutputStream outstream = doSerialize(outbuffer, message);entity.writeTo(outstream);outstream.close();}
protected OutputStream doSerialize(final SessionOutputBuffer outbuffer,final HttpMessage message) throws HttpException, IOException {final long len = this.lenStrategy.determineLength(message);if (len == ContentLengthStrategy.CHUNKED) {return new ChunkedOutputStream(outbuffer);} else if (len == ContentLengthStrategy.IDENTITY) {// 默认走这里return new IdentityOutputStream(outbuffer);} else {return new ContentLengthOutputStream(outbuffer, len);}}
如果是我们常见的UrlEncodedFormEntity
,则是拼接成 key1=value1&key2=value2
的格式,获取其byte数组写入到流中。
第三步获取响应
在发送完请求之后,会通过doReceiveResponse
获取响应:
protected HttpResponse doReceiveResponse(final HttpRequest request,final HttpClientConnection conn,final HttpContext context) throws HttpException, IOException {Args.notNull(request, "HTTP request");Args.notNull(conn, "Client connection");Args.notNull(context, "HTTP context");HttpResponse response = null;int statusCode = 0;while (response == null || statusCode < HttpStatus.SC_OK) {response = conn.receiveResponseHeader();if (canResponseHaveBody(request, response)) {conn.receiveResponseEntity(response);}statusCode = response.getStatusLine().getStatusCode();} // while intermediate responsereturn response;}
获取响应也是先获取响应的header,然后获取响应体:
public HttpResponse receiveResponseHeader()throws HttpException, IOException {assertOpen();final HttpResponse response = this.responseParser.parse();if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {this.metrics.incrementResponseCount();}return response;}
public T parse() throws IOException, HttpException {final int st = this.state;switch (st) {case HEAD_LINE:try {this.message = parseHead(this.sessionBuffer);} catch (final ParseException px) {throw new ProtocolException(px.getMessage(), px);}this.state = HEADERS;//$FALL-THROUGH$case HEADERS:final Header[] headers = AbstractMessageParser.parseHeaders(this.sessionBuffer,this.messageConstraints.getMaxHeaderCount(),this.messageConstraints.getMaxLineLength(),this.lineParser,this.headerLines);this.message.setHeaders(headers);final T result = this.message;this.message = null;this.headerLines.clear();this.state = HEAD_LINE;return result;default:throw new IllegalStateException("Inconsistent parser state");}}
在AbstractHttpClientConnection
实现了receiveResponseHeader
:
public HttpResponse receiveResponseHeader()throws HttpException, IOException {assertOpen();final HttpResponse response = this.responseParser.parse();if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {this.metrics.incrementResponseCount();}return response;}
而这里的responseParser
其实现为DefaultHttpResponseParser
这个里面会对响应进行HTTP协议头的解析,先解析出响应的状态行的信息,
最常见的是HTTP-Version Status-Code Reason-Phrase格式。
组成部分:
- HTTP-Version:这是HTTP协议的版本,如HTTP/1.1或HTTP/2。
- Status-Code:这是一个三位数字的代码,用来表示请求的结果状态。例如,200表示成功,404表示未找到,500表示服务器内部错误等。
- Reason-Phrase:这是一个简短的文本描述,用来解释状态码的含义。例如,对于状态码200,原因短语可能是"OK"。
基础实现为BasicStatusLine
:
public interface StatusLine {ProtocolVersion getProtocolVersion();int getStatusCode();String getReasonPhrase();
}
public class BasicStatusLine implements StatusLine, Cloneable, Serializable {private final ProtocolVersion protoVersion;private final int statusCode;private final String reasonPhrase;
}
可以看到,上面先从连接的input流中读取了StausLine,然后紧接着按行读取请求头,按照“ : ”分割每个header。
响应的请求头处理完之后,会判断是否有响应内容需要处理:
protected boolean canResponseHaveBody(final HttpRequest request,final HttpResponse response) {if ("HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {return false;}final int status = response.getStatusLine().getStatusCode();return status >= HttpStatus.SC_OK&& status != HttpStatus.SC_NO_CONTENT&& status != HttpStatus.SC_NOT_MODIFIED&& status != HttpStatus.SC_RESET_CONTENT;}
判断是否有响应内容读取,主要判断请求方法和响应码。如果有响应内容需要读取,则通过EntityDeserializer
读取连接的InputBuffer:
public void receiveResponseEntity(final HttpResponse response)throws HttpException, IOException {Args.notNull(response, "HTTP response");assertOpen();final HttpEntity entity = this.entitydeserializer.deserialize(this.inbuffer, response);response.setEntity(entity);}
protected BasicHttpEntity doDeserialize(final SessionInputBuffer inbuffer,final HttpMessage message) throws HttpException, IOException {final BasicHttpEntity entity = new BasicHttpEntity();final long len = this.lenStrategy.determineLength(message);if (len == ContentLengthStrategy.CHUNKED) {entity.setChunked(true);entity.setContentLength(-1);entity.setContent(new ChunkedInputStream(inbuffer));} else if (len == ContentLengthStrategy.IDENTITY) {// 默认走这里entity.setChunked(false);entity.setContentLength(-1);entity.setContent(new IdentityInputStream(inbuffer));} else {entity.setChunked(false);entity.setContentLength(len);entity.setContent(new ContentLengthInputStream(inbuffer, len));}final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);if (contentTypeHeader != null) {entity.setContentType(contentTypeHeader);}final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);if (contentEncodingHeader != null) {entity.setContentEncoding(contentEncodingHeader);}return entity;}
这里默认读取响应流的策略是IDENTITY
.这里我们也可以看到,默认返回BasicHttpEntity
的content
就是一个流。
得到响应流之后,我们一般会从HttpEntity
响应中得到具体的响应内容,然后在关闭流:
CloseableHttpResponse response = client.execute(httpPost);HttpEntity entity = response.getEntity();if (entity != null) {result = EntityUtils.toString(entity, DEFAULT_UTF8_ENCODING);}response.close();
EntityUtils.toString
则将HttpEntity
总input流读取到一个String中去。
private static String toString(final HttpEntity entity,final ContentType contentType) throws IOException {final InputStream inStream = entity.getContent();if (inStream == null) {return null;}try {Args.check(entity.getContentLength() <= Integer.MAX_VALUE,"HTTP entity too large to be buffered in memory");int capacity = (int)entity.getContentLength();if (capacity < 0) {capacity = DEFAULT_BUFFER_SIZE;}Charset charset = null;if (contentType != null) {charset = contentType.getCharset();if (charset == null) {final ContentType defaultContentType = ContentType.getByMimeType(contentType.getMimeType());charset = defaultContentType != null ? defaultContentType.getCharset() : null;}}if (charset == null) {charset = HTTP.DEF_CONTENT_CHARSET;}final Reader reader = new InputStreamReader(inStream, charset);final CharArrayBuffer buffer = new CharArrayBuffer(capacity);final char[] tmp = new char[1024];int l;while((l = reader.read(tmp)) != -1) {buffer.append(tmp, 0, l);}return buffer.toString();} finally {inStream.close();}}
可以看到,读取完之后就把HttpEntity
中的流关闭了,也就是这个流一直到我们实际读取内容之后才关闭。接下来,就是关闭response,
public void close() throws IOException {if (this.connHolder != null) {this.connHolder.close();}}public void close() throws IOException {releaseConnection(false);}private void releaseConnection(final boolean reusable) {if (this.released.compareAndSet(false, true)) {synchronized (this.managedConn) {if (reusable) {this.manager.releaseConnection(this.managedConn,this.state, this.validDuration, this.tunit);} else {try {this.managedConn.close();log.debug("Connection discarded");} catch (final IOException ex) {if (this.log.isDebugEnabled()) {this.log.debug(ex.getMessage(), ex);}} finally {this.manager.releaseConnection(this.managedConn, null, 0, TimeUnit.MILLISECONDS);}}}}}
如果是可以复用的,则会将连接归还到连接池中。注意,这里的连接时在http client层封装的,底层的网络Socket是一次性的,也会被关闭。
这样就完成了一次完整的http调用