java多線程小結
java多線程小結:
進程是指一個內存中運行的應用程序,每個進程都有自己獨立的一塊內存空間,線程是指進程中的一個執行流程,一個進程可以啟動多個線程,線程總是屬於某個進程,進程中的多個線程共享進程的內存。
在java中,“線程”指兩件不同的事情:1,java.lang.Thread類的一個實例 2,線程的執行。
一個Thread實例只是一個對象,像java中的任何其他對象一樣,具有變量和方法,生死於堆上。
線程棧:指某時刻內存中線程調度的棧信息,當前調用的方法總是位於棧頂。
多線程是為了減少cpu空閒時間,支持多任務並發處理;微觀串行,宏觀並行。
只有亂序的代碼才有必要設計為多線程。
Thread類的API是進行多線程的基礎;
熟悉java中數據結構及其應用場景:
例如HashMap比CurrentHashMap執行效率要高,但線程不安全,而後者線程安全。
多線程編程的注意事項:
1,明確目的,為什麼要使用多線程?如果是單線程讀寫(例如http訪問互聯網)出現性能瓶頸,可以考慮使用線程池;對不同資源(例如socket連接)進行管理,可以考慮使用多個線程;
2,如何控制線程的調度和阻塞,例如利用事件的觸發來控制,也可以利用消息;
3,保障共享資源的線程安全,一般用Lock鎖機制來控制,一定要保證不要有死鎖;
4,合理使用sleep,減少線程對CPU的搶奪,每次線程的就緒和激活都會占用一定資源,過多使用sleep會導致性能下降;
5,一般要使線程體在完成一件工作的情況下終止,盡量不要直接使用拋出線程異常的方式終止;
6,線程的優先級一定根據程序的需要有個整體的規劃。
線程五態:
1、新建狀態(New):新創建了一個線程對象。
2、就緒狀態(Runnable):線程對象創建後,其他線程調用了該對象的start()方法。該狀態的線程位於可運行線程池中,變得可運行,等待獲取CPU的使用權。
3、運行狀態(Running):就緒狀態的線程獲取了CPU,執行程序代碼。
4、阻塞狀態(Blocked):阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。阻塞的情況分三種:
(一)、等待阻塞:運行的線程執行wait()方法,JVM會把該線程放入等待池中。
(二)、同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被別的線程占用,則JVM會把該線程放入鎖池中。
(三)、其他阻塞:運行的線程執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該線程置為阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。
5、死亡狀態(Dead):線程執行完了或者因異常退出了run()方法,該線程結束生命周期。
頻繁創建和銷毀線程會大大降低系統的效率;
JVM運行時刻內存分配原理:當線程訪問某一個對象的時候,首先通過對象的引用找到對應堆內存的值,然後把堆內存變量的具體值load到線程本地內存,建立一個變量副本,之後線程就不再和對象在堆內存值有任何關系,而是直接修改副本變量的值,並在線程退出前,自動把線程變量副本的值回寫到對象在堆中變量,這樣在堆中的對象值就產生了變化。
線程池原理:java.util.concurrent.ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue
workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
public interface Executor{
void execute(Runnable command);
}
繼承關系:
ThreadPoolExecutor ext AbstractExecutorService imp ExecutorService ext Executor
ThreadPoolExecutor幾個重要的方法:execute() submit() shutdown() shutdownNow()
還有其他一些比較重要的成員變量:private final BlockingQueue
workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet
workers = new HashSet();
深究execute()方法實現原理(代碼逐層解剖):
$execute$
public void execute(Runnable command){
if(command ==null){
throw new NullPointerException();
//corePoolSize是線程池大小
if(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)){
//當線程池處於RUNNING狀態就將任務放入緩存隊列
if(runState == RUNNING && workQueue.offer(command){
//當其他線程突然調用shutdown或shutdownNow關閉線程池時
if(runState != RUNNING || poolSize == 0)
//確保任務添加到緩存隊列
ensureQueuedTaskHandler(command);
}
//maximumPoolSize是線程池的一種補救措施
else if(!addIfUnderMaximumPoolSize(command))
reject(command);
}
}
$addIfUnderCorePoolSize$
private boolean addIfUnderCorePoolSize(Runnable firstTask){
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
//加鎖前的那段時間可能有其他線程提交了任務,所以這裡再次判斷
if(poolSize < corePoolSize && runState == RUNNING){
t = addThread(firstTask); //創建線程去執行firstTask任務
}finally{
mainLock.unlock();
}
}
if(t == null) //當線程池滿或者不處於RUNNING狀態,則創建線程失敗
return false;
t.start(); //執行任務
return true;
}
$addThread$
private Thread addThread(Runnable firstTask){
Worker w = new Work(firstTask);
Thread t = threadFactory.newThread(w); //真正創建一個線程
if(t != null){
w.thread = t;
workers.add(w); //將Worker對象添加到工作集中
int nt = ++poolSize;
if(nt > largestPoolSize)
largestPoolSize = nt; //記錄池中線程數歷史新高
}
return t;
}
$Worker$
注意: volatile變量,JVM只保證從堆load到棧的值是最新的,仍存在並發問題,線程不安全。
private final class Worker implements Runnable{
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
volatile long completedTasks;
Thread thread;
Worker(Runnable firstTask){
this.firstTask = firstTask;
}
boolean isActive(){
return runLock.isLocked();
}
void interruptIfIdle(){
final ReentrantLock runLock = this.runLock;
if(runLock.tryLock()){ //如果成功獲取鎖,說明worker空閒,反之正在執行
try{
if(thread != Thread.currentThread())
thread.interrupt();
}finally{
runLock.unlock();
}
}
}
void interruptNow(){
thread.interrupt();
}
private void runTask(Runnable task){
final ReentrantLock runLock = this.runLock;
runLock.lock();
try{
if(runState < STOP && Thread.interrupted() &&runState >= STOP)
boolean ran = false;
beforeExecute(thread,task);
try{
task.run();
ran = true;
afterExecute(task,null);
++completedTasks;
}catch(RuntimeException ex){
if(!ran)
afterExecute(task,ex);
throw ex;
}
}finally{
runLock.unlock();
}
}
public void run(){
try{
Runnable task = firstTask;
firstTask = null;
while(task != null || (task = getTask()) != null){
runTask(task);
task = null;
}
}finally{
workerDone(this); //當任務隊列沒有任務時,進行清理工作
}
}
}
$getTask$
注意:getTask是ThreadPoolExecutor類中的方法
Runnable getTask(){
for(;;){
try{
int state = runState;
if(state > SHUTDOWN)
return null;
Runnable r;
if(state == SHUTDOWN)
r = workQueue.poll(); //耗盡隊列中任務
else if(poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);
else
r = workQueue.take(); //線程在這裡阻塞,等待任務隊列中有任務
if(r !=null)
return r;
if(workerCanExit()){ //若沒取到任務,即r為null,則判斷當前worker是否可以退出
if(runState >= SHUTDOWN) //即STOP或TERMINATED
interruptIdleWorkers(); //中斷處於空閒狀態的worker
return null;
}
}catch(InterruptedException ie){
}
}
}
$workerCanExit$
private boolean workCanExit(){
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try{
canExit = runState >= Stop || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1,corePoolSize));
}finally{
mainLock.unlock();
}
return canExit;
}
$interruptIdleWorkers$
void interruptIdleWorkers(){
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
for(Worker w : workers) //實際上調用的是worker的interruptIfIdle()方法
w.interruptIfIdle();
}finally{
mainLock.unlock();
}
}
以上代碼就是線程池原理核心源碼。
Thread的run方法的實現:
public void run(){
if(target !=null){
target.run(); //這裡的target實際保存的是一個Runnable接口的實現的引用
}
}
線程互斥:synchronized修飾的普通方法相當於synchronized(this){…}代碼塊
synchronized修飾的靜態方法相當於synchronized(類對象:類名.class){…}代碼塊
線程要想同步就必須使用同一個對象的鎖;
經驗:要用到共同數據(包括同步鎖)的若干方法應該歸在同一個資源類中,這樣設計能保障高內聚和程序的健壯性。
線程范圍內的共享數據:ThreadLocal,線程內共享,線程外獨立,key是當前線程,一個threadLocal只能存一個共享變量;
JVM虛擬機進程相關方法在Runtime中;
多個線程共享數據的方式:
方式一:如果代碼相同,則把代碼寫入Runnable實現類中,new多個線程共享該類中資源;
方式二:代碼不同,則分布在不同Runnable中,共享數據作為外部類的成員變量;
方式三:將共享數據封裝到另一個對象中,每個線程對共享數據的操作方法也分配到那個對象中完成,對象作為這個外部類中的成員變量或方法中的局部變量,每個線程的Runnable對象作為外部類中的成員內部類或局部內部類。
內部類的作用:1,實現隱藏;2,訪問外部類所有元素;3,多重繼承;4,父類和接口同名方法調用。
java.util.concurrent.atomic.*:保證基本類型數據操作的原子性;
java.util.concurrent.Executors:線程池;
如何實現線程死掉後重新啟動:創建單一線程池;
線程池定時器:Executors.newScheduledThreadPool(3).scheduleAtFixedRate(command,delay,interval,unit);
java.util.concurrent.locks.*:鎖,更面向對象,必須在finally塊釋放鎖;
讀寫鎖:只讀不互斥,有寫就互斥;
設計一個緩存系統:
public class MyCache{
private Map
cache = new HashMap();
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public Object getData(String key){
rwl.readLock().lock();
Object value = null;
try{
value = cache.get(key);
if(value == null){
rwl.readLock().unlock();
rwl.writeLock().lock(); //假如多個線程同時走到這裡,後邊value非空判斷
try{
if(value == null)
value = queryDB();
}
}finally{
rwl.writeLock().unlock();
}
rwl.readLock().lock();
}finally{
rwl.readLock().unlock();
}
return value;
}
}
java.util.concurrent.locks.Condition:功能類似於object.wait()和notify(),但能用於多路線程;
設計一個緩沖區(阻塞隊列):對應java中的BlockingQueue
數據結構
class BoundedBuffer{
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr,takeptr,count;
public void put(Object x) throws InterruptedException{
lock.lock();
try{
while(count == items.length)
notFull.await();
items[putptr] = x;
if(++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
}finally{
lock.unlock();
}
}
public Object take() throws InterruptedException{
lock.lock();
try{
while(count == 0)
notEmpty.await();
Object x = items[takeptr];
if(++takeptr == items.length) takeptr =0;
--count;
notFull.signal();
return x;
}finally{
lock.unlock();
}
}
}
Semaphore(信號燈):控制訪問資源線程的個數,例如允許一個文件的並發訪問,單個信號量的Semaphore對象可以實現互斥鎖的功能,並且可以是由一個獲得了鎖,再由另一個線程釋放鎖,這可應用於死鎖恢復的一些場合。
CyclicBarrier(同步工具類):保證規定數目的線程都執行完後再統一出發;
CountDownLatch(同步工具類):通過countDown方法將計數器減1,可以實現一個線程通知多個線程的效果以及一個(或多個)線程等待其他線程來通知它;
Exchanger(同步工具類):當兩個線程都到達時,交換數據。
常用同步集合類:例如CopyOnWriteArrayList,允許迭代過程中增減數據;
如果在一個死去的線程上調用start(),會拋出java.lang.IllegalThreadStateException異常。
sleep注意事項:
1,線程睡眠是幫助所有線程獲得運行機會的最好方法。
2,線程睡眠到期自動蘇醒,並返回到可運行狀態,不是運行狀態。sleep()中指定的時間是線程不會運行的最短時間,因此,sleep()方法不能保證該線程睡眠到期後就開始執行。
3,sleep()是靜態方法,只能控制當前正在運行的線程。
線程總是存在優先級,優先級范圍在1~10之間。
yield()從未導致線程轉到等待/睡眠/阻塞狀態,在大多數情況下,yield()將導致線程從運行狀態轉到可運行狀態,但可能沒有效果。
假定線程a正在執行,某一時刻調用了b.join(),相當於線程b被加入到了a的線程棧,b執行完後a才繼續執行。
關於同步:防止多個線程訪問同一個數據對象時對數據造成破壞;只能同步方法,不能同步變量和類;如果一個線程在對象上獲得一個鎖,就沒有任何其他線程可以進入該對象的任何一個同步方法;線程睡眠時,它所持的任何鎖都不會釋放;線程可以獲得多個鎖,例如:在一個對象的同步方法裡調用另一個對象的同步方法,則獲得兩把鎖;同步損害並發,應該盡可能縮小同步范圍;在同一對象上進行同步的線程將彼此阻塞,進入該對象的鎖池中,直到鎖釋放,再次變為可運行或運行為止。
與每個對象具有鎖一樣,每個對象可以有一個線程列表。
線程必須是對象鎖的擁有者才能調用該對象wait()或notify()方法,所以通常和同步代碼塊配合使用;調用wait(),執行該代碼的線程立即放棄該對象的鎖,然而調用notify(),線程在移出同步代碼塊之前不會放棄鎖,所以並不意味著這時該鎖變得可用。
多數情況下,最好notifyAll(),讓等待線程返回可運行狀態。