/*
 * Decompiled with CFR 0.152.
 */
package eu.dicodeproject.twitterstream.sink;

import eu.dicodeproject.analysis.hbase.TweetCols;
import eu.dicodeproject.analysis.util.Language;
import eu.dicodeproject.twitterstream.sink.TweetSink;
import eu.dicodeproject.twitterstream.vectorize.Vectorizer;
import java.io.IOException;
import java.net.URL;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.GeoLocation;
import twitter4j.HashtagEntity;
import twitter4j.Place;
import twitter4j.Status;
import twitter4j.URLEntity;
import twitter4j.User;
import twitter4j.UserMentionEntity;

/**
 * {@link TweetSink} that persists each status into two HBase tables: a tweet table
 * (row key = status id, with a tweet column family and a term-vector column family)
 * and a user table (row key = user id, with a single user column family).
 *
 * <p>Table names, column families and the ZooKeeper endpoint are injected through the
 * setters; {@link #init()} must run before {@link #store(Status)} is called.
 */
public class HBaseTweetSink implements TweetSink {
    private static final Logger log = LoggerFactory.getLogger(HBaseTweetSink.class);

    private byte[] tweetFamily;
    private byte[] vectorFamily;
    private byte[] userFamily;
    private String tweetTableName;
    private String userTableName;
    private String zookeeperQuorum = "localhost";
    private int zookeeperPort = 2181;
    private Configuration configuration;
    private HTablePool hTablePool;
    private Vectorizer vectorizer;

    /**
     * Number of {@link #store(Status)} calls that finished without an exception.
     * Static, so it is shared by all instances, and incremented without any
     * synchronization, so it is only approximate if store() runs concurrently.
     */
    private static long counter;

    /**
     * Builds the HBase configuration (unless one was injected via
     * {@link #setHBaseConfiguration(Configuration)}), creates the tweet and user tables
     * if they are missing, and sets up the table pool.
     *
     * @throws IOException if talking to HBase fails
     */
    @PostConstruct
    public void init() throws IOException {
        if (configuration == null) {
            configuration = new Configuration();
            // Start from an empty configuration so only the two settings below apply.
            configuration.clear();
            configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
            configuration.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
        }
        // NOTE(review): the admin handle is never closed here; confirm whether the HBase
        // client version in use offers HBaseAdmin.close() and call it if so.
        HBaseAdmin admin = new HBaseAdmin(configuration);
        initTable(tweetTableName, admin, tweetFamily, vectorFamily);
        initTable(userTableName, admin, userFamily);
        hTablePool = new HTablePool(configuration, 100);
    }

    /**
     * Closes the pooled handles of both tables. Does nothing when {@link #init()} never
     * got as far as creating the pool.
     */
    @PreDestroy
    public void shutdown() {
        if (hTablePool == null) {
            return;
        }
        hTablePool.closeTablePool(userTableName);
        hTablePool.closeTablePool(tweetTableName);
    }

    /**
     * Creates {@code tablename} with the given column families if no table of that name
     * is listed by {@code admin}; an existing table is left untouched.
     */
    private void initTable(String tablename, HBaseAdmin admin, byte[]... families) throws IOException {
        if (checkTable(tablename, admin)) {
            return;
        }
        HTableDescriptor descriptor = new HTableDescriptor(tablename);
        for (byte[] family : families) {
            descriptor.addFamily(new HColumnDescriptor(family));
            if (log.isInfoEnabled()) {
                log.info("Creating table " + tablename + " with column family "
                        + Bytes.toString(family) + " as non were found.");
            }
        }
        admin.createTable(descriptor);
    }

    /**
     * @return {@code true} if {@code admin.listTables()} contains a table whose name
     *         equals {@code tablename}
     */
    private boolean checkTable(String tablename, HBaseAdmin admin) throws IOException {
        HTableDescriptor[] existing = admin.listTables();
        for (HTableDescriptor descriptor : existing) {
            if (tablename.equals(descriptor.getNameAsString())) {
                return true;
            }
        }
        return false;
    }

    /** Adds a string cell to {@code targetPut}; a {@code null} value is silently skipped. */
    private void add(byte[] family, byte[] col, String value, Put targetPut) {
        if (value == null) {
            return;
        }
        targetPut.add(family, col, Bytes.toBytes(value));
    }

    /** Adds a cell holding the {@code Bytes.toBytes(long)} encoding of {@code value}. */
    private void add(byte[] family, byte[] col, long value, Put targetPut) {
        targetPut.add(family, col, Bytes.toBytes(value));
    }

    /** Adds a cell holding the {@code Bytes.toBytes(int)} encoding of {@code value}. */
    private void add(byte[] family, byte[] col, int value, Put targetPut) {
        targetPut.add(family, col, Bytes.toBytes(value));
    }

