赞
踩
Jetty是一个开源的servlet容器,由Mort Bay Consulting公司创建,主要用于为基于Java的web内容(如JSP和servlet)提供运行环境。
- 功能丰富:Jetty不仅可以作为独立的Web服务器使用,还提供了支持JSP和Servlet的运行时环境,允许开发人员在Java应用程序中轻松地提供网络和Web连接。
- 设计模块化:Jetty的设计非常模块化,这意味着它可以根据需要进行灵活的配置和定制,从而提高了资源的利用率。
- 性能优异:Jetty支持异步Servlet,能够处理更高的并发量,特别适用于需要处理大量长连接的业务场景。它默认采用的NIO模型,使其在这类场景下成为更好的选择。
- 易用性高:Jetty注重易用性,其API以一组JAR包的形式发布,便于开发人员快速上手和使用。
- 社区活跃:作为一个开源项目,Jetty拥有一个活跃的社区,用户可以参与到社区中,贡献代码或寻求帮助。
- 应用广泛:由于其轻量级和灵活性,Jetty被广泛应用于许多知名的产品和项目中,如ActiveMQ、Maven、Spark、Google App Engine、Eclipse和Hadoop等。
- 易于集成:Jetty可以嵌入到现有的应用程序中,这使得普通的应用程序能够快速地支持HTTP服务。
综上所述,Jetty以其高性能、易用性和灵活性在Web服务器和Servlet容器领域占有一席之地,是Java开发者在构建Web应用程序时的一个优秀选择。
功能强大、易于使用、高度可定制的servlet容器,适用于各种Java Web应用程序的开发和部署。
- public Server(@Name("port")int port)
- {
- this((ThreadPool)null);
- ServerConnector connector=new ServerConnector(this);
- connector.setPort(port);
- setConnectors(new Connector[]{connector});
- }
- public Server(@Name("threadpool") ThreadPool pool)
- {
- _threadPool=pool!=null?pool:new QueuedThreadPool();
- addBean(_threadPool);
- setServer(this);
- }
实现了SizedThreadPool
execute()方法
@Override
public void execute(Runnable job)
{
if (!isRunning() || !_jobs.offer(job))
{
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
startThreads(1);
}
}
BlockingQueue
将任务推入
BlockingQueue<Runnable> org.eclipse.jetty.util.thread.QueuedThreadPool._jobs
HTTP connector using NIO ByteChannels and Selectors
继承自 AbstractConnector
based on JDK's {@link ScheduledThreadPoolExecutor}.
在数据传输过程中,不可避免需要byte数组
buffer池
默认产生 ArrayByteBufferPool
ByteBufferPool 接口有2个方法:
public ByteBuffer acquire(int size, boolean direct);
public void release(ByteBuffer buffer);
这是一个很好的对象池范本
- public ArrayByteBufferPool(int minSize, int increment, int maxSize)
-
-
- public ArrayByteBufferPool()
- {
-
- this(0,1024,64*1024);
-
- }
-
-
- _direct=new Bucket[maxSize/increment];
- _indirect=new Bucket[maxSize/increment];
结构
Bucket
_direct Bucket数组
_indirect Bucket数组
为每一个大小,新建一个Bucket
但不初始化ByteBuffer
int size=0;
for (int i=0;i<_direct.length;i++)
{
size+=_inc;
_direct[i]=new Bucket(size);
_indirect[i]=new Bucket(size);
}
一个Bucekt存放大小相同的所有的ByteBuffer
_size
bytebuffer大小
_queue
public final Queue<ByteBuffer> _queue= new ConcurrentLinkedQueue<>();
acquire
public ByteBuffer acquire(int size, boolean direct)
取得合适的Bucket
每个Bucket的大小不同,这里找到最合适的
Bucket bucket = bucketFor(size,direct);
从Bucket中取得ByteBuffer
ByteBuffer buffer = bucket==null?null:bucket._queue.poll();
不存在则新建
if (buffer == null)
{
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
release
public void release(ByteBuffer buffer)
{
if (buffer!=null)
{
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
{
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
}
}
取得合适的Bucket
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
清空Buffer
BufferUtil.clear(buffer);
归还Pool
bucket._queue.offer(buffer);
例外处理
如果申请的ByteBuffer过大或者过小,
无法在POOL中满足,则可以申请成功,但无法归还给POOL。
HttpConnectionFactory
用于创建连接,
比如Accept后,需要创建一个表示连接的对象
int cores = Runtime.getRuntime().availableProcessors();
if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));
_acceptors = new Thread[acceptors];
继承自 SelectorManager
_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
保存selector线程数量Math.min(4,Runtime.getRuntime().availableProcessors()/2))
connector.setPort(port);
setConnectors(new Connector[]{connector});
org.eclipse.jetty.server.Server
启动web服务器
WebAppContext context = new WebAppContext();
context.setContextPath("/"); context.setResourceBase("./web/");
context.setClassLoader(Thread.currentThread().getContextClassLoader()); server.setHandler(context);
server.start();
- // AbstractLifeCycle
-
-
- private void setStarting()
- {
- if (LOG.isDebugEnabled())
- LOG.debug("starting {}",this);
- _state = STARTING;
-
- for (Listener listener : _listeners) {
- listener.lifeCycleStarting(this);
- }
- }
Server
启动整个server
- protected void doStart() throws Exception
- {
-
- //If the Server should be stopped when the jvm exits, register
- //with the shutdown handler thread.
- if (getStopAtShutdown())
- ShutdownThread.register(this);
-
-
- //Register the Server with the handler thread for receiving
- //remote stop commands ShutdownMonitor.register(this);
-
- //Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize
-
- LOG.info("jetty-" + getVersion()); HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);
-
- MultiException mex=new MultiException();
-
- // check size of thread pool
- SizedThreadPool pool = getBean(SizedThreadPool.class);
- int max=pool==null?-1:pool.getMaxThreads();
- int selectors=0;
- int acceptors=0;
- if (mex.size()==0)
- {
- for (Connector connector : _connectors)
- {
- if (connector instanceof AbstractConnector)
- acceptors+=((AbstractConnector)connector).getAcceptors();
-
- if (connector instanceof ServerConnector)
- selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
- }
- }
-
-
- int needed=1+selectors+acceptors;
- if (max>0 && needed>max)
- throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
-
- try
- {
- super.doStart();
-
- }
- catch(Throwable e)
- {
- mex.add(e);
- }
-
-
- // start connectors last
- for (Connector connector : _connectors)
- {
- try
- {
- connector.start();
- }
- catch(Throwable e)
- {
- mex.add(e);
- }
- }
-
-
- if (isDumpAfterStart())
- dumpStdErr();
-
- mex.ifExceptionThrow();
-
-
- LOG.info(String.format("Started @%dms",Uptime.getUptime()));
- }

