一個很大的文件,例如10G,僅包含ip地址和訪問時間二列,格式如下:
127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ... 127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ...
從文件裡查出在5分鐘內連續登陸10次以上的ip地址集合並輸出。這類問題是一個很常見的應用,通常都是從大的log日志文件中找出有攻擊嫌疑的ip。
這類應用因為要處理分析的文件非常大,顯然不能將整個文件全部讀入內存,然後進行下一步工作。常見的比較成熟的解決方案有:分治+Hash,Bloom filter,2-Bitmap等。 這裡就使用第一種方式來解決。
下面是分治與hash的代碼
public class DuplicateIP { private String delimiter = " "; private String FILE_PRE = "ip_"; private int MAGIC = 10,BATCH_MAGIC = 500; private String root = "/DuplicateIP/"; private String filename = ""; public DuplicateIP(final String filename) { this.filename = filename; } /** * 將大文件拆分成較小的文件,進行預處理 * @throws IOException */ private void preProcess() throws IOException { //Path newfile = FileSystems.getDefault().getPath(filename); BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename))); // 用5M的緩沖讀取文本文件 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); //假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷 //如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時, //當來訪的ip地址足夠多的情況下,內存開銷吃不消 // List<Entity> entities = new ArrayList<Entity>(); //存放ip的hashcode->accessTimes集合 Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>(); String line = ""; int count = 0; while((line = reader.readLine()) != null){ String split[] = line.split(delimiter); if(split != null && split.length >= 2){ //根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動 //極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件 int serial = split[0].trim().hashCode() % MAGIC; String splitFilename = FILE_PRE + serial; List<String> lines = hashcodeMap.get(splitFilename); if(lines == null){ lines = new ArrayList<String>(); hashcodeMap.put(splitFilename, lines); } lines.add(line); } count ++; if(count > 0 && count % BATCH_MAGIC == 0){ for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){ //System.out.println(entry.getKey()+"--->"+entry.getValue()); DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8")); } //一次操作500之後清空,重新執行 hashcodeMap.clear(); } } reader.close(); fis.close(); } private boolean process() throws IOException{ Path target = Paths.get(root); //ip -> List<Date> Map<String,List<Date>> resMap = new HashMap<String,List<Date>>(); this.recurseFile(target,resMap); for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){ System.out.println(entry.getKey()); for(Date date : entry.getValue()){ System.out.println(date); } } return true; } /** * 遞歸執行,將5分鐘內訪問超過阈值的ip找出來 * * @param parent * @return * @throws IOException */ private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{ //Path target = Paths.get(dir); if(!Files.exists(parent) || !Files.isDirectory(parent)){ return; } Iterator<Path> targets = parent.iterator(); for(;targets.hasNext();){ Path path = targets.next(); if(Files.isDirectory(parent)){ //如果還是目錄,遞歸 recurseFile(path.toAbsolutePath(),resMap); }else { //將一個文件中所有的行讀上來 List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8")); judgeAndcollection(lines,resMap); } } } /** * 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip * 並放入resMap * * @param lines * @param resMap */ private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) { if(lines != null){ //ip->List<String>accessTimes Map<String,List<String>> judgeMap = new HashMap<String,List<String>>(); for(String line : lines){ line = line.trim(); int space = line.indexOf(delimiter); String ip = line.substring(0, space); List<String> accessTimes = judgeMap.get(ip); if(accessTimes == null){ accessTimes = new ArrayList<String>(); } accessTimes.add(line.substring(space + 1).trim()); judgeMap.put(ip, accessTimes); } if(judgeMap.size() == 0){ return; } for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){ List<String> acessTimes = entry.getValue(); //相同ip,先判斷整體大於10個 if(acessTimes != null && acessTimes.size() >= MAGIC){ //開始判斷在List集合中,5分鐘內訪問超過MAGIC=10 List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); if(attackTimes != null){ resMap.put(entry.getKey(), attackTimes); } } } } } /** * @param args */ public static void main(String[] args) { String filename = "/DuplicateIP/log.txt"; DuplicateIP dip = new DuplicateIP(filename); try { dip.preProcess(); dip.process(); } catch (IOException e) { e.printStackTrace(); } } } public class DuplicateIP { private String delimiter = " "; private String FILE_PRE = "ip_"; private int MAGIC = 10,BATCH_MAGIC = 500; private String root = "/DuplicateIP/"; private String filename = ""; public DuplicateIP(final String filename) { this.filename = filename; } /** * 將大文件拆分成較小的文件,進行預處理 * @throws IOException */ private void preProcess() throws IOException { //Path newfile = FileSystems.getDefault().getPath(filename); BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename))); // 用5M的緩沖讀取文本文件 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); //假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷 //如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時, //當來訪的ip地址足夠多的情況下,內存開銷吃不消 // List<Entity> entities = new ArrayList<Entity>(); //存放ip的hashcode->accessTimes集合 Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>(); String line = ""; int count = 0; while((line = reader.readLine()) != null){ String split[] = line.split(delimiter); if(split != null && split.length >= 2){ //根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動 //極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件 int serial = split[0].trim().hashCode() % MAGIC; String splitFilename = FILE_PRE + serial; List<String> lines = hashcodeMap.get(splitFilename); if(lines == null){ lines = new ArrayList<String>(); hashcodeMap.put(splitFilename, lines); } lines.add(line); } count ++; if(count > 0 && count % BATCH_MAGIC == 0){ for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){ //System.out.println(entry.getKey()+"--->"+entry.getValue()); DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8")); } //一次操作500之後清空,重新執行 hashcodeMap.clear(); } } reader.close(); fis.close(); } private boolean process() throws IOException{ Path target = Paths.get(root); //ip -> List<Date> Map<String,List<Date>> resMap = new HashMap<String,List<Date>>(); this.recurseFile(target,resMap); for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){ System.out.println(entry.getKey()); for(Date date : entry.getValue()){ System.out.println(date); } } return true; } /** * 遞歸執行,將5分鐘內訪問超過阈值的ip找出來 * * @param parent * @return * @throws IOException */ private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{ //Path target = Paths.get(dir); if(!Files.exists(parent) || !Files.isDirectory(parent)){ return; } Iterator<Path> targets = parent.iterator(); for(;targets.hasNext();){ Path path = targets.next(); if(Files.isDirectory(parent)){ //如果還是目錄,遞歸 recurseFile(path.toAbsolutePath(),resMap); }else { //將一個文件中所有的行讀上來 List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8")); judgeAndcollection(lines,resMap); } } } /** * 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip * 並放入resMap * * @param lines * @param resMap */ private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) { if(lines != null){ //ip->List<String>accessTimes Map<String,List<String>> judgeMap = new HashMap<String,List<String>>(); for(String line : lines){ line = line.trim(); int space = line.indexOf(delimiter); String ip = line.substring(0, space); List<String> accessTimes = judgeMap.get(ip); if(accessTimes == null){ accessTimes = new ArrayList<String>(); } accessTimes.add(line.substring(space + 1).trim()); judgeMap.put(ip, accessTimes); } if(judgeMap.size() == 0){ return; } for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){ List<String> acessTimes = entry.getValue(); //相同ip,先判斷整體大於10個 if(acessTimes != null && acessTimes.size() >= MAGIC){ //開始判斷在List集合中,5分鐘內訪問超過MAGIC=10 List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); if(attackTimes != null){ resMap.put(entry.getKey(), attackTimes); } } } } } /** * @param args */ public static void main(String[] args) { String filename = "/DuplicateIP/log.txt"; DuplicateIP dip = new DuplicateIP(filename); try { dip.preProcess(); dip.process(); } catch (IOException e) { e.printStackTrace(); } } }
下面是工具類,提供了一些文件讀寫及查找的功能
public class DuplicateUtils { /** * 根據給出的數據,往給定的文件形參中追加一行或者幾行數據 * * @param file * @throws IOException */ public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException { if(accessTimes != null){ Path target = Paths.get(splitFilename); if(target == null){ createFile(splitFilename); } return Files.write(target, accessTimes, cs);//, options) } return null; } /** * 創建文件 * @throws IOException */ public static void createFile(String splitFilename) throws IOException { Path target = Paths.get(splitFilename); Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-"); FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms); Files.createFile(target, attr); } public static Date stringToDate(String dateStr,String dateStyle){ if(dateStr == null || "".equals(dateStr)) return null; DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss"); try { return format.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); return null; } } public static String dateToString(Date date,String dateStyle){ if(date == null) return null; DateFormat format = new SimpleDateFormat(dateStyle); return format.format(date); } /** * 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數 * 返回true or false * * @param dates * @param intervalDate * @param magic * @return * @throws ParseException */ public static boolean attack(List<String> dateStrs,long intervalDate,int magic) { if(dateStrs == null || dateStrs.size() < magic){ return false; } List<Date> dates = new ArrayList<Date>(); for(String date : dateStrs){ if(date != null && !"".equals(date)) dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); } Collections.sort(dates); return judgeAttack(dates,intervalDate,magic); } public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){ if(sequenceDates == null || sequenceDates.size() < magic){ return false; } for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate); int count = 1; for(int i = x + 1;i< sequenceDates.size();i++){ Date compareDate = sequenceDates.get(i); if(compareDate.before(dateAfter5)) count ++ ; else break; } if(count >= magic) return true; } return false; } /** * 判斷在間隔時間內,是否有大於magic的上限的數據集合, * 如果有,則返回滿足條件的集合 * 如果找不到滿足條件的,就返回null * * @param sequenceDates 已經按照時間順序排序了的數組 * @param intervalDate * @param magic * @return */ public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){ if(sequenceDates == null || sequenceDates.size() < magic){ return null; } List<Date> res = new ArrayList<Date>(); for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ Date souceDate = sequenceDates.get(x); Date dateAfter5 = new Date(souceDate.getTime() + intervalDate); res.add(souceDate); for(int i = x + 1;i< sequenceDates.size();i++){ Date compareDate = sequenceDates.get(i); if(compareDate.before(dateAfter5)){ res.add(compareDate); }else break; } if(res.size() >= magic) return res; else res.clear(); } return null; } public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){ if(dateStrs == null || dateStrs.size() < magic){ return null; } List<Date> dates = new ArrayList<Date>(); for(String date : dateStrs){ if(date != null && !"".equals(date)) dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); } Collections.sort(dates); return attackTimes(dates,intervalDate,magic); } } public class DuplicateUtils { /** * 根據給出的數據,往給定的文件形參中追加一行或者幾行數據 * * @param file * @throws IOException */ public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException { if(accessTimes != null){ Path target = Paths.get(splitFilename); if(target == null){ createFile(splitFilename); } return Files.write(target, accessTimes, cs);//, options) } return null; } /** * 創建文件 * @throws IOException */ public static void createFile(String splitFilename) throws IOException { Path target = Paths.get(splitFilename); Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-"); FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms); Files.createFile(target, attr); } public static Date stringToDate(String dateStr,String dateStyle){ if(dateStr == null || "".equals(dateStr)) return null; DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss"); try { return format.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); return null; } } public static String dateToString(Date date,String dateStyle){ if(date == null) return null; DateFormat format = new SimpleDateFormat(dateStyle); return format.format(date); } /** * 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數 * 返回true or false * * @param dates * @param intervalDate * @param magic * @return * @throws ParseException */ public static boolean attack(List<String> dateStrs,long intervalDate,int magic) { if(dateStrs == null || dateStrs.size() < magic){ return false; } List<Date> dates = new ArrayList<Date>(); for(String date : dateStrs){ if(date != null && !"".equals(date)) dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); } Collections.sort(dates); return judgeAttack(dates,intervalDate,magic); } public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){ if(sequenceDates == null || sequenceDates.size() < magic){ return false; } for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate); int count = 1; for(int i = x + 1;i< sequenceDates.size();i++){ Date compareDate = sequenceDates.get(i); if(compareDate.before(dateAfter5)) count ++ ; else break; } if(count >= magic) return true; } return false; } /** * 判斷在間隔時間內,是否有大於magic的上限的數據集合, * 如果有,則返回滿足條件的集合 * 如果找不到滿足條件的,就返回null * * @param sequenceDates 已經按照時間順序排序了的數組 * @param intervalDate * @param magic * @return */ public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){ if(sequenceDates == null || sequenceDates.size() < magic){ return null; } List<Date> res = new ArrayList<Date>(); for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ Date souceDate = sequenceDates.get(x); Date dateAfter5 = new Date(souceDate.getTime() + intervalDate); res.add(souceDate); for(int i = x + 1;i< sequenceDates.size();i++){ Date compareDate = sequenceDates.get(i); if(compareDate.before(dateAfter5)){ res.add(compareDate); }else break; } if(res.size() >= magic) return res; else res.clear(); } return null; } public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){ if(dateStrs == null || dateStrs.size() < magic){ return null; } List<Date> dates = new ArrayList<Date>(); for(String date : dateStrs){ if(date != null && !"".equals(date)) dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); } Collections.sort(dates); return attackTimes(dates,intervalDate,magic); } }