    /** Adds a cell holding the {@code Bytes.toBytes(double)} encoding of {@code value}. */
    private void add(byte[] family, byte[] col, double value, Put targetPut) {
        targetPut.add(family, col, Bytes.toBytes(value));
    }

    /**
     * Writes the tweet row and then the user row for {@code status}.
     *
     * <p>Any exception raised while building or writing the rows is logged at WARN level
     * and swallowed so that one bad tweet does not stop processing. Failures while
     * borrowing a table from the pool are not caught and propagate to the caller.
     *
     * @param status the tweet to persist
     * @throws IOException declared by the {@link TweetSink} contract
     */
    @Override
    public void store(Status status) throws IOException {
        HTableInterface tweetTable = hTablePool.getTable(tweetTableName);
        try {
            // Borrowed inside the outer try so the tweet table is still returned to the
            // pool if this second lookup fails.
            HTableInterface userTable = hTablePool.getTable(userTableName);
            try {
                storeTweetData(status, tweetTable);
                storeUserData(status, userTable);
                if (++counter % 100000L == 0L) {
                    log.info(counter + " since restart / 100,000 tweets stored");
                }
            } catch (Exception e) {
                log.warn("Error processing tweet: " + e.getMessage() + " (Ignoring and continuing processing - be alarmed if that happens often for your desired definition of often.) ", e);
                log.warn("Tweet that let to the exception: " + status);
            } finally {
                hTablePool.putTable(userTable);
            }
        } finally {
            hTablePool.putTable(tweetTable);
        }
    }

    /**
     * Writes one row to the user table, keyed by the user id, with the profile fields of
     * the status' author. Does nothing when the status carries no user.
     */
    private void storeUserData(Status status, HTableInterface userTable) throws IOException {
        User user = status.getUser();
        if (user == null) {
            return;
        }
        // The (long) cast pins the Bytes.toBytes(long) overload for the row key.
        Put putUser = new Put(Bytes.toBytes((long) user.getId()));

        if (user.getProfileImageURL() != null) {
            add(userFamily, TweetCols.IMAGE_URL.bytes(), user.getProfileImageURL().toString(), putUser);
        }
        add(userFamily, TweetCols.LANG.bytes(), user.getLang(), putUser);
        if (user.getCreatedAt() != null) {
            add(userFamily, TweetCols.USER_CREATED_AT.bytes(), user.getCreatedAt().getTime(), putUser);
        }
        add(userFamily, TweetCols.USER_DESCRIPTION.bytes(), user.getDescription(), putUser);
        add(userFamily, TweetCols.USER_FAVOURITES_COUNT.bytes(), user.getFavouritesCount(), putUser);
        add(userFamily, TweetCols.USER_FOLLOWERS_COUNT.bytes(), user.getFollowersCount(), putUser);
        add(userFamily, TweetCols.USER_FRIENDS_COUNT.bytes(), user.getFriendsCount(), putUser);
        // The listed count is only written when it is positive.
        if (user.getListedCount() > 0) {
            add(userFamily, TweetCols.USER_LISTED_COUNT.bytes(), user.getListedCount(), putUser);
        }
        add(userFamily, TweetCols.USER_LOCATION.bytes(), user.getLocation(), putUser);
        add(userFamily, TweetCols.USER_NAME.bytes(), user.getName(), putUser);
        add(userFamily, TweetCols.USER_PROFILE_BACKGROUND_COLOR.bytes(), user.getProfileBackgroundColor(), putUser);
        add(userFamily, TweetCols.USER_PROFILE_BACKGROUND_IMAGE_URL.bytes(), user.getProfileBackgroundImageUrl(), putUser);
        // Same source value as IMAGE_URL above, stored under a second column.
        if (user.getProfileImageURL() != null) {
            add(userFamily, TweetCols.USER_PROFILE_IMAGE_URL.bytes(), user.getProfileImageURL().toString(), putUser);
        }
        add(userFamily, TweetCols.USER_PROFILE_LINK_COLOR.bytes(), user.getProfileLinkColor(), putUser);
        add(userFamily, TweetCols.USER_PROFILE_SIDEBAR_BORDER_COLOR.bytes(), user.getProfileSidebarBorderColor(), putUser);
        add(userFamily, TweetCols.USER_PROFILE_SIDEBAR_FILL_COLOR.bytes(), user.getProfileSidebarFillColor(), putUser);
        add(userFamily, TweetCols.USER_PROFILE_TEXT_COLOR.bytes(), user.getProfileTextColor(), putUser);
        add(userFamily, TweetCols.USER_SCREEN_NAME.bytes(), user.getScreenName(), putUser);
        add(userFamily, TweetCols.USER_STATUSES_COUNT.bytes(), user.getStatusesCount(), putUser);
        add(userFamily, TweetCols.USER_TIMEZONE.bytes(), user.getTimeZone(), putUser);
        if (user.getURL() != null) {
            add(userFamily, TweetCols.USER_URL.bytes(), user.getURL().toString(), putUser);
        }
        add(userFamily, TweetCols.USER_UTC_OFFSET.bytes(), user.getUtcOffset(), putUser);

        userTable.put(putUser);
    }