远程控制接口
//Register the Server with the handler thread for receiving
//remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);
QueuedThreadPool
根据Connector数量进行累计
大部分情况下,只有一个ServerConnector
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector)
selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
累计所有Connector的需求
nt needed=1+selectors+acceptors;
如果大于默认的200则中断程序
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d
+ selectors=%d + request=1)",max,acceptors,selectors));
启动QueuedThreadPool
doStart()
startThreads()
建立需要的线程
创建线程
Thread thread = newThread(_runnable);
_runnable
_jobs中取任务并执行
设置线程的属性
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
_threads.add(thread);
启动线程
thread.start();启动WebAppContext
如果需要使用,在此处启动
取得ConnectionFactory
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
创建selector线程并启动
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selector = newSelector(i);
_selectors[i] = selector;
selector.start();
execute(new NonBlockingThread(selector));
}
newSelector()
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(id);
}
创建Acceptor线程
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a); getExecutor().execute(a);
}
Acceptor
设置线程名字
final Thread thread = Thread.currentThread();
String name=thread.getName();
_name=String.format("%s-acceptor-%d@%x-%s",name,_acceptor,hashCode(),AbstractConnector.this.toString()); thread.setName(_name);
设置优先级
将自己放入_acceptors数组
synchronized (AbstractConnector.this)
{
_acceptors[_acceptor] = thread;
}
监听端口
try
{
while (isAccepting())
{
try
{
accept(_acceptor);
}
catch (Throwable e)
{
if (isAccepting()) LOG.warn(e);
else
LOG.ignore(e);
}
}
}
finally
{
thread.setName(name);
if (_acceptorPriorityDelta!=0)
thread.setPriority(priority);
synchronized (AbstractConnector.this)
{
_acceptors[_acceptor] = null;
}
CountDownLatch stopping=_stopping;
if (stopping!=null)
stopping.countDown();
}
ServerConnector.accept()
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
在accept的地方等待
没有Acceptor的情况
channle默认是blocking的
如果acceptor数量为0,没有安排线程专门进行accept,则设置为非阻塞模式若是非0,有专门线程进行accept,因此,为阻塞模式
protected void doStart() throws Exception
{
super.doStart();
if (getAcceptors()==0)
{
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}
AbstractLifeCycle
private void setStarted()
{
_state = STARTED;
if (LOG.isDebugEnabled())
LOG.debug(STARTED+" @{}ms {}",Uptime.getUptime(),this);
for (Listener listener : _listeners)
listener.lifeCycleStarted(this);
}
- private void accepted(SocketChannel channel) throws IOException
- {
- channel.configureBlocking(false);
- Socket socket = channel.socket();
- configure(socket);
- _manager.accept(channel);
-
- }
channel.configureBlocking(false);
Socket socket = channel.socket(); configure(socket);
SelectorManager _manager;
_manager.accept(channel);
选择可用的ManagedSelector线程
- private ManagedSelector chooseSelector()
- {
- // The ++ increment here is not atomic, but it does not matter,
- // so long as the value changes sometimes, then connections will
- // be distributed over the available selectors.
- long s = _selectorIndex++;
- int index = (int)(s % getSelectorCount());
- return _selectors[index];
- }
ManagedSelector处理
ManagedSelector 是一个线程封装了Selector 的使用
提交任务
selector.submit(selector.new Accept(channel, attachment));
提交这个处理任务到ManagedSelector:
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
_changes.offer(change);
ConcurrentArrayQueue
与ConcurrentLinkedQueue相似的性能,但直接保存元素而不是node,因此需要更少的对象,更少的GC
while (isRunning()) select();
select()
发现有任务就执行
runChanges();
private void runChanges()
{
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
runChange()
change.run();
Accept.run
SelectionKey key = channel.register(_selector, 0, attachment);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
select()
int selected = _selector.select();
处理SelectionKey
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
for (SelectionKey key : selectedKeys)
{
if (key.isValid())
{
processKey(key);
}
else
{
if (debug)
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
((EndPoint)attachment).close();
}
}
selectedKeys.clear();
processKey()
private void processKey(SelectionKey key)
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
((SelectableEndPoint)attachment).onSelected();
}
else if (key.isConnectable())
{
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException();
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
onSelected()
@Override
public void onSelected()
{
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps;
setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
getFillInterest().fillable();
if (_key.isWritable())
getWriteFlusher().completeWrite();
}
会使用新的线程进行HTTP业务处理 (提交到线程池)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。