    /**
     * Writes one row to the tweet table, keyed by the status id. If a vectorizer is set,
     * the term vector of the tweet text is written into the vector family, one column
     * per term. A retweeted status, if present, is stored as its own row first.
     */
    private void storeTweetData(Status status, HTableInterface tweetTable) throws IOException {
        // The (long) cast pins the Bytes.toBytes(long) overload for the row key.
        Put putTweet = new Put(Bytes.toBytes((long) status.getId()));

        // Language code handed to the vectorizer; stays "" when the user or the user's
        // language is missing, so a null code is never passed on.
        String language = "";
        User user = status.getUser();
        if (user != null) {
            add(tweetFamily, TweetCols.FROM.bytes(), user.getName(), putTweet);
            add(tweetFamily, TweetCols.FROM_ID.bytes(), user.getId(), putTweet);
            String userLanguage = user.getLang();
            add(tweetFamily, TweetCols.LANG.bytes(), userLanguage, putTweet);
            if (userLanguage != null) {
                language = userLanguage;
            }
        }
        if (status.getCreatedAt() != null) {
            add(tweetFamily, TweetCols.CREATION_DATE.bytes(), status.getCreatedAt().getTime(), putTweet);
        }
        // Reply ids are stored in their decimal string form, and only when positive.
        if (status.getInReplyToStatusId() > 0L) {
            add(tweetFamily, TweetCols.IN_REPLY_TO_STATUS_ID.bytes(), "" + status.getInReplyToStatusId(), putTweet);
        }
        add(tweetFamily, TweetCols.SOURCE.bytes(), status.getSource(), putTweet);
        String tweetText = status.getText();
        add(tweetFamily, TweetCols.TEXT.bytes(), tweetText, putTweet);

        if (vectorizer != null) {
            // NOTE(review): the vectorizer's language is set on the shared instance right
            // before use; verify this is safe if store() is called from several threads.
            vectorizer.setLanguage(Language.fromCode(language));
            for (Map.Entry<String, Integer> term : vectorizer.createVector(tweetText).entrySet()) {
                add(vectorFamily, Bytes.toBytes(term.getKey()), term.getValue(), putTweet);
            }
        }
        if (status.getInReplyToUserId() > 0L) {
            add(tweetFamily, TweetCols.TO.bytes(), "" + status.getInReplyToUserId(), putTweet);
        }

        addHashtags(status, putTweet);
        addLocationData(status, putTweet);
        addFirstUrl(status, putTweet);
        addUsersMentioned(status, putTweet);

        if (status.getRetweetCount() > 0L) {
            add(tweetFamily, TweetCols.RETWEET_COUNT.bytes(), status.getRetweetCount(), putTweet);
        }
        // Goes through store(), so a failure while persisting the retweeted status is
        // logged there and does not prevent this tweet from being written.
        if (status.getRetweetedStatus() != null) {
            store(status.getRetweetedStatus());
        }
        tweetTable.put(putTweet);
    }

    /**
     * Stores the ids and the names of all mentioned users as two strings in which every
     * entry is prefixed with {@code '#'}. Nothing is written when there are no mentions.
     */
    private void addUsersMentioned(Status status, Put putTweet) {
        UserMentionEntity[] mentions = status.getUserMentionEntities();
        if (mentions == null || mentions.length == 0) {
            return;
        }
        StringBuilder ids = new StringBuilder();
        StringBuilder names = new StringBuilder();
        for (UserMentionEntity mention : mentions) {
            ids.append('#').append(mention.getId());
            names.append('#').append(mention.getName());
        }
        add(tweetFamily, TweetCols.USER_IDS_MENTIONED.bytes(), ids.toString(), putTweet);
        add(tweetFamily, TweetCols.USER_NAMES_MENTIONED.bytes(), names.toString(), putTweet);
    }

    /**
     * Stores all hashtags as one string in which every tag is prefixed with {@code '#'}.
     * Nothing is written when the status has no hashtags.
     */
    private void addHashtags(Status status, Put putTweet) {
        HashtagEntity[] tags = status.getHashtagEntities();
        if (tags == null || tags.length == 0) {
            return;
        }
        StringBuilder joined = new StringBuilder();
        for (HashtagEntity tag : tags) {
            joined.append('#').append(tag.getText());
        }
        add(tweetFamily, TweetCols.HASHTAGS.bytes(), joined.toString(), putTweet);
    }

    /**
     * Stores the first URL entity of the status, preferring its expanded form and
     * falling back to the plain URL. Nothing is written when the status has no URL
     * entities or the first entity exposes neither form.
     */
    private void addFirstUrl(Status status, Put putTweet) {
        URLEntity[] urlEntities = status.getURLEntities();
        if (urlEntities == null || urlEntities.length == 0) {
            return;
        }
        URLEntity first = urlEntities[0];
        if (first == null) {
            return;
        }
        URL url = first.getExpandedURL();
        if (url == null) {
            url = first.getURL();
        }
        // Both forms can be absent; skip the column instead of failing the whole tweet.
        if (url != null) {
            add(tweetFamily, TweetCols.URL.bytes(), url.toString(), putTweet);
        }
    }

    /**
     * Stores the geo location string (if any) and, when the status has a place, the
     * place's country code, full name and type plus one latitude/longitude pair.
     *
     * <p>The pair is taken from the first usable source in this order: the place's
     * geometry when its type is {@code "Point"}, the status' geo location, the first
     * corner of the place's bounding box when its type is {@code "Polygon"}. A source
     * whose coordinate array is missing or empty is skipped.
     */
    private void addLocationData(Status status, Put putTweet) {
        GeoLocation geoLocation = status.getGeoLocation();
        if (geoLocation != null) {
            add(tweetFamily, TweetCols.GEO.bytes(), geoLocation.toString(), putTweet);
        }

        Place place = status.getPlace();
        if (place == null) {
            // Latitude/longitude columns are only written for statuses with a place.
            return;
        }
        add(tweetFamily, TweetCols.PLACE_COUNTRY_CODE.bytes(), place.getCountryCode(), putTweet);
        add(tweetFamily, TweetCols.PLACE_FULL_NAME.bytes(), place.getFullName(), putTweet);
        add(tweetFamily, TweetCols.PLACE_TYPE.bytes(), place.getPlaceType(), putTweet);

        GeoLocation anchor = null;
        if ("Point".equals(place.getGeometryType())) {
            anchor = firstCoordinate(place.getGeometryCoordinates());
        }
        if (anchor == null) {
            anchor = geoLocation;
        }
        if (anchor == null && "Polygon".equals(place.getBoundingBoxType())) {
            anchor = firstCoordinate(place.getBoundingBoxCoordinates());
        }

        if (anchor != null) {
            add(tweetFamily, TweetCols.PLACE_LATITUDE.bytes(), anchor.getLatitude(), putTweet);
            add(tweetFamily, TweetCols.PLACE_LONGITUDE.bytes(), anchor.getLongitude(), putTweet);
        }
    }

    /**
     * @return {@code coordinates[0][0]}, or {@code null} if the array, its first row or
     *         that element is missing
     */
    private static GeoLocation firstCoordinate(GeoLocation[][] coordinates) {
        if (coordinates == null || coordinates.length == 0) {
            return null;
        }
        GeoLocation[] firstRow = coordinates[0];
        if (firstRow == null || firstRow.length == 0) {
            return null;
        }
        return firstRow[0];
    }

    /** Sets the tweet column family name; encoded with {@code Bytes.toBytes(String)}. */
    public void setTweetFamily(String tweetFamily) {
        this.tweetFamily = Bytes.toBytes(tweetFamily);
    }

    /** Sets the term-vector column family name; encoded with {@code Bytes.toBytes(String)}. */
    public void setTweetVectorsFamily(String tweetVectorsFamily) {
        this.vectorFamily = Bytes.toBytes(tweetVectorsFamily);
    }

    /** Sets the user column family name; encoded with {@code Bytes.toBytes(String)}. */
    public void setUserFamily(String userFamily) {
        this.userFamily = Bytes.toBytes(userFamily);
    }

    /** Sets the tweet table name; surrounding whitespace is trimmed. */
    public void setTweetTableName(String tweetTableName) {
        this.tweetTableName = tweetTableName.trim();
    }

    /** Sets the user table name; surrounding whitespace is trimmed. */
    public void setUserTableName(String userTableName) {
        this.userTableName = userTableName.trim();
    }

    /** Sets the value used for {@code hbase.zookeeper.quorum} (default {@code "localhost"}). */
    public void setZookeeperQuorum(String zookeeperQuorum) {
        this.zookeeperQuorum = zookeeperQuorum;
    }

    /** Sets the value used for {@code hbase.zookeeper.property.clientPort} (default 2181). */
    public void setZookeeperPort(int zookeeperPort) {
        this.zookeeperPort = zookeeperPort;
    }

    /** Sets the optional vectorizer; when {@code null}, no term vectors are stored. */
    public void setVectorizer(Vectorizer vectorizer) {
        this.vectorizer = vectorizer;
    }

    /**
     * Injects a ready-made HBase configuration; when set before {@link #init()}, the
     * ZooKeeper quorum and port settings of this sink are not applied.
     */
    void setHBaseConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
